""" agent-claw Runtime 1 — main assistant agent. Entrypoint for AgentCore CodeZip deployment. """ import os from strands import Agent, tool from strands.models import BedrockModel from bedrock_agentcore.runtime import BedrockAgentCoreApp from channels.telegram import TelegramAdapter from prompt_builder import build_system_prompt, invalidate_prompt from tools import web as web_tools from tools import workspace as ws_tools from tools import messaging from tools.home_assistant import home_assistant from mcp.client.streamable_http import streamablehttp_client from strands.tools.mcp.mcp_client import MCPClient import httpx import botocore.auth import botocore.awsrequest import boto3 from urllib.parse import urlparse as _urlparse WORKSPACE_MCP_URL = 'https://25hugrzw4uwtueeg77jsmft6lq0wunmd.lambda-url.us-east-1.on.aws/mcp' class _SigV4HttpxAuth(httpx.Auth): """SigV4 auth for Lambda Function URL with AWS_IAM.""" def __init__(self, region: str = 'us-east-1'): self._region = region def auth_flow(self, request): creds = boto3.Session().get_credentials().get_frozen_credentials() parsed = _urlparse(str(request.url)) aws_req = botocore.awsrequest.AWSRequest( method=request.method, url=str(request.url), data=request.content or b'', headers={ 'Host': parsed.hostname, 'Content-Type': request.headers.get('content-type', 'application/json'), 'Accept': request.headers.get('accept', 'application/json, text/event-stream'), } ) botocore.auth.SigV4Auth(creds, 'lambda', self._region).add_auth(aws_req) for k, v in aws_req.headers.items(): request.headers[k] = v yield request from bedrock_agentcore.memory.integrations.strands.config import AgentCoreMemoryConfig from bedrock_agentcore.memory.integrations.strands.session_manager import AgentCoreMemorySessionManager from strands_tools.code_interpreter import AgentCoreCodeInterpreter as _CodeInterpreterClient # Initialise once per warm session _code_interpreter = _CodeInterpreterClient(region='us-east-1') app = BedrockAgentCoreApp() # ── Tool definitions ────────────────────────────────────────────────────── @tool def send_message(text: str) -> str: """Send a message to the user through their channel (Telegram, Slack, etc.)""" return messaging.send(text) @tool def web_search(query: str) -> str: """Search the web using Brave Search. Returns titles, URLs, and snippets.""" return web_tools.brave_search(query) @tool def web_fetch(url: str) -> str: """Fetch and extract readable text content from a URL.""" return web_tools.web_fetch(url) @tool def read_workspace_file(path: str) -> str: """Read a file from the agent workspace (SOUL.md, HEARTBEAT.md, etc.)""" return ws_tools.read_file(path) @tool def write_workspace_file(path: str, content: str) -> str: """Write or update a file in the agent workspace.""" result = ws_tools.write_file(path, content) invalidate_prompt() # force system prompt rebuild if persona files changed return result # ── Entrypoint ──────────────────────────────────────────────────────────── @app.entrypoint def main(payload: dict, context) -> dict: """Handle an invocation from agent-runner Lambda.""" # Set up channel adapter adapter_config = payload.get('channel_adapter', {}) channel_type = adapter_config.get('type', 'telegram') if channel_type == 'telegram': adapter = TelegramAdapter( chat_id=adapter_config.get('target_id', ''), bot_token_secret_arn=adapter_config.get('bot_token_secret_arn', ''), ) else: # Future channels: instantiate appropriate adapter raise ValueError(f"Unsupported channel type: {channel_type}") messaging.set_adapter(adapter) # Start typing indicator immediately, keep it alive in background import threading _typing_active = True def _keep_typing(): adapter.send_typing() import time while _typing_active: time.sleep(4) if _typing_active: adapter.send_typing() typing_thread = threading.Thread(target=_keep_typing, daemon=True) typing_thread.start() # Set up AgentCore Memory session manager (short + long term via session_manager) MEMORY_ID = 'agentclaw_AgentClawMemory-i7Csf776AH' actor_id = payload.get('actor_id', adapter_config.get('target_id', 'default')) session_id = payload.get('session_id', f'session-{actor_id}') memory_config = AgentCoreMemoryConfig( memory_id=MEMORY_ID, session_id=session_id, actor_id=actor_id, ) session_manager = AgentCoreMemorySessionManager( agentcore_memory_config=memory_config, region_name='us-east-1', ) # Build system prompt — base cached, user context injected per-invocation user_profile = payload.get('user_profile', {}) user_context = '' if user_profile: name = user_profile.get('display_name', '') username = user_profile.get('telegram_username', '') user_context = f'Name: {name}' if username: user_context += f'\nTelegram username: @{username}' system_prompt = build_system_prompt(user_context=user_context, actor_id=actor_id) # Model: claude-sonnet-4-6 via cross-region inference model = BedrockModel( model_id="us.anthropic.claude-sonnet-4-6", region_name="us-east-1", ) base_tools = [send_message, web_search, web_fetch, read_workspace_file, write_workspace_file, _code_interpreter.code_interpreter, home_assistant] def _run_agent(tools): agent = Agent( model=model, system_prompt=system_prompt, session_manager=session_manager, tools=tools, ) return agent(payload.get('prompt', '')) workspace_mcp_client = MCPClient( lambda: streamablehttp_client(WORKSPACE_MCP_URL, timeout=20, auth=_SigV4HttpxAuth()) ) workspace_tools = [] try: with workspace_mcp_client: workspace_tools = workspace_mcp_client.list_tools_sync() except Exception as e: print(f'[main] workspace_mcp unavailable ({type(e).__name__}) — continuing without it') try: result = _run_agent(base_tools + list(workspace_tools)) finally: _typing_active = False # Flush buffered memory events session_manager.close() # Deliver final response — only send if agent didn't already call send_message tool. # If the tool was used, the response is already delivered. The fallback handles # cases where the agent responds directly without calling the tool. if not messaging.was_sent() and result.message: # Extract plain text from Strands result (avoid sending raw dict/JSON) msg = result.message if isinstance(msg, dict): content = msg.get('content', {}) if isinstance(content, dict): msg = content.get('text', str(content)) elif isinstance(content, list): msg = ' '.join(c.get('text', '') for c in content if isinstance(c, dict)) else: msg = str(content) adapter.send(str(msg)) return {'result': result.message} app.run()