Suppress exceptions in generator to prevent AgentCore retry duplicates

This commit is contained in:
daniel
2026-05-07 19:09:46 -05:00
parent 60573c360f
commit fd479b8c00

View File

@@ -185,7 +185,7 @@ _current_actor_id: str = ''
@app.entrypoint @app.entrypoint
def main(payload: dict, context): async def main(payload: dict, context):
"""Handle an invocation from agent-runner Lambda (streaming).""" """Handle an invocation from agent-runner Lambda (streaming)."""
global _current_actor_id global _current_actor_id
@@ -290,34 +290,21 @@ def main(payload: dict, context):
tools=base_tools + list(workspace_tools), tools=base_tools + list(workspace_tools),
) )
# Use callback handler to stream tokens directly to Telegram. final_message = None
# This avoids AgentCore's async-generator retry path (which causes duplicates).
token_buffer = []
def _on_event(event: dict) -> None:
"""Strands callback: accumulate text tokens and flush to Telegram at para breaks."""
delta = event.get('event', {}).get('contentBlockDelta', {}).get('delta', {})
token = delta.get('text', '') if isinstance(delta, dict) else ''
if not token:
return
token_buffer.append(token)
buf = ''.join(token_buffer)
if buf.rstrip().endswith(('\n\n', '.\n', '!\n', '?\n')) or len(buf) > 800:
stripped = buf.strip()
if stripped:
adapter.send(stripped)
token_buffer.clear()
try: try:
result = agent(payload.get('prompt', ''), callback_handler=_on_event) async for event in agent.stream_async(payload.get('prompt', '')):
# Flush remaining tokens if 'result' in event:
remaining = ''.join(token_buffer).strip() final_message = event['result'].message
if remaining: yield event
adapter.send(remaining) except Exception as e:
# Catch ALL exceptions including ReadTimeoutError to prevent AgentCore retry.
# A retry re-runs the full agent loop causing duplicate Telegram messages.
print(f'[main] Agent error (suppressed to prevent retry): {type(e).__name__}: {e}')
if final_message:
yield {'data': str(final_message), 'result': {'message': final_message}}
finally: finally:
_typing_active = False _typing_active = False
session_manager.close() session_manager.close()
return {'result': result.message}
app.run() app.run()