Add final architecture design doc
414
DESIGN.md
Normal file
@@ -0,0 +1,414 @@
# agent-claw: Final Architecture Design

*Decisions locked 2026-05-04*

---

## System Overview

A serverless personal assistant running on AWS AgentCore. No always-on compute. Telegram-first, channel-agnostic by design. Two AgentCore runtimes: one user-facing, one background (REM).

```
Telegram ──→ API GW ──→ Lambda(tg-ingest) ──→ SQS FIFO ──→ Lambda(agent-runner)
                                                                  │
                                                  InvokeAgentRuntime(Runtime 1)
                                                                  │
                                                  Runtime 1: main assistant
                                                    reads: AgentCore Memory, factbase (Gateway),
                                                           S3 workspace
                                                    delivers via: channel adapter → Telegram
                                                                  │
                                                  session ends (idle)
                                                                  │
                                                  EventBridge ──→ Lambda(rem-runner)
                                                                  │
                                                  InvokeAgentRuntime(Runtime 2)
                                                                  │
                                                  Runtime 2: REM agent
                                                    calls: factbase workflow add/maintain
                                                    reads: AgentCore Memory session summary
                                                    writes: factbase (via AgentCore Gateway)

EventBridge ──→ Lambda(heartbeat-runner) ──→ InvokeAgentRuntime(Runtime 1, heartbeat prompt)
EventBridge ──→ Lambda(rem-runner) ──→ InvokeAgentRuntime(Runtime 2, nightly maintain)
```

---

## CDK Stack

**Stack: `AgentClawStack`**

All resources are create-or-use-existing via CDK context parameters or SSM lookups. Deploy is idempotent.

```typescript
// Context parameters (CDK --context flags or cdk.json)
//   agentcore-memory-arn: existing memory ARN (optional, creates if absent)
//   workspace-bucket-name: existing S3 bucket (optional, creates if absent)
//   telegram-bot-token-secret-arn: existing secret (required before deploy)
//   factbase-gateway-url: AgentCore Gateway endpoint for factbase tools
```

**Resources provisioned:**

| Resource | Type | Notes |
|---|---|---|
| `workspace-bucket` | S3 | SOUL.md, AGENTS.md, USER.md, HEARTBEAT.md |
| `session-store` | DynamoDB | actor_id → {session_id, created_at} |
| `personal-memory` | AgentCore Memory | SUMMARIZATION + USER_PREFERENCE strategies |
| `runtime-1` | AgentCore Runtime | main assistant, CodeZip, Sonnet |
| `runtime-2` | AgentCore Runtime | REM agent, CodeZip, Haiku |
| `message-queue` | SQS FIFO | MessageGroupId=actor_id, serializes per user |
| `rem-queue` | SQS | triggers REM after session idle |
| `tg-ingest` | Lambda | validates webhook, enqueues, sends typing |
| `agent-runner` | Lambda | SQS trigger, invokes Runtime 1, delivers response |
| `rem-runner` | Lambda | EventBridge + SQS trigger, invokes Runtime 2 |
| `webhook-api` | API GW HTTP | POST /telegram → tg-ingest |
| `heartbeat-rule` | EventBridge | every 30m → agent-runner (heartbeat prompt) |
| `rem-nightly` | EventBridge | nightly → rem-runner (factbase maintain) |
| `secrets` | Secrets Manager | bot-token, brave-api-key |

---

## Runtime 1: Main Assistant

**Language:** Python + Strands + bedrock-agentcore SDK
**Deploy:** CodeZip
**Model:** `us.anthropic.claude-3-7-sonnet-20250219-v1:0`
**Lifecycle:** `idleRuntimeSessionTimeout: 21600` (6hr), `maxLifetime: 28800` (8hr)

### Session start
1. Check in-memory cache for workspace files (SOUL.md, AGENTS.md, USER.md)
2. If cache miss: load from S3, cache in-memory
3. `RetrieveMemoryRecords(query=recent_message, namespace=/preferences/actor_id/)` → top-k preferences
4. `ListEvents(session_id)` → recent conversation turns (if resuming)
5. Build system prompt: workspace files + retrieved memory
6. Initialize Strands agent with `AgentCoreMemorySessionManager`

### Tools

| Tool | Description |
|---|---|
| `send_message(text, metadata?)` | Delivers via channel adapter (decoupled) |
| `web_search(query)` | Brave Search API |
| `web_fetch(url)` | HTTP + readability extraction |
| `read_workspace_file(path)` | S3 get_object |
| `write_workspace_file(path, content)` | S3 put_object + update in-memory cache |
| `search_memory(query)` | AgentCore Memory RetrieveMemoryRecords |
| `factbase_search(query)` | factbase via AgentCore Gateway |

**NOT included in Runtime 1:** factbase write tools (writes are async via Runtime 2 only)

### Channel adapter (decoupled)

```python
from typing import Protocol


class ChannelAdapter(Protocol):
    def send_message(self, text: str) -> str: ...  # returns message_id
    def send_typing(self) -> None: ...
    def edit_message(self, msg_id: str, text: str) -> None: ...


class TelegramAdapter:
    def send_message(self, text: str) -> str: ...
    def send_typing(self) -> None: ...  # sendChatAction(typing)
    def edit_message(self, msg_id: str, text: str) -> None: ...


# Future: SlackAdapter, DiscordAdapter
# Channel type + target_id come from the SQS message payload.
# agent-runner forwards them as `channel_adapter` in the Runtime 1 payload;
# the matching adapter is instantiated from that spec, not hardcoded in the agent.
```

The agent calls `send_message(text)`; it doesn't know or care about Telegram specifics.
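
One way to turn the `channel_adapter` spec from the payload into an adapter instance is a small registry keyed by channel type. The sketch below is an assumption about how this could look: `ADAPTERS`, `adapter_from_payload` and `RecordingAdapter` are hypothetical names, and the recording adapter stands in for `TelegramAdapter` so the example runs without network access.

```python
from typing import Callable, Dict, List, Mapping, Protocol


class ChannelAdapter(Protocol):
    def send_message(self, text: str) -> str: ...


class RecordingAdapter:
    """Stand-in adapter that records messages instead of calling a channel API."""

    def __init__(self, target_id: str) -> None:
        self.target_id = target_id
        self.sent: List[str] = []

    def send_message(self, text: str) -> str:
        self.sent.append(text)
        return str(len(self.sent))  # fake message_id


# Registry: channel type -> factory taking the target_id.
# A real deployment would register "telegram": TelegramAdapter here.
ADAPTERS: Dict[str, Callable[[str], ChannelAdapter]] = {
    "recording": RecordingAdapter,
}


def adapter_from_payload(payload: Mapping[str, dict]) -> ChannelAdapter:
    """Instantiate the adapter named in payload["channel_adapter"]."""
    spec = payload["channel_adapter"]
    channel_type = spec["type"]
    if channel_type not in ADAPTERS:
        raise ValueError(f"unsupported channel: {channel_type!r}")
    return ADAPTERS[channel_type](spec["target_id"])
```

Adding Slack or Discord later would then mean registering one more factory, with no change to the agent or its `send_message` tool.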

---

## Runtime 2: REM Agent

**Language:** Python + Strands + bedrock-agentcore SDK
**Deploy:** CodeZip
**Model:** `us.anthropic.claude-3-5-haiku-20241022-v1:0`
**Lifecycle:** short sessions, `maxLifetime: 1800` (30min)

### Trigger: session ended
1. agent-runner Lambda enqueues a REM request (actor_id, session_id) to rem-queue when the main session goes idle
2. rem-runner Lambda invokes Runtime 2 with:
   - session summary (from AgentCore Memory SUMMARIZATION)
   - actor_id for namespace routing

### Trigger: nightly (EventBridge cron)
1. rem-runner Lambda invokes Runtime 2 with: "Run factbase maintain"

### Tools

| Tool | Description |
|---|---|
| `factbase_workflow_add(summary)` | factbase workflow add via AgentCore Gateway |
| `factbase_workflow_maintain()` | factbase maintain workflow |
| `read_workspace_file(path)` | S3 (for HEARTBEAT.md context) |
| `write_workspace_file(path, content)` | S3 (update HEARTBEAT.md if needed) |

### REM flow
```
Receive: session summary text
  → factbase workflow add: "Here is a conversation summary. Extract any new or updated
     facts about people, projects, or decisions and add/update factbase entities."
  → factbase guides agent through: what's new, what conflicts, what to write
  → On nightly schedule: factbase workflow maintain
```

---

## Lambda: tg-ingest

**Trigger:** API Gateway POST /telegram
**Runtime:** Python 3.12

```python
import json
import os

import boto3

sqs = boto3.client("sqs")
MESSAGE_QUEUE_URL = os.environ["MESSAGE_QUEUE_URL"]  # assumed env var name

# validate_telegram_secret, extract_chat_id, extract_message and
# send_telegram_typing are module-level helpers (not shown here).


def handler(event, context):
    # 1. Validate Telegram secret token header
    if not validate_telegram_secret(event):
        return {"statusCode": 403}

    # 2. Parse Telegram Update; ack and skip updates without a message (edits, callbacks)
    update = json.loads(event["body"])
    message = update.get("message")
    if message is None:
        return {"statusCode": 200}
    chat_id = extract_chat_id(update)
    message_text = extract_message(update)

    # 3. Send typing action immediately (non-blocking)
    send_telegram_typing(chat_id)

    # 4. Enqueue to SQS FIFO
    sqs.send_message(
        QueueUrl=MESSAGE_QUEUE_URL,
        MessageGroupId=str(chat_id),  # serializes per conversation
        MessageDeduplicationId=str(update["update_id"]),
        MessageBody=json.dumps({
            "channel": "telegram",
            "chat_id": chat_id,
            "messages": [{"text": message_text, "from": message["from"]}],
            "update_id": update["update_id"],
            "timestamp": message["date"],
        }),
    )

    return {"statusCode": 200}  # ack promptly (design target: within 3s)
```
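
The handler relies on a `validate_telegram_secret` helper that the document does not define. A minimal sketch is shown below, assuming the webhook was registered with Telegram's `secret_token` option (Telegram then sends it back in the `X-Telegram-Bot-Api-Secret-Token` header) and that the expected value is available in a hypothetical `TELEGRAM_WEBHOOK_SECRET` environment variable; in this design it would more likely be read from Secrets Manager.

```python
import hmac
import os
from typing import Any, Mapping, Optional

# Header Telegram attaches when setWebhook was called with a secret_token.
# Compared in lowercase because header casing varies by API Gateway flavor.
SECRET_HEADER = "x-telegram-bot-api-secret-token"


def validate_telegram_secret(event: Mapping[str, Any],
                             expected: Optional[str] = None) -> bool:
    """Return True only if the request carries the expected webhook secret."""
    if expected is None:
        # Hypothetical env var; swap for a Secrets Manager lookup as needed.
        expected = os.environ.get("TELEGRAM_WEBHOOK_SECRET", "")
    headers = {k.lower(): v for k, v in (event.get("headers") or {}).items()}
    supplied = headers.get(SECRET_HEADER, "")
    if not expected or not supplied:
        return False
    # Constant-time comparison avoids leaking the secret through timing.
    return hmac.compare_digest(supplied.encode(), expected.encode())
```

Rejecting requests when either side is empty means a misconfigured deployment fails closed rather than accepting unauthenticated webhooks.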

---

## Lambda: agent-runner

**Trigger:** SQS FIFO (batch_size=10, MessageGroupId=actor_id)
**Runtime:** Python 3.12

```python
import json

import boto3

sqs = boto3.client("sqs")
agentcore = boto3.client("bedrock-agentcore")  # assumed data-plane client name

# RUNTIME_1_ARN, REM_QUEUE_URL, BOT_TOKEN_SECRET_ARN and get_or_create_session
# are module-level config/helpers (not shown here).


def handler(event, context):
    # 1. Batch all SQS records (assumes one actor per batch; FIFO serializes per group)
    records = [json.loads(r["body"]) for r in event["Records"]]
    channel = records[0]["channel"]
    chat_id = records[0]["chat_id"]
    actor_id = f"{channel}:{chat_id}"

    # 2. Look up or create session in DynamoDB
    session_id = get_or_create_session(actor_id)

    # 3. Bundle messages
    if len(records) == 1:
        prompt = records[0]["messages"][0]["text"]
    else:
        msgs = "\n".join(f"[{i+1}] {r['messages'][0]['text']}" for i, r in enumerate(records))
        prompt = f"You have {len(records)} queued messages:\n{msgs}"

    # 4. Invoke Runtime 1
    payload = {
        "prompt": prompt,
        "actor_id": actor_id,
        "session_id": session_id,
        "channel_adapter": {"type": channel, "target_id": str(chat_id)},
        "bot_token_secret_arn": BOT_TOKEN_SECRET_ARN,
    }

    response = agentcore.invoke_agent_runtime(
        agentRuntimeArn=RUNTIME_1_ARN,
        runtimeSessionId=session_id,
        payload=json.dumps(payload).encode(),
    )

    # 5. Stream response: agent delivers its own messages via send_message tool
    for chunk in response["response"]:
        pass  # consume stream; agent handles delivery internally

    # 6. Enqueue a REM request; rem-runner fetches the session summary itself
    sqs.send_message(QueueUrl=REM_QUEUE_URL, MessageBody=json.dumps({
        "actor_id": actor_id,
        "session_id": session_id,
    }))
```
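
The handler above reads `channel` and `chat_id` from the first record only. A Lambda batch from a FIFO queue may, as far as the SQS event source behaves today, carry records from more than one message group, so it may be safer to group by actor before bundling. The sketch below shows one way to do that; `group_by_actor` and `bundle_prompt` are hypothetical helper names, and the record shape follows the tg-ingest message body.

```python
import json
from typing import Any, Dict, List


def group_by_actor(sqs_event: Dict[str, Any]) -> Dict[str, List[dict]]:
    """Group SQS record bodies by actor_id, preserving arrival order per actor."""
    groups: Dict[str, List[dict]] = {}
    for record in sqs_event["Records"]:
        body = json.loads(record["body"])
        actor_id = f"{body['channel']}:{body['chat_id']}"
        groups.setdefault(actor_id, []).append(body)
    return groups


def bundle_prompt(bodies: List[dict]) -> str:
    """Build one prompt from an actor's queued messages (same wording as the handler)."""
    texts = [m["text"] for body in bodies for m in body["messages"]]
    if len(texts) == 1:
        return texts[0]
    numbered = "\n".join(f"[{i}] {t}" for i, t in enumerate(texts, start=1))
    return f"You have {len(texts)} queued messages:\n{numbered}"
```

The handler would then loop over `group_by_actor(event).items()` and run steps 2 to 6 once per actor.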

---

## Lambda: rem-runner

**Trigger:** SQS (rem-queue) + EventBridge (nightly)
**Runtime:** Python 3.12

```python
import json
from datetime import datetime, timezone

import boto3

agentcore = boto3.client("bedrock-agentcore")  # assumed data-plane client name

# RUNTIME_2_ARN, is_eventbridge and get_session_summary are defined elsewhere (not shown).


def today():
    return datetime.now(timezone.utc).date().isoformat()


def handler(event, context):
    # Determine trigger source
    if is_eventbridge(event):
        # Nightly: run maintain on factbase
        payload = {"prompt": "Run factbase maintain workflow to consolidate memories, detect conflicts, and generate review questions."}
        session_id = f"rem-nightly-{today()}"
    else:
        # Post-session: extract new facts (handles the first record; assumes batch size 1)
        record = json.loads(event["Records"][0]["body"])
        actor_id = record["actor_id"]
        summary = get_session_summary(actor_id, record["session_id"])
        payload = {
            "prompt": f"Conversation summary to process:\n\n{summary}\n\nExtract new entities, facts, and updates.",
            "actor_id": actor_id,
        }
        # Use dedicated REM session (separate from user session)
        session_id = f"rem-{actor_id}-{today()}"

    agentcore.invoke_agent_runtime(
        agentRuntimeArn=RUNTIME_2_ARN,
        runtimeSessionId=session_id,
        payload=json.dumps(payload).encode(),
    )
```
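
Two helpers used by this handler are left undefined in the document. The sketch below shows one plausible shape for them. `is_eventbridge` relies on scheduled EventBridge rule events carrying `"source": "aws.events"` while SQS deliveries carry a `Records` list. `rem_session_id` is a hypothetical replacement for the inline f-strings: it assumes the runtime enforces a minimum session-id length (the default of 33 here is an assumption to verify against the AgentCore API reference) and that characters such as `:` in `actor_id` are best kept out of the id.

```python
import hashlib
from datetime import date
from typing import Any, Mapping, Optional


def is_eventbridge(event: Mapping[str, Any]) -> bool:
    """True for EventBridge rule invocations, False for SQS batches."""
    return "Records" not in event and event.get("source") == "aws.events"


def rem_session_id(actor_id: Optional[str], day: date, min_len: int = 33) -> str:
    """Deterministic REM session id: stable per actor per day, hex and hyphens only."""
    scope = actor_id if actor_id else "nightly"
    digest = hashlib.sha256(f"{scope}|{day.isoformat()}".encode()).hexdigest()
    session_id = f"rem-{day.isoformat()}-{digest[:32]}"
    # Pad from the same digest if a larger minimum is ever requested.
    while len(session_id) < min_len:
        session_id += digest
    return session_id
```

Because the id is derived from the actor and the date, several REM invocations for the same user on the same day land in the same Runtime 2 session, matching the `rem-{actor_id}-{today()}` intent of the handler.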

---

## Factbase KB Design (perspective.yaml)

```yaml
organization: Daniel Levy
focus: Personal chief of staff KB - professional network, projects, commitments, decisions

entity_types:
  person: Professional contacts - colleagues, customers, AWS team, interviewers, vendors
  company: Organizations - AWS customers, target employers, partners, tools/services
  project: Active and past projects - work deliverables, side projects, products
  commitment: Explicit commitments made (by Daniel or to Daniel) with due dates
  decision: Significant decisions made, rationale, outcomes
  meeting: Key conversations - participants, decisions, follow-ups

review:
  stale_days: 90
  required_fields:
    person: [current_role, company, last_contact_date]
    commitment: [due_date, status, owner]
    decision: [date, outcome]
```

---

## S3 Workspace Layout

```
s3://agent-claw-workspace-{account_id}/
├── SOUL.md        ← personality, tone, operating rules
├── AGENTS.md      ← workspace conventions, memory rules
├── USER.md        ← Daniel's profile, preferences
├── IDENTITY.md    ← agent name, emoji
├── HEARTBEAT.md   ← periodic task checklist
└── TOOLS.md       ← tool-specific notes
```

Seeded from existing OpenClaw workspace files. CDK deploy script copies them if bucket is new.

---

## Project Structure

```
agent-claw/
├── cdk/                          ← CDK TypeScript stack
│   ├── bin/agent-claw.ts
│   ├── lib/agent-claw-stack.ts
│   └── lib/constructs/
│       ├── agentcore-runtime.ts
│       ├── memory.ts
│       └── eventbridge-rules.ts
├── src/
│   ├── runtime-1/                ← Main assistant (CodeZip deploy)
│   │   ├── main.py
│   │   ├── tools/
│   │   │   ├── __init__.py
│   │   │   ├── web.py            ← web_search, web_fetch
│   │   │   ├── workspace.py      ← read/write S3 workspace files
│   │   │   ├── memory.py         ← search_memory (AgentCore Memory)
│   │   │   └── messaging.py      ← send_message (channel adapter)
│   │   ├── channels/
│   │   │   ├── adapter.py        ← ChannelAdapter protocol
│   │   │   └── telegram.py       ← TelegramAdapter
│   │   ├── prompt_builder.py     ← builds system prompt from workspace + memory
│   │   └── pyproject.toml
│   ├── runtime-2/                ← REM agent (CodeZip deploy)
│   │   ├── main.py
│   │   ├── tools/
│   │   │   ├── factbase.py       ← factbase workflow tools via AgentCore Gateway
│   │   │   └── workspace.py      ← shared workspace tools
│   │   └── pyproject.toml
│   └── lambdas/
│       ├── tg-ingest/
│       │   ├── handler.py
│       │   └── requirements.txt
│       ├── agent-runner/
│       │   ├── handler.py
│       │   └── requirements.txt
│       └── rem-runner/
│           ├── handler.py
│           └── requirements.txt
├── workspace/                    ← seed files for S3 workspace
│   ├── SOUL.md
│   ├── AGENTS.md
│   ├── USER.md
│   └── HEARTBEAT.md
└── README.md
```

---

## Build Phases

### Phase 0: Infrastructure spike (1-2 days)
- CDK stack: deploy API GW + tg-ingest Lambda + SQS + AgentCore Runtime 1 (minimal agent)
- Test: Telegram message → Lambda → SQS → Runtime 1 → response back to Telegram
- Confirm: cold start latency, session warm behavior, basic round-trip

### Phase 1: Full main assistant (1 week)
- Runtime 1 with all tools: web_search, web_fetch, workspace read/write, send_message
- System prompt builder loading from S3
- AgentCore Memory: short-term turns + USER_PREFERENCE
- Channel adapter: TelegramAdapter with typing action
- DynamoDB session mapping

**Done when:** full conversation works with personality, web search, and memory across messages.

### Phase 2: REM + factbase (1 week)
- Runtime 2 deployed with factbase workflow tools
- rem-runner Lambda + rem-queue
- Post-session trigger from agent-runner
- EventBridge nightly REM schedule
- factbase perspective.yaml defined, KB seeded

**Done when:** after a conversation, REM fires and writes relevant facts to factbase; next session the agent can search factbase for context.

### Phase 3: Heartbeat + scheduling (3-4 days)
- EventBridge heartbeat rule (every 30m → Runtime 1 with heartbeat prompt)
- HEARTBEAT.md checklist loaded at heartbeat time
- HEARTBEAT_OK suppression
- Dynamic cron via `create_cron` tool (EventBridge SDK)
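
The document does not spell out how HEARTBEAT_OK suppression works. One reading, sketched below as an assumption, is that the agent answers a heartbeat prompt with the bare sentinel `HEARTBEAT_OK` when nothing needs attention, and the delivery path drops that reply instead of messaging the user. `should_deliver` and `deliver_if_needed` are hypothetical names.

```python
from typing import Callable, Optional

HEARTBEAT_OK = "HEARTBEAT_OK"  # sentinel named in the Phase 3 list


def should_deliver(reply: str, sentinel: str = HEARTBEAT_OK) -> bool:
    """False for empty replies and for a reply that is only the sentinel."""
    text = reply.strip()
    return bool(text) and text != sentinel


def deliver_if_needed(reply: str, send: Callable[[str], str]) -> Optional[str]:
    """Call `send` (e.g. an adapter's send_message) unless the reply is suppressed."""
    if not should_deliver(reply):
        return None
    return send(reply.strip())
```

A reply that merely mentions the sentinel inside a longer message is still delivered, so real findings are never swallowed by the filter.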

### Phase 4: Polish
- Streaming progress (Telegram edit-in-place)
- Commitments system
- Error handling, DLQ, CloudWatch alarms
- Multi-user support (actor_id scoping already built in)

---

*Design finalized 2026-05-04. All pre-development questions resolved.*