"""Agent runner: forwards queued chat messages to AgentCore and relays replies.

The entry point is ``handler(event, context)``. It parses SQS-style records,
resolves (or registers) the sending user in DynamoDB, resolves a session,
invokes the AgentCore runtime, and sends the streamed reply text to Telegram.
"""
import codecs
import hashlib
import json
import os
import re
import time
import urllib.request
import uuid
from collections.abc import Iterator
from typing import Any

import boto3

# Region used for the AgentCore and Secrets Manager clients.
_AWS_REGION = 'us-east-1'
# Sessions are kept alive for this long after the last message.
_SESSION_TTL_SECONDS = 8 * 3600
# Buffered reply text is flushed to Telegram once it grows past this size.
_FLUSH_THRESHOLD = 1200
# Pulls the message id out of a Telegram sendMessage response (for logging).
_MESSAGE_ID_RE = re.compile(r'"message_id":(\d+)')

# AWS clients (created lazily and reused for the life of the process)
_ddb = None
_agentcore = None


def get_ddb():
    """Return the shared DynamoDB resource, creating it on first use."""
    global _ddb
    if _ddb is None:
        _ddb = boto3.resource('dynamodb')
    return _ddb


def get_agentcore():
    """Return the shared ``bedrock-agentcore`` client, creating it on first use.

    The client uses a 600 s read timeout and a 10 s connect timeout.
    """
    global _agentcore
    if _agentcore is None:
        from botocore.config import Config
        _agentcore = boto3.client(
            'bedrock-agentcore',
            region_name=_AWS_REGION,
            config=Config(read_timeout=600, connect_timeout=10),
        )
    return _agentcore


def get_or_create_user(actor_id: str, from_info: dict) -> dict:
    """Look up user in registry, auto-registering on first contact.

    Args:
        actor_id: Registry key for the user.
        from_info: Sender fields of the incoming message; ``from_name`` and
            ``from_username`` are read from it.

    Returns:
        The stored user item. A newly registered user is written with status
        ``'pending'``. When ``USERS_TABLE_NAME`` is unset, a minimal in-memory
        profile is returned and nothing is persisted.
    """
    display_name = from_info.get('from_name') or actor_id
    table_name = os.environ.get('USERS_TABLE_NAME', '')
    if not table_name:
        return {'actor_id': actor_id, 'display_name': display_name}

    table = get_ddb().Table(table_name)
    response = table.get_item(Key={'actor_id': actor_id})
    item = response.get('Item')
    if item:
        return item

    now = int(time.time())
    item = {
        'actor_id': actor_id,
        'display_name': display_name,
        'telegram_username': from_info.get('from_username', ''),
        'created_at': str(now),
        'status': 'pending',
        'services': {},
    }
    table.put_item(Item=item)
    print(f'[agent-runner] Registered new user (pending): {actor_id}')
    return item


def update_user_status(actor_id: str, name: str, status: str) -> None:
    """Set ``display_name`` and ``status`` on a user item.

    Does nothing when ``USERS_TABLE_NAME`` is unset.
    """
    table_name = os.environ.get('USERS_TABLE_NAME', '')
    if not table_name:
        return
    table = get_ddb().Table(table_name)
    table.update_item(
        Key={'actor_id': actor_id},
        UpdateExpression='SET display_name = :n, #s = :s',
        # 'status' is aliased through an expression attribute name.
        ExpressionAttributeNames={'#s': 'status'},
        ExpressionAttributeValues={':n': name, ':s': status},
    )


# Per-invocation dedup: track sent message hashes to prevent AgentCore retry duplicates.
# handler() clears this at the start of every invocation so that state left in
# a reused process cannot suppress a legitimately repeated message.
_sent_hashes: set = set()


def send_telegram_direct(chat_id: str, token: str, text: str, thread_id: int | None = None) -> None:
    """Send ``text`` to a Telegram chat via the Bot API ``sendMessage`` method.

    A message whose ``chat_id``/``text`` pair was already sent successfully
    (tracked in ``_sent_hashes``) is skipped.

    Args:
        chat_id: Target chat.
        token: Bot token used in the API URL.
        text: Message text.
        thread_id: Optional ``message_thread_id`` to post into.

    Raises:
        Exception: Whatever the HTTP request raised; it is logged and re-raised.
    """
    h = hashlib.md5(f'{chat_id}:{text}'.encode()).hexdigest()[:12]
    if h in _sent_hashes:
        print(f'[agent-runner] dedup: skipping duplicate message (hash={h})')
        return

    url = f'https://api.telegram.org/bot{token}/sendMessage'
    payload: dict = {'chat_id': chat_id, 'text': text}
    if thread_id is not None:
        payload['message_thread_id'] = thread_id
    data = json.dumps(payload).encode()
    req = urllib.request.Request(url, data=data, headers={'Content-Type': 'application/json'})
    try:
        with urllib.request.urlopen(req, timeout=10) as resp:
            resp_body = resp.read()
    except Exception as e:
        print(f'[agent-runner] Telegram sendMessage FAILED: {type(e).__name__}: {e} hash={h}')
        raise

    # Record the hash only after a successful send so a failed attempt can be retried.
    _sent_hashes.add(h)
    msg_id = _MESSAGE_ID_RE.search(resp_body.decode('utf-8', errors='replace'))
    print(f'[agent-runner] Telegram sendMessage -> msg_id={msg_id.group(1) if msg_id else "?"} hash={h}')


def get_or_create_session(actor_id: str) -> str:
    """Look up active session for actor, or create a new one.

    An existing session whose ``ttl`` is still in the future has its TTL
    extended; otherwise a new session id is generated and stored.

    Raises:
        KeyError: If ``SESSION_TABLE_NAME`` is not set.
    """
    table = get_ddb().Table(os.environ['SESSION_TABLE_NAME'])
    response = table.get_item(Key={'actor_id': actor_id})
    item = response.get('Item')

    now = int(time.time())
    new_ttl = now + _SESSION_TTL_SECONDS

    if item and item.get('ttl', 0) > now:
        # Active session exists — extend TTL
        table.update_item(
            Key={'actor_id': actor_id},
            UpdateExpression='SET #ttl = :ttl',
            ExpressionAttributeNames={'#ttl': 'ttl'},
            ExpressionAttributeValues={':ttl': new_ttl},
        )
        return item['session_id']

    # Create new session
    session_id = str(uuid.uuid4())
    table.put_item(Item={
        'actor_id': actor_id,
        'session_id': session_id,
        'created_at': str(now),
        'ttl': new_ttl,
    })
    return session_id


def _first_message(record: dict) -> dict:
    """Return the first entry of ``record['messages']``, or ``{}`` if absent or malformed."""
    messages = record.get('messages')
    if isinstance(messages, list) and messages and isinstance(messages[0], dict):
        return messages[0]
    return {}


def _message_text(record: dict) -> str:
    """Return the text of the record's first message, or ``''`` when there is none."""
    text = _first_message(record).get('text')
    return text if isinstance(text, str) else ''


def _bundle_prompt(records: list) -> str:
    """Combine queued records into one prompt (a single record is passed through as-is)."""
    if len(records) == 1:
        return _message_text(records[0])
    lines = [f"[{i}] {_message_text(r)}" for i, r in enumerate(records, start=1)]
    return f"You have {len(records)} queued messages:\n" + "\n".join(lines)


def _attachment_context(attachment: dict) -> str:
    """Render the prompt suffix describing one attachment (``''`` if nothing applies)."""
    file_name = attachment.get('file_name', 'unknown')
    if 'inline_content' in attachment:
        return f"\n\n[Attached file: {file_name}]\n```\n{attachment['inline_content']}\n```"
    if 's3_key' in attachment:
        s3_ref = f"s3://{attachment.get('s3_bucket', '')}/{attachment['s3_key']}"
        return f"\n\n[Attached file: {file_name} ({attachment.get('mime_type', '')}) — stored at {s3_ref}]"
    if 'error' in attachment:
        return f"\n\n[Attachment {file_name} could not be processed: {attachment['error']}]"
    return ''


def _get_bot_token(*, strict: bool = False) -> str:
    """Fetch the Telegram bot token from Secrets Manager.

    Returns ``''`` when ``TELEGRAM_BOT_TOKEN_SECRET_ARN`` is unset. A lookup
    failure is re-raised when ``strict`` is true; otherwise it is logged and
    ``''`` is returned.
    """
    secret_arn = os.environ.get('TELEGRAM_BOT_TOKEN_SECRET_ARN', '')
    if not secret_arn:
        return ''
    sm = boto3.client('secretsmanager', region_name=_AWS_REGION)
    try:
        return sm.get_secret_value(SecretId=secret_arn)['SecretString']
    except Exception as e:
        if strict:
            raise
        print(f'[agent-runner] Failed to get bot token: {e}')
        return ''


def _sse_block_tokens(block: str) -> Iterator[str]:
    """Yield the text deltas carried by the ``data: `` lines of one SSE block."""
    for line in block.splitlines():
        if not line.startswith('data: '):
            continue
        data = line[6:].strip()
        if not data or data == '[DONE]':
            continue
        try:
            sse_event = json.loads(data)
        except ValueError:  # json.JSONDecodeError is a ValueError
            continue
        if not isinstance(sse_event, dict):
            continue
        # Extract text delta from contentBlockDelta ONLY.
        # Do NOT use sse_event.get('data') — that's the full formatted summary,
        # causing duplicate delivery alongside the token stream.
        inner = sse_event.get('event')
        if not isinstance(inner, dict):
            continue
        block_delta = inner.get('contentBlockDelta')
        if not isinstance(block_delta, dict):
            continue
        delta = block_delta.get('delta')
        if not isinstance(delta, dict):
            continue
        token = delta.get('text', '')
        if isinstance(token, str) and token:
            yield token


def _iter_stream_tokens(body) -> Iterator[str]:
    """Yield text deltas from a streamed SSE body (``data: {...}`` blocks).

    Bytes are decoded incrementally so a multi-byte UTF-8 character split
    across two chunks is not corrupted, and a final block that is not followed
    by a blank line is still parsed.
    """
    decoder = codecs.getincrementaldecoder('utf-8')(errors='replace')
    pending = ''
    for raw_chunk in body.iter_chunks():
        if not raw_chunk:
            continue
        # Normalise CRLF so both "\n\n" and "\r\n\r\n" delimit blocks.
        pending = (pending + decoder.decode(raw_chunk)).replace('\r\n', '\n')
        *complete, pending = pending.split('\n\n')
        for block in complete:
            yield from _sse_block_tokens(block)
    pending += decoder.decode(b'', final=True)
    if pending.strip():
        yield from _sse_block_tokens(pending)


def handler(event, context):
    """Process a batch of queued chat messages for one actor.

    Steps: parse the records, resolve the user (asking a pending user for
    their name), resolve the session, build the prompt and payload, invoke the
    AgentCore runtime, and relay the streamed reply text to Telegram.

    Args:
        event: Invocation event; ``event['Records'][i]['body']`` is a JSON string.
        context: Invocation context (unused).

    Returns:
        None.
    """
    # Dedup is per invocation; drop anything left over in a reused process.
    _sent_hashes.clear()

    # ── Parse SQS records (FIFO — all from same actor) ───────────────────
    records = []
    for record in event.get('Records', []):
        try:
            parsed = json.loads(record['body'])
        except (json.JSONDecodeError, KeyError, TypeError):
            continue
        if isinstance(parsed, dict):
            records.append(parsed)

    if not records:
        return

    first = records[0]
    channel = first.get('channel', 'telegram')
    chat_id = first.get('chat_id', '')
    message_thread_id = first.get('message_thread_id')  # int or None

    # Use sender's user ID for identity (not chat_id, which is the group ID in group chats)
    from_info = _first_message(first)
    sender_id = from_info.get('from_id') or chat_id
    actor_id = f"{channel}:{sender_id}"

    # ── User registry ─────────────────────────────────────────────────────
    user_profile = get_or_create_user(actor_id, from_info)

    # ── Onboarding gate ───────────────────────────────────────────────────
    prompt = None
    table_name = os.environ.get('USERS_TABLE_NAME', '')
    if table_name and user_profile.get('status', 'active') == 'pending':
        # A short, non-empty message without a question mark is taken as the user's name.
        name = _message_text(first).strip()
        if name and len(name) < 50 and '?' not in name:
            update_user_status(actor_id, name=name, status='active')
            user_profile['display_name'] = name
            user_profile['status'] = 'active'
            prompt = f"[System: User just registered with name '{name}'. Welcome them warmly and ask how you can help.]"
            if len(records) > 1:
                # Keep anything the user queued after the name message.
                prompt += "\n\n" + _bundle_prompt(records[1:])
        else:
            bot_token = _get_bot_token(strict=True)
            if bot_token:
                send_telegram_direct(str(chat_id), bot_token, "Hi! I don't recognize you yet. What's your name?", thread_id=message_thread_id)
            else:
                print('[agent-runner] bot token not configured — cannot send onboarding prompt')
            return

    # ── Get or create AgentCore session ──────────────────────────────────
    session_id = get_or_create_session(actor_id)
    print(f"[agent-runner] actor={actor_id} session={session_id} user={user_profile.get('display_name', '')}")

    # ── Bundle messages (unless onboarding already produced the prompt) ──
    if prompt is None:
        prompt = _bundle_prompt(records)

    # ── Attach file context if present ────────────────────────────────────
    for queued in records:
        attachment = queued.get('attachment')
        if isinstance(attachment, dict) and attachment:
            prompt += _attachment_context(attachment)

    # ── Build payload for AgentCore Runtime 1 ────────────────────────────
    payload: dict[str, Any] = {
        'prompt': prompt,
        'actor_id': actor_id,
        'session_id': session_id,
        'user_profile': {
            'display_name': user_profile.get('display_name', actor_id),
            'telegram_username': user_profile.get('telegram_username', ''),
            'google_accounts': user_profile.get(
                'google_accounts',
                {'primary': user_profile['google_email']} if user_profile.get('google_email') else {},
            ),
            'allowed': user_profile.get('allowed', True),
            'services': user_profile.get('enrolled_services', user_profile.get('services', {})),
        },
        'channel_adapter': {
            'type': channel,
            'target_id': str(chat_id),
            'message_thread_id': message_thread_id,
            'bot_token_secret_arn': os.environ.get('TELEGRAM_BOT_TOKEN_SECRET_ARN', ''),
        },
    }
    payload_json = json.dumps(payload)

    # ── Invoke AgentCore Runtime 1 ────────────────────────────────────────
    runtime_arn = os.environ.get('RUNTIME_1_ARN', '')
    if not runtime_arn or runtime_arn == 'PLACEHOLDER_SET_AFTER_RUNTIME_DEPLOY':
        print("[agent-runner] RUNTIME_1_ARN not set — skipping AgentCore invoke")
        print(f"[agent-runner] Would have sent: {payload_json[:200]}")
        return

    client = get_agentcore()
    response = client.invoke_agent_runtime(
        agentRuntimeArn=runtime_arn,
        runtimeSessionId=session_id,
        payload=payload_json.encode(),
    )

    # Process streaming response: buffer text chunks and send to Telegram as paragraphs arrive
    bot_token = _get_bot_token()
    target_chat = str(chat_id)

    body = response.get('response')
    text_buffer = ''
    if body is not None:
        try:
            for token in _iter_stream_tokens(body):
                text_buffer += token
                # Only flush if buffer is very large — prevents splitting multi-turn responses
                if len(text_buffer) > _FLUSH_THRESHOLD:
                    chunk = text_buffer.strip()
                    # Without a token (or with blank text) there is nothing to send.
                    if chunk and bot_token:
                        print(f'[agent-runner] send chunk {len(text_buffer)}c to {chat_id}')
                        send_telegram_direct(target_chat, bot_token, chunk, thread_id=message_thread_id)
                    text_buffer = ''
        finally:
            close_body = getattr(body, 'close', None)
            if callable(close_body):
                close_body()

    # Flush any remaining text
    print(f'[agent-runner] stream done buffer={len(text_buffer)} bot_token_set={bool(bot_token)}')
    final_text = text_buffer.strip()
    if final_text and bot_token:
        # Suppress heartbeat OK responses
        if final_text.upper().startswith('HEARTBEAT_OK'):
            print(f'[agent-runner] heartbeat suppressed for {actor_id}')
            return
        print(f'[agent-runner] flushing {len(text_buffer)}c to {chat_id}')
        send_telegram_direct(target_chat, bot_token, final_text, thread_id=message_thread_id)

    print(f"[agent-runner] Completed session={session_id} actor={actor_id}")