Files
agent-claw/agentclaw/app/agent_claw_main/main.py.bak

318 lines
12 KiB
Python

"""
agent-claw Runtime 1 — main assistant agent.
Entrypoint for AgentCore CodeZip deployment.
"""
import os
from strands import Agent, tool
from strands.models import BedrockModel
from bedrock_agentcore.runtime import BedrockAgentCoreApp
from channels.telegram import TelegramAdapter
from prompt_builder import build_system_prompt, invalidate_prompt
from tools import web as web_tools
from tools import workspace as ws_tools
from tools import messaging
from tools.home_assistant import home_assistant, set_ha_config
from mcp.client.streamable_http import streamablehttp_client
from strands.tools.mcp.mcp_client import MCPClient
import httpx
import botocore.auth
import botocore.awsrequest
import boto3
from urllib.parse import urlparse as _urlparse
# Endpoint of the workspace MCP server (a Lambda Function URL). Overridable per
# deployment through the environment, like the other settings below; the
# default is the URL that used to be hard-coded here.
WORKSPACE_MCP_URL = os.environ.get(
    'WORKSPACE_MCP_URL',
    'https://25hugrzw4uwtueeg77jsmft6lq0wunmd.lambda-url.us-east-1.on.aws/mcp',
)
# Base URL used to build the Google OAuth link; empty means "not configured".
OAUTH_START_URL = os.environ.get('OAUTH_START_URL', '')
# DynamoDB table holding per-user records, keyed by actor_id.
USERS_TABLE_NAME = os.environ.get('USERS_TABLE_NAME', 'agent-claw-users')
class _SigV4HttpxAuth(httpx.Auth):
    """SigV4 auth for Lambda Function URL with AWS_IAM, plus X-Actor-Id header.

    Each outgoing request is re-signed for the ``lambda`` service with the
    credentials of the default boto3 session. When an actor id was supplied,
    it is attached as an ``x-actor-id`` header after signing, so it is not
    part of the signed header set.
    """

    # auth_flow signs the payload via request.content, so httpx must load the
    # request body before the flow runs (otherwise a streamed body would raise
    # when accessed).
    requires_request_body = True

    def __init__(self, region: str = 'us-east-1', actor_id: str = ''):
        """Store the signing region and the optional actor id to forward."""
        self._region = region
        self._actor_id = actor_id

    def auth_flow(self, request):
        """Sign ``request`` in place with SigV4 and yield it once.

        Raises:
            RuntimeError: if the default boto3 session has no credentials.
        """
        credentials = boto3.Session().get_credentials()
        if credentials is None:
            # Fail with an explicit message instead of an AttributeError on None.
            raise RuntimeError('No AWS credentials available for SigV4 signing.')
        creds = credentials.get_frozen_credentials()

        parsed = _urlparse(str(request.url))
        # Only these three headers are handed to botocore, so only they (plus
        # whatever SigV4Auth adds) take part in the signature.
        aws_req = botocore.awsrequest.AWSRequest(
            method=request.method,
            url=str(request.url),
            data=request.content or b'',
            headers={
                'Host': parsed.hostname,
                'Content-Type': request.headers.get('content-type', 'application/json'),
                'Accept': request.headers.get('accept', 'application/json, text/event-stream'),
            },
        )
        botocore.auth.SigV4Auth(creds, 'lambda', self._region).add_auth(aws_req)

        # Copy the signed headers (including those added by add_auth) back
        # onto the real httpx request.
        for k, v in aws_req.headers.items():
            request.headers[k] = v
        if self._actor_id:
            request.headers['x-actor-id'] = self._actor_id
        yield request
from bedrock_agentcore.memory.integrations.strands.config import AgentCoreMemoryConfig
from bedrock_agentcore.memory.integrations.strands.session_manager import AgentCoreMemorySessionManager
from strands_tools.code_interpreter import AgentCoreCodeInterpreter as _CodeInterpreterClient
# Initialise once per warm session
# Both objects are created at import time and live at module level, so every
# invocation handled by this process reuses the same code-interpreter client
# and the same app object (whose entrypoint decorator is applied to main()).
_code_interpreter = _CodeInterpreterClient(region='us-east-1')
app = BedrockAgentCoreApp()
# ── Tool definitions ──────────────────────────────────────────────────────
@tool
def send_message(text: str) -> str:
    """Send a message to the user. Use multiple calls to send incrementally - send the direct answer first, then elaboration. Each call delivers immediately to the user."""
    # Hand the text to the messaging module and relay whatever it reports.
    delivery_status = messaging.send(text)
    return delivery_status
@tool
def web_search(query: str) -> str:
    """Search the web using Brave Search. Returns titles, URLs, and snippets."""
    # Thin wrapper: the search itself lives in tools.web.
    search_results = web_tools.brave_search(query)
    return search_results
@tool
def web_fetch(url: str) -> str:
    """Fetch and extract readable text content from a URL."""
    # Thin wrapper: fetching and extraction are done by tools.web.web_fetch.
    page_text = web_tools.web_fetch(url)
    return page_text
@tool
def read_workspace_file(path: str) -> str:
    """Read a file from the agent workspace (SOUL.md, HEARTBEAT.md, etc.)"""
    # Thin wrapper: the workspace module resolves and reads the path.
    file_text = ws_tools.read_file(path)
    return file_text
@tool
def write_workspace_file(path: str, content: str) -> str:
    """Write or update a file in the agent workspace."""
    write_outcome = ws_tools.write_file(path, content)
    # Drop the cached system prompt after every write: the file just written
    # may be one of the persona files the prompt is built from.
    invalidate_prompt()
    return write_outcome
@tool
def connect_google_account() -> str:
    """Generate a Google OAuth authorization URL for the current user to connect their Google account.
    Use this when the user wants to connect Google Workspace (Gmail, Calendar, Drive, etc.)
    or when Google tools fail due to missing credentials."""
    # Imported locally so this function does not depend on a new file-level import.
    from urllib.parse import quote

    if not OAUTH_START_URL:
        return 'Google OAuth is not configured. Set OAUTH_START_URL environment variable.'
    # The actor id is the module-level value set by main() for this invocation.
    actor_id = _current_actor_id
    if not actor_id:
        return 'Cannot determine actor_id for OAuth flow.'
    # Percent-encode the id so characters such as '&', '#', '=' or spaces
    # cannot truncate or alter the query string. Plain alphanumeric ids are
    # unchanged by the encoding.
    encoded_actor_id = quote(str(actor_id), safe='')
    url = f'{OAUTH_START_URL}?actor_id={encoded_actor_id}'
    return f'Please open this URL to connect your Google account:\n{url}\n\nAfter authorizing, Google Workspace tools (Gmail, Calendar, Drive) will be available.'
@tool
def manage_service(action: str, service: str = '', config: dict | None = None) -> str:
    """Enroll, update, remove, or list external services for your account.
    Actions:
    - "enroll": Add or update a service (requires service name and config dict).
    - "remove": Remove a service by name.
    - "list": List all enrolled services (shows service names, not secrets).
    Supported services:
    - "home_assistant": config = {"url": "https://your-ha-url", "token": "long-lived-access-token"}
    Examples:
    - Enroll HA: manage_service(action="enroll", service="home_assistant",
      config={"url": "https://ha.example.com", "token": "eyJ..."})
    - Remove HA: manage_service(action="remove", service="home_assistant")
    - List all: manage_service(action="list")
    """
    # `service` defaults to '' so the documented manage_service(action="list")
    # call is valid; "enroll" and "remove" still reject an empty name below.
    actor_id = _current_actor_id
    if not actor_id:
        return 'Cannot determine actor_id.'
    ddb = boto3.resource('dynamodb', region_name='us-east-1')
    table = ddb.Table(USERS_TABLE_NAME)
    item_key = {'actor_id': actor_id}

    if action == 'list':
        resp = table.get_item(Key=item_key)
        services = resp.get('Item', {}).get('services', {})
        if not services:
            return 'No services enrolled.'
        # Only names are reported; stored configs (tokens) are never echoed.
        lines = [f"- {svc}: configured" for svc in services]
        return 'Enrolled services:\n' + '\n'.join(lines)

    if action == 'enroll':
        if not service:
            return 'service name is required.'
        if not config:
            return 'config dict is required for enroll.'
        # Validate known services
        if service == 'home_assistant':
            if 'url' not in config or 'token' not in config:
                return 'home_assistant config requires "url" and "token" keys.'
        # DynamoDB rejects a single update expression that touches both
        # `services` and `services.#svc` (overlapping document paths), so the
        # map is created first and the entry is written in a second call.
        table.update_item(
            Key=item_key,
            UpdateExpression='SET services = if_not_exists(services, :empty)',
            ExpressionAttributeValues={':empty': {}},
        )
        table.update_item(
            Key=item_key,
            UpdateExpression='SET services.#svc = :cfg',
            ExpressionAttributeNames={'#svc': service},
            ExpressionAttributeValues={':cfg': config},
        )
        if service == 'home_assistant':
            # Update in-memory config for this session only after the write
            # succeeded, so a failed enroll does not leave it half-applied.
            set_ha_config(config['url'], config['token'])
        return f'Service "{service}" enrolled successfully.'

    if action == 'remove':
        if not service:
            return 'service name is required.'
        if service == 'home_assistant':
            set_ha_config('', '')
        try:
            # The condition turns "nothing to remove" (no item, no services
            # map, or no such entry) into a catchable error instead of an
            # invalid-document-path failure.
            table.update_item(
                Key=item_key,
                UpdateExpression='REMOVE services.#svc',
                ConditionExpression='attribute_exists(services.#svc)',
                ExpressionAttributeNames={'#svc': service},
            )
        except table.meta.client.exceptions.ConditionalCheckFailedException:
            return f'Service "{service}" is not enrolled.'
        return f'Service "{service}" removed.'

    return f'Unknown action: {action}. Use "enroll", "remove", or "list".'
# ── Entrypoint ────────────────────────────────────────────────────────────
# Module-level actor_id for tool closures (set per-invocation)
# main() assigns this via `global`; connect_google_account and manage_service
# read it at call time. It stays '' until the first invocation.
# NOTE(review): this is shared process state — if the runtime ever handles
# invocations concurrently in one process they would overwrite each other;
# confirm invocations are serialized.
_current_actor_id: str = ''
@app.entrypoint
def main(payload: dict, context) -> dict:
    """Handle an invocation from agent-runner Lambda.

    Payload keys read here:
      - ``channel_adapter``: dict with ``type`` (only ``'telegram'`` is
        supported), ``target_id`` and ``bot_token_secret_arn``.
      - ``actor_id``: falls back to the adapter ``target_id``, then ``'default'``.
      - ``session_id``: falls back to ``session-<actor_id>``.
      - ``user_profile``: optional dict with ``display_name``,
        ``telegram_username``, ``google_email`` and ``services``.
      - ``prompt``: the user input passed to the agent.

    ``context`` is accepted for the entrypoint signature but not used.

    Returns:
        ``{'result': <final agent message>}``.

    Raises:
        ValueError: if the channel type is not supported.
    """
    global _current_actor_id
    import contextlib
    import threading

    # Set up channel adapter
    adapter_config = payload.get('channel_adapter', {})
    channel_type = adapter_config.get('type', 'telegram')
    if channel_type == 'telegram':
        adapter = TelegramAdapter(
            chat_id=adapter_config.get('target_id', ''),
            bot_token_secret_arn=adapter_config.get('bot_token_secret_arn', ''),
        )
    else:
        raise ValueError(f"Unsupported channel type: {channel_type}")
    messaging.set_adapter(adapter)

    # Start typing indicator immediately, keep it alive in background.
    # An Event (instead of a flag polled between sleeps) lets the thread stop
    # as soon as the invocation finishes.
    stop_typing = threading.Event()

    def _keep_typing():
        adapter.send_typing()
        # wait() returns False on timeout (refresh the indicator) and True
        # once stop_typing is set (exit).
        while not stop_typing.wait(4):
            adapter.send_typing()

    typing_thread = threading.Thread(target=_keep_typing, daemon=True)
    typing_thread.start()

    # Everything after the thread start is guarded so the typing loop is
    # stopped on every exit path, not only when the agent itself fails.
    try:
        # Set up AgentCore Memory session manager (short + long term via session_manager)
        MEMORY_ID = 'agentclaw_AgentClawMemory-i7Csf776AH'
        actor_id = payload.get('actor_id', adapter_config.get('target_id', 'default'))
        session_id = payload.get('session_id', f'session-{actor_id}')
        _current_actor_id = actor_id
        memory_config = AgentCoreMemoryConfig(
            memory_id=MEMORY_ID,
            session_id=session_id,
            actor_id=actor_id,
        )
        session_manager = AgentCoreMemorySessionManager(
            agentcore_memory_config=memory_config,
            region_name='us-east-1',
        )

        try:
            # Inject per-user service configs
            user_profile = payload.get('user_profile', {})
            services = user_profile.get('services', {})
            ha_cfg = services.get('home_assistant', {})
            set_ha_config(ha_cfg.get('url', ''), ha_cfg.get('token', ''))
            google_email = user_profile.get('google_email', '')

            # Build system prompt — base cached, user context injected per-invocation
            user_context = ''
            if user_profile:
                name = user_profile.get('display_name', '')
                username = user_profile.get('telegram_username', '')
                user_context = f'Name: {name}'
                if username:
                    user_context += f'\nTelegram username: @{username}'
                if google_email:
                    user_context += f'\nGoogle account: {google_email}'
                else:
                    user_context += '\nGoogle account: not connected (use connect_google_account tool to connect)'
                enrolled = list(services.keys())
                if enrolled:
                    user_context += f'\nEnrolled services: {", ".join(enrolled)}'
            system_prompt = build_system_prompt(user_context=user_context, actor_id=actor_id)

            # Model: claude-sonnet-4-6 via cross-region inference
            model = BedrockModel(
                model_id="us.anthropic.claude-sonnet-4-6",
                region_name="us-east-1",
            )
            base_tools = [send_message, web_search, web_fetch, read_workspace_file, write_workspace_file,
                          _code_interpreter.code_interpreter, home_assistant, connect_google_account,
                          manage_service]

            # The MCP client must stay open while the agent runs: tools
            # returned by list_tools_sync() call back into the client session.
            # ExitStack keeps the connection optional — if it cannot be
            # opened, the agent still runs with the base tools.
            with contextlib.ExitStack() as mcp_scope:
                workspace_tools = []
                if google_email:
                    workspace_mcp_client = MCPClient(
                        lambda: streamablehttp_client(
                            WORKSPACE_MCP_URL, timeout=20, auth=_SigV4HttpxAuth(actor_id=actor_id)
                        )
                    )
                    try:
                        mcp_scope.enter_context(workspace_mcp_client)
                        workspace_tools = workspace_mcp_client.list_tools_sync()
                    except Exception as e:
                        # Deliberate best-effort: workspace tools are optional.
                        print(f'[main] workspace_mcp unavailable ({type(e).__name__}) — continuing without it')
                        workspace_tools = []
                else:
                    print(f'[main] actor={actor_id} has no google_email — skipping workspace_mcp')

                agent = Agent(
                    model=model,
                    system_prompt=system_prompt,
                    session_manager=session_manager,
                    tools=base_tools + list(workspace_tools),
                )
                result = agent(payload.get('prompt', ''))
        finally:
            # Stop the indicator before the (possibly slow) memory flush.
            stop_typing.set()
            # Flush buffered memory events
            session_manager.close()
    finally:
        stop_typing.set()

    # Deliver final response — only if no tool call already messaged the user.
    if not messaging.was_sent() and result.message:
        msg = result.message
        if isinstance(msg, dict):
            content = msg.get('content', {})
            if isinstance(content, dict):
                msg = content.get('text', str(content))
            elif isinstance(content, list):
                # Concatenate the text of every dict-shaped content block.
                msg = ' '.join(c.get('text', '') for c in content if isinstance(c, dict))
            else:
                msg = str(content)
        adapter.send(str(msg))
    return {'result': result.message}
# Start the app. There is no `if __name__ == "__main__"` guard, so this runs
# whenever the module is executed or imported.
app.run()