Switch to sync entrypoint + callback delivery: eliminates AgentCore retry duplicates

This commit is contained in:
daniel
2026-05-07 19:07:30 -05:00
parent bbd9a99645
commit 60573c360f

View File

@@ -185,7 +185,7 @@ _current_actor_id: str = ''
@app.entrypoint
async def main(payload: dict, context):
def main(payload: dict, context):
"""Handle an invocation from agent-runner Lambda (streaming)."""
global _current_actor_id
@@ -290,16 +290,34 @@ async def main(payload: dict, context):
tools=base_tools + list(workspace_tools),
)
final_message = None
# Use callback handler to stream tokens directly to Telegram.
# This avoids AgentCore's async-generator retry path (which causes duplicates).
token_buffer = []
def _on_event(event: dict) -> None:
"""Strands callback: accumulate text tokens and flush to Telegram at para breaks."""
delta = event.get('event', {}).get('contentBlockDelta', {}).get('delta', {})
token = delta.get('text', '') if isinstance(delta, dict) else ''
if not token:
return
token_buffer.append(token)
buf = ''.join(token_buffer)
if buf.rstrip().endswith(('\n\n', '.\n', '!\n', '?\n')) or len(buf) > 800:
stripped = buf.strip()
if stripped:
adapter.send(stripped)
token_buffer.clear()
try:
async for event in agent.stream_async(payload.get('prompt', '')):
if 'result' in event:
final_message = event['result'].message
yield event
result = agent(payload.get('prompt', ''), callback_handler=_on_event)
# Flush remaining tokens
remaining = ''.join(token_buffer).strip()
if remaining:
adapter.send(remaining)
finally:
_typing_active = False
session_manager.close()
# Delivery handled by agent-runner streaming consumer — no direct send here
return {'result': result.message}
app.run()