Files
agent-claw/agentclaw/app/agent_claw_main/main.py
daniel 893c110729 multi-tenant Phase 1: user registry + per-user memory
- CDK: add agent-claw-users DynamoDB table (actor_id PK, RETAIN policy)
- CDK: grant agent-runner read/write on users table; add USERS_TABLE_NAME env
- CDK: fix cdk.json app field (was object, must be command string)
- CDK: add UsersTableName output
- agent-runner: get_or_create_user() auto-registers users on first contact
  (stores display_name, telegram_username, created_at, allowed)
- agent-runner: pass user_profile in AgentCore payload
- prompt_builder: split base prompt (cached) from per-user context (injected per-call)
  removes USER.md/MEMORY.md from shared load; user name/username injected dynamically
- main.py: extract user_profile from payload, build user_context string for prompt
2026-05-06 20:36:22 -05:00

210 lines
7.4 KiB
Python

"""
agent-claw Runtime 1 — main assistant agent.
Entrypoint for AgentCore CodeZip deployment.
"""
import os
from strands import Agent, tool
from strands.models import BedrockModel
from bedrock_agentcore.runtime import BedrockAgentCoreApp
from channels.telegram import TelegramAdapter
from prompt_builder import build_system_prompt, invalidate_prompt
from tools import web as web_tools
from tools import workspace as ws_tools
from tools import messaging
from tools.home_assistant import home_assistant
from mcp.client.streamable_http import streamablehttp_client
from strands.tools.mcp.mcp_client import MCPClient
import httpx
import botocore.auth
import botocore.awsrequest
import boto3
from urllib.parse import urlparse as _urlparse
# Endpoint of the workspace MCP server (a Lambda Function URL).
# The WORKSPACE_MCP_URL environment variable, when set to a non-empty value,
# overrides the built-in default so other stages/accounts need no code change.
WORKSPACE_MCP_URL = (
    os.environ.get('WORKSPACE_MCP_URL')
    or 'https://25hugrzw4uwtueeg77jsmft6lq0wunmd.lambda-url.us-east-1.on.aws/mcp'
)
class _SigV4HttpxAuth(httpx.Auth):
    """SigV4 auth for Lambda Function URL with AWS_IAM.

    Each outgoing httpx request is signed for the ``lambda`` service with
    the credentials boto3 resolves at the time of the request.
    """

    # The signature is computed over the request payload, so httpx must load
    # the body before auth_flow runs; without this flag, reading
    # ``request.content`` on a streamed body raises ``httpx.RequestNotRead``.
    requires_request_body = True

    def __init__(self, region: str = 'us-east-1'):
        """Remember the AWS region used when signing."""
        self._region = region

    def auth_flow(self, request):
        """Sign ``request`` in place, then yield it back to httpx.

        Raises:
            RuntimeError: if boto3 cannot resolve any AWS credentials.
        """
        # A fresh session is consulted on every request, as in the original.
        credentials = boto3.Session().get_credentials()
        if credentials is None:
            # get_credentials() returns None when nothing is configured;
            # fail with a clear message instead of an AttributeError.
            raise RuntimeError('No AWS credentials available for SigV4 signing')
        creds = credentials.get_frozen_credentials()

        parsed = _urlparse(str(request.url))
        # Mirror the request into a botocore AWSRequest carrying only the
        # headers that take part in the signature.
        aws_req = botocore.awsrequest.AWSRequest(
            method=request.method,
            url=str(request.url),
            data=request.content or b'',
            headers={
                'Host': parsed.hostname,
                'Content-Type': request.headers.get('content-type', 'application/json'),
                'Accept': request.headers.get('accept', 'application/json, text/event-stream'),
            },
        )
        botocore.auth.SigV4Auth(creds, 'lambda', self._region).add_auth(aws_req)

        # Copy every header of the signed request (including the ones
        # add_auth() attached) onto the real httpx request.
        for k, v in aws_req.headers.items():
            request.headers[k] = v
        yield request
from bedrock_agentcore.memory.integrations.strands.config import AgentCoreMemoryConfig
from bedrock_agentcore.memory.integrations.strands.session_manager import AgentCoreMemorySessionManager
from strands_tools.code_interpreter import AgentCoreCodeInterpreter as _CodeInterpreterClient
# Initialise once per warm session
# Shared code-interpreter client, created at import time for us-east-1; its
# `code_interpreter` attribute is handed to the agent as a tool inside main().
_code_interpreter = _CodeInterpreterClient(region='us-east-1')
# AgentCore application object: `@app.entrypoint` decorates main() below and
# `app.run()` is invoked on the last line of this file.
app = BedrockAgentCoreApp()
# ── Tool definitions ──────────────────────────────────────────────────────
@tool
def send_message(text: str) -> str:
    """Send a message to the user through their channel (Telegram, Slack, etc.)"""
    # NOTE(review): the docstring is kept verbatim — presumably the @tool
    # decorator exposes it to the model as the tool description; confirm
    # before rewording it.
    delivery_result = messaging.send(text)
    return delivery_result
@tool
def web_search(query: str) -> str:
    """Search the web using Brave Search. Returns titles, URLs, and snippets."""
    # NOTE(review): docstring left untouched — presumably consumed by @tool
    # as the tool description; confirm before rewording it.
    search_results = web_tools.brave_search(query)
    return search_results
@tool
def web_fetch(url: str) -> str:
    """Fetch and extract readable text content from a URL."""
    # NOTE(review): docstring left untouched — presumably consumed by @tool
    # as the tool description; confirm before rewording it.
    page_text = web_tools.web_fetch(url)
    return page_text
@tool
def read_workspace_file(path: str) -> str:
    """Read a file from the agent workspace (SOUL.md, HEARTBEAT.md, etc.)"""
    # NOTE(review): docstring left untouched — presumably consumed by @tool
    # as the tool description; confirm before rewording it.
    file_text = ws_tools.read_file(path)
    return file_text
@tool
def write_workspace_file(path: str, content: str) -> str:
    """Write or update a file in the agent workspace."""
    # NOTE(review): docstring left untouched — presumably consumed by @tool
    # as the tool description; confirm before rewording it.
    write_status = ws_tools.write_file(path, content)
    # Drop the cached system prompt after every write so that a changed
    # persona file is picked up on the next prompt build.
    invalidate_prompt()
    return write_status
# ── Entrypoint ────────────────────────────────────────────────────────────
def _extract_reply_text(message) -> str:
    """Return the plain text carried by an agent result message.

    Dict messages are unwrapped through their ``content`` entry: a dict
    content yields its ``text`` value, a list content yields the
    space-joined ``text`` of every block that actually has one (blocks
    without text are skipped so they leave no stray spaces). Anything else
    is stringified.
    """
    if not isinstance(message, dict):
        return str(message)
    content = message.get('content', {})
    if isinstance(content, dict):
        return str(content.get('text', str(content)))
    if isinstance(content, list):
        return ' '.join(
            block['text']
            for block in content
            if isinstance(block, dict) and block.get('text')
        )
    return str(content)


@app.entrypoint
def main(payload: dict, context) -> dict:
    """Handle an invocation from agent-runner Lambda.

    Args:
        payload: invocation payload. Keys read here: ``channel_adapter``
            (``type``, ``target_id``, ``bot_token_secret_arn``),
            ``actor_id``, ``session_id``, ``user_profile``
            (``display_name``, ``telegram_username``) and ``prompt``.
        context: invocation context supplied by the app; not used here.

    Returns:
        ``{'result': <agent result message>}``.

    Raises:
        ValueError: if ``channel_adapter.type`` is not ``'telegram'``.
    """
    import contextlib
    import threading

    # Set up channel adapter
    adapter_config = payload.get('channel_adapter', {})
    channel_type = adapter_config.get('type', 'telegram')
    if channel_type != 'telegram':
        # Future channels: instantiate appropriate adapter
        raise ValueError(f"Unsupported channel type: {channel_type}")
    adapter = TelegramAdapter(
        chat_id=adapter_config.get('target_id', ''),
        bot_token_secret_arn=adapter_config.get('bot_token_secret_arn', ''),
    )
    messaging.set_adapter(adapter)

    # Start typing indicator immediately, keep it alive in background.
    # An Event (instead of a polled flag + sleep) lets the thread stop as
    # soon as it is signalled rather than after the current 4 s pause.
    stop_typing = threading.Event()

    def _keep_typing():
        adapter.send_typing()
        # wait() returns False on timeout (refresh the indicator) and True
        # once the event is set (leave the loop).
        while not stop_typing.wait(4):
            adapter.send_typing()

    threading.Thread(target=_keep_typing, daemon=True).start()

    # Everything after the thread start runs under one try/finally so that a
    # failure during setup (memory, prompt, model, MCP) also stops the typing
    # thread instead of leaving it running in the warm runtime.
    session_manager = None
    try:
        # Set up AgentCore Memory session manager (short + long term via session_manager)
        memory_id = 'agentclaw_AgentClawMemory-i7Csf776AH'
        actor_id = payload.get('actor_id', adapter_config.get('target_id', 'default'))
        session_id = payload.get('session_id', f'session-{actor_id}')
        memory_config = AgentCoreMemoryConfig(
            memory_id=memory_id,
            session_id=session_id,
            actor_id=actor_id,
        )
        session_manager = AgentCoreMemorySessionManager(
            agentcore_memory_config=memory_config,
            region_name='us-east-1',
        )

        # Build system prompt — base cached, user context injected per-invocation
        user_profile = payload.get('user_profile', {})
        user_context = ''
        if user_profile:
            name = user_profile.get('display_name', '')
            username = user_profile.get('telegram_username', '')
            user_context = f'Name: {name}'
            if username:
                user_context += f'\nTelegram username: @{username}'
        system_prompt = build_system_prompt(user_context=user_context)

        # Model: claude-sonnet-4-6 via cross-region inference
        model = BedrockModel(
            model_id="us.anthropic.claude-sonnet-4-6",
            region_name="us-east-1",
        )

        base_tools = [
            send_message, web_search, web_fetch,
            read_workspace_file, write_workspace_file,
            _code_interpreter.code_interpreter, home_assistant,
        ]

        workspace_mcp_client = MCPClient(
            lambda: streamablehttp_client(WORKSPACE_MCP_URL, timeout=20, auth=_SigV4HttpxAuth())
        )

        # The MCP session has to stay open while the agent runs: tools listed
        # from the client are proxies that call back into the live session.
        # ExitStack keeps the client entered for the whole agent run when the
        # connection succeeds, and is simply empty when it does not.
        with contextlib.ExitStack() as mcp_scope:
            workspace_tools = []
            try:
                mcp_scope.enter_context(workspace_mcp_client)
                workspace_tools = workspace_mcp_client.list_tools_sync()
            except Exception as e:
                # Best effort: the workspace MCP server is optional.
                print(f'[main] workspace_mcp unavailable ({type(e).__name__}) — continuing without it')

            agent = Agent(
                model=model,
                system_prompt=system_prompt,
                session_manager=session_manager,
                tools=base_tools + list(workspace_tools),
            )
            result = agent(payload.get('prompt', ''))
    finally:
        stop_typing.set()
        # Flush buffered memory events
        if session_manager is not None:
            session_manager.close()

    # Deliver final response — only send if agent didn't already call send_message tool.
    # If the tool was used, the response is already delivered. The fallback handles
    # cases where the agent responds directly without calling the tool.
    if not messaging.was_sent() and result.message:
        # Extract plain text from Strands result (avoid sending raw dict/JSON)
        reply_text = _extract_reply_text(result.message)
        # Skip the send when no text could be extracted (e.g. only non-text
        # content blocks) rather than pushing an empty message to the channel.
        if reply_text.strip():
            adapter.send(reply_text)
    return {'result': result.message}
# Hand control to the AgentCore app. Runs unconditionally whenever this module
# is executed or imported (presumably starts serving invocations for the
# decorated entrypoint — confirm against the bedrock_agentcore runtime docs).
app.run()