import boto3 # type: ignore import json import logging import urllib.request s3 = boto3.client("s3") EVENTBRIDGE_CONFIGURATION = 'EventBridgeConfiguration' CONFIGURATION_TYPES = ["TopicConfigurations", "QueueConfigurations", "LambdaFunctionConfigurations"] def handler(event: dict, context): response_status = "SUCCESS" error_message = "" try: props = event["ResourceProperties"] notification_configuration = props["NotificationConfiguration"] managed = props.get('Managed', 'true').lower() == 'true' skipDestinationValidation = props.get('SkipDestinationValidation', 'false').lower() == 'true' stack_id = event['StackId'] old = event.get("OldResourceProperties", {}).get("NotificationConfiguration", {}) if managed: config = handle_managed(event["RequestType"], notification_configuration) else: config = handle_unmanaged(props["BucketName"], stack_id, event["RequestType"], notification_configuration, old) s3.put_bucket_notification_configuration(Bucket=props["BucketName"], NotificationConfiguration=config, SkipDestinationValidation=skipDestinationValidation) except Exception as e: logging.exception("Failed to put bucket notification configuration") response_status = "FAILED" error_message = f"Error: {str(e)}. " finally: submit_response(event, context, response_status, error_message) def handle_managed(request_type, notification_configuration): if request_type == 'Delete': return {} return notification_configuration def handle_unmanaged(bucket, stack_id, request_type, notification_configuration, old): def get_id(n): n['Id'] = '' sorted_notifications = sort_filter_rules(n) strToHash=json.dumps(sorted_notifications, sort_keys=True).replace('"Name": "prefix"', '"Name": "Prefix"').replace('"Name": "suffix"', '"Name": "Suffix"') return f"{stack_id}-{hash(strToHash)}" def with_id(n): n['Id'] = get_id(n) return n # find external notifications external_notifications = {} existing_notifications = s3.get_bucket_notification_configuration(Bucket=bucket) for t in CONFIGURATION_TYPES: if request_type == 'Update': old_incoming_ids = [get_id(n) for n in old.get(t, [])] # if the notification was created by us, we know what id to expect so we can filter by it. external_notifications[t] = [n for n in existing_notifications.get(t, []) if not get_id(n) in old_incoming_ids] elif request_type == 'Delete': # For 'Delete' request, old parameter is an empty dict so we cannot use this to determine which are external # notifications. Fall back to rely on the stack naming logic. external_notifications[t] = [n for n in existing_notifications.get(t, []) if not n['Id'].startswith(f"{stack_id}-")] elif request_type == 'Create': # if this is a create event then all existing notifications are external external_notifications[t] = [n for n in existing_notifications.get(t, [])] # always treat EventBridge configuration as an external config if it already exists # as there is no way to determine whether it's managed by us or not if EVENTBRIDGE_CONFIGURATION in existing_notifications: external_notifications[EVENTBRIDGE_CONFIGURATION] = existing_notifications[EVENTBRIDGE_CONFIGURATION] # if delete, that's all we need if request_type == 'Delete': return external_notifications # otherwise, merge external with incoming config and augment with id notifications = {} for t in CONFIGURATION_TYPES: external = external_notifications.get(t, []) incoming = [with_id(n) for n in notification_configuration.get(t, [])] notifications[t] = external + incoming # EventBridge configuration is a special case because it's just an empty object if it exists if EVENTBRIDGE_CONFIGURATION in notification_configuration: notifications[EVENTBRIDGE_CONFIGURATION] = notification_configuration[EVENTBRIDGE_CONFIGURATION] elif EVENTBRIDGE_CONFIGURATION in external_notifications: notifications[EVENTBRIDGE_CONFIGURATION] = external_notifications[EVENTBRIDGE_CONFIGURATION] return notifications def submit_response(event: dict, context, response_status: str, error_message: str): response_body = json.dumps( { "Status": response_status, "Reason": f"{error_message}See the details in CloudWatch Log Stream: {context.log_stream_name}", "PhysicalResourceId": event.get("PhysicalResourceId") or event["LogicalResourceId"], "StackId": event["StackId"], "RequestId": event["RequestId"], "LogicalResourceId": event["LogicalResourceId"], "NoEcho": False, } ).encode("utf-8") headers = {"content-type": "", "content-length": str(len(response_body))} try: req = urllib.request.Request(url=event["ResponseURL"], headers=headers, data=response_body, method="PUT") with urllib.request.urlopen(req) as response: print(response.read().decode("utf-8")) print("Status code: " + response.reason) except Exception as e: print("send(..) failed executing request.urlopen(..): " + str(e)) def sort_filter_rules(json_obj): # Check if the input is a dictionary if not isinstance(json_obj, dict): return json_obj # Recursively sort the filter rules for nested dictionaries for key, value in json_obj.items(): if isinstance(value, dict): json_obj[key] = sort_filter_rules(value) elif isinstance(value, list): json_obj[key] = [sort_filter_rules(item) for item in value] # Sort the FilterRules list if it exists if "Filter" in json_obj and "Key" in json_obj["Filter"] and "FilterRules" in json_obj["Filter"]["Key"]: filter_rules = json_obj["Filter"]["Key"]["FilterRules"] sorted_filter_rules = sorted(filter_rules, key=lambda x: x["Name"]) json_obj["Filter"]["Key"]["FilterRules"] = sorted_filter_rules return json_obj