Upload compressed JSON in S3 to JSON
philerooski committed Aug 20, 2024
1 parent 1f47dfc commit a2166ec
Showing 2 changed files with 134 additions and 107 deletions.
175 changes: 100 additions & 75 deletions src/glue/jobs/s3_to_json.py
@@ -6,14 +6,17 @@
(for example: json/dataset=EnrolledParticipants/) and only contains
files which share a similar schema.
"""

import datetime
import gzip
import io
import json
import logging
import os
import sys
import typing
import zipfile
from typing import Iterator, Optional

import boto3
import ecs_logging
@@ -409,9 +412,12 @@ def _transform_garmin_data_types(
return json_obj


def get_output_filename(metadata: dict, part_number: int) -> str:
def get_file_identifier(metadata: dict) -> str:
"""
Get a formatted file name.
Get an identifier for a file from the source file's metadata.
This function effectively reverse-engineers the process in `get_metadata`,
enabling us to reconstruct the source file's identifier.
The format depends on which metadata fields we have available to us.
Metadata fields we can potentially use:
@@ -428,33 +434,31 @@ def get_output_filename(metadata: dict, part_number: int) -> str:
str: A file identifier.
"""
if metadata["type"] in DATA_TYPES_WITH_SUBTYPE:
output_fname = "{}_{}_{}-{}.part{}.ndjson".format(
identifier = "{}_{}_{}-{}".format(
metadata["type"],
metadata["subtype"],
metadata["start_date"].strftime("%Y%m%d"),
metadata["end_date"].strftime("%Y%m%d"),
part_number,
)
elif metadata["start_date"] is None:
output_fname = "{}_{}.part{}.ndjson".format(
metadata["type"], metadata["end_date"].strftime("%Y%m%d"), part_number
identifier = "{}_{}".format(
metadata["type"], metadata["end_date"].strftime("%Y%m%d")
)
else:
output_fname = "{}_{}-{}.part{}.ndjson".format(
identifier = "{}_{}-{}".format(
metadata["type"],
metadata["start_date"].strftime("%Y%m%d"),
metadata["end_date"].strftime("%Y%m%d"),
part_number,
)
return output_fname
return identifier
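
For orientation, a minimal usage sketch of the new helper, assuming the module is importable as s3_to_json (as in the test suite) and using hypothetical metadata values:

import datetime

import s3_to_json  # assumed import name, matching the tests

metadata = {
    "type": "FitbitDevices",  # hypothetical metadata values
    "subtype": None,
    "start_date": datetime.datetime(2022, 1, 12),
    "end_date": datetime.datetime(2023, 1, 14),
}
# With both dates present and no subtype, the identifier takes the
# "{type}_{startdate}-{enddate}" form:
assert s3_to_json.get_file_identifier(metadata) == "FitbitDevices_20220112-20230114"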


def transform_block(
input_json: typing.IO,
metadata: dict,
logger_context: dict = {},
block_size: int = 10000,
):
) -> Iterator[list]:
"""
A generator function which yields a block of transformed JSON records.
@@ -523,73 +527,88 @@ def write_file_to_json_dataset(
list: A list of files uploaded to S3
"""
# Configuration related to where we write our part files
part_dir = os.path.join(
output_dir = os.path.join(
f"dataset={metadata['type']}", f"cohort={metadata['cohort']}"
)
os.makedirs(part_dir, exist_ok=True)
os.makedirs(output_dir, exist_ok=True)
part_number = 0
output_path = get_part_path(
metadata=metadata, part_number=part_number, part_dir=part_dir, touch=True
part_output_path = get_output_path(
metadata=metadata, part_number=part_number, output_dir=output_dir, touch=True
)
compressed_output_path = get_output_path(
metadata=metadata, part_number=None, output_dir=output_dir, touch=True
)

# We will attach file metadata to the uploaded S3 object
s3_metadata = _derive_str_metadata(metadata=metadata)
uploaded_files = []
with z.open(json_path, "r") as input_json:
current_output_path = output_path
line_count = 0
for transformed_block in transform_block(
input_json=input_json, metadata=metadata, logger_context=logger_context
):
current_file_size = os.path.getsize(current_output_path)
if current_file_size > file_size_limit:
# Upload completed part file
_upload_file_to_json_dataset(
file_path=current_output_path,
s3_metadata=s3_metadata,
workflow_run_properties=workflow_run_properties,
delete_upon_successful_upload=delete_upon_successful_upload,
)
uploaded_files.append(current_output_path)

# Update output path to next part
part_number += 1
current_output_path = get_part_path(
metadata=metadata,
part_number=part_number,
part_dir=part_dir,
touch=True,
)
with open(current_output_path, "a") as f_out:
# Write block data to part file
with gzip.open(
compressed_output_path, "wt", encoding="utf-8"
) as f_compressed_out:
current_part_path = part_output_path
line_count = 0
for transformed_block in transform_block(
input_json=input_json, metadata=metadata, logger_context=logger_context
):
current_file_size = os.path.getsize(current_part_path)
if current_file_size > file_size_limit:
# Upload completed part file
_upload_file_to_json_dataset(
file_path=current_part_path,
s3_metadata=s3_metadata,
workflow_run_properties=workflow_run_properties,
delete_upon_successful_upload=delete_upon_successful_upload,
)
uploaded_files.append(current_part_path)
# Update output path to next part
part_number += 1
current_part_path = get_output_path(
metadata=metadata,
part_number=part_number,
output_dir=output_dir,
touch=True,
)
# Write block data to both part file and compressed file
for transformed_record in transformed_block:
line_count += 1
f_out.write("{}\n".format(json.dumps(transformed_record)))
# Upload final block
_upload_file_to_json_dataset(
file_path=current_output_path,
s3_metadata=s3_metadata,
workflow_run_properties=workflow_run_properties,
delete_upon_successful_upload=delete_upon_successful_upload,
)
uploaded_files.append(current_output_path)
logger_extra = dict(
merge_dicts(
logger_context,
{
"file.LineCount": line_count,
"event.kind": "metric",
"event.category": ["file"],
"event.type": ["info", "creation"],
"event.action": "list-file-properties",
"labels": {
k: v.isoformat() if isinstance(v, datetime.datetime) else v
for k, v in metadata.items()
record_with_newline = "{}\n".format(json.dumps(transformed_record))
with open(current_part_path, "a") as f_out:
f_out.write(record_with_newline)
f_compressed_out.write(record_with_newline)
# Upload final block
_upload_file_to_json_dataset(
file_path=current_part_path,
s3_metadata=s3_metadata,
workflow_run_properties=workflow_run_properties,
delete_upon_successful_upload=delete_upon_successful_upload,
)
uploaded_files.append(current_part_path)
logger_extra = dict(
merge_dicts(
logger_context,
{
"file.LineCount": line_count,
"event.kind": "metric",
"event.category": ["file"],
"event.type": ["info", "creation"],
"event.action": "list-file-properties",
"labels": {
k: v.isoformat() if isinstance(v, datetime.datetime) else v
for k, v in metadata.items()
},
},
},
)
)
)
logger.info("Output file attributes", extra=logger_extra)
logger.info("Output file attributes", extra=logger_extra)
# Upload compressed file
_upload_file_to_json_dataset(
file_path=compressed_output_path,
s3_metadata=s3_metadata,
workflow_run_properties=workflow_run_properties,
delete_upon_successful_upload=delete_upon_successful_upload,
)
uploaded_files.append(compressed_output_path)
return uploaded_files
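
The pattern at the heart of this change writes every transformed record twice: appended to an uncompressed part file that rotates once it exceeds file_size_limit, and streamed into a single gzip copy of the whole dataset. A standalone sketch of that pattern, with made-up file names and a deliberately tiny size limit:

import gzip
import json
import os

records = [{"id": i, "value": "x" * 50} for i in range(100)]  # stand-in data
size_limit = 1024  # bytes; the real job uses a much larger file_size_limit
part_number = 0
part_path = f"example.part{part_number}.ndjson"
open(part_path, "w").close()  # "touch" the first part file

with gzip.open("example.ndjson.gz", "wt", encoding="utf-8") as f_gz:
    for record in records:
        # Rotate to a new part file once the current one exceeds the limit.
        if os.path.getsize(part_path) > size_limit:
            part_number += 1
            part_path = f"example.part{part_number}.ndjson"
            open(part_path, "w").close()
        line = "{}\n".format(json.dumps(record))
        with open(part_path, "a") as f_part:
            f_part.write(line)  # uncompressed, size-limited part file
        f_gz.write(line)  # single compressed copy of every record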


@@ -701,23 +720,25 @@ def merge_dicts(x: dict, y: dict) -> typing.Generator:
yield (key, y[key])


def get_part_path(
def get_output_path(
metadata: dict,
part_number: int,
part_dir: str,
output_dir: str,
part_number: Optional[int],
touch: bool,
):
"""
A helper function for `write_file_to_json_dataset`
This function returns a part path where we can write data to. Optionally,
create empty file at path.
This function returns a file path to which we can write data. If a part
number is provided, we assume that this is a part file. Otherwise, we
assume that this is a gzip file.
Args:
metadata (dict): Metadata about the source JSON file.
part_number (int): Which part we need a file name for.
part_dir (str): The directory to which we write the part file.
touch (bool): Whether to create an empty file at the part path
output_dir (str): The directory to which we write the file.
part_number (int): Which part we need a file name for. Set to None if
this is the path to a gzip file.
touch (bool): Whether to create an empty file at the path
Returns:
str: A new part path
@@ -726,10 +747,14 @@ def get_part_path(
FileExistsError: If touch is True and a file already exists at
the part path.
"""
output_filename = get_output_filename(metadata=metadata, part_number=part_number)
output_path = os.path.join(part_dir, output_filename)
file_identifier = get_file_identifier(metadata=metadata)
if part_number is not None:
output_filename = f"{file_identifier}.part{part_number}.ndjson"
else:
output_filename = f"{file_identifier}.ndjson.gz"
output_path = os.path.join(output_dir, output_filename)
if touch:
os.makedirs(part_dir, exist_ok=True)
os.makedirs(output_dir, exist_ok=True)
with open(output_path, "x") as initial_file:
# create file
pass
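
A hedged usage sketch of the consolidated path helper, mirroring the assertions in tests/test_s3_to_json.py; the import name and metadata values are assumptions:

import datetime

import s3_to_json  # assumed import name, matching the tests

metadata = {
    "type": "FitbitDevices",  # hypothetical metadata values
    "subtype": None,
    "start_date": None,
    "end_date": datetime.datetime(2023, 1, 14),
}
# With a part number we get a part file path:
s3_to_json.get_output_path(
    metadata=metadata, output_dir="FitbitDevices", part_number=0, touch=False
)  # -> "FitbitDevices/FitbitDevices_20230114.part0.ndjson"
# With part_number=None we get the gzip path instead:
s3_to_json.get_output_path(
    metadata=metadata, output_dir="FitbitDevices", part_number=None, touch=False
)  # -> "FitbitDevices/FitbitDevices_20230114.ndjson.gz"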
66 changes: 34 additions & 32 deletions tests/test_s3_to_json.py
@@ -306,31 +306,19 @@ def test_transform_block_non_empty_file_all_blocks(self, s3_obj, sample_metadata
# Should be 12
assert counter == record_count

def test_get_output_filename_generic(self, sample_metadata):
output_filename = s3_to_json.get_output_filename(
metadata=sample_metadata, part_number=0
)
assert (
output_filename
== f"{sample_metadata['type']}_20220112-20230114.part0.ndjson"
)
def test_get_file_identifier_generic(self, sample_metadata):
file_identifier = s3_to_json.get_file_identifier(metadata=sample_metadata)
assert file_identifier == f"{sample_metadata['type']}_20220112-20230114"

def test_get_output_filename_no_start_date(self, sample_metadata):
def test_get_file_identifier_no_start_date(self, sample_metadata):
sample_metadata["start_date"] = None
output_filename = s3_to_json.get_output_filename(
metadata=sample_metadata, part_number=0
)
assert output_filename == f"{sample_metadata['type']}_20230114.part0.ndjson"
file_identifier = s3_to_json.get_file_identifier(metadata=sample_metadata)
assert file_identifier == f"{sample_metadata['type']}_20230114"

def test_get_output_filename_subtype(self, sample_metadata):
def test_get_file_identifier_subtype(self, sample_metadata):
sample_metadata["type"] = "HealthKitV2Samples"
output_filename = s3_to_json.get_output_filename(
metadata=sample_metadata, part_number=0
)
assert (
output_filename
== "HealthKitV2Samples_FakeSubtype_20220112-20230114.part0.ndjson"
)
file_identifier = s3_to_json.get_file_identifier(metadata=sample_metadata)
assert file_identifier == "HealthKitV2Samples_FakeSubtype_20220112-20230114"

def test_upload_file_to_json_dataset_delete_local_copy(
self, namespace, sample_metadata, monkeypatch, shared_datadir
@@ -394,7 +382,9 @@ def test_upload_file_to_json_dataset_s3_key(
def test_write_file_to_json_dataset_delete_local_copy(
self, s3_obj, sample_metadata, namespace, monkeypatch
):
sample_metadata["type"] = "HealthKitV2Samples"
sample_metadata["type"] = "FitbitDevices"
sample_metadata["subtype"] = None
sample_metadata["start_date"] = None
monkeypatch.setattr("boto3.client", lambda x: MockAWSClient())
workflow_run_properties = {
"namespace": namespace,
Expand All @@ -404,7 +394,7 @@ def test_write_file_to_json_dataset_delete_local_copy(
with zipfile.ZipFile(io.BytesIO(s3_obj["Body"])) as z:
output_files = s3_to_json.write_file_to_json_dataset(
z=z,
json_path="HealthKitV2Samples_Weight_20230112-20230114.json",
json_path="FitbitDevices_20230114.json",
metadata=sample_metadata,
workflow_run_properties=workflow_run_properties,
delete_upon_successful_upload=True,
@@ -474,7 +464,8 @@ def test_write_file_to_json_dataset_multiple_parts(
file_size_limit=1e6,
)
output_line_count = 0
for output_file in output_files:
# We only want to examine part files, so don't include the compressed file
for output_file in output_files[:-1]:
with open(output_file, "r") as f_out:
for json_line in f_out:
output_line_count += 1
@@ -487,24 +478,35 @@ def test_derive_str_metadata(self, sample_metadata):
assert isinstance(str_metadata["start_date"], str)
assert isinstance(str_metadata["end_date"], str)

def test_get_part_path_no_touch(self, sample_metadata):
def test_get_output_path_no_touch(self, sample_metadata):
sample_metadata["start_date"] = None
part_path = s3_to_json.get_part_path(
output_path = s3_to_json.get_output_path(
metadata=sample_metadata,
part_number=0,
part_dir=sample_metadata["type"],
output_dir=sample_metadata["type"],
touch=False,
)
assert part_path == "FitbitDevices/FitbitDevices_20230114.part0.ndjson"
assert output_path == "FitbitDevices/FitbitDevices_20230114.part0.ndjson"

def test_get_part_path_touch(self, sample_metadata):
part_path = s3_to_json.get_part_path(
def test_get_output_path_touch(self, sample_metadata):
output_path = s3_to_json.get_output_path(
metadata=sample_metadata,
part_number=0,
part_dir=sample_metadata["type"],
output_dir=sample_metadata["type"],
touch=True,
)
assert os.path.exists(output_path)
shutil.rmtree(sample_metadata["type"], ignore_errors=True)

def test_get_output_path_gzip(self, sample_metadata):
sample_metadata["start_date"] = None
output_path = s3_to_json.get_output_path(
metadata=sample_metadata,
part_number=None,
output_dir=sample_metadata["type"],
touch=True,
)
assert os.path.exists(part_path)
assert output_path == "FitbitDevices/FitbitDevices_20230114.ndjson.gz"
shutil.rmtree(sample_metadata["type"], ignore_errors=True)
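
The new gzip test asserts only on the returned path; a standard-library round-trip check of compressed contents could, as a sketch, look like this (file name and records are made up):

import gzip
import json

records = [{"ParticipantIdentifier": "A"}, {"ParticipantIdentifier": "B"}]
with gzip.open("roundtrip.ndjson.gz", "wt", encoding="utf-8") as f_out:
    for record in records:
        f_out.write("{}\n".format(json.dumps(record)))

with gzip.open("roundtrip.ndjson.gz", "rt", encoding="utf-8") as f_in:
    assert [json.loads(line) for line in f_in] == records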

def test_get_metadata_startdate_enddate(self, json_file_basenames_dict):
