Phase 2: wire X-Actor-Id credential loading into workspace-mcp handler.py
Replace cold-start single-user credential loading with per-request
multi-tenant loading via ASGI middleware:
- _setup_shared_environment(): loads OAuth client creds once at cold start
- _ActorCredentialsMiddleware: reads x-actor-id header per request,
fetches per-user Google credentials from Secrets Manager
(agent-claw/google-credentials/{actor_id}), writes to /tmp,
sets USER_GOOGLE_EMAIL env var
- 5-minute in-memory cache to avoid redundant Secrets Manager calls
This commit is contained in:
114
src/lambdas/workspace-mcp/handler.py
Normal file
114
src/lambdas/workspace-mcp/handler.py
Normal file
@@ -0,0 +1,114 @@
|
||||
"""
|
||||
workspace-mcp Lambda handler using Mangum ASGI adapter.
|
||||
|
||||
Wraps workspace-mcp's FastMCP/Starlette HTTP app directly — no subprocess,
|
||||
no Lambda Web Adapter. Each Lambda invocation is one MCP HTTP request.
|
||||
|
||||
Google credentials are loaded per-request from Secrets Manager based on
|
||||
the X-Actor-Id header, enabling multi-tenant operation.
|
||||
"""
|
||||
import json
|
||||
import os
|
||||
import sys
|
||||
import time
|
||||
import threading
|
||||
|
||||
|
||||
def _setup_shared_environment():
|
||||
"""Load shared OAuth client credentials at cold start."""
|
||||
import boto3
|
||||
|
||||
os.environ.setdefault('HOME', '/tmp')
|
||||
os.environ.setdefault('GOOGLE_WORKSPACE_MCP_CREDENTIALS_DIR', '/tmp/workspace_mcp_credentials')
|
||||
|
||||
client_arn = os.environ.get('GOOGLE_OAUTH_CLIENT_SECRET_ARN', '')
|
||||
if client_arn:
|
||||
try:
|
||||
sm = boto3.client('secretsmanager', region_name=os.environ.get('AWS_REGION', 'us-east-1'))
|
||||
client_creds = json.loads(sm.get_secret_value(SecretId=client_arn)['SecretString'])
|
||||
os.environ['GOOGLE_OAUTH_CLIENT_ID'] = client_creds['client_id']
|
||||
os.environ['GOOGLE_OAUTH_CLIENT_SECRET'] = client_creds['client_secret']
|
||||
except Exception as e:
|
||||
print(f'[handler] WARNING: Could not load OAuth client creds: {e}')
|
||||
|
||||
|
||||
_setup_shared_environment()
|
||||
|
||||
# Add layer packages to path
|
||||
sys.path.insert(0, '/opt/python')
|
||||
|
||||
# Per-user credential cache: actor_id -> (email, creds_dict, fetched_at)
|
||||
_creds_cache: dict = {}
|
||||
_cache_ttl = 300 # 5 minutes
|
||||
_sm = None
|
||||
_sm_lock = threading.Lock()
|
||||
|
||||
|
||||
def _get_sm():
|
||||
global _sm
|
||||
if _sm is None:
|
||||
with _sm_lock:
|
||||
if _sm is None:
|
||||
import boto3
|
||||
_sm = boto3.client('secretsmanager', region_name=os.environ.get('AWS_REGION', 'us-east-1'))
|
||||
return _sm
|
||||
|
||||
|
||||
def _actor_id_to_secret_name(actor_id: str) -> str:
|
||||
safe = actor_id.replace(':', '-').replace('/', '-')
|
||||
return f'agent-claw/google-credentials/{safe}'
|
||||
|
||||
|
||||
def _load_credentials_for_actor(actor_id: str) -> bool:
|
||||
"""Fetch per-user Google credentials from Secrets Manager and write to /tmp."""
|
||||
now = time.time()
|
||||
cached = _creds_cache.get(actor_id)
|
||||
if cached and now - cached[2] < _cache_ttl:
|
||||
email, creds = cached[0], cached[1]
|
||||
else:
|
||||
secret_name = _actor_id_to_secret_name(actor_id)
|
||||
try:
|
||||
secret = _get_sm().get_secret_value(SecretId=secret_name)['SecretString']
|
||||
creds = json.loads(secret)
|
||||
email = creds.get('email') or creds.get('user_email') or creds.get('client_email', '')
|
||||
_creds_cache[actor_id] = (email, creds, now)
|
||||
print(f'[handler] Loaded credentials for actor={actor_id} email={email}')
|
||||
except Exception as e:
|
||||
print(f'[handler] No credentials for actor={actor_id}: {e}')
|
||||
return False
|
||||
|
||||
creds_dir = os.environ['GOOGLE_WORKSPACE_MCP_CREDENTIALS_DIR']
|
||||
os.makedirs(creds_dir, exist_ok=True)
|
||||
with open(f'{creds_dir}/{email}.json', 'w') as f:
|
||||
json.dump(creds, f)
|
||||
os.environ['USER_GOOGLE_EMAIL'] = email
|
||||
return True
|
||||
|
||||
|
||||
# Configure workspace-mcp for streamable-http transport
|
||||
from core.server import server, configure_server_for_http, set_transport_mode
|
||||
|
||||
set_transport_mode('streamable-http')
|
||||
configure_server_for_http()
|
||||
|
||||
from mangum import Mangum
|
||||
|
||||
_app = server.http_app()
|
||||
|
||||
|
||||
class _ActorCredentialsMiddleware:
|
||||
"""ASGI middleware: reads x-actor-id header, loads per-user Google credentials."""
|
||||
|
||||
def __init__(self, app):
|
||||
self._app = app
|
||||
|
||||
async def __call__(self, scope, receive, send):
|
||||
if scope['type'] in ('http', 'websocket'):
|
||||
headers = dict(scope.get('headers', []))
|
||||
actor_id = headers.get(b'x-actor-id', b'').decode()
|
||||
if actor_id:
|
||||
_load_credentials_for_actor(actor_id)
|
||||
await self._app(scope, receive, send)
|
||||
|
||||
|
||||
handler = Mangum(_ActorCredentialsMiddleware(_app), lifespan='auto')
|
||||
Reference in New Issue
Block a user