Files
2026-05-06 18:55:16 -05:00

123 lines
5.8 KiB
Python

import boto3 # type: ignore
import json
import logging
import urllib.request
s3 = boto3.client("s3")
EVENTBRIDGE_CONFIGURATION = 'EventBridgeConfiguration'
CONFIGURATION_TYPES = ["TopicConfigurations", "QueueConfigurations", "LambdaFunctionConfigurations"]
def handler(event: dict, context):
response_status = "SUCCESS"
error_message = ""
try:
props = event["ResourceProperties"]
notification_configuration = props["NotificationConfiguration"]
managed = props.get('Managed', 'true').lower() == 'true'
skipDestinationValidation = props.get('SkipDestinationValidation', 'false').lower() == 'true'
stack_id = event['StackId']
old = event.get("OldResourceProperties", {}).get("NotificationConfiguration", {})
if managed:
config = handle_managed(event["RequestType"], notification_configuration)
else:
config = handle_unmanaged(props["BucketName"], stack_id, event["RequestType"], notification_configuration, old)
s3.put_bucket_notification_configuration(Bucket=props["BucketName"], NotificationConfiguration=config, SkipDestinationValidation=skipDestinationValidation)
except Exception as e:
logging.exception("Failed to put bucket notification configuration")
response_status = "FAILED"
error_message = f"Error: {str(e)}. "
finally:
submit_response(event, context, response_status, error_message)
def handle_managed(request_type, notification_configuration):
if request_type == 'Delete':
return {}
return notification_configuration
def handle_unmanaged(bucket, stack_id, request_type, notification_configuration, old):
def get_id(n):
n['Id'] = ''
sorted_notifications = sort_filter_rules(n)
strToHash=json.dumps(sorted_notifications, sort_keys=True).replace('"Name": "prefix"', '"Name": "Prefix"').replace('"Name": "suffix"', '"Name": "Suffix"')
return f"{stack_id}-{hash(strToHash)}"
def with_id(n):
n['Id'] = get_id(n)
return n
# find external notifications
external_notifications = {}
existing_notifications = s3.get_bucket_notification_configuration(Bucket=bucket)
for t in CONFIGURATION_TYPES:
if request_type == 'Update':
old_incoming_ids = [get_id(n) for n in old.get(t, [])]
# if the notification was created by us, we know what id to expect so we can filter by it.
external_notifications[t] = [n for n in existing_notifications.get(t, []) if not get_id(n) in old_incoming_ids]
elif request_type == 'Delete':
# For 'Delete' request, old parameter is an empty dict so we cannot use this to determine which are external
# notifications. Fall back to rely on the stack naming logic.
external_notifications[t] = [n for n in existing_notifications.get(t, []) if not n['Id'].startswith(f"{stack_id}-")]
elif request_type == 'Create':
# if this is a create event then all existing notifications are external
external_notifications[t] = [n for n in existing_notifications.get(t, [])]
# always treat EventBridge configuration as an external config if it already exists
# as there is no way to determine whether it's managed by us or not
if EVENTBRIDGE_CONFIGURATION in existing_notifications:
external_notifications[EVENTBRIDGE_CONFIGURATION] = existing_notifications[EVENTBRIDGE_CONFIGURATION]
# if delete, that's all we need
if request_type == 'Delete':
return external_notifications
# otherwise, merge external with incoming config and augment with id
notifications = {}
for t in CONFIGURATION_TYPES:
external = external_notifications.get(t, [])
incoming = [with_id(n) for n in notification_configuration.get(t, [])]
notifications[t] = external + incoming
# EventBridge configuration is a special case because it's just an empty object if it exists
if EVENTBRIDGE_CONFIGURATION in notification_configuration:
notifications[EVENTBRIDGE_CONFIGURATION] = notification_configuration[EVENTBRIDGE_CONFIGURATION]
elif EVENTBRIDGE_CONFIGURATION in external_notifications:
notifications[EVENTBRIDGE_CONFIGURATION] = external_notifications[EVENTBRIDGE_CONFIGURATION]
return notifications
def submit_response(event: dict, context, response_status: str, error_message: str):
response_body = json.dumps(
{
"Status": response_status,
"Reason": f"{error_message}See the details in CloudWatch Log Stream: {context.log_stream_name}",
"PhysicalResourceId": event.get("PhysicalResourceId") or event["LogicalResourceId"],
"StackId": event["StackId"],
"RequestId": event["RequestId"],
"LogicalResourceId": event["LogicalResourceId"],
"NoEcho": False,
}
).encode("utf-8")
headers = {"content-type": "", "content-length": str(len(response_body))}
try:
req = urllib.request.Request(url=event["ResponseURL"], headers=headers, data=response_body, method="PUT")
with urllib.request.urlopen(req) as response:
print(response.read().decode("utf-8"))
print("Status code: " + response.reason)
except Exception as e:
print("send(..) failed executing request.urlopen(..): " + str(e))
def sort_filter_rules(json_obj):
# Check if the input is a dictionary
if not isinstance(json_obj, dict):
return json_obj
# Recursively sort the filter rules for nested dictionaries
for key, value in json_obj.items():
if isinstance(value, dict):
json_obj[key] = sort_filter_rules(value)
elif isinstance(value, list):
json_obj[key] = [sort_filter_rules(item) for item in value]
# Sort the FilterRules list if it exists
if "Filter" in json_obj and "Key" in json_obj["Filter"] and "FilterRules" in json_obj["Filter"]["Key"]:
filter_rules = json_obj["Filter"]["Key"]["FilterRules"]
sorted_filter_rules = sorted(filter_rules, key=lambda x: x["Name"])
json_obj["Filter"]["Key"]["FilterRules"] = sorted_filter_rules
return json_obj