Files
agent-claw/agentclaw/app/agent_claw_main/tools/google_workspace.py

256 lines
9.8 KiB
Python

"""Google Calendar and Gmail tools — credentials injected from Secrets Manager per call.
Mirrors workspace-mcp (gcalendar/gmail) logic using google-api-python-client directly,
since workspace-mcp tool functions require FastMCP request context and cannot be called
outside an MCP server.
Credential secret: agent-claw/google-credentials/{actor_id.replace(':', '-')}
Contains: token, refresh_token, token_uri, client_id, client_secret, scopes,
and optionally expiry (ISO 8601 timestamp, written back after each token refresh)
"""
import json
import traceback
import boto3
import httplib2
from strands import tool
from google.oauth2.credentials import Credentials
from google.auth.transport.requests import Request
from googleapiclient.discovery import build
from datetime import datetime, timezone, timedelta
# NOTE(review): presumably an HTTP timeout in seconds; it is not referenced in
# the visible part of this module — confirm where (or whether) it is applied.
_HTTP_TIMEOUT = 15
# Cached Secrets Manager client; stays None until _secrets() creates it.
_sm = None
def _secrets():
    """Return the module-wide Secrets Manager client, creating it on first use.

    The client is stored in the module-level ``_sm`` so later calls reuse it.
    """
    global _sm
    if _sm is not None:
        return _sm
    _sm = boto3.client('secretsmanager', region_name='us-east-1')
    return _sm
def _get_creds(actor_id: str) -> Credentials:
    """Load, repair and (if needed) refresh the Google credentials for an actor.

    The credentials are read from the Secrets Manager secret
    ``agent-claw/google-credentials/<actor_id with ':' replaced by '-'>``.

    Side effects on the stored secret:
      * If the stored scopes contain OIDC scopes (``openid``, ``email``,
        ``profile``), only the ``https://`` API scopes are kept and written back.
      * If the access token is refreshed, the new token and expiry are
        written back.

    Args:
        actor_id: Identifier of the user whose credentials should be loaded.

    Returns:
        A ``google.oauth2.credentials.Credentials`` object. It may still be
        invalid when the token is expired/missing and no refresh token is
        stored.

    Raises:
        Whatever boto3, ``json.loads``, ``datetime.fromisoformat`` or the
        google-auth refresh raise; nothing is caught here.
    """
    secret_name = 'agent-claw/google-credentials/' + actor_id.replace(':', '-')
    print(f'[google] fetching creds actor={actor_id}')
    data = json.loads(_secrets().get_secret_value(SecretId=secret_name)['SecretString'])

    expiry = None
    expiry_str = data.get('expiry')
    if expiry_str:
        parsed = datetime.fromisoformat(expiry_str.replace('Z', '+00:00'))
        if parsed.tzinfo is not None:
            # Convert to UTC *before* dropping tzinfo; stripping the offset
            # directly would shift the expiry for non-UTC timestamps.
            parsed = parsed.astimezone(timezone.utc)
        expiry = parsed.replace(tzinfo=None)  # google-auth uses naive UTC datetimes

    stored_scopes = data.get('scopes', [])
    if isinstance(stored_scopes, str):
        # Tolerate a space-delimited scope string; iterating it directly would
        # walk single characters and the clean-up below would wipe the scopes.
        stored_scopes = stored_scopes.split()
    api_scopes = [s for s in stored_scopes if s.startswith('https://')] if stored_scopes else None

    # Fix stored scopes if they contain OIDC scopes
    if stored_scopes and any(s in stored_scopes for s in ['openid', 'email', 'profile']):
        data['scopes'] = api_scopes
        _secrets().put_secret_value(SecretId=secret_name, SecretString=json.dumps(data))
        print('[google] fixed stored scopes: removed OIDC scopes')

    creds = Credentials(
        token=data.get('token'),
        refresh_token=data.get('refresh_token'),
        token_uri=data.get('token_uri', 'https://oauth2.googleapis.com/token'),
        client_id=data.get('client_id'),
        client_secret=data.get('client_secret'),
        scopes=api_scopes,
        expiry=expiry,
    )
    print(f'[google] creds loaded, expired={creds.expired}')

    if (creds.expired or not creds.valid) and creds.refresh_token:
        print('[google] refreshing token')
        creds.refresh(Request())
        # Persist the refreshed token so the next call does not refresh again.
        data['token'] = creds.token
        if creds.expiry:
            data['expiry'] = creds.expiry.isoformat()
        _secrets().put_secret_value(SecretId=secret_name, SecretString=json.dumps(data))
        print('[google] token refreshed and saved')
    return creds
# Actor ID of the invocation in progress. main.py assigns this module
# attribute before any tool call.
# NOTE(review): a single module-level value is shared by everything in the
# process — confirm invocations never overlap.
_current_actor_id: str = ''


def _actor_id():
    """Return the actor ID that main.py stored for the current invocation."""
    # Reads the module-level variable on purpose. DO NOT switch to
    # 'import main as _main': that re-runs main.py, including app.run(),
    # which hangs.
    return _current_actor_id
def _svc(api: str, version: str, creds: Credentials):
    """Build a Google API client for ``api``/``version`` authorised by ``creds``.

    Discovery-document caching is switched off (``cache_discovery=False``).
    """
    # google-auth credentials go straight into build() via credentials=;
    # googleapiclient accepts google.oauth2.credentials.Credentials natively.
    # creds.authorize() belongs to oauth2client and does not exist here.
    service = build(api, version, credentials=creds, cache_discovery=False)
    return service
@tool
def list_calendars() -> str:
    """List all Google Calendars for the current user."""
    # Returns one '- "<summary>" (ID: <id>)' line per calendar, with
    # ' (Primary)' appended after the summary for the primary calendar.
    # Any failure is logged with its traceback and reported as an error string.
    try:
        service = _svc('calendar', 'v3', _get_creds(_actor_id()))
        response = service.calendarList().list().execute()
        calendars = response.get('items', [])
        if not calendars:
            return 'No calendars found.'
        rows = []
        for cal in calendars:
            primary_tag = " (Primary)" if cal.get("primary") else ""
            rows.append(f'- "{cal.get("summary", "")}"{primary_tag} (ID: {cal["id"]})')
        return '\n'.join(rows)
    except Exception as e:
        tb = traceback.format_exc()
        print(f'[google] list_calendars error: {e}\n{tb}')
        return f'Error listing calendars: {e}'
def _normalize_event_time(value: str):
    """Return ``(api_string, parsed)`` for a caller-supplied time bound.

    ``parsed`` is a timezone-aware datetime, or None when ``value`` cannot be
    parsed (the text is then passed through so the API reports the problem).
    Date-only values and datetimes without an offset are interpreted as UTC,
    because the Calendar API requires an explicit offset.
    """
    text = value.strip()
    try:
        parsed = datetime.fromisoformat(text.replace('Z', '+00:00'))
    except ValueError:
        return text, None
    if parsed.tzinfo is None:
        parsed = parsed.replace(tzinfo=timezone.utc)
        return parsed.isoformat().replace('+00:00', 'Z'), parsed
    # Already carries an offset: send the caller's text unchanged.
    return text, parsed


@tool
def get_calendar_events(
    calendar_id: str = 'primary',
    days_ahead: int = 7,
    time_min: str = '',
    time_max: str = '',
    max_results: int = 25,
    query: str = '',
) -> str:
    """Get upcoming Google Calendar events.

    Args:
        calendar_id: Calendar ID (default: 'primary')
        days_ahead: Days to fetch when time_max is not specified, counted from now or from time_min if that is later (default: 7)
        time_min: Start of time range in RFC3339 format (optional)
        time_max: End of time range in RFC3339 format (optional)
        max_results: Maximum events to return (default: 25)
        query: Keyword search within event fields (optional)
    """
    try:
        creds = _get_creds(_actor_id())
        svc = _svc('calendar', 'v3', creds)
        now = datetime.now(timezone.utc)

        # The default window end is anchored at whichever is later: now or the
        # requested start. Anchoring at "now" alone gives an inverted range
        # when time_min lies more than days_ahead in the future.
        window_start = now
        if time_min:
            time_min, parsed_min = _normalize_event_time(time_min)
            if parsed_min is not None and parsed_min > now:
                window_start = parsed_min.astimezone(timezone.utc)
        if time_max:
            time_max, _ = _normalize_event_time(time_max)
        else:
            time_max = (window_start + timedelta(days=days_ahead)).isoformat().replace('+00:00', 'Z')

        params = {
            'calendarId': calendar_id,
            'timeMin': time_min or now.isoformat().replace('+00:00', 'Z'),
            'timeMax': time_max,
            'maxResults': max_results,
            'singleEvents': True,
            'orderBy': 'startTime',
        }
        if query:
            params['q'] = query
        events = svc.events().list(**params).execute().get('items', [])
        if not events:
            return f'No events found in calendar "{calendar_id}".'
        lines = []
        for event in events:
            # Timed events carry 'dateTime', all-day events carry 'date'.
            start_info = event.get('start', {})
            end_info = event.get('end', {})
            start = start_info.get('dateTime', start_info.get('date', ''))
            end = end_info.get('dateTime', end_info.get('date', ''))
            eid = event.get('id', '')
            lines.append(f'- "{event.get("summary", "No Title")}" (Starts: {start}, Ends: {end}) ID: {eid}')
        return f'Retrieved {len(events)} events from "{calendar_id}":\n' + '\n'.join(lines)
    except Exception as e:
        tb = traceback.format_exc()
        print(f'[google] get_calendar_events error: {e}\n{tb}')
        return f'Error fetching calendar events: {e}'
@tool
def list_gmail_messages(max_results: int = 10, query: str = 'in:inbox') -> str:
    """List Gmail messages.

    Args:
        max_results: Maximum number of messages to return (default: 10)
        query: Gmail search query (default: 'in:inbox')
    """
    # Returns one 'id=... | <Date> | From: ... | <Subject>' line per message,
    # followed by '(more results available)' when the API reports another page.
    try:
        creds = _get_creds(_actor_id())
        svc = _svc('gmail', 'v1', creds)
        result = svc.users().messages().list(userId='me', q=query, maxResults=max_results).execute()
        messages = result.get('messages', [])
        if not messages:
            return 'No messages found.'
        lines = []
        for m in messages:
            # The list call only yields IDs; headers need one get() per message.
            msg = svc.users().messages().get(
                userId='me', id=m['id'], format='metadata',
                metadataHeaders=['Subject', 'From', 'Date']
            ).execute()
            # Header field names are case-insensitive and are returned as the
            # sender wrote them, so key the lookup on the lower-cased name.
            h = {hdr['name'].lower(): hdr['value'] for hdr in msg.get('payload', {}).get('headers', [])}
            lines.append(f"id={m['id']} | {h.get('date', '')} | From: {h.get('from', '')} | {h.get('subject', '(no subject)')}")
        out = '\n'.join(lines)
        if result.get('nextPageToken'):
            out += '\n(more results available)'
        return out
    except Exception as e:
        tb = traceback.format_exc()
        print(f'[google] list_gmail_messages error: {e}\n{tb}')
        return f'Error listing Gmail messages: {e}'
@tool
def get_gmail_message(message_id: str, body_format: str = 'text') -> str:
    """Get the full content of a Gmail message by ID.

    Args:
        message_id: The Gmail message ID
        body_format: 'text' (default), 'html', or 'raw'
    """
    # Output: From/To/Date[/Cc]/Subject header lines, a blank line, then the
    # body. Any failure is logged with its traceback and returned as a string.
    try:
        creds = _get_creds(_actor_id())
        svc = _svc('gmail', 'v1', creds)
        if body_format == 'raw':
            import base64
            # The headers are fetched with a separate metadata request here.
            meta = svc.users().messages().get(
                userId='me', id=message_id, format='metadata',
                metadataHeaders=['Subject', 'From', 'To', 'Cc', 'Date']
            ).execute()
            header_list = meta.get('payload', {}).get('headers', [])
            raw = svc.users().messages().get(userId='me', id=message_id, format='raw').execute()
            # '==' over-pads the unpadded base64url text; the decoder ignores
            # surplus padding.
            body = base64.urlsafe_b64decode(raw.get('raw', '') + '==').decode('utf-8', errors='replace')
        else:
            # format='full' already returns the payload headers, so no extra
            # metadata round-trip is needed.
            full = svc.users().messages().get(userId='me', id=message_id, format='full').execute()
            payload = full.get('payload', {})
            header_list = payload.get('headers', [])
            body = _extract_body(payload, prefer_html=(body_format == 'html'))
        # Header field names are case-insensitive and are returned as the
        # sender wrote them, so key the lookup on the lower-cased name.
        h = {hdr['name'].lower(): hdr['value'] for hdr in header_list}
        lines = [
            f"From: {h.get('from', '')}",
            f"To: {h.get('to', '')}",
            f"Date: {h.get('date', '')}",
            f"Subject: {h.get('subject', '')}",
            '',
            body,
        ]
        if h.get('cc'):
            # Index 3 places the Cc line between Date and Subject.
            lines.insert(3, f"Cc: {h['cc']}")
        return '\n'.join(lines)
    except Exception as e:
        tb = traceback.format_exc()
        print(f'[google] get_gmail_message error: {e}\n{tb}')
        return f'Error fetching Gmail message: {e}'
def _extract_body(payload: dict, prefer_html: bool = False) -> str:
    """Return the decoded text body of a Gmail message payload.

    Search order:
      1. the payload itself, if it is of the preferred MIME type;
      2. direct parts of the preferred type that actually carry data;
      3. direct parts of the fallback type that actually carry data;
      4. nested parts, depth-first;
      5. the payload itself, if it is of the fallback type (e.g. an HTML-only
         single-part message requested as text).

    Args:
        payload: A message payload / part dict with optional 'mimeType',
            'body' ({'data': <base64url text>}) and 'parts' keys.
        prefer_html: Prefer 'text/html' over 'text/plain' when True.

    Returns:
        The body decoded as UTF-8 (undecodable bytes replaced), or '' when no
        text body is found.
    """
    import base64

    target = 'text/html' if prefer_html else 'text/plain'
    fallback = 'text/plain' if prefer_html else 'text/html'

    def _decode(node: dict) -> str:
        data = node.get('body', {}).get('data', '')
        if not data:
            return ''
        # Restore any '=' padding missing from the base64url text.
        padded = data + '=' * (-len(data) % 4)
        return base64.urlsafe_b64decode(padded).decode('utf-8', errors='replace')

    mime = payload.get('mimeType', '')
    if mime == target:
        return _decode(payload)

    parts = payload.get('parts', [])
    # Preferred type first, then the fallback type. Parts without inline data
    # (e.g. attachment stubs) are skipped rather than ending the search.
    for wanted in (target, fallback):
        for part in parts:
            if part.get('mimeType') == wanted:
                text = _decode(part)
                if text:
                    return text

    # Recurse into multipart
    for part in parts:
        text = _extract_body(part, prefer_html)
        if text:
            return text

    # Last resort: a payload that is itself of the fallback type.
    if mime == fallback:
        return _decode(payload)
    return ''