Fix SSE parsing: read data: prefix + contentBlockDelta.delta.text
This commit is contained in:
@@ -204,17 +204,31 @@ def handler(event, context):
|
||||
|
||||
body = response.get('response')
|
||||
text_buffer = ''
|
||||
leftover = ''
|
||||
if body is not None:
|
||||
for chunk in body.iter_chunks():
|
||||
if not chunk:
|
||||
for raw_chunk in body.iter_chunks():
|
||||
if not raw_chunk:
|
||||
continue
|
||||
# AgentCore streams SSE format: "data: {...}\n\n"
|
||||
text = leftover + raw_chunk.decode('utf-8', errors='replace')
|
||||
parts = text.split('\n\n')
|
||||
leftover = parts[-1]
|
||||
for part in parts[:-1]:
|
||||
for line in part.splitlines():
|
||||
if not line.startswith('data: '):
|
||||
continue
|
||||
data = line[6:].strip()
|
||||
if not data or data == '[DONE]':
|
||||
continue
|
||||
try:
|
||||
event = json.loads(chunk.decode('utf-8'))
|
||||
# Strands streaming event: 'data' field contains text delta
|
||||
delta = event.get('data', '') or event.get('text', '')
|
||||
if delta:
|
||||
text_buffer += delta
|
||||
# Flush on paragraph or sentence break, or if buffer is large
|
||||
event = json.loads(data)
|
||||
except (json.JSONDecodeError, ValueError):
|
||||
continue
|
||||
# Extract text delta from contentBlockDelta
|
||||
delta = event.get('event', {}).get('contentBlockDelta', {}).get('delta', {})
|
||||
token = delta.get('text', '') or event.get('data', '')
|
||||
if token:
|
||||
text_buffer += token
|
||||
flush = (
|
||||
text_buffer.rstrip().endswith(('\n\n', '.\n', '!\n', '?\n'))
|
||||
or len(text_buffer) > 800
|
||||
@@ -222,8 +236,6 @@ def handler(event, context):
|
||||
if flush and text_buffer.strip():
|
||||
send_telegram_direct(str(chat_id), bot_token, text_buffer.strip())
|
||||
text_buffer = ''
|
||||
except (json.JSONDecodeError, UnicodeDecodeError):
|
||||
pass
|
||||
|
||||
# Flush any remaining text
|
||||
if text_buffer.strip() and bot_token:
|
||||
|
||||
101
test-bot.py
Normal file
101
test-bot.py
Normal file
@@ -0,0 +1,101 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Direct AgentCore test script.
|
||||
Invokes the agent-claw runtime and prints streaming response chunks.
|
||||
Usage: AWS_PROFILE=ai1 python3 test-bot.py "your message here"
|
||||
"""
|
||||
import sys, json, uuid, time, boto3
|
||||
from botocore.config import Config
|
||||
|
||||
RUNTIME_ARN = 'arn:aws:bedrock-agentcore:us-east-1:495395224548:runtime/agentclaw_agent_claw_main-vTRGIEG6ON'
|
||||
ACTOR_ID = 'telegram:8537376738'
|
||||
BOT_TOKEN_SECRET_ARN = 'arn:aws:secretsmanager:us-east-1:495395224548:secret:agent-claw/telegram-bot-token-Oq3in3'
|
||||
|
||||
def main():
|
||||
prompt = ' '.join(sys.argv[1:]) if len(sys.argv) > 1 else 'Say hello.'
|
||||
session = boto3.Session(profile_name='ai1')
|
||||
client = session.client(
|
||||
'bedrock-agentcore', region_name='us-east-1',
|
||||
config=Config(read_timeout=600, connect_timeout=10)
|
||||
)
|
||||
sid = str(uuid.uuid4()) # full UUID, 36 chars
|
||||
print(f"\n[test] Prompt: {prompt}")
|
||||
print(f"[test] Session: {sid}")
|
||||
print(f"[test] Invoking AgentCore...\n{'─'*50}")
|
||||
t0 = time.time()
|
||||
|
||||
response = client.invoke_agent_runtime(
|
||||
agentRuntimeArn=RUNTIME_ARN,
|
||||
qualifier='DEFAULT',
|
||||
runtimeSessionId=sid,
|
||||
payload=json.dumps({
|
||||
'prompt': prompt,
|
||||
'actor_id': ACTOR_ID,
|
||||
'session_id': sid,
|
||||
'channel_adapter': {
|
||||
'type': 'telegram',
|
||||
'target_id': '8537376738',
|
||||
'bot_token_secret_arn': BOT_TOKEN_SECRET_ARN,
|
||||
},
|
||||
'user_profile': {
|
||||
'display_name': 'Daniel',
|
||||
'telegram_username': 'nessie_tn',
|
||||
'enrolled_services': {},
|
||||
},
|
||||
}).encode(),
|
||||
)
|
||||
|
||||
body = response.get('response')
|
||||
chunks_received = 0
|
||||
text_buffer = ''
|
||||
leftover = ''
|
||||
messages_sent = []
|
||||
|
||||
if body:
|
||||
for raw_chunk in body.iter_chunks():
|
||||
if not raw_chunk:
|
||||
continue
|
||||
chunks_received += 1
|
||||
# AgentCore streams SSE format: "data: {...}\n\n"
|
||||
text = leftover + raw_chunk.decode('utf-8', errors='replace')
|
||||
parts = text.split('\n\n')
|
||||
leftover = parts[-1]
|
||||
for part in parts[:-1]:
|
||||
for line in part.splitlines():
|
||||
if not line.startswith('data: '):
|
||||
continue
|
||||
data = line[6:].strip()
|
||||
if not data or data == '[DONE]':
|
||||
continue
|
||||
try:
|
||||
event = json.loads(data)
|
||||
except (json.JSONDecodeError, ValueError):
|
||||
continue
|
||||
# Text delta lives in event.contentBlockDelta.delta.text
|
||||
delta = event.get('event', {}).get('contentBlockDelta', {}).get('delta', {})
|
||||
token = delta.get('text', '') or event.get('data', '')
|
||||
if token:
|
||||
text_buffer += token
|
||||
flush = (
|
||||
text_buffer.rstrip().endswith(('\n\n', '.\n', '!\n', '?\n'))
|
||||
or len(text_buffer) > 800
|
||||
)
|
||||
if flush and text_buffer.strip():
|
||||
msg = text_buffer.strip()
|
||||
messages_sent.append(msg)
|
||||
print(f"\n[MSG {len(messages_sent)}] {msg}")
|
||||
text_buffer = ''
|
||||
|
||||
if text_buffer.strip():
|
||||
msg = text_buffer.strip()
|
||||
messages_sent.append(msg)
|
||||
print(f"\n[MSG {len(messages_sent)}] {msg}")
|
||||
|
||||
elapsed = time.time() - t0
|
||||
print(f"\n{'─'*50}")
|
||||
print(f"[test] Done in {elapsed:.1f}s | {chunks_received} chunks | {len(messages_sent)} messages")
|
||||
if not messages_sent:
|
||||
print("[test] WARNING: No messages generated — check AgentCore logs")
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
||||
Reference in New Issue
Block a user