239 lines
9.0 KiB
Python
239 lines
9.0 KiB
Python
"""Google Calendar and Gmail tools — credentials injected from Secrets Manager per call.
|
|
|
|
Mirrors workspace-mcp (gcalendar/gmail) logic using google-api-python-client directly,
|
|
since workspace-mcp tool functions require FastMCP request context and cannot be called
|
|
outside an MCP server.
|
|
|
|
Credential secret: agent-claw/google-credentials/{actor_id.replace(':', '-')}
|
|
Contains: token, refresh_token, token_uri, client_id, client_secret, scopes
|
|
"""
|
|
import json
|
|
import boto3
|
|
import httplib2
|
|
from strands import tool
|
|
from google.oauth2.credentials import Credentials
|
|
from google.auth.transport.requests import Request
|
|
from googleapiclient.discovery import build
|
|
from datetime import datetime, timezone, timedelta
|
|
|
|
_HTTP_TIMEOUT = 15
|
|
_sm = None
|
|
|
|
|
|
def _secrets():
|
|
global _sm
|
|
if _sm is None:
|
|
_sm = boto3.client('secretsmanager', region_name='us-east-1')
|
|
return _sm
|
|
|
|
|
|
def _get_creds(actor_id: str) -> Credentials:
|
|
from datetime import datetime
|
|
secret_name = 'agent-claw/google-credentials/' + actor_id.replace(':', '-')
|
|
print(f'[google] fetching creds actor={actor_id}')
|
|
data = json.loads(_secrets().get_secret_value(SecretId=secret_name)['SecretString'])
|
|
expiry_str = data.get('expiry')
|
|
expiry = datetime.fromisoformat(expiry_str.replace('Z', '+00:00')) if expiry_str else None
|
|
creds = Credentials(
|
|
token=data.get('token'),
|
|
refresh_token=data.get('refresh_token'),
|
|
token_uri=data.get('token_uri', 'https://oauth2.googleapis.com/token'),
|
|
client_id=data.get('client_id'),
|
|
client_secret=data.get('client_secret'),
|
|
scopes=data.get('scopes'),
|
|
expiry=expiry,
|
|
)
|
|
print(f'[google] creds loaded, expired={creds.expired}')
|
|
if (creds.expired or not creds.valid) and creds.refresh_token:
|
|
print('[google] refreshing token')
|
|
creds.refresh(Request())
|
|
data['token'] = creds.token
|
|
if creds.expiry:
|
|
data['expiry'] = creds.expiry.isoformat()
|
|
_secrets().put_secret_value(SecretId=secret_name, SecretString=json.dumps(data))
|
|
print('[google] token refreshed and saved')
|
|
return creds
|
|
|
|
|
|
def _actor_id():
|
|
# Read from module-level var set by main.py per invocation
|
|
# DO NOT use 'import main as _main' — it re-runs main.py including app.run() which hangs
|
|
return _current_actor_id
|
|
|
|
|
|
# Set per-invocation by main.py before any tool call
|
|
_current_actor_id: str = ''
|
|
|
|
|
|
def _svc(api: str, version: str, creds: Credentials):
|
|
# Pass ONLY the authorized http (with timeout) — do NOT also pass credentials.
|
|
# When both http= and credentials= are given, google-api-python-client creates a
|
|
# new un-timed Http() from credentials for API calls, ignoring our timeout.
|
|
http = creds.authorize(httplib2.Http(timeout=_HTTP_TIMEOUT))
|
|
return build(api, version, http=http, cache_discovery=False)
|
|
|
|
|
|
@tool
|
|
def list_calendars() -> str:
|
|
"""List all Google Calendars for the current user."""
|
|
try:
|
|
creds = _get_creds(_actor_id())
|
|
result = _svc('calendar', 'v3', creds).calendarList().list().execute()
|
|
items = result.get('items', [])
|
|
if not items:
|
|
return 'No calendars found.'
|
|
return '\n'.join(
|
|
f'- "{c.get("summary", "")}"{" (Primary)" if c.get("primary") else ""} (ID: {c["id"]})'
|
|
for c in items
|
|
)
|
|
except Exception as e:
|
|
return f'Error listing calendars: {e}'
|
|
|
|
|
|
@tool
|
|
def get_calendar_events(
|
|
calendar_id: str = 'primary',
|
|
days_ahead: int = 7,
|
|
time_min: str = '',
|
|
time_max: str = '',
|
|
max_results: int = 25,
|
|
query: str = '',
|
|
) -> str:
|
|
"""Get upcoming Google Calendar events.
|
|
|
|
Args:
|
|
calendar_id: Calendar ID (default: 'primary')
|
|
days_ahead: Days ahead to fetch when time_min/time_max not specified (default: 7)
|
|
time_min: Start of time range in RFC3339 format (optional)
|
|
time_max: End of time range in RFC3339 format (optional)
|
|
max_results: Maximum events to return (default: 25)
|
|
query: Keyword search within event fields (optional)
|
|
"""
|
|
try:
|
|
creds = _get_creds(_actor_id())
|
|
svc = _svc('calendar', 'v3', creds)
|
|
now = datetime.now(timezone.utc)
|
|
params = {
|
|
'calendarId': calendar_id,
|
|
'timeMin': time_min or now.isoformat().replace('+00:00', 'Z'),
|
|
'timeMax': time_max or (now + timedelta(days=days_ahead)).isoformat().replace('+00:00', 'Z'),
|
|
'maxResults': max_results,
|
|
'singleEvents': True,
|
|
'orderBy': 'startTime',
|
|
}
|
|
if query:
|
|
params['q'] = query
|
|
events = svc.events().list(**params).execute().get('items', [])
|
|
if not events:
|
|
return f'No events found in calendar "{calendar_id}".'
|
|
lines = []
|
|
for e in events:
|
|
start = e['start'].get('dateTime', e['start'].get('date', ''))
|
|
end = e['end'].get('dateTime', e['end'].get('date', ''))
|
|
eid = e.get('id', '')
|
|
lines.append(f'- "{e.get("summary", "No Title")}" (Starts: {start}, Ends: {end}) ID: {eid}')
|
|
return f'Retrieved {len(events)} events from "{calendar_id}":\n' + '\n'.join(lines)
|
|
except Exception as e:
|
|
return f'Error fetching calendar events: {e}'
|
|
|
|
|
|
@tool
|
|
def list_gmail_messages(max_results: int = 10, query: str = 'in:inbox') -> str:
|
|
"""List Gmail messages.
|
|
|
|
Args:
|
|
max_results: Maximum number of messages to return (default: 10)
|
|
query: Gmail search query (default: 'in:inbox')
|
|
"""
|
|
try:
|
|
creds = _get_creds(_actor_id())
|
|
svc = _svc('gmail', 'v1', creds)
|
|
result = svc.users().messages().list(userId='me', q=query, maxResults=max_results).execute()
|
|
messages = result.get('messages', [])
|
|
if not messages:
|
|
return 'No messages found.'
|
|
lines = []
|
|
for m in messages:
|
|
msg = svc.users().messages().get(
|
|
userId='me', id=m['id'], format='metadata',
|
|
metadataHeaders=['Subject', 'From', 'Date']
|
|
).execute()
|
|
h = {hdr['name']: hdr['value'] for hdr in msg.get('payload', {}).get('headers', [])}
|
|
lines.append(f"id={m['id']} | {h.get('Date', '')} | From: {h.get('From', '')} | {h.get('Subject', '(no subject)')}")
|
|
next_token = result.get('nextPageToken')
|
|
out = '\n'.join(lines)
|
|
if next_token:
|
|
out += f'\n(more results available)'
|
|
return out
|
|
except Exception as e:
|
|
return f'Error listing Gmail messages: {e}'
|
|
|
|
|
|
@tool
|
|
def get_gmail_message(message_id: str, body_format: str = 'text') -> str:
|
|
"""Get the full content of a Gmail message by ID.
|
|
|
|
Args:
|
|
message_id: The Gmail message ID
|
|
body_format: 'text' (default), 'html', or 'raw'
|
|
"""
|
|
try:
|
|
creds = _get_creds(_actor_id())
|
|
svc = _svc('gmail', 'v1', creds)
|
|
meta = svc.users().messages().get(
|
|
userId='me', id=message_id, format='metadata',
|
|
metadataHeaders=['Subject', 'From', 'To', 'Cc', 'Date']
|
|
).execute()
|
|
h = {hdr['name']: hdr['value'] for hdr in meta.get('payload', {}).get('headers', [])}
|
|
|
|
if body_format == 'raw':
|
|
import base64
|
|
raw = svc.users().messages().get(userId='me', id=message_id, format='raw').execute()
|
|
body = base64.urlsafe_b64decode(raw.get('raw', '') + '==').decode('utf-8', errors='replace')
|
|
else:
|
|
full = svc.users().messages().get(userId='me', id=message_id, format='full').execute()
|
|
body = _extract_body(full.get('payload', {}), prefer_html=(body_format == 'html'))
|
|
|
|
lines = [
|
|
f"From: {h.get('From', '')}",
|
|
f"To: {h.get('To', '')}",
|
|
f"Date: {h.get('Date', '')}",
|
|
f"Subject: {h.get('Subject', '')}",
|
|
'',
|
|
body,
|
|
]
|
|
if h.get('Cc'):
|
|
lines.insert(3, f"Cc: {h['Cc']}")
|
|
return '\n'.join(lines)
|
|
except Exception as e:
|
|
return f'Error fetching Gmail message: {e}'
|
|
|
|
|
|
def _extract_body(payload: dict, prefer_html: bool = False) -> str:
|
|
import base64
|
|
mime = payload.get('mimeType', '')
|
|
target = 'text/html' if prefer_html else 'text/plain'
|
|
fallback = 'text/plain' if prefer_html else 'text/html'
|
|
|
|
if mime == target:
|
|
data = payload.get('body', {}).get('data', '')
|
|
return base64.urlsafe_b64decode(data + '==').decode('utf-8', errors='replace') if data else ''
|
|
|
|
parts = payload.get('parts', [])
|
|
# First pass: preferred type
|
|
for part in parts:
|
|
if part.get('mimeType') == target:
|
|
data = part.get('body', {}).get('data', '')
|
|
return base64.urlsafe_b64decode(data + '==').decode('utf-8', errors='replace') if data else ''
|
|
# Second pass: fallback type
|
|
for part in parts:
|
|
if part.get('mimeType') == fallback:
|
|
data = part.get('body', {}).get('data', '')
|
|
return base64.urlsafe_b64decode(data + '==').decode('utf-8', errors='replace') if data else ''
|
|
# Recurse into multipart
|
|
for part in parts:
|
|
text = _extract_body(part, prefer_html)
|
|
if text:
|
|
return text
|
|
return ''
|