diff --git a/DESIGN.md b/DESIGN.md
new file mode 100644
index 0000000..cf65b85
--- /dev/null
+++ b/DESIGN.md
@@ -0,0 +1,414 @@
+# agent-claw: Final Architecture Design
+
+*Decisions locked 2026-05-04*
+
+---
+
+## System Overview
+
+A serverless personal assistant running on AWS AgentCore. No always-on compute. Telegram-first, channel-agnostic by design. Two AgentCore runtimes: one user-facing, one background (REM).
+
+```
+Telegram ──→ API GW ──→ Lambda(tg-ingest) ──→ SQS FIFO ──→ Lambda(agent-runner)
+                                                                    │
+                                                           InvokeAgentRuntime(Runtime 1)
+                                                                    │
+                                                           Runtime 1: main assistant
+                                                             reads: AgentCore Memory, factbase (Gateway),
+                                                                    S3 workspace
+                                                             delivers via: channel adapter → Telegram
+                                                                    │
+                                                           session ends (idle)
+                                                                    │
+                                                           SQS(rem-queue) ──→ Lambda(rem-runner)
+                                                                    │
+                                                           InvokeAgentRuntime(Runtime 2)
+                                                                    │
+                                                           Runtime 2: REM agent
+                                                             calls: factbase workflow add/maintain
+                                                             reads: AgentCore Memory session summary
+                                                             writes: factbase (via AgentCore Gateway)
+
+EventBridge ──→ Lambda(agent-runner) ──→ InvokeAgentRuntime(Runtime 1, heartbeat prompt)
+EventBridge ──→ Lambda(rem-runner) ──→ InvokeAgentRuntime(Runtime 2, nightly maintain)
+```
+
+---
+
+## CDK Stack
+
+**Stack: `AgentClawStack`**
+
+Every resource is either created or imported from an existing one, selected via CDK context parameters or SSM lookups, so deploys are idempotent.
+
+```typescript
+// Context parameters (CDK --context flags or cdk.json)
+//   agentcore-memory-arn: existing memory ARN (optional, creates if absent)
+//   workspace-bucket-name: existing S3 bucket (optional, creates if absent)
+//   telegram-bot-token-secret-arn: existing secret (required before deploy)
+//   factbase-gateway-url: AgentCore Gateway endpoint for factbase tools
+```
+
+**Resources provisioned:**
+
+| Resource | Type | Notes |
+|---|---|---|
+| `workspace-bucket` | S3 | SOUL.md, AGENTS.md, USER.md, HEARTBEAT.md |
+| `session-store` | DynamoDB | actor_id → {session_id, created_at} |
+| `personal-memory` | AgentCore Memory | SUMMARIZATION + USER_PREFERENCE strategies |
+| `runtime-1` | AgentCore Runtime | main assistant, CodeZip, Sonnet |
+| `runtime-2` | AgentCore Runtime | REM agent, CodeZip, Haiku |
+| `message-queue` | SQS FIFO | MessageGroupId=actor_id, serializes per user |
+| `rem-queue` | SQS | triggers REM after session idle |
+| `tg-ingest` | Lambda | validates webhook, enqueues, sends typing |
+| `agent-runner` | Lambda | SQS trigger, invokes Runtime 1, delivers response |
+| `rem-runner` | Lambda | EventBridge + SQS trigger, invokes Runtime 2 |
+| `webhook-api` | API GW HTTP | POST /telegram → tg-ingest |
+| `heartbeat-rule` | EventBridge | every 30m → agent-runner (heartbeat prompt) |
+| `rem-nightly` | EventBridge | nightly → rem-runner (factbase maintain) |
+| `secrets` | Secrets Manager | bot-token, brave-api-key |
+
+---
+
+## Runtime 1: Main Assistant
+
+**Language:** Python + Strands + bedrock-agentcore SDK
+**Deploy:** CodeZip
+**Model:** `us.anthropic.claude-3-7-sonnet-20250219-v1:0`
+**Lifecycle:** `idleRuntimeSessionTimeout: 21600` (6hr), `maxLifetime: 28800` (8hr)
+
+### Session start
+1. Check in-memory cache for workspace files (SOUL.md, AGENTS.md, USER.md)
+2. If cache miss: load from S3, cache in-memory
+3. `RetrieveMemoryRecords(query=recent_message, namespace=/preferences/actor_id/)` → top-k preferences
+4. `ListEvents(session_id)` → recent conversation turns (if resuming)
+5. Build system prompt: workspace files + retrieved memory
+6. Initialize Strands agent with `AgentCoreMemorySessionManager`
+
+### Tools
+
+| Tool | Description |
+|---|---|
+| `send_message(text, metadata?)` | Delivers via channel adapter (decoupled) |
+| `web_search(query)` | Brave Search API |
+| `web_fetch(url)` | HTTP + readability extraction |
+| `read_workspace_file(path)` | S3 get_object |
+| `write_workspace_file(path, content)` | S3 put_object + update in-memory cache |
+| `search_memory(query)` | AgentCore Memory RetrieveMemoryRecords |
+| `factbase_search(query)` | factbase via AgentCore Gateway |
+
+**NOT included in Runtime 1:** factbase write tools (writes are async via Runtime 2 only)
+
+### Channel adapter (decoupled)
+
+```python
+from typing import Protocol
+
+
+class ChannelAdapter(Protocol):
+    def send_message(self, text: str) -> str: ...  # returns message_id
+    def send_typing(self) -> None: ...
+    def edit_message(self, msg_id: str, text: str) -> None: ...
+
+
+class TelegramAdapter:
+    def send_message(self, text): ...
+    def send_typing(self): ...  # sendChatAction(typing)
+    def edit_message(self, msg_id, text): ...
+
+# Future: SlackAdapter, DiscordAdapter
+# Channel type + target_id come from the SQS message payload.
+# agent-runner forwards them as `channel_adapter` in the Runtime 1 payload,
+# and Runtime 1 instantiates the matching adapter; nothing is hardcoded in the agent.
+```
+
+The agent calls `send_message(text)`; it neither knows nor cares about Telegram specifics.
+
+---
+
+## Runtime 2: REM Agent
+
+**Language:** Python + Strands + bedrock-agentcore SDK
+**Deploy:** CodeZip
+**Model:** `us.anthropic.claude-3-5-haiku-20241022-v1:0`
+**Lifecycle:** short sessions, `maxLifetime: 1800` (30min)
+
+### Trigger: session ended
+1. agent-runner Lambda enqueues a session reference (`actor_id`, `session_id`) to rem-queue when the main session goes idle
+2. rem-runner Lambda invokes Runtime 2 with:
+   - session summary (from AgentCore Memory SUMMARIZATION)
+   - actor_id for namespace routing
+
+### Trigger: nightly (EventBridge cron)
+1. rem-runner Lambda invokes Runtime 2 with: "Run factbase maintain"
+
+### Tools
+
+| Tool | Description |
+|---|---|
+| `factbase_workflow_add(summary)` | factbase workflow add via AgentCore Gateway |
+| `factbase_workflow_maintain()` | factbase maintain workflow |
+| `read_workspace_file(path)` | S3 (for HEARTBEAT.md context) |
+| `write_workspace_file(path, content)` | S3 (update HEARTBEAT.md if needed) |
+
+### REM flow
+```
+Receive: session summary text
+→ factbase workflow add: "Here is a conversation summary. Extract any new or updated
+   facts about people, projects, or decisions and add/update factbase entities."
+→ factbase guides agent through: what's new, what conflicts, what to write
+→ On nightly schedule: factbase workflow maintain
+```
+
+---
+
+## Lambda: tg-ingest
+
+**Trigger:** API Gateway POST /telegram
+**Runtime:** Python 3.12
+
+```python
+import json
+import os
+
+import boto3
+
+sqs = boto3.client("sqs")
+MESSAGE_QUEUE_URL = os.environ["MESSAGE_QUEUE_URL"]  # env var name is an assumption
+
+# validate_telegram_secret, extract_chat_id, extract_message and
+# send_telegram_typing are helpers defined elsewhere in this Lambda.
+
+
+def handler(event, context):
+    # 1. Validate Telegram secret token header
+    if not validate_telegram_secret(event):
+        return {"statusCode": 403}
+
+    # 2. Parse Telegram Update; ack and skip updates that carry no message
+    update = json.loads(event["body"])
+    message = update.get("message")
+    if message is None:
+        return {"statusCode": 200}
+    chat_id = extract_chat_id(update)
+    message_text = extract_message(update)
+
+    # 3. Send typing action immediately (non-blocking)
+    send_telegram_typing(chat_id)
+
+    # 4. Enqueue to SQS FIFO
+    sqs.send_message(
+        QueueUrl=MESSAGE_QUEUE_URL,
+        MessageGroupId=f"telegram:{chat_id}",  # actor_id; serializes per conversation
+        MessageDeduplicationId=str(update["update_id"]),
+        MessageBody=json.dumps({
+            "channel": "telegram",
+            "chat_id": chat_id,
+            "messages": [{"text": message_text, "from": message.get("from")}],
+            "update_id": update["update_id"],
+            "timestamp": message["date"],
+        }),
+    )
+
+    # Ack quickly (design target: under 3s) so Telegram does not retry the webhook
+    return {"statusCode": 200}
+```
+
+---
+
+## Lambda: agent-runner
+
+**Trigger:** SQS FIFO (batch_size=10, MessageGroupId=actor_id)
+**Runtime:** Python 3.12
+
+```python
+import json
+import os
+from collections import defaultdict
+
+import boto3
+
+agentcore = boto3.client("bedrock-agentcore")
+sqs = boto3.client("sqs")
+# Environment variable names are assumptions
+RUNTIME_1_ARN = os.environ["RUNTIME_1_ARN"]
+REM_QUEUE_URL = os.environ["REM_QUEUE_URL"]
+BOT_TOKEN_SECRET_ARN = os.environ["BOT_TOKEN_SECRET_ARN"]
+
+
+def handler(event, context):
+    # 1. Group SQS records by actor. FIFO keeps order within a message group,
+    #    but a single batch can contain records from several groups.
+    batches = defaultdict(list)
+    for r in event["Records"]:
+        body = json.loads(r["body"])
+        batches[(body["channel"], body["chat_id"])].append(body)
+
+    for (channel, chat_id), records in batches.items():
+        run_actor_batch(channel, chat_id, records)
+
+
+def run_actor_batch(channel, chat_id, records):
+    actor_id = f"{channel}:{chat_id}"
+
+    # 2. Look up or create session in DynamoDB (helper defined elsewhere)
+    session_id = get_or_create_session(actor_id)
+
+    # 3. Bundle messages
+    if len(records) == 1:
+        prompt = records[0]["messages"][0]["text"]
+    else:
+        msgs = "\n".join(f"[{i+1}] {r['messages'][0]['text']}" for i, r in enumerate(records))
+        prompt = f"You have {len(records)} queued messages:\n{msgs}"
+
+    # 4. Invoke Runtime 1
+    payload = {
+        "prompt": prompt,
+        "actor_id": actor_id,
+        "session_id": session_id,
+        "channel_adapter": {"type": channel, "target_id": str(chat_id)},
+        "bot_token_secret_arn": BOT_TOKEN_SECRET_ARN,
+    }
+
+    response = agentcore.invoke_agent_runtime(
+        agentRuntimeArn=RUNTIME_1_ARN,
+        runtimeSessionId=session_id,
+        payload=json.dumps(payload).encode(),
+    )
+
+    # 5. Stream response: the agent delivers its own messages via the send_message tool
+    for chunk in response["response"]:
+        pass  # consume stream; agent handles delivery internally
+
+    # 6. Enqueue a session reference for REM processing (rem-runner fetches the summary)
+    sqs.send_message(QueueUrl=REM_QUEUE_URL, MessageBody=json.dumps({
+        "actor_id": actor_id,
+        "session_id": session_id,
+    }))
+```
+
+---
+
+## Lambda: rem-runner
+
+**Trigger:** SQS (rem-queue) + EventBridge (nightly)
+**Runtime:** Python 3.12
+
+```python
+import hashlib
+import json
+import os
+from datetime import date
+
+import boto3
+
+agentcore = boto3.client("bedrock-agentcore")
+RUNTIME_2_ARN = os.environ["RUNTIME_2_ARN"]  # env var name is an assumption
+
+
+def rem_session_id(*parts):
+    # runtimeSessionId must be at least 33 characters, so pad with a stable hash
+    base = "-".join(parts)
+    return f"{base}-{hashlib.sha256(base.encode()).hexdigest()[:32]}"
+
+
+def invoke_rem(session_id, payload):
+    agentcore.invoke_agent_runtime(
+        agentRuntimeArn=RUNTIME_2_ARN,
+        runtimeSessionId=session_id,
+        payload=json.dumps(payload).encode(),
+    )
+
+
+def handler(event, context):
+    today = date.today().isoformat()
+
+    # Determine trigger source
+    if event.get("source") == "aws.events":
+        # Nightly (EventBridge): run maintain on factbase
+        payload = {"prompt": "Run factbase maintain workflow to consolidate memories, detect conflicts, and generate review questions."}
+        invoke_rem(rem_session_id("rem-nightly", today), payload)
+        return
+
+    # Post-session (SQS): extract new facts, one invocation per record
+    for r in event["Records"]:
+        record = json.loads(r["body"])
+        actor_id = record["actor_id"]
+        # get_session_summary is a helper defined elsewhere; it reads the user session
+        summary = get_session_summary(actor_id, record["session_id"])
+        payload = {
+            "prompt": f"Conversation summary to process:\n\n{summary}\n\nExtract new entities, facts, and updates.",
+            "actor_id": actor_id,
+        }
+        # Use dedicated REM session (separate from user session)
+        invoke_rem(rem_session_id("rem", actor_id, today), payload)
+```
+
+---
+
+## Factbase KB Design (perspective.yaml)
+
+```yaml
+organization: Daniel Levy
+focus: Personal chief of staff KB (professional network, projects, commitments, decisions)
+
+entity_types:
+  person: Professional contacts (colleagues, customers, AWS team, interviewers, vendors)
+  company: Organizations (AWS customers, target employers, partners, tools/services)
+  project: Active and past projects (work deliverables, side projects, products)
+  commitment: Explicit commitments made (by Daniel or to Daniel) with due dates
+  decision: Significant decisions made, rationale, outcomes
+  meeting: Key conversations (participants, decisions, follow-ups)
+
+review:
+  stale_days: 90
+  required_fields:
+    person: [current_role, company, last_contact_date]
+    commitment: [due_date, status, owner]
+    decision: [date, outcome]
+```
+
+---
+
+## S3 Workspace Layout
+
+```
+s3://agent-claw-workspace-{account_id}/
+├── SOUL.md        ← personality, tone, operating rules
+├── AGENTS.md      ← workspace conventions, memory rules
+├── USER.md        ← Daniel's profile, preferences
+├── IDENTITY.md    ← agent name, emoji
+├── HEARTBEAT.md   ← periodic task checklist
+└── TOOLS.md       ← tool-specific notes
+```
+
+Seeded from existing OpenClaw workspace files. The CDK deploy script copies them in if the bucket is new.
+
+---
+
+## Project Structure
+
+```
+agent-claw/
+├── cdk/                          ← CDK TypeScript stack
+│   ├── bin/agent-claw.ts
+│   ├── lib/agent-claw-stack.ts
+│   └── lib/constructs/
+│       ├── agentcore-runtime.ts
+│       ├── memory.ts
+│       └── eventbridge-rules.ts
+├── src/
+│   ├── runtime-1/                ← Main assistant (CodeZip deploy)
+│   │   ├── main.py
+│   │   ├── tools/
+│   │   │   ├── __init__.py
+│   │   │   ├── web.py            ← web_search, web_fetch
+│   │   │   ├── workspace.py      ← read/write S3 workspace files
+│   │   │   ├── memory.py         ← search_memory (AgentCore Memory)
+│   │   │   └── messaging.py      ← send_message (channel adapter)
+│   │   ├── channels/
+│   │   │   ├── adapter.py        ← ChannelAdapter protocol
+│   │   │   └── telegram.py       ← TelegramAdapter
+│   │   ├── prompt_builder.py     ← builds system prompt from workspace + memory
+│   │   └── pyproject.toml
+│   ├── runtime-2/                ← REM agent (CodeZip deploy)
+│   │   ├── main.py
+│   │   ├── tools/
+│   │   │   ├── factbase.py       ← factbase workflow tools via AgentCore Gateway
+│   │   │   └── workspace.py      ← shared workspace tools
+│   │   └── pyproject.toml
+│   └── lambdas/
+│       ├── tg-ingest/
+│       │   ├── handler.py
+│       │   └── requirements.txt
+│       ├── agent-runner/
+│       │   ├── handler.py
+│       │   └── requirements.txt
+│       └── rem-runner/
+│           ├── handler.py
+│           └── requirements.txt
+├── workspace/                    ← seed files for S3 workspace
+│   ├── SOUL.md
+│   ├── AGENTS.md
+│   ├── USER.md
+│   └── HEARTBEAT.md
+└── README.md
+```
+
+---
+
+## Build Phases
+
+### Phase 0: Infrastructure spike (1-2 days)
+- CDK stack: deploy API GW + tg-ingest Lambda + SQS + AgentCore Runtime 1 (minimal agent)
+- Test: Telegram message → Lambda → SQS → Runtime 1 → response back to Telegram
+- Confirm: cold start latency, session warm behavior, basic round-trip
+
+### Phase 1: Full main assistant (1 week)
+- Runtime 1 with all tools: web_search, web_fetch, workspace read/write, send_message
+- System prompt builder loading from S3
+- AgentCore Memory: short-term turns + USER_PREFERENCE
+- Channel adapter: TelegramAdapter with typing action
+- DynamoDB session mapping
+
+**Done when:** a full conversation works with personality, web search, and memory across messages.
+
+### Phase 2: REM + factbase (1 week)
+- Runtime 2 deployed with factbase workflow tools
+- rem-runner Lambda + rem-queue
+- Post-session trigger from agent-runner
+- EventBridge nightly REM schedule
+- factbase perspective.yaml defined, KB seeded
+
+**Done when:** after a conversation, REM fires and writes relevant facts to factbase; in the next session the agent can search factbase for context.
+
+### Phase 3: Heartbeat + scheduling (3-4 days)
+- EventBridge heartbeat rule (every 30m → Runtime 1 with heartbeat prompt)
+- HEARTBEAT.md checklist loaded at heartbeat time
+- HEARTBEAT_OK suppression
+- Dynamic cron via `create_cron` tool (EventBridge SDK)
+
+### Phase 4: Polish
+- Streaming progress (Telegram edit-in-place)
+- Commitments system
+- Error handling, DLQ, CloudWatch alarms
+- Multi-user support (actor_id scoping already built in)
+
+---
+
+*Design finalized 2026-05-04. All pre-development questions resolved.*
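
The tg-ingest handler in this design calls `validate_telegram_secret(event)` without showing it. One possible shape is sketched below, assuming the webhook was registered with a `secret_token` so that Telegram sends it back in the `X-Telegram-Bot-Api-Secret-Token` header. The two-argument signature and the `expected` parameter are assumptions made to keep the sketch self-contained; the design's one-argument helper would presumably load the expected value from Secrets Manager or an environment variable.

```python
import hmac

# Header Telegram attaches when setWebhook was called with secret_token.
SECRET_HEADER = "x-telegram-bot-api-secret-token"


def validate_telegram_secret(event: dict, expected: str) -> bool:
    """Return True when the webhook request carries the expected secret token.

    `expected` is a hypothetical parameter; how the real Lambda obtains the
    secret is not specified in the design.
    """
    # Normalize header names so the lookup is case-insensitive.
    headers = {k.lower(): v for k, v in (event.get("headers") or {}).items()}
    supplied = headers.get(SECRET_HEADER, "")
    # Reject an empty expected secret, then compare in constant time so the
    # token is not leaked through response timing.
    return bool(expected) and hmac.compare_digest(supplied.encode(), expected.encode())
```

Rejecting an empty `expected` value means a missing configuration fails closed rather than letting every request through.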