From beb8dfc969ef2e4ddf04678d33004a317ac05fd0 Mon Sep 17 00:00:00 2001 From: daniel Date: Thu, 7 May 2026 16:45:58 -0500 Subject: [PATCH] Fix SSE parsing: read data: prefix + contentBlockDelta.delta.text --- src/lambdas/agent-runner/handler.py | 48 ++++++++----- test-bot.py | 101 ++++++++++++++++++++++++++++ 2 files changed, 131 insertions(+), 18 deletions(-) create mode 100644 test-bot.py diff --git a/src/lambdas/agent-runner/handler.py b/src/lambdas/agent-runner/handler.py index ff5226e..ab74dd4 100644 --- a/src/lambdas/agent-runner/handler.py +++ b/src/lambdas/agent-runner/handler.py @@ -204,26 +204,38 @@ def handler(event, context): body = response.get('response') text_buffer = '' + leftover = '' if body is not None: - for chunk in body.iter_chunks(): - if not chunk: + for raw_chunk in body.iter_chunks(): + if not raw_chunk: continue - try: - event = json.loads(chunk.decode('utf-8')) - # Strands streaming event: 'data' field contains text delta - delta = event.get('data', '') or event.get('text', '') - if delta: - text_buffer += delta - # Flush on paragraph or sentence break, or if buffer is large - flush = ( - text_buffer.rstrip().endswith(('\n\n', '.\n', '!\n', '?\n')) - or len(text_buffer) > 800 - ) - if flush and text_buffer.strip(): - send_telegram_direct(str(chat_id), bot_token, text_buffer.strip()) - text_buffer = '' - except (json.JSONDecodeError, UnicodeDecodeError): - pass + # AgentCore streams SSE format: "data: {...}\n\n" + text = leftover + raw_chunk.decode('utf-8', errors='replace') + parts = text.split('\n\n') + leftover = parts[-1] + for part in parts[:-1]: + for line in part.splitlines(): + if not line.startswith('data: '): + continue + data = line[6:].strip() + if not data or data == '[DONE]': + continue + try: + event = json.loads(data) + except (json.JSONDecodeError, ValueError): + continue + # Extract text delta from contentBlockDelta + delta = event.get('event', {}).get('contentBlockDelta', {}).get('delta', {}) + token = delta.get('text', '') or event.get('data', '') + if token: + text_buffer += token + flush = ( + text_buffer.rstrip().endswith(('\n\n', '.\n', '!\n', '?\n')) + or len(text_buffer) > 800 + ) + if flush and text_buffer.strip(): + send_telegram_direct(str(chat_id), bot_token, text_buffer.strip()) + text_buffer = '' # Flush any remaining text if text_buffer.strip() and bot_token: diff --git a/test-bot.py b/test-bot.py new file mode 100644 index 0000000..37f75ad --- /dev/null +++ b/test-bot.py @@ -0,0 +1,101 @@ +#!/usr/bin/env python3 +""" +Direct AgentCore test script. +Invokes the agent-claw runtime and prints streaming response chunks. +Usage: AWS_PROFILE=ai1 python3 test-bot.py "your message here" +""" +import sys, json, uuid, time, boto3 +from botocore.config import Config + +RUNTIME_ARN = 'arn:aws:bedrock-agentcore:us-east-1:495395224548:runtime/agentclaw_agent_claw_main-vTRGIEG6ON' +ACTOR_ID = 'telegram:8537376738' +BOT_TOKEN_SECRET_ARN = 'arn:aws:secretsmanager:us-east-1:495395224548:secret:agent-claw/telegram-bot-token-Oq3in3' + +def main(): + prompt = ' '.join(sys.argv[1:]) if len(sys.argv) > 1 else 'Say hello.' + session = boto3.Session(profile_name='ai1') + client = session.client( + 'bedrock-agentcore', region_name='us-east-1', + config=Config(read_timeout=600, connect_timeout=10) + ) + sid = str(uuid.uuid4()) # full UUID, 36 chars + print(f"\n[test] Prompt: {prompt}") + print(f"[test] Session: {sid}") + print(f"[test] Invoking AgentCore...\n{'─'*50}") + t0 = time.time() + + response = client.invoke_agent_runtime( + agentRuntimeArn=RUNTIME_ARN, + qualifier='DEFAULT', + runtimeSessionId=sid, + payload=json.dumps({ + 'prompt': prompt, + 'actor_id': ACTOR_ID, + 'session_id': sid, + 'channel_adapter': { + 'type': 'telegram', + 'target_id': '8537376738', + 'bot_token_secret_arn': BOT_TOKEN_SECRET_ARN, + }, + 'user_profile': { + 'display_name': 'Daniel', + 'telegram_username': 'nessie_tn', + 'enrolled_services': {}, + }, + }).encode(), + ) + + body = response.get('response') + chunks_received = 0 + text_buffer = '' + leftover = '' + messages_sent = [] + + if body: + for raw_chunk in body.iter_chunks(): + if not raw_chunk: + continue + chunks_received += 1 + # AgentCore streams SSE format: "data: {...}\n\n" + text = leftover + raw_chunk.decode('utf-8', errors='replace') + parts = text.split('\n\n') + leftover = parts[-1] + for part in parts[:-1]: + for line in part.splitlines(): + if not line.startswith('data: '): + continue + data = line[6:].strip() + if not data or data == '[DONE]': + continue + try: + event = json.loads(data) + except (json.JSONDecodeError, ValueError): + continue + # Text delta lives in event.contentBlockDelta.delta.text + delta = event.get('event', {}).get('contentBlockDelta', {}).get('delta', {}) + token = delta.get('text', '') or event.get('data', '') + if token: + text_buffer += token + flush = ( + text_buffer.rstrip().endswith(('\n\n', '.\n', '!\n', '?\n')) + or len(text_buffer) > 800 + ) + if flush and text_buffer.strip(): + msg = text_buffer.strip() + messages_sent.append(msg) + print(f"\n[MSG {len(messages_sent)}] {msg}") + text_buffer = '' + + if text_buffer.strip(): + msg = text_buffer.strip() + messages_sent.append(msg) + print(f"\n[MSG {len(messages_sent)}] {msg}") + + elapsed = time.time() - t0 + print(f"\n{'─'*50}") + print(f"[test] Done in {elapsed:.1f}s | {chunks_received} chunks | {len(messages_sent)} messages") + if not messages_sent: + print("[test] WARNING: No messages generated — check AgentCore logs") + +if __name__ == '__main__': + main()