Files
agent-claw/agentclaw/app/agent_claw_main/channels/telegram.py
daniel 3cc90550b5 feat: add Telegram file attachment support (inbound + outbound)
Inbound:
- tg-ingest detects document/photo/audio/video/voice attachments
- Downloads files via Telegram Bot API (getFile + download)
- Inlines small text files (<50KB) directly in the prompt
- Stores binary/large files to S3 (attachments/{chat_id}/{update_id}/{filename})
- agent-runner appends file context to the AgentCore prompt

Outbound:
- New send_file tool for the agent to send documents back to users
- TelegramAdapter.send_document uses multipart/form-data POST
- CDK grants tg-ingest S3 write access and passes bucket name env var
2026-05-13 05:34:33 -05:00

117 lines
4.5 KiB
Python

import os
import threading
import urllib.request
import json
import boto3
class TelegramAdapter:
"""Channel adapter for Telegram Bot API."""
def __init__(self, chat_id: str, bot_token_secret_arn: str = '', message_thread_id: int | None = None):
self.chat_id = str(chat_id)
self.thread_id = message_thread_id # None for regular chats, int for supergroup topics
self._secret_arn = bot_token_secret_arn
self._token: str | None = None
self._lock = threading.Lock()
def _get_token(self) -> str:
if self._token is None:
with self._lock:
if self._token is None:
secret_arn = self._secret_arn or os.environ.get(
'TELEGRAM_BOT_TOKEN_SECRET_ARN',
'arn:aws:secretsmanager:us-east-1:495395224548:secret:agent-claw/telegram-bot-token-Oq3in3'
)
sm = boto3.client('secretsmanager')
self._token = sm.get_secret_value(
SecretId=secret_arn
)['SecretString']
return self._token
def _api(self, method: str, data: dict) -> dict:
token = self._get_token()
body = json.dumps(data).encode()
req = urllib.request.Request(
f'https://api.telegram.org/bot{token}/{method}',
data=body,
headers={'Content-Type': 'application/json'},
)
with urllib.request.urlopen(req, timeout=30) as resp:
return json.loads(resp.read())
def send(self, text: str) -> str:
"""Send message, return message_id."""
payload: dict = {
'chat_id': self.chat_id,
'text': text,
'parse_mode': 'Markdown',
}
if self.thread_id is not None:
payload['message_thread_id'] = self.thread_id
resp = self._api('sendMessage', payload)
return str(resp.get('result', {}).get('message_id', ''))
def send_typing(self) -> None:
"""Send typing action (best-effort)."""
try:
payload: dict = {'chat_id': self.chat_id, 'action': 'typing'}
if self.thread_id is not None:
payload['message_thread_id'] = self.thread_id
self._api('sendChatAction', payload)
except Exception as e:
import traceback
print(f'[telegram] send_typing failed: {e}\n{traceback.format_exc()}')
def send_document(self, file_bytes: bytes, filename: str, caption: str = '') -> str:
"""Send a file as a Telegram document using multipart/form-data. Returns message_id."""
import io
token = self._get_token()
url = f'https://api.telegram.org/bot{token}/sendDocument'
boundary = '----AgentClawBoundary'
body = io.BytesIO()
def add_field(name: str, value: str):
body.write(f'--{boundary}\r\n'.encode())
body.write(f'Content-Disposition: form-data; name="{name}"\r\n\r\n'.encode())
body.write(f'{value}\r\n'.encode())
def add_file(name: str, fname: str, data: bytes):
body.write(f'--{boundary}\r\n'.encode())
body.write(f'Content-Disposition: form-data; name="{name}"; filename="{fname}"\r\n'.encode())
body.write(b'Content-Type: application/octet-stream\r\n\r\n')
body.write(data)
body.write(b'\r\n')
add_field('chat_id', self.chat_id)
if self.thread_id is not None:
add_field('message_thread_id', str(self.thread_id))
if caption:
add_field('caption', caption)
add_file('document', filename, file_bytes)
body.write(f'--{boundary}--\r\n'.encode())
req = urllib.request.Request(
url, data=body.getvalue(),
headers={'Content-Type': f'multipart/form-data; boundary={boundary}'},
)
with urllib.request.urlopen(req, timeout=60) as resp:
result = json.loads(resp.read())
return str(result.get('result', {}).get('message_id', ''))
def edit(self, message_id: str, text: str) -> None:
"""Edit an existing message in-place."""
try:
payload: dict = {
'chat_id': self.chat_id,
'message_id': int(message_id),
'text': text,
'parse_mode': 'Markdown',
}
if self.thread_id is not None:
payload['message_thread_id'] = self.thread_id
self._api('editMessageText', payload)
except Exception:
pass