"""Google Calendar and Gmail tools — credentials injected from Secrets Manager per call.

Mirrors workspace-mcp (gcalendar/gmail) logic using google-api-python-client
directly, since workspace-mcp tool functions require FastMCP request context
and cannot be called outside an MCP server.

Credential secret: agent-claw/google-credentials/{actor_id.replace(':', '-')}
Contains: token, refresh_token, token_uri, client_id, client_secret, scopes
"""
import base64
import json
from datetime import datetime, timedelta, timezone

import boto3
# google-auth-httplib2 is installed as a required dependency of
# google-api-python-client; it adapts google-auth credentials to httplib2.
import google_auth_httplib2
import httplib2
from google.auth.transport.requests import Request
from google.oauth2.credentials import Credentials
from googleapiclient.discovery import build
from strands import tool

# Socket timeout (seconds) applied to every Google API HTTP call.
_HTTP_TIMEOUT = 15

# Lazily-created Secrets Manager client, shared across calls.
_sm = None

# Set per-invocation by main.py before any tool call.
_current_actor_id: str = ''


def _secrets():
    """Return the module-wide Secrets Manager client, creating it on first use."""
    global _sm
    if _sm is None:
        _sm = boto3.client('secretsmanager', region_name='us-east-1')
    return _sm


def _get_creds(actor_id: str) -> Credentials:
    """Load the actor's Google OAuth credentials from Secrets Manager.

    The secret is looked up under
    ``agent-claw/google-credentials/<actor_id with ':' replaced by '-'>``.
    When the loaded credentials are not usable as-is (no access token, or a
    known-expired one) and a refresh token is available, the access token is
    refreshed and the new value is written back to the same secret.

    Args:
        actor_id: Identifier of the user whose credentials are requested.

    Returns:
        A ``google.oauth2.credentials.Credentials`` built from the secret.

    Raises:
        Whatever boto3, ``json.loads`` or the google-auth refresh raise; the
        tool functions catch these and turn them into error strings.
    """
    secret_name = 'agent-claw/google-credentials/' + actor_id.replace(':', '-')
    print(f'[google] fetching creds actor={actor_id}')
    data = json.loads(_secrets().get_secret_value(SecretId=secret_name)['SecretString'])
    creds = Credentials(
        token=data.get('token'),
        refresh_token=data.get('refresh_token'),
        token_uri=data.get('token_uri', 'https://oauth2.googleapis.com/token'),
        client_id=data.get('client_id'),
        client_secret=data.get('client_secret'),
        scopes=data.get('scopes'),
    )
    # `valid` is false for a missing token as well as an expired one, so a
    # secret that only holds a refresh token is refreshed and persisted too.
    # NOTE(review): no expiry is loaded from the secret, so a stale-but-present
    # token is presumably not detected here and is left to the authorized HTTP
    # layer built in _svc — confirm.
    if not creds.valid and creds.refresh_token:
        print('[google] refreshing token')
        creds.refresh(Request())
        data['token'] = creds.token
        _secrets().put_secret_value(SecretId=secret_name, SecretString=json.dumps(data))
    return creds


def _actor_id():
    """Return the actor id that main.py stored for the current invocation."""
    # Read from module-level var set by main.py per invocation.
    # DO NOT use 'import main as _main' — it re-runs main.py including
    # app.run() which hangs.
    return _current_actor_id


def _svc(api: str, version: str, creds: Credentials):
    """Build a Google API client whose HTTP calls are authorized and time-limited.

    Args:
        api: Discovery API name, e.g. ``'calendar'`` or ``'gmail'``.
        version: Discovery API version, e.g. ``'v3'``.
        creds: google-auth credentials used to sign every request.

    Returns:
        The service resource returned by ``googleapiclient.discovery.build``.
    """
    # google-auth credentials have no oauth2client-style `.authorize()` method;
    # AuthorizedHttp is the google-auth way to attach them to an httplib2.Http.
    # Only http= is handed to build() so the timeout configured here is the
    # one actually used for API calls.
    timed_http = httplib2.Http(timeout=_HTTP_TIMEOUT)
    authed_http = google_auth_httplib2.AuthorizedHttp(creds, http=timed_http)
    return build(api, version, http=authed_http, cache_discovery=False)


# NOTE: the docstrings of the @tool functions below are kept word-for-word;
# they are presumably what strands exposes to the model as tool descriptions.


@tool
def list_calendars() -> str:
    """List all Google Calendars for the current user."""
    try:
        creds = _get_creds(_actor_id())
        result = _svc('calendar', 'v3', creds).calendarList().list().execute()
        items = result.get('items', [])
        if not items:
            return 'No calendars found.'
        return '\n'.join(
            f'- "{c.get("summary", "")}"{" (Primary)" if c.get("primary") else ""} (ID: {c["id"]})'
            for c in items
        )
    except Exception as exc:  # tool boundary: report failures as text
        return f'Error listing calendars: {exc}'


@tool
def get_calendar_events(
    calendar_id: str = 'primary',
    days_ahead: int = 7,
    time_min: str = '',
    time_max: str = '',
    max_results: int = 25,
    query: str = '',
) -> str:
    """Get upcoming Google Calendar events.

    Args:
        calendar_id: Calendar ID (default: 'primary')
        days_ahead: Days ahead to fetch when time_min/time_max not specified (default: 7)
        time_min: Start of time range in RFC3339 format (optional)
        time_max: End of time range in RFC3339 format (optional)
        max_results: Maximum events to return (default: 25)
        query: Keyword search within event fields (optional)
    """
    try:
        creds = _get_creds(_actor_id())
        svc = _svc('calendar', 'v3', creds)
        now = datetime.now(timezone.utc)
        default_min = now.isoformat().replace('+00:00', 'Z')
        default_max = (now + timedelta(days=days_ahead)).isoformat().replace('+00:00', 'Z')
        params = {
            'calendarId': calendar_id,
            'timeMin': time_min or default_min,
            'timeMax': time_max or default_max,
            'maxResults': max_results,
            # Expand recurring events so ordering by start time is allowed.
            'singleEvents': True,
            'orderBy': 'startTime',
        }
        if query:
            params['q'] = query
        events = svc.events().list(**params).execute().get('items', [])
        if not events:
            return f'No events found in calendar "{calendar_id}".'
        lines = []
        for event in events:
            start_info = event.get('start', {})
            end_info = event.get('end', {})
            # Timed events carry 'dateTime'; all-day events only carry 'date'.
            start = start_info.get('dateTime', start_info.get('date', ''))
            end = end_info.get('dateTime', end_info.get('date', ''))
            eid = event.get('id', '')
            lines.append(
                f'- "{event.get("summary", "No Title")}" (Starts: {start}, Ends: {end}) ID: {eid}'
            )
        return f'Retrieved {len(events)} events from "{calendar_id}":\n' + '\n'.join(lines)
    except Exception as exc:  # tool boundary: report failures as text
        return f'Error fetching calendar events: {exc}'


@tool
def list_gmail_messages(max_results: int = 10, query: str = 'in:inbox') -> str:
    """List Gmail messages.

    Args:
        max_results: Maximum number of messages to return (default: 10)
        query: Gmail search query (default: 'in:inbox')
    """
    try:
        creds = _get_creds(_actor_id())
        svc = _svc('gmail', 'v1', creds)
        result = svc.users().messages().list(
            userId='me', q=query, maxResults=max_results
        ).execute()
        messages = result.get('messages', [])
        if not messages:
            return 'No messages found.'
        lines = []
        for m in messages:
            # The list call only returns ids, so fetch the headers per message.
            msg = svc.users().messages().get(
                userId='me',
                id=m['id'],
                format='metadata',
                metadataHeaders=['Subject', 'From', 'Date'],
            ).execute()
            h = {hdr['name']: hdr['value'] for hdr in msg.get('payload', {}).get('headers', [])}
            date = h.get('Date', '')
            sender = h.get('From', '')
            subject = h.get('Subject', '(no subject)')
            lines.append(f"id={m['id']} | {date} | From: {sender} | {subject}")
        out = '\n'.join(lines)
        if result.get('nextPageToken'):
            out += '\n(more results available)'
        return out
    except Exception as exc:  # tool boundary: report failures as text
        return f'Error listing Gmail messages: {exc}'


@tool
def get_gmail_message(message_id: str, body_format: str = 'text') -> str:
    """Get the full content of a Gmail message by ID.

    Args:
        message_id: The Gmail message ID
        body_format: 'text' (default), 'html', or 'raw'
    """
    try:
        creds = _get_creds(_actor_id())
        svc = _svc('gmail', 'v1', creds)
        if body_format == 'raw':
            # The raw request is not used for headers, so they are fetched
            # separately with a metadata request.
            meta = svc.users().messages().get(
                userId='me',
                id=message_id,
                format='metadata',
                metadataHeaders=['Subject', 'From', 'To', 'Cc', 'Date'],
            ).execute()
            header_payload = meta.get('payload', {})
            raw = svc.users().messages().get(
                userId='me', id=message_id, format='raw'
            ).execute()
            body = _decode_b64url(raw.get('raw', ''))
        else:
            # The full payload already carries the headers, so one request
            # serves both headers and body.
            full = svc.users().messages().get(
                userId='me', id=message_id, format='full'
            ).execute()
            header_payload = full.get('payload', {})
            body = _extract_body(header_payload, prefer_html=(body_format == 'html'))
        h = {hdr['name']: hdr['value'] for hdr in header_payload.get('headers', [])}
        lines = [
            f"From: {h.get('From', '')}",
            f"To: {h.get('To', '')}",
            f"Date: {h.get('Date', '')}",
        ]
        if h.get('Cc'):
            lines.append(f"Cc: {h['Cc']}")
        lines += [f"Subject: {h.get('Subject', '')}", '', body]
        return '\n'.join(lines)
    except Exception as exc:  # tool boundary: report failures as text
        return f'Error fetching Gmail message: {exc}'


def _decode_b64url(data: str) -> str:
    """Decode URL-safe base64 text to a string, tolerating stripped padding.

    Undecodable bytes are replaced rather than raising. Empty input yields ''.
    """
    if not data:
        return ''
    padded = data + '=' * (-len(data) % 4)
    return base64.urlsafe_b64decode(padded).decode('utf-8', errors='replace')


def _extract_body(payload: dict, prefer_html: bool = False) -> str:
    """Pull the message body text out of a Gmail message payload.

    Search order: the payload itself if it has the preferred MIME type, then
    its direct parts for the preferred type, then its direct parts for the
    other text type, then a recursive search of each part.

    Args:
        payload: A message payload (or nested part) with 'mimeType', 'body'
            and optionally 'parts' keys.
        prefer_html: Prefer 'text/html' over 'text/plain' when True.

    Returns:
        The decoded body, or '' when nothing suitable is found.
    """
    target = 'text/html' if prefer_html else 'text/plain'
    fallback = 'text/plain' if prefer_html else 'text/html'

    if payload.get('mimeType', '') == target:
        return _decode_b64url(payload.get('body', {}).get('data', ''))

    parts = payload.get('parts', [])
    # Preferred type first, then the fallback; the first matching part wins
    # even when its body turns out to be empty.
    for wanted in (target, fallback):
        for part in parts:
            if part.get('mimeType') == wanted:
                return _decode_b64url(part.get('body', {}).get('data', ''))

    # Nothing at this level: descend into nested multipart containers.
    for part in parts:
        text = _extract_body(part, prefer_html)
        if text:
            return text
    return ''