diff --git a/config/develop/namespaced/s3-event-config-lambda-role.yaml b/config/develop/namespaced/lambda-s3-event-config-role.yaml similarity index 55% rename from config/develop/namespaced/s3-event-config-lambda-role.yaml rename to config/develop/namespaced/lambda-s3-event-config-role.yaml index 8b611a1b..11909cd2 100644 --- a/config/develop/namespaced/s3-event-config-lambda-role.yaml +++ b/config/develop/namespaced/lambda-s3-event-config-role.yaml @@ -1,6 +1,6 @@ template: - path: s3-event-config-lambda-role.yaml -stack_name: "{{ stack_group_config.namespace }}-s3-event-config-lambda-role" + path: lambda-s3-event-config-role.yaml +stack_name: "{{ stack_group_config.namespace }}-lambda-s3-event-config-role" parameters: S3SourceBucketName: {{ stack_group_config.input_bucket_name }} stack_tags: diff --git a/config/prod/namespaced/s3-event-config-lambda.yaml b/config/develop/namespaced/lambda-s3-event-config.yaml similarity index 66% rename from config/prod/namespaced/s3-event-config-lambda.yaml rename to config/develop/namespaced/lambda-s3-event-config.yaml index 08741f14..69ab6a78 100644 --- a/config/prod/namespaced/s3-event-config-lambda.yaml +++ b/config/develop/namespaced/lambda-s3-event-config.yaml @@ -4,14 +4,14 @@ template: artifact_bucket_name: {{ stack_group_config.template_bucket_name }} artifact_prefix: '{{ stack_group_config.namespace }}/src/lambda' dependencies: - - prod/namespaced/s3-event-config-lambda-role.yaml - - prod/namespaced/s3-to-glue-lambda.yaml - - prod/namespaced/sqs-input-to-intermediate.yaml + - develop/namespaced/lambda-s3-event-config-role.yaml + - develop/s3-cloudformation-bucket.yaml + - develop/namespaced/sns-topic.yaml stack_name: '{{ stack_group_config.namespace }}-lambda-S3EventConfig' stack_tags: {{ stack_group_config.default_stack_tags }} parameters: Namespace: {{ stack_group_config.namespace }} - S3ToGlueDestinationArn: !stack_output_external "{{ stack_group_config.namespace }}-sqs-input-to-intermediate::PrimaryQueueArn" - S3ToGlueDestinationType: "Queue" - S3EventConfigRoleArn: !stack_output_external "{{ stack_group_config.namespace }}-s3-event-config-lambda-role::RoleArn" + S3ToGlueDestinationArn: !stack_output_external "{{ stack_group_config.namespace }}-sns-input-to-sqs::SnsTopicArn" + S3ToGlueDestinationType: "Topic" + S3EventConfigRoleArn: !stack_output_external "{{ stack_group_config.namespace }}-lambda-s3-event-config-role::RoleArn" S3SourceBucketName: {{ stack_group_config.input_bucket_name }} diff --git a/config/develop/namespaced/sns-topic.yaml b/config/develop/namespaced/sns-topic.yaml new file mode 100644 index 00000000..5c407963 --- /dev/null +++ b/config/develop/namespaced/sns-topic.yaml @@ -0,0 +1,9 @@ +template: + path: sns-topic.yaml +parameters: + S3SourceBucketArn: !stack_output_external recover-dev-input-bucket::BucketArn +dependencies: + - develop/s3-input-bucket.yaml +stack_name: "{{ stack_group_config.namespace }}-sns-input-to-sqs" +stack_tags: + {{ stack_group_config.default_stack_tags }} diff --git a/config/develop/namespaced/sqs-input-to-intermediate.yaml b/config/develop/namespaced/sqs-input-to-intermediate.yaml index 3e195f29..ab2a5b12 100644 --- a/config/develop/namespaced/sqs-input-to-intermediate.yaml +++ b/config/develop/namespaced/sqs-input-to-intermediate.yaml @@ -1,12 +1,12 @@ template: path: sqs-queue.yaml parameters: - MessageRetentionPeriod: "86400" + MessageRetentionPeriod: "1209600" ReceiveMessageWaitTimeSeconds: "20" VisibilityTimeout: "120" - S3SourceBucketArn: !stack_output_external recover-dev-input-bucket::BucketArn + SNSTopicSubscription: !stack_output_external "{{ stack_group_config.namespace }}-sns-input-to-sqs::SnsTopicArn" dependencies: - - develop/s3-input-bucket.yaml -stack_name: '{{ stack_group_config.namespace }}-sqs-input-to-intermediate' + - develop/namespaced/sns-topic.yaml +stack_name: "{{ stack_group_config.namespace }}-sqs-input-to-intermediate" stack_tags: {{ stack_group_config.default_stack_tags }} diff --git a/config/develop/namespaced/sqs-input-to-raw.yaml b/config/develop/namespaced/sqs-input-to-raw.yaml index 0ef7adbf..65cbf6f8 100644 --- a/config/develop/namespaced/sqs-input-to-raw.yaml +++ b/config/develop/namespaced/sqs-input-to-raw.yaml @@ -4,9 +4,9 @@ parameters: MessageRetentionPeriod: "1209600" ReceiveMessageWaitTimeSeconds: "20" VisibilityTimeout: "120" - S3SourceBucketArn: !stack_output_external recover-dev-raw-bucket::BucketArn + SNSTopicSubscription: !stack_output_external "{{ stack_group_config.namespace }}-sns-input-to-sqs::SnsTopicArn" dependencies: - - develop/s3-raw-bucket.yaml + - develop/namespaced/sns-topic.yaml stack_name: '{{ stack_group_config.namespace }}-sqs-input-to-raw' stack_tags: {{ stack_group_config.default_stack_tags }} diff --git a/config/prod/namespaced/s3-event-config-lambda-role.yaml b/config/prod/namespaced/lambda-s3-event-config-role.yaml similarity index 55% rename from config/prod/namespaced/s3-event-config-lambda-role.yaml rename to config/prod/namespaced/lambda-s3-event-config-role.yaml index 8b611a1b..11909cd2 100644 --- a/config/prod/namespaced/s3-event-config-lambda-role.yaml +++ b/config/prod/namespaced/lambda-s3-event-config-role.yaml @@ -1,6 +1,6 @@ template: - path: s3-event-config-lambda-role.yaml -stack_name: "{{ stack_group_config.namespace }}-s3-event-config-lambda-role" + path: lambda-s3-event-config-role.yaml +stack_name: "{{ stack_group_config.namespace }}-lambda-s3-event-config-role" parameters: S3SourceBucketName: {{ stack_group_config.input_bucket_name }} stack_tags: diff --git a/config/develop/namespaced/s3-event-config-lambda.yaml b/config/prod/namespaced/lambda-s3-event-config.yaml similarity index 66% rename from config/develop/namespaced/s3-event-config-lambda.yaml rename to config/prod/namespaced/lambda-s3-event-config.yaml index 83fc6f07..1d26eca1 100644 --- a/config/develop/namespaced/s3-event-config-lambda.yaml +++ b/config/prod/namespaced/lambda-s3-event-config.yaml @@ -4,14 +4,14 @@ template: artifact_bucket_name: {{ stack_group_config.template_bucket_name }} artifact_prefix: '{{ stack_group_config.namespace }}/src/lambda' dependencies: - - develop/namespaced/s3-event-config-lambda-role.yaml - - develop/namespaced/s3-to-glue-lambda.yaml - - develop/namespaced/sqs-input-to-intermediate.yaml + - prod/namespaced/lambda-s3-event-config-role.yaml + - prod/s3-cloudformation-bucket.yaml + - prod/namespaced/sns-topic.yaml stack_name: '{{ stack_group_config.namespace }}-lambda-S3EventConfig' stack_tags: {{ stack_group_config.default_stack_tags }} parameters: Namespace: {{ stack_group_config.namespace }} - S3ToGlueDestinationArn: !stack_output_external "{{ stack_group_config.namespace }}-sqs-input-to-intermediate::PrimaryQueueArn" - S3ToGlueDestinationType: "Queue" - S3EventConfigRoleArn: !stack_output_external "{{ stack_group_config.namespace }}-s3-event-config-lambda-role::RoleArn" + S3ToGlueDestinationArn: !stack_output_external "{{ stack_group_config.namespace }}-sns-input-to-sqs::SnsTopicArn" + S3ToGlueDestinationType: "Topic" + S3EventConfigRoleArn: !stack_output_external "{{ stack_group_config.namespace }}-lambda-s3-event-config-role::RoleArn" S3SourceBucketName: {{ stack_group_config.input_bucket_name }} diff --git a/config/prod/namespaced/sns-topic.yaml b/config/prod/namespaced/sns-topic.yaml new file mode 100644 index 00000000..8b5a5668 --- /dev/null +++ b/config/prod/namespaced/sns-topic.yaml @@ -0,0 +1,9 @@ +template: + path: sns-topic.yaml +parameters: + S3SourceBucketArn: !stack_output_external recover-input-bucket::BucketArn +dependencies: + - prod/s3-input-bucket.yaml +stack_name: "{{ stack_group_config.namespace }}-sns-input-to-sqs" +stack_tags: + {{ stack_group_config.default_stack_tags }} diff --git a/config/prod/namespaced/sqs-input-to-intermediate.yaml b/config/prod/namespaced/sqs-input-to-intermediate.yaml index ff87c2a8..ccbf268e 100644 --- a/config/prod/namespaced/sqs-input-to-intermediate.yaml +++ b/config/prod/namespaced/sqs-input-to-intermediate.yaml @@ -1,12 +1,12 @@ template: path: sqs-queue.yaml parameters: - MessageRetentionPeriod: '86400' - ReceiveMessageWaitTimeSeconds: '20' - VisibilityTimeout: '120' - S3SourceBucketArn: !stack_output_external recover-input-bucket::BucketArn + MessageRetentionPeriod: "1209600" + ReceiveMessageWaitTimeSeconds: "20" + VisibilityTimeout: "120" + SNSTopicSubscription: !stack_output_external "{{ stack_group_config.namespace }}-sns-input-to-sqs::SnsTopicArn" dependencies: - - prod/s3-input-bucket.yaml -stack_name: '{{ stack_group_config.namespace }}-sqs-input-to-intermediate' + - prod/namespaced/sns-topic.yaml +stack_name: "{{ stack_group_config.namespace }}-sqs-input-to-intermediate" stack_tags: {{ stack_group_config.default_stack_tags }} diff --git a/config/prod/namespaced/sqs-input-to-raw.yaml b/config/prod/namespaced/sqs-input-to-raw.yaml index 7fb1e94a..ad10199f 100644 --- a/config/prod/namespaced/sqs-input-to-raw.yaml +++ b/config/prod/namespaced/sqs-input-to-raw.yaml @@ -4,9 +4,9 @@ parameters: MessageRetentionPeriod: "1209600" ReceiveMessageWaitTimeSeconds: "20" VisibilityTimeout: "120" - S3SourceBucketArn: !stack_output_external recover-raw-bucket::BucketArn + SNSTopicSubscription: !stack_output_external "{{ stack_group_config.namespace }}-sns-input-to-sqs::SnsTopicArn" dependencies: - - prod/s3-raw-bucket.yaml + - prod/namespaced/sns-topic.yaml stack_name: '{{ stack_group_config.namespace }}-sqs-input-to-raw' stack_tags: {{ stack_group_config.default_stack_tags }} diff --git a/src/lambda_function/s3_event_config/template.yaml b/src/lambda_function/s3_event_config/template.yaml index c73d2888..97a47395 100644 --- a/src/lambda_function/s3_event_config/template.yaml +++ b/src/lambda_function/s3_event_config/template.yaml @@ -19,6 +19,7 @@ Parameters: Type: String Description: The S3 Event Config Destination Type AllowedValues: + - "Topic" - "Queue" - "LambdaFunction" diff --git a/src/lambda_function/s3_to_glue/app.py b/src/lambda_function/s3_to_glue/app.py index c10381ac..51e0cfab 100644 --- a/src/lambda_function/s3_to_glue/app.py +++ b/src/lambda_function/s3_to_glue/app.py @@ -110,7 +110,7 @@ def get_object_info(s3_event) -> dict: } return object_info -def lambda_handler(event, context) -> dict: +def lambda_handler(event, context) -> None: """ This main lambda function will be triggered by a SQS event and will poll the SQS queue for all available S3 event messages. If the @@ -125,19 +125,20 @@ def lambda_handler(event, context) -> dict: Unused by this lambda function. """ s3_objects_info = [] - for record in event["Records"]: - s3_event_records = json.loads(record["body"]) - if is_s3_test_event(s3_event_records): + for sqs_record in event["Records"]: + sns_notification = json.loads(sqs_record["body"]) + sns_message = json.loads(sns_notification["Message"]) + if is_s3_test_event(sns_message): logger.info(f"Found AWS default s3:TestEvent. Skipping.") - else: - for s3_event in s3_event_records["Records"]: - object_info = get_object_info(s3_event) - if filter_object_info(object_info) is not None: - s3_objects_info.append(object_info) - else: - logger.info( - f"Object doesn't meet the S3 event rules to be processed. Skipping." - ) + return + for s3_event in sns_message["Records"]: + object_info = get_object_info(s3_event) + if filter_object_info(object_info) is not None: + s3_objects_info.append(object_info) + else: + logger.info( + f"Object doesn't meet the S3 event rules to be processed. Skipping." + ) if len(s3_objects_info) > 0: logger.info( "Submitting the following files to " diff --git a/src/lambda_function/s3_to_glue/events/generate_test_event.py b/src/lambda_function/s3_to_glue/events/generate_test_event.py index 1db90b2d..4fa9914d 100755 --- a/src/lambda_function/s3_to_glue/events/generate_test_event.py +++ b/src/lambda_function/s3_to_glue/events/generate_test_event.py @@ -1,6 +1,6 @@ """ -This is a utility script that generates a fake S3 event notification and writes -it out as JSON. This test event can be used to test the Lambda which +This is a utility script that generates a fake S3 -> SNS -> SQS event +and writes it out as JSON. This test event can be used to test the Lambda which triggers the Glue pipeline. Two types of events can be generated: @@ -59,7 +59,11 @@ def read_args() -> argparse.Namespace: def create_event(bucket: str, key: str, key_prefix: str, key_file: str) -> dict: """ - Create an SQS event wrapping S3 event notification(s) for testing. + Create an SQS event wrapping a SNS notification of an S3 event notification(s) + for testing. + + Each SNS notification will contain a single S3 event, and each SQS event + will contain a single SNS notification. This function accepts either an S3 object key or an S3 key prefix that will be included in the test event. If an S3 object key is provided, then the test @@ -97,8 +101,11 @@ def create_event(bucket: str, key: str, key_prefix: str, key_file: str) -> dict: s3_events = [ create_s3_event_record(bucket=bucket, key=k) for k in test_data ] + sns_notifications = [ + create_sns_notification(s3_event) for s3_event in s3_events + ] sqs_messages = [ - create_sqs_message(s3_event) for s3_event in s3_events + create_sqs_message(sns_notification) for sns_notification in sns_notifications ] sqs_event = {"Records": sqs_messages} return sqs_event @@ -153,15 +160,15 @@ def create_s3_event_record(bucket: str, key: str) -> dict: s3_event_record["s3"]["object"]["key"] = key return s3_event_record -def create_sqs_message(s3_event_record: dict) -> dict: +def create_sqs_message(sns_notification: dict) -> dict: """ - Create an SQS message wrapper for an individual S3 event notification. + Create an SQS message wrapper around an individual SNS notification. - See `create_s3_event_record` for creating S3 event notifications. + See `create_sns_notification` for creating S3 event notifications. Args: - s3_event_record (dict): A dictionary formatted as a "Record" - object would be in an S3 event notification + sns_notification (dict): A dictionary formatted as an SNS + notification JSON object would be. Returns: dict: A dictionary formatted as an SQS message @@ -183,9 +190,33 @@ def create_sqs_message(s3_event_record: dict) -> dict: "eventSourceARN": "arn:aws:sqs:us-east-1:914833433684:mynamespace-sqs-S3ToLambda-Queue", "awsRegion": "us-east-1" } - sqs_event_record["body"] = json.dumps({"Records": [s3_event_record]}) + sqs_event_record["body"] = json.dumps(sns_notification) return sqs_event_record +def create_sns_notification(s3_event_record): + """ + Create an SNS message wrapper for an individual S3 event notification. + + See `create_s3_event_record` for creating S3 event notifications. + + Args: + s3_event_record (dict): A dictionary formatted as a "Record" + object would be in an S3 event notification + + Returns: + dict: A dictionary formatted as an SQS message + """ + sns_notification = { + "Type": "string", + "MessageId": "string", + "TopicArn": "string", + "Subject": "string", + "Message": "string", + "Timestamp": "string" + } + sns_notification["Message"] = json.dumps({"Records": [s3_event_record]}) + return sns_notification + def main() -> None: args = read_args() print("Generating mock S3 event...") diff --git a/templates/s3-event-config-lambda-role.yaml b/templates/lambda-s3-event-config-role.yaml similarity index 100% rename from templates/s3-event-config-lambda-role.yaml rename to templates/lambda-s3-event-config-role.yaml diff --git a/templates/sns-topic.yaml b/templates/sns-topic.yaml new file mode 100644 index 00000000..be3ddb32 --- /dev/null +++ b/templates/sns-topic.yaml @@ -0,0 +1,45 @@ +AWSTemplateFormatVersion: "2010-09-09" + +Description: > + Creates a Standard SNS queue which can be used with S3 event notifications. + +Parameters: + + S3SourceBucketArn: + Type: String + Description: Arn of the S3 bucket where source data are stored. + +Resources: + + SnsTopic: + Type: "AWS::SNS::Topic" + Properties: + TopicName: !Sub "${AWS::StackName}-Topic" + FifoTopic: false + + SnsTopicPolicy: + Type: AWS::SNS::TopicPolicy + Properties: + PolicyDocument: + Version: "2012-10-17" + Statement: + - Sid: Input S3 bucket event notification to SNS + Effect: Allow + Principal: + Service: s3.amazonaws.com + AWS: !Sub '${AWS::AccountId}' + Action: + - sns:Publish + Resource: !Ref SnsTopic + Condition: + ArnLike: + "aws:SourceArn": !Ref S3SourceBucketArn + Topics: + - !Ref SnsTopic + +Outputs: + + SnsTopicArn: + Value: !Ref SnsTopic + Export: + Name: !Sub "${AWS::Region}-${AWS::StackName}-SnsTopicArn" diff --git a/templates/sqs-queue.yaml b/templates/sqs-queue.yaml index 1eb2d77a..c0762e4c 100644 --- a/templates/sqs-queue.yaml +++ b/templates/sqs-queue.yaml @@ -1,7 +1,7 @@ AWSTemplateFormatVersion: '2010-09-09' Description: > - Creates an SQS queue that gets S3 notifications + Creates an SQS queue which subscribes to an SNS topic Parameters: @@ -22,9 +22,9 @@ Parameters: How long our lambda has to submit the messages to Glue and delete the message from the SQS queue - S3SourceBucketArn: + SNSTopicSubscription: Type: String - Description: Arn of the S3 bucket where source data are stored. + Description: Arn of the SNS topic where S3 event notifications are published. Resources: @@ -49,17 +49,24 @@ Resources: - Sid: Send_Permission Effect: Allow Principal: - Service: s3.amazonaws.com + Service: sns.amazonaws.com AWS: !Sub '${AWS::AccountId}' Action: - SQS:SendMessage Resource: !GetAtt PrimaryQueue.Arn Condition: ArnLike: - "aws:SourceArn": !Ref S3SourceBucketArn + "aws:SourceArn": !Ref SNSTopicSubscription Queues: - !Ref PrimaryQueue + SnsSubscription: + Type: "AWS::SNS::Subscription" + Properties: + Protocol: sqs + Endpoint: !GetAtt PrimaryQueue.Arn + TopicArn: !Ref SNSTopicSubscription + DeadLetterQueue: Type: AWS::SQS::Queue Properties: diff --git a/tests/test_s3_event_config_lambda.py b/tests/test_s3_event_config_lambda.py index 2f8d7c8f..7416f01d 100644 --- a/tests/test_s3_event_config_lambda.py +++ b/tests/test_s3_event_config_lambda.py @@ -1,6 +1,6 @@ from unittest import mock import boto3 -from moto import mock_s3, mock_lambda, mock_iam, mock_sqs +from moto import mock_s3, mock_lambda, mock_iam, mock_sqs, mock_sns import pytest from src.lambda_function.s3_event_config import app @@ -29,6 +29,13 @@ def mock_lambda_function(mock_iam_role): ) yield client.get_function(FunctionName="some_function") +@pytest.fixture +def mock_sns_topic_arn(): + with mock_sns(): + client = boto3.client("sns") + topic = client.create_topic(Name="some_topic") + yield topic["TopicArn"] + @pytest.fixture(scope="function") def mock_sqs_queue(mock_aws_credentials): @@ -68,6 +75,30 @@ def test_add_notification_adds_expected_settings_for_lambda(s3, mock_lambda_func "Key": {"FilterRules": [{"Name": "prefix", "Value": "test_folder/"}]} } +@mock_s3 +def test_add_notification_adds_expected_settings_for_sns(s3, mock_sns_topic_arn): + s3.create_bucket(Bucket="some_bucket") + with mock.patch.object( + s3, + "get_bucket_notification_configuration", + return_value={}, + ): + app.add_notification( + s3, + "Topic", + mock_sns_topic_arn, + "some_bucket", + "test_folder", + ) + get_config = s3.get_bucket_notification_configuration(Bucket="some_bucket") + assert get_config["TopicConfigurations"][0]["TopicArn"] == mock_sns_topic_arn + assert get_config["TopicConfigurations"][0]["Events"] == [ + "s3:ObjectCreated:*" + ] + assert get_config["TopicConfigurations"][0]["Filter"] == { + "Key": {"FilterRules": [{"Name": "prefix", "Value": "test_folder/"}]} + } + @mock_s3 def test_add_notification_adds_expected_settings_for_sqs(s3, mock_sqs_queue): diff --git a/tests/test_s3_to_glue_lambda.py b/tests/test_s3_to_glue_lambda.py index 2ef51f4e..d51580d2 100644 --- a/tests/test_s3_to_glue_lambda.py +++ b/tests/test_s3_to_glue_lambda.py @@ -29,30 +29,16 @@ def sqs_queue(self, sqs_queue_name): yield response @pytest.fixture - def no_s3_event_records(self): - sqs_msg = { - "Records": [ - { - "MessageId": "string", - "receiptHandle": "string", - "MD5OfBody": "string", - "Body": "string", - "Attributes": { - "string": "string", - }, - "MD5OfMessageAttributes": "string", - "MessageAttributes": { - "string": { - "DataType": "string", - "StringValue": "string", - "BinaryValue": "string", - }, - }, - } - ] + def sns_message(self): + sns_message_wrapper = { + "Type": "string", + "MessageId": "string", + "TopicArn": "string", + "Subject": "string", + "Message": "string", + "Timestamp": "string" } - sqs_msg["Records"][0]["body"] = json.dumps({"Records": []}) - yield sqs_msg + return sns_message_wrapper @pytest.fixture def s3_test_event(self): @@ -99,26 +85,9 @@ def s3_event(self): return s3_event @pytest.fixture - def sqs_message(self, s3_event, s3_test_event): + def sqs_message(self): sqs_msg = { "Records": [ - { - "MessageId": "string", - "receiptHandle": "string", - "MD5OfBody": "string", - "body": "string", - "Attributes": { - "string": "string", - }, - "MD5OfMessageAttributes": "string", - "MessageAttributes": { - "string": { - "DataType": "string", - "StringValue": "string", - "BinaryValue": "string", - } - }, - }, { "MessageId": "string", "receiptHandle": "string", @@ -136,12 +105,8 @@ def sqs_message(self, s3_event, s3_test_event): } }, } - ], + ] } - sqs_msg["Records"][0]["body"] = json.dumps( - {"Records": [s3_event]} - ) - sqs_msg["Records"][1]["body"] = json.dumps(s3_test_event) yield sqs_msg @pytest.fixture @@ -155,7 +120,7 @@ def object_info(self): return object_info @pytest.fixture - def set_env_var(self, monkeypatch, sqs_queue): + def set_env_var(self, monkeypatch): monkeypatch.setenv("S3_TO_JSON_WORKFLOW_NAME", "test_workflow") def test_submit_s3_to_json_workflow(self, object_info, monkeypatch): @@ -165,8 +130,10 @@ def test_submit_s3_to_json_workflow(self, object_info, monkeypatch): ) def test_that_lambda_handler_does_not_call_submit_s3_to_json_workflow_if_no_s3_records( - self, no_s3_event_records, sqs_queue, set_env_var + self, sqs_queue, set_env_var, sqs_message, sns_message ): + sns_message["Message"] = json.dumps({"Records": []}) + sqs_message["Records"][0]["body"] = json.dumps(sns_message) with mock_sqs(): with mock.patch.object(boto3, "client") as patch_client, mock.patch.object( app, "submit_s3_to_json_workflow" @@ -174,15 +141,15 @@ def test_that_lambda_handler_does_not_call_submit_s3_to_json_workflow_if_no_s3_r patch_client.return_value.get_queue_url.return_value = sqs_queue[ "QueueUrl" ] - patch_client.return_value.receive_message.return_value = ( - no_s3_event_records - ) - app.lambda_handler(event=no_s3_event_records, context=None) + patch_client.return_value.receive_message.return_value = sqs_message + app.lambda_handler(event=sqs_message, context=None) patch_submit.assert_not_called() def test_that_lambda_handler_calls_submit_s3_to_json_workflow_if_queue_has_message( - self, sqs_message, object_info, sqs_queue, set_env_var + self, sqs_message, object_info, sqs_queue, set_env_var, sns_message, s3_event ): + sns_message["Message"] = json.dumps({"Records": [s3_event]}) + sqs_message["Records"][0]["body"] = json.dumps(sns_message) with mock_sqs(): with mock.patch.object(boto3, "client") as patch_client, mock.patch.object( app, "submit_s3_to_json_workflow"