From 40a942b50657084a471ff602d46ac99d2beb3985 Mon Sep 17 00:00:00 2001 From: daniel Date: Thu, 7 May 2026 16:27:26 -0500 Subject: [PATCH] streaming: switch to stream_async + iter_chunks response drain --- agentclaw/app/agent_claw_main/main.py | 56 +++++++++++++-------------- src/lambdas/agent-runner/handler.py | 8 ++-- 2 files changed, 32 insertions(+), 32 deletions(-) diff --git a/agentclaw/app/agent_claw_main/main.py b/agentclaw/app/agent_claw_main/main.py index cd17a12..8185ee2 100644 --- a/agentclaw/app/agent_claw_main/main.py +++ b/agentclaw/app/agent_claw_main/main.py @@ -185,8 +185,8 @@ _current_actor_id: str = '' @app.entrypoint -def main(payload: dict, context) -> dict: - """Handle an invocation from agent-runner Lambda.""" +async def main(payload: dict, context): + """Handle an invocation from agent-runner Lambda (streaming).""" global _current_actor_id # Set up channel adapter @@ -267,15 +267,6 @@ def main(payload: dict, context) -> dict: _code_interpreter.code_interpreter, home_assistant, connect_google_account, manage_service] - def _run_agent(tools): - agent = Agent( - model=model, - system_prompt=system_prompt, - session_manager=session_manager, - tools=tools, - ) - return agent(payload.get('prompt', '')) - workspace_mcp_client = MCPClient( lambda: streamablehttp_client(WORKSPACE_MCP_URL, timeout=20, auth=_SigV4HttpxAuth(actor_id=actor_id)) ) @@ -290,28 +281,35 @@ def main(payload: dict, context) -> dict: else: print(f'[main] actor={actor_id} has no google_email — skipping workspace_mcp') + agent = Agent( + model=model, + system_prompt=system_prompt, + session_manager=session_manager, + tools=base_tools + list(workspace_tools), + ) + + final_message = None try: - result = _run_agent(base_tools + list(workspace_tools)) + async for event in agent.stream_async(payload.get('prompt', '')): + if 'result' in event: + final_message = event['result'].message + yield event finally: _typing_active = False + session_manager.close() - # Flush buffered memory events - session_manager.close() - - # Deliver final response - if not messaging.was_sent() and result.message: - msg = result.message - if isinstance(msg, dict): - content = msg.get('content', {}) - if isinstance(content, dict): - msg = content.get('text', str(content)) - elif isinstance(content, list): - msg = ' '.join(c.get('text', '') for c in content if isinstance(c, dict)) - else: - msg = str(content) - adapter.send(str(msg)) - - return {'result': result.message} + # Deliver final response if agent didn't call send_message + if not messaging.was_sent() and final_message: + msg = final_message + if isinstance(msg, dict): + content = msg.get('content', {}) + if isinstance(content, dict): + msg = content.get('text', str(content)) + elif isinstance(content, list): + msg = ' '.join(c.get('text', '') for c in content if isinstance(c, dict)) + else: + msg = str(content) + adapter.send(str(msg)) app.run() diff --git a/src/lambdas/agent-runner/handler.py b/src/lambdas/agent-runner/handler.py index fb92e20..8eb39d8 100644 --- a/src/lambdas/agent-runner/handler.py +++ b/src/lambdas/agent-runner/handler.py @@ -187,8 +187,10 @@ def handler(event, context): payload=json.dumps(payload).encode(), ) - # Consume streaming response (agent delivers to Telegram via send_message tool) - for chunk in response.get('response', []): - pass # intentional no-op — agent handles delivery internally + # Drain streaming response body (agent delivers to Telegram via send_message tool) + body = response.get('response') + if body is not None: + for _ in body.iter_chunks(): + pass print(f"[agent-runner] Completed session={session_id} actor={actor_id}")