"""
agent-claw Runtime 1 — main assistant agent.
Entrypoint for AgentCore CodeZip deployment.
"""
import os
from strands import Agent, tool
from strands.models import BedrockModel
from bedrock_agentcore.runtime import BedrockAgentCoreApp
from channels.telegram import TelegramAdapter
from prompt_builder import build_system_prompt, invalidate_prompt
from tools import web as web_tools
from tools import workspace as ws_tools
from tools import messaging
from tools.scheduler import schedule_reminder, list_reminders, cancel_reminder
import tools.scheduler as _scheduler_module
from tools.home_assistant import home_assistant, set_ha_config
from tools.google_workspace import list_calendars, get_calendar_events, list_gmail_messages, get_gmail_message
import tools.google_workspace as _gws
import httpx
import botocore.auth
import botocore.awsrequest
import boto3
from urllib.parse import quote as _quote
from urllib.parse import urlencode as _urlencode
from urllib.parse import urlparse as _urlparse

# Base URL of the OAuth "start" endpoint; the environment variable wins when
# it is set to a non-empty value, otherwise the hard-coded URL is used.
OAUTH_START_URL = (
    os.environ.get('OAUTH_START_URL')
    or 'https://sptejrymri.execute-api.us-east-1.amazonaws.com/oauth/start'
)
# DynamoDB table holding per-user records (keyed by actor_id elsewhere in this file).
USERS_TABLE_NAME = os.environ.get('USERS_TABLE_NAME', 'agent-claw-users')


class _SigV4HttpxAuth(httpx.Auth):
    """SigV4 auth for Lambda Function URL with AWS_IAM, plus X-Actor-Id header."""

    def __init__(self, region: str = 'us-east-1', actor_id: str = ''):
        """Store the signing region and the optional actor id to forward."""
        self._region = region
        self._actor_id = actor_id

    def auth_flow(self, request):
        """Sign *request* with SigV4 (service ``lambda``) and yield it.

        A botocore ``AWSRequest`` mirroring the httpx request is signed with
        credentials from a fresh ``boto3.Session``; every header of the signed
        request is then copied onto the httpx request. When an actor id was
        supplied it is attached afterwards as ``x-actor-id`` (i.e. it is not
        one of the headers handed to the signer).
        """
        creds = boto3.Session().get_credentials().get_frozen_credentials()
        parsed = _urlparse(str(request.url))
        aws_req = botocore.awsrequest.AWSRequest(
            method=request.method,
            url=str(request.url),
            data=request.content or b'',
            headers={
                'Host': parsed.hostname,
                'Content-Type': request.headers.get('content-type', 'application/json'),
                'Accept': request.headers.get('accept', 'application/json, text/event-stream'),
            }
        )
        botocore.auth.SigV4Auth(creds, 'lambda', self._region).add_auth(aws_req)
        for k, v in aws_req.headers.items():
            request.headers[k] = v
        if self._actor_id:
            request.headers['x-actor-id'] = self._actor_id
        yield request


from bedrock_agentcore.memory.integrations.strands.config import AgentCoreMemoryConfig
from bedrock_agentcore.memory.integrations.strands.session_manager import AgentCoreMemorySessionManager
# code_interpreter removed — causes [Errno 98] port 8080 conflict on warm container re-init

app = BedrockAgentCoreApp()


# ── Tool definitions ──────────────────────────────────────────────────────
# NOTE: send_message tool removed — delivery handled by agent-runner streaming consumer
# NOTE: the docstrings of @tool functions are kept verbatim on purpose — the
# decorator exposes them as the tool description; maintainer notes therefore
# live in `#` comments instead.

# Thin wrapper delegating to tools.web.brave_search.
@tool
def web_search(query: str) -> str:
    """Search the web using Brave Search. Returns titles, URLs, and snippets."""
    return web_tools.brave_search(query)


# Thin wrapper delegating to tools.web.web_fetch.
@tool
def web_fetch(url: str) -> str:
    """Fetch and extract readable text content from a URL."""
    return web_tools.web_fetch(url)


# Thin wrapper delegating to tools.workspace.read_file.
@tool
def read_workspace_file(path: str) -> str:
    """Read a file from the agent workspace (SOUL.md, HEARTBEAT.md, etc.)"""
    return ws_tools.read_file(path)


# Writes through tools.workspace.write_file, then drops the cached prompt.
@tool
def write_workspace_file(path: str, content: str) -> str:
    """Write or update a file in the agent workspace."""
    result = ws_tools.write_file(path, content)
    invalidate_prompt()  # force system prompt rebuild if persona files changed
    return result


# Builds the OAuth start link for the current actor. Returns a human-readable
# message in every case (including the two error cases); never raises itself.
@tool
def connect_google_account(label: str = 'primary') -> str:
    """Connect a Google account with a custom label (e.g. 'work', 'personal').
    Defaults to 'primary'. Use this when the user wants to connect Google
    Workspace (Gmail, Calendar, Drive, etc.) or when Google tools fail due to
    missing credentials."""
    if not OAUTH_START_URL:
        return 'Google OAuth is not configured. Set OAUTH_START_URL environment variable.'
    actor_id = _current_actor_id
    if not actor_id:
        return 'Cannot determine actor_id for OAuth flow.'
    # Percent-encode both values: the label is free-form input, and an
    # unescaped space, '&', '#' or '=' would break the link or smuggle extra
    # query parameters into the OAuth start request. quote (not quote_plus)
    # is used so a space becomes %20 rather than '+'.
    query = _urlencode({'actor_id': actor_id, 'label': label}, quote_via=_quote)
    url = f'{OAUTH_START_URL}?{query}'
    return f'Please open this URL to connect your Google account as "{label}":\n{url}\n\nAfter authorizing, Google Workspace tools (Gmail, Calendar, Drive) will be available.'
def _format_google_accounts(accounts) -> str:
    """Render a ``{label: email}`` mapping as the user-facing account list."""
    parts = [f'{label} ({email})' for label, email in accounts.items()]
    return 'Connected Google accounts: ' + ', '.join(parts)


# Looks the accounts up in Secrets Manager first (one secret per label under
# agent-claw/google-credentials/<actor>/) and falls back to the account map
# synced onto the google_workspace module when that lookup fails or is empty.
@tool
def list_google_accounts() -> str:
    """List all connected Google accounts and their labels."""
    import json as _json

    actor_id = _current_actor_id
    if actor_id:
        try:
            safe_actor_id = actor_id.replace(':', '-')
            prefix = f'agent-claw/google-credentials/{safe_actor_id}/'
            sm = boto3.client('secretsmanager', region_name='us-east-1')
            paginator = sm.get_paginator('list_secrets')
            accounts = {}
            for page in paginator.paginate(Filters=[{'Key': 'name', 'Values': [prefix]}]):
                for s in page['SecretList']:
                    name = s['Name']
                    if not name.startswith(prefix):
                        # Slicing below assumes the prefix is really there.
                        continue
                    label = name[len(prefix):]
                    try:
                        val = _json.loads(sm.get_secret_value(SecretId=name)['SecretString'])
                        accounts[label] = val.get('email', name)
                    except Exception:
                        # Unreadable/unparseable secret: still list the label,
                        # showing the secret name in place of the email.
                        accounts[label] = name
            if accounts:
                return _format_google_accounts(accounts)
        except Exception as e:
            print(f'[list_google_accounts] SM lookup failed, falling back: {e}')
    accounts = _gws._current_google_accounts
    if not accounts:
        return 'No Google accounts connected. Use connect_google_account to add one.'
    return _format_google_accounts(accounts)


# Deletes the label's credential secret, then removes the label from the
# user's google_accounts map in DynamoDB. Refuses to remove an unknown label
# or the last remaining account. DynamoDB errors propagate to the caller.
@tool
def remove_google_account(label: str) -> str:
    """Remove a connected Google account by label (e.g. 'work', 'personal')."""
    actor_id = _current_actor_id
    if not actor_id:
        return 'Cannot determine actor_id.'
    safe_actor_id = actor_id.replace(':', '-')
    ddb = boto3.resource('dynamodb', region_name='us-east-1')
    table = ddb.Table(USERS_TABLE_NAME)
    resp = table.get_item(Key={'actor_id': actor_id})
    accounts = resp.get('Item', {}).get('google_accounts', {})
    if label not in accounts:
        return f'No Google account with label "{label}" found.'
    if len(accounts) <= 1:
        return 'Cannot remove the last Google account. At least one must remain.'
    email = accounts.get(label, label)
    sm = boto3.client('secretsmanager', region_name='us-east-1')
    try:
        sm.delete_secret(
            SecretId=f'agent-claw/google-credentials/{safe_actor_id}/{label}',
            ForceDeleteWithoutRecovery=True,
        )
    except sm.exceptions.ResourceNotFoundException:
        pass  # secret may already be gone
    except Exception as e:
        # Still best-effort (the account entry is removed regardless), but a
        # failed secret deletion is no longer invisible.
        print(f'[remove_google_account] delete_secret failed, continuing: {type(e).__name__}: {e}')
    table.update_item(
        Key={'actor_id': actor_id},
        UpdateExpression='REMOVE google_accounts.#label',
        ExpressionAttributeNames={'#label': label},
    )
    return f'Disconnected {label} ({email}) from your Google accounts.'


def _upsert_service(table, actor_id: str, service: str, config: dict) -> None:
    """Store *config* under ``services.<service>`` on the user's item.

    DynamoDB rejects a single expression that sets both ``services`` and
    ``services.<name>`` (overlapping document paths), so this is done in two
    conditional steps: write the nested key when the map exists, otherwise
    create the map containing just this service.
    """
    condition_failed = table.meta.client.exceptions.ConditionalCheckFailedException
    try:
        table.update_item(
            Key={'actor_id': actor_id},
            UpdateExpression='SET #services.#svc = :cfg',
            ConditionExpression='attribute_exists(#services)',
            ExpressionAttributeNames={'#services': 'services', '#svc': service},
            ExpressionAttributeValues={':cfg': config},
        )
    except condition_failed:
        # No services map yet. The condition keeps a concurrent writer's map
        # from being overwritten; losing that race surfaces as an error.
        table.update_item(
            Key={'actor_id': actor_id},
            UpdateExpression='SET #services = :svcs',
            ConditionExpression='attribute_not_exists(#services)',
            ExpressionAttributeNames={'#services': 'services'},
            ExpressionAttributeValues={':svcs': {service: config}},
        )


def _remove_service(table, actor_id: str, service: str) -> None:
    """Drop ``services.<service>`` from the user's item, if there is a map."""
    condition_failed = table.meta.client.exceptions.ConditionalCheckFailedException
    try:
        table.update_item(
            Key={'actor_id': actor_id},
            UpdateExpression='REMOVE #services.#svc',
            # REMOVE on a nested path whose parent map is missing is a
            # validation error, so require the map to exist.
            ConditionExpression='attribute_exists(#services)',
            ExpressionAttributeNames={'#services': 'services', '#svc': service},
        )
    except condition_failed:
        pass  # no services map (or no item) — nothing to remove


# Dispatches on `action`. Every outcome is reported as a string for the model;
# DynamoDB failures propagate. `service` defaults to '' so that the documented
# call manage_service(action="list") is valid. The in-memory Home Assistant
# config is only changed after the DynamoDB write succeeded.
@tool
def manage_service(action: str, service: str = '', config: dict | None = None) -> str:
    """Enroll, update, remove, or list external services for your account.

    Actions:
    - "enroll": Add or update a service (requires service name and config dict).
    - "remove": Remove a service by name.
    - "list": List all enrolled services (shows service names, not secrets).

    Supported services:
    - "home_assistant": config = {"url": "https://your-ha-url", "token": "long-lived-access-token"}

    Examples:
    - Enroll HA: manage_service(action="enroll", service="home_assistant", config={"url": "https://ha.example.com", "token": "eyJ..."})
    - Remove HA: manage_service(action="remove", service="home_assistant")
    - List all: manage_service(action="list")
    """
    actor_id = _current_actor_id
    if not actor_id:
        return 'Cannot determine actor_id.'
    ddb = boto3.resource('dynamodb', region_name='us-east-1')
    table = ddb.Table(USERS_TABLE_NAME)

    if action == 'list':
        resp = table.get_item(Key={'actor_id': actor_id})
        services = resp.get('Item', {}).get('services', {})
        if not services:
            return 'No services enrolled.'
        lines = [f"- {svc}: configured" for svc in services]
        return 'Enrolled services:\n' + '\n'.join(lines)

    if action == 'enroll':
        if not service:
            return 'service name is required.'
        if not config:
            return 'config dict is required for enroll.'
        is_home_assistant = service == 'home_assistant'
        if is_home_assistant and ('url' not in config or 'token' not in config):
            return 'home_assistant config requires "url" and "token" keys.'
        _upsert_service(table, actor_id, service, config)
        if is_home_assistant:
            set_ha_config(config['url'], config['token'])
        return f'Service "{service}" enrolled successfully.'

    if action == 'remove':
        if not service:
            return 'service name is required.'
        _remove_service(table, actor_id, service)
        if service == 'home_assistant':
            set_ha_config('', '')
        return f'Service "{service}" removed.'

    return f'Unknown action: {action}. Use "enroll", "remove", or "list".'


# ── Entrypoint ────────────────────────────────────────────────────────────
# Module-level actor_id for tool closures (set per-invocation)
_current_actor_id: str = ''
_current_chat_id: str = ''


@app.entrypoint
async def main(payload: dict, context):
    """Handle an invocation from agent-runner Lambda (streaming).

    Reads ``channel_adapter``, ``actor_id``, ``session_id``, ``user_profile``
    and ``prompt`` from *payload*, publishes the per-invocation identity to
    the module-level globals and the scheduler / google_workspace modules,
    builds the agent, and yields every event of ``agent.stream_async``.

    Raises:
        ValueError: if the channel type is anything other than ``telegram``.

    Errors raised while streaming are logged and suppressed (see the comment
    at the ``except``); errors during setup propagate.
    """
    # Both names must be declared global: a plain assignment would only
    # create a function-local variable.
    global _current_actor_id, _current_chat_id

    adapter_config = payload.get('channel_adapter', {})
    channel_type = adapter_config.get('type', 'telegram')
    actor_id = payload.get('actor_id', adapter_config.get('target_id', 'default'))
    chat_id = adapter_config.get('target_id', '')
    session_id = payload.get('session_id', f'session-{actor_id}')

    # Publish per-invocation identity for the tool functions.
    _current_actor_id = actor_id
    _current_chat_id = chat_id
    _gws._current_actor_id = actor_id  # sync to google_workspace module
    _scheduler_module._current_actor_id = actor_id
    _scheduler_module._current_chat_id = chat_id

    # Set up channel adapter
    if channel_type == 'telegram':
        adapter = TelegramAdapter(
            chat_id=adapter_config.get('target_id', ''),
            bot_token_secret_arn=adapter_config.get('bot_token_secret_arn', ''),
            message_thread_id=adapter_config.get('message_thread_id'),
        )
    else:
        raise ValueError(f"Unsupported channel type: {channel_type}")
    messaging.set_adapter(adapter)

    # Start typing indicator immediately, keep it alive in background
    import threading
    stop_typing = threading.Event()

    def _keep_typing():
        adapter.send_typing()
        # Event.wait returns False on timeout (refresh the indicator) and
        # True as soon as the event is set (stop without waiting out the 4 s).
        while not stop_typing.wait(4):
            adapter.send_typing()

    threading.Thread(target=_keep_typing, daemon=True).start()

    session_manager = None
    try:
        # Set up AgentCore Memory session manager (short + long term via session_manager)
        MEMORY_ID = 'agentclaw_AgentClawMemory-i7Csf776AH'
        memory_config = AgentCoreMemoryConfig(
            memory_id=MEMORY_ID,
            session_id=session_id,
            actor_id=actor_id,
        )
        session_manager = AgentCoreMemorySessionManager(
            agentcore_memory_config=memory_config,
            region_name='us-east-1',
        )

        # Inject per-user service configs (`or {}` tolerates explicit nulls)
        user_profile = payload.get('user_profile') or {}
        services = user_profile.get('services') or {}
        ha_cfg = services.get('home_assistant', {})
        set_ha_config(ha_cfg.get('url', ''), ha_cfg.get('token', ''))

        # Sync google_accounts to google_workspace module
        google_accounts = user_profile.get('google_accounts', {})
        _gws._current_google_accounts = google_accounts

        # Build system prompt — base cached, user context injected per-invocation
        user_context = ''
        if user_profile:
            name = user_profile.get('display_name', '')
            username = user_profile.get('telegram_username', '')
            user_context = f'Name: {name}'
            if username:
                user_context += f'\nTelegram username: @{username}'
            if google_accounts:
                acct_list = ', '.join(f'{label} ({email})' for label, email in google_accounts.items())
                user_context += f'\nGoogle accounts: {acct_list}'
            else:
                user_context += '\nGoogle account: not connected (use connect_google_account tool to connect)'
            enrolled = list(services.keys())
            if enrolled:
                user_context += f'\nEnrolled services: {", ".join(enrolled)}'
        system_prompt = build_system_prompt(user_context=user_context, actor_id=actor_id)

        # Inject current datetime so the model always has accurate time context
        from datetime import datetime
        from zoneinfo import ZoneInfo
        _tz = ZoneInfo('America/Chicago')
        _now = datetime.now(_tz)
        _time_str = _now.strftime('%A, %B %d, %Y %I:%M %p %Z')
        system_prompt = system_prompt + f'\n\nCurrent date/time: {_time_str}'
        print(f'[main] System prompt time injection: {_time_str}')

        # Model: claude-sonnet-4-6 via cross-region inference
        # NOTE: extended thinking disabled — causes retry/duplicate issues with streaming
        from botocore.config import Config as BotoConfig
        model = BedrockModel(
            model_id="us.anthropic.claude-sonnet-4-6",
            region_name="us-east-1",
            boto_client_config=BotoConfig(read_timeout=600, connect_timeout=10),
        )
        base_tools = [
            web_search, web_fetch, read_workspace_file, write_workspace_file,
            home_assistant, connect_google_account, list_google_accounts,
            remove_google_account, manage_service, schedule_reminder,
            list_reminders, cancel_reminder, list_calendars,
            get_calendar_events, list_gmail_messages, get_gmail_message,
        ]
        agent = Agent(
            model=model,
            system_prompt=system_prompt,
            session_manager=session_manager,
            tools=base_tools,
        )

        final_message = None
        try:
            async for event in agent.stream_async(payload.get('prompt', '')):
                if 'result' in event:
                    final_message = event['result'].message
                yield event
        except Exception as e:
            # Catch ALL exceptions including ReadTimeoutError to prevent AgentCore retry.
            # A retry re-runs the full agent loop causing duplicate Telegram messages.
            print(f'[main] Agent error (suppressed to prevent retry): {type(e).__name__}: {e}')
            if final_message:
                yield {'data': str(final_message), 'result': {'message': final_message}}
    finally:
        # Runs for setup failures too, so the daemon typing thread can never
        # outlive the invocation in a warm container.
        stop_typing.set()
        if session_manager is not None:
            session_manager.close()


app.run()