Skip to content

Commit

Permalink
Update lambda to use SQS(SNS(S3)) event format
Browse files Browse the repository at this point in the history
  • Loading branch information
philerooski committed Feb 21, 2024
1 parent d722bcb commit b71abf3
Show file tree
Hide file tree
Showing 13 changed files with 112 additions and 95 deletions.
2 changes: 1 addition & 1 deletion config/develop/namespaced/lambda-s3-event-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ template:
artifact_prefix: '{{ stack_group_config.namespace }}/src/lambda'
dependencies:
- develop/namespaced/s3-event-config-lambda-role.yaml
- develop/s3-input-bucket.yaml
- develop/s3-cloudformation-bucket.yaml
- develop/namespaced/sns-topic.yaml
stack_name: '{{ stack_group_config.namespace }}-lambda-S3EventConfig'
stack_tags: {{ stack_group_config.default_stack_tags }}
Expand Down
4 changes: 3 additions & 1 deletion config/develop/namespaced/sns-topic.yaml
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
template:
path: sns-topic.yaml
stack_name: "{{ stack_group_config.namespace }}-sns-input-to-sqs"
parameters:
S3SourceBucketArn: !stack_output_external recover-dev-input-bucket::BucketArn
dependencies:
- develop/s3-input-bucket.yaml
stack_name: "{{ stack_group_config.namespace }}-sns-input-to-sqs"
stack_tags:
{{ stack_group_config.default_stack_tags }}
8 changes: 4 additions & 4 deletions config/develop/namespaced/sqs-input-to-intermediate.yaml
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
template:
path: sqs-queue.yaml
parameters:
MessageRetentionPeriod: "86400"
MessageRetentionPeriod: "1209600"
ReceiveMessageWaitTimeSeconds: "20"
VisibilityTimeout: "120"
S3SourceBucketArn: !stack_output_external recover-dev-input-bucket::BucketArn
SNSTopicSubscription: !stack_output_external "{{ stack_group_config.namespace }}-sns-input-to-sqs::SnsTopicArn"
dependencies:
- develop/s3-input-bucket.yaml
stack_name: '{{ stack_group_config.namespace }}-sqs-input-to-intermediate'
- develop/namespaced/sns-topic.yaml
stack_name: "{{ stack_group_config.namespace }}-sqs-input-to-intermediate"
stack_tags:
{{ stack_group_config.default_stack_tags }}
4 changes: 2 additions & 2 deletions config/develop/namespaced/sqs-input-to-raw.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@ parameters:
MessageRetentionPeriod: "1209600"
ReceiveMessageWaitTimeSeconds: "20"
VisibilityTimeout: "120"
S3SourceBucketArn: !stack_output_external recover-dev-raw-bucket::BucketArn
SNSTopicSubscription: !stack_output_external "{{ stack_group_config.namespace }}-sns-input-to-sqs::SnsTopicArn"
dependencies:
- develop/s3-raw-bucket.yaml
- develop/namespaced/sns-topic.yaml
stack_name: '{{ stack_group_config.namespace }}-sqs-input-to-raw'
stack_tags:
{{ stack_group_config.default_stack_tags }}
8 changes: 4 additions & 4 deletions config/prod/namespaced/s3-event-config-lambda.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,13 @@ template:
artifact_prefix: '{{ stack_group_config.namespace }}/src/lambda'
dependencies:
- prod/namespaced/s3-event-config-lambda-role.yaml
- prod/namespaced/s3-to-glue-lambda.yaml
- prod/namespaced/sqs-input-to-intermediate.yaml
- prod/s3-cloudformation-bucket.yaml
- prod/namespaced/sns-topic.yaml
stack_name: '{{ stack_group_config.namespace }}-lambda-S3EventConfig'
stack_tags: {{ stack_group_config.default_stack_tags }}
parameters:
Namespace: {{ stack_group_config.namespace }}
S3ToGlueDestinationArn: !stack_output_external "{{ stack_group_config.namespace }}-sqs-input-to-intermediate::PrimaryQueueArn"
S3ToGlueDestinationType: "Queue"
S3ToGlueDestinationArn: !stack_output_external "{{ stack_group_config.namespace }}-sns-input-to-sqs::SnsTopicArn"
S3ToGlueDestinationType: "Topic"
S3EventConfigRoleArn: !stack_output_external "{{ stack_group_config.namespace }}-s3-event-config-lambda-role::RoleArn"
S3SourceBucketName: {{ stack_group_config.input_bucket_name }}
9 changes: 9 additions & 0 deletions config/prod/namespaced/sns-topic.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
template:
path: sns-topic.yaml
parameters:
S3SourceBucketArn: !stack_output_external recover-input-bucket::BucketArn
dependencies:
- prod/s3-input-bucket.yaml
stack_name: "{{ stack_group_config.namespace }}-sns-input-to-sqs"
stack_tags:
{{ stack_group_config.default_stack_tags }}
12 changes: 6 additions & 6 deletions config/prod/namespaced/sqs-input-to-intermediate.yaml
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
template:
path: sqs-queue.yaml
parameters:
MessageRetentionPeriod: '86400'
ReceiveMessageWaitTimeSeconds: '20'
VisibilityTimeout: '120'
S3SourceBucketArn: !stack_output_external recover-input-bucket::BucketArn
MessageRetentionPeriod: "1209600"
ReceiveMessageWaitTimeSeconds: "20"
VisibilityTimeout: "120"
SNSTopicSubscription: !stack_output_external "{{ stack_group_config.namespace }}-sns-input-to-sqs::SnsTopicArn"
dependencies:
- prod/s3-input-bucket.yaml
stack_name: '{{ stack_group_config.namespace }}-sqs-input-to-intermediate'
- prod/namespaced/sns-topic.yaml
stack_name: "{{ stack_group_config.namespace }}-sqs-input-to-intermediate"
stack_tags:
{{ stack_group_config.default_stack_tags }}
4 changes: 2 additions & 2 deletions config/prod/namespaced/sqs-input-to-raw.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@ parameters:
MessageRetentionPeriod: "1209600"
ReceiveMessageWaitTimeSeconds: "20"
VisibilityTimeout: "120"
S3SourceBucketArn: !stack_output_external recover-raw-bucket::BucketArn
SNSTopicSubscription: !stack_output_external "{{ stack_group_config.namespace }}-sns-input-to-sqs::SnsTopicArn"
dependencies:
- prod/s3-raw-bucket.yaml
- prod/namespaced/sns-topic.yaml
stack_name: '{{ stack_group_config.namespace }}-sqs-input-to-raw'
stack_tags:
{{ stack_group_config.default_stack_tags }}
27 changes: 14 additions & 13 deletions src/lambda_function/s3_to_glue/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ def get_object_info(s3_event) -> dict:
}
return object_info

def lambda_handler(event, context) -> dict:
def lambda_handler(event, context) -> None:
"""
This main lambda function will be triggered by a SQS event and will
poll the SQS queue for all available S3 event messages. If the
Expand All @@ -125,19 +125,20 @@ def lambda_handler(event, context) -> dict:
Unused by this lambda function.
"""
s3_objects_info = []
for record in event["Records"]:
s3_event_records = json.loads(record["body"])
if is_s3_test_event(s3_event_records):
for sqs_record in event["Records"]:
sns_notification = json.loads(sqs_record["body"])
sns_message = json.loads(sns_notification["Message"])
if is_s3_test_event(sns_message):
logger.info(f"Found AWS default s3:TestEvent. Skipping.")
else:
for s3_event in s3_event_records["Records"]:
object_info = get_object_info(s3_event)
if filter_object_info(object_info) is not None:
s3_objects_info.append(object_info)
else:
logger.info(
f"Object doesn't meet the S3 event rules to be processed. Skipping."
)
return
for s3_event in sns_message["Records"]:
object_info = get_object_info(s3_event)
if filter_object_info(object_info) is not None:
s3_objects_info.append(object_info)
else:
logger.info(
f"Object doesn't meet the S3 event rules to be processed. Skipping."
)
if len(s3_objects_info) > 0:
logger.info(
"Submitting the following files to "
Expand Down
6 changes: 3 additions & 3 deletions templates/sns-topic.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,9 @@ Resources:
Action:
- sns:Publish
Resource: !Ref SnsTopic
#Condition:
#ArnLike:
# "aws:SourceArn": !Ref S3SourceBucketArn
Condition:
ArnLike:
"aws:SourceArn": !Ref S3SourceBucketArn
Topics:
- !Ref SnsTopic

Expand Down
17 changes: 12 additions & 5 deletions templates/sqs-queue.yaml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
AWSTemplateFormatVersion: '2010-09-09'

Description: >
Creates an SQS queue that gets S3 notifications
Creates an SQS queue which subscribes to an SNS topic
Parameters:

Expand All @@ -22,9 +22,9 @@ Parameters:
How long our lambda has to submit the messages to Glue and
delete the message from the SQS queue
S3SourceBucketArn:
SNSTopicSubscription:
Type: String
Description: Arn of the S3 bucket where source data are stored.
Description: Arn of the SNS topic where S3 event notifications are published.

Resources:

Expand All @@ -49,17 +49,24 @@ Resources:
- Sid: Send_Permission
Effect: Allow
Principal:
Service: s3.amazonaws.com
Service: sns.amazonaws.com
AWS: !Sub '${AWS::AccountId}'
Action:
- SQS:SendMessage
Resource: !GetAtt PrimaryQueue.Arn
Condition:
ArnLike:
"aws:SourceArn": !Ref S3SourceBucketArn
"aws:SourceArn": !Ref SNSTopicSubscription
Queues:
- !Ref PrimaryQueue

SnsSubscription:
Type: "AWS::SNS::Subscription"
Properties:
Protocol: sqs
Endpoint: !GetAtt PrimaryQueue.Arn
TopicArn: !Ref SNSTopicSubscription

DeadLetterQueue:
Type: AWS::SQS::Queue
Properties:
Expand Down
33 changes: 32 additions & 1 deletion tests/test_s3_event_config_lambda.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from unittest import mock
import boto3
from moto import mock_s3, mock_lambda, mock_iam, mock_sqs
from moto import mock_s3, mock_lambda, mock_iam, mock_sqs, mock_sns
import pytest

from src.lambda_function.s3_event_config import app
Expand Down Expand Up @@ -29,6 +29,13 @@ def mock_lambda_function(mock_iam_role):
)
yield client.get_function(FunctionName="some_function")

@pytest.fixture
def mock_sns_topic_arn():
with mock_sns():
client = boto3.client("sns")
topic = client.create_topic(Name="some_topic")
yield topic["TopicArn"]


@pytest.fixture(scope="function")
def mock_sqs_queue(mock_aws_credentials):
Expand Down Expand Up @@ -68,6 +75,30 @@ def test_add_notification_adds_expected_settings_for_lambda(s3, mock_lambda_func
"Key": {"FilterRules": [{"Name": "prefix", "Value": "test_folder/"}]}
}

@mock_s3
def test_add_notification_adds_expected_settings_for_sns(s3, mock_sns_topic_arn):
s3.create_bucket(Bucket="some_bucket")
with mock.patch.object(
s3,
"get_bucket_notification_configuration",
return_value={},
):
app.add_notification(
s3,
"Topic",
mock_sns_topic_arn,
"some_bucket",
"test_folder",
)
get_config = s3.get_bucket_notification_configuration(Bucket="some_bucket")
assert get_config["TopicConfigurations"][0]["TopicArn"] == mock_sns_topic_arn
assert get_config["TopicConfigurations"][0]["Events"] == [
"s3:ObjectCreated:*"
]
assert get_config["TopicConfigurations"][0]["Filter"] == {
"Key": {"FilterRules": [{"Name": "prefix", "Value": "test_folder/"}]}
}


@mock_s3
def test_add_notification_adds_expected_settings_for_sqs(s3, mock_sqs_queue):
Expand Down
73 changes: 20 additions & 53 deletions tests/test_s3_to_glue_lambda.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,30 +29,16 @@ def sqs_queue(self, sqs_queue_name):
yield response

@pytest.fixture
def no_s3_event_records(self):
sqs_msg = {
"Records": [
{
"MessageId": "string",
"receiptHandle": "string",
"MD5OfBody": "string",
"Body": "string",
"Attributes": {
"string": "string",
},
"MD5OfMessageAttributes": "string",
"MessageAttributes": {
"string": {
"DataType": "string",
"StringValue": "string",
"BinaryValue": "string",
},
},
}
]
def sns_message(self):
sns_message_wrapper = {
"Type": "string",
"MessageId": "string",
"TopicArn": "string",
"Subject": "string",
"Message": "string",
"Timestamp": "string"
}
sqs_msg["Records"][0]["body"] = json.dumps({"Records": []})
yield sqs_msg
return sns_message_wrapper

@pytest.fixture
def s3_test_event(self):
Expand Down Expand Up @@ -99,26 +85,9 @@ def s3_event(self):
return s3_event

@pytest.fixture
def sqs_message(self, s3_event, s3_test_event):
def sqs_message(self):
sqs_msg = {
"Records": [
{
"MessageId": "string",
"receiptHandle": "string",
"MD5OfBody": "string",
"body": "string",
"Attributes": {
"string": "string",
},
"MD5OfMessageAttributes": "string",
"MessageAttributes": {
"string": {
"DataType": "string",
"StringValue": "string",
"BinaryValue": "string",
}
},
},
{
"MessageId": "string",
"receiptHandle": "string",
Expand All @@ -136,12 +105,8 @@ def sqs_message(self, s3_event, s3_test_event):
}
},
}
],
]
}
sqs_msg["Records"][0]["body"] = json.dumps(
{"Records": [s3_event]}
)
sqs_msg["Records"][1]["body"] = json.dumps(s3_test_event)
yield sqs_msg

@pytest.fixture
Expand All @@ -155,7 +120,7 @@ def object_info(self):
return object_info

@pytest.fixture
def set_env_var(self, monkeypatch, sqs_queue):
def set_env_var(self, monkeypatch):
monkeypatch.setenv("S3_TO_JSON_WORKFLOW_NAME", "test_workflow")

def test_submit_s3_to_json_workflow(self, object_info, monkeypatch):
Expand All @@ -165,24 +130,26 @@ def test_submit_s3_to_json_workflow(self, object_info, monkeypatch):
)

def test_that_lambda_handler_does_not_call_submit_s3_to_json_workflow_if_no_s3_records(
self, no_s3_event_records, sqs_queue, set_env_var
self, sqs_queue, set_env_var, sqs_message, sns_message
):
sns_message["Message"] = json.dumps({"Records": []})
sqs_message["Records"][0]["body"] = json.dumps(sns_message)
with mock_sqs():
with mock.patch.object(boto3, "client") as patch_client, mock.patch.object(
app, "submit_s3_to_json_workflow"
) as patch_submit:
patch_client.return_value.get_queue_url.return_value = sqs_queue[
"QueueUrl"
]
patch_client.return_value.receive_message.return_value = (
no_s3_event_records
)
app.lambda_handler(event=no_s3_event_records, context=None)
patch_client.return_value.receive_message.return_value = sqs_message
app.lambda_handler(event=sqs_message, context=None)
patch_submit.assert_not_called()

def test_that_lambda_handler_calls_submit_s3_to_json_workflow_if_queue_has_message(
self, sqs_message, object_info, sqs_queue, set_env_var
self, sqs_message, object_info, sqs_queue, set_env_var, sns_message, s3_event
):
sns_message["Message"] = json.dumps({"Records": [s3_event]})
sqs_message["Records"][0]["body"] = json.dumps(sns_message)
with mock_sqs():
with mock.patch.object(boto3, "client") as patch_client, mock.patch.object(
app, "submit_s3_to_json_workflow"
Expand Down

0 comments on commit b71abf3

Please sign in to comment.