streaming: switch to stream_async + iter_chunks response drain
This commit is contained in:
@@ -185,8 +185,8 @@ _current_actor_id: str = ''
|
||||
|
||||
|
||||
@app.entrypoint
|
||||
def main(payload: dict, context) -> dict:
|
||||
"""Handle an invocation from agent-runner Lambda."""
|
||||
async def main(payload: dict, context):
|
||||
"""Handle an invocation from agent-runner Lambda (streaming)."""
|
||||
global _current_actor_id
|
||||
|
||||
# Set up channel adapter
|
||||
@@ -267,15 +267,6 @@ def main(payload: dict, context) -> dict:
|
||||
_code_interpreter.code_interpreter, home_assistant, connect_google_account,
|
||||
manage_service]
|
||||
|
||||
def _run_agent(tools):
|
||||
agent = Agent(
|
||||
model=model,
|
||||
system_prompt=system_prompt,
|
||||
session_manager=session_manager,
|
||||
tools=tools,
|
||||
)
|
||||
return agent(payload.get('prompt', ''))
|
||||
|
||||
workspace_mcp_client = MCPClient(
|
||||
lambda: streamablehttp_client(WORKSPACE_MCP_URL, timeout=20, auth=_SigV4HttpxAuth(actor_id=actor_id))
|
||||
)
|
||||
@@ -290,28 +281,35 @@ def main(payload: dict, context) -> dict:
|
||||
else:
|
||||
print(f'[main] actor={actor_id} has no google_email — skipping workspace_mcp')
|
||||
|
||||
agent = Agent(
|
||||
model=model,
|
||||
system_prompt=system_prompt,
|
||||
session_manager=session_manager,
|
||||
tools=base_tools + list(workspace_tools),
|
||||
)
|
||||
|
||||
final_message = None
|
||||
try:
|
||||
result = _run_agent(base_tools + list(workspace_tools))
|
||||
async for event in agent.stream_async(payload.get('prompt', '')):
|
||||
if 'result' in event:
|
||||
final_message = event['result'].message
|
||||
yield event
|
||||
finally:
|
||||
_typing_active = False
|
||||
session_manager.close()
|
||||
|
||||
# Flush buffered memory events
|
||||
session_manager.close()
|
||||
|
||||
# Deliver final response
|
||||
if not messaging.was_sent() and result.message:
|
||||
msg = result.message
|
||||
if isinstance(msg, dict):
|
||||
content = msg.get('content', {})
|
||||
if isinstance(content, dict):
|
||||
msg = content.get('text', str(content))
|
||||
elif isinstance(content, list):
|
||||
msg = ' '.join(c.get('text', '') for c in content if isinstance(c, dict))
|
||||
else:
|
||||
msg = str(content)
|
||||
adapter.send(str(msg))
|
||||
|
||||
return {'result': result.message}
|
||||
# Deliver final response if agent didn't call send_message
|
||||
if not messaging.was_sent() and final_message:
|
||||
msg = final_message
|
||||
if isinstance(msg, dict):
|
||||
content = msg.get('content', {})
|
||||
if isinstance(content, dict):
|
||||
msg = content.get('text', str(content))
|
||||
elif isinstance(content, list):
|
||||
msg = ' '.join(c.get('text', '') for c in content if isinstance(c, dict))
|
||||
else:
|
||||
msg = str(content)
|
||||
adapter.send(str(msg))
|
||||
|
||||
|
||||
app.run()
|
||||
|
||||
@@ -187,8 +187,10 @@ def handler(event, context):
|
||||
payload=json.dumps(payload).encode(),
|
||||
)
|
||||
|
||||
# Consume streaming response (agent delivers to Telegram via send_message tool)
|
||||
for chunk in response.get('response', []):
|
||||
pass # intentional no-op — agent handles delivery internally
|
||||
# Drain streaming response body (agent delivers to Telegram via send_message tool)
|
||||
body = response.get('response')
|
||||
if body is not None:
|
||||
for _ in body.iter_chunks():
|
||||
pass
|
||||
|
||||
print(f"[agent-runner] Completed session={session_id} actor={actor_id}")
|
||||
|
||||
Reference in New Issue
Block a user