Add EventBridge scheduling: schedule_reminder, list_reminders, cancel_reminder

This commit is contained in:
daniel
2026-05-07 23:24:48 -05:00
parent 825294d433
commit 58ed60f7b7
46 changed files with 3605 additions and 24 deletions

View File

@@ -21,7 +21,8 @@
"USERS_TABLE_NAME": "agent-claw-users",
"WORKSPACE_BUCKET_NAME": "agent-claw-workspace-495395224548",
"TELEGRAM_BOT_TOKEN_SECRET_ARN": "arn:aws:secretsmanager:us-east-1:495395224548:secret:agent-claw/telegram-bot-token-Oq3in3",
"BRAVE_API_KEY_SECRET_ARN": "arn:aws:secretsmanager:us-east-1:495395224548:secret:agent-claw/brave-api-key-uUSgzi"
"BRAVE_API_KEY_SECRET_ARN": "arn:aws:secretsmanager:us-east-1:495395224548:secret:agent-claw/brave-api-key-uUSgzi",
"SCHEDULER_LAMBDA_ARN": "arn:aws:lambda:us-east-1:495395224548:function:agent-claw-scheduler"
}
}
],
@@ -59,4 +60,4 @@
"configBundles": [],
"abTests": [],
"httpGateways": []
}
}

View File

@@ -13,6 +13,8 @@ from prompt_builder import build_system_prompt, invalidate_prompt
from tools import web as web_tools
from tools import workspace as ws_tools
from tools import messaging
from tools.scheduler import schedule_reminder, list_reminders, cancel_reminder
import tools.scheduler as _scheduler_module
from tools.home_assistant import home_assistant, set_ha_config
from mcp.client.streamable_http import streamablehttp_client
from strands.tools.mcp.mcp_client import MCPClient
@@ -178,6 +180,7 @@ def manage_service(action: str, service: str, config: dict | None = None) -> str
# Module-level actor_id for tool closures (set per-invocation)
_current_actor_id: str = ''
_current_chat_id: str = ''
@app.entrypoint
@@ -217,6 +220,10 @@ async def main(payload: dict, context):
actor_id = payload.get('actor_id', adapter_config.get('target_id', 'default'))
session_id = payload.get('session_id', f'session-{actor_id}')
_current_actor_id = actor_id
chat_id = adapter_config.get('target_id', '')
_current_chat_id = chat_id
_scheduler_module._current_actor_id = actor_id
_scheduler_module._current_chat_id = chat_id
memory_config = AgentCoreMemoryConfig(
memory_id=MEMORY_ID,
@@ -258,7 +265,9 @@ async def main(payload: dict, context):
from zoneinfo import ZoneInfo
_tz = ZoneInfo('America/Chicago')
_now = datetime.now(_tz)
system_prompt += f'\n\nCurrent date/time: {_now.strftime("%A, %B %d, %Y %I:%M %p %Z")}'
_time_str = _now.strftime('%A, %B %d, %Y %I:%M %p %Z')
system_prompt = system_prompt + f'\n\nCurrent date/time: {_time_str}'
print(f'[main] System prompt time injection: {_time_str}')
# Model: claude-sonnet-4-6 via cross-region inference with extended thinking
from botocore.config import Config as BotoConfig
@@ -273,7 +282,7 @@ async def main(payload: dict, context):
base_tools = [web_search, web_fetch, read_workspace_file, write_workspace_file,
_code_interpreter.code_interpreter, home_assistant, connect_google_account,
manage_service]
manage_service, schedule_reminder, list_reminders, cancel_reminder]
workspace_mcp_client = MCPClient(
lambda: streamablehttp_client(WORKSPACE_MCP_URL, timeout=20, auth=_SigV4HttpxAuth(actor_id=actor_id))

View File

@@ -0,0 +1,317 @@
"""
agent-claw Runtime 1 — main assistant agent.
Entrypoint for AgentCore CodeZip deployment.
"""
import os
from strands import Agent, tool
from strands.models import BedrockModel
from bedrock_agentcore.runtime import BedrockAgentCoreApp
from channels.telegram import TelegramAdapter
from prompt_builder import build_system_prompt, invalidate_prompt
from tools import web as web_tools
from tools import workspace as ws_tools
from tools import messaging
from tools.home_assistant import home_assistant, set_ha_config
from mcp.client.streamable_http import streamablehttp_client
from strands.tools.mcp.mcp_client import MCPClient
import httpx
import botocore.auth
import botocore.awsrequest
import boto3
from urllib.parse import urlparse as _urlparse
WORKSPACE_MCP_URL = 'https://25hugrzw4uwtueeg77jsmft6lq0wunmd.lambda-url.us-east-1.on.aws/mcp'
OAUTH_START_URL = os.environ.get('OAUTH_START_URL', '')
USERS_TABLE_NAME = os.environ.get('USERS_TABLE_NAME', 'agent-claw-users')
class _SigV4HttpxAuth(httpx.Auth):
"""SigV4 auth for Lambda Function URL with AWS_IAM, plus X-Actor-Id header."""
def __init__(self, region: str = 'us-east-1', actor_id: str = ''):
self._region = region
self._actor_id = actor_id
def auth_flow(self, request):
creds = boto3.Session().get_credentials().get_frozen_credentials()
parsed = _urlparse(str(request.url))
aws_req = botocore.awsrequest.AWSRequest(
method=request.method,
url=str(request.url),
data=request.content or b'',
headers={
'Host': parsed.hostname,
'Content-Type': request.headers.get('content-type', 'application/json'),
'Accept': request.headers.get('accept', 'application/json, text/event-stream'),
}
)
botocore.auth.SigV4Auth(creds, 'lambda', self._region).add_auth(aws_req)
for k, v in aws_req.headers.items():
request.headers[k] = v
if self._actor_id:
request.headers['x-actor-id'] = self._actor_id
yield request
from bedrock_agentcore.memory.integrations.strands.config import AgentCoreMemoryConfig
from bedrock_agentcore.memory.integrations.strands.session_manager import AgentCoreMemorySessionManager
from strands_tools.code_interpreter import AgentCoreCodeInterpreter as _CodeInterpreterClient
# Initialise once per warm session
_code_interpreter = _CodeInterpreterClient(region='us-east-1')
app = BedrockAgentCoreApp()
# ── Tool definitions ──────────────────────────────────────────────────────
@tool
def send_message(text: str) -> str:
"""Send a message to the user. Use multiple calls to send incrementally - send the direct answer first, then elaboration. Each call delivers immediately to the user."""
return messaging.send(text)
@tool
def web_search(query: str) -> str:
"""Search the web using Brave Search. Returns titles, URLs, and snippets."""
return web_tools.brave_search(query)
@tool
def web_fetch(url: str) -> str:
"""Fetch and extract readable text content from a URL."""
return web_tools.web_fetch(url)
@tool
def read_workspace_file(path: str) -> str:
"""Read a file from the agent workspace (SOUL.md, HEARTBEAT.md, etc.)"""
return ws_tools.read_file(path)
@tool
def write_workspace_file(path: str, content: str) -> str:
"""Write or update a file in the agent workspace."""
result = ws_tools.write_file(path, content)
invalidate_prompt() # force system prompt rebuild if persona files changed
return result
@tool
def connect_google_account() -> str:
"""Generate a Google OAuth authorization URL for the current user to connect their Google account.
Use this when the user wants to connect Google Workspace (Gmail, Calendar, Drive, etc.)
or when Google tools fail due to missing credentials."""
if not OAUTH_START_URL:
return 'Google OAuth is not configured. Set OAUTH_START_URL environment variable.'
actor_id = _current_actor_id
if not actor_id:
return 'Cannot determine actor_id for OAuth flow.'
url = f'{OAUTH_START_URL}?actor_id={actor_id}'
return f'Please open this URL to connect your Google account:\n{url}\n\nAfter authorizing, Google Workspace tools (Gmail, Calendar, Drive) will be available.'
@tool
def manage_service(action: str, service: str, config: dict | None = None) -> str:
"""Enroll, update, remove, or list external services for your account.
Actions:
- "enroll": Add or update a service (requires service name and config dict).
- "remove": Remove a service by name.
- "list": List all enrolled services (shows service names, not secrets).
Supported services:
- "home_assistant": config = {"url": "https://your-ha-url", "token": "long-lived-access-token"}
Examples:
- Enroll HA: manage_service(action="enroll", service="home_assistant",
config={"url": "https://ha.example.com", "token": "eyJ..."})
- Remove HA: manage_service(action="remove", service="home_assistant")
- List all: manage_service(action="list")
"""
actor_id = _current_actor_id
if not actor_id:
return 'Cannot determine actor_id.'
ddb = boto3.resource('dynamodb', region_name='us-east-1')
table = ddb.Table(USERS_TABLE_NAME)
if action == 'list':
resp = table.get_item(Key={'actor_id': actor_id})
services = resp.get('Item', {}).get('services', {})
if not services:
return 'No services enrolled.'
lines = [f"- {svc}: configured" for svc in services]
return 'Enrolled services:\n' + '\n'.join(lines)
elif action == 'enroll':
if not service:
return 'service name is required.'
if not config:
return 'config dict is required for enroll.'
# Validate known services
if service == 'home_assistant':
if 'url' not in config or 'token' not in config:
return 'home_assistant config requires "url" and "token" keys.'
# Update in-memory config immediately for this session
set_ha_config(config['url'], config['token'])
table.update_item(
Key={'actor_id': actor_id},
UpdateExpression='SET services = if_not_exists(services, :empty), services.#svc = :cfg',
ExpressionAttributeNames={'#svc': service},
ExpressionAttributeValues={':cfg': config, ':empty': {}},
)
return f'Service "{service}" enrolled successfully.'
elif action == 'remove':
if not service:
return 'service name is required.'
if service == 'home_assistant':
set_ha_config('', '')
table.update_item(
Key={'actor_id': actor_id},
UpdateExpression='REMOVE services.#svc',
ExpressionAttributeNames={'#svc': service},
)
return f'Service "{service}" removed.'
else:
return f'Unknown action: {action}. Use "enroll", "remove", or "list".'
# ── Entrypoint ────────────────────────────────────────────────────────────
# Module-level actor_id for tool closures (set per-invocation)
_current_actor_id: str = ''
@app.entrypoint
def main(payload: dict, context) -> dict:
"""Handle an invocation from agent-runner Lambda."""
global _current_actor_id
# Set up channel adapter
adapter_config = payload.get('channel_adapter', {})
channel_type = adapter_config.get('type', 'telegram')
if channel_type == 'telegram':
adapter = TelegramAdapter(
chat_id=adapter_config.get('target_id', ''),
bot_token_secret_arn=adapter_config.get('bot_token_secret_arn', ''),
)
else:
raise ValueError(f"Unsupported channel type: {channel_type}")
messaging.set_adapter(adapter)
# Start typing indicator immediately, keep it alive in background
import threading
_typing_active = True
def _keep_typing():
adapter.send_typing()
import time
while _typing_active:
time.sleep(4)
if _typing_active:
adapter.send_typing()
typing_thread = threading.Thread(target=_keep_typing, daemon=True)
typing_thread.start()
# Set up AgentCore Memory session manager (short + long term via session_manager)
MEMORY_ID = 'agentclaw_AgentClawMemory-i7Csf776AH'
actor_id = payload.get('actor_id', adapter_config.get('target_id', 'default'))
session_id = payload.get('session_id', f'session-{actor_id}')
_current_actor_id = actor_id
memory_config = AgentCoreMemoryConfig(
memory_id=MEMORY_ID,
session_id=session_id,
actor_id=actor_id,
)
session_manager = AgentCoreMemorySessionManager(
agentcore_memory_config=memory_config,
region_name='us-east-1',
)
# Inject per-user service configs
user_profile = payload.get('user_profile', {})
services = user_profile.get('services', {})
ha_cfg = services.get('home_assistant', {})
set_ha_config(ha_cfg.get('url', ''), ha_cfg.get('token', ''))
# Build system prompt — base cached, user context injected per-invocation
user_context = ''
if user_profile:
name = user_profile.get('display_name', '')
username = user_profile.get('telegram_username', '')
google_email = user_profile.get('google_email', '')
user_context = f'Name: {name}'
if username:
user_context += f'\nTelegram username: @{username}'
if google_email:
user_context += f'\nGoogle account: {google_email}'
else:
user_context += '\nGoogle account: not connected (use connect_google_account tool to connect)'
enrolled = list(services.keys())
if enrolled:
user_context += f'\nEnrolled services: {", ".join(enrolled)}'
system_prompt = build_system_prompt(user_context=user_context, actor_id=actor_id)
# Model: claude-sonnet-4-6 via cross-region inference
model = BedrockModel(
model_id="us.anthropic.claude-sonnet-4-6",
region_name="us-east-1",
)
base_tools = [send_message, web_search, web_fetch, read_workspace_file, write_workspace_file,
_code_interpreter.code_interpreter, home_assistant, connect_google_account,
manage_service]
def _run_agent(tools):
agent = Agent(
model=model,
system_prompt=system_prompt,
session_manager=session_manager,
tools=tools,
)
return agent(payload.get('prompt', ''))
workspace_mcp_client = MCPClient(
lambda: streamablehttp_client(WORKSPACE_MCP_URL, timeout=20, auth=_SigV4HttpxAuth(actor_id=actor_id))
)
workspace_tools = []
google_email = user_profile.get('google_email', '')
if google_email:
try:
with workspace_mcp_client:
workspace_tools = workspace_mcp_client.list_tools_sync()
except Exception as e:
print(f'[main] workspace_mcp unavailable ({type(e).__name__}) — continuing without it')
else:
print(f'[main] actor={actor_id} has no google_email — skipping workspace_mcp')
try:
result = _run_agent(base_tools + list(workspace_tools))
finally:
_typing_active = False
# Flush buffered memory events
session_manager.close()
# Deliver final response
if not messaging.was_sent() and result.message:
msg = result.message
if isinstance(msg, dict):
content = msg.get('content', {})
if isinstance(content, dict):
msg = content.get('text', str(content))
elif isinstance(content, list):
msg = ' '.join(c.get('text', '') for c in content if isinstance(c, dict))
else:
msg = str(content)
adapter.send(str(msg))
return {'result': result.message}
app.run()

View File

@@ -0,0 +1,141 @@
"""EventBridge scheduling tools: schedule_reminder, list_reminders, cancel_reminder."""
import json
import os
import re
import boto3
from strands import tool
# Injected by main.py before each invocation
_current_actor_id: str = ''
_current_chat_id: str = ''
SCHEDULER_LAMBDA_ARN = os.environ.get('SCHEDULER_LAMBDA_ARN', '')
ACCOUNT_ID = os.environ.get('AWS_ACCOUNT_ID', '')
REGION = 'us-east-1'
def _eb():
return boto3.client('events', region_name=REGION)
def _rule_prefix() -> str:
safe = re.sub(r'[^a-zA-Z0-9_-]', '-', _current_actor_id)
return f'agent-claw-reminder-{safe}-'
@tool
def schedule_reminder(message: str, when_utc: str) -> str:
"""Schedule a one-time reminder to be sent via Telegram at a specific UTC time.
Args:
message: The reminder text to send.
when_utc: ISO 8601 UTC datetime, e.g. '2026-05-09T09:00:00' (no timezone suffix).
"""
if not SCHEDULER_LAMBDA_ARN:
return 'SCHEDULER_LAMBDA_ARN not configured.'
if not _current_chat_id:
return 'chat_id not available.'
# Convert ISO datetime to EventBridge cron: cron(min hour day month ? year)
try:
from datetime import datetime
dt = datetime.fromisoformat(when_utc.rstrip('Z'))
cron_expr = f'cron({dt.minute} {dt.hour} {dt.day} {dt.month} ? {dt.year})'
except ValueError as e:
return f'Invalid when_utc format: {e}'
import time
rule_name = f'{_rule_prefix()}{int(time.time())}'
eb = _eb()
eb.put_rule(
Name=rule_name,
ScheduleExpression=cron_expr,
State='ENABLED',
)
# Grant EventBridge permission to invoke the Lambda
lm = boto3.client('lambda', region_name=REGION)
try:
lm.add_permission(
FunctionName=SCHEDULER_LAMBDA_ARN,
StatementId=rule_name,
Action='lambda:InvokeFunction',
Principal='events.amazonaws.com',
SourceArn=f'arn:aws:events:{REGION}:{_account_id()}:rule/{rule_name}',
)
except lm.exceptions.ResourceConflictException:
pass
eb.put_targets(
Rule=rule_name,
Targets=[{
'Id': 'scheduler',
'Arn': SCHEDULER_LAMBDA_ARN,
'Input': json.dumps({
'chat_id': _current_chat_id,
'message': message,
'rule_name': rule_name,
}),
}],
)
return f'Reminder scheduled: "{message}" at {when_utc} UTC (rule: {rule_name})'
@tool
def list_reminders() -> str:
"""List all pending reminders for the current user."""
eb = _eb()
prefix = _rule_prefix()
rules = []
kwargs: dict = {'NamePrefix': prefix}
while True:
resp = eb.list_rules(**kwargs)
rules.extend(resp.get('Rules', []))
token = resp.get('NextToken')
if not token:
break
kwargs['NextToken'] = token
if not rules:
return 'No pending reminders.'
lines = []
for r in rules:
lines.append(f"- {r['Name']}: {r.get('ScheduleExpression', '')} [{r.get('State', '')}]")
return '\n'.join(lines)
@tool
def cancel_reminder(rule_name: str) -> str:
"""Cancel a scheduled reminder by its rule name.
Args:
rule_name: The EventBridge rule name (from list_reminders).
"""
prefix = _rule_prefix()
if not rule_name.startswith(prefix):
return f'Rule "{rule_name}" does not belong to your account.'
eb = _eb()
try:
eb.remove_targets(Rule=rule_name, Ids=['scheduler'])
eb.delete_rule(Name=rule_name)
except eb.exceptions.ResourceNotFoundException:
return f'Rule "{rule_name}" not found.'
# Remove Lambda permission
lm = boto3.client('lambda', region_name=REGION)
try:
lm.remove_permission(FunctionName=SCHEDULER_LAMBDA_ARN, StatementId=rule_name)
except Exception:
pass
return f'Reminder "{rule_name}" cancelled.'
def _account_id() -> str:
if ACCOUNT_ID:
return ACCOUNT_ID
return boto3.client('sts', region_name=REGION).get_caller_identity()['Account']

View File

@@ -16,16 +16,16 @@
}
}
},
"c0db2a060be885d61722dfe6fbd3967e1c956826682078f6338123bf0c797e5b": {
"49f9e3ee598c0259165125872304200dbdffee263d76fca541a8630534d8f5c5": {
"displayName": "AgentRunner/Code",
"source": {
"path": "asset.c0db2a060be885d61722dfe6fbd3967e1c956826682078f6338123bf0c797e5b",
"path": "asset.49f9e3ee598c0259165125872304200dbdffee263d76fca541a8630534d8f5c5",
"packaging": "zip"
},
"destinations": {
"495395224548-us-east-1-82b9a17b": {
"495395224548-us-east-1-06bebbe8": {
"bucketName": "cdk-hnb659fds-assets-495395224548-us-east-1",
"objectKey": "c0db2a060be885d61722dfe6fbd3967e1c956826682078f6338123bf0c797e5b.zip",
"objectKey": "49f9e3ee598c0259165125872304200dbdffee263d76fca541a8630534d8f5c5.zip",
"region": "us-east-1",
"assumeRoleArn": "arn:${AWS::Partition}:iam::495395224548:role/cdk-hnb659fds-file-publishing-role-495395224548-us-east-1"
}
@@ -46,16 +46,31 @@
}
}
},
"a31aaa0bc9eab4fd6f17f10795fba05983dba0c88e83a263fe9fffe930da06b9": {
"8e7324457a5952eb51f04a34fbc5ba853252e7157d8d8958ac5fda92e72edb1f": {
"displayName": "Scheduler/Code",
"source": {
"path": "asset.8e7324457a5952eb51f04a34fbc5ba853252e7157d8d8958ac5fda92e72edb1f",
"packaging": "zip"
},
"destinations": {
"495395224548-us-east-1-89bca2fb": {
"bucketName": "cdk-hnb659fds-assets-495395224548-us-east-1",
"objectKey": "8e7324457a5952eb51f04a34fbc5ba853252e7157d8d8958ac5fda92e72edb1f.zip",
"region": "us-east-1",
"assumeRoleArn": "arn:${AWS::Partition}:iam::495395224548:role/cdk-hnb659fds-file-publishing-role-495395224548-us-east-1"
}
}
},
"c6cd323425a93776b45e2e0806064efbc5c84a3d6d78532282df6dd62cc14bda": {
"displayName": "AgentClawStack Template",
"source": {
"path": "AgentClawStack.template.json",
"packaging": "file"
},
"destinations": {
"495395224548-us-east-1-477d6bc7": {
"495395224548-us-east-1-51c91ff7": {
"bucketName": "cdk-hnb659fds-assets-495395224548-us-east-1",
"objectKey": "a31aaa0bc9eab4fd6f17f10795fba05983dba0c88e83a263fe9fffe930da06b9.json",
"objectKey": "c6cd323425a93776b45e2e0806064efbc5c84a3d6d78532282df6dd62cc14bda.json",
"region": "us-east-1",
"assumeRoleArn": "arn:${AWS::Partition}:iam::495395224548:role/cdk-hnb659fds-file-publishing-role-495395224548-us-east-1"
}

View File

@@ -32,7 +32,7 @@
{
"type": "aws:cdk:creationStack",
"data": [
"new AgentClawStack (/Users/daniel/agent-claw/cdk/lib/agent-claw-stack.ts:290:5)",
"new AgentClawStack (/Users/daniel/agent-claw/cdk/lib/agent-claw-stack.ts:331:5)",
"<anonymous> (/Users/daniel/agent-claw/cdk/bin/agent-claw.ts:8:1)",
"...node internals, ts-node, ts-node, ts-node..."
]
@@ -46,7 +46,7 @@
{
"type": "aws:cdk:creationStack",
"data": [
"new AgentClawStack (/Users/daniel/agent-claw/cdk/lib/agent-claw-stack.ts:294:5)",
"new AgentClawStack (/Users/daniel/agent-claw/cdk/lib/agent-claw-stack.ts:335:5)",
"<anonymous> (/Users/daniel/agent-claw/cdk/bin/agent-claw.ts:8:1)",
"...node internals, ts-node, ts-node, ts-node..."
]
@@ -60,7 +60,7 @@
{
"type": "aws:cdk:creationStack",
"data": [
"new AgentClawStack (/Users/daniel/agent-claw/cdk/lib/agent-claw-stack.ts:298:5)",
"new AgentClawStack (/Users/daniel/agent-claw/cdk/lib/agent-claw-stack.ts:339:5)",
"<anonymous> (/Users/daniel/agent-claw/cdk/bin/agent-claw.ts:8:1)",
"...node internals, ts-node, ts-node, ts-node..."
]
@@ -74,7 +74,7 @@
{
"type": "aws:cdk:creationStack",
"data": [
"new AgentClawStack (/Users/daniel/agent-claw/cdk/lib/agent-claw-stack.ts:303:5)",
"new AgentClawStack (/Users/daniel/agent-claw/cdk/lib/agent-claw-stack.ts:344:5)",
"<anonymous> (/Users/daniel/agent-claw/cdk/bin/agent-claw.ts:8:1)",
"...node internals, ts-node, ts-node, ts-node..."
]
@@ -88,7 +88,7 @@
{
"type": "aws:cdk:creationStack",
"data": [
"new AgentClawStack (/Users/daniel/agent-claw/cdk/lib/agent-claw-stack.ts:308:5)",
"new AgentClawStack (/Users/daniel/agent-claw/cdk/lib/agent-claw-stack.ts:349:5)",
"<anonymous> (/Users/daniel/agent-claw/cdk/bin/agent-claw.ts:8:1)",
"...node internals, ts-node, ts-node, ts-node..."
]
@@ -102,7 +102,7 @@
{
"type": "aws:cdk:creationStack",
"data": [
"new AgentClawStack (/Users/daniel/agent-claw/cdk/lib/agent-claw-stack.ts:313:5)",
"new AgentClawStack (/Users/daniel/agent-claw/cdk/lib/agent-claw-stack.ts:354:5)",
"<anonymous> (/Users/daniel/agent-claw/cdk/bin/agent-claw.ts:8:1)",
"...node internals, ts-node, ts-node, ts-node..."
]
@@ -116,7 +116,7 @@
{
"type": "aws:cdk:creationStack",
"data": [
"new AgentClawStack (/Users/daniel/agent-claw/cdk/lib/agent-claw-stack.ts:318:5)",
"new AgentClawStack (/Users/daniel/agent-claw/cdk/lib/agent-claw-stack.ts:359:5)",
"<anonymous> (/Users/daniel/agent-claw/cdk/bin/agent-claw.ts:8:1)",
"...node internals, ts-node, ts-node, ts-node..."
]
@@ -130,7 +130,7 @@
{
"type": "aws:cdk:creationStack",
"data": [
"new AgentClawStack (/Users/daniel/agent-claw/cdk/lib/agent-claw-stack.ts:323:5)",
"new AgentClawStack (/Users/daniel/agent-claw/cdk/lib/agent-claw-stack.ts:364:5)",
"<anonymous> (/Users/daniel/agent-claw/cdk/bin/agent-claw.ts:8:1)",
"...node internals, ts-node, ts-node, ts-node..."
]
@@ -144,7 +144,21 @@
{
"type": "aws:cdk:creationStack",
"data": [
"new AgentClawStack (/Users/daniel/agent-claw/cdk/lib/agent-claw-stack.ts:328:5)",
"new AgentClawStack (/Users/daniel/agent-claw/cdk/lib/agent-claw-stack.ts:369:5)",
"<anonymous> (/Users/daniel/agent-claw/cdk/bin/agent-claw.ts:8:1)",
"...node internals, ts-node, ts-node, ts-node..."
]
}
],
"/AgentClawStack/SchedulerLambdaArn": [
{
"type": "aws:cdk:logicalId",
"data": "SchedulerLambdaArn"
},
{
"type": "aws:cdk:creationStack",
"data": [
"new AgentClawStack (/Users/daniel/agent-claw/cdk/lib/agent-claw-stack.ts:374:5)",
"<anonymous> (/Users/daniel/agent-claw/cdk/bin/agent-claw.ts:8:1)",
"...node internals, ts-node, ts-node, ts-node..."
]
@@ -296,6 +310,36 @@
]
}
],
"/AgentClawStack/Scheduler/Resource": [
{
"type": "aws:cdk:logicalId",
"data": "SchedulerCFE73206"
},
{
"type": "aws:cdk:creationStack",
"data": [
"...new Function2 in aws-cdk-lib...",
"new AgentClawStack (/Users/daniel/agent-claw/cdk/lib/agent-claw-stack.ts:290:25)",
"<anonymous> (/Users/daniel/agent-claw/cdk/bin/agent-claw.ts:8:1)",
"...node internals, ts-node, ts-node, ts-node..."
]
}
],
"/AgentClawStack/Scheduler/EventBridgeInvoke": [
{
"type": "aws:cdk:logicalId",
"data": "SchedulerEventBridgeInvoke72A0529A"
},
{
"type": "aws:cdk:creationStack",
"data": [
"...WrappedClass.addPermission in aws-cdk-lib...",
"new AgentClawStack (/Users/daniel/agent-claw/cdk/lib/agent-claw-stack.ts:303:17)",
"<anonymous> (/Users/daniel/agent-claw/cdk/bin/agent-claw.ts:8:1)",
"...node internals, ts-node, ts-node, ts-node..."
]
}
],
"/AgentClawStack/CDKMetadata/Default": [
{
"type": "aws:cdk:logicalId",
@@ -516,6 +560,21 @@
]
}
],
"/AgentClawStack/Scheduler/ServiceRole/Resource": [
{
"type": "aws:cdk:logicalId",
"data": "SchedulerServiceRole62CDA70C"
},
{
"type": "aws:cdk:creationStack",
"data": [
"...new Function2 in aws-cdk-lib...",
"new AgentClawStack (/Users/daniel/agent-claw/cdk/lib/agent-claw-stack.ts:290:25)",
"<anonymous> (/Users/daniel/agent-claw/cdk/bin/agent-claw.ts:8:1)",
"...node internals, ts-node, ts-node, ts-node..."
]
}
],
"/AgentClawStack/TgIngest/ServiceRole/DefaultPolicy/Resource": [
{
"type": "aws:cdk:logicalId",
@@ -611,5 +670,20 @@
"...node internals, ts-node, ts-node, ts-node..."
]
}
],
"/AgentClawStack/Scheduler/ServiceRole/DefaultPolicy/Resource": [
{
"type": "aws:cdk:logicalId",
"data": "SchedulerServiceRoleDefaultPolicyFA0D8235"
},
{
"type": "aws:cdk:creationStack",
"data": [
"...environmentFromArn.grantRead in aws-cdk-lib...",
"new AgentClawStack (/Users/daniel/agent-claw/cdk/lib/agent-claw-stack.ts:301:20)",
"<anonymous> (/Users/daniel/agent-claw/cdk/bin/agent-claw.ts:8:1)",
"...node internals, ts-node, ts-node, ts-node..."
]
}
]
}

View File

@@ -387,7 +387,7 @@
"Properties": {
"Code": {
"S3Bucket": "cdk-hnb659fds-assets-495395224548-us-east-1",
"S3Key": "c0db2a060be885d61722dfe6fbd3967e1c956826682078f6338123bf0c797e5b.zip"
"S3Key": "49f9e3ee598c0259165125872304200dbdffee263d76fca541a8630534d8f5c5.zip"
},
"Environment": {
"Variables": {
@@ -423,7 +423,7 @@
],
"Metadata": {
"aws:cdk:path": "AgentClawStack/AgentRunner/Resource",
"aws:asset:path": "asset.c0db2a060be885d61722dfe6fbd3967e1c956826682078f6338123bf0c797e5b",
"aws:asset:path": "asset.49f9e3ee598c0259165125872304200dbdffee263d76fca541a8630534d8f5c5",
"aws:asset:is-bundled": false,
"aws:asset:property": "Code"
}
@@ -872,6 +872,33 @@
"Effect": "Allow",
"Resource": "arn:aws:secretsmanager:us-east-1:495395224548:secret:agent-claw/google-credentials/*",
"Sid": "PerUserGoogleCredentialsReadRuntime"
},
{
"Action": [
"events:PutRule",
"events:PutTargets",
"events:ListRules",
"events:ListTargetsByRule",
"events:RemoveTargets",
"events:DeleteRule"
],
"Effect": "Allow",
"Resource": "arn:aws:events:us-east-1:*:rule/agent-claw-reminder-*",
"Sid": "EventBridgeScheduler"
},
{
"Action": [
"lambda:AddPermission",
"lambda:RemovePermission"
],
"Effect": "Allow",
"Resource": {
"Fn::GetAtt": [
"SchedulerCFE73206",
"Arn"
]
},
"Sid": "SchedulerLambdaPermission"
}
],
"Version": "2012-10-17"
@@ -1114,6 +1141,127 @@
"aws:asset:property": "Code"
}
},
"SchedulerServiceRole62CDA70C": {
"Type": "AWS::IAM::Role",
"Properties": {
"AssumeRolePolicyDocument": {
"Statement": [
{
"Action": "sts:AssumeRole",
"Effect": "Allow",
"Principal": {
"Service": "lambda.amazonaws.com"
}
}
],
"Version": "2012-10-17"
},
"ManagedPolicyArns": [
{
"Fn::Join": [
"",
[
"arn:",
{
"Ref": "AWS::Partition"
},
":iam::aws:policy/service-role/AWSLambdaBasicExecutionRole"
]
]
}
]
},
"Metadata": {
"aws:cdk:path": "AgentClawStack/Scheduler/ServiceRole/Resource"
}
},
"SchedulerServiceRoleDefaultPolicyFA0D8235": {
"Type": "AWS::IAM::Policy",
"Properties": {
"PolicyDocument": {
"Statement": [
{
"Action": [
"secretsmanager:GetSecretValue",
"secretsmanager:DescribeSecret"
],
"Effect": "Allow",
"Resource": "arn:aws:secretsmanager:us-east-1:495395224548:secret:agent-claw/telegram-bot-token-Oq3in3"
},
{
"Action": [
"events:RemoveTargets",
"events:DeleteRule"
],
"Effect": "Allow",
"Resource": "arn:aws:events:us-east-1:495395224548:rule/agent-claw-reminder-*"
}
],
"Version": "2012-10-17"
},
"PolicyName": "SchedulerServiceRoleDefaultPolicyFA0D8235",
"Roles": [
{
"Ref": "SchedulerServiceRole62CDA70C"
}
]
},
"Metadata": {
"aws:cdk:path": "AgentClawStack/Scheduler/ServiceRole/DefaultPolicy/Resource"
}
},
"SchedulerCFE73206": {
"Type": "AWS::Lambda::Function",
"Properties": {
"Code": {
"S3Bucket": "cdk-hnb659fds-assets-495395224548-us-east-1",
"S3Key": "8e7324457a5952eb51f04a34fbc5ba853252e7157d8d8958ac5fda92e72edb1f.zip"
},
"Environment": {
"Variables": {
"TELEGRAM_BOT_TOKEN_SECRET_ARN": "arn:aws:secretsmanager:us-east-1:495395224548:secret:agent-claw/telegram-bot-token-Oq3in3"
}
},
"FunctionName": "agent-claw-scheduler",
"Handler": "handler.handler",
"MemorySize": 128,
"Role": {
"Fn::GetAtt": [
"SchedulerServiceRole62CDA70C",
"Arn"
]
},
"Runtime": "python3.12",
"Timeout": 30
},
"DependsOn": [
"SchedulerServiceRoleDefaultPolicyFA0D8235",
"SchedulerServiceRole62CDA70C"
],
"Metadata": {
"aws:cdk:path": "AgentClawStack/Scheduler/Resource",
"aws:asset:path": "asset.8e7324457a5952eb51f04a34fbc5ba853252e7157d8d8958ac5fda92e72edb1f",
"aws:asset:is-bundled": false,
"aws:asset:property": "Code"
}
},
"SchedulerEventBridgeInvoke72A0529A": {
"Type": "AWS::Lambda::Permission",
"Properties": {
"Action": "lambda:InvokeFunction",
"FunctionName": {
"Fn::GetAtt": [
"SchedulerCFE73206",
"Arn"
]
},
"Principal": "events.amazonaws.com",
"SourceArn": "arn:aws:events:us-east-1:495395224548:rule/agent-claw-reminder-*"
},
"Metadata": {
"aws:cdk:path": "AgentClawStack/Scheduler/EventBridgeInvoke"
}
},
"CDKMetadata": {
"Type": "AWS::CDK::Metadata",
"Properties": {
@@ -1216,6 +1364,15 @@
"Arn"
]
}
},
"SchedulerLambdaArn": {
"Description": "Scheduler Lambda ARN — set as SCHEDULER_LAMBDA_ARN in agentcore.json",
"Value": {
"Fn::GetAtt": [
"SchedulerCFE73206",
"Arn"
]
}
}
},
"Parameters": {

View File

@@ -0,0 +1,268 @@
import json
import os
import time
import uuid
import boto3
import urllib.request
from typing import Any
# AWS clients
_ddb = None
_agentcore = None
def get_ddb():
global _ddb
if _ddb is None:
_ddb = boto3.resource('dynamodb')
return _ddb
def get_agentcore():
global _agentcore
if _agentcore is None:
from botocore.config import Config
_agentcore = boto3.client(
'bedrock-agentcore',
region_name='us-east-1',
config=Config(read_timeout=600, connect_timeout=10)
)
return _agentcore
def get_or_create_user(actor_id: str, from_info: dict) -> dict:
"""Look up user in registry, auto-registering on first contact."""
table_name = os.environ.get('USERS_TABLE_NAME', '')
if not table_name:
return {'actor_id': actor_id, 'display_name': from_info.get('from_name', actor_id)}
table = get_ddb().Table(table_name)
response = table.get_item(Key={'actor_id': actor_id})
item = response.get('Item')
if item:
return item
now = int(time.time())
item = {
'actor_id': actor_id,
'display_name': from_info.get('from_name') or actor_id,
'telegram_username': from_info.get('from_username', ''),
'created_at': str(now),
'status': 'pending',
'services': {},
}
table.put_item(Item=item)
print(f'[agent-runner] Registered new user (pending): {actor_id}')
return item
def update_user_status(actor_id: str, name: str, status: str) -> None:
table_name = os.environ.get('USERS_TABLE_NAME', '')
if not table_name:
return
table = get_ddb().Table(table_name)
table.update_item(
Key={'actor_id': actor_id},
UpdateExpression='SET display_name = :n, #s = :s',
ExpressionAttributeNames={'#s': 'status'},
ExpressionAttributeValues={':n': name, ':s': status},
)
# Per-invocation dedup: track sent message hashes to prevent AgentCore retry duplicates
_sent_hashes: set = set()
def send_telegram_direct(chat_id: str, token: str, text: str) -> None:
import hashlib
h = hashlib.md5(f'{chat_id}:{text}'.encode()).hexdigest()[:12]
if h in _sent_hashes:
print(f'[agent-runner] dedup: skipping duplicate message (hash={h})')
return
_sent_hashes.add(h)
url = f'https://api.telegram.org/bot{token}/sendMessage'
data = json.dumps({'chat_id': chat_id, 'text': text}).encode()
req = urllib.request.Request(url, data=data, headers={'Content-Type': 'application/json'})
try:
resp = urllib.request.urlopen(req, timeout=10)
resp_body = resp.read()
import re
msg_id = re.search(r'"message_id":(\d+)', resp_body.decode('utf-8', errors='replace'))
print(f'[agent-runner] Telegram sendMessage -> msg_id={msg_id.group(1) if msg_id else "?"} hash={h}')
except Exception as e:
print(f'[agent-runner] Telegram sendMessage FAILED: {type(e).__name__}: {e} hash={h}')
raise
def get_or_create_session(actor_id: str) -> str:
"""Look up active session for actor, or create a new one."""
table = get_ddb().Table(os.environ['SESSION_TABLE_NAME'])
response = table.get_item(Key={'actor_id': actor_id})
item = response.get('Item')
now = int(time.time())
ttl_8hr = now + (8 * 3600)
if item and item.get('ttl', 0) > now:
# Active session exists — extend TTL
table.update_item(
Key={'actor_id': actor_id},
UpdateExpression='SET #ttl = :ttl',
ExpressionAttributeNames={'#ttl': 'ttl'},
ExpressionAttributeValues={':ttl': ttl_8hr},
)
return item['session_id']
# Create new session
session_id = str(uuid.uuid4())
table.put_item(Item={
'actor_id': actor_id,
'session_id': session_id,
'created_at': str(now),
'ttl': ttl_8hr,
})
return session_id
def handler(event, context):
# ── Parse SQS records (FIFO — all from same actor) ───────────────────
records = []
for record in event.get('Records', []):
try:
records.append(json.loads(record['body']))
except (json.JSONDecodeError, KeyError):
continue
if not records:
return
first = records[0]
channel = first.get('channel', 'telegram')
chat_id = first.get('chat_id', '')
actor_id = f"{channel}:{chat_id}"
# ── User registry ─────────────────────────────────────────────────────
from_info = first.get('messages', [{}])[0]
user_profile = get_or_create_user(actor_id, from_info)
# ── Onboarding gate ─────────────────────────────────────────────────────
table_name = os.environ.get('USERS_TABLE_NAME', '')
if table_name and user_profile.get('status', 'active') == 'pending':
raw_prompt = records[0]['messages'][0]['text'] if records else ''
is_name_msg = bool(raw_prompt and len(raw_prompt.strip()) < 50 and '?' not in raw_prompt)
if is_name_msg:
name = raw_prompt.strip()
update_user_status(actor_id, name=name, status='active')
user_profile['display_name'] = name
user_profile['status'] = 'active'
prompt = f"[System: User just registered with name '{name}'. Welcome them warmly and ask how you can help.]"
else:
bot_token_secret_arn = os.environ.get('TELEGRAM_BOT_TOKEN_SECRET_ARN', '')
bot_token = ''
if bot_token_secret_arn:
sm = boto3.client('secretsmanager', region_name='us-east-1')
bot_token = sm.get_secret_value(SecretId=bot_token_secret_arn)['SecretString']
send_telegram_direct(chat_id, bot_token, "Hi! I don't recognize you yet. What's your name?")
return
# ── Get or create AgentCore session ──────────────────────────────────
session_id = get_or_create_session(actor_id)
print(f"[agent-runner] actor={actor_id} session={session_id} user={user_profile.get('display_name', '')}")
# ── Bundle messages ───────────────────────────────────────────────────
if len(records) == 1:
prompt = records[0]['messages'][0]['text']
else:
lines = [
f"[{i+1}] {r['messages'][0]['text']}"
for i, r in enumerate(records)
]
prompt = f"You have {len(records)} queued messages:\n" + "\n".join(lines)
# ── Build payload for AgentCore Runtime 1 ────────────────────────────
payload: dict[str, Any] = {
'prompt': prompt,
'actor_id': actor_id,
'session_id': session_id,
'user_profile': {
'display_name': user_profile.get('display_name', actor_id),
'telegram_username': user_profile.get('telegram_username', ''),
'google_email': user_profile.get('google_email', ''),
'allowed': user_profile.get('allowed', True),
'services': user_profile.get('services', {}),
},
'channel_adapter': {
'type': channel,
'target_id': str(chat_id),
'bot_token_secret_arn': os.environ.get('TELEGRAM_BOT_TOKEN_SECRET_ARN', ''),
},
}
# ── Invoke AgentCore Runtime 1 ────────────────────────────────────────
runtime_arn = os.environ.get('RUNTIME_1_ARN', '')
if not runtime_arn or runtime_arn == 'PLACEHOLDER_SET_AFTER_RUNTIME_DEPLOY':
print(f"[agent-runner] RUNTIME_1_ARN not set — skipping AgentCore invoke")
print(f"[agent-runner] Would have sent: {json.dumps(payload)[:200]}")
return
client = get_agentcore()
response = client.invoke_agent_runtime(
agentRuntimeArn=runtime_arn,
runtimeSessionId=session_id,
payload=json.dumps(payload).encode(),
)
# Process streaming response: buffer text chunks and send to Telegram as paragraphs arrive
bot_token = ''
bot_token_secret_arn = os.environ.get('TELEGRAM_BOT_TOKEN_SECRET_ARN', '')
if bot_token_secret_arn:
sm = boto3.client('secretsmanager', region_name='us-east-1')
try:
bot_token = sm.get_secret_value(SecretId=bot_token_secret_arn)['SecretString']
except Exception as e:
print(f'[agent-runner] Failed to get bot token: {e}')
body = response.get('response')
text_buffer = ''
leftover = ''
if body is not None:
for raw_chunk in body.iter_chunks():
if not raw_chunk:
continue
# AgentCore streams SSE format: "data: {...}\n\n"
text = leftover + raw_chunk.decode('utf-8', errors='replace')
parts = text.split('\n\n')
leftover = parts[-1]
for part in parts[:-1]:
for line in part.splitlines():
if not line.startswith('data: '):
continue
data = line[6:].strip()
if not data or data == '[DONE]':
continue
try:
event = json.loads(data)
except (json.JSONDecodeError, ValueError):
continue
if not isinstance(event, dict):
continue
# Extract text delta from contentBlockDelta ONLY
# Do NOT use event.get('data') — that's the full formatted summary,
# causing duplicate delivery alongside the token stream.
delta = event.get('event', {}).get('contentBlockDelta', {}).get('delta', {})
if not isinstance(delta, dict):
continue
token = delta.get('text', '')
if token:
text_buffer += token
# Only flush if buffer is very large — prevents splitting multi-turn responses
if len(text_buffer) > 1200:
print(f'[agent-runner] send chunk {len(text_buffer)}c to {chat_id}')
send_telegram_direct(str(chat_id), bot_token, text_buffer.strip())
text_buffer = ''
# Flush any remaining text
print(f'[agent-runner] stream done buffer={len(text_buffer)} bot_token_set={bool(bot_token)}')
if text_buffer.strip() and bot_token:
print(f'[agent-runner] flushing {len(text_buffer)}c to {chat_id}')
send_telegram_direct(str(chat_id), bot_token, text_buffer.strip())
print(f"[agent-runner] Completed session={session_id} actor={actor_id}")

View File

@@ -0,0 +1,232 @@
import json
import os
import time
import uuid
import boto3
import urllib.request
from typing import Any
# AWS clients
_ddb = None
_agentcore = None
def get_ddb():
global _ddb
if _ddb is None:
_ddb = boto3.resource('dynamodb')
return _ddb
def get_agentcore():
global _agentcore
if _agentcore is None:
from botocore.config import Config
_agentcore = boto3.client(
'bedrock-agentcore',
region_name='us-east-1',
config=Config(read_timeout=600, connect_timeout=10)
)
return _agentcore
def get_or_create_user(actor_id: str, from_info: dict) -> dict:
"""Look up user in registry, auto-registering on first contact."""
table_name = os.environ.get('USERS_TABLE_NAME', '')
if not table_name:
return {'actor_id': actor_id, 'display_name': from_info.get('from_name', actor_id)}
table = get_ddb().Table(table_name)
response = table.get_item(Key={'actor_id': actor_id})
item = response.get('Item')
if item:
return item
now = int(time.time())
item = {
'actor_id': actor_id,
'display_name': from_info.get('from_name') or actor_id,
'telegram_username': from_info.get('from_username', ''),
'created_at': str(now),
'status': 'pending',
'services': {},
}
table.put_item(Item=item)
print(f'[agent-runner] Registered new user (pending): {actor_id}')
return item
def update_user_status(actor_id: str, name: str, status: str) -> None:
table_name = os.environ.get('USERS_TABLE_NAME', '')
if not table_name:
return
table = get_ddb().Table(table_name)
table.update_item(
Key={'actor_id': actor_id},
UpdateExpression='SET display_name = :n, #s = :s',
ExpressionAttributeNames={'#s': 'status'},
ExpressionAttributeValues={':n': name, ':s': status},
)
def send_telegram_direct(chat_id: str, token: str, text: str) -> None:
url = f'https://api.telegram.org/bot{token}/sendMessage'
data = json.dumps({'chat_id': chat_id, 'text': text}).encode()
req = urllib.request.Request(url, data=data, headers={'Content-Type': 'application/json'})
urllib.request.urlopen(req, timeout=10)
def get_or_create_session(actor_id: str) -> str:
"""Look up active session for actor, or create a new one."""
table = get_ddb().Table(os.environ['SESSION_TABLE_NAME'])
response = table.get_item(Key={'actor_id': actor_id})
item = response.get('Item')
now = int(time.time())
ttl_8hr = now + (8 * 3600)
if item and item.get('ttl', 0) > now:
# Active session exists — extend TTL
table.update_item(
Key={'actor_id': actor_id},
UpdateExpression='SET #ttl = :ttl',
ExpressionAttributeNames={'#ttl': 'ttl'},
ExpressionAttributeValues={':ttl': ttl_8hr},
)
return item['session_id']
# Create new session
session_id = str(uuid.uuid4())
table.put_item(Item={
'actor_id': actor_id,
'session_id': session_id,
'created_at': str(now),
'ttl': ttl_8hr,
})
return session_id
def handler(event, context):
# ── Parse SQS records (FIFO — all from same actor) ───────────────────
records = []
for record in event.get('Records', []):
try:
records.append(json.loads(record['body']))
except (json.JSONDecodeError, KeyError):
continue
if not records:
return
first = records[0]
channel = first.get('channel', 'telegram')
chat_id = first.get('chat_id', '')
actor_id = f"{channel}:{chat_id}"
# ── User registry ─────────────────────────────────────────────────────
from_info = first.get('messages', [{}])[0]
user_profile = get_or_create_user(actor_id, from_info)
# ── Onboarding gate ─────────────────────────────────────────────────────
table_name = os.environ.get('USERS_TABLE_NAME', '')
if table_name and user_profile.get('status', 'active') == 'pending':
raw_prompt = records[0]['messages'][0]['text'] if records else ''
is_name_msg = bool(raw_prompt and len(raw_prompt.strip()) < 50 and '?' not in raw_prompt)
if is_name_msg:
name = raw_prompt.strip()
update_user_status(actor_id, name=name, status='active')
user_profile['display_name'] = name
user_profile['status'] = 'active'
prompt = f"[System: User just registered with name '{name}'. Welcome them warmly and ask how you can help.]"
else:
bot_token_secret_arn = os.environ.get('TELEGRAM_BOT_TOKEN_SECRET_ARN', '')
bot_token = ''
if bot_token_secret_arn:
sm = boto3.client('secretsmanager', region_name='us-east-1')
bot_token = sm.get_secret_value(SecretId=bot_token_secret_arn)['SecretString']
send_telegram_direct(chat_id, bot_token, "Hi! I don't recognize you yet. What's your name?")
return
# ── Get or create AgentCore session ──────────────────────────────────
session_id = get_or_create_session(actor_id)
print(f"[agent-runner] actor={actor_id} session={session_id} user={user_profile.get('display_name', '')}")
# ── Bundle messages ───────────────────────────────────────────────────
if len(records) == 1:
prompt = records[0]['messages'][0]['text']
else:
lines = [
f"[{i+1}] {r['messages'][0]['text']}"
for i, r in enumerate(records)
]
prompt = f"You have {len(records)} queued messages:\n" + "\n".join(lines)
# ── Build payload for AgentCore Runtime 1 ────────────────────────────
payload: dict[str, Any] = {
'prompt': prompt,
'actor_id': actor_id,
'session_id': session_id,
'user_profile': {
'display_name': user_profile.get('display_name', actor_id),
'telegram_username': user_profile.get('telegram_username', ''),
'google_email': user_profile.get('google_email', ''),
'allowed': user_profile.get('allowed', True),
'services': user_profile.get('services', {}),
},
'channel_adapter': {
'type': channel,
'target_id': str(chat_id),
'bot_token_secret_arn': os.environ.get('TELEGRAM_BOT_TOKEN_SECRET_ARN', ''),
},
}
# ── Invoke AgentCore Runtime 1 ────────────────────────────────────────
runtime_arn = os.environ.get('RUNTIME_1_ARN', '')
if not runtime_arn or runtime_arn == 'PLACEHOLDER_SET_AFTER_RUNTIME_DEPLOY':
print(f"[agent-runner] RUNTIME_1_ARN not set — skipping AgentCore invoke")
print(f"[agent-runner] Would have sent: {json.dumps(payload)[:200]}")
return
client = get_agentcore()
response = client.invoke_agent_runtime(
agentRuntimeArn=runtime_arn,
runtimeSessionId=session_id,
payload=json.dumps(payload).encode(),
)
# Process streaming response: buffer text chunks and send to Telegram as paragraphs arrive
bot_token = ''
bot_token_secret_arn = os.environ.get('TELEGRAM_BOT_TOKEN_SECRET_ARN', '')
if bot_token_secret_arn:
sm = boto3.client('secretsmanager', region_name='us-east-1')
try:
bot_token = sm.get_secret_value(SecretId=bot_token_secret_arn)['SecretString']
except Exception as e:
print(f'[agent-runner] Failed to get bot token: {e}')
body = response.get('response')
text_buffer = ''
if body is not None:
for chunk in body.iter_chunks():
if not chunk:
continue
try:
event = json.loads(chunk.decode('utf-8'))
# Strands streaming event: 'data' field contains text delta
delta = event.get('data', '') or event.get('text', '')
if delta:
text_buffer += delta
# Flush on paragraph or sentence break, or if buffer is large
flush = (
text_buffer.rstrip().endswith(('\n\n', '.\n', '!\n', '?\n'))
or len(text_buffer) > 800
)
if flush and text_buffer.strip():
send_telegram_direct(str(chat_id), bot_token, text_buffer.strip())
text_buffer = ''
except (json.JSONDecodeError, UnicodeDecodeError):
pass
# Flush any remaining text
if text_buffer.strip() and bot_token:
send_telegram_direct(str(chat_id), bot_token, text_buffer.strip())
print(f"[agent-runner] Completed session={session_id} actor={actor_id}")

View File

@@ -0,0 +1,263 @@
import json
import os
import time
import uuid
import boto3
import urllib.request
from typing import Any
# AWS clients
_ddb = None
_agentcore = None
def get_ddb():
global _ddb
if _ddb is None:
_ddb = boto3.resource('dynamodb')
return _ddb
def get_agentcore():
global _agentcore
if _agentcore is None:
from botocore.config import Config
_agentcore = boto3.client(
'bedrock-agentcore',
region_name='us-east-1',
config=Config(read_timeout=600, connect_timeout=10)
)
return _agentcore
def get_or_create_user(actor_id: str, from_info: dict) -> dict:
"""Look up user in registry, auto-registering on first contact."""
table_name = os.environ.get('USERS_TABLE_NAME', '')
if not table_name:
return {'actor_id': actor_id, 'display_name': from_info.get('from_name', actor_id)}
table = get_ddb().Table(table_name)
response = table.get_item(Key={'actor_id': actor_id})
item = response.get('Item')
if item:
return item
now = int(time.time())
item = {
'actor_id': actor_id,
'display_name': from_info.get('from_name') or actor_id,
'telegram_username': from_info.get('from_username', ''),
'created_at': str(now),
'status': 'pending',
'services': {},
}
table.put_item(Item=item)
print(f'[agent-runner] Registered new user (pending): {actor_id}')
return item
def update_user_status(actor_id: str, name: str, status: str) -> None:
table_name = os.environ.get('USERS_TABLE_NAME', '')
if not table_name:
return
table = get_ddb().Table(table_name)
table.update_item(
Key={'actor_id': actor_id},
UpdateExpression='SET display_name = :n, #s = :s',
ExpressionAttributeNames={'#s': 'status'},
ExpressionAttributeValues={':n': name, ':s': status},
)
# Per-invocation dedup: track sent message hashes to prevent AgentCore retry duplicates
_sent_hashes: set = set()
def send_telegram_direct(chat_id: str, token: str, text: str) -> None:
import hashlib, traceback as tb
h = hashlib.md5(f'{chat_id}:{text}'.encode()).hexdigest()[:12]
if h in _sent_hashes:
print(f'[agent-runner] dedup: skipping duplicate message (hash={h})')
print(f'[agent-runner] dedup stack: {tb.format_stack()[-3].strip()}')
return
_sent_hashes.add(h)
print(f'[agent-runner] SEND hash={h} text={repr(text[:40])}')
print(f'[agent-runner] SEND caller: {tb.format_stack()[-2].strip()}')
url = f'https://api.telegram.org/bot{token}/sendMessage'
req = urllib.request.Request(url, data=data, headers={'Content-Type': 'application/json'})
urllib.request.urlopen(req, timeout=10)
def get_or_create_session(actor_id: str) -> str:
"""Look up active session for actor, or create a new one."""
table = get_ddb().Table(os.environ['SESSION_TABLE_NAME'])
response = table.get_item(Key={'actor_id': actor_id})
item = response.get('Item')
now = int(time.time())
ttl_8hr = now + (8 * 3600)
if item and item.get('ttl', 0) > now:
# Active session exists — extend TTL
table.update_item(
Key={'actor_id': actor_id},
UpdateExpression='SET #ttl = :ttl',
ExpressionAttributeNames={'#ttl': 'ttl'},
ExpressionAttributeValues={':ttl': ttl_8hr},
)
return item['session_id']
# Create new session
session_id = str(uuid.uuid4())
table.put_item(Item={
'actor_id': actor_id,
'session_id': session_id,
'created_at': str(now),
'ttl': ttl_8hr,
})
return session_id
def handler(event, context):
# ── Parse SQS records (FIFO — all from same actor) ───────────────────
records = []
for record in event.get('Records', []):
try:
records.append(json.loads(record['body']))
except (json.JSONDecodeError, KeyError):
continue
if not records:
return
first = records[0]
channel = first.get('channel', 'telegram')
chat_id = first.get('chat_id', '')
actor_id = f"{channel}:{chat_id}"
# ── User registry ─────────────────────────────────────────────────────
from_info = first.get('messages', [{}])[0]
user_profile = get_or_create_user(actor_id, from_info)
# ── Onboarding gate ─────────────────────────────────────────────────────
table_name = os.environ.get('USERS_TABLE_NAME', '')
if table_name and user_profile.get('status', 'active') == 'pending':
raw_prompt = records[0]['messages'][0]['text'] if records else ''
is_name_msg = bool(raw_prompt and len(raw_prompt.strip()) < 50 and '?' not in raw_prompt)
if is_name_msg:
name = raw_prompt.strip()
update_user_status(actor_id, name=name, status='active')
user_profile['display_name'] = name
user_profile['status'] = 'active'
prompt = f"[System: User just registered with name '{name}'. Welcome them warmly and ask how you can help.]"
else:
bot_token_secret_arn = os.environ.get('TELEGRAM_BOT_TOKEN_SECRET_ARN', '')
bot_token = ''
if bot_token_secret_arn:
sm = boto3.client('secretsmanager', region_name='us-east-1')
bot_token = sm.get_secret_value(SecretId=bot_token_secret_arn)['SecretString']
send_telegram_direct(chat_id, bot_token, "Hi! I don't recognize you yet. What's your name?")
return
# ── Get or create AgentCore session ──────────────────────────────────
session_id = get_or_create_session(actor_id)
print(f"[agent-runner] actor={actor_id} session={session_id} user={user_profile.get('display_name', '')}")
# ── Bundle messages ───────────────────────────────────────────────────
if len(records) == 1:
prompt = records[0]['messages'][0]['text']
else:
lines = [
f"[{i+1}] {r['messages'][0]['text']}"
for i, r in enumerate(records)
]
prompt = f"You have {len(records)} queued messages:\n" + "\n".join(lines)
# ── Build payload for AgentCore Runtime 1 ────────────────────────────
payload: dict[str, Any] = {
'prompt': prompt,
'actor_id': actor_id,
'session_id': session_id,
'user_profile': {
'display_name': user_profile.get('display_name', actor_id),
'telegram_username': user_profile.get('telegram_username', ''),
'google_email': user_profile.get('google_email', ''),
'allowed': user_profile.get('allowed', True),
'services': user_profile.get('services', {}),
},
'channel_adapter': {
'type': channel,
'target_id': str(chat_id),
'bot_token_secret_arn': os.environ.get('TELEGRAM_BOT_TOKEN_SECRET_ARN', ''),
},
}
# ── Invoke AgentCore Runtime 1 ────────────────────────────────────────
runtime_arn = os.environ.get('RUNTIME_1_ARN', '')
if not runtime_arn or runtime_arn == 'PLACEHOLDER_SET_AFTER_RUNTIME_DEPLOY':
print(f"[agent-runner] RUNTIME_1_ARN not set — skipping AgentCore invoke")
print(f"[agent-runner] Would have sent: {json.dumps(payload)[:200]}")
return
client = get_agentcore()
response = client.invoke_agent_runtime(
agentRuntimeArn=runtime_arn,
runtimeSessionId=session_id,
payload=json.dumps(payload).encode(),
)
# Process streaming response: buffer text chunks and send to Telegram as paragraphs arrive
bot_token = ''
bot_token_secret_arn = os.environ.get('TELEGRAM_BOT_TOKEN_SECRET_ARN', '')
if bot_token_secret_arn:
sm = boto3.client('secretsmanager', region_name='us-east-1')
try:
bot_token = sm.get_secret_value(SecretId=bot_token_secret_arn)['SecretString']
except Exception as e:
print(f'[agent-runner] Failed to get bot token: {e}')
body = response.get('response')
text_buffer = ''
leftover = ''
if body is not None:
for raw_chunk in body.iter_chunks():
if not raw_chunk:
continue
# AgentCore streams SSE format: "data: {...}\n\n"
text = leftover + raw_chunk.decode('utf-8', errors='replace')
parts = text.split('\n\n')
leftover = parts[-1]
for part in parts[:-1]:
for line in part.splitlines():
if not line.startswith('data: '):
continue
data = line[6:].strip()
if not data or data == '[DONE]':
continue
try:
event = json.loads(data)
except (json.JSONDecodeError, ValueError):
continue
if not isinstance(event, dict):
continue
# Extract text delta from contentBlockDelta
delta = event.get('event', {}).get('contentBlockDelta', {}).get('delta', {})
if not isinstance(delta, dict):
continue
token = delta.get('text', '') or event.get('data', '')
if token:
text_buffer += token
flush = (
text_buffer.rstrip().endswith(('\n\n', '.\n', '!\n', '?\n'))
or len(text_buffer) > 800
)
if flush and text_buffer.strip():
print(f'[agent-runner] send chunk {len(text_buffer)}c to {chat_id}')
send_telegram_direct(str(chat_id), bot_token, text_buffer.strip())
text_buffer = ''
# Flush any remaining text
print(f'[agent-runner] stream done buffer={len(text_buffer)} bot_token_set={bool(bot_token)}')
if text_buffer.strip() and bot_token:
print(f'[agent-runner] flushing {len(text_buffer)}c to {chat_id}')
send_telegram_direct(str(chat_id), bot_token, text_buffer.strip())
print(f"[agent-runner] Completed session={session_id} actor={actor_id}")

View File

@@ -0,0 +1,268 @@
import json
import os
import time
import uuid
import boto3
import urllib.request
from typing import Any
# AWS clients
_ddb = None
_agentcore = None
def get_ddb():
global _ddb
if _ddb is None:
_ddb = boto3.resource('dynamodb')
return _ddb
def get_agentcore():
global _agentcore
if _agentcore is None:
from botocore.config import Config
_agentcore = boto3.client(
'bedrock-agentcore',
region_name='us-east-1',
config=Config(read_timeout=600, connect_timeout=10)
)
return _agentcore
def get_or_create_user(actor_id: str, from_info: dict) -> dict:
"""Look up user in registry, auto-registering on first contact."""
table_name = os.environ.get('USERS_TABLE_NAME', '')
if not table_name:
return {'actor_id': actor_id, 'display_name': from_info.get('from_name', actor_id)}
table = get_ddb().Table(table_name)
response = table.get_item(Key={'actor_id': actor_id})
item = response.get('Item')
if item:
return item
now = int(time.time())
item = {
'actor_id': actor_id,
'display_name': from_info.get('from_name') or actor_id,
'telegram_username': from_info.get('from_username', ''),
'created_at': str(now),
'status': 'pending',
'services': {},
}
table.put_item(Item=item)
print(f'[agent-runner] Registered new user (pending): {actor_id}')
return item
def update_user_status(actor_id: str, name: str, status: str) -> None:
table_name = os.environ.get('USERS_TABLE_NAME', '')
if not table_name:
return
table = get_ddb().Table(table_name)
table.update_item(
Key={'actor_id': actor_id},
UpdateExpression='SET display_name = :n, #s = :s',
ExpressionAttributeNames={'#s': 'status'},
ExpressionAttributeValues={':n': name, ':s': status},
)
# Per-invocation dedup: track sent message hashes to prevent AgentCore retry duplicates
_sent_hashes: set = set()
def send_telegram_direct(chat_id: str, token: str, text: str) -> None:
import hashlib
h = hashlib.md5(f'{chat_id}:{text}'.encode()).hexdigest()[:12]
if h in _sent_hashes:
print(f'[agent-runner] dedup: skipping duplicate message (hash={h})')
return
_sent_hashes.add(h)
url = f'https://api.telegram.org/bot{token}/sendMessage'
data = json.dumps({'chat_id': chat_id, 'text': text}).encode()
req = urllib.request.Request(url, data=data, headers={'Content-Type': 'application/json'})
try:
resp = urllib.request.urlopen(req, timeout=10)
resp_body = resp.read()
import re
msg_id = re.search(r'"message_id":(\d+)', resp_body.decode('utf-8', errors='replace'))
print(f'[agent-runner] Telegram sendMessage -> msg_id={msg_id.group(1) if msg_id else "?"} hash={h}')
except Exception as e:
print(f'[agent-runner] Telegram sendMessage FAILED: {type(e).__name__}: {e} hash={h}')
raise
def get_or_create_session(actor_id: str) -> str:
"""Look up active session for actor, or create a new one."""
table = get_ddb().Table(os.environ['SESSION_TABLE_NAME'])
response = table.get_item(Key={'actor_id': actor_id})
item = response.get('Item')
now = int(time.time())
ttl_8hr = now + (8 * 3600)
if item and item.get('ttl', 0) > now:
# Active session exists — extend TTL
table.update_item(
Key={'actor_id': actor_id},
UpdateExpression='SET #ttl = :ttl',
ExpressionAttributeNames={'#ttl': 'ttl'},
ExpressionAttributeValues={':ttl': ttl_8hr},
)
return item['session_id']
# Create new session
session_id = str(uuid.uuid4())
table.put_item(Item={
'actor_id': actor_id,
'session_id': session_id,
'created_at': str(now),
'ttl': ttl_8hr,
})
return session_id
def handler(event, context):
# ── Parse SQS records (FIFO — all from same actor) ───────────────────
records = []
for record in event.get('Records', []):
try:
records.append(json.loads(record['body']))
except (json.JSONDecodeError, KeyError):
continue
if not records:
return
first = records[0]
channel = first.get('channel', 'telegram')
chat_id = first.get('chat_id', '')
actor_id = f"{channel}:{chat_id}"
# ── User registry ─────────────────────────────────────────────────────
from_info = first.get('messages', [{}])[0]
user_profile = get_or_create_user(actor_id, from_info)
# ── Onboarding gate ─────────────────────────────────────────────────────
table_name = os.environ.get('USERS_TABLE_NAME', '')
if table_name and user_profile.get('status', 'active') == 'pending':
raw_prompt = records[0]['messages'][0]['text'] if records else ''
is_name_msg = bool(raw_prompt and len(raw_prompt.strip()) < 50 and '?' not in raw_prompt)
if is_name_msg:
name = raw_prompt.strip()
update_user_status(actor_id, name=name, status='active')
user_profile['display_name'] = name
user_profile['status'] = 'active'
prompt = f"[System: User just registered with name '{name}'. Welcome them warmly and ask how you can help.]"
else:
bot_token_secret_arn = os.environ.get('TELEGRAM_BOT_TOKEN_SECRET_ARN', '')
bot_token = ''
if bot_token_secret_arn:
sm = boto3.client('secretsmanager', region_name='us-east-1')
bot_token = sm.get_secret_value(SecretId=bot_token_secret_arn)['SecretString']
send_telegram_direct(chat_id, bot_token, "Hi! I don't recognize you yet. What's your name?")
return
# ── Get or create AgentCore session ──────────────────────────────────
session_id = get_or_create_session(actor_id)
print(f"[agent-runner] actor={actor_id} session={session_id} user={user_profile.get('display_name', '')}")
# ── Bundle messages ───────────────────────────────────────────────────
if len(records) == 1:
prompt = records[0]['messages'][0]['text']
else:
lines = [
f"[{i+1}] {r['messages'][0]['text']}"
for i, r in enumerate(records)
]
prompt = f"You have {len(records)} queued messages:\n" + "\n".join(lines)
# ── Build payload for AgentCore Runtime 1 ────────────────────────────
payload: dict[str, Any] = {
'prompt': prompt,
'actor_id': actor_id,
'session_id': session_id,
'user_profile': {
'display_name': user_profile.get('display_name', actor_id),
'telegram_username': user_profile.get('telegram_username', ''),
'google_email': user_profile.get('google_email', ''),
'allowed': user_profile.get('allowed', True),
'services': user_profile.get('enrolled_services', user_profile.get('services', {})),
},
'channel_adapter': {
'type': channel,
'target_id': str(chat_id),
'bot_token_secret_arn': os.environ.get('TELEGRAM_BOT_TOKEN_SECRET_ARN', ''),
},
}
# ── Invoke AgentCore Runtime 1 ────────────────────────────────────────
runtime_arn = os.environ.get('RUNTIME_1_ARN', '')
if not runtime_arn or runtime_arn == 'PLACEHOLDER_SET_AFTER_RUNTIME_DEPLOY':
print(f"[agent-runner] RUNTIME_1_ARN not set — skipping AgentCore invoke")
print(f"[agent-runner] Would have sent: {json.dumps(payload)[:200]}")
return
client = get_agentcore()
response = client.invoke_agent_runtime(
agentRuntimeArn=runtime_arn,
runtimeSessionId=session_id,
payload=json.dumps(payload).encode(),
)
# Process streaming response: buffer text chunks and send to Telegram as paragraphs arrive
bot_token = ''
bot_token_secret_arn = os.environ.get('TELEGRAM_BOT_TOKEN_SECRET_ARN', '')
if bot_token_secret_arn:
sm = boto3.client('secretsmanager', region_name='us-east-1')
try:
bot_token = sm.get_secret_value(SecretId=bot_token_secret_arn)['SecretString']
except Exception as e:
print(f'[agent-runner] Failed to get bot token: {e}')
body = response.get('response')
text_buffer = ''
leftover = ''
if body is not None:
for raw_chunk in body.iter_chunks():
if not raw_chunk:
continue
# AgentCore streams SSE format: "data: {...}\n\n"
text = leftover + raw_chunk.decode('utf-8', errors='replace')
parts = text.split('\n\n')
leftover = parts[-1]
for part in parts[:-1]:
for line in part.splitlines():
if not line.startswith('data: '):
continue
data = line[6:].strip()
if not data or data == '[DONE]':
continue
try:
event = json.loads(data)
except (json.JSONDecodeError, ValueError):
continue
if not isinstance(event, dict):
continue
# Extract text delta from contentBlockDelta ONLY
# Do NOT use event.get('data') — that's the full formatted summary,
# causing duplicate delivery alongside the token stream.
delta = event.get('event', {}).get('contentBlockDelta', {}).get('delta', {})
if not isinstance(delta, dict):
continue
token = delta.get('text', '')
if token:
text_buffer += token
# Only flush if buffer is very large — prevents splitting multi-turn responses
if len(text_buffer) > 1200:
print(f'[agent-runner] send chunk {len(text_buffer)}c to {chat_id}')
send_telegram_direct(str(chat_id), bot_token, text_buffer.strip())
text_buffer = ''
# Flush any remaining text
print(f'[agent-runner] stream done buffer={len(text_buffer)} bot_token_set={bool(bot_token)}')
if text_buffer.strip() and bot_token:
print(f'[agent-runner] flushing {len(text_buffer)}c to {chat_id}')
send_telegram_direct(str(chat_id), bot_token, text_buffer.strip())
print(f"[agent-runner] Completed session={session_id} actor={actor_id}")

View File

@@ -0,0 +1,260 @@
import json
import os
import time
import uuid
import boto3
import urllib.request
from typing import Any
# AWS clients
_ddb = None
_agentcore = None
def get_ddb():
global _ddb
if _ddb is None:
_ddb = boto3.resource('dynamodb')
return _ddb
def get_agentcore():
global _agentcore
if _agentcore is None:
from botocore.config import Config
_agentcore = boto3.client(
'bedrock-agentcore',
region_name='us-east-1',
config=Config(read_timeout=600, connect_timeout=10)
)
return _agentcore
def get_or_create_user(actor_id: str, from_info: dict) -> dict:
"""Look up user in registry, auto-registering on first contact."""
table_name = os.environ.get('USERS_TABLE_NAME', '')
if not table_name:
return {'actor_id': actor_id, 'display_name': from_info.get('from_name', actor_id)}
table = get_ddb().Table(table_name)
response = table.get_item(Key={'actor_id': actor_id})
item = response.get('Item')
if item:
return item
now = int(time.time())
item = {
'actor_id': actor_id,
'display_name': from_info.get('from_name') or actor_id,
'telegram_username': from_info.get('from_username', ''),
'created_at': str(now),
'status': 'pending',
'services': {},
}
table.put_item(Item=item)
print(f'[agent-runner] Registered new user (pending): {actor_id}')
return item
def update_user_status(actor_id: str, name: str, status: str) -> None:
table_name = os.environ.get('USERS_TABLE_NAME', '')
if not table_name:
return
table = get_ddb().Table(table_name)
table.update_item(
Key={'actor_id': actor_id},
UpdateExpression='SET display_name = :n, #s = :s',
ExpressionAttributeNames={'#s': 'status'},
ExpressionAttributeValues={':n': name, ':s': status},
)
# Per-invocation dedup: track sent message hashes to prevent AgentCore retry duplicates
_sent_hashes: set = set()
def send_telegram_direct(chat_id: str, token: str, text: str) -> None:
import hashlib
h = hashlib.md5(f'{chat_id}:{text}'.encode()).hexdigest()[:12]
if h in _sent_hashes:
print(f'[agent-runner] dedup: skipping duplicate message (hash={h})')
return
_sent_hashes.add(h)
url = f'https://api.telegram.org/bot{token}/sendMessage'
data = json.dumps({'chat_id': chat_id, 'text': text}).encode()
req = urllib.request.Request(url, data=data, headers={'Content-Type': 'application/json'})
urllib.request.urlopen(req, timeout=10)
def get_or_create_session(actor_id: str) -> str:
"""Look up active session for actor, or create a new one."""
table = get_ddb().Table(os.environ['SESSION_TABLE_NAME'])
response = table.get_item(Key={'actor_id': actor_id})
item = response.get('Item')
now = int(time.time())
ttl_8hr = now + (8 * 3600)
if item and item.get('ttl', 0) > now:
# Active session exists — extend TTL
table.update_item(
Key={'actor_id': actor_id},
UpdateExpression='SET #ttl = :ttl',
ExpressionAttributeNames={'#ttl': 'ttl'},
ExpressionAttributeValues={':ttl': ttl_8hr},
)
return item['session_id']
# Create new session
session_id = str(uuid.uuid4())
table.put_item(Item={
'actor_id': actor_id,
'session_id': session_id,
'created_at': str(now),
'ttl': ttl_8hr,
})
return session_id
def handler(event, context):
# ── Parse SQS records (FIFO — all from same actor) ───────────────────
records = []
for record in event.get('Records', []):
try:
records.append(json.loads(record['body']))
except (json.JSONDecodeError, KeyError):
continue
if not records:
return
first = records[0]
channel = first.get('channel', 'telegram')
chat_id = first.get('chat_id', '')
actor_id = f"{channel}:{chat_id}"
# ── User registry ─────────────────────────────────────────────────────
from_info = first.get('messages', [{}])[0]
user_profile = get_or_create_user(actor_id, from_info)
# ── Onboarding gate ─────────────────────────────────────────────────────
table_name = os.environ.get('USERS_TABLE_NAME', '')
if table_name and user_profile.get('status', 'active') == 'pending':
raw_prompt = records[0]['messages'][0]['text'] if records else ''
is_name_msg = bool(raw_prompt and len(raw_prompt.strip()) < 50 and '?' not in raw_prompt)
if is_name_msg:
name = raw_prompt.strip()
update_user_status(actor_id, name=name, status='active')
user_profile['display_name'] = name
user_profile['status'] = 'active'
prompt = f"[System: User just registered with name '{name}'. Welcome them warmly and ask how you can help.]"
else:
bot_token_secret_arn = os.environ.get('TELEGRAM_BOT_TOKEN_SECRET_ARN', '')
bot_token = ''
if bot_token_secret_arn:
sm = boto3.client('secretsmanager', region_name='us-east-1')
bot_token = sm.get_secret_value(SecretId=bot_token_secret_arn)['SecretString']
send_telegram_direct(chat_id, bot_token, "Hi! I don't recognize you yet. What's your name?")
return
# ── Get or create AgentCore session ──────────────────────────────────
session_id = get_or_create_session(actor_id)
print(f"[agent-runner] actor={actor_id} session={session_id} user={user_profile.get('display_name', '')}")
# ── Bundle messages ───────────────────────────────────────────────────
if len(records) == 1:
prompt = records[0]['messages'][0]['text']
else:
lines = [
f"[{i+1}] {r['messages'][0]['text']}"
for i, r in enumerate(records)
]
prompt = f"You have {len(records)} queued messages:\n" + "\n".join(lines)
# ── Build payload for AgentCore Runtime 1 ────────────────────────────
payload: dict[str, Any] = {
'prompt': prompt,
'actor_id': actor_id,
'session_id': session_id,
'user_profile': {
'display_name': user_profile.get('display_name', actor_id),
'telegram_username': user_profile.get('telegram_username', ''),
'google_email': user_profile.get('google_email', ''),
'allowed': user_profile.get('allowed', True),
'services': user_profile.get('services', {}),
},
'channel_adapter': {
'type': channel,
'target_id': str(chat_id),
'bot_token_secret_arn': os.environ.get('TELEGRAM_BOT_TOKEN_SECRET_ARN', ''),
},
}
# ── Invoke AgentCore Runtime 1 ────────────────────────────────────────
runtime_arn = os.environ.get('RUNTIME_1_ARN', '')
if not runtime_arn or runtime_arn == 'PLACEHOLDER_SET_AFTER_RUNTIME_DEPLOY':
print(f"[agent-runner] RUNTIME_1_ARN not set — skipping AgentCore invoke")
print(f"[agent-runner] Would have sent: {json.dumps(payload)[:200]}")
return
client = get_agentcore()
response = client.invoke_agent_runtime(
agentRuntimeArn=runtime_arn,
runtimeSessionId=session_id,
payload=json.dumps(payload).encode(),
)
# Process streaming response: buffer text chunks and send to Telegram as paragraphs arrive
bot_token = ''
bot_token_secret_arn = os.environ.get('TELEGRAM_BOT_TOKEN_SECRET_ARN', '')
if bot_token_secret_arn:
sm = boto3.client('secretsmanager', region_name='us-east-1')
try:
bot_token = sm.get_secret_value(SecretId=bot_token_secret_arn)['SecretString']
except Exception as e:
print(f'[agent-runner] Failed to get bot token: {e}')
body = response.get('response')
text_buffer = ''
leftover = ''
if body is not None:
for raw_chunk in body.iter_chunks():
if not raw_chunk:
continue
# AgentCore streams SSE format: "data: {...}\n\n"
text = leftover + raw_chunk.decode('utf-8', errors='replace')
parts = text.split('\n\n')
leftover = parts[-1]
for part in parts[:-1]:
for line in part.splitlines():
if not line.startswith('data: '):
continue
data = line[6:].strip()
if not data or data == '[DONE]':
continue
try:
event = json.loads(data)
except (json.JSONDecodeError, ValueError):
continue
if not isinstance(event, dict):
continue
# Extract text delta from contentBlockDelta ONLY
# Do NOT use event.get('data') — that's the full formatted summary,
# causing duplicate delivery alongside the token stream.
delta = event.get('event', {}).get('contentBlockDelta', {}).get('delta', {})
if not isinstance(delta, dict):
continue
token = delta.get('text', '')
if token:
text_buffer += token
# Only flush if buffer is very large — prevents splitting multi-turn responses
if len(text_buffer) > 1200:
print(f'[agent-runner] send chunk {len(text_buffer)}c to {chat_id}')
send_telegram_direct(str(chat_id), bot_token, text_buffer.strip())
text_buffer = ''
# Flush any remaining text
print(f'[agent-runner] stream done buffer={len(text_buffer)} bot_token_set={bool(bot_token)}')
if text_buffer.strip() and bot_token:
print(f'[agent-runner] flushing {len(text_buffer)}c to {chat_id}')
send_telegram_direct(str(chat_id), bot_token, text_buffer.strip())
print(f"[agent-runner] Completed session={session_id} actor={actor_id}")

View File

@@ -0,0 +1,244 @@
import json
import os
import time
import uuid
import boto3
import urllib.request
from typing import Any
# AWS clients
_ddb = None
_agentcore = None
def get_ddb():
global _ddb
if _ddb is None:
_ddb = boto3.resource('dynamodb')
return _ddb
def get_agentcore():
global _agentcore
if _agentcore is None:
from botocore.config import Config
_agentcore = boto3.client(
'bedrock-agentcore',
region_name='us-east-1',
config=Config(read_timeout=600, connect_timeout=10)
)
return _agentcore
def get_or_create_user(actor_id: str, from_info: dict) -> dict:
"""Look up user in registry, auto-registering on first contact."""
table_name = os.environ.get('USERS_TABLE_NAME', '')
if not table_name:
return {'actor_id': actor_id, 'display_name': from_info.get('from_name', actor_id)}
table = get_ddb().Table(table_name)
response = table.get_item(Key={'actor_id': actor_id})
item = response.get('Item')
if item:
return item
now = int(time.time())
item = {
'actor_id': actor_id,
'display_name': from_info.get('from_name') or actor_id,
'telegram_username': from_info.get('from_username', ''),
'created_at': str(now),
'status': 'pending',
'services': {},
}
table.put_item(Item=item)
print(f'[agent-runner] Registered new user (pending): {actor_id}')
return item
def update_user_status(actor_id: str, name: str, status: str) -> None:
table_name = os.environ.get('USERS_TABLE_NAME', '')
if not table_name:
return
table = get_ddb().Table(table_name)
table.update_item(
Key={'actor_id': actor_id},
UpdateExpression='SET display_name = :n, #s = :s',
ExpressionAttributeNames={'#s': 'status'},
ExpressionAttributeValues={':n': name, ':s': status},
)
def send_telegram_direct(chat_id: str, token: str, text: str) -> None:
url = f'https://api.telegram.org/bot{token}/sendMessage'
data = json.dumps({'chat_id': chat_id, 'text': text}).encode()
req = urllib.request.Request(url, data=data, headers={'Content-Type': 'application/json'})
urllib.request.urlopen(req, timeout=10)
def get_or_create_session(actor_id: str) -> str:
"""Look up active session for actor, or create a new one."""
table = get_ddb().Table(os.environ['SESSION_TABLE_NAME'])
response = table.get_item(Key={'actor_id': actor_id})
item = response.get('Item')
now = int(time.time())
ttl_8hr = now + (8 * 3600)
if item and item.get('ttl', 0) > now:
# Active session exists — extend TTL
table.update_item(
Key={'actor_id': actor_id},
UpdateExpression='SET #ttl = :ttl',
ExpressionAttributeNames={'#ttl': 'ttl'},
ExpressionAttributeValues={':ttl': ttl_8hr},
)
return item['session_id']
# Create new session
session_id = str(uuid.uuid4())
table.put_item(Item={
'actor_id': actor_id,
'session_id': session_id,
'created_at': str(now),
'ttl': ttl_8hr,
})
return session_id
def handler(event, context):
# ── Parse SQS records (FIFO — all from same actor) ───────────────────
records = []
for record in event.get('Records', []):
try:
records.append(json.loads(record['body']))
except (json.JSONDecodeError, KeyError):
continue
if not records:
return
first = records[0]
channel = first.get('channel', 'telegram')
chat_id = first.get('chat_id', '')
actor_id = f"{channel}:{chat_id}"
# ── User registry ─────────────────────────────────────────────────────
from_info = first.get('messages', [{}])[0]
user_profile = get_or_create_user(actor_id, from_info)
# ── Onboarding gate ─────────────────────────────────────────────────────
table_name = os.environ.get('USERS_TABLE_NAME', '')
if table_name and user_profile.get('status', 'active') == 'pending':
raw_prompt = records[0]['messages'][0]['text'] if records else ''
is_name_msg = bool(raw_prompt and len(raw_prompt.strip()) < 50 and '?' not in raw_prompt)
if is_name_msg:
name = raw_prompt.strip()
update_user_status(actor_id, name=name, status='active')
user_profile['display_name'] = name
user_profile['status'] = 'active'
prompt = f"[System: User just registered with name '{name}'. Welcome them warmly and ask how you can help.]"
else:
bot_token_secret_arn = os.environ.get('TELEGRAM_BOT_TOKEN_SECRET_ARN', '')
bot_token = ''
if bot_token_secret_arn:
sm = boto3.client('secretsmanager', region_name='us-east-1')
bot_token = sm.get_secret_value(SecretId=bot_token_secret_arn)['SecretString']
send_telegram_direct(chat_id, bot_token, "Hi! I don't recognize you yet. What's your name?")
return
# ── Get or create AgentCore session ──────────────────────────────────
session_id = get_or_create_session(actor_id)
print(f"[agent-runner] actor={actor_id} session={session_id} user={user_profile.get('display_name', '')}")
# ── Bundle messages ───────────────────────────────────────────────────
if len(records) == 1:
prompt = records[0]['messages'][0]['text']
else:
lines = [
f"[{i+1}] {r['messages'][0]['text']}"
for i, r in enumerate(records)
]
prompt = f"You have {len(records)} queued messages:\n" + "\n".join(lines)
# ── Build payload for AgentCore Runtime 1 ────────────────────────────
payload: dict[str, Any] = {
'prompt': prompt,
'actor_id': actor_id,
'session_id': session_id,
'user_profile': {
'display_name': user_profile.get('display_name', actor_id),
'telegram_username': user_profile.get('telegram_username', ''),
'google_email': user_profile.get('google_email', ''),
'allowed': user_profile.get('allowed', True),
'services': user_profile.get('services', {}),
},
'channel_adapter': {
'type': channel,
'target_id': str(chat_id),
'bot_token_secret_arn': os.environ.get('TELEGRAM_BOT_TOKEN_SECRET_ARN', ''),
},
}
# ── Invoke AgentCore Runtime 1 ────────────────────────────────────────
runtime_arn = os.environ.get('RUNTIME_1_ARN', '')
if not runtime_arn or runtime_arn == 'PLACEHOLDER_SET_AFTER_RUNTIME_DEPLOY':
print(f"[agent-runner] RUNTIME_1_ARN not set — skipping AgentCore invoke")
print(f"[agent-runner] Would have sent: {json.dumps(payload)[:200]}")
return
client = get_agentcore()
response = client.invoke_agent_runtime(
agentRuntimeArn=runtime_arn,
runtimeSessionId=session_id,
payload=json.dumps(payload).encode(),
)
# Process streaming response: buffer text chunks and send to Telegram as paragraphs arrive
bot_token = ''
bot_token_secret_arn = os.environ.get('TELEGRAM_BOT_TOKEN_SECRET_ARN', '')
if bot_token_secret_arn:
sm = boto3.client('secretsmanager', region_name='us-east-1')
try:
bot_token = sm.get_secret_value(SecretId=bot_token_secret_arn)['SecretString']
except Exception as e:
print(f'[agent-runner] Failed to get bot token: {e}')
body = response.get('response')
text_buffer = ''
leftover = ''
if body is not None:
for raw_chunk in body.iter_chunks():
if not raw_chunk:
continue
# AgentCore streams SSE format: "data: {...}\n\n"
text = leftover + raw_chunk.decode('utf-8', errors='replace')
parts = text.split('\n\n')
leftover = parts[-1]
for part in parts[:-1]:
for line in part.splitlines():
if not line.startswith('data: '):
continue
data = line[6:].strip()
if not data or data == '[DONE]':
continue
try:
event = json.loads(data)
except (json.JSONDecodeError, ValueError):
continue
# Extract text delta from contentBlockDelta
delta = event.get('event', {}).get('contentBlockDelta', {}).get('delta', {})
token = delta.get('text', '') or event.get('data', '')
if token:
text_buffer += token
flush = (
text_buffer.rstrip().endswith(('\n\n', '.\n', '!\n', '?\n'))
or len(text_buffer) > 800
)
if flush and text_buffer.strip():
send_telegram_direct(str(chat_id), bot_token, text_buffer.strip())
text_buffer = ''
# Flush any remaining text
if text_buffer.strip() and bot_token:
send_telegram_direct(str(chat_id), bot_token, text_buffer.strip())
print(f"[agent-runner] Completed session={session_id} actor={actor_id}")

View File

@@ -0,0 +1,251 @@
import json
import os
import time
import uuid
import boto3
import urllib.request
from typing import Any
# AWS clients
_ddb = None
_agentcore = None
def get_ddb():
global _ddb
if _ddb is None:
_ddb = boto3.resource('dynamodb')
return _ddb
def get_agentcore():
global _agentcore
if _agentcore is None:
from botocore.config import Config
_agentcore = boto3.client(
'bedrock-agentcore',
region_name='us-east-1',
config=Config(read_timeout=600, connect_timeout=10)
)
return _agentcore
def get_or_create_user(actor_id: str, from_info: dict) -> dict:
"""Look up user in registry, auto-registering on first contact."""
table_name = os.environ.get('USERS_TABLE_NAME', '')
if not table_name:
return {'actor_id': actor_id, 'display_name': from_info.get('from_name', actor_id)}
table = get_ddb().Table(table_name)
response = table.get_item(Key={'actor_id': actor_id})
item = response.get('Item')
if item:
return item
now = int(time.time())
item = {
'actor_id': actor_id,
'display_name': from_info.get('from_name') or actor_id,
'telegram_username': from_info.get('from_username', ''),
'created_at': str(now),
'status': 'pending',
'services': {},
}
table.put_item(Item=item)
print(f'[agent-runner] Registered new user (pending): {actor_id}')
return item
def update_user_status(actor_id: str, name: str, status: str) -> None:
table_name = os.environ.get('USERS_TABLE_NAME', '')
if not table_name:
return
table = get_ddb().Table(table_name)
table.update_item(
Key={'actor_id': actor_id},
UpdateExpression='SET display_name = :n, #s = :s',
ExpressionAttributeNames={'#s': 'status'},
ExpressionAttributeValues={':n': name, ':s': status},
)
def send_telegram_direct(chat_id: str, token: str, text: str) -> None:
url = f'https://api.telegram.org/bot{token}/sendMessage'
data = json.dumps({'chat_id': chat_id, 'text': text}).encode()
req = urllib.request.Request(url, data=data, headers={'Content-Type': 'application/json'})
urllib.request.urlopen(req, timeout=10)
def get_or_create_session(actor_id: str) -> str:
"""Look up active session for actor, or create a new one."""
table = get_ddb().Table(os.environ['SESSION_TABLE_NAME'])
response = table.get_item(Key={'actor_id': actor_id})
item = response.get('Item')
now = int(time.time())
ttl_8hr = now + (8 * 3600)
if item and item.get('ttl', 0) > now:
# Active session exists — extend TTL
table.update_item(
Key={'actor_id': actor_id},
UpdateExpression='SET #ttl = :ttl',
ExpressionAttributeNames={'#ttl': 'ttl'},
ExpressionAttributeValues={':ttl': ttl_8hr},
)
return item['session_id']
# Create new session
session_id = str(uuid.uuid4())
table.put_item(Item={
'actor_id': actor_id,
'session_id': session_id,
'created_at': str(now),
'ttl': ttl_8hr,
})
return session_id
def handler(event, context):
# ── Parse SQS records (FIFO — all from same actor) ───────────────────
records = []
for record in event.get('Records', []):
try:
records.append(json.loads(record['body']))
except (json.JSONDecodeError, KeyError):
continue
if not records:
return
first = records[0]
channel = first.get('channel', 'telegram')
chat_id = first.get('chat_id', '')
actor_id = f"{channel}:{chat_id}"
# ── User registry ─────────────────────────────────────────────────────
from_info = first.get('messages', [{}])[0]
user_profile = get_or_create_user(actor_id, from_info)
# ── Onboarding gate ─────────────────────────────────────────────────────
table_name = os.environ.get('USERS_TABLE_NAME', '')
if table_name and user_profile.get('status', 'active') == 'pending':
raw_prompt = records[0]['messages'][0]['text'] if records else ''
is_name_msg = bool(raw_prompt and len(raw_prompt.strip()) < 50 and '?' not in raw_prompt)
if is_name_msg:
name = raw_prompt.strip()
update_user_status(actor_id, name=name, status='active')
user_profile['display_name'] = name
user_profile['status'] = 'active'
prompt = f"[System: User just registered with name '{name}'. Welcome them warmly and ask how you can help.]"
else:
bot_token_secret_arn = os.environ.get('TELEGRAM_BOT_TOKEN_SECRET_ARN', '')
bot_token = ''
if bot_token_secret_arn:
sm = boto3.client('secretsmanager', region_name='us-east-1')
bot_token = sm.get_secret_value(SecretId=bot_token_secret_arn)['SecretString']
send_telegram_direct(chat_id, bot_token, "Hi! I don't recognize you yet. What's your name?")
return
# ── Get or create AgentCore session ──────────────────────────────────
session_id = get_or_create_session(actor_id)
print(f"[agent-runner] actor={actor_id} session={session_id} user={user_profile.get('display_name', '')}")
# ── Bundle messages ───────────────────────────────────────────────────
if len(records) == 1:
prompt = records[0]['messages'][0]['text']
else:
lines = [
f"[{i+1}] {r['messages'][0]['text']}"
for i, r in enumerate(records)
]
prompt = f"You have {len(records)} queued messages:\n" + "\n".join(lines)
# ── Build payload for AgentCore Runtime 1 ────────────────────────────
payload: dict[str, Any] = {
'prompt': prompt,
'actor_id': actor_id,
'session_id': session_id,
'user_profile': {
'display_name': user_profile.get('display_name', actor_id),
'telegram_username': user_profile.get('telegram_username', ''),
'google_email': user_profile.get('google_email', ''),
'allowed': user_profile.get('allowed', True),
'services': user_profile.get('services', {}),
},
'channel_adapter': {
'type': channel,
'target_id': str(chat_id),
'bot_token_secret_arn': os.environ.get('TELEGRAM_BOT_TOKEN_SECRET_ARN', ''),
},
}
# ── Invoke AgentCore Runtime 1 ────────────────────────────────────────
runtime_arn = os.environ.get('RUNTIME_1_ARN', '')
if not runtime_arn or runtime_arn == 'PLACEHOLDER_SET_AFTER_RUNTIME_DEPLOY':
print(f"[agent-runner] RUNTIME_1_ARN not set — skipping AgentCore invoke")
print(f"[agent-runner] Would have sent: {json.dumps(payload)[:200]}")
return
client = get_agentcore()
response = client.invoke_agent_runtime(
agentRuntimeArn=runtime_arn,
runtimeSessionId=session_id,
payload=json.dumps(payload).encode(),
)
# Process streaming response: buffer text chunks and send to Telegram as paragraphs arrive
bot_token = ''
bot_token_secret_arn = os.environ.get('TELEGRAM_BOT_TOKEN_SECRET_ARN', '')
if bot_token_secret_arn:
sm = boto3.client('secretsmanager', region_name='us-east-1')
try:
bot_token = sm.get_secret_value(SecretId=bot_token_secret_arn)['SecretString']
except Exception as e:
print(f'[agent-runner] Failed to get bot token: {e}')
body = response.get('response')
text_buffer = ''
leftover = ''
if body is not None:
for raw_chunk in body.iter_chunks():
if not raw_chunk:
continue
# AgentCore streams SSE format: "data: {...}\n\n"
text = leftover + raw_chunk.decode('utf-8', errors='replace')
parts = text.split('\n\n')
leftover = parts[-1]
for part in parts[:-1]:
for line in part.splitlines():
if not line.startswith('data: '):
continue
data = line[6:].strip()
if not data or data == '[DONE]':
continue
try:
event = json.loads(data)
except (json.JSONDecodeError, ValueError):
continue
if not isinstance(event, dict):
continue
# Extract text delta from contentBlockDelta
delta = event.get('event', {}).get('contentBlockDelta', {}).get('delta', {})
if not isinstance(delta, dict):
continue
token = delta.get('text', '') or event.get('data', '')
if token:
text_buffer += token
flush = (
text_buffer.rstrip().endswith(('\n\n', '.\n', '!\n', '?\n'))
or len(text_buffer) > 800
)
if flush and text_buffer.strip():
print(f'[agent-runner] send chunk {len(text_buffer)}c to {chat_id}')
send_telegram_direct(str(chat_id), bot_token, text_buffer.strip())
text_buffer = ''
# Flush any remaining text
print(f'[agent-runner] stream done buffer={len(text_buffer)} bot_token_set={bool(bot_token)}')
if text_buffer.strip() and bot_token:
print(f'[agent-runner] flushing {len(text_buffer)}c to {chat_id}')
send_telegram_direct(str(chat_id), bot_token, text_buffer.strip())
print(f"[agent-runner] Completed session={session_id} actor={actor_id}")

View File

@@ -0,0 +1,246 @@
import json
import os
import time
import uuid
import boto3
import urllib.request
from typing import Any
# AWS clients
_ddb = None
_agentcore = None
def get_ddb():
global _ddb
if _ddb is None:
_ddb = boto3.resource('dynamodb')
return _ddb
def get_agentcore():
global _agentcore
if _agentcore is None:
from botocore.config import Config
_agentcore = boto3.client(
'bedrock-agentcore',
region_name='us-east-1',
config=Config(read_timeout=600, connect_timeout=10)
)
return _agentcore
def get_or_create_user(actor_id: str, from_info: dict) -> dict:
"""Look up user in registry, auto-registering on first contact."""
table_name = os.environ.get('USERS_TABLE_NAME', '')
if not table_name:
return {'actor_id': actor_id, 'display_name': from_info.get('from_name', actor_id)}
table = get_ddb().Table(table_name)
response = table.get_item(Key={'actor_id': actor_id})
item = response.get('Item')
if item:
return item
now = int(time.time())
item = {
'actor_id': actor_id,
'display_name': from_info.get('from_name') or actor_id,
'telegram_username': from_info.get('from_username', ''),
'created_at': str(now),
'status': 'pending',
'services': {},
}
table.put_item(Item=item)
print(f'[agent-runner] Registered new user (pending): {actor_id}')
return item
def update_user_status(actor_id: str, name: str, status: str) -> None:
table_name = os.environ.get('USERS_TABLE_NAME', '')
if not table_name:
return
table = get_ddb().Table(table_name)
table.update_item(
Key={'actor_id': actor_id},
UpdateExpression='SET display_name = :n, #s = :s',
ExpressionAttributeNames={'#s': 'status'},
ExpressionAttributeValues={':n': name, ':s': status},
)
def send_telegram_direct(chat_id: str, token: str, text: str) -> None:
url = f'https://api.telegram.org/bot{token}/sendMessage'
data = json.dumps({'chat_id': chat_id, 'text': text}).encode()
req = urllib.request.Request(url, data=data, headers={'Content-Type': 'application/json'})
urllib.request.urlopen(req, timeout=10)
def get_or_create_session(actor_id: str) -> str:
"""Look up active session for actor, or create a new one."""
table = get_ddb().Table(os.environ['SESSION_TABLE_NAME'])
response = table.get_item(Key={'actor_id': actor_id})
item = response.get('Item')
now = int(time.time())
ttl_8hr = now + (8 * 3600)
if item and item.get('ttl', 0) > now:
# Active session exists — extend TTL
table.update_item(
Key={'actor_id': actor_id},
UpdateExpression='SET #ttl = :ttl',
ExpressionAttributeNames={'#ttl': 'ttl'},
ExpressionAttributeValues={':ttl': ttl_8hr},
)
return item['session_id']
# Create new session
session_id = str(uuid.uuid4())
table.put_item(Item={
'actor_id': actor_id,
'session_id': session_id,
'created_at': str(now),
'ttl': ttl_8hr,
})
return session_id
def handler(event, context):
# ── Parse SQS records (FIFO — all from same actor) ───────────────────
records = []
for record in event.get('Records', []):
try:
records.append(json.loads(record['body']))
except (json.JSONDecodeError, KeyError):
continue
if not records:
return
first = records[0]
channel = first.get('channel', 'telegram')
chat_id = first.get('chat_id', '')
actor_id = f"{channel}:{chat_id}"
# ── User registry ─────────────────────────────────────────────────────
from_info = first.get('messages', [{}])[0]
user_profile = get_or_create_user(actor_id, from_info)
# ── Onboarding gate ─────────────────────────────────────────────────────
table_name = os.environ.get('USERS_TABLE_NAME', '')
if table_name and user_profile.get('status', 'active') == 'pending':
raw_prompt = records[0]['messages'][0]['text'] if records else ''
is_name_msg = bool(raw_prompt and len(raw_prompt.strip()) < 50 and '?' not in raw_prompt)
if is_name_msg:
name = raw_prompt.strip()
update_user_status(actor_id, name=name, status='active')
user_profile['display_name'] = name
user_profile['status'] = 'active'
prompt = f"[System: User just registered with name '{name}'. Welcome them warmly and ask how you can help.]"
else:
bot_token_secret_arn = os.environ.get('TELEGRAM_BOT_TOKEN_SECRET_ARN', '')
bot_token = ''
if bot_token_secret_arn:
sm = boto3.client('secretsmanager', region_name='us-east-1')
bot_token = sm.get_secret_value(SecretId=bot_token_secret_arn)['SecretString']
send_telegram_direct(chat_id, bot_token, "Hi! I don't recognize you yet. What's your name?")
return
# ── Get or create AgentCore session ──────────────────────────────────
session_id = get_or_create_session(actor_id)
print(f"[agent-runner] actor={actor_id} session={session_id} user={user_profile.get('display_name', '')}")
# ── Bundle messages ───────────────────────────────────────────────────
if len(records) == 1:
prompt = records[0]['messages'][0]['text']
else:
lines = [
f"[{i+1}] {r['messages'][0]['text']}"
for i, r in enumerate(records)
]
prompt = f"You have {len(records)} queued messages:\n" + "\n".join(lines)
# ── Build payload for AgentCore Runtime 1 ────────────────────────────
payload: dict[str, Any] = {
'prompt': prompt,
'actor_id': actor_id,
'session_id': session_id,
'user_profile': {
'display_name': user_profile.get('display_name', actor_id),
'telegram_username': user_profile.get('telegram_username', ''),
'google_email': user_profile.get('google_email', ''),
'allowed': user_profile.get('allowed', True),
'services': user_profile.get('services', {}),
},
'channel_adapter': {
'type': channel,
'target_id': str(chat_id),
'bot_token_secret_arn': os.environ.get('TELEGRAM_BOT_TOKEN_SECRET_ARN', ''),
},
}
# ── Invoke AgentCore Runtime 1 ────────────────────────────────────────
runtime_arn = os.environ.get('RUNTIME_1_ARN', '')
if not runtime_arn or runtime_arn == 'PLACEHOLDER_SET_AFTER_RUNTIME_DEPLOY':
print(f"[agent-runner] RUNTIME_1_ARN not set — skipping AgentCore invoke")
print(f"[agent-runner] Would have sent: {json.dumps(payload)[:200]}")
return
client = get_agentcore()
response = client.invoke_agent_runtime(
agentRuntimeArn=runtime_arn,
runtimeSessionId=session_id,
payload=json.dumps(payload).encode(),
)
# Process streaming response: buffer text chunks and send to Telegram as paragraphs arrive
bot_token = ''
bot_token_secret_arn = os.environ.get('TELEGRAM_BOT_TOKEN_SECRET_ARN', '')
if bot_token_secret_arn:
sm = boto3.client('secretsmanager', region_name='us-east-1')
try:
bot_token = sm.get_secret_value(SecretId=bot_token_secret_arn)['SecretString']
except Exception as e:
print(f'[agent-runner] Failed to get bot token: {e}')
body = response.get('response')
text_buffer = ''
leftover = ''
if body is not None:
for raw_chunk in body.iter_chunks():
if not raw_chunk:
continue
# AgentCore streams SSE format: "data: {...}\n\n"
text = leftover + raw_chunk.decode('utf-8', errors='replace')
parts = text.split('\n\n')
leftover = parts[-1]
for part in parts[:-1]:
for line in part.splitlines():
if not line.startswith('data: '):
continue
data = line[6:].strip()
if not data or data == '[DONE]':
continue
try:
event = json.loads(data)
except (json.JSONDecodeError, ValueError):
continue
if not isinstance(event, dict):
continue
# Extract text delta from contentBlockDelta
delta = event.get('event', {}).get('contentBlockDelta', {}).get('delta', {})
token = delta.get('text', '') or event.get('data', '')
if token:
text_buffer += token
flush = (
text_buffer.rstrip().endswith(('\n\n', '.\n', '!\n', '?\n'))
or len(text_buffer) > 800
)
if flush and text_buffer.strip():
send_telegram_direct(str(chat_id), bot_token, text_buffer.strip())
text_buffer = ''
# Flush any remaining text
if text_buffer.strip() and bot_token:
send_telegram_direct(str(chat_id), bot_token, text_buffer.strip())
print(f"[agent-runner] Completed session={session_id} actor={actor_id}")

View File

@@ -0,0 +1,29 @@
"""EventBridge-triggered Lambda: sends a Telegram reminder then deletes the rule."""
import json
import os
import boto3
import urllib.request
def handler(event, context):
chat_id = event['chat_id']
message = event['message']
rule_name = event['rule_name']
# Fetch bot token
sm = boto3.client('secretsmanager', region_name='us-east-1')
token = sm.get_secret_value(SecretId=os.environ['TELEGRAM_BOT_TOKEN_SECRET_ARN'])['SecretString']
# Send Telegram message
payload = json.dumps({'chat_id': chat_id, 'text': message}).encode()
req = urllib.request.Request(
f'https://api.telegram.org/bot{token}/sendMessage',
data=payload,
headers={'Content-Type': 'application/json'},
)
urllib.request.urlopen(req)
# Delete the one-time rule
eb = boto3.client('events', region_name='us-east-1')
eb.remove_targets(Rule=rule_name, Ids=['scheduler'])
eb.delete_rule(Name=rule_name)

View File

@@ -0,0 +1,196 @@
import json
import os
import time
import uuid
import boto3
import urllib.request
from typing import Any
# AWS clients
_ddb = None
_agentcore = None
def get_ddb():
global _ddb
if _ddb is None:
_ddb = boto3.resource('dynamodb')
return _ddb
def get_agentcore():
global _agentcore
if _agentcore is None:
_agentcore = boto3.client('bedrock-agentcore', region_name='us-east-1')
return _agentcore
def get_or_create_user(actor_id: str, from_info: dict) -> dict:
"""Look up user in registry, auto-registering on first contact."""
table_name = os.environ.get('USERS_TABLE_NAME', '')
if not table_name:
return {'actor_id': actor_id, 'display_name': from_info.get('from_name', actor_id)}
table = get_ddb().Table(table_name)
response = table.get_item(Key={'actor_id': actor_id})
item = response.get('Item')
if item:
return item
now = int(time.time())
item = {
'actor_id': actor_id,
'display_name': from_info.get('from_name') or actor_id,
'telegram_username': from_info.get('from_username', ''),
'created_at': str(now),
'status': 'pending',
'services': {},
}
table.put_item(Item=item)
print(f'[agent-runner] Registered new user (pending): {actor_id}')
return item
def update_user_status(actor_id: str, name: str, status: str) -> None:
table_name = os.environ.get('USERS_TABLE_NAME', '')
if not table_name:
return
table = get_ddb().Table(table_name)
table.update_item(
Key={'actor_id': actor_id},
UpdateExpression='SET display_name = :n, #s = :s',
ExpressionAttributeNames={'#s': 'status'},
ExpressionAttributeValues={':n': name, ':s': status},
)
def send_telegram_direct(chat_id: str, token: str, text: str) -> None:
url = f'https://api.telegram.org/bot{token}/sendMessage'
data = json.dumps({'chat_id': chat_id, 'text': text}).encode()
req = urllib.request.Request(url, data=data, headers={'Content-Type': 'application/json'})
urllib.request.urlopen(req, timeout=10)
def get_or_create_session(actor_id: str) -> str:
"""Look up active session for actor, or create a new one."""
table = get_ddb().Table(os.environ['SESSION_TABLE_NAME'])
response = table.get_item(Key={'actor_id': actor_id})
item = response.get('Item')
now = int(time.time())
ttl_8hr = now + (8 * 3600)
if item and item.get('ttl', 0) > now:
# Active session exists — extend TTL
table.update_item(
Key={'actor_id': actor_id},
UpdateExpression='SET #ttl = :ttl',
ExpressionAttributeNames={'#ttl': 'ttl'},
ExpressionAttributeValues={':ttl': ttl_8hr},
)
return item['session_id']
# Create new session
session_id = str(uuid.uuid4())
table.put_item(Item={
'actor_id': actor_id,
'session_id': session_id,
'created_at': str(now),
'ttl': ttl_8hr,
})
return session_id
def handler(event, context):
# ── Parse SQS records (FIFO — all from same actor) ───────────────────
records = []
for record in event.get('Records', []):
try:
records.append(json.loads(record['body']))
except (json.JSONDecodeError, KeyError):
continue
if not records:
return
first = records[0]
channel = first.get('channel', 'telegram')
chat_id = first.get('chat_id', '')
actor_id = f"{channel}:{chat_id}"
# ── User registry ─────────────────────────────────────────────────────
from_info = first.get('messages', [{}])[0]
user_profile = get_or_create_user(actor_id, from_info)
# ── Onboarding gate ─────────────────────────────────────────────────────
table_name = os.environ.get('USERS_TABLE_NAME', '')
if table_name and user_profile.get('status', 'active') == 'pending':
raw_prompt = records[0]['messages'][0]['text'] if records else ''
is_name_msg = bool(raw_prompt and len(raw_prompt.strip()) < 50 and '?' not in raw_prompt)
if is_name_msg:
name = raw_prompt.strip()
update_user_status(actor_id, name=name, status='active')
user_profile['display_name'] = name
user_profile['status'] = 'active'
prompt = f"[System: User just registered with name '{name}'. Welcome them warmly and ask how you can help.]"
else:
bot_token_secret_arn = os.environ.get('TELEGRAM_BOT_TOKEN_SECRET_ARN', '')
bot_token = ''
if bot_token_secret_arn:
sm = boto3.client('secretsmanager', region_name='us-east-1')
bot_token = sm.get_secret_value(SecretId=bot_token_secret_arn)['SecretString']
send_telegram_direct(chat_id, bot_token, "Hi! I don't recognize you yet. What's your name?")
return
# ── Get or create AgentCore session ──────────────────────────────────
session_id = get_or_create_session(actor_id)
print(f"[agent-runner] actor={actor_id} session={session_id} user={user_profile.get('display_name', '')}")
# ── Bundle messages ───────────────────────────────────────────────────
if len(records) == 1:
prompt = records[0]['messages'][0]['text']
else:
lines = [
f"[{i+1}] {r['messages'][0]['text']}"
for i, r in enumerate(records)
]
prompt = f"You have {len(records)} queued messages:\n" + "\n".join(lines)
# ── Build payload for AgentCore Runtime 1 ────────────────────────────
payload: dict[str, Any] = {
'prompt': prompt,
'actor_id': actor_id,
'session_id': session_id,
'user_profile': {
'display_name': user_profile.get('display_name', actor_id),
'telegram_username': user_profile.get('telegram_username', ''),
'google_email': user_profile.get('google_email', ''),
'allowed': user_profile.get('allowed', True),
'services': user_profile.get('services', {}),
},
'channel_adapter': {
'type': channel,
'target_id': str(chat_id),
'bot_token_secret_arn': os.environ.get('TELEGRAM_BOT_TOKEN_SECRET_ARN', ''),
},
}
# ── Invoke AgentCore Runtime 1 ────────────────────────────────────────
runtime_arn = os.environ.get('RUNTIME_1_ARN', '')
if not runtime_arn or runtime_arn == 'PLACEHOLDER_SET_AFTER_RUNTIME_DEPLOY':
print(f"[agent-runner] RUNTIME_1_ARN not set — skipping AgentCore invoke")
print(f"[agent-runner] Would have sent: {json.dumps(payload)[:200]}")
return
client = get_agentcore()
response = client.invoke_agent_runtime(
agentRuntimeArn=runtime_arn,
runtimeSessionId=session_id,
payload=json.dumps(payload).encode(),
)
# Drain streaming response body (agent delivers to Telegram via send_message tool)
body = response.get('response')
if body is not None:
for _ in body.iter_chunks():
pass
print(f"[agent-runner] Completed session={session_id} actor={actor_id}")

View File

@@ -0,0 +1,261 @@
import json
import os
import time
import uuid
import boto3
import urllib.request
from typing import Any
# AWS clients
_ddb = None
_agentcore = None
def get_ddb():
global _ddb
if _ddb is None:
_ddb = boto3.resource('dynamodb')
return _ddb
def get_agentcore():
global _agentcore
if _agentcore is None:
from botocore.config import Config
_agentcore = boto3.client(
'bedrock-agentcore',
region_name='us-east-1',
config=Config(read_timeout=600, connect_timeout=10)
)
return _agentcore
def get_or_create_user(actor_id: str, from_info: dict) -> dict:
"""Look up user in registry, auto-registering on first contact."""
table_name = os.environ.get('USERS_TABLE_NAME', '')
if not table_name:
return {'actor_id': actor_id, 'display_name': from_info.get('from_name', actor_id)}
table = get_ddb().Table(table_name)
response = table.get_item(Key={'actor_id': actor_id})
item = response.get('Item')
if item:
return item
now = int(time.time())
item = {
'actor_id': actor_id,
'display_name': from_info.get('from_name') or actor_id,
'telegram_username': from_info.get('from_username', ''),
'created_at': str(now),
'status': 'pending',
'services': {},
}
table.put_item(Item=item)
print(f'[agent-runner] Registered new user (pending): {actor_id}')
return item
def update_user_status(actor_id: str, name: str, status: str) -> None:
table_name = os.environ.get('USERS_TABLE_NAME', '')
if not table_name:
return
table = get_ddb().Table(table_name)
table.update_item(
Key={'actor_id': actor_id},
UpdateExpression='SET display_name = :n, #s = :s',
ExpressionAttributeNames={'#s': 'status'},
ExpressionAttributeValues={':n': name, ':s': status},
)
# Per-invocation dedup: track sent message hashes to prevent AgentCore retry duplicates
_sent_hashes: set = set()
def send_telegram_direct(chat_id: str, token: str, text: str) -> None:
import hashlib
h = hashlib.md5(f'{chat_id}:{text}'.encode()).hexdigest()[:12]
if h in _sent_hashes:
print(f'[agent-runner] dedup: skipping duplicate message (hash={h})')
return
_sent_hashes.add(h)
url = f'https://api.telegram.org/bot{token}/sendMessage'
data = json.dumps({'chat_id': chat_id, 'text': text}).encode()
req = urllib.request.Request(url, data=data, headers={'Content-Type': 'application/json'})
urllib.request.urlopen(req, timeout=10)
def get_or_create_session(actor_id: str) -> str:
"""Look up active session for actor, or create a new one."""
table = get_ddb().Table(os.environ['SESSION_TABLE_NAME'])
response = table.get_item(Key={'actor_id': actor_id})
item = response.get('Item')
now = int(time.time())
ttl_8hr = now + (8 * 3600)
if item and item.get('ttl', 0) > now:
# Active session exists — extend TTL
table.update_item(
Key={'actor_id': actor_id},
UpdateExpression='SET #ttl = :ttl',
ExpressionAttributeNames={'#ttl': 'ttl'},
ExpressionAttributeValues={':ttl': ttl_8hr},
)
return item['session_id']
# Create new session
session_id = str(uuid.uuid4())
table.put_item(Item={
'actor_id': actor_id,
'session_id': session_id,
'created_at': str(now),
'ttl': ttl_8hr,
})
return session_id
def handler(event, context):
# ── Parse SQS records (FIFO — all from same actor) ───────────────────
records = []
for record in event.get('Records', []):
try:
records.append(json.loads(record['body']))
except (json.JSONDecodeError, KeyError):
continue
if not records:
return
first = records[0]
channel = first.get('channel', 'telegram')
chat_id = first.get('chat_id', '')
actor_id = f"{channel}:{chat_id}"
# ── User registry ─────────────────────────────────────────────────────
from_info = first.get('messages', [{}])[0]
user_profile = get_or_create_user(actor_id, from_info)
# ── Onboarding gate ─────────────────────────────────────────────────────
table_name = os.environ.get('USERS_TABLE_NAME', '')
if table_name and user_profile.get('status', 'active') == 'pending':
raw_prompt = records[0]['messages'][0]['text'] if records else ''
is_name_msg = bool(raw_prompt and len(raw_prompt.strip()) < 50 and '?' not in raw_prompt)
if is_name_msg:
name = raw_prompt.strip()
update_user_status(actor_id, name=name, status='active')
user_profile['display_name'] = name
user_profile['status'] = 'active'
prompt = f"[System: User just registered with name '{name}'. Welcome them warmly and ask how you can help.]"
else:
bot_token_secret_arn = os.environ.get('TELEGRAM_BOT_TOKEN_SECRET_ARN', '')
bot_token = ''
if bot_token_secret_arn:
sm = boto3.client('secretsmanager', region_name='us-east-1')
bot_token = sm.get_secret_value(SecretId=bot_token_secret_arn)['SecretString']
send_telegram_direct(chat_id, bot_token, "Hi! I don't recognize you yet. What's your name?")
return
# ── Get or create AgentCore session ──────────────────────────────────
session_id = get_or_create_session(actor_id)
print(f"[agent-runner] actor={actor_id} session={session_id} user={user_profile.get('display_name', '')}")
# ── Bundle messages ───────────────────────────────────────────────────
if len(records) == 1:
prompt = records[0]['messages'][0]['text']
else:
lines = [
f"[{i+1}] {r['messages'][0]['text']}"
for i, r in enumerate(records)
]
prompt = f"You have {len(records)} queued messages:\n" + "\n".join(lines)
# ── Build payload for AgentCore Runtime 1 ────────────────────────────
payload: dict[str, Any] = {
'prompt': prompt,
'actor_id': actor_id,
'session_id': session_id,
'user_profile': {
'display_name': user_profile.get('display_name', actor_id),
'telegram_username': user_profile.get('telegram_username', ''),
'google_email': user_profile.get('google_email', ''),
'allowed': user_profile.get('allowed', True),
'services': user_profile.get('services', {}),
},
'channel_adapter': {
'type': channel,
'target_id': str(chat_id),
'bot_token_secret_arn': os.environ.get('TELEGRAM_BOT_TOKEN_SECRET_ARN', ''),
},
}
# ── Invoke AgentCore Runtime 1 ────────────────────────────────────────
runtime_arn = os.environ.get('RUNTIME_1_ARN', '')
if not runtime_arn or runtime_arn == 'PLACEHOLDER_SET_AFTER_RUNTIME_DEPLOY':
print(f"[agent-runner] RUNTIME_1_ARN not set — skipping AgentCore invoke")
print(f"[agent-runner] Would have sent: {json.dumps(payload)[:200]}")
return
client = get_agentcore()
response = client.invoke_agent_runtime(
agentRuntimeArn=runtime_arn,
runtimeSessionId=session_id,
payload=json.dumps(payload).encode(),
)
# Process streaming response: buffer text chunks and send to Telegram as paragraphs arrive
bot_token = ''
bot_token_secret_arn = os.environ.get('TELEGRAM_BOT_TOKEN_SECRET_ARN', '')
if bot_token_secret_arn:
sm = boto3.client('secretsmanager', region_name='us-east-1')
try:
bot_token = sm.get_secret_value(SecretId=bot_token_secret_arn)['SecretString']
except Exception as e:
print(f'[agent-runner] Failed to get bot token: {e}')
body = response.get('response')
text_buffer = ''
leftover = ''
if body is not None:
for raw_chunk in body.iter_chunks():
if not raw_chunk:
continue
# AgentCore streams SSE format: "data: {...}\n\n"
text = leftover + raw_chunk.decode('utf-8', errors='replace')
parts = text.split('\n\n')
leftover = parts[-1]
for part in parts[:-1]:
for line in part.splitlines():
if not line.startswith('data: '):
continue
data = line[6:].strip()
if not data or data == '[DONE]':
continue
try:
event = json.loads(data)
except (json.JSONDecodeError, ValueError):
continue
if not isinstance(event, dict):
continue
# Extract text delta from contentBlockDelta
delta = event.get('event', {}).get('contentBlockDelta', {}).get('delta', {})
if not isinstance(delta, dict):
continue
token = delta.get('text', '') or event.get('data', '')
if token:
text_buffer += token
flush = (
text_buffer.rstrip().endswith(('\n\n', '.\n', '!\n', '?\n'))
or len(text_buffer) > 800
)
if flush and text_buffer.strip():
print(f'[agent-runner] send chunk {len(text_buffer)}c to {chat_id}')
send_telegram_direct(str(chat_id), bot_token, text_buffer.strip())
text_buffer = ''
# Flush any remaining text
print(f'[agent-runner] stream done buffer={len(text_buffer)} bot_token_set={bool(bot_token)}')
if text_buffer.strip() and bot_token:
print(f'[agent-runner] flushing {len(text_buffer)}c to {chat_id}')
send_telegram_direct(str(chat_id), bot_token, text_buffer.strip())
print(f"[agent-runner] Completed session={session_id} actor={actor_id}")

View File

@@ -0,0 +1,263 @@
import json
import os
import time
import uuid
import boto3
import urllib.request
from typing import Any
# AWS clients
_ddb = None
_agentcore = None
def get_ddb():
global _ddb
if _ddb is None:
_ddb = boto3.resource('dynamodb')
return _ddb
def get_agentcore():
global _agentcore
if _agentcore is None:
from botocore.config import Config
_agentcore = boto3.client(
'bedrock-agentcore',
region_name='us-east-1',
config=Config(read_timeout=600, connect_timeout=10)
)
return _agentcore
def get_or_create_user(actor_id: str, from_info: dict) -> dict:
"""Look up user in registry, auto-registering on first contact."""
table_name = os.environ.get('USERS_TABLE_NAME', '')
if not table_name:
return {'actor_id': actor_id, 'display_name': from_info.get('from_name', actor_id)}
table = get_ddb().Table(table_name)
response = table.get_item(Key={'actor_id': actor_id})
item = response.get('Item')
if item:
return item
now = int(time.time())
item = {
'actor_id': actor_id,
'display_name': from_info.get('from_name') or actor_id,
'telegram_username': from_info.get('from_username', ''),
'created_at': str(now),
'status': 'pending',
'services': {},
}
table.put_item(Item=item)
print(f'[agent-runner] Registered new user (pending): {actor_id}')
return item
def update_user_status(actor_id: str, name: str, status: str) -> None:
table_name = os.environ.get('USERS_TABLE_NAME', '')
if not table_name:
return
table = get_ddb().Table(table_name)
table.update_item(
Key={'actor_id': actor_id},
UpdateExpression='SET display_name = :n, #s = :s',
ExpressionAttributeNames={'#s': 'status'},
ExpressionAttributeValues={':n': name, ':s': status},
)
# Per-invocation dedup: track sent message hashes to prevent AgentCore retry duplicates
_sent_hashes: set = set()
def send_telegram_direct(chat_id: str, token: str, text: str) -> None:
import hashlib
h = hashlib.md5(f'{chat_id}:{text}'.encode()).hexdigest()[:12]
if h in _sent_hashes:
print(f'[agent-runner] dedup: skipping duplicate message (hash={h})')
return
_sent_hashes.add(h)
url = f'https://api.telegram.org/bot{token}/sendMessage'
data = json.dumps({'chat_id': chat_id, 'text': text}).encode()
req = urllib.request.Request(url, data=data, headers={'Content-Type': 'application/json'})
urllib.request.urlopen(req, timeout=10)
def get_or_create_session(actor_id: str) -> str:
"""Look up active session for actor, or create a new one."""
table = get_ddb().Table(os.environ['SESSION_TABLE_NAME'])
response = table.get_item(Key={'actor_id': actor_id})
item = response.get('Item')
now = int(time.time())
ttl_8hr = now + (8 * 3600)
if item and item.get('ttl', 0) > now:
# Active session exists — extend TTL
table.update_item(
Key={'actor_id': actor_id},
UpdateExpression='SET #ttl = :ttl',
ExpressionAttributeNames={'#ttl': 'ttl'},
ExpressionAttributeValues={':ttl': ttl_8hr},
)
return item['session_id']
# Create new session
session_id = str(uuid.uuid4())
table.put_item(Item={
'actor_id': actor_id,
'session_id': session_id,
'created_at': str(now),
'ttl': ttl_8hr,
})
return session_id
def handler(event, context):
# ── Parse SQS records (FIFO — all from same actor) ───────────────────
records = []
for record in event.get('Records', []):
try:
records.append(json.loads(record['body']))
except (json.JSONDecodeError, KeyError):
continue
if not records:
return
first = records[0]
channel = first.get('channel', 'telegram')
chat_id = first.get('chat_id', '')
actor_id = f"{channel}:{chat_id}"
# ── User registry ─────────────────────────────────────────────────────
from_info = first.get('messages', [{}])[0]
user_profile = get_or_create_user(actor_id, from_info)
# ── Onboarding gate ─────────────────────────────────────────────────────
table_name = os.environ.get('USERS_TABLE_NAME', '')
if table_name and user_profile.get('status', 'active') == 'pending':
raw_prompt = records[0]['messages'][0]['text'] if records else ''
is_name_msg = bool(raw_prompt and len(raw_prompt.strip()) < 50 and '?' not in raw_prompt)
if is_name_msg:
name = raw_prompt.strip()
update_user_status(actor_id, name=name, status='active')
user_profile['display_name'] = name
user_profile['status'] = 'active'
prompt = f"[System: User just registered with name '{name}'. Welcome them warmly and ask how you can help.]"
else:
bot_token_secret_arn = os.environ.get('TELEGRAM_BOT_TOKEN_SECRET_ARN', '')
bot_token = ''
if bot_token_secret_arn:
sm = boto3.client('secretsmanager', region_name='us-east-1')
bot_token = sm.get_secret_value(SecretId=bot_token_secret_arn)['SecretString']
send_telegram_direct(chat_id, bot_token, "Hi! I don't recognize you yet. What's your name?")
return
# ── Get or create AgentCore session ──────────────────────────────────
session_id = get_or_create_session(actor_id)
print(f"[agent-runner] actor={actor_id} session={session_id} user={user_profile.get('display_name', '')}")
# ── Bundle messages ───────────────────────────────────────────────────
if len(records) == 1:
prompt = records[0]['messages'][0]['text']
else:
lines = [
f"[{i+1}] {r['messages'][0]['text']}"
for i, r in enumerate(records)
]
prompt = f"You have {len(records)} queued messages:\n" + "\n".join(lines)
# ── Build payload for AgentCore Runtime 1 ────────────────────────────
payload: dict[str, Any] = {
'prompt': prompt,
'actor_id': actor_id,
'session_id': session_id,
'user_profile': {
'display_name': user_profile.get('display_name', actor_id),
'telegram_username': user_profile.get('telegram_username', ''),
'google_email': user_profile.get('google_email', ''),
'allowed': user_profile.get('allowed', True),
'services': user_profile.get('services', {}),
},
'channel_adapter': {
'type': channel,
'target_id': str(chat_id),
'bot_token_secret_arn': os.environ.get('TELEGRAM_BOT_TOKEN_SECRET_ARN', ''),
},
}
# ── Invoke AgentCore Runtime 1 ────────────────────────────────────────
runtime_arn = os.environ.get('RUNTIME_1_ARN', '')
if not runtime_arn or runtime_arn == 'PLACEHOLDER_SET_AFTER_RUNTIME_DEPLOY':
print(f"[agent-runner] RUNTIME_1_ARN not set — skipping AgentCore invoke")
print(f"[agent-runner] Would have sent: {json.dumps(payload)[:200]}")
return
client = get_agentcore()
response = client.invoke_agent_runtime(
agentRuntimeArn=runtime_arn,
runtimeSessionId=session_id,
payload=json.dumps(payload).encode(),
)
# Process streaming response: buffer text chunks and send to Telegram as paragraphs arrive
bot_token = ''
bot_token_secret_arn = os.environ.get('TELEGRAM_BOT_TOKEN_SECRET_ARN', '')
if bot_token_secret_arn:
sm = boto3.client('secretsmanager', region_name='us-east-1')
try:
bot_token = sm.get_secret_value(SecretId=bot_token_secret_arn)['SecretString']
except Exception as e:
print(f'[agent-runner] Failed to get bot token: {e}')
body = response.get('response')
text_buffer = ''
leftover = ''
if body is not None:
for raw_chunk in body.iter_chunks():
if not raw_chunk:
continue
# AgentCore streams SSE format: "data: {...}\n\n"
text = leftover + raw_chunk.decode('utf-8', errors='replace')
parts = text.split('\n\n')
leftover = parts[-1]
for part in parts[:-1]:
for line in part.splitlines():
if not line.startswith('data: '):
continue
data = line[6:].strip()
if not data or data == '[DONE]':
continue
try:
event = json.loads(data)
except (json.JSONDecodeError, ValueError):
continue
if not isinstance(event, dict):
continue
# Extract text delta from contentBlockDelta ONLY
# Do NOT use event.get('data') — that's the full formatted summary,
# causing duplicate delivery alongside the token stream.
delta = event.get('event', {}).get('contentBlockDelta', {}).get('delta', {})
if not isinstance(delta, dict):
continue
token = delta.get('text', '')
if token:
text_buffer += token
flush = (
text_buffer.rstrip().endswith(('\n\n', '.\n', '!\n', '?\n'))
or len(text_buffer) > 800
)
if flush and text_buffer.strip():
print(f'[agent-runner] send chunk {len(text_buffer)}c to {chat_id}')
send_telegram_direct(str(chat_id), bot_token, text_buffer.strip())
text_buffer = ''
# Flush any remaining text
print(f'[agent-runner] stream done buffer={len(text_buffer)} bot_token_set={bool(bot_token)}')
if text_buffer.strip() and bot_token:
print(f'[agent-runner] flushing {len(text_buffer)}c to {chat_id}')
send_telegram_direct(str(chat_id), bot_token, text_buffer.strip())
print(f"[agent-runner] Completed session={session_id} actor={actor_id}")

View File

@@ -18,7 +18,7 @@
"validateOnSynth": false,
"assumeRoleArn": "arn:${AWS::Partition}:iam::495395224548:role/cdk-hnb659fds-deploy-role-495395224548-us-east-1",
"cloudFormationExecutionRoleArn": "arn:${AWS::Partition}:iam::495395224548:role/cdk-hnb659fds-cfn-exec-role-495395224548-us-east-1",
"stackTemplateAssetObjectUrl": "s3://cdk-hnb659fds-assets-495395224548-us-east-1/a31aaa0bc9eab4fd6f17f10795fba05983dba0c88e83a263fe9fffe930da06b9.json",
"stackTemplateAssetObjectUrl": "s3://cdk-hnb659fds-assets-495395224548-us-east-1/c6cd323425a93776b45e2e0806064efbc5c84a3d6d78532282df6dd62cc14bda.json",
"requiresBootstrapStackVersion": 6,
"bootstrapStackVersionSsmParameter": "/cdk-bootstrap/hnb659fds/version",
"additionalDependencies": [

File diff suppressed because one or more lines are too long

View File

@@ -285,6 +285,47 @@ export class AgentClawStack extends cdk.Stack {
// NOTE: AgentCore runtime env vars are set in agentcore.json / agentcore deploy, not CDK.
// Output the URL so it can be manually set in agentcore.json OAUTH_START_URL.
// ── Lambda: scheduler ─────────────────────────────────────────────────
const schedulerFn = new lambda.Function(this, 'Scheduler', {
functionName: 'agent-claw-scheduler',
runtime: lambda.Runtime.PYTHON_3_12,
handler: 'handler.handler',
code: lambda.Code.fromAsset(path.join(__dirname, '../../src/lambdas/scheduler')),
timeout: cdk.Duration.seconds(30),
memorySize: 128,
environment: {
TELEGRAM_BOT_TOKEN_SECRET_ARN: telegramBotTokenSecretArn,
},
});
botTokenSecret.grantRead(schedulerFn);
// Allow EventBridge to invoke the scheduler Lambda
schedulerFn.addPermission('EventBridgeInvoke', {
principal: new iam.ServicePrincipal('events.amazonaws.com'),
sourceArn: `arn:aws:events:${this.region}:${this.account}:rule/agent-claw-reminder-*`,
});
// Allow scheduler Lambda to delete its own EventBridge rule
schedulerFn.addToRolePolicy(new iam.PolicyStatement({
actions: ['events:RemoveTargets', 'events:DeleteRule'],
resources: [`arn:aws:events:${this.region}:${this.account}:rule/agent-claw-reminder-*`],
}));
// AgentCore runtime: EventBridge + Lambda permissions for scheduling
runtime1Role.addToPolicy(new iam.PolicyStatement({
sid: 'EventBridgeScheduler',
actions: [
'events:PutRule', 'events:PutTargets',
'events:ListRules', 'events:ListTargetsByRule',
'events:RemoveTargets', 'events:DeleteRule',
],
resources: [`arn:aws:events:us-east-1:*:rule/agent-claw-reminder-*`],
}));
runtime1Role.addToPolicy(new iam.PolicyStatement({
sid: 'SchedulerLambdaPermission',
actions: ['lambda:AddPermission', 'lambda:RemovePermission'],
resources: [schedulerFn.functionArn],
}));
// ── Outputs ────────────────────────────────────────────────────────────
new cdk.CfnOutput(this, 'WorkspaceMcpFunctionUrl', {
@@ -329,5 +370,10 @@ export class AgentClawStack extends cdk.Stack {
value: runtime1Role.roleArn,
description: 'IAM execution role ARN for AgentCore Runtime 1',
});
new cdk.CfnOutput(this, 'SchedulerLambdaArn', {
value: schedulerFn.functionArn,
description: 'Scheduler Lambda ARN — set as SCHEDULER_LAMBDA_ARN in agentcore.json',
});
}
}

View File

@@ -0,0 +1,29 @@
"""EventBridge-triggered Lambda: sends a Telegram reminder then deletes the rule."""
import json
import os
import boto3
import urllib.request
def handler(event, context):
chat_id = event['chat_id']
message = event['message']
rule_name = event['rule_name']
# Fetch bot token
sm = boto3.client('secretsmanager', region_name='us-east-1')
token = sm.get_secret_value(SecretId=os.environ['TELEGRAM_BOT_TOKEN_SECRET_ARN'])['SecretString']
# Send Telegram message
payload = json.dumps({'chat_id': chat_id, 'text': message}).encode()
req = urllib.request.Request(
f'https://api.telegram.org/bot{token}/sendMessage',
data=payload,
headers={'Content-Type': 'application/json'},
)
urllib.request.urlopen(req)
# Delete the one-time rule
eb = boto3.client('events', region_name='us-east-1')
eb.remove_targets(Rule=rule_name, Ids=['scheduler'])
eb.delete_rule(Name=rule_name)