embed workspace-mcp as direct dependency, simplify google credential loading

- Add workspace-mcp >= 1.20.0 to pyproject.toml (pulls google-api-python-client etc. transitively)
- Remove redundant google-api-python-client/google-auth/google-auth-httplib2 direct deps
- Rewrite google_workspace.py: single Secrets Manager call per tool (client_id/client_secret
  are already in the credentials secret stored by oauth-handler, no separate oauth-client secret needed)
- Mirror workspace-mcp output format for list_calendars and get_calendar_events
- Add body_format param to get_gmail_message (text/html/raw) matching workspace-mcp API
- Update uv.lock
This commit is contained in:
daniel
2026-05-08 11:12:06 -05:00
parent 245c2d64f5
commit 350ce231a4
3 changed files with 767 additions and 80 deletions

View File

@@ -1,4 +1,12 @@
"""Direct Google Calendar and Gmail tools using google-api-python-client."""
"""Google Calendar and Gmail tools — credentials injected from Secrets Manager per call.
Mirrors workspace-mcp (gcalendar/gmail) logic using google-api-python-client directly,
since workspace-mcp tool functions require FastMCP request context and cannot be called
outside an MCP server.
Credential secret: agent-claw/google-credentials/{actor_id.replace(':', '-')}
Contains: token, refresh_token, token_uri, client_id, client_secret, scopes
"""
import json
import boto3
import httplib2
@@ -8,9 +16,7 @@ from google.auth.transport.requests import Request
from googleapiclient.discovery import build
from datetime import datetime, timezone, timedelta
# Set a 15-second timeout on httplib2 to prevent hangs
_HTTP_TIMEOUT = 15
_sm = None
@@ -23,90 +29,94 @@ def _secrets():
def _get_creds(actor_id: str) -> Credentials:
secret_name = 'agent-claw/google-credentials/' + actor_id.replace(':', '-')
print(f'[google] fetching creds for actor={actor_id} secret={secret_name}')
resp = _secrets().get_secret_value(SecretId=secret_name)
print(f'[google] got credential secret')
data = json.loads(resp['SecretString'])
# Load OAuth client info
print('[google] fetching oauth client secret')
client_resp = _secrets().get_secret_value(SecretId='agent-claw/google-oauth-client')
print('[google] got oauth client secret')
client = json.loads(client_resp['SecretString'])
print(f'[google] fetching creds actor={actor_id}')
data = json.loads(_secrets().get_secret_value(SecretId=secret_name)['SecretString'])
creds = Credentials(
token=data.get('token'),
refresh_token=data.get('refresh_token'),
token_uri=data.get('token_uri', 'https://oauth2.googleapis.com/token'),
client_id=client.get('client_id'),
client_secret=client.get('client_secret'),
client_id=data.get('client_id'),
client_secret=data.get('client_secret'),
scopes=data.get('scopes'),
)
print(f'[google] creds created expired={creds.expired}')
if creds.expired and creds.refresh_token:
print('[google] refreshing token')
creds.refresh(Request())
print('[google] token refreshed')
# Persist refreshed token
data['token'] = creds.token
_secrets().put_secret_value(SecretId=secret_name, SecretString=json.dumps(data))
return creds
def _actor_id():
# Import here to avoid circular import; set per-invocation in main.py
import main as _main
return _main._current_actor_id
def _svc(api: str, version: str, creds: Credentials):
http = creds.authorize(httplib2.Http(timeout=_HTTP_TIMEOUT))
return build(api, version, http=http, credentials=creds, cache_discovery=False)
@tool
def list_calendars() -> str:
"""List all Google Calendars for the current user."""
try:
creds = _get_creds(_actor_id())
http = creds.authorize(httplib2.Http(timeout=_HTTP_TIMEOUT))
svc = build('calendar', 'v3', http=http, credentials=creds, cache_discovery=False)
result = svc.calendarList().list().execute()
result = _svc('calendar', 'v3', creds).calendarList().list().execute()
items = result.get('items', [])
if not items:
return 'No calendars found.'
return '\n'.join(f"{c['id']}: {c['summary']}" for c in items)
return '\n'.join(
f'- "{c.get("summary", "")}"{" (Primary)" if c.get("primary") else ""} (ID: {c["id"]})'
for c in items
)
except Exception as e:
return f'Error listing calendars: {e}'
@tool
def get_calendar_events(calendar_id: str = 'primary', days_ahead: int = 7) -> str:
def get_calendar_events(
calendar_id: str = 'primary',
days_ahead: int = 7,
time_min: str = '',
time_max: str = '',
max_results: int = 25,
query: str = '',
) -> str:
"""Get upcoming Google Calendar events.
Args:
calendar_id: Calendar ID (default: 'primary')
days_ahead: Number of days ahead to fetch (default: 7)
days_ahead: Days ahead to fetch when time_min/time_max not specified (default: 7)
time_min: Start of time range in RFC3339 format (optional)
time_max: End of time range in RFC3339 format (optional)
max_results: Maximum events to return (default: 25)
query: Keyword search within event fields (optional)
"""
try:
creds = _get_creds(_actor_id())
http = creds.authorize(httplib2.Http(timeout=_HTTP_TIMEOUT))
svc = build('calendar', 'v3', http=http, credentials=creds, cache_discovery=False)
svc = _svc('calendar', 'v3', creds)
now = datetime.now(timezone.utc)
time_max = now + timedelta(days=days_ahead)
result = svc.events().list(
calendarId=calendar_id,
timeMin=now.isoformat(),
timeMax=time_max.isoformat(),
singleEvents=True,
orderBy='startTime',
maxResults=50,
).execute()
events = result.get('items', [])
params = {
'calendarId': calendar_id,
'timeMin': time_min or now.isoformat().replace('+00:00', 'Z'),
'timeMax': time_max or (now + timedelta(days=days_ahead)).isoformat().replace('+00:00', 'Z'),
'maxResults': max_results,
'singleEvents': True,
'orderBy': 'startTime',
}
if query:
params['q'] = query
events = svc.events().list(**params).execute().get('items', [])
if not events:
return 'No upcoming events.'
return f'No events found in calendar "{calendar_id}".'
lines = []
for e in events:
start = e['start'].get('dateTime', e['start'].get('date', ''))
lines.append(f"{start}{e.get('summary', '(no title)')}")
return '\n'.join(lines)
end = e['end'].get('dateTime', e['end'].get('date', ''))
eid = e.get('id', '')
lines.append(f'- "{e.get("summary", "No Title")}" (Starts: {start}, Ends: {end}) ID: {eid}')
return f'Retrieved {len(events)} events from "{calendar_id}":\n' + '\n'.join(lines)
except Exception as e:
return f'Error fetching calendar events: {e}'
@@ -121,11 +131,8 @@ def list_gmail_messages(max_results: int = 10, query: str = 'in:inbox') -> str:
"""
try:
creds = _get_creds(_actor_id())
http = creds.authorize(httplib2.Http(timeout=_HTTP_TIMEOUT))
svc = build('gmail', 'v1', http=http, credentials=creds, cache_discovery=False)
result = svc.users().messages().list(
userId='me', q=query, maxResults=max_results
).execute()
svc = _svc('gmail', 'v1', creds)
result = svc.users().messages().list(userId='me', q=query, maxResults=max_results).execute()
messages = result.get('messages', [])
if not messages:
return 'No messages found.'
@@ -135,50 +142,81 @@ def list_gmail_messages(max_results: int = 10, query: str = 'in:inbox') -> str:
userId='me', id=m['id'], format='metadata',
metadataHeaders=['Subject', 'From', 'Date']
).execute()
headers = {h['name']: h['value'] for h in msg.get('payload', {}).get('headers', [])}
lines.append(
f"id={m['id']} | {headers.get('Date', '')} | From: {headers.get('From', '')} | {headers.get('Subject', '(no subject)')}"
)
return '\n'.join(lines)
h = {hdr['name']: hdr['value'] for hdr in msg.get('payload', {}).get('headers', [])}
lines.append(f"id={m['id']} | {h.get('Date', '')} | From: {h.get('From', '')} | {h.get('Subject', '(no subject)')}")
next_token = result.get('nextPageToken')
out = '\n'.join(lines)
if next_token:
out += f'\n(more results available)'
return out
except Exception as e:
return f'Error listing Gmail messages: {e}'
@tool
def get_gmail_message(message_id: str) -> str:
def get_gmail_message(message_id: str, body_format: str = 'text') -> str:
"""Get the full content of a Gmail message by ID.
Args:
message_id: The Gmail message ID
body_format: 'text' (default), 'html', or 'raw'
"""
try:
creds = _get_creds(_actor_id())
http = creds.authorize(httplib2.Http(timeout=_HTTP_TIMEOUT))
svc = build('gmail', 'v1', http=http, credentials=creds, cache_discovery=False)
msg = svc.users().messages().get(
userId='me', id=message_id, format='full'
svc = _svc('gmail', 'v1', creds)
meta = svc.users().messages().get(
userId='me', id=message_id, format='metadata',
metadataHeaders=['Subject', 'From', 'To', 'Cc', 'Date']
).execute()
headers = {h['name']: h['value'] for h in msg.get('payload', {}).get('headers', [])}
body = _extract_body(msg.get('payload', {}))
return (
f"From: {headers.get('From', '')}\n"
f"To: {headers.get('To', '')}\n"
f"Date: {headers.get('Date', '')}\n"
f"Subject: {headers.get('Subject', '')}\n\n"
f"{body}"
)
h = {hdr['name']: hdr['value'] for hdr in meta.get('payload', {}).get('headers', [])}
if body_format == 'raw':
import base64
raw = svc.users().messages().get(userId='me', id=message_id, format='raw').execute()
body = base64.urlsafe_b64decode(raw.get('raw', '') + '==').decode('utf-8', errors='replace')
else:
full = svc.users().messages().get(userId='me', id=message_id, format='full').execute()
body = _extract_body(full.get('payload', {}), prefer_html=(body_format == 'html'))
lines = [
f"From: {h.get('From', '')}",
f"To: {h.get('To', '')}",
f"Date: {h.get('Date', '')}",
f"Subject: {h.get('Subject', '')}",
'',
body,
]
if h.get('Cc'):
lines.insert(3, f"Cc: {h['Cc']}")
return '\n'.join(lines)
except Exception as e:
return f'Error fetching Gmail message: {e}'
def _extract_body(payload: dict) -> str:
def _extract_body(payload: dict, prefer_html: bool = False) -> str:
import base64
mime = payload.get('mimeType', '')
if mime == 'text/plain':
target = 'text/html' if prefer_html else 'text/plain'
fallback = 'text/plain' if prefer_html else 'text/html'
if mime == target:
data = payload.get('body', {}).get('data', '')
return base64.urlsafe_b64decode(data + '==').decode('utf-8', errors='replace') if data else ''
for part in payload.get('parts', []):
text = _extract_body(part)
parts = payload.get('parts', [])
# First pass: preferred type
for part in parts:
if part.get('mimeType') == target:
data = part.get('body', {}).get('data', '')
return base64.urlsafe_b64decode(data + '==').decode('utf-8', errors='replace') if data else ''
# Second pass: fallback type
for part in parts:
if part.get('mimeType') == fallback:
data = part.get('body', {}).get('data', '')
return base64.urlsafe_b64decode(data + '==').decode('utf-8', errors='replace') if data else ''
# Recurse into multipart
for part in parts:
text = _extract_body(part, prefer_html)
if text:
return text
return ''