diff --git a/config/develop/namespaced/lambda-s3-event-config.yaml b/config/develop/namespaced/lambda-s3-event-config.yaml index fa9c7ea3..8d4c16f5 100644 --- a/config/develop/namespaced/lambda-s3-event-config.yaml +++ b/config/develop/namespaced/lambda-s3-event-config.yaml @@ -5,7 +5,7 @@ template: artifact_prefix: '{{ stack_group_config.namespace }}/src/lambda' dependencies: - develop/namespaced/s3-event-config-lambda-role.yaml - - develop/s3-input-bucket.yaml + - develop/s3-cloudformation-bucket.yaml - develop/namespaced/sns-topic.yaml stack_name: '{{ stack_group_config.namespace }}-lambda-S3EventConfig' stack_tags: {{ stack_group_config.default_stack_tags }} diff --git a/config/develop/namespaced/sns-topic.yaml b/config/develop/namespaced/sns-topic.yaml index f383451e..5c407963 100644 --- a/config/develop/namespaced/sns-topic.yaml +++ b/config/develop/namespaced/sns-topic.yaml @@ -1,7 +1,9 @@ template: path: sns-topic.yaml -stack_name: "{{ stack_group_config.namespace }}-sns-input-to-sqs" parameters: S3SourceBucketArn: !stack_output_external recover-dev-input-bucket::BucketArn +dependencies: + - develop/s3-input-bucket.yaml +stack_name: "{{ stack_group_config.namespace }}-sns-input-to-sqs" stack_tags: {{ stack_group_config.default_stack_tags }} diff --git a/config/develop/namespaced/sqs-input-to-intermediate.yaml b/config/develop/namespaced/sqs-input-to-intermediate.yaml index 3e195f29..ab2a5b12 100644 --- a/config/develop/namespaced/sqs-input-to-intermediate.yaml +++ b/config/develop/namespaced/sqs-input-to-intermediate.yaml @@ -1,12 +1,12 @@ template: path: sqs-queue.yaml parameters: - MessageRetentionPeriod: "86400" + MessageRetentionPeriod: "1209600" ReceiveMessageWaitTimeSeconds: "20" VisibilityTimeout: "120" - S3SourceBucketArn: !stack_output_external recover-dev-input-bucket::BucketArn + SNSTopicSubscription: !stack_output_external "{{ stack_group_config.namespace }}-sns-input-to-sqs::SnsTopicArn" dependencies: - - develop/s3-input-bucket.yaml -stack_name: '{{ stack_group_config.namespace }}-sqs-input-to-intermediate' + - develop/namespaced/sns-topic.yaml +stack_name: "{{ stack_group_config.namespace }}-sqs-input-to-intermediate" stack_tags: {{ stack_group_config.default_stack_tags }} diff --git a/config/develop/namespaced/sqs-input-to-raw.yaml b/config/develop/namespaced/sqs-input-to-raw.yaml index 0ef7adbf..65cbf6f8 100644 --- a/config/develop/namespaced/sqs-input-to-raw.yaml +++ b/config/develop/namespaced/sqs-input-to-raw.yaml @@ -4,9 +4,9 @@ parameters: MessageRetentionPeriod: "1209600" ReceiveMessageWaitTimeSeconds: "20" VisibilityTimeout: "120" - S3SourceBucketArn: !stack_output_external recover-dev-raw-bucket::BucketArn + SNSTopicSubscription: !stack_output_external "{{ stack_group_config.namespace }}-sns-input-to-sqs::SnsTopicArn" dependencies: - - develop/s3-raw-bucket.yaml + - develop/namespaced/sns-topic.yaml stack_name: '{{ stack_group_config.namespace }}-sqs-input-to-raw' stack_tags: {{ stack_group_config.default_stack_tags }} diff --git a/config/prod/namespaced/s3-event-config-lambda.yaml b/config/prod/namespaced/s3-event-config-lambda.yaml index 08741f14..8efe3328 100644 --- a/config/prod/namespaced/s3-event-config-lambda.yaml +++ b/config/prod/namespaced/s3-event-config-lambda.yaml @@ -5,13 +5,13 @@ template: artifact_prefix: '{{ stack_group_config.namespace }}/src/lambda' dependencies: - prod/namespaced/s3-event-config-lambda-role.yaml - - prod/namespaced/s3-to-glue-lambda.yaml - - prod/namespaced/sqs-input-to-intermediate.yaml + - prod/s3-cloudformation-bucket.yaml + - prod/namespaced/sns-topic.yaml stack_name: '{{ stack_group_config.namespace }}-lambda-S3EventConfig' stack_tags: {{ stack_group_config.default_stack_tags }} parameters: Namespace: {{ stack_group_config.namespace }} - S3ToGlueDestinationArn: !stack_output_external "{{ stack_group_config.namespace }}-sqs-input-to-intermediate::PrimaryQueueArn" - S3ToGlueDestinationType: "Queue" + S3ToGlueDestinationArn: !stack_output_external "{{ stack_group_config.namespace }}-sns-input-to-sqs::SnsTopicArn" + S3ToGlueDestinationType: "Topic" S3EventConfigRoleArn: !stack_output_external "{{ stack_group_config.namespace }}-s3-event-config-lambda-role::RoleArn" S3SourceBucketName: {{ stack_group_config.input_bucket_name }} diff --git a/config/prod/namespaced/sns-topic.yaml b/config/prod/namespaced/sns-topic.yaml new file mode 100644 index 00000000..8b5a5668 --- /dev/null +++ b/config/prod/namespaced/sns-topic.yaml @@ -0,0 +1,9 @@ +template: + path: sns-topic.yaml +parameters: + S3SourceBucketArn: !stack_output_external recover-input-bucket::BucketArn +dependencies: + - prod/s3-input-bucket.yaml +stack_name: "{{ stack_group_config.namespace }}-sns-input-to-sqs" +stack_tags: + {{ stack_group_config.default_stack_tags }} diff --git a/config/prod/namespaced/sqs-input-to-intermediate.yaml b/config/prod/namespaced/sqs-input-to-intermediate.yaml index ff87c2a8..ccbf268e 100644 --- a/config/prod/namespaced/sqs-input-to-intermediate.yaml +++ b/config/prod/namespaced/sqs-input-to-intermediate.yaml @@ -1,12 +1,12 @@ template: path: sqs-queue.yaml parameters: - MessageRetentionPeriod: '86400' - ReceiveMessageWaitTimeSeconds: '20' - VisibilityTimeout: '120' - S3SourceBucketArn: !stack_output_external recover-input-bucket::BucketArn + MessageRetentionPeriod: "1209600" + ReceiveMessageWaitTimeSeconds: "20" + VisibilityTimeout: "120" + SNSTopicSubscription: !stack_output_external "{{ stack_group_config.namespace }}-sns-input-to-sqs::SnsTopicArn" dependencies: - - prod/s3-input-bucket.yaml -stack_name: '{{ stack_group_config.namespace }}-sqs-input-to-intermediate' + - prod/namespaced/sns-topic.yaml +stack_name: "{{ stack_group_config.namespace }}-sqs-input-to-intermediate" stack_tags: {{ stack_group_config.default_stack_tags }} diff --git a/config/prod/namespaced/sqs-input-to-raw.yaml b/config/prod/namespaced/sqs-input-to-raw.yaml index 7fb1e94a..ad10199f 100644 --- a/config/prod/namespaced/sqs-input-to-raw.yaml +++ b/config/prod/namespaced/sqs-input-to-raw.yaml @@ -4,9 +4,9 @@ parameters: MessageRetentionPeriod: "1209600" ReceiveMessageWaitTimeSeconds: "20" VisibilityTimeout: "120" - S3SourceBucketArn: !stack_output_external recover-raw-bucket::BucketArn + SNSTopicSubscription: !stack_output_external "{{ stack_group_config.namespace }}-sns-input-to-sqs::SnsTopicArn" dependencies: - - prod/s3-raw-bucket.yaml + - prod/namespaced/sns-topic.yaml stack_name: '{{ stack_group_config.namespace }}-sqs-input-to-raw' stack_tags: {{ stack_group_config.default_stack_tags }} diff --git a/src/lambda_function/s3_to_glue/app.py b/src/lambda_function/s3_to_glue/app.py index c10381ac..51e0cfab 100644 --- a/src/lambda_function/s3_to_glue/app.py +++ b/src/lambda_function/s3_to_glue/app.py @@ -110,7 +110,7 @@ def get_object_info(s3_event) -> dict: } return object_info -def lambda_handler(event, context) -> dict: +def lambda_handler(event, context) -> None: """ This main lambda function will be triggered by a SQS event and will poll the SQS queue for all available S3 event messages. If the @@ -125,19 +125,20 @@ def lambda_handler(event, context) -> dict: Unused by this lambda function. """ s3_objects_info = [] - for record in event["Records"]: - s3_event_records = json.loads(record["body"]) - if is_s3_test_event(s3_event_records): + for sqs_record in event["Records"]: + sns_notification = json.loads(sqs_record["body"]) + sns_message = json.loads(sns_notification["Message"]) + if is_s3_test_event(sns_message): logger.info(f"Found AWS default s3:TestEvent. Skipping.") - else: - for s3_event in s3_event_records["Records"]: - object_info = get_object_info(s3_event) - if filter_object_info(object_info) is not None: - s3_objects_info.append(object_info) - else: - logger.info( - f"Object doesn't meet the S3 event rules to be processed. Skipping." - ) + return + for s3_event in sns_message["Records"]: + object_info = get_object_info(s3_event) + if filter_object_info(object_info) is not None: + s3_objects_info.append(object_info) + else: + logger.info( + f"Object doesn't meet the S3 event rules to be processed. Skipping." + ) if len(s3_objects_info) > 0: logger.info( "Submitting the following files to " diff --git a/templates/sns-topic.yaml b/templates/sns-topic.yaml index 5179817f..be3ddb32 100644 --- a/templates/sns-topic.yaml +++ b/templates/sns-topic.yaml @@ -31,9 +31,9 @@ Resources: Action: - sns:Publish Resource: !Ref SnsTopic - #Condition: - #ArnLike: - # "aws:SourceArn": !Ref S3SourceBucketArn + Condition: + ArnLike: + "aws:SourceArn": !Ref S3SourceBucketArn Topics: - !Ref SnsTopic diff --git a/templates/sqs-queue.yaml b/templates/sqs-queue.yaml index 1eb2d77a..c0762e4c 100644 --- a/templates/sqs-queue.yaml +++ b/templates/sqs-queue.yaml @@ -1,7 +1,7 @@ AWSTemplateFormatVersion: '2010-09-09' Description: > - Creates an SQS queue that gets S3 notifications + Creates an SQS queue which subscribes to an SNS topic Parameters: @@ -22,9 +22,9 @@ Parameters: How long our lambda has to submit the messages to Glue and delete the message from the SQS queue - S3SourceBucketArn: + SNSTopicSubscription: Type: String - Description: Arn of the S3 bucket where source data are stored. + Description: Arn of the SNS topic where S3 event notifications are published. Resources: @@ -49,17 +49,24 @@ Resources: - Sid: Send_Permission Effect: Allow Principal: - Service: s3.amazonaws.com + Service: sns.amazonaws.com AWS: !Sub '${AWS::AccountId}' Action: - SQS:SendMessage Resource: !GetAtt PrimaryQueue.Arn Condition: ArnLike: - "aws:SourceArn": !Ref S3SourceBucketArn + "aws:SourceArn": !Ref SNSTopicSubscription Queues: - !Ref PrimaryQueue + SnsSubscription: + Type: "AWS::SNS::Subscription" + Properties: + Protocol: sqs + Endpoint: !GetAtt PrimaryQueue.Arn + TopicArn: !Ref SNSTopicSubscription + DeadLetterQueue: Type: AWS::SQS::Queue Properties: diff --git a/tests/test_s3_event_config_lambda.py b/tests/test_s3_event_config_lambda.py index 2f8d7c8f..7416f01d 100644 --- a/tests/test_s3_event_config_lambda.py +++ b/tests/test_s3_event_config_lambda.py @@ -1,6 +1,6 @@ from unittest import mock import boto3 -from moto import mock_s3, mock_lambda, mock_iam, mock_sqs +from moto import mock_s3, mock_lambda, mock_iam, mock_sqs, mock_sns import pytest from src.lambda_function.s3_event_config import app @@ -29,6 +29,13 @@ def mock_lambda_function(mock_iam_role): ) yield client.get_function(FunctionName="some_function") +@pytest.fixture +def mock_sns_topic_arn(): + with mock_sns(): + client = boto3.client("sns") + topic = client.create_topic(Name="some_topic") + yield topic["TopicArn"] + @pytest.fixture(scope="function") def mock_sqs_queue(mock_aws_credentials): @@ -68,6 +75,30 @@ def test_add_notification_adds_expected_settings_for_lambda(s3, mock_lambda_func "Key": {"FilterRules": [{"Name": "prefix", "Value": "test_folder/"}]} } +@mock_s3 +def test_add_notification_adds_expected_settings_for_sns(s3, mock_sns_topic_arn): + s3.create_bucket(Bucket="some_bucket") + with mock.patch.object( + s3, + "get_bucket_notification_configuration", + return_value={}, + ): + app.add_notification( + s3, + "Topic", + mock_sns_topic_arn, + "some_bucket", + "test_folder", + ) + get_config = s3.get_bucket_notification_configuration(Bucket="some_bucket") + assert get_config["TopicConfigurations"][0]["TopicArn"] == mock_sns_topic_arn + assert get_config["TopicConfigurations"][0]["Events"] == [ + "s3:ObjectCreated:*" + ] + assert get_config["TopicConfigurations"][0]["Filter"] == { + "Key": {"FilterRules": [{"Name": "prefix", "Value": "test_folder/"}]} + } + @mock_s3 def test_add_notification_adds_expected_settings_for_sqs(s3, mock_sqs_queue): diff --git a/tests/test_s3_to_glue_lambda.py b/tests/test_s3_to_glue_lambda.py index 2ef51f4e..d51580d2 100644 --- a/tests/test_s3_to_glue_lambda.py +++ b/tests/test_s3_to_glue_lambda.py @@ -29,30 +29,16 @@ def sqs_queue(self, sqs_queue_name): yield response @pytest.fixture - def no_s3_event_records(self): - sqs_msg = { - "Records": [ - { - "MessageId": "string", - "receiptHandle": "string", - "MD5OfBody": "string", - "Body": "string", - "Attributes": { - "string": "string", - }, - "MD5OfMessageAttributes": "string", - "MessageAttributes": { - "string": { - "DataType": "string", - "StringValue": "string", - "BinaryValue": "string", - }, - }, - } - ] + def sns_message(self): + sns_message_wrapper = { + "Type": "string", + "MessageId": "string", + "TopicArn": "string", + "Subject": "string", + "Message": "string", + "Timestamp": "string" } - sqs_msg["Records"][0]["body"] = json.dumps({"Records": []}) - yield sqs_msg + return sns_message_wrapper @pytest.fixture def s3_test_event(self): @@ -99,26 +85,9 @@ def s3_event(self): return s3_event @pytest.fixture - def sqs_message(self, s3_event, s3_test_event): + def sqs_message(self): sqs_msg = { "Records": [ - { - "MessageId": "string", - "receiptHandle": "string", - "MD5OfBody": "string", - "body": "string", - "Attributes": { - "string": "string", - }, - "MD5OfMessageAttributes": "string", - "MessageAttributes": { - "string": { - "DataType": "string", - "StringValue": "string", - "BinaryValue": "string", - } - }, - }, { "MessageId": "string", "receiptHandle": "string", @@ -136,12 +105,8 @@ def sqs_message(self, s3_event, s3_test_event): } }, } - ], + ] } - sqs_msg["Records"][0]["body"] = json.dumps( - {"Records": [s3_event]} - ) - sqs_msg["Records"][1]["body"] = json.dumps(s3_test_event) yield sqs_msg @pytest.fixture @@ -155,7 +120,7 @@ def object_info(self): return object_info @pytest.fixture - def set_env_var(self, monkeypatch, sqs_queue): + def set_env_var(self, monkeypatch): monkeypatch.setenv("S3_TO_JSON_WORKFLOW_NAME", "test_workflow") def test_submit_s3_to_json_workflow(self, object_info, monkeypatch): @@ -165,8 +130,10 @@ def test_submit_s3_to_json_workflow(self, object_info, monkeypatch): ) def test_that_lambda_handler_does_not_call_submit_s3_to_json_workflow_if_no_s3_records( - self, no_s3_event_records, sqs_queue, set_env_var + self, sqs_queue, set_env_var, sqs_message, sns_message ): + sns_message["Message"] = json.dumps({"Records": []}) + sqs_message["Records"][0]["body"] = json.dumps(sns_message) with mock_sqs(): with mock.patch.object(boto3, "client") as patch_client, mock.patch.object( app, "submit_s3_to_json_workflow" @@ -174,15 +141,15 @@ def test_that_lambda_handler_does_not_call_submit_s3_to_json_workflow_if_no_s3_r patch_client.return_value.get_queue_url.return_value = sqs_queue[ "QueueUrl" ] - patch_client.return_value.receive_message.return_value = ( - no_s3_event_records - ) - app.lambda_handler(event=no_s3_event_records, context=None) + patch_client.return_value.receive_message.return_value = sqs_message + app.lambda_handler(event=sqs_message, context=None) patch_submit.assert_not_called() def test_that_lambda_handler_calls_submit_s3_to_json_workflow_if_queue_has_message( - self, sqs_message, object_info, sqs_queue, set_env_var + self, sqs_message, object_info, sqs_queue, set_env_var, sns_message, s3_event ): + sns_message["Message"] = json.dumps({"Records": [s3_event]}) + sqs_message["Records"][0]["body"] = json.dumps(sns_message) with mock_sqs(): with mock.patch.object(boto3, "client") as patch_client, mock.patch.object( app, "submit_s3_to_json_workflow"