Skip to content

Commit

Permalink
[ETL-693] Add function for parsing data type to raw-sync lambda (#144)
Browse files Browse the repository at this point in the history
* add function for parsing data type to raw-sync lambda

* add function for constructing raw key
  • Loading branch information
philerooski authored Oct 2, 2024
1 parent 2a766e2 commit d711a1e
Show file tree
Hide file tree
Showing 2 changed files with 107 additions and 13 deletions.
69 changes: 56 additions & 13 deletions src/lambda_function/raw_sync/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ def lambda_handler(event: dict, context: dict) -> None:
)


def append_s3_key(key: str, key_format: str, result: dict) -> None:
def append_s3_key(key: str, key_format: str, result: defaultdict) -> defaultdict:
"""
Organizes an S3 object key by appending it to the appropriate entry in the result dictionary
Expand All @@ -68,7 +68,7 @@ def append_s3_key(key: str, key_format: str, result: dict) -> None:
it is structured as result[cohort].
Returns:
None
defaultdict: The `result` dict with `key` added.
"""
result = result.copy() # shallow copy safe for append
if not key.endswith("/"): # Ignore keys that represent "folders"
Expand Down Expand Up @@ -97,7 +97,7 @@ def append_s3_key(key: str, key_format: str, result: dict) -> None:

def list_s3_objects(
s3_client: boto3.client, bucket: str, key_prefix: str, key_format: str
) -> dict:
) -> defaultdict:
"""
Recursively list all objects under an S3 bucket and key prefix which
conform to a specified format.
Expand Down Expand Up @@ -159,6 +159,8 @@ def list_s3_objects(
result = defaultdict(lambda: defaultdict(list))
elif key_format == "input":
result = defaultdict(list)
else:
raise ValueError("Argument `key_format` must be either 'input' or 'raw'.")
for response in response_iterator:
for obj in response.get("Contents", []):
key = obj["Key"]
Expand All @@ -174,7 +176,7 @@ def match_corresponding_raw_object(
data_type: str,
cohort: str,
expected_key: str,
raw_keys: list[dict],
raw_keys: defaultdict,
) -> Optional[str]:
"""
Find a matching raw object for a given export file and filename.
Expand Down Expand Up @@ -232,7 +234,7 @@ def parse_content_range(content_range: str) -> tuple[int, ...]:
return range_start, range_end, total_size


def unpack_eocd_fields(body: bytes, eocd_offset: int) -> list[int]:
def unpack_eocd_fields(body: bytes, eocd_offset: int) -> tuple[int, int]:
"""
Extract the End of Central Directory (EOCD) fields from the given body.
Expand Down Expand Up @@ -305,7 +307,7 @@ def determine_eocd_offset(body: bytes, content_range: str) -> int:

def list_files_in_archive(
s3_client: boto3.client, bucket: str, key: str, range_size=64 * 1024
) -> list[str]:
) -> list[dict]:
"""
Recursively lists files in a ZIP archive stored as an S3 object.
Expand Down Expand Up @@ -464,6 +466,46 @@ def publish_to_sns(
sns_client.publish(TopicArn=sns_arn, Message=json.dumps(file_info))


def get_data_type_from_path(path: str) -> str:
    """
    Given the path of an export file, return its associated data type.

    The data type is the part of the file's base name that precedes the
    first underscore. If the text "Deleted" occurs anywhere in the base
    name, "_Deleted" is appended to that leading part.

    Args:
        path (str): The path of an export file

    Returns:
        str: The data type
    """
    # Only the final path component matters; any directory part is dropped.
    _, file_name = os.path.split(path)
    leading_component, _, _ = file_name.partition("_")
    if "Deleted" not in file_name:
        return leading_component
    return f"{leading_component}_Deleted"


def get_expected_raw_key(namespace: str, data_type: str, cohort: str, path: str) -> str:
    """Build the S3 key expected for a raw bucket object.

    The key identifies the raw bucket object that corresponds to the given
    input bucket object. The file identifier used in the key is the base name
    of `path` up to (not including) its first "." character.

    Args:
        namespace (str): The namespace of the corresponding input object.
        data_type (str): The data type of the corresponding input object.
        cohort (str): The cohort of the corresponding input object.
        path (str): The path of the file relative to the zip archive (export).

    Returns:
        str: The expected S3 key of the corresponding raw object.
    """
    file_name = os.path.basename(path)
    # Everything from the first dot onward is discarded.
    file_identifier, _, _ = file_name.partition(".")
    return (
        f"{namespace}/json/dataset={data_type}"
        f"/cohort={cohort}/{file_identifier}.ndjson.gz"
    )


def main(
event: dict,
s3_client: boto3.client,
Expand Down Expand Up @@ -499,23 +541,24 @@ def main(
f"Checking corresponding raw object for {filename} "
f"from s3://{input_bucket}/{export_key}"
)
data_type = filename.split("_")[0]
file_identifier = filename.split(".")[0]
expected_key = (
f"{namespace}/json/dataset={data_type}"
f"/cohort={cohort}/{file_identifier}.ndjson.gz"
data_type = get_data_type_from_path(path=filename)
expected_raw_key = get_expected_raw_key(
namespace=namespace,
data_type=data_type,
cohort=cohort,
path=filename,
)
corresponding_raw_object = match_corresponding_raw_object(
data_type=data_type,
cohort=cohort,
expected_key=expected_key,
expected_key=expected_raw_key,
raw_keys=raw_keys,
)
if corresponding_raw_object is None:
logger.info(
f"Did not find corresponding raw object for {filename} from "
f"s3://{input_bucket}/{export_key} at "
f"s3://{raw_bucket}/{expected_key}"
f"s3://{raw_bucket}/{expected_raw_key}"
)
publish_to_sns(
bucket=input_bucket,
Expand Down
51 changes: 51 additions & 0 deletions tests/test_lambda_raw_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -612,3 +612,54 @@ def test_publish_to_sns_with_sqs_subscription():
sqs_client.delete_message(
QueueUrl=sqs_url, ReceiptHandle=messages["Messages"][0]["ReceiptHandle"]
)


def test_get_data_type_from_path_simple():
    """A file name with no subtype or 'Deleted' component yields its leading component."""
    observed = app.get_data_type_from_path(
        path="path/to/FitbitIntradayCombined_20241111-20241112.json"
    )
    assert observed == "FitbitIntradayCombined"


def test_get_data_type_from_path_subtype():
    """A subtype component in the file name is not part of the data type."""
    observed = app.get_data_type_from_path(
        path="path/to/HealthKitV2Samples_AppleStandTime_20241111-20241112.json"
    )
    assert observed == "HealthKitV2Samples"


def test_get_data_type_from_path_deleted():
    """A file name with a subtype and a 'Deleted' component gets the '_Deleted' suffix."""
    observed = app.get_data_type_from_path(
        path="path/to/HealthKitV2Samples_AppleStandTime_Deleted_20241111-20241112.json"
    )
    assert observed == "HealthKitV2Samples_Deleted"


import os


def test_get_expected_raw_key_case1():
    """Expected raw key for a file name with no subtype or 'Deleted' component."""
    namespace, data_type, cohort = "test-namespace", "test-data-type", "test-cohort"
    expected_key = f"{namespace}/json/dataset={data_type}/cohort={cohort}/FitbitIntradayCombined_20241111-20241112.ndjson.gz"
    actual_key = app.get_expected_raw_key(
        namespace,
        data_type,
        cohort,
        "path/to/FitbitIntradayCombined_20241111-20241112.json",
    )
    assert actual_key == expected_key


def test_get_expected_raw_key_case2():
    """Expected raw key for a file name that includes a subtype component."""
    namespace, data_type, cohort = "test-namespace", "test-data-type", "test-cohort"
    expected_key = f"{namespace}/json/dataset={data_type}/cohort={cohort}/HealthKitV2Samples_AppleStandTime_20241111-20241112.ndjson.gz"
    actual_key = app.get_expected_raw_key(
        namespace,
        data_type,
        cohort,
        "path/to/HealthKitV2Samples_AppleStandTime_20241111-20241112.json",
    )
    assert actual_key == expected_key


def test_get_expected_raw_key_case3():
    """Expected raw key for a file name with a subtype and a 'Deleted' component."""
    namespace, data_type, cohort = "test-namespace", "test-data-type", "test-cohort"
    expected_key = f"{namespace}/json/dataset={data_type}/cohort={cohort}/HealthKitV2Samples_AppleStandTime_Deleted_20241111-20241112.ndjson.gz"
    actual_key = app.get_expected_raw_key(
        namespace,
        data_type,
        cohort,
        "path/to/HealthKitV2Samples_AppleStandTime_Deleted_20241111-20241112.json",
    )
    assert actual_key == expected_key

0 comments on commit d711a1e

Please sign in to comment.