yield compressed data up to part threshold
1 parent 0aa8f6a · commit 9cd7ef9
Showing 1 changed file with 89 additions and 2 deletions.
@@ -1,8 +1,95 @@
+import gzip
+import io
 import json
+import logging
+import zipfile
+
+import boto3
+
+logger = logging.getLogger()
+logger.setLevel(logging.INFO)
+
+
-def lambda_handler(event, context):
+def lambda_handler(event: dict, context: dict):
+    """
+    Entrypoint for this Lambda.
+
+    Args:
+        event (dict): An SQS message from the dispatch-to-raw SQS queue.
+        context (dict): Information about the runtime environment and
+            the current invocation.
+    """
+    s3_client = boto3.client("s3")
-    return {"statusCode": 200, "body": json.dumps("Hello from Lambda!")}
+    main(event=event, s3_client=s3_client)
+
+
+def upload_part(gzip_stream: gzip.GzipFile):
+    pass
+
+
+def yield_compressed_data(object_stream: io.BytesIO, path: str, part_threshold=None):
+    if part_threshold is None:
+        part_threshold = 50 * 1024 * 1024  # 50 MB
+    with zipfile.ZipFile(object_stream, "r") as zip_stream:
+        with zip_stream.open(path, "r") as json_stream:
+            compressed_data = io.BytesIO()
+            with gzip.GzipFile(fileobj=compressed_data, mode="wb") as gzip_file:
+                # Read/write the JSON file in 1 MB chunks
+                for chunk in iter(lambda: json_stream.read(1024 * 1024), b""):
+                    gzip_file.write(chunk)
+                    # Hand a part back to the caller whenever the buffer
+                    # reaches the threshold, then reset the buffer
+                    if compressed_data.tell() >= part_threshold:
+                        yield compressed_data
+                        compressed_data.seek(0)
+                        compressed_data.truncate(0)
+            # Final yield after the gzip context closes, so the trailer is flushed
+            yield compressed_data
+
+
+def main(event: dict, s3_client: boto3.client):
+    """
+    This function should be invoked by `lambda_handler`.
+
+    Args:
+        event (dict): The dispatch-to-raw SQS event.
+            The payload from the dispatch Lambda is in .["Records"][0]["body"]["Message"]
+            and contains the keys:
+
+            * Bucket - The name of the S3 bucket
+            * Key - The S3 key
+            * Path - The path within the archive which identifies this file
+            * FileSize - The uncompressed size in bytes of this file
+        s3_client (botocore.client.S3): An S3 client
+
+    Returns:
+        (None): Logs and uploads an object to the raw S3 bucket.
+    """
+    for sqs_record in event["Records"]:
+        sns_notification = json.loads(sqs_record["body"])
+        sns_message = json.loads(sns_notification["Message"])
+        logger.info(f"Received SNS message: {sns_message}")
+        # Step 1: Stream the zip file from S3
+        with io.BytesIO() as object_stream:
+            s3_client.download_fileobj(
+                Bucket=sns_message["Bucket"],
+                Key=sns_message["Key"],
+                Fileobj=object_stream,
+            )
+            object_stream.seek(0)
+            for compressed_data in yield_compressed_data(
+                object_stream=object_stream,
+                path=sns_message["Path"],
+                part_threshold=102,  # tiny threshold, presumably for testing
+            ):
+                logger.info(compressed_data.tell())
+        object_stream.close()  # redundant: the with block already closes the stream
+
+        # # Define the S3 key for the gzipped file (you can customize this)
+        # gzip_key = f"{s3_details['Path']}.gz"
+
+        # # Step 4: Upload the gzipped JSON file to S3
+        # s3_client.upload_fileobj(
+        #     compressed_data, s3_details["Bucket"], gzip_key
+        # )
+
+        # print(f"Uploaded gzipped file to s3://{s3_details['Bucket']}/{gzip_key}")
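
For reviewers who want to poke at the new generator locally, here is a minimal sketch (not part of the commit) that feeds it an in-memory zip archive and a deliberately tiny threshold; the module name `handler` and the member name `data.json` are assumptions:

import gzip
import io
import json
import zipfile

from handler import yield_compressed_data  # hypothetical module name

# Build an in-memory zip archive holding a single JSON file
archive = io.BytesIO()
with zipfile.ZipFile(archive, "w") as zip_out:
    zip_out.writestr("data.json", json.dumps([{"n": n} for n in range(50_000)]))
archive.seek(0)

# A small threshold forces the generator to emit several parts
parts = []
for part in yield_compressed_data(archive, "data.json", part_threshold=1024):
    print(f"got a part of {part.tell()} bytes")  # buffer position == part size
    part.seek(0)
    parts.append(part.read())  # copy the bytes out before the generator truncates the buffer

# The parts are slices of one gzip stream, not standalone gzip files:
# only their concatenation decompresses back to the original JSON
assert json.loads(gzip.decompress(b"".join(parts))) == [{"n": n} for n in range(50_000)]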
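
A plausible next step, given the `upload_part` stub and the part threshold, is an S3 multipart upload in which each drained buffer becomes one part of a single gzipped object. The helper below is a hypothetical sketch of that direction, not something this commit implements; note that S3 rejects parts smaller than 5 MB except the last, which the 50 MB default threshold comfortably clears:

import boto3

s3_client = boto3.client("s3")

def upload_gzip_parts(parts, bucket: str, key: str) -> None:
    # Hypothetical helper: one multipart upload collects all the
    # compressed slices into a single S3 object
    upload_id = s3_client.create_multipart_upload(Bucket=bucket, Key=key)["UploadId"]
    completed = []
    for part_number, gzip_stream in enumerate(parts, start=1):
        # Read each part out before resuming the generator,
        # which truncates the shared buffer
        gzip_stream.seek(0)
        response = s3_client.upload_part(
            Bucket=bucket,
            Key=key,
            UploadId=upload_id,
            PartNumber=part_number,
            Body=gzip_stream.read(),
        )
        # S3 needs every part's ETag and number to stitch the object together
        completed.append({"ETag": response["ETag"], "PartNumber": part_number})
    s3_client.complete_multipart_upload(
        Bucket=bucket,
        Key=key,
        UploadId=upload_id,
        MultipartUpload={"Parts": completed},
    )

Wired into `main`, this would replace the `logger.info(compressed_data.tell())` placeholder, e.g. `upload_gzip_parts(yield_compressed_data(...), sns_message["Bucket"], f"{sns_message['Path']}.gz")`.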