"""Direct Google Calendar and Gmail tools using google-api-python-client.""" import json import boto3 from strands import tool from google.oauth2.credentials import Credentials from google.auth.transport.requests import Request from googleapiclient.discovery import build from datetime import datetime, timezone, timedelta _sm = None def _secrets(): global _sm if _sm is None: _sm = boto3.client('secretsmanager', region_name='us-east-1') return _sm def _get_creds(actor_id: str) -> Credentials: secret_name = 'agent-claw/google-credentials/' + actor_id.replace(':', '-') resp = _secrets().get_secret_value(SecretId=secret_name) data = json.loads(resp['SecretString']) # Load OAuth client info client_resp = _secrets().get_secret_value(SecretId='agent-claw/google-oauth-client') client = json.loads(client_resp['SecretString']) creds = Credentials( token=data.get('token'), refresh_token=data.get('refresh_token'), token_uri=data.get('token_uri', 'https://oauth2.googleapis.com/token'), client_id=client.get('client_id'), client_secret=client.get('client_secret'), scopes=data.get('scopes'), ) if creds.expired and creds.refresh_token: creds.refresh(Request()) # Persist refreshed token data['token'] = creds.token _secrets().put_secret_value(SecretId=secret_name, SecretString=json.dumps(data)) return creds def _actor_id(): # Import here to avoid circular import; set per-invocation in main.py import main as _main return _main._current_actor_id @tool def list_calendars() -> str: """List all Google Calendars for the current user.""" try: creds = _get_creds(_actor_id()) svc = build('calendar', 'v3', credentials=creds) result = svc.calendarList().list().execute() items = result.get('items', []) if not items: return 'No calendars found.' return '\n'.join(f"{c['id']}: {c['summary']}" for c in items) except Exception as e: return f'Error listing calendars: {e}' @tool def get_calendar_events(calendar_id: str = 'primary', days_ahead: int = 7) -> str: """Get upcoming Google Calendar events. Args: calendar_id: Calendar ID (default: 'primary') days_ahead: Number of days ahead to fetch (default: 7) """ try: creds = _get_creds(_actor_id()) svc = build('calendar', 'v3', credentials=creds) now = datetime.now(timezone.utc) time_max = now + timedelta(days=days_ahead) result = svc.events().list( calendarId=calendar_id, timeMin=now.isoformat(), timeMax=time_max.isoformat(), singleEvents=True, orderBy='startTime', maxResults=50, ).execute() events = result.get('items', []) if not events: return 'No upcoming events.' lines = [] for e in events: start = e['start'].get('dateTime', e['start'].get('date', '')) lines.append(f"{start} — {e.get('summary', '(no title)')}") return '\n'.join(lines) except Exception as e: return f'Error fetching calendar events: {e}' @tool def list_gmail_messages(max_results: int = 10, query: str = 'in:inbox') -> str: """List Gmail messages. Args: max_results: Maximum number of messages to return (default: 10) query: Gmail search query (default: 'in:inbox') """ try: creds = _get_creds(_actor_id()) svc = build('gmail', 'v1', credentials=creds) result = svc.users().messages().list( userId='me', q=query, maxResults=max_results ).execute() messages = result.get('messages', []) if not messages: return 'No messages found.' lines = [] for m in messages: msg = svc.users().messages().get( userId='me', id=m['id'], format='metadata', metadataHeaders=['Subject', 'From', 'Date'] ).execute() headers = {h['name']: h['value'] for h in msg.get('payload', {}).get('headers', [])} lines.append( f"id={m['id']} | {headers.get('Date', '')} | From: {headers.get('From', '')} | {headers.get('Subject', '(no subject)')}" ) return '\n'.join(lines) except Exception as e: return f'Error listing Gmail messages: {e}' @tool def get_gmail_message(message_id: str) -> str: """Get the full content of a Gmail message by ID. Args: message_id: The Gmail message ID """ try: creds = _get_creds(_actor_id()) svc = build('gmail', 'v1', credentials=creds) msg = svc.users().messages().get( userId='me', id=message_id, format='full' ).execute() headers = {h['name']: h['value'] for h in msg.get('payload', {}).get('headers', [])} body = _extract_body(msg.get('payload', {})) return ( f"From: {headers.get('From', '')}\n" f"To: {headers.get('To', '')}\n" f"Date: {headers.get('Date', '')}\n" f"Subject: {headers.get('Subject', '')}\n\n" f"{body}" ) except Exception as e: return f'Error fetching Gmail message: {e}' def _extract_body(payload: dict) -> str: import base64 mime = payload.get('mimeType', '') if mime == 'text/plain': data = payload.get('body', {}).get('data', '') return base64.urlsafe_b64decode(data + '==').decode('utf-8', errors='replace') if data else '' for part in payload.get('parts', []): text = _extract_body(part) if text: return text return ''