diff --git a/src/lambda_function/s3_event_config/app.py b/src/lambda_function/s3_event_config/app.py index 753f0300..f50f0567 100644 --- a/src/lambda_function/s3_event_config/app.py +++ b/src/lambda_function/s3_event_config/app.py @@ -12,6 +12,8 @@ import json import logging import typing +from collections import defaultdict +from enum import Enum import boto3 @@ -22,21 +24,20 @@ def lambda_handler(event, context): - s3 = boto3.client("s3") + s3_client = boto3.client("s3") logger.info(f"Received event: {json.dumps(event, indent=2)}") if event["RequestType"] == "Delete": logger.info(f'Request Type:{event["RequestType"]}') delete_notification( - s3, + s3_client=s3_client, bucket=os.environ["S3_SOURCE_BUCKET_NAME"], - destination_type=os.environ["S3_TO_GLUE_DESTINATION_TYPE"], - destination_arn=os.environ["S3_TO_GLUE_DESTINATION_ARN"], + bucket_key_prefix=os.environ["BUCKET_KEY_PREFIX"] ) logger.info("Sending response to custom resource after Delete") elif event["RequestType"] in ["Update", "Create"]: logger.info(f'Request Type: {event["RequestType"]}') add_notification( - s3, + s3_client=s3_client, destination_arn=os.environ["S3_TO_GLUE_DESTINATION_ARN"], destination_type=os.environ["S3_TO_GLUE_DESTINATION_TYPE"], bucket=os.environ["S3_SOURCE_BUCKET_NAME"], @@ -48,14 +49,61 @@ def lambda_handler(event, context): raise KeyError(err_msg) -class ExistingNotificationConfiguration(typing.NamedTuple): - existing_bucket_notification_configuration: dict - existing_notification_configurations_for_type: list +class NotificationConfigurationType(Enum): + """ + Supported types for an S3 event configuration. + """ + Topic = "Topic" + Queue = "Queue" + LambdaFunction = "LambdaFunction" + + +class NotificationConfiguration: + """ + An abstraction of S3 event configurations. + """ + def __init__(self, notification_type: NotificationConfigurationType, value: dict): + self.type = notification_type.value + self.value = value + self.arn = self.get_arn() + + def get_arn(self): + """ + Assign the ARN of this notification configuration to the `arn` property. + """ + if self.type == "Topic": + arn = self.value["TopicArn"] + elif self.type == "Queue": + arn = self.value["QueueArn"] + elif self.type == "LambdaFunction": + arn = self.value["LambdaFunctionArn"] + else: + raise ValueError(f"{self.type} is not a recognized notification configuration type.") + return arn -def get_existing_bucket_notification_configuration_and_type( - s3_client: boto3.client, bucket: str, destination_type: str -) -> ExistingNotificationConfiguration: +class BucketNotificationConfigurations: + """ + A convenience class for working with a collection of `NotificationConfiguration`s. + """ + def __init__(self, notification_configurations: list[NotificationConfiguration]): + self.configs = notification_configurations + + def to_dict(self): + """ + A dict representation of this object which can be supplied to + `put_bucket_notification_configuration` + """ + return_dict = defaultdict(list) + for config in self.configs: + return_dict[f"{config.type}Configurations"].append(config.value) + return dict(return_dict) + + +def get_bucket_notification_configurations( + s3_client: boto3.client, + bucket: str, + ) -> BucketNotificationConfigurations: """ Gets the existing bucket notification configuration and the existing notification configurations for a specific destination type. @@ -66,77 +114,71 @@ def get_existing_bucket_notification_configuration_and_type( destination_type (str): String name of the destination type for the configuration Returns: - ExistingNotificationConfiguration: A bucket notifiction configuration, - and the notification configurations for a specific destination type + BucketNotificationConfigurations """ - existing_bucket_notification_configuration = ( - s3_client.get_bucket_notification_configuration(Bucket=bucket) - ) - - # Remove ResponseMetadata because we don't want to persist it - existing_bucket_notification_configuration.pop("ResponseMetadata", None) - - existing_notification_configurations_for_type = ( - existing_bucket_notification_configuration.get( - f"{destination_type}Configurations", [] - ) - ) - - # Initialize this with an empty list to have consistent handling if it's present - # or missing. - if not existing_notification_configurations_for_type: - existing_bucket_notification_configuration[ - f"{destination_type}Configurations" - ] = [] - - return ExistingNotificationConfiguration( - existing_bucket_notification_configuration=existing_bucket_notification_configuration, - existing_notification_configurations_for_type=existing_notification_configurations_for_type, - ) - - -class MatchingNotificationConfiguration(typing.NamedTuple): - index_of_matching_arn: typing.Union[int, None] - matching_notification_configuration: typing.Union[dict, None] + bucket_notification_configuration = \ + s3_client.get_bucket_notification_configuration(Bucket=bucket) + all_notification_configurations = [] + for configuration_type in NotificationConfigurationType: + configuration_type_name = f"{configuration_type.value}Configurations" + if configuration_type_name in bucket_notification_configuration: + notification_configurations = [ + NotificationConfiguration( + notification_type=configuration_type, + value=config + ) + for config + in bucket_notification_configuration[configuration_type_name] + ] + all_notification_configurations.extend(notification_configurations) + bucket_notification_configurations = BucketNotificationConfigurations(all_notification_configurations) + return bucket_notification_configurations -def get_matching_notification_configuration( - destination_type_arn: str, - existing_notification_configurations_for_type: list, - destination_arn: str, -) -> MatchingNotificationConfiguration: +def get_notification_configuration( + bucket_notification_configurations: BucketNotificationConfigurations, + bucket_key_prefix: typing.Union[str,None]=None, + bucket_key_suffix: typing.Union[str,None]=None, + ) -> typing.Union[NotificationConfiguration, None]: """ - Search through the list of existing notifications and find the one that has a key of - `destination_type_arn` and a value of `destination_arn`. + Filter the list of existing notifications based on the unique S3 key prefix and suffix. Arguments: - destination_type_arn (str): Key value for the destination type arn - existing_notification_configurations_for_type (list): The existing notification configs - destination_arn (str): Arn of the destination's s3 event config + bucket_notification_configuration (BucketNotificationConfigurations): The S3 bucket + notification configurations + bucket_key_prefix (str): Optional. The S3 key prefix included with the filter rules + to match upon. + bucket_key_suffix (str): Optional. The S3 key suffix. The S3 key suffix included with the filter rules + to match upon. Returns: - MatchingNotificationConfiguration: The index of the matching notification - configuration and the matching notification configuration - or None, None if no match is found + NotificationConfiguration or None if no match is found """ - for index, existing_notification_configuration_for_type in enumerate( - existing_notification_configurations_for_type - ): - if ( - destination_type_arn in existing_notification_configuration_for_type - and existing_notification_configuration_for_type[destination_type_arn] - == destination_arn - ): - return MatchingNotificationConfiguration( - index_of_matching_arn=index, - matching_notification_configuration=existing_notification_configuration_for_type, - ) - return MatchingNotificationConfiguration(None, None) + for notification_configuration in bucket_notification_configurations.configs: + filter_rules = notification_configuration.value["Filter"]["Key"]["FilterRules"] + rule_names = {rule["Name"] for rule in filter_rules} + common_prefix_path, common_suffix_path = (False, False) + if "prefix" not in rule_names and bucket_key_prefix is None: + common_prefix_path = True + if "suffix" not in rule_names and bucket_key_suffix is None: + common_suffix_path = True + for filter_rule in filter_rules: + if filter_rule["Name"] == "prefix" and bucket_key_prefix is not None: + common_prefix_path = bool( + os.path.commonpath([filter_rule["Value"], bucket_key_prefix]) + ) + elif filter_rule["Name"] == "suffix" and bucket_key_suffix is not None: + common_suffix_path = bool( + os.path.commonpath([filter_rule["Value"], bucket_key_suffix]) + ) + if common_prefix_path and common_suffix_path: + return notification_configuration + return None def create_formatted_message( bucket: str, destination_type: str, destination_arn: str -) -> str: + ) -> str: """Creates a formatted message for logging purposes. Arguments: @@ -151,42 +193,58 @@ def create_formatted_message( def notification_configuration_matches( - matching_notification_configuration: dict, new_notification_configuration: dict -) -> bool: - """Determines if the Events and Filter.key.FilterRules match. + config: NotificationConfiguration, + other_config: NotificationConfiguration + ) -> bool: + """Determines if two S3 event notification configurations are functionally equivalent. + + Two notifiction configurations are considered equivalent if: + 1. They have the same filter rules + 2. They have the same destination (ARN) + 3. They are triggered by the same events Arguments: - matching_notification_configuration (dict): The existing notification config - new_notification_configuration (dict): The new notification config + config (dict): The existing notification config + other_config (dict): The new notification config Returns: - bool: True if the Events and Filter.key.FilterRules match. + bool: Whether the Events, ARN, and filter rules match. """ - # Check if the Events match - if matching_notification_configuration.get( - "Events" - ) != new_notification_configuration.get("Events"): - return False - - # Check if the Filter.key.FilterRules match - matching_filter_rules = ( - matching_notification_configuration.get("Filter", {}) - .get("Key", {}) - .get("FilterRules", []) + arn_match = other_config.arn == config.arn + events_match = ( + set(other_config.value["Events"]) == + set(config.value["Events"]) ) - new_filter_rules = ( - new_notification_configuration.get("Filter", {}) - .get("Key", {}) - .get("FilterRules", []) + filter_rule_names_match = ( + { + filter_rule["Name"] + for filter_rule + in other_config.value["Filter"]["Key"]["FilterRules"] + } == + { + filter_rule["Name"] + for filter_rule + in config.value["Filter"]["Key"]["FilterRules"] + } ) - if len(matching_filter_rules) != len(new_filter_rules): - return False - for i in range(len(matching_filter_rules)): - if matching_filter_rules[i] != new_filter_rules[i]: - return False - - # All checks passed, the notification configurations match - return True + filter_rule_values_match = all( + [ + any( + [ + filter_rule["Value"] == other_filter_rule["Value"] + for filter_rule + in config.value["Filter"]["Key"]["FilterRules"] + if filter_rule["Name"] == other_filter_rule["Name"] + ] + ) + for other_filter_rule + in other_config.value["Filter"]["Key"]["FilterRules"] + ] + ) + configurations_match = ( + arn_match and events_match and filter_rule_names_match and filter_rule_values_match + ) + return configurations_match def add_notification( @@ -195,17 +253,15 @@ def add_notification( destination_arn: str, bucket: str, bucket_key_prefix: str, -) -> None: + ) -> None: """Adds the S3 notification configuration to an existing bucket. - Use cases: - 1) If a bucket has no `NotificationConfiguration` then create the config - 2) If a bucket has a `NotificationConfiguration` but no matching - "{destination_arn}" for the "{destination_type}" then add the config - 3) If a bucket has a `NotificationConfiguration` and a matching - "{destination_arn}" for the "{destination_type}": - 3a) If the config is the same then do nothing - 3b) If the config is different then overwrite the matching config + Notification configurations are identified by their unique prefix/suffix filter rules. + If the notification configuration already exists, and is functionally equivalent, + then no action is taken. If the notification configuration already exists, but is not + functionally equivalent (see function `notification_configuration_matches`), then a + RuntimeError is raised. In this case, the notifiction configuration must be deleted + before being added. Args: s3_client (boto3.client) : s3 client to use for s3 event config @@ -214,130 +270,119 @@ def add_notification( bucket (str): bucket name of the s3 bucket to add the config to bucket_key_prefix (str): bucket key prefix for where to look for s3 object notifications """ - update_required = False + ### Create new notification configuration destination_type_arn = f"{destination_type}Arn" - new_notification_configuration = { + new_notification_configuration_value = { destination_type_arn: destination_arn, "Events": ["s3:ObjectCreated:*"], "Filter": { "Key": { - "FilterRules": [{"Name": "prefix", "Value": f"{bucket_key_prefix}/"}] + "FilterRules": [{"Name": "prefix", "Value": os.path.join(bucket_key_prefix, "")}] } }, } - - ( - existing_bucket_notification_configuration, - existing_notification_configurations_for_type, - ) = get_existing_bucket_notification_configuration_and_type( - s3_client, bucket, destination_type + new_notification_configuration = NotificationConfiguration( + notification_type=NotificationConfigurationType(destination_type), + value=new_notification_configuration_value ) - - ( - index_of_matching_arn, - matching_notification_configuration, - ) = get_matching_notification_configuration( - destination_type_arn, - existing_notification_configurations_for_type, - destination_arn, + ### Get any matching notification configuration + bucket_notification_configurations = get_bucket_notification_configurations( + s3_client=s3_client, + bucket=bucket ) - - if ( - index_of_matching_arn is not None - and matching_notification_configuration is not None - ): - if not notification_configuration_matches( - matching_notification_configuration, new_notification_configuration - ): - existing_notification_configurations_for_type[ - index_of_matching_arn - ] = new_notification_configuration - update_required = True - else: - existing_notification_configurations_for_type.append( - new_notification_configuration - ) - update_required = True - - if update_required: - logger.info( - "Put request started to add a NotificationConfiguration for " - + create_formatted_message(bucket, destination_type, destination_arn) - ) - existing_bucket_notification_configuration[ - f"{destination_type}Configurations" - ] = existing_notification_configurations_for_type - s3_client.put_bucket_notification_configuration( - Bucket=bucket, - NotificationConfiguration=existing_bucket_notification_configuration, - SkipDestinationValidation=True, - ) - logger.info( - "Put request completed to add a NotificationConfiguration for " - + create_formatted_message(bucket, destination_type, destination_arn) + matching_notification_configuration = get_notification_configuration( + bucket_notification_configurations=bucket_notification_configurations, + bucket_key_prefix=bucket_key_prefix, + ) + if matching_notification_configuration is not None: + is_the_same_configuration = notification_configuration_matches( + config=matching_notification_configuration, + other_config=new_notification_configuration ) - else: - logger.info( - "Put not required as an existing NotificationConfiguration already exists for " - + create_formatted_message(bucket, destination_type, destination_arn) + if is_the_same_configuration: + logger.info( + "Put not required as an equivalent NotificationConfiguration already exists at " + + create_formatted_message( + bucket=bucket, + destination_type=destination_type, + destination_arn=matching_notification_configuration.arn, + ) + ) + return + raise RuntimeError( + "There already exists an event configuration on S3 bucket {bucket} for " + f"the key prefix {bucket_key_prefix} which differs from the event " + "configuration which we wish to add." ) + ### Store new notification configuration + logger.info( + "Put request started to add a NotificationConfiguration for " + + create_formatted_message(bucket, destination_type, destination_arn) + ) + bucket_notification_configurations.configs.append(new_notification_configuration) + s3_client.put_bucket_notification_configuration( + Bucket=bucket, + NotificationConfiguration=bucket_notification_configurations.to_dict(), + SkipDestinationValidation=True, + ) + logger.info( + "Put request completed to add a NotificationConfiguration for " + + create_formatted_message(bucket, destination_type, destination_arn) + ) def delete_notification( - s3_client: boto3.client, bucket: str, destination_type: str, destination_arn: str -) -> None: - """Deletes the S3 notification configuration from an existing bucket for a specific destination type. + s3_client: boto3.client, bucket: str, bucket_key_prefix: str + ) -> None: + """ + Deletes the S3 notification configuration from an existing bucket + based on its unique S3 key prefix/suffix filter rules. Args: - s3_client (boto3.client) : s3 client to use for s3 event config - bucket (str): bucket name of the s3 bucket to delete the config in - destination_type (str): String name of the destination type for the configuration - """ - destination_type_arn = f"{destination_type}Arn" + s3_client (boto3.client) : S3 client to use for S3 event config + bucket (str): Name of the S3 bucket to delete the config in + bucket_key_prefix (str): The S3 key prefix. - ( - existing_bucket_notification_configuration, - existing_notification_configurations_for_type, - ) = get_existing_bucket_notification_configuration_and_type( - s3_client, bucket, destination_type + Returns: None + """ + bucket_notification_configurations = get_bucket_notification_configurations( + s3_client=s3_client, + bucket=bucket, ) - - ( - index_of_matching_arn, - _matching_notification_configuration, - ) = get_matching_notification_configuration( - destination_type_arn, - existing_notification_configurations_for_type, - destination_arn, + ### Get any matching notification configuration + matching_notification_configuration = get_notification_configuration( + bucket_notification_configurations=bucket_notification_configurations, + bucket_key_prefix=bucket_key_prefix, ) - - if index_of_matching_arn is not None: - del existing_notification_configurations_for_type[index_of_matching_arn] - - if existing_notification_configurations_for_type: - existing_bucket_notification_configuration[ - f"{destination_type}Configurations" - ] = existing_notification_configurations_for_type - else: - del existing_bucket_notification_configuration[ - f"{destination_type}Configurations" - ] - + if matching_notification_configuration is None: logger.info( - "Delete request started to remove a NotificationConfiguration for " - + create_formatted_message(bucket, destination_type, destination_arn) - ) - s3_client.put_bucket_notification_configuration( - Bucket=bucket, - NotificationConfiguration=existing_bucket_notification_configuration, - SkipDestinationValidation=True, + "Delete not required as no NotificationConfiguration " + f"exists for S3 prefix {bucket_key_prefix}" ) - logger.info( - "Delete request completed to remove a NotificationConfiguration for " - + create_formatted_message(bucket, destination_type, destination_arn) - ) - else: - logger.info( - "Delete not required as no NotificationConfiguration exists for " - + create_formatted_message(bucket, destination_type, destination_arn) + return + bucket_notification_configurations.configs = [ + config for config + in bucket_notification_configurations.configs + if config.arn != matching_notification_configuration.arn + ] + ### Delete matching notification configuration + logger.info( + "Delete request started to remove a NotificationConfiguration for " + + create_formatted_message( + bucket=bucket, + destination_type=matching_notification_configuration.type, + destination_arn=matching_notification_configuration.arn ) + ) + s3_client.put_bucket_notification_configuration( + Bucket=bucket, + NotificationConfiguration=bucket_notification_configurations.to_dict(), + SkipDestinationValidation=True, + ) + logger.info( + "Delete request completed to remove a NotificationConfiguration for " + + create_formatted_message( + bucket=bucket, + destination_type=matching_notification_configuration.type, + destination_arn=matching_notification_configuration.arn) + ) diff --git a/tests/test_s3_event_config_lambda.py b/tests/test_s3_event_config_lambda.py index 7416f01d..bd8548c1 100644 --- a/tests/test_s3_event_config_lambda.py +++ b/tests/test_s3_event_config_lambda.py @@ -1,3 +1,4 @@ +import copy from unittest import mock import boto3 from moto import mock_s3, mock_lambda, mock_iam, mock_sqs, mock_sns @@ -47,470 +48,501 @@ def mock_sqs_queue(mock_aws_credentials): QueueUrl=queue_url["QueueUrl"], AttributeNames=["QueueArn"] ) +@pytest.fixture +def notification_configuration(): + return app.NotificationConfiguration( + notification_type=app.NotificationConfigurationType("Topic"), + value={ + "Events": ["s3:ObjectCreated:*", "s3:ObjectRemoved:*"], + "TopicArn": "arn:aws:sns:bla", + "Filter": { + "Key": { + "FilterRules": [ + {"Name": "prefix", "Value": "documents/"} + ] + } + } + } + ) -@mock_s3 -def test_add_notification_adds_expected_settings_for_lambda(s3, mock_lambda_function): - s3.create_bucket(Bucket="some_bucket") - with mock.patch.object( - s3, - "get_bucket_notification_configuration", - return_value={}, - ): - app.add_notification( - s3, - "LambdaFunction", - mock_lambda_function["Configuration"]["FunctionArn"], - "some_bucket", - "test_folder", - ) - get_config = s3.get_bucket_notification_configuration(Bucket="some_bucket") - assert ( - get_config["LambdaFunctionConfigurations"][0]["LambdaFunctionArn"] - == mock_lambda_function["Configuration"]["FunctionArn"] +@pytest.fixture +def bucket_notification_configurations(notification_configuration): + ### Topic Configuration + topic_configuration = copy.deepcopy(notification_configuration) + ### SQS Configuration + queue_configuration_value = copy.deepcopy(notification_configuration.value) + del queue_configuration_value["TopicArn"] + queue_configuration_value["QueueArn"] = "arn:aws:sqs:bla" + queue_configuration_value["Filter"]["Key"]["FilterRules"][0] = \ + {"Name": "suffix", "Value": "jpeg"} + queue_configuration = app.NotificationConfiguration( + notification_type=app.NotificationConfigurationType("Queue"), + value=queue_configuration_value ) - assert get_config["LambdaFunctionConfigurations"][0]["Events"] == [ - "s3:ObjectCreated:*" - ] - assert get_config["LambdaFunctionConfigurations"][0]["Filter"] == { - "Key": {"FilterRules": [{"Name": "prefix", "Value": "test_folder/"}]} - } - -@mock_s3 -def test_add_notification_adds_expected_settings_for_sns(s3, mock_sns_topic_arn): - s3.create_bucket(Bucket="some_bucket") - with mock.patch.object( - s3, - "get_bucket_notification_configuration", - return_value={}, - ): - app.add_notification( + ### Lambda Configuration + lambda_configuration_value = copy.deepcopy(notification_configuration.value) + del lambda_configuration_value["TopicArn"] + lambda_configuration_value["LambdaFunctionArn"] = "arn:aws:lambda:bla" + lambda_configuration_value["Filter"]["Key"]["FilterRules"][0] = \ + {"Name": "suffix", "Value": "jpeg"} + lambda_configuration_value["Filter"]["Key"]["FilterRules"].append( + {"Name": "prefix", "Value": "pictures/"} + ) + lambda_configuration = app.NotificationConfiguration( + notification_type=app.NotificationConfigurationType("LambdaFunction"), + value=lambda_configuration_value + ) + bucket_notification_configurations = app.BucketNotificationConfigurations( + [topic_configuration, queue_configuration, lambda_configuration] + ) + return bucket_notification_configurations + +class TestBucketNotificationConfigurations: + def test_init(self, notification_configuration): + configs = [notification_configuration, notification_configuration] + bucket_notification_configurations = app.BucketNotificationConfigurations(configs) + assert bucket_notification_configurations.configs == configs + + def test_to_dict(self, notification_configuration): + other_notification_configuration = copy.deepcopy(notification_configuration) + other_notification_configuration.type = "LambdaFunction" + configs = [notification_configuration, other_notification_configuration] + bucket_notification_configurations = app.BucketNotificationConfigurations(configs) + bnc_as_dict = bucket_notification_configurations.to_dict() + assert "TopicConfigurations" in bnc_as_dict + assert "LambdaFunctionConfigurations" in bnc_as_dict + +class TestGetBucketNotificationConfigurations: + @mock_s3 + def test_get_configurations(self, s3, notification_configuration): + s3.create_bucket(Bucket="some_bucket") + topic_configuration = copy.deepcopy(notification_configuration.value) + queue_configuration = copy.deepcopy(notification_configuration.value) + del queue_configuration["TopicArn"] + queue_configuration["QueueArn"] = "arn:aws:sqs:bla" + lambda_configuration = copy.deepcopy(notification_configuration.value) + del lambda_configuration["TopicArn"] + lambda_configuration["LambdaFunctionArn"] = "arn:aws:lambda:bla" + event_bridge_configuration = copy.deepcopy(notification_configuration.value) + del event_bridge_configuration["TopicArn"] + event_bridge_configuration["EventBridgeArn"] = "arn:aws:eventbridge:bla" + with mock.patch.object( s3, - "Topic", - mock_sns_topic_arn, - "some_bucket", - "test_folder", + "get_bucket_notification_configuration", + return_value={ + "QueueConfigurations": [queue_configuration], + "TopicConfigurations": [topic_configuration], + "LambdaFunctionConfigurations": [lambda_configuration], + "EventBridgeConfiguration": event_bridge_configuration + }, + ): + bucket_notification_configurations = app.get_bucket_notification_configurations( + s3_client=s3, + bucket="some_bucket" + ) + # We should ignore 'EventBridgeConfiguration' + assert len(bucket_notification_configurations.configs) == 3 + +class TestGetNotificationConfiguration: + def test_no_prefix_matching_suffix(self, bucket_notification_configurations): + # No prefix provided, suffix provided + matching_notification_configuration = app.get_notification_configuration( + bucket_notification_configurations, bucket_key_suffix="jpeg" ) - get_config = s3.get_bucket_notification_configuration(Bucket="some_bucket") - assert get_config["TopicConfigurations"][0]["TopicArn"] == mock_sns_topic_arn - assert get_config["TopicConfigurations"][0]["Events"] == [ - "s3:ObjectCreated:*" - ] - assert get_config["TopicConfigurations"][0]["Filter"] == { - "Key": {"FilterRules": [{"Name": "prefix", "Value": "test_folder/"}]} - } - - -@mock_s3 -def test_add_notification_adds_expected_settings_for_sqs(s3, mock_sqs_queue): - s3.create_bucket(Bucket="some_bucket") - with mock.patch.object( - s3, - "get_bucket_notification_configuration", - return_value={}, - ): - app.add_notification( - s3, - "Queue", - mock_sqs_queue["Attributes"]["QueueArn"], - "some_bucket", - "test_folder", + assert matching_notification_configuration is not None + assert matching_notification_configuration.type == "Queue" + + def test_no_suffix_matching_prefix(self, bucket_notification_configurations): + matching_notification_configuration = app.get_notification_configuration( + bucket_notification_configurations, bucket_key_prefix="documents" ) - get_config = s3.get_bucket_notification_configuration(Bucket="some_bucket") - assert ( - get_config["QueueConfigurations"][0]["QueueArn"] - == mock_sqs_queue["Attributes"]["QueueArn"] - ) - assert get_config["QueueConfigurations"][0]["Events"] == ["s3:ObjectCreated:*"] - assert get_config["QueueConfigurations"][0]["Filter"] == { - "Key": {"FilterRules": [{"Name": "prefix", "Value": "test_folder/"}]} - } - - -@mock_s3 -def test_add_notification_replace_if_filter_rule_different(s3, mock_sqs_queue): - # GIVEN an S3 bucket - s3.create_bucket(Bucket="some_bucket") - - # AND the bucket has an existing `QueueConfigurations` that is different to what we want - # but matches the ARN. - with mock.patch.object( - s3, - "get_bucket_notification_configuration", - return_value={ - f"QueueConfigurations": [ - { - "QueueArn": mock_sqs_queue["Attributes"]["QueueArn"], - "Events": ["s3:ObjectCreated:*"], - "Filter": { - "Key": { - "FilterRules": [ - {"Name": "prefix", "Value": f"some_other_folder/"} - ] - } - }, - } - ] - }, - ): - app.add_notification( - s3, - "Queue", - mock_sqs_queue["Attributes"]["QueueArn"], - "some_bucket", - "test_folder", + assert matching_notification_configuration is not None + assert matching_notification_configuration.type == "Topic" + + def test_matching_prefix_not_matching_suffix(self, bucket_notification_configurations): + matching_notification_configuration = app.get_notification_configuration( + bucket_notification_configurations=bucket_notification_configurations, + bucket_key_prefix="pictures", + bucket_key_suffix="png" ) - get_config = s3.get_bucket_notification_configuration(Bucket="some_bucket") - assert ( - get_config["QueueConfigurations"][0]["QueueArn"] - == mock_sqs_queue["Attributes"]["QueueArn"] - ) - assert get_config["QueueConfigurations"][0]["Events"] == ["s3:ObjectCreated:*"] - assert get_config["QueueConfigurations"][0]["Filter"] == { - "Key": {"FilterRules": [{"Name": "prefix", "Value": "test_folder/"}]} - } - -@mock_s3 -def test_add_notification_replace_if_events_different(s3, mock_sqs_queue): - # GIVEN an S3 bucket - s3.create_bucket(Bucket="some_bucket") - - # AND the bucket has an existing `QueueConfigurations` that is different to what we want - # but matches the ARN. - with mock.patch.object( - s3, - "get_bucket_notification_configuration", - return_value={ - f"QueueConfigurations": [ - { - "QueueArn": mock_sqs_queue["Attributes"]["QueueArn"], - "Events": ["s3:ASDF:*"], - "Filter": { - "Key": { - "FilterRules": [ - {"Name": "prefix", "Value": f"test_folder/"} - ] - } - }, - } - ] - }, - ): - app.add_notification( - s3, - "Queue", - mock_sqs_queue["Attributes"]["QueueArn"], - "some_bucket", - "test_folder", + assert matching_notification_configuration is None + + def test_matching_suffix_not_matching_prefix(self, bucket_notification_configurations): + matching_notification_configuration = app.get_notification_configuration( + bucket_notification_configurations=bucket_notification_configurations, + bucket_key_prefix="documents", + bucket_key_suffix="jpeg" ) - get_config = s3.get_bucket_notification_configuration(Bucket="some_bucket") - assert ( - get_config["QueueConfigurations"][0]["QueueArn"] - == mock_sqs_queue["Attributes"]["QueueArn"] - ) - assert get_config["QueueConfigurations"][0]["Events"] == ["s3:ObjectCreated:*"] - assert get_config["QueueConfigurations"][0]["Filter"] == { - "Key": {"FilterRules": [{"Name": "prefix", "Value": "test_folder/"}]} - } - -@mock_s3 -def test_add_notification_does_nothing_if_notification_already_exists( - s3, mock_lambda_function -): - # GIVEN an S3 bucket - s3.create_bucket(Bucket="some_bucket") - - # AND the bucket has an existing `LambdaFunctionConfigurations` that matches the one we will add - with mock.patch.object( - s3, - "get_bucket_notification_configuration", - return_value={ - f"LambdaFunctionConfigurations": [ - { - f"LambdaFunctionArn": mock_lambda_function["Configuration"][ - "FunctionArn" - ], - "Events": ["s3:ObjectCreated:*"], - "Filter": { - "Key": { - "FilterRules": [ - {"Name": "prefix", "Value": f"test_folder/"} - ] - } - }, - } - ] - }, - ), mock.patch.object(s3, "put_bucket_notification_configuration") as put_config: - # WHEN I add the existing matching `LambdaFunction` configuration - app.add_notification( - s3, - "LambdaFunction", - mock_lambda_function["Configuration"]["FunctionArn"], - "some_bucket", - "test_folder", + assert matching_notification_configuration is None + + def test_no_match(self, bucket_notification_configurations): + matching_notification_configuration = app.get_notification_configuration( + bucket_notification_configurations=bucket_notification_configurations, + bucket_key_prefix="downloads", ) + assert matching_notification_configuration is None - # AND I get the notification configuration - get_config = s3.get_bucket_notification_configuration(Bucket="some_bucket") - - # THEN I expect nothing to have been saved in our mocked environment - assert not put_config.called - - -@mock_s3 -def test_add_notification_does_nothing_if_notification_already_exists_even_in_different_dict_order( - s3, mock_lambda_function -): - # GIVEN an S3 bucket - s3.create_bucket(Bucket="some_bucket") - - # AND the bucket has an existing `LambdaFunctionConfigurations` that matches - # content of the one we will add - But differs in order - with mock.patch.object( - s3, - "get_bucket_notification_configuration", - return_value={ - f"LambdaFunctionConfigurations": [ - { - "Filter": { - "Key": { - "FilterRules": [ - {"Name": "prefix", "Value": f"test_folder/"} - ] - } - }, - "Events": ["s3:ObjectCreated:*"], - f"LambdaFunctionArn": mock_lambda_function["Configuration"][ - "FunctionArn" - ], - } - ] - }, - ), mock.patch.object(s3, "put_bucket_notification_configuration") as put_config: - # WHEN I add the existing matching `LambdaFunction` configuration - app.add_notification( - s3, - "LambdaFunction", - mock_lambda_function["Configuration"]["FunctionArn"], - "some_bucket", - "test_folder", +class TestNotificationConfigurationMatches: + def test_all_true(self, notification_configuration): + assert app.notification_configuration_matches( + config=notification_configuration, + other_config=notification_configuration ) - # AND I get the notification configuration - get_config = s3.get_bucket_notification_configuration(Bucket="some_bucket") - - # THEN I expect nothing to have been saved in our mocked environment - assert not put_config.called - - -@mock_s3 -def test_add_notification_adds_config_if_requested_notification_does_not_exist( - s3, mock_lambda_function, mock_sqs_queue -): - # GIVEN an S3 bucket - s3.create_bucket(Bucket="some_bucket") - - # AND the bucket has an existing `QueueConfigurations` - with mock.patch.object( - s3, - "get_bucket_notification_configuration", - return_value={ - f"QueueConfigurations": [ - { - "Id": "123", - "QueueArn": mock_sqs_queue["Attributes"]["QueueArn"], - "Events": ["s3:ObjectCreated:*"], - "Filter": { - "Key": { - "FilterRules": [ - {"Name": "prefix", "Value": f"test_folder/"} - ] - } - }, - } - ] - }, - ): - # WHEN I add a new `LambdaFunction` configuration - app.add_notification( - s3, - "LambdaFunction", - mock_lambda_function["Configuration"]["FunctionArn"], - "some_bucket", - "test_folder", + def test_arn_false(self, notification_configuration): + other_notification_configuration = copy.deepcopy(notification_configuration) + other_notification_configuration.arn = "arn:aws:sns:hubba" + assert not app.notification_configuration_matches( + config=notification_configuration, + other_config=other_notification_configuration ) - # AND I get the notification configuration - get_config = s3.get_bucket_notification_configuration(Bucket="some_bucket") + def test_events_false(self, notification_configuration): + other_notification_configuration = copy.deepcopy(notification_configuration) + other_notification_configuration.value["Events"] = ["s3:ObjectCreated*"] + assert not app.notification_configuration_matches( + config=notification_configuration, + other_config=other_notification_configuration + ) - # THEN I expect to see a new `LambdaFunction` configuration - assert ( - get_config["LambdaFunctionConfigurations"][0]["LambdaFunctionArn"] - == mock_lambda_function["Configuration"]["FunctionArn"] - ) - assert get_config["LambdaFunctionConfigurations"][0]["Events"] == [ - "s3:ObjectCreated:*" - ] - assert get_config["LambdaFunctionConfigurations"][0]["Filter"] == { - "Key": {"FilterRules": [{"Name": "prefix", "Value": "test_folder/"}]} - } - - # AND I expect the `QueueConfigurations` to be unchanged - assert ( - get_config["QueueConfigurations"][0]["QueueArn"] - == mock_sqs_queue["Attributes"]["QueueArn"] - ) - assert get_config["QueueConfigurations"][0]["Events"] == ["s3:ObjectCreated:*"] - assert get_config["QueueConfigurations"][0]["Filter"] == { - "Key": {"FilterRules": [{"Name": "prefix", "Value": "test_folder/"}]} - } - - -@mock_s3 -def test_add_notification_adds_config_if_existing_config_does_not_match( - s3, mock_lambda_function -): - # GIVEN an S3 bucket - s3.create_bucket(Bucket="some_bucket") - - # AND the bucket has an existing `LambdaFunctionConfigurations` that does not match the one we are adding - with mock.patch.object( - s3, - "get_bucket_notification_configuration", - return_value={ - f"LambdaFunctionConfigurations": [ - { - f"LambdaFunctionArn": mock_lambda_function["Configuration"][ - "FunctionArn" - ] - + "asdf", - "Events": ["s3:ObjectCreated:*"], - "Filter": { - "Key": { - "FilterRules": [ - {"Name": "prefix", "Value": f"test_folder/"} - ] - } - }, - } - ] - }, - ): - # WHEN I add the `LambdaFunction` configuration - app.add_notification( + def test_filter_rule_names_false(self, notification_configuration): + other_notification_configuration = copy.deepcopy(notification_configuration) + other_notification_configuration.value["Filter"]["Key"]["FilterRules"] = [ + {"Name": "prefix", "Value": "documents/"}, + {"Name": "suffix", "Value": "jpeg"}, + ] + assert not app.notification_configuration_matches( + config=notification_configuration, + other_config=other_notification_configuration + ) + + def test_filter_rule_values_false(self, notification_configuration): + other_notification_configuration = copy.deepcopy(notification_configuration) + other_notification_configuration.value["Filter"]["Key"]["FilterRules"] = [ + {"Name": "prefix", "Value": "pictures/"} + ] + assert not app.notification_configuration_matches( + config=notification_configuration, + other_config=other_notification_configuration + ) + +class TestAddNotification: + @mock_s3 + def test_adds_expected_settings_for_lambda(self, s3, mock_lambda_function): + s3.create_bucket(Bucket="some_bucket") + with mock.patch.object( + s3, + "get_bucket_notification_configuration", + return_value={}, + ): + app.add_notification( + s3, + "LambdaFunction", + mock_lambda_function["Configuration"]["FunctionArn"], + "some_bucket", + "test_folder", + ) + get_config = s3.get_bucket_notification_configuration(Bucket="some_bucket") + assert ( + get_config["LambdaFunctionConfigurations"][0]["LambdaFunctionArn"] + == mock_lambda_function["Configuration"]["FunctionArn"] + ) + assert get_config["LambdaFunctionConfigurations"][0]["Events"] == [ + "s3:ObjectCreated:*" + ] + assert get_config["LambdaFunctionConfigurations"][0]["Filter"] == { + "Key": {"FilterRules": [{"Name": "prefix", "Value": "test_folder/"}]} + } + + @mock_s3 + def test_adds_expected_settings_for_sns(self, s3, mock_sns_topic_arn): + s3.create_bucket(Bucket="some_bucket") + with mock.patch.object( + s3, + "get_bucket_notification_configuration", + return_value={}, + ): + app.add_notification( + s3, + "Topic", + mock_sns_topic_arn, + "some_bucket", + "test_folder", + ) + get_config = s3.get_bucket_notification_configuration(Bucket="some_bucket") + assert get_config["TopicConfigurations"][0]["TopicArn"] == mock_sns_topic_arn + assert get_config["TopicConfigurations"][0]["Events"] == [ + "s3:ObjectCreated:*" + ] + assert get_config["TopicConfigurations"][0]["Filter"] == { + "Key": {"FilterRules": [{"Name": "prefix", "Value": "test_folder/"}]} + } + + + @mock_s3 + def test_adds_expected_settings_for_sqs(self, s3, mock_sqs_queue): + s3.create_bucket(Bucket="some_bucket") + with mock.patch.object( s3, - "LambdaFunction", - mock_lambda_function["Configuration"]["FunctionArn"], - "some_bucket", - "test_folder", + "get_bucket_notification_configuration", + return_value={}, + ): + app.add_notification( + s3, + "Queue", + mock_sqs_queue["Attributes"]["QueueArn"], + "some_bucket", + "test_folder", + ) + get_config = s3.get_bucket_notification_configuration(Bucket="some_bucket") + assert ( + get_config["QueueConfigurations"][0]["QueueArn"] + == mock_sqs_queue["Attributes"]["QueueArn"] ) + assert get_config["QueueConfigurations"][0]["Events"] == ["s3:ObjectCreated:*"] + assert get_config["QueueConfigurations"][0]["Filter"] == { + "Key": {"FilterRules": [{"Name": "prefix", "Value": "test_folder/"}]} + } - # AND I get the notification configuration - get_config = s3.get_bucket_notification_configuration(Bucket="some_bucket") - # THEN I expect to see the updated `LambdaFunction` configuration for the existing one - assert ( - get_config["LambdaFunctionConfigurations"][0]["LambdaFunctionArn"] - == mock_lambda_function["Configuration"]["FunctionArn"] + "asdf" - ) - assert get_config["LambdaFunctionConfigurations"][0]["Events"] == [ - "s3:ObjectCreated:*" - ] - - assert get_config["LambdaFunctionConfigurations"][0]["Filter"] == { - "Key": {"FilterRules": [{"Name": "prefix", "Value": "test_folder/"}]} - } - - # AND I expect to see the updated `LambdaFunction` configuration for the new one - assert ( - get_config["LambdaFunctionConfigurations"][1]["LambdaFunctionArn"] - == mock_lambda_function["Configuration"]["FunctionArn"] - ) - assert get_config["LambdaFunctionConfigurations"][1]["Events"] == [ - "s3:ObjectCreated:*" - ] - assert get_config["LambdaFunctionConfigurations"][1]["Filter"] == { - "Key": {"FilterRules": [{"Name": "prefix", "Value": "test_folder/"}]} - } - - -@mock_s3 -def test_delete_notification_is_successful_for_configuration_that_exists( - s3, mock_lambda_function -): - # GIVEN an S3 bucket - s3.create_bucket(Bucket="some_bucket") - - # AND a configuration exists - with mock.patch.object( - s3, - "get_bucket_notification_configuration", - return_value={ - f"LambdaFunctionConfigurations": [ - { - f"LambdaFunctionArn": mock_lambda_function["Configuration"][ - "FunctionArn" - ], - "Events": ["s3:ObjectCreated:*"], - "Filter": { - "Key": { - "FilterRules": [ - {"Name": "prefix", "Value": f"test_folder/"} - ] - } - }, - } - ] - }, + @mock_s3 + def test_raise_exception_if_config_exists_for_prefix( + self, s3, notification_configuration ): - # WHEN I delete the notification - app.delete_notification( - s3_client=s3, - bucket="some_bucket", - destination_type="LambdaFunction", - destination_arn=mock_lambda_function["Configuration"]["FunctionArn"], - ) - # THEN the notification should be deleted - get_config = s3.get_bucket_notification_configuration(Bucket="some_bucket") - assert "LambdaFunctionConfigurations" not in get_config - - -@mock_s3 -def test_delete_notification_does_nothing_when_deleting_configuration_that_does_not_exist( - s3, mock_lambda_function -): - # GIVEN an S3 bucket - s3.create_bucket(Bucket="some_bucket") - - # AND a configuration exists for a different notification type - with mock.patch.object( - s3, - "get_bucket_notification_configuration", - return_value={ - f"LambdaFunctionConfigurations": [ - { - f"LambdaFunctionArn": mock_lambda_function["Configuration"][ - "FunctionArn" - ], - "Events": ["s3:ObjectCreated:*"], - "Filter": { - "Key": { - "FilterRules": [ - {"Name": "prefix", "Value": f"test_folder/"} - ] - } - }, + # GIVEN an S3 bucket + s3.create_bucket(Bucket="some_bucket") + + # AND the bucket has an existing notification configuration on the same S3 key prefix + with mock.patch.object( + s3, + "get_bucket_notification_configuration", + return_value={ + f"TopicConfigurations": [ + notification_configuration.value + ] + }, + ): + with pytest.raises(RuntimeError): + app.add_notification( + s3, + "Queue", + "arn:aws:sqs:bla", + "some_bucket", + notification_configuration.value["Filter"]["Key"]["FilterRules"][0]["Value"], + ) + + @mock_s3 + def test_does_nothing_if_notification_already_exists(self, s3): + # GIVEN an S3 bucket + s3.create_bucket(Bucket="some_bucket") + + # AND the bucket has an existing notifiction configuration that matches the one we will add + notification_configuration = { + "TopicArn": "arn:aws:sns:bla", + "Events": ["s3:ObjectCreated:*"], + "Filter": { + "Key": { + "FilterRules": [{"Name": "prefix", "Value": f"documents/"}] } - ] - }, - # AND a mock for the put_bucket_notification_configuration method - ), mock.patch.object(s3, "put_bucket_notification_configuration") as put_config: - # WHEN I delete a notification that does not exist - app.delete_notification( - s3_client=s3, - bucket="some_bucket", - destination_type="doesNotExist", - destination_arn=mock_lambda_function["Configuration"]["FunctionArn"], + }, + } + with mock.patch.object( + s3, + "get_bucket_notification_configuration", + return_value={ + f"TopicConfigurations": [notification_configuration] + }, + ), mock.patch.object(s3, "put_bucket_notification_configuration") as put_config: + # WHEN I add the existing matching `LambdaFunction` configuration + app.add_notification( + s3_client=s3, + destination_type="Topic", + destination_arn=notification_configuration["TopicArn"], + bucket="some_bucket", + bucket_key_prefix=notification_configuration["Filter"]["Key"]["FilterRules"][0]["Value"], + ) + + # AND I get the notification configuration + get_config = s3.get_bucket_notification_configuration(Bucket="some_bucket") + + # THEN I expect nothing to have been saved in our mocked environment + assert not put_config.called + + + @mock_s3 + def test_does_nothing_if_notification_already_exists_even_in_different_dict_order( + self, s3, mock_lambda_function + ): + # GIVEN an S3 bucket + s3.create_bucket(Bucket="some_bucket") + + # AND the bucket has an existing `LambdaFunctionConfigurations` that matches + # content of the one we will add - But differs in order + with mock.patch.object( + s3, + "get_bucket_notification_configuration", + return_value={ + f"LambdaFunctionConfigurations": [ + { + "Filter": { + "Key": { + "FilterRules": [ + {"Name": "prefix", "Value": f"test_folder/"} + ] + } + }, + "Events": ["s3:ObjectCreated:*"], + f"LambdaFunctionArn": mock_lambda_function["Configuration"][ + "FunctionArn" + ], + } + ] + }, + ), mock.patch.object(s3, "put_bucket_notification_configuration") as put_config: + # WHEN I add the existing matching `LambdaFunction` configuration + app.add_notification( + s3, + "LambdaFunction", + mock_lambda_function["Configuration"]["FunctionArn"], + "some_bucket", + "test_folder", + ) + + # AND I get the notification configuration + get_config = s3.get_bucket_notification_configuration(Bucket="some_bucket") + + # THEN I expect nothing to have been saved in our mocked environment + assert not put_config.called + + + @mock_s3 + def test_adds_config_if_requested_notification_does_not_exist( + self, s3, mock_lambda_function, mock_sqs_queue + ): + # GIVEN an S3 bucket + s3.create_bucket(Bucket="some_bucket") + + # AND the bucket has an existing `QueueConfigurations` + with mock.patch.object( + s3, + "get_bucket_notification_configuration", + return_value={ + f"QueueConfigurations": [ + { + "Id": "123", + "QueueArn": mock_sqs_queue["Attributes"]["QueueArn"], + "Events": ["s3:ObjectCreated:*"], + "Filter": { + "Key": { + "FilterRules": [ + {"Name": "prefix", "Value": f"test_folder/"} + ] + } + }, + } + ] + }, + ): + # WHEN I add a new `LambdaFunction` configuration + app.add_notification( + s3, + "LambdaFunction", + mock_lambda_function["Configuration"]["FunctionArn"], + "some_bucket", + "another_test_folder", + ) + + # AND I get the notification configuration + get_config = s3.get_bucket_notification_configuration(Bucket="some_bucket") + + # THEN I expect to see a new `LambdaFunction` configuration + assert ( + get_config["LambdaFunctionConfigurations"][0]["LambdaFunctionArn"] + == mock_lambda_function["Configuration"]["FunctionArn"] ) - # THEN nothing should have been called - assert not put_config.called + assert get_config["LambdaFunctionConfigurations"][0]["Events"] == [ + "s3:ObjectCreated:*" + ] + assert get_config["LambdaFunctionConfigurations"][0]["Filter"] == { + "Key": {"FilterRules": [{"Name": "prefix", "Value": "another_test_folder/"}]} + } + +class TestDeleteNotification: + @mock_s3 + def test_is_successful_for_configuration_that_exists( + self, s3, mock_lambda_function + ): + # GIVEN an S3 bucket + s3.create_bucket(Bucket="some_bucket") + + # AND a configuration exists + with mock.patch.object( + s3, + "get_bucket_notification_configuration", + return_value={ + f"LambdaFunctionConfigurations": [ + { + f"LambdaFunctionArn": mock_lambda_function["Configuration"][ + "FunctionArn" + ], + "Events": ["s3:ObjectCreated:*"], + "Filter": { + "Key": { + "FilterRules": [ + {"Name": "prefix", "Value": f"test_folder/"} + ] + } + }, + } + ] + }, + ): + # WHEN I delete the notification + app.delete_notification( + s3_client=s3, + bucket="some_bucket", + bucket_key_prefix="test_folder" + ) + # THEN the notification should be deleted + get_config = s3.get_bucket_notification_configuration(Bucket="some_bucket") + assert "LambdaFunctionConfigurations" not in get_config + + + @mock_s3 + def test_does_nothing_when_deleting_configuration_that_does_not_exist( + self, s3, mock_lambda_function + ): + # GIVEN an S3 bucket + s3.create_bucket(Bucket="some_bucket") + + # AND a configuration exists for a different notification type + with mock.patch.object( + s3, + "get_bucket_notification_configuration", + return_value={ + f"LambdaFunctionConfigurations": [ + { + f"LambdaFunctionArn": mock_lambda_function["Configuration"][ + "FunctionArn" + ], + "Events": ["s3:ObjectCreated:*"], + "Filter": { + "Key": { + "FilterRules": [ + {"Name": "prefix", "Value": f"test_folder/"} + ] + } + }, + } + ] + }, + # AND a mock for the put_bucket_notification_configuration method + ), mock.patch.object(s3, "put_bucket_notification_configuration") as put_config: + # WHEN I delete a notification that does not exist + app.delete_notification( + s3_client=s3, + bucket="some_bucket", + bucket_key_prefix="another_test_folder" + ) + # THEN nothing should have been called + assert not put_config.called