Files
agent-claw/agentclaw/app/agent_claw_main/main.py

552 lines
22 KiB
Python

"""
agent-claw Runtime 1 — main assistant agent.
Entrypoint for AgentCore CodeZip deployment.
"""
import os
from strands import Agent, tool
from strands.models import BedrockModel
from bedrock_agentcore.runtime import BedrockAgentCoreApp
from channels.telegram import TelegramAdapter
from prompt_builder import build_system_prompt, invalidate_prompt
import memory_manager
import config
from tools import web as web_tools
from tools import workspace as ws_tools
from tools import messaging
from tools.scheduler import schedule_reminder, list_reminders, cancel_reminder
import tools.scheduler as _scheduler_module
from tools.home_assistant import home_assistant, set_ha_config
from tools.google_workspace import list_calendars, get_calendar_events, list_gmail_messages, get_gmail_message
from tools.send_file import send_file as _send_file_impl
from tools.mcp_tools import manage_mcp_connection
import tools.mcp_tools as _mcp_tools_module
import tools.google_workspace as _gws
import mcp_loader
import httpx
import botocore.auth
import botocore.awsrequest
import boto3
from urllib.parse import urlparse as _urlparse
OAUTH_START_URL = (
os.environ.get('OAUTH_START_URL')
or 'https://sptejrymri.execute-api.us-east-1.amazonaws.com/oauth/start'
)
USERS_TABLE_NAME = os.environ.get('USERS_TABLE_NAME', 'agent-claw-users')
EXECUTION_ROLE_ARN = os.environ.get('EXECUTION_ROLE_ARN', '')
class _SigV4HttpxAuth(httpx.Auth):
"""SigV4 auth for Lambda Function URL with AWS_IAM, plus X-Actor-Id header."""
def __init__(self, region: str = 'us-east-1', actor_id: str = ''):
self._region = region
self._actor_id = actor_id
def auth_flow(self, request):
creds = boto3.Session().get_credentials().get_frozen_credentials()
parsed = _urlparse(str(request.url))
aws_req = botocore.awsrequest.AWSRequest(
method=request.method,
url=str(request.url),
data=request.content or b'',
headers={
'Host': parsed.hostname,
'Content-Type': request.headers.get('content-type', 'application/json'),
'Accept': request.headers.get('accept', 'application/json, text/event-stream'),
}
)
botocore.auth.SigV4Auth(creds, 'lambda', self._region).add_auth(aws_req)
for k, v in aws_req.headers.items():
request.headers[k] = v
if self._actor_id:
request.headers['x-actor-id'] = self._actor_id
yield request
from bedrock_agentcore.memory.integrations.strands.config import AgentCoreMemoryConfig
from bedrock_agentcore.memory.integrations.strands.session_manager import AgentCoreMemorySessionManager
# code_interpreter removed — causes [Errno 98] port 8080 conflict on warm container re-init
from tools.code_interpreter import run_code
app = BedrockAgentCoreApp()
_aws_mcp_client = None
_aws_mcp_tools = []
try:
from strands.tools.mcp import MCPClient
from mcp_proxy_for_aws.client import aws_iam_streamablehttp_client
_aws_mcp_client = MCPClient(
lambda: aws_iam_streamablehttp_client(config.AWS_MCP_URL, aws_service="aws-mcp")
)
_aws_mcp_tools = [_aws_mcp_client]
print('[main] AWS MCP client created')
except Exception as _e:
import traceback
print(f'[main] AWS MCP client failed: {type(_e).__name__}: {_e}')
print(traceback.format_exc())
# ── Tool definitions ──────────────────────────────────────────────────────
# NOTE: send_message tool removed — delivery handled by agent-runner streaming consumer
@tool
def web_search(query: str) -> str:
"""Search the web using Brave Search. Returns titles, URLs, and snippets."""
return web_tools.brave_search(query)
@tool
def send_file(file_content: str, filename: str, caption: str = '') -> str:
"""Send a file to the user as a Telegram document attachment.
Use this when you need to send code, data, or any text content as a downloadable file.
Args:
file_content: The text content of the file to send.
filename: The filename with extension (e.g. 'report.txt', 'data.csv', 'script.py').
caption: Optional caption to display with the file.
"""
return _send_file_impl(file_content, filename, caption)
@tool
def web_fetch(url: str) -> str:
"""Fetch and extract readable text content from a URL."""
return web_tools.web_fetch(url)
@tool
def read_workspace_file(path: str) -> str:
"""Read a file from the agent workspace (SOUL.md, HEARTBEAT.md, etc.)"""
return ws_tools.read_file(path)
@tool
def write_workspace_file(path: str, content: str) -> str:
"""Write or update a file in the agent workspace."""
result = ws_tools.write_file(path, content)
invalidate_prompt() # force system prompt rebuild if persona files changed
return result
@tool
def connect_google_account(label: str = 'primary') -> str:
"""Connect a Google account with a custom label (e.g. 'work', 'personal'). Defaults to 'primary'.
Use this when the user wants to connect Google Workspace (Gmail, Calendar, Drive, etc.)
or when Google tools fail due to missing credentials."""
if not OAUTH_START_URL:
return 'Google OAuth is not configured. Set OAUTH_START_URL environment variable.'
actor_id = _current_actor_id
if not actor_id:
return 'Cannot determine actor_id for OAuth flow.'
url = f'{OAUTH_START_URL}?actor_id={actor_id}&label={label}'
return f'Please open this URL to connect your Google account as "{label}":\n{url}\n\nAfter authorizing, Google Workspace tools (Gmail, Calendar, Drive) will be available.'
@tool
def list_google_accounts() -> str:
"""List all connected Google accounts and their labels."""
actor_id = _current_actor_id
if actor_id:
try:
safe_actor_id = actor_id.replace(':', '-')
prefix = f'agent-claw/google-credentials/{safe_actor_id}/'
sm = boto3.client('secretsmanager', region_name='us-east-1')
paginator = sm.get_paginator('list_secrets')
accounts = {}
for page in paginator.paginate(Filters=[{'Key': 'name', 'Values': [prefix]}]):
for s in page['SecretList']:
label = s['Name'][len(prefix):]
try:
import json as _json
val = _json.loads(sm.get_secret_value(SecretId=s['Name'])['SecretString'])
accounts[label] = val.get('email', s['Name'])
except Exception:
accounts[label] = s['Name']
if accounts:
parts = [f'{label} ({email})' for label, email in accounts.items()]
return 'Connected Google accounts: ' + ', '.join(parts)
except Exception as e:
print(f'[list_google_accounts] SM lookup failed, falling back: {e}')
accounts = _gws._current_google_accounts
if not accounts:
return 'No Google accounts connected. Use connect_google_account to add one.'
parts = [f'{label} ({email})' for label, email in accounts.items()]
return 'Connected Google accounts: ' + ', '.join(parts)
@tool
def remove_google_account(label: str) -> str:
"""Remove a connected Google account by label (e.g. 'work', 'personal')."""
actor_id = _current_actor_id
if not actor_id:
return 'Cannot determine actor_id.'
safe_actor_id = actor_id.replace(':', '-')
ddb = boto3.resource('dynamodb', region_name='us-east-1')
table = ddb.Table(USERS_TABLE_NAME)
resp = table.get_item(Key={'actor_id': actor_id})
accounts = resp.get('Item', {}).get('google_accounts', {})
if label not in accounts:
return f'No Google account with label "{label}" found.'
if len(accounts) <= 1:
return 'Cannot remove the last Google account. At least one must remain.'
email = accounts.get(label, label)
sm = boto3.client('secretsmanager', region_name='us-east-1')
try:
sm.delete_secret(
SecretId=f'agent-claw/google-credentials/{safe_actor_id}/{label}',
ForceDeleteWithoutRecovery=True,
)
except Exception:
pass # secret may already be gone
table.update_item(
Key={'actor_id': actor_id},
UpdateExpression='REMOVE google_accounts.#label',
ExpressionAttributeNames={'#label': label},
)
return f'Disconnected {label} ({email}) from your Google accounts.'
@tool
def manage_service(action: str, service: str, config: dict | None = None) -> str:
"""Enroll, update, remove, or list external services for your account.
Actions:
- "enroll": Add or update a service (requires service name and config dict).
- "remove": Remove a service by name.
- "list": List all enrolled services (shows service names, not secrets).
Supported services:
- "home_assistant": config = {"url": "https://your-ha-url", "token": "long-lived-access-token"}
Examples:
- Enroll HA: manage_service(action="enroll", service="home_assistant",
config={"url": "https://ha.example.com", "token": "eyJ..."})
- Remove HA: manage_service(action="remove", service="home_assistant")
- List all: manage_service(action="list")
"""
actor_id = _current_actor_id
if not actor_id:
return 'Cannot determine actor_id.'
ddb = boto3.resource('dynamodb', region_name='us-east-1')
table = ddb.Table(USERS_TABLE_NAME)
if action == 'list':
resp = table.get_item(Key={'actor_id': actor_id})
services = resp.get('Item', {}).get('services', {})
if not services:
return 'No services enrolled.'
lines = [f"- {svc}: configured" for svc in services]
return 'Enrolled services:\n' + '\n'.join(lines)
elif action == 'enroll':
if not service:
return 'service name is required.'
if not config:
return 'config dict is required for enroll.'
if service == 'home_assistant':
if 'url' not in config or 'token' not in config:
return 'home_assistant config requires "url" and "token" keys.'
set_ha_config(config['url'], config['token'])
table.update_item(
Key={'actor_id': actor_id},
UpdateExpression='SET services = if_not_exists(services, :empty), services.#svc = :cfg',
ExpressionAttributeNames={'#svc': service},
ExpressionAttributeValues={':cfg': config, ':empty': {}},
)
return f'Service "{service}" enrolled successfully.'
elif action == 'remove':
if not service:
return 'service name is required.'
if service == 'home_assistant':
set_ha_config('', '')
table.update_item(
Key={'actor_id': actor_id},
UpdateExpression='REMOVE services.#svc',
ExpressionAttributeNames={'#svc': service},
)
return f'Service "{service}" removed.'
else:
return f'Unknown action: {action}. Use "enroll", "remove", or "list".'
@tool
def request_iam_permission(action: str, resource: str, reason: str) -> str:
"""Request IAM permission from the user. ALWAYS call this before apply_iam_permission.
Sends the user a Telegram message explaining what permission is needed and why.
After calling this, wait for the user to explicitly say 'yes' before proceeding.
Args:
action: IAM action to request (e.g. 's3:PutObject')
resource: Resource ARN the action applies to (e.g. 'arn:aws:s3:::my-bucket/*')
reason: Why this permission is needed
"""
msg = (
f"⚠️ *IAM Permission Request*\n\n"
f"I need to add the following permission to my execution role:\n\n"
f"• Action: `{action}`\n"
f"• Resource: `{resource}`\n"
f"• Reason: {reason}\n\n"
f"Reply *yes* to approve or *no* to deny."
)
messaging.send(msg)
return "Permission request sent. Wait for the user to reply 'yes' before calling apply_iam_permission."
@tool
def apply_iam_permission(action: str, resource: str, policy_name: str) -> str:
"""Apply an IAM permission to the agent execution role.
Only call this after the user has explicitly approved via request_iam_permission.
Args:
action: IAM action to allow (e.g. 's3:PutObject')
resource: Resource ARN the action applies to
policy_name: Unique name for the inline policy (e.g. 'AllowS3PutObject')
"""
if not EXECUTION_ROLE_ARN:
return 'EXECUTION_ROLE_ARN not configured.'
import json as _json
role_name = EXECUTION_ROLE_ARN.split('/')[-1]
policy_doc = _json.dumps({
'Version': '2012-10-17',
'Statement': [{'Effect': 'Allow', 'Action': action, 'Resource': resource}],
})
boto3.client('iam', region_name='us-east-1').put_role_policy(
RoleName=role_name,
PolicyName=policy_name,
PolicyDocument=policy_doc,
)
return f"Applied policy '{policy_name}': Allow {action} on {resource}."
@tool
def aws_list_lambda_functions(region: str = "us-east-1") -> str:
"""List AWS Lambda functions in the specified region. Uses execution role credentials directly via boto3."""
import boto3
client = boto3.client("lambda", region_name=region)
paginator = client.get_paginator("list_functions")
functions = []
for page in paginator.paginate():
for fn in page["Functions"]:
functions.append(f"{fn['FunctionName']} ({fn['Runtime']})")
return f"{len(functions)} Lambda functions in {region}:\n" + "\n".join(functions)
@tool
def aws_get_cost_and_usage(start_date: str, end_date: str, granularity: str = "MONTHLY") -> str:
"""Get AWS Cost and Usage report. start_date and end_date in YYYY-MM-DD format. Uses execution role credentials."""
import boto3
client = boto3.client("ce", region_name="us-east-1")
response = client.get_cost_and_usage(
TimePeriod={"Start": start_date, "End": end_date},
Granularity=granularity,
Metrics=["UnblendedCost"]
)
lines = []
for result in response["ResultsByTime"]:
period = f"{result['TimePeriod']['Start']} to {result['TimePeriod']['End']}"
cost = result["Total"]["UnblendedCost"]["Amount"]
unit = result["Total"]["UnblendedCost"]["Unit"]
lines.append(f"{period}: {cost} {unit}")
return "\n".join(lines)
@tool
def aws_describe_service(service: str, region: str = "us-east-1") -> str:
"""Describe an AWS service. service can be: lambda, s3, cloudformation, dynamodb, sqs. Returns summary of key resources."""
import boto3
session = boto3.Session(region_name=region)
if service == "s3":
client = session.client("s3")
buckets = client.list_buckets()["Buckets"]
return f"{len(buckets)} S3 buckets: " + ", ".join(b["Name"] for b in buckets[:20])
elif service == "cloudformation":
client = session.client("cloudformation")
stacks = client.list_stacks(StackStatusFilter=["CREATE_COMPLETE", "UPDATE_COMPLETE", "ROLLBACK_COMPLETE"])["StackSummaries"]
return f"{len(stacks)} stacks: " + ", ".join(s["StackName"] for s in stacks[:20])
elif service == "dynamodb":
client = session.client("dynamodb")
tables = client.list_tables()["TableNames"]
return f"{len(tables)} DynamoDB tables: " + ", ".join(tables[:20])
elif service == "sqs":
client = session.client("sqs")
queues = client.list_queues().get("QueueUrls", [])
return f"{len(queues)} SQS queues: " + ", ".join(q.split("/")[-1] for q in queues[:20])
else:
return f"Service {service} not yet implemented. Try: lambda, s3, cloudformation, dynamodb, sqs"
# ── Entrypoint ────────────────────────────────────────────────────────────
# Module-level actor_id for tool closures (set per-invocation)
_current_actor_id: str = ''
_current_chat_id: str = ''
@app.entrypoint
async def main(payload: dict, context):
"""Handle an invocation from agent-runner Lambda (streaming)."""
global _current_actor_id
# Set up channel adapter
adapter_config = payload.get('channel_adapter', {})
channel_type = adapter_config.get('type', 'telegram')
actor_id_early = payload.get('actor_id', adapter_config.get('target_id', 'default'))
_current_actor_id = actor_id_early
_gws._current_actor_id = actor_id_early # sync to google_workspace module
if channel_type == 'telegram':
adapter = TelegramAdapter(
chat_id=adapter_config.get('target_id', ''),
bot_token_secret_arn=adapter_config.get('bot_token_secret_arn', ''),
message_thread_id=adapter_config.get('message_thread_id'),
)
else:
raise ValueError(f"Unsupported channel type: {channel_type}")
messaging.set_adapter(adapter)
# Start typing indicator immediately, keep it alive in background
import threading
_typing_active = True
def _keep_typing():
adapter.send_typing()
import time
while _typing_active:
time.sleep(4)
if _typing_active:
adapter.send_typing()
typing_thread = threading.Thread(target=_keep_typing, daemon=True)
typing_thread.start()
# Set up AgentCore Memory session manager (short + long term via session_manager)
MEMORY_ID = 'agentclaw_AgentClawMemory-i7Csf776AH'
actor_id = payload.get('actor_id', adapter_config.get('target_id', 'default'))
session_id = payload.get('session_id', f'session-{actor_id}')
_current_actor_id = actor_id
chat_id = adapter_config.get('target_id', '')
_current_chat_id = chat_id
_scheduler_module._current_actor_id = actor_id
_scheduler_module._current_chat_id = chat_id
_mcp_tools_module._current_actor_id = actor_id
# Run compaction if flagged from previous invocation (trims old events before load)
memory_manager.check_and_compact(actor_id, session_id)
memory_config = AgentCoreMemoryConfig(
memory_id=MEMORY_ID,
session_id=session_id,
actor_id=actor_id,
)
session_manager = AgentCoreMemorySessionManager(
agentcore_memory_config=memory_config,
region_name='us-east-1',
)
# Inject per-user service configs
user_profile = payload.get('user_profile', {})
services = user_profile.get('services', {})
ha_cfg = services.get('home_assistant', {})
set_ha_config(ha_cfg.get('url', ''), ha_cfg.get('token', ''))
# Sync google_accounts to google_workspace module
google_accounts = user_profile.get('google_accounts', {})
_gws._current_google_accounts = google_accounts
# Build system prompt — base cached, user context injected per-invocation
user_context = ''
if user_profile:
name = user_profile.get('display_name', '')
username = user_profile.get('telegram_username', '')
user_context = f'Name: {name}'
if username:
user_context += f'\nTelegram username: @{username}'
if google_accounts:
acct_list = ', '.join(f'{label} ({email})' for label, email in google_accounts.items())
user_context += f'\nGoogle accounts: {acct_list}'
else:
user_context += '\nGoogle account: not connected (use connect_google_account tool to connect)'
enrolled = list(services.keys())
if enrolled:
user_context += f'\nEnrolled services: {", ".join(enrolled)}'
system_prompt = build_system_prompt(user_context=user_context, actor_id=actor_id)
# Inject long-term memory block before conversation history
ltm_block = memory_manager.load_ltm(actor_id)
if ltm_block:
system_prompt = system_prompt + '\n\n---\n\n' + ltm_block
# Inject current datetime so the model always has accurate time context
from datetime import datetime
from zoneinfo import ZoneInfo
_tz = ZoneInfo('America/Chicago')
_now = datetime.now(_tz)
_time_str = _now.strftime('%A, %B %d, %Y %I:%M %p %Z')
system_prompt = system_prompt + f'\n\nCurrent date/time: {_time_str}'
system_prompt += 'AWS access: Native boto3 tools available (aws_list_lambda_functions, aws_get_cost_and_usage, aws_describe_service). If AWS MCP Server connected at startup: call_aws (any AWS API), search_documentation, read_documentation, run_script also available.'
print(f'[main] System prompt time injection: {_time_str}')
# Model: claude-sonnet-4-6 via cross-region inference
# NOTE: extended thinking disabled — causes retry/duplicate issues with streaming
from botocore.config import Config as BotoConfig
model = BedrockModel(
model_id=config.AGENT_MODEL_ID,
region_name="us-east-1",
boto_client_config=BotoConfig(read_timeout=600, connect_timeout=10),
)
base_tools = [web_search, web_fetch, read_workspace_file, write_workspace_file,
home_assistant, connect_google_account, list_google_accounts, remove_google_account,
manage_service, manage_mcp_connection, schedule_reminder, list_reminders, cancel_reminder,
list_calendars, get_calendar_events, list_gmail_messages, get_gmail_message,
run_code, send_file, request_iam_permission, apply_iam_permission,
aws_list_lambda_functions, aws_get_cost_and_usage, aws_describe_service]
# Load user's dynamic MCP connections
mcp_connections = services.get('mcp_connections', [])
mcp_clients, _mcp_to_close = mcp_loader.load_mcp_tools(mcp_connections, actor_id)
all_tools = base_tools + _aws_mcp_tools + mcp_clients
agent = Agent(
model=model,
system_prompt=system_prompt,
session_manager=session_manager,
tools=all_tools,
)
final_message = None
try:
async for event in agent.stream_async(payload.get('prompt', '')):
if 'result' in event:
final_message = event['result'].message
yield event
except Exception as e:
# Catch ALL exceptions including ReadTimeoutError to prevent AgentCore retry.
# A retry re-runs the full agent loop causing duplicate Telegram messages.
print(f'[main] Agent error (suppressed to prevent retry): {type(e).__name__}: {e}')
if final_message:
yield {'data': str(final_message), 'result': {'message': final_message}}
finally:
_typing_active = False
session_manager.close()
mcp_loader.close_mcp_clients(_mcp_to_close)
# Check if session exceeds window — flag for compaction on next invocation
memory_manager.check_window_and_flag(actor_id, session_id)
app.run()