Phase 3: proactive heartbeat — EventBridge 30min rule, heartbeat-runner Lambda, HEARTBEAT_OK suppression
This commit is contained in:
@@ -0,0 +1,68 @@
|
||||
import json
|
||||
import os
|
||||
import time
|
||||
import boto3
|
||||
|
||||
_ddb = None
|
||||
_sqs = None
|
||||
|
||||
|
||||
def get_ddb():
|
||||
global _ddb
|
||||
if _ddb is None:
|
||||
_ddb = boto3.resource('dynamodb')
|
||||
return _ddb
|
||||
|
||||
|
||||
def get_sqs():
|
||||
global _sqs
|
||||
if _sqs is None:
|
||||
_sqs = boto3.client('sqs')
|
||||
return _sqs
|
||||
|
||||
|
||||
def handler(event, context):
|
||||
table_name = os.environ['USERS_TABLE_NAME']
|
||||
queue_url = os.environ['MESSAGE_QUEUE_URL']
|
||||
|
||||
# Scan for active users
|
||||
table = get_ddb().Table(table_name)
|
||||
response = table.scan(
|
||||
FilterExpression='#s = :active',
|
||||
ExpressionAttributeNames={'#s': 'status'},
|
||||
ExpressionAttributeValues={':active': 'active'},
|
||||
)
|
||||
users = response.get('Items', [])
|
||||
|
||||
# 30-min bucket for deduplication
|
||||
bucket_ts = str(int(time.time()) // 1800)
|
||||
sqs = get_sqs()
|
||||
|
||||
sent = 0
|
||||
for user in users:
|
||||
actor_id = user['actor_id']
|
||||
# Extract chat_id from actor_id (format: "telegram:<chat_id>")
|
||||
if not actor_id.startswith('telegram:'):
|
||||
continue
|
||||
chat_id = actor_id.split(':', 1)[1]
|
||||
|
||||
msg = {
|
||||
'chat_id': chat_id,
|
||||
'channel': 'telegram',
|
||||
'messages': [{
|
||||
'text': '[HEARTBEAT]',
|
||||
'from_name': user.get('display_name', ''),
|
||||
'from_username': user.get('telegram_username', ''),
|
||||
}],
|
||||
}
|
||||
|
||||
sqs.send_message(
|
||||
QueueUrl=queue_url,
|
||||
MessageBody=json.dumps(msg),
|
||||
MessageGroupId=actor_id,
|
||||
MessageDeduplicationId=f'heartbeat-{actor_id}-{bucket_ts}',
|
||||
)
|
||||
sent += 1
|
||||
|
||||
print(f'[heartbeat-runner] Sent {sent} heartbeat messages')
|
||||
return {'statusCode': 200, 'sent': sent}
|
||||
Reference in New Issue
Block a user