[ETL-621] Add SNS topic to broadcast new exports #106

Merged
merged 3 commits into from
Feb 26, 2024
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
template:
path: s3-event-config-lambda-role.yaml
stack_name: "{{ stack_group_config.namespace }}-s3-event-config-lambda-role"
path: lambda-s3-event-config-role.yaml
stack_name: "{{ stack_group_config.namespace }}-lambda-s3-event-config-role"
parameters:
S3SourceBucketName: {{ stack_group_config.input_bucket_name }}
stack_tags:
@@ -4,14 +4,14 @@ template:
artifact_bucket_name: {{ stack_group_config.template_bucket_name }}
artifact_prefix: '{{ stack_group_config.namespace }}/src/lambda'
dependencies:
- prod/namespaced/s3-event-config-lambda-role.yaml
- prod/namespaced/s3-to-glue-lambda.yaml
- prod/namespaced/sqs-input-to-intermediate.yaml
- develop/namespaced/lambda-s3-event-config-role.yaml
- develop/s3-cloudformation-bucket.yaml
- develop/namespaced/sns-topic.yaml
stack_name: '{{ stack_group_config.namespace }}-lambda-S3EventConfig'
stack_tags: {{ stack_group_config.default_stack_tags }}
parameters:
Namespace: {{ stack_group_config.namespace }}
S3ToGlueDestinationArn: !stack_output_external "{{ stack_group_config.namespace }}-sqs-input-to-intermediate::PrimaryQueueArn"
S3ToGlueDestinationType: "Queue"
S3EventConfigRoleArn: !stack_output_external "{{ stack_group_config.namespace }}-s3-event-config-lambda-role::RoleArn"
S3ToGlueDestinationArn: !stack_output_external "{{ stack_group_config.namespace }}-sns-input-to-sqs::SnsTopicArn"
S3ToGlueDestinationType: "Topic"
S3EventConfigRoleArn: !stack_output_external "{{ stack_group_config.namespace }}-lambda-s3-event-config-role::RoleArn"
S3SourceBucketName: {{ stack_group_config.input_bucket_name }}
9 changes: 9 additions & 0 deletions config/develop/namespaced/sns-topic.yaml
@@ -0,0 +1,9 @@
template:
path: sns-topic.yaml
parameters:
S3SourceBucketArn: !stack_output_external recover-dev-input-bucket::BucketArn
dependencies:
- develop/s3-input-bucket.yaml
stack_name: "{{ stack_group_config.namespace }}-sns-input-to-sqs"
stack_tags:
{{ stack_group_config.default_stack_tags }}
8 changes: 4 additions & 4 deletions config/develop/namespaced/sqs-input-to-intermediate.yaml
@@ -1,12 +1,12 @@
template:
path: sqs-queue.yaml
parameters:
MessageRetentionPeriod: "86400"
MessageRetentionPeriod: "1209600"
ReceiveMessageWaitTimeSeconds: "20"
VisibilityTimeout: "120"
S3SourceBucketArn: !stack_output_external recover-dev-input-bucket::BucketArn
SNSTopicSubscription: !stack_output_external "{{ stack_group_config.namespace }}-sns-input-to-sqs::SnsTopicArn"
dependencies:
- develop/s3-input-bucket.yaml
stack_name: '{{ stack_group_config.namespace }}-sqs-input-to-intermediate'
- develop/namespaced/sns-topic.yaml
stack_name: "{{ stack_group_config.namespace }}-sqs-input-to-intermediate"
stack_tags:
{{ stack_group_config.default_stack_tags }}
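For reference, `MessageRetentionPeriod` is expressed in seconds. The snippet below is plain arithmetic rather than project code (the constant names are made up for illustration). It shows that the change raises retention from 1 day to 14 days, which is the maximum SQS allows.

```python
from datetime import timedelta

# Illustrative constants only; the values are copied from the diff above.
OLD_RETENTION_SECONDS = 86_400
NEW_RETENTION_SECONDS = 1_209_600

old_retention = timedelta(seconds=OLD_RETENTION_SECONDS)
new_retention = timedelta(seconds=NEW_RETENTION_SECONDS)

# Report both retention periods in whole days.
print(f"old: {old_retention.days} day(s), new: {new_retention.days} day(s)")
```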
4 changes: 2 additions & 2 deletions config/develop/namespaced/sqs-input-to-raw.yaml
@@ -4,9 +4,9 @@ parameters:
MessageRetentionPeriod: "1209600"
ReceiveMessageWaitTimeSeconds: "20"
VisibilityTimeout: "120"
S3SourceBucketArn: !stack_output_external recover-dev-raw-bucket::BucketArn
SNSTopicSubscription: !stack_output_external "{{ stack_group_config.namespace }}-sns-input-to-sqs::SnsTopicArn"
dependencies:
- develop/s3-raw-bucket.yaml
- develop/namespaced/sns-topic.yaml
stack_name: '{{ stack_group_config.namespace }}-sqs-input-to-raw'
stack_tags:
{{ stack_group_config.default_stack_tags }}
@@ -1,6 +1,6 @@
template:
path: s3-event-config-lambda-role.yaml
stack_name: "{{ stack_group_config.namespace }}-s3-event-config-lambda-role"
path: lambda-s3-event-config-role.yaml
stack_name: "{{ stack_group_config.namespace }}-lambda-s3-event-config-role"
parameters:
S3SourceBucketName: {{ stack_group_config.input_bucket_name }}
stack_tags:
@@ -4,14 +4,14 @@ template:
artifact_bucket_name: {{ stack_group_config.template_bucket_name }}
artifact_prefix: '{{ stack_group_config.namespace }}/src/lambda'
dependencies:
- develop/namespaced/s3-event-config-lambda-role.yaml
- develop/namespaced/s3-to-glue-lambda.yaml
- develop/namespaced/sqs-input-to-intermediate.yaml
- prod/namespaced/lambda-s3-event-config-role.yaml
- prod/s3-cloudformation-bucket.yaml
- prod/namespaced/sns-topic.yaml
stack_name: '{{ stack_group_config.namespace }}-lambda-S3EventConfig'
stack_tags: {{ stack_group_config.default_stack_tags }}
parameters:
Namespace: {{ stack_group_config.namespace }}
S3ToGlueDestinationArn: !stack_output_external "{{ stack_group_config.namespace }}-sqs-input-to-intermediate::PrimaryQueueArn"
S3ToGlueDestinationType: "Queue"
S3EventConfigRoleArn: !stack_output_external "{{ stack_group_config.namespace }}-s3-event-config-lambda-role::RoleArn"
S3ToGlueDestinationArn: !stack_output_external "{{ stack_group_config.namespace }}-sns-input-to-sqs::SnsTopicArn"
S3ToGlueDestinationType: "Topic"
S3EventConfigRoleArn: !stack_output_external "{{ stack_group_config.namespace }}-lambda-s3-event-config-role::RoleArn"
S3SourceBucketName: {{ stack_group_config.input_bucket_name }}
9 changes: 9 additions & 0 deletions config/prod/namespaced/sns-topic.yaml
@@ -0,0 +1,9 @@
template:
path: sns-topic.yaml
parameters:
S3SourceBucketArn: !stack_output_external recover-input-bucket::BucketArn
dependencies:
- prod/s3-input-bucket.yaml
stack_name: "{{ stack_group_config.namespace }}-sns-input-to-sqs"
stack_tags:
{{ stack_group_config.default_stack_tags }}
12 changes: 6 additions & 6 deletions config/prod/namespaced/sqs-input-to-intermediate.yaml
@@ -1,12 +1,12 @@
template:
path: sqs-queue.yaml
parameters:
MessageRetentionPeriod: '86400'
ReceiveMessageWaitTimeSeconds: '20'
VisibilityTimeout: '120'
S3SourceBucketArn: !stack_output_external recover-input-bucket::BucketArn
MessageRetentionPeriod: "1209600"
ReceiveMessageWaitTimeSeconds: "20"
VisibilityTimeout: "120"
SNSTopicSubscription: !stack_output_external "{{ stack_group_config.namespace }}-sns-input-to-sqs::SnsTopicArn"
dependencies:
- prod/s3-input-bucket.yaml
stack_name: '{{ stack_group_config.namespace }}-sqs-input-to-intermediate'
- prod/namespaced/sns-topic.yaml
stack_name: "{{ stack_group_config.namespace }}-sqs-input-to-intermediate"
stack_tags:
{{ stack_group_config.default_stack_tags }}
4 changes: 2 additions & 2 deletions config/prod/namespaced/sqs-input-to-raw.yaml
@@ -4,9 +4,9 @@ parameters:
MessageRetentionPeriod: "1209600"
ReceiveMessageWaitTimeSeconds: "20"
VisibilityTimeout: "120"
S3SourceBucketArn: !stack_output_external recover-raw-bucket::BucketArn
SNSTopicSubscription: !stack_output_external "{{ stack_group_config.namespace }}-sns-input-to-sqs::SnsTopicArn"
dependencies:
- prod/s3-raw-bucket.yaml
- prod/namespaced/sns-topic.yaml
stack_name: '{{ stack_group_config.namespace }}-sqs-input-to-raw'
stack_tags:
{{ stack_group_config.default_stack_tags }}
1 change: 1 addition & 0 deletions src/lambda_function/s3_event_config/template.yaml
@@ -19,6 +19,7 @@ Parameters:
Type: String
Description: The S3 Event Config Destination Type
AllowedValues:
- "Topic"
- "Queue"
- "LambdaFunction"

Expand Down
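The S3 event config Lambda source is not part of this diff, so the following is only a sketch of how the new `"Topic"` value could map onto an S3 notification configuration. The helper `build_notification_configuration` and the example ARN are hypothetical. The dictionary keys are the ones S3 uses in a bucket notification configuration.

```python
# Hypothetical helper, not taken from the s3_event_config Lambda source.
# Maps each allowed destination type to the S3 notification configuration
# key and the matching ARN field name.
DESTINATION_KEYS = {
    "Topic": ("TopicConfigurations", "TopicArn"),
    "Queue": ("QueueConfigurations", "QueueArn"),
    "LambdaFunction": ("LambdaFunctionConfigurations", "LambdaFunctionArn"),
}


def build_notification_configuration(
    destination_type: str, destination_arn: str, events: list
) -> dict:
    """Build a notification configuration with a single destination."""
    if destination_type not in DESTINATION_KEYS:
        raise ValueError(f"Unsupported destination type: {destination_type}")
    config_key, arn_key = DESTINATION_KEYS[destination_type]
    return {config_key: [{arn_key: destination_arn, "Events": events}]}


# Example ARN is a placeholder, not a real resource.
example_config = build_notification_configuration(
    "Topic",
    "arn:aws:sns:us-east-1:123456789012:example-topic",
    ["s3:ObjectCreated:*"],
)
print(example_config)
```

A table-driven mapping like this keeps the three destination types symmetric, so adding `"Topic"` is a one-line change rather than a new branch.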
27 changes: 14 additions & 13 deletions src/lambda_function/s3_to_glue/app.py
@@ -110,7 +110,7 @@ def get_object_info(s3_event) -> dict:
}
return object_info

def lambda_handler(event, context) -> dict:
def lambda_handler(event, context) -> None:
"""
This main lambda function will be triggered by a SQS event and will
poll the SQS queue for all available S3 event messages. If the
@@ -125,19 +125,20 @@ def lambda_handler(event, context) -> dict:
Unused by this lambda function.
"""
s3_objects_info = []
for record in event["Records"]:
s3_event_records = json.loads(record["body"])
if is_s3_test_event(s3_event_records):
for sqs_record in event["Records"]:
sns_notification = json.loads(sqs_record["body"])
sns_message = json.loads(sns_notification["Message"])
if is_s3_test_event(sns_message):
logger.info(f"Found AWS default s3:TestEvent. Skipping.")
else:
for s3_event in s3_event_records["Records"]:
object_info = get_object_info(s3_event)
if filter_object_info(object_info) is not None:
s3_objects_info.append(object_info)
else:
logger.info(
f"Object doesn't meet the S3 event rules to be processed. Skipping."
)
return
for s3_event in sns_message["Records"]:
object_info = get_object_info(s3_event)
if filter_object_info(object_info) is not None:
s3_objects_info.append(object_info)
else:
logger.info(
f"Object doesn't meet the S3 event rules to be processed. Skipping."
)
if len(s3_objects_info) > 0:
logger.info(
"Submitting the following files to "
Expand Down
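The handler change above comes down to one extra layer of JSON decoding: the SQS `body` now holds an SNS notification, and the S3 event sits in that notification's `Message` field. A minimal standalone sketch of that unwrapping follows. `extract_s3_records` is a hypothetical name, and skipping messages without a `Records` key is an assumed stand-in for the project's `is_s3_test_event` check, whose implementation is not shown here.

```python
import json


def extract_s3_records(sqs_event: dict) -> list:
    """Unwrap SQS -> SNS -> S3 and return the S3 event records."""
    s3_records = []
    for sqs_record in sqs_event["Records"]:
        # First decode: the SQS body is the SNS notification envelope.
        sns_notification = json.loads(sqs_record["body"])
        # Second decode: the SNS "Message" is the S3 event as a JSON string.
        sns_message = json.loads(sns_notification["Message"])
        # Assumption: messages without "Records" (e.g. s3:TestEvent) are skipped.
        s3_records.extend(sns_message.get("Records", []))
    return s3_records


# Build a tiny event by hand; bucket and key names are placeholders.
s3_record = {
    "s3": {"bucket": {"name": "example-bucket"}, "object": {"key": "example/key.zip"}}
}
sns_notification = {
    "Type": "Notification",
    "Message": json.dumps({"Records": [s3_record]}),
}
test_notification = {
    "Type": "Notification",
    "Message": json.dumps({"Event": "s3:TestEvent"}),
}
sqs_event = {
    "Records": [
        {"body": json.dumps(sns_notification)},
        {"body": json.dumps(test_notification)},
    ]
}
records = extract_s3_records(sqs_event)
print(records)
```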
51 changes: 41 additions & 10 deletions src/lambda_function/s3_to_glue/events/generate_test_event.py
@@ -1,6 +1,6 @@
"""
This is a utility script that generates a fake S3 event notification and writes
it out as JSON. This test event can be used to test the Lambda which
This is a utility script that generates a fake S3 -> SNS -> SQS event
and writes it out as JSON. This test event can be used to test the Lambda which
triggers the Glue pipeline.

Two types of events can be generated:
@@ -59,7 +59,11 @@ def read_args() -> argparse.Namespace:

def create_event(bucket: str, key: str, key_prefix: str, key_file: str) -> dict:
"""
Create an SQS event wrapping S3 event notification(s) for testing.
Create an SQS event wrapping an SNS notification of S3 event notification(s)
for testing.

Each SNS notification will contain a single S3 event, and each SQS event
will contain a single SNS notification.

This function accepts either an S3 object key or an S3 key prefix that will
be included in the test event. If an S3 object key is provided, then the test
@@ -97,8 +101,11 @@ def create_event(bucket: str, key: str, key_prefix: str, key_file: str) -> dict:
s3_events = [
create_s3_event_record(bucket=bucket, key=k) for k in test_data
]
sns_notifications = [
create_sns_notification(s3_event) for s3_event in s3_events
]
sqs_messages = [
create_sqs_message(s3_event) for s3_event in s3_events
create_sqs_message(sns_notification) for sns_notification in sns_notifications
]
sqs_event = {"Records": sqs_messages}
return sqs_event
@@ -153,15 +160,15 @@ def create_s3_event_record(bucket: str, key: str) -> dict:
s3_event_record["s3"]["object"]["key"] = key
return s3_event_record

def create_sqs_message(s3_event_record: dict) -> dict:
def create_sqs_message(sns_notification: dict) -> dict:
"""
Create an SQS message wrapper for an individual S3 event notification.
Create an SQS message wrapper around an individual SNS notification.

See `create_s3_event_record` for creating S3 event notifications.
See `create_sns_notification` for creating SNS notifications.

Args:
s3_event_record (dict): A dictionary formatted as a "Record"
object would be in an S3 event notification
sns_notification (dict): A dictionary formatted as an SNS
notification JSON object would be.

Returns:
dict: A dictionary formatted as an SQS message
@@ -183,9 +190,33 @@ def create_sqs_message(s3_event_record: dict) -> dict:
"eventSourceARN": "arn:aws:sqs:us-east-1:914833433684:mynamespace-sqs-S3ToLambda-Queue",
"awsRegion": "us-east-1"
}
sqs_event_record["body"] = json.dumps({"Records": [s3_event_record]})
sqs_event_record["body"] = json.dumps(sns_notification)
return sqs_event_record

def create_sns_notification(s3_event_record):
Review comment (Member): Nit: type hinting

"""
Create an SNS message wrapper for an individual S3 event notification.

See `create_s3_event_record` for creating S3 event notifications.

Args:
s3_event_record (dict): A dictionary formatted as a "Record"
object would be in an S3 event notification

Returns:
dict: A dictionary formatted as an SNS notification
"""
sns_notification = {
"Type": "string",
"MessageId": "string",
"TopicArn": "string",
"Subject": "string",
"Message": "string",
"Timestamp": "string"
}
sns_notification["Message"] = json.dumps({"Records": [s3_event_record]})
return sns_notification
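One way the type-hinting nit could be addressed is sketched below, together with a round-trip check of the nesting. The `create_sqs_message` here is deliberately trimmed to the `body` field for illustration. The version in the diff carries additional SQS metadata, and `example_record` is a placeholder.

```python
import json


def create_sns_notification(s3_event_record: dict) -> dict:
    """Wrap a single S3 event record in an SNS notification."""
    return {
        "Type": "string",
        "MessageId": "string",
        "TopicArn": "string",
        "Subject": "string",
        "Message": json.dumps({"Records": [s3_event_record]}),
        "Timestamp": "string",
    }


def create_sqs_message(sns_notification: dict) -> dict:
    """Wrap an SNS notification in an SQS message (trimmed to the body)."""
    return {"body": json.dumps(sns_notification)}


# Round trip: S3 record -> SNS notification -> SQS message -> S3 record.
example_record = {"s3": {"object": {"key": "example/key.zip"}}}
sqs_message = create_sqs_message(create_sns_notification(example_record))
decoded = json.loads(json.loads(sqs_message["body"])["Message"])
print(decoded)
```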

def main() -> None:
args = read_args()
print("Generating mock S3 event...")
45 changes: 45 additions & 0 deletions templates/sns-topic.yaml
@@ -0,0 +1,45 @@
AWSTemplateFormatVersion: "2010-09-09"

Description: >
Creates a standard SNS topic which can be used with S3 event notifications.

Parameters:

S3SourceBucketArn:
Type: String
Description: Arn of the S3 bucket where source data are stored.

Resources:

SnsTopic:
Type: "AWS::SNS::Topic"
Properties:
TopicName: !Sub "${AWS::StackName}-Topic"
FifoTopic: false

SnsTopicPolicy:
Type: AWS::SNS::TopicPolicy
Properties:
PolicyDocument:
Version: "2012-10-17"
Statement:
- Sid: Input S3 bucket event notification to SNS
Effect: Allow
Principal:
Service: s3.amazonaws.com
AWS: !Sub '${AWS::AccountId}'
Action:
- sns:Publish
Resource: !Ref SnsTopic
Condition:
ArnLike:
"aws:SourceArn": !Ref S3SourceBucketArn
Topics:
- !Ref SnsTopic

Outputs:

SnsTopicArn:
Value: !Ref SnsTopic
Export:
Name: !Sub "${AWS::Region}-${AWS::StackName}-SnsTopicArn"
17 changes: 12 additions & 5 deletions templates/sqs-queue.yaml
@@ -1,7 +1,7 @@
AWSTemplateFormatVersion: '2010-09-09'

Description: >
Creates an SQS queue that gets S3 notifications
Creates an SQS queue which subscribes to an SNS topic

Parameters:

@@ -22,9 +22,9 @@ Parameters:
How long our lambda has to submit the messages to Glue and
delete the message from the SQS queue

S3SourceBucketArn:
SNSTopicSubscription:
Type: String
Description: Arn of the S3 bucket where source data are stored.
Description: Arn of the SNS topic where S3 event notifications are published.

Resources:

@@ -49,17 +49,24 @@ Resources:
- Sid: Send_Permission
Effect: Allow
Principal:
Service: s3.amazonaws.com
Service: sns.amazonaws.com
AWS: !Sub '${AWS::AccountId}'
Action:
- SQS:SendMessage
Resource: !GetAtt PrimaryQueue.Arn
Condition:
ArnLike:
"aws:SourceArn": !Ref S3SourceBucketArn
"aws:SourceArn": !Ref SNSTopicSubscription
Queues:
- !Ref PrimaryQueue

SnsSubscription:
Type: "AWS::SNS::Subscription"
Properties:
Protocol: sqs
Endpoint: !GetAtt PrimaryQueue.Arn
TopicArn: !Ref SNSTopicSubscription

DeadLetterQueue:
Type: AWS::SQS::Queue
Properties: