"""
agent-claw Runtime 1 — main assistant agent.
Entrypoint for AgentCore CodeZip deployment.
"""
import os
from urllib.parse import urlencode as _urlencode
from urllib.parse import urlparse as _urlparse

import boto3
import botocore.auth
import botocore.awsrequest
import httpx
from bedrock_agentcore.memory.integrations.strands.config import AgentCoreMemoryConfig
from bedrock_agentcore.memory.integrations.strands.session_manager import AgentCoreMemorySessionManager
from bedrock_agentcore.runtime import BedrockAgentCoreApp
from mcp.client.streamable_http import streamablehttp_client
from strands import Agent, tool
from strands.models import BedrockModel
from strands.tools.mcp.mcp_client import MCPClient
from strands_tools.code_interpreter import AgentCoreCodeInterpreter as _CodeInterpreterClient

from channels.telegram import TelegramAdapter
from prompt_builder import build_system_prompt, invalidate_prompt
from tools import messaging
from tools import web as web_tools
from tools import workspace as ws_tools
from tools.home_assistant import home_assistant

# Lambda Function URL of the workspace MCP server (requests are SigV4-signed).
WORKSPACE_MCP_URL = 'https://25hugrzw4uwtueeg77jsmft6lq0wunmd.lambda-url.us-east-1.on.aws/mcp'
# Base URL that starts the Google OAuth flow; empty disables connect_google_account.
OAUTH_START_URL = os.environ.get('OAUTH_START_URL', '')


class _SigV4HttpxAuth(httpx.Auth):
    """SigV4 auth for Lambda Function URL with AWS_IAM, plus X-Actor-Id header."""

    # auth_flow signs over request.content, so httpx must load the body first.
    requires_request_body = True

    def __init__(self, region: str = 'us-east-1', actor_id: str = ''):
        """Store the signing region and the optional actor id to forward."""
        self._region = region
        self._actor_id = actor_id

    def auth_flow(self, request):
        """Sign *request* with SigV4 for the 'lambda' service and yield it.

        Raises:
            RuntimeError: if boto3 cannot resolve any AWS credentials.
        """
        credentials = boto3.Session().get_credentials()
        if credentials is None:
            # Without this check the failure is an opaque AttributeError on None.
            raise RuntimeError('No AWS credentials available to sign the workspace MCP request.')
        creds = credentials.get_frozen_credentials()

        parsed = _urlparse(str(request.url))
        # Only Host, Content-Type and Accept take part in the signature.
        aws_req = botocore.awsrequest.AWSRequest(
            method=request.method,
            url=str(request.url),
            data=request.content or b'',
            headers={
                'Host': parsed.hostname,
                'Content-Type': request.headers.get('content-type', 'application/json'),
                'Accept': request.headers.get('accept', 'application/json, text/event-stream'),
            },
        )
        botocore.auth.SigV4Auth(creds, 'lambda', self._region).add_auth(aws_req)

        # Copy the signed headers (Authorization, X-Amz-Date, ...) onto the httpx request.
        for k, v in aws_req.headers.items():
            request.headers[k] = v
        if self._actor_id:
            # Added after signing, so this header is not covered by the signature.
            request.headers['x-actor-id'] = self._actor_id
        yield request


# Initialise once per warm session
_code_interpreter = _CodeInterpreterClient(region='us-east-1')

app = BedrockAgentCoreApp()


# ── Tool definitions ──────────────────────────────────────────────────────
# NOTE(review): the @tool docstrings below are presumably used by Strands as the
# tool descriptions shown to the model, so their wording is left untouched.

@tool
def send_message(text: str) -> str:
    """Send a message to the user through their channel (Telegram, Slack, etc.)"""
    return messaging.send(text)


@tool
def web_search(query: str) -> str:
    """Search the web using Brave Search. Returns titles, URLs, and snippets."""
    return web_tools.brave_search(query)


@tool
def web_fetch(url: str) -> str:
    """Fetch and extract readable text content from a URL."""
    return web_tools.web_fetch(url)


@tool
def read_workspace_file(path: str) -> str:
    """Read a file from the agent workspace (SOUL.md, HEARTBEAT.md, etc.)"""
    return ws_tools.read_file(path)


@tool
def write_workspace_file(path: str, content: str) -> str:
    """Write or update a file in the agent workspace."""
    result = ws_tools.write_file(path, content)
    invalidate_prompt()  # force system prompt rebuild if persona files changed
    return result


@tool
def connect_google_account() -> str:
    """Generate a Google OAuth authorization URL for the current user to connect their Google account. Use this when the user wants to connect Google Workspace (Gmail, Calendar, Drive, etc.) or when Google tools fail due to missing credentials."""
    if not OAUTH_START_URL:
        return 'Google OAuth is not configured. Set OAUTH_START_URL environment variable.'
    # The actor id comes from the module-level _current_actor_id, which the
    # entrypoint sets for each invocation.
    actor_id = _current_actor_id
    if not actor_id:
        return 'Cannot determine actor_id for OAuth flow.'
    # URL-encode the actor id and respect a query string already present in
    # OAUTH_START_URL, so reserved characters cannot corrupt the link.
    separator = '&' if '?' in OAUTH_START_URL else '?'
    query = _urlencode({'actor_id': actor_id})
    url = f'{OAUTH_START_URL}{separator}{query}'
    return f'Please open this URL to connect your Google account:\n{url}\n\nAfter authorizing, Google Workspace tools (Gmail, Calendar, Drive) will be available.'
# ── Entrypoint ────────────────────────────────────────────────────────────

# Module-level actor_id for tool closures (set per-invocation)
_current_actor_id: str = ''


def _message_text(message) -> str:
    """Extract plain text from a Strands result message (avoid sending raw dict/JSON)."""
    if not isinstance(message, dict):
        return str(message)
    content = message.get('content', {})
    if isinstance(content, dict):
        return str(content.get('text', str(content)))
    if isinstance(content, list):
        # Skip blocks without text so they do not leave stray separators behind.
        parts = (block.get('text', '') for block in content if isinstance(block, dict))
        return ' '.join(part for part in parts if part)
    return str(content)


@app.entrypoint
def main(payload: dict, context) -> dict:
    """Handle an invocation from agent-runner Lambda.

    Reads these keys from ``payload``: ``channel_adapter``, ``actor_id``,
    ``session_id``, ``user_profile`` and ``prompt``.

    Returns:
        ``{'result': <agent result message>}``.

    Raises:
        ValueError: if the channel adapter type is not ``'telegram'``.
    """
    global _current_actor_id
    import contextlib
    import threading

    # Set up channel adapter (`or {}` also tolerates an explicit null in the payload)
    adapter_config = payload.get('channel_adapter') or {}
    channel_type = adapter_config.get('type', 'telegram')
    if channel_type != 'telegram':
        # Future channels: instantiate appropriate adapter
        raise ValueError(f"Unsupported channel type: {channel_type}")
    adapter = TelegramAdapter(
        chat_id=adapter_config.get('target_id', ''),
        bot_token_secret_arn=adapter_config.get('bot_token_secret_arn', ''),
    )
    messaging.set_adapter(adapter)

    # Start typing indicator immediately, keep it alive in background.
    # An Event (instead of a flag plus sleep) lets the thread stop at once.
    stop_typing = threading.Event()

    def _keep_typing():
        adapter.send_typing()
        # wait() returns True as soon as the event is set, ending the loop.
        while not stop_typing.wait(4):
            adapter.send_typing()

    threading.Thread(target=_keep_typing, daemon=True).start()

    session_manager = None
    try:
        # Set up AgentCore Memory session manager (short + long term via session_manager)
        MEMORY_ID = 'agentclaw_AgentClawMemory-i7Csf776AH'
        actor_id = payload.get('actor_id', adapter_config.get('target_id', 'default'))
        session_id = payload.get('session_id', f'session-{actor_id}')
        _current_actor_id = actor_id

        memory_config = AgentCoreMemoryConfig(
            memory_id=MEMORY_ID,
            session_id=session_id,
            actor_id=actor_id,
        )
        session_manager = AgentCoreMemorySessionManager(
            agentcore_memory_config=memory_config,
            region_name='us-east-1',
        )

        # Build system prompt — base cached, user context injected per-invocation
        user_profile = payload.get('user_profile') or {}
        google_email = user_profile.get('google_email', '')
        user_context = ''
        if user_profile:
            name = user_profile.get('display_name', '')
            username = user_profile.get('telegram_username', '')
            user_context = f'Name: {name}'
            if username:
                user_context += f'\nTelegram username: @{username}'
            if google_email:
                user_context += f'\nGoogle account: {google_email}'
            else:
                user_context += '\nGoogle account: not connected (use connect_google_account tool to connect)'
        system_prompt = build_system_prompt(user_context=user_context, actor_id=actor_id)

        # Model: claude-sonnet-4-6 via cross-region inference
        model = BedrockModel(
            model_id="us.anthropic.claude-sonnet-4-6",
            region_name="us-east-1",
        )

        base_tools = [
            send_message,
            web_search,
            web_fetch,
            read_workspace_file,
            write_workspace_file,
            _code_interpreter.code_interpreter,
            home_assistant,
            connect_google_account,
        ]

        def _run_agent(tools):
            agent = Agent(
                model=model,
                system_prompt=system_prompt,
                session_manager=session_manager,
                tools=tools,
            )
            return agent(payload.get('prompt', ''))

        workspace_mcp_client = MCPClient(
            lambda: streamablehttp_client(
                WORKSPACE_MCP_URL,
                timeout=20,
                auth=_SigV4HttpxAuth(actor_id=actor_id),
            )
        )

        # The MCP session has to stay open while the agent runs, because the
        # listed tools call back into it; the stack closes it after the run.
        with contextlib.ExitStack() as mcp_stack:
            workspace_tools = []
            if google_email:
                # Only attempt workspace-mcp if user has connected Google
                try:
                    mcp_stack.enter_context(workspace_mcp_client)
                    workspace_tools = workspace_mcp_client.list_tools_sync()
                except Exception as e:
                    print(f'[main] workspace_mcp unavailable ({type(e).__name__}) — continuing without it')
                    workspace_tools = []
                    # Best-effort teardown of a half-started client.
                    with contextlib.suppress(Exception):
                        mcp_stack.close()
            else:
                print(f'[main] actor={actor_id} has no google_email — skipping workspace_mcp')

            result = _run_agent(base_tools + list(workspace_tools))
    finally:
        # Runs on every exit path, so a failure during setup no longer leaves
        # the typing thread alive on the warm runtime.
        stop_typing.set()
        # Flush buffered memory events
        if session_manager is not None:
            session_manager.close()

    # Deliver final response — only send if agent didn't already call send_message tool.
    # If the tool was used, the response is already delivered. The fallback handles
    # cases where the agent responds directly without calling the tool.
    if not messaging.was_sent() and result.message:
        text = _message_text(result.message)
        # Skip the send when no text could be extracted (e.g. only non-text blocks).
        if text.strip():
            adapter.send(text)

    return {'result': result.message}


app.run()