diff --git a/agentclaw/app/agent_claw_main/channels/telegram.py b/agentclaw/app/agent_claw_main/channels/telegram.py index 3e10032..6417b19 100644 --- a/agentclaw/app/agent_claw_main/channels/telegram.py +++ b/agentclaw/app/agent_claw_main/channels/telegram.py @@ -63,6 +63,43 @@ class TelegramAdapter: import traceback print(f'[telegram] send_typing failed: {e}\n{traceback.format_exc()}') + def send_document(self, file_bytes: bytes, filename: str, caption: str = '') -> str: + """Send a file as a Telegram document using multipart/form-data. Returns message_id.""" + import io + token = self._get_token() + url = f'https://api.telegram.org/bot{token}/sendDocument' + + boundary = '----AgentClawBoundary' + body = io.BytesIO() + + def add_field(name: str, value: str): + body.write(f'--{boundary}\r\n'.encode()) + body.write(f'Content-Disposition: form-data; name="{name}"\r\n\r\n'.encode()) + body.write(f'{value}\r\n'.encode()) + + def add_file(name: str, fname: str, data: bytes): + body.write(f'--{boundary}\r\n'.encode()) + body.write(f'Content-Disposition: form-data; name="{name}"; filename="{fname}"\r\n'.encode()) + body.write(b'Content-Type: application/octet-stream\r\n\r\n') + body.write(data) + body.write(b'\r\n') + + add_field('chat_id', self.chat_id) + if self.thread_id is not None: + add_field('message_thread_id', str(self.thread_id)) + if caption: + add_field('caption', caption) + add_file('document', filename, file_bytes) + body.write(f'--{boundary}--\r\n'.encode()) + + req = urllib.request.Request( + url, data=body.getvalue(), + headers={'Content-Type': f'multipart/form-data; boundary={boundary}'}, + ) + with urllib.request.urlopen(req, timeout=60) as resp: + result = json.loads(resp.read()) + return str(result.get('result', {}).get('message_id', '')) + def edit(self, message_id: str, text: str) -> None: """Edit an existing message in-place.""" try: diff --git a/agentclaw/app/agent_claw_main/main.py b/agentclaw/app/agent_claw_main/main.py index 1bb5725..3cbb0f1 100644 --- a/agentclaw/app/agent_claw_main/main.py +++ b/agentclaw/app/agent_claw_main/main.py @@ -17,6 +17,7 @@ from tools.scheduler import schedule_reminder, list_reminders, cancel_reminder import tools.scheduler as _scheduler_module from tools.home_assistant import home_assistant, set_ha_config from tools.google_workspace import list_calendars, get_calendar_events, list_gmail_messages, get_gmail_message +from tools.send_file import send_file as _send_file_impl import tools.google_workspace as _gws import httpx import botocore.auth @@ -74,6 +75,19 @@ def web_search(query: str) -> str: return web_tools.brave_search(query) +@tool +def send_file(file_content: str, filename: str, caption: str = '') -> str: + """Send a file to the user as a Telegram document attachment. + Use this when you need to send code, data, or any text content as a downloadable file. + + Args: + file_content: The text content of the file to send. + filename: The filename with extension (e.g. 'report.txt', 'data.csv', 'script.py'). + caption: Optional caption to display with the file. + """ + return _send_file_impl(file_content, filename, caption) + + @tool def web_fetch(url: str) -> str: """Fetch and extract readable text content from a URL.""" @@ -360,7 +374,7 @@ async def main(payload: dict, context): home_assistant, connect_google_account, list_google_accounts, remove_google_account, manage_service, schedule_reminder, list_reminders, cancel_reminder, list_calendars, get_calendar_events, list_gmail_messages, get_gmail_message, - run_code] + run_code, send_file] agent = Agent( model=model, diff --git a/agentclaw/app/agent_claw_main/tools/send_file.py b/agentclaw/app/agent_claw_main/tools/send_file.py new file mode 100644 index 0000000..44a5d3a --- /dev/null +++ b/agentclaw/app/agent_claw_main/tools/send_file.py @@ -0,0 +1,20 @@ +"""Send file tool — sends documents to the user via Telegram.""" +from tools import messaging + + +def send_file(file_content: str, filename: str, caption: str = '') -> str: + """Send a file to the user as a Telegram document attachment. + + Args: + file_content: The text content of the file to send. + filename: The filename (e.g. 'report.txt', 'data.csv'). + caption: Optional caption to display with the file. + """ + adapter = messaging._adapter + if adapter is None: + return 'No channel adapter configured.' + if not hasattr(adapter, 'send_document'): + return 'Channel adapter does not support file sending.' + file_bytes = file_content.encode('utf-8') + msg_id = adapter.send_document(file_bytes, filename, caption) + return f'File "{filename}" sent (id={msg_id})' if msg_id else f'File "{filename}" sent' diff --git a/cdk/lib/agent-claw-stack.ts b/cdk/lib/agent-claw-stack.ts index 88978c1..401bbf7 100644 --- a/cdk/lib/agent-claw-stack.ts +++ b/cdk/lib/agent-claw-stack.ts @@ -96,10 +96,12 @@ export class AgentClawStack extends cdk.Stack { MESSAGE_QUEUE_URL: messageQueue.queueUrl, TELEGRAM_BOT_TOKEN_SECRET_ARN: telegramBotTokenSecretArn, TELEGRAM_WEBHOOK_SECRET: '', // set via SSM or direct env after deploy + ATTACHMENTS_BUCKET_NAME: workspaceBucket.bucketName, }, }); messageQueue.grantSendMessages(tgIngestFn); botTokenSecret.grantRead(tgIngestFn); + workspaceBucket.grantWrite(tgIngestFn); // ── Lambda: agent-runner ─────────────────────────────────────────────── const agentRunnerFn = new lambda.Function(this, 'AgentRunner', { diff --git a/src/lambdas/agent-runner/handler.py b/src/lambdas/agent-runner/handler.py index bd26ccc..ee05504 100644 --- a/src/lambdas/agent-runner/handler.py +++ b/src/lambdas/agent-runner/handler.py @@ -184,6 +184,18 @@ def handler(event, context): ] prompt = f"You have {len(records)} queued messages:\n" + "\n".join(lines) + # ── Attach file context if present ──────────────────────────────────── + attachment = first.get('attachment') + if attachment: + file_name = attachment.get('file_name', 'unknown') + if 'inline_content' in attachment: + prompt += f"\n\n[Attached file: {file_name}]\n```\n{attachment['inline_content']}\n```" + elif 's3_key' in attachment: + s3_ref = f"s3://{attachment['s3_bucket']}/{attachment['s3_key']}" + prompt += f"\n\n[Attached file: {file_name} ({attachment.get('mime_type', '')}) — stored at {s3_ref}]" + elif 'error' in attachment: + prompt += f"\n\n[Attachment {file_name} could not be processed: {attachment['error']}]" + # ── Build payload for AgentCore Runtime 1 ──────────────────────────── payload: dict[str, Any] = { 'prompt': prompt, diff --git a/src/lambdas/tg-ingest/handler.py b/src/lambdas/tg-ingest/handler.py index d987ee2..defcb48 100644 --- a/src/lambdas/tg-ingest/handler.py +++ b/src/lambdas/tg-ingest/handler.py @@ -9,6 +9,11 @@ import boto3 _bot_token: str | None = None _token_lock = threading.Lock() +TEXT_EXTENSIONS = {'.txt', '.py', '.js', '.ts', '.json', '.md', '.csv', '.xml', '.html', + '.css', '.yaml', '.yml', '.toml', '.ini', '.cfg', '.sh', '.bash', + '.sql', '.log', '.env', '.rs', '.go', '.java', '.c', '.h', '.cpp'} +MAX_INLINE_SIZE = 50 * 1024 # 50KB + def get_bot_token() -> str: global _bot_token @@ -40,6 +45,64 @@ def send_typing(chat_id: str, thread_id: int | None = None) -> None: pass # typing is best-effort +def get_file_from_telegram(file_id: str) -> tuple[str, bytes]: + """Call getFile then download. Returns (file_path, file_bytes).""" + token = get_bot_token() + # getFile + url = f'https://api.telegram.org/bot{token}/getFile' + data = json.dumps({'file_id': file_id}).encode() + req = urllib.request.Request(url, data=data, headers={'Content-Type': 'application/json'}) + with urllib.request.urlopen(req, timeout=30) as resp: + result = json.loads(resp.read()).get('result', {}) + file_path = result.get('file_path', '') + # Download + download_url = f'https://api.telegram.org/file/bot{token}/{file_path}' + with urllib.request.urlopen(download_url, timeout=60) as resp: + file_bytes = resp.read() + return file_path, file_bytes + + +def extract_attachment(message: dict) -> dict | None: + """Extract file attachment info from a Telegram message. Returns metadata dict or None.""" + # Priority: document > photo > audio > video > voice > video_note + if 'document' in message: + doc = message['document'] + return {'type': 'document', 'file_id': doc['file_id'], + 'file_name': doc.get('file_name', 'document'), 'mime_type': doc.get('mime_type', ''), + 'file_size': doc.get('file_size', 0)} + if 'photo' in message: + # Take largest photo (last in array) + photo = message['photo'][-1] + return {'type': 'photo', 'file_id': photo['file_id'], + 'file_name': 'photo.jpg', 'mime_type': 'image/jpeg', + 'file_size': photo.get('file_size', 0)} + if 'audio' in message: + audio = message['audio'] + return {'type': 'audio', 'file_id': audio['file_id'], + 'file_name': audio.get('file_name', 'audio.ogg'), 'mime_type': audio.get('mime_type', 'audio/ogg'), + 'file_size': audio.get('file_size', 0)} + if 'video' in message: + video = message['video'] + return {'type': 'video', 'file_id': video['file_id'], + 'file_name': video.get('file_name', 'video.mp4'), 'mime_type': video.get('mime_type', 'video/mp4'), + 'file_size': video.get('file_size', 0)} + if 'voice' in message: + voice = message['voice'] + return {'type': 'voice', 'file_id': voice['file_id'], + 'file_name': 'voice.ogg', 'mime_type': voice.get('mime_type', 'audio/ogg'), + 'file_size': voice.get('file_size', 0)} + return None + + +def is_text_file(file_name: str, mime_type: str) -> bool: + """Determine if a file should be inlined as text.""" + ext = os.path.splitext(file_name)[1].lower() + if ext in TEXT_EXTENSIONS: + return True + if mime_type.startswith('text/'): + return True + return False + def handler(event, context): # ── Validate Telegram webhook secret ────────────────────────────────── @@ -68,14 +131,69 @@ def handler(event, context): chat_id = str(message.get('chat', {}).get('id', '')) message_thread_id = message.get('message_thread_id') # present for supergroup topics - text = message.get('text', '') + text = message.get('text', '') or message.get('caption', '') from_user = message.get('from', {}) timestamp = message.get('date', 0) - print(f'[tg-ingest] chat_id={chat_id} text_len={len(text)} update_id={update_id}') + # ── Detect file attachment ──────────────────────────────────────────── + attachment = extract_attachment(message) + attachment_meta = None - if not chat_id or not text: - print(f'[tg-ingest] Dropping: chat_id={chat_id!r} text={text!r}') + if attachment: + print(f'[tg-ingest] Attachment detected: type={attachment["type"]} name={attachment["file_name"]} size={attachment["file_size"]}') + try: + file_path, file_bytes = get_file_from_telegram(attachment['file_id']) + file_name = attachment['file_name'] + mime_type = attachment['mime_type'] + + if is_text_file(file_name, mime_type) and len(file_bytes) <= MAX_INLINE_SIZE: + # Inline small text files + try: + text_content = file_bytes.decode('utf-8') + except UnicodeDecodeError: + text_content = file_bytes.decode('latin-1') + attachment_meta = { + 'type': attachment['type'], + 'file_name': file_name, + 'mime_type': mime_type, + 'inline_content': text_content, + } + else: + # Store to S3 + bucket = os.environ.get('ATTACHMENTS_BUCKET_NAME', '') + if bucket: + s3 = boto3.client('s3') + s3_key = f'attachments/{chat_id}/{update_id}/{file_name}' + s3.put_object(Bucket=bucket, Key=s3_key, Body=file_bytes, + ContentType=mime_type or 'application/octet-stream') + attachment_meta = { + 'type': attachment['type'], + 'file_name': file_name, + 'mime_type': mime_type, + 's3_bucket': bucket, + 's3_key': s3_key, + } + print(f'[tg-ingest] Stored to s3://{bucket}/{s3_key}') + else: + print(f'[tg-ingest] No ATTACHMENTS_BUCKET_NAME configured, skipping S3 upload') + attachment_meta = { + 'type': attachment['type'], + 'file_name': file_name, + 'mime_type': mime_type, + 'error': 'S3 bucket not configured', + } + except Exception as e: + print(f'[tg-ingest] Failed to process attachment: {e}') + attachment_meta = { + 'type': attachment['type'], + 'file_name': attachment['file_name'], + 'error': str(e), + } + + print(f'[tg-ingest] chat_id={chat_id} text_len={len(text)} attachment={bool(attachment_meta)} update_id={update_id}') + + if not chat_id or (not text and not attachment_meta): + print(f'[tg-ingest] Dropping: chat_id={chat_id!r} text={text!r} attachment={attachment_meta}') return {'statusCode': 200, 'body': 'ok'} # ── Send typing action (non-blocking, background thread) ────────────── @@ -85,23 +203,27 @@ def handler(event, context): # ── Enqueue to SQS FIFO ─────────────────────────────────────────────── sqs = boto3.client('sqs') + msg_body: dict = { + 'channel': 'telegram', + 'chat_id': chat_id, + 'message_thread_id': message_thread_id, + 'messages': [{ + 'text': text, + 'from_id': str(from_user.get('id', '')), + 'from_username': from_user.get('username', ''), + 'from_name': f"{from_user.get('first_name', '')} {from_user.get('last_name', '')}".strip(), + }], + 'update_id': update_id, + 'timestamp': timestamp, + } + if attachment_meta: + msg_body['attachment'] = attachment_meta + sqs.send_message( QueueUrl=os.environ['MESSAGE_QUEUE_URL'], MessageGroupId=chat_id, MessageDeduplicationId=str(update_id), - MessageBody=json.dumps({ - 'channel': 'telegram', - 'chat_id': chat_id, - 'message_thread_id': message_thread_id, # None for regular chats, int for topics - 'messages': [{ - 'text': text, - 'from_id': str(from_user.get('id', '')), - 'from_username': from_user.get('username', ''), - 'from_name': f"{from_user.get('first_name', '')} {from_user.get('last_name', '')}".strip(), - }], - 'update_id': update_id, - 'timestamp': timestamp, - }), + MessageBody=json.dumps(msg_body), ) return {'statusCode': 200, 'body': 'ok'}