Phase 0: CDK stack + Lambdas + AgentCore Runtime 1 scaffold
- CDK TypeScript stack (AgentClawStack): - S3 workspace bucket with BucketDeployment seed - DynamoDB session-store (actor_id → session_id, TTL) - SQS FIFO message queue (serialized per actor) - Lambda: tg-ingest (webhook validation, typing action, SQS enqueue) - Lambda: agent-runner (SQS → InvokeAgentRuntime, session management) - API Gateway HTTP: POST /telegram → tg-ingest - AgentCore Runtime 1 IAM execution role - CDK outputs: WebhookUrl, WorkspaceBucketName, Runtime1RoleArn - Runtime 1 (Python + Strands + BedrockAgentCoreApp): - main.py: entrypoint, Strands agent, tool wiring - channels/: ChannelAdapter Protocol + TelegramAdapter (decoupled) - tools/: web_search (Brave), web_fetch, read/write_workspace_file, send_message - prompt_builder.py: loads SOUL.md/AGENTS.md/USER.md from S3 (cached) - Lambdas: - tg-ingest: validate X-Telegram-Bot-Api-Secret-Token, send typing, enqueue FIFO - agent-runner: session lookup/create in DDB, bundle batched messages, InvokeAgentRuntime - workspace/: seed files (SOUL.md, AGENTS.md, USER.md, IDENTITY.md, HEARTBEAT.md) NOTE: AgentCore Runtime 1 creation via CfnResource deferred — deploy CDK first, create runtime manually with the output Role ARN, then redeploy with runtime1Arn context param.
This commit is contained in:
118
src/lambdas/agent-runner/handler.py
Normal file
118
src/lambdas/agent-runner/handler.py
Normal file
@@ -0,0 +1,118 @@
|
||||
import json
|
||||
import os
|
||||
import time
|
||||
import uuid
|
||||
import boto3
|
||||
from typing import Any
|
||||
|
||||
# AWS clients
|
||||
_ddb = None
|
||||
_agentcore = None
|
||||
|
||||
|
||||
def get_ddb():
|
||||
global _ddb
|
||||
if _ddb is None:
|
||||
_ddb = boto3.resource('dynamodb')
|
||||
return _ddb
|
||||
|
||||
|
||||
def get_agentcore():
|
||||
global _agentcore
|
||||
if _agentcore is None:
|
||||
_agentcore = boto3.client('bedrock-agentcore', region_name='us-east-1')
|
||||
return _agentcore
|
||||
|
||||
|
||||
def get_or_create_session(actor_id: str) -> str:
|
||||
"""Look up active session for actor, or create a new one."""
|
||||
table = get_ddb().Table(os.environ['SESSION_TABLE_NAME'])
|
||||
|
||||
response = table.get_item(Key={'actor_id': actor_id})
|
||||
item = response.get('Item')
|
||||
|
||||
now = int(time.time())
|
||||
ttl_8hr = now + (8 * 3600)
|
||||
|
||||
if item and item.get('ttl', 0) > now:
|
||||
# Active session exists — extend TTL
|
||||
table.update_item(
|
||||
Key={'actor_id': actor_id},
|
||||
UpdateExpression='SET #ttl = :ttl',
|
||||
ExpressionAttributeNames={'#ttl': 'ttl'},
|
||||
ExpressionAttributeValues={':ttl': ttl_8hr},
|
||||
)
|
||||
return item['session_id']
|
||||
|
||||
# Create new session
|
||||
session_id = str(uuid.uuid4())
|
||||
table.put_item(Item={
|
||||
'actor_id': actor_id,
|
||||
'session_id': session_id,
|
||||
'created_at': str(now),
|
||||
'ttl': ttl_8hr,
|
||||
})
|
||||
return session_id
|
||||
|
||||
|
||||
def handler(event, context):
|
||||
# ── Parse SQS records (FIFO — all from same actor) ───────────────────
|
||||
records = []
|
||||
for record in event.get('Records', []):
|
||||
try:
|
||||
records.append(json.loads(record['body']))
|
||||
except (json.JSONDecodeError, KeyError):
|
||||
continue
|
||||
|
||||
if not records:
|
||||
return
|
||||
|
||||
first = records[0]
|
||||
channel = first.get('channel', 'telegram')
|
||||
chat_id = first.get('chat_id', '')
|
||||
actor_id = f"{channel}:{chat_id}"
|
||||
|
||||
# ── Get or create AgentCore session ──────────────────────────────────
|
||||
session_id = get_or_create_session(actor_id)
|
||||
|
||||
# ── Bundle messages ───────────────────────────────────────────────────
|
||||
if len(records) == 1:
|
||||
prompt = records[0]['messages'][0]['text']
|
||||
else:
|
||||
lines = [
|
||||
f"[{i+1}] {r['messages'][0]['text']}"
|
||||
for i, r in enumerate(records)
|
||||
]
|
||||
prompt = f"You have {len(records)} queued messages:\n" + "\n".join(lines)
|
||||
|
||||
# ── Build payload for AgentCore Runtime 1 ────────────────────────────
|
||||
payload: dict[str, Any] = {
|
||||
'prompt': prompt,
|
||||
'actor_id': actor_id,
|
||||
'session_id': session_id,
|
||||
'channel_adapter': {
|
||||
'type': channel,
|
||||
'target_id': str(chat_id),
|
||||
'bot_token_secret_arn': os.environ.get('TELEGRAM_BOT_TOKEN_SECRET_ARN', ''),
|
||||
},
|
||||
}
|
||||
|
||||
# ── Invoke AgentCore Runtime 1 ────────────────────────────────────────
|
||||
runtime_arn = os.environ.get('RUNTIME_1_ARN', '')
|
||||
if not runtime_arn or runtime_arn == 'PLACEHOLDER_SET_AFTER_RUNTIME_DEPLOY':
|
||||
print(f"[agent-runner] RUNTIME_1_ARN not set — skipping AgentCore invoke")
|
||||
print(f"[agent-runner] Would have sent: {json.dumps(payload)[:200]}")
|
||||
return
|
||||
|
||||
client = get_agentcore()
|
||||
response = client.invoke_agent_runtime(
|
||||
agentRuntimeArn=runtime_arn,
|
||||
runtimeSessionId=session_id,
|
||||
payload=json.dumps(payload).encode(),
|
||||
)
|
||||
|
||||
# Consume streaming response (agent delivers to Telegram via send_message tool)
|
||||
for chunk in response.get('response', []):
|
||||
pass # intentional no-op — agent handles delivery internally
|
||||
|
||||
print(f"[agent-runner] Completed session={session_id} actor={actor_id}")
|
||||
Reference in New Issue
Block a user