145 lines
4.1 KiB
Python
145 lines
4.1 KiB
Python
"""EventBridge scheduling tools: schedule_reminder, list_reminders, cancel_reminder."""
|
|
import json
|
|
import os
|
|
import re
|
|
import boto3
|
|
from strands import tool
|
|
|
|
# Per-invocation context: main.py injects these before each invocation.
_current_actor_id: str = ''
_current_chat_id: str = ''

# Lambda used both as the EventBridge rule target and as the function that
# receives the per-rule invoke permission; overridable via the environment.
SCHEDULER_LAMBDA_ARN = os.getenv(
    'SCHEDULER_LAMBDA_ARN',
    'arn:aws:lambda:us-east-1:495395224548:function:agent-claw-scheduler',
)
# Optional account id; when empty, _account_id() asks STS instead.
ACCOUNT_ID = os.getenv('AWS_ACCOUNT_ID', '')
# Region used for every boto3 client created in this module.
REGION = 'us-east-1'
|
|
|
|
|
|
def _eb():
    """Create and return a new boto3 EventBridge ('events') client for REGION."""
    events_client = boto3.client('events', region_name=REGION)
    return events_client
|
|
|
|
|
|
def _rule_prefix() -> str:
    """Return the rule-name prefix shared by all reminders of the current actor.

    Every character of ``_current_actor_id`` other than ASCII letters, digits,
    ``_`` and ``-`` is replaced by ``-`` before it is embedded in the prefix.
    """
    sanitized_actor = re.sub(r'[^a-zA-Z0-9_-]', '-', _current_actor_id)
    return 'agent-claw-reminder-' + sanitized_actor + '-'
|
|
|
|
|
|
def _rollback_reminder(eb, lm, rule_name: str) -> None:
    """Best-effort removal of a partially created reminder (target, rule, permission)."""
    cleanup_steps = (
        lambda: eb.remove_targets(Rule=rule_name, Ids=['scheduler']),
        lambda: eb.delete_rule(Name=rule_name),
        lambda: lm.remove_permission(
            FunctionName=SCHEDULER_LAMBDA_ARN, StatementId=rule_name
        ),
    )
    for step in cleanup_steps:
        try:
            step()
        except Exception:
            # Rollback only: the caller reports or re-raises the failure that
            # triggered it, so a secondary cleanup error must not mask it.
            continue


@tool
def schedule_reminder(message: str, when_utc: str) -> str:
    """Schedule a one-time reminder to be sent via Telegram at a specific UTC time.

    Args:
        message: The reminder text to send.
        when_utc: ISO 8601 datetime, e.g. '2026-05-09T09:00:00'. A value without
            a timezone suffix (or ending in 'Z') is taken as UTC; a value with an
            explicit offset such as '+02:00' is converted to UTC. Seconds are
            ignored. Must be in the future.

    Returns:
        A confirmation that includes the rule name, or a message explaining why
        the reminder was not scheduled.

    Raises:
        Exception: Errors raised by the AWS clients propagate unchanged, after
            the partially created rule has been rolled back.
    """
    import time
    import uuid
    from datetime import datetime, timezone

    if not SCHEDULER_LAMBDA_ARN:
        return 'SCHEDULER_LAMBDA_ARN not configured.'
    if not _current_chat_id:
        return 'chat_id not available.'

    try:
        dt = datetime.fromisoformat(when_utc.strip().rstrip('Z'))
    except ValueError as e:
        return f'Invalid when_utc format: {e}'

    # An explicit offset must be folded into UTC; using the local wall-clock
    # fields as-is would schedule the rule at the wrong UTC time.
    if dt.tzinfo is not None:
        dt = dt.astimezone(timezone.utc).replace(tzinfo=None)

    # The cron expression has no seconds field, so the rule fires at the start
    # of the minute.
    fire_at = dt.replace(second=0, microsecond=0)
    now_utc = datetime.now(timezone.utc).replace(tzinfo=None)
    if fire_at <= now_utc:
        # A one-shot cron that only matches a past minute would never fire and
        # would linger as a "pending" reminder.
        return (
            f'when_utc must be in the future: {when_utc} is not after the '
            f'current UTC time {now_utc.isoformat(timespec="seconds")}.'
        )

    # Convert to EventBridge cron: cron(min hour day month ? year)
    cron_expr = (
        f'cron({fire_at.minute} {fire_at.hour} {fire_at.day} '
        f'{fire_at.month} ? {fire_at.year})'
    )

    # put_rule/put_targets are upserts, so a name based only on the current
    # second would let two reminders created in the same second overwrite each
    # other. The random suffix keeps names unique.
    rule_name = f'{_rule_prefix()}{int(time.time())}-{uuid.uuid4().hex[:8]}'

    eb = _eb()
    lm = boto3.client('lambda', region_name=REGION)

    eb.put_rule(
        Name=rule_name,
        ScheduleExpression=cron_expr,
        State='ENABLED',
    )

    try:
        # Grant EventBridge permission to invoke the Lambda
        try:
            lm.add_permission(
                FunctionName=SCHEDULER_LAMBDA_ARN,
                StatementId=rule_name,
                Action='lambda:InvokeFunction',
                Principal='events.amazonaws.com',
                SourceArn=f'arn:aws:events:{REGION}:{_account_id()}:rule/{rule_name}',
            )
        except lm.exceptions.ResourceConflictException:
            pass  # a statement with this id already exists

        resp = eb.put_targets(
            Rule=rule_name,
            Targets=[{
                'Id': 'scheduler',
                'Arn': SCHEDULER_LAMBDA_ARN,
                'Input': json.dumps({
                    'chat_id': _current_chat_id,
                    'message': message,
                    'rule_name': rule_name,
                }),
            }],
        )
    except Exception:
        # Do not leave an enabled rule without a target (or a stray permission).
        _rollback_reminder(eb, lm, rule_name)
        raise

    # put_targets reports per-target failures in the response instead of raising.
    if resp.get('FailedEntryCount'):
        _rollback_reminder(eb, lm, rule_name)
        reasons = '; '.join(
            str(entry.get('ErrorMessage') or entry.get('ErrorCode') or 'unknown error')
            for entry in resp.get('FailedEntries', [])
        )
        return f'Failed to schedule reminder: {reasons or "target was rejected"}'

    return (
        f'Reminder scheduled: "{message}" at {fire_at.isoformat()} UTC '
        f'(rule: {rule_name})'
    )
|
|
|
|
|
|
@tool
def list_reminders() -> str:
    """List all pending reminders for the current user."""
    events = _eb()
    # Only rules whose name starts with the current actor's prefix are requested.
    query: dict = {'NamePrefix': _rule_prefix()}
    found = []

    # Follow NextToken until the service stops returning one.
    while True:
        page = events.list_rules(**query)
        found += page.get('Rules', [])
        next_token = page.get('NextToken')
        if next_token:
            query['NextToken'] = next_token
            continue
        break

    if not found:
        return 'No pending reminders.'

    # One line per rule: name, schedule expression and state.
    return '\n'.join(
        f"- {rule['Name']}: {rule.get('ScheduleExpression', '')} [{rule.get('State', '')}]"
        for rule in found
    )
|
|
|
|
|
|
@tool
def cancel_reminder(rule_name: str) -> str:
    """Cancel a scheduled reminder by its rule name.

    Args:
        rule_name: The EventBridge rule name (from list_reminders).

    Returns:
        A message stating whether the reminder was cancelled, was not found,
        does not belong to the current user, or was cancelled with an
        incomplete Lambda permission cleanup.
    """
    # Only rules carrying the current actor's prefix may be cancelled.
    if not rule_name.startswith(_rule_prefix()):
        return f'Rule "{rule_name}" does not belong to your account.'

    eb = _eb()
    try:
        # Targets are removed first, then the rule itself.
        eb.remove_targets(Rule=rule_name, Ids=['scheduler'])
        eb.delete_rule(Name=rule_name)
    except eb.exceptions.ResourceNotFoundException:
        return f'Rule "{rule_name}" not found.'

    # Remove the per-rule invoke permission from the Lambda
    lm = boto3.client('lambda', region_name=REGION)
    try:
        lm.remove_permission(FunctionName=SCHEDULER_LAMBDA_ARN, StatementId=rule_name)
    except lm.exceptions.ResourceNotFoundException:
        # The statement is already gone; nothing left to clean up.
        pass
    except Exception as exc:
        # The reminder itself is cancelled, so keep this best-effort (no raise),
        # but surface the leftover policy statement instead of hiding it.
        return (
            f'Reminder "{rule_name}" cancelled, but its Lambda permission '
            f'could not be removed: {exc}'
        )

    return f'Reminder "{rule_name}" cancelled.'
|
|
|
|
|
|
def _account_id() -> str:
    """Return the AWS account id used when building rule ARNs.

    The module-level ``ACCOUNT_ID`` is used when it is non-empty; otherwise the
    id is looked up with an STS ``get_caller_identity`` call.
    """
    configured = ACCOUNT_ID
    if not configured:
        sts = boto3.client('sts', region_name=REGION)
        return sts.get_caller_identity()['Account']
    return configured
|