Merge pull request #70 from Sage-Bionetworks/etl-519
[ETL-519] Limit file size of NDJSON written to JSON datasets
philerooski authored Aug 17, 2023
2 parents f1d9bbf + bd49221 commit ad5d307
Showing 2 changed files with 183 additions and 47 deletions.
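
Before the diff, a minimal standalone sketch of the rotation idea this PR implements, assuming a byte threshold is checked between writes. All names below are illustrative, not the module's API, and the real code checks the size once per transformed block rather than per record:

import json
import os

def write_ndjson_in_parts(records, prefix, file_size_limit=1e8):
    """Illustrative only: start a new <prefix>.partN.ndjson file whenever
    the current part grows past file_size_limit bytes."""
    part_number = 0
    current_path = f"{prefix}.part{part_number}.ndjson"
    open(current_path, "x").close()  # fail loudly if the part already exists
    part_paths = [current_path]
    for record in records:
        if os.path.getsize(current_path) > file_size_limit:
            part_number += 1
            current_path = f"{prefix}.part{part_number}.ndjson"
            open(current_path, "x").close()
            part_paths.append(current_path)
        with open(current_path, "a") as f_out:
            f_out.write(json.dumps(record) + "\n")
    return part_paths
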
140 changes: 101 additions & 39 deletions src/glue/jobs/s3_to_json.py
@@ -225,7 +225,7 @@ def transform_json(
obj[sub_prop_name] = array_of_obj
return json_obj

def get_output_filename(metadata: dict) -> str:
def get_output_filename(metadata: dict, part_number: int) -> str:
"""
Get a formatted file name.
@@ -238,36 +238,39 @@ def get_output_filename(metadata: dict) -> str:
Args:
metadata (dict): Metadata derived from the file basename.
part_number (int): Which part we need a file name for.
Return:
str: A formatted file name.
"""
if metadata["type"] in DATA_TYPES_WITH_SUBTYPE:
output_fname = "{}_{}_{}-{}.ndjson".format(
output_fname = "{}_{}_{}-{}.part{}.ndjson".format(
metadata["type"],
metadata["subtype"],
metadata["start_date"].strftime("%Y%m%d"),
metadata["end_date"].strftime("%Y%m%d")
metadata["end_date"].strftime("%Y%m%d"),
part_number
)
elif metadata["start_date"] is None:
output_fname = "{}_{}.ndjson".format(
output_fname = "{}_{}.part{}.ndjson".format(
metadata["type"],
metadata["end_date"].strftime("%Y%m%d")
metadata["end_date"].strftime("%Y%m%d"),
part_number
)
else:
output_fname = "{}_{}-{}.ndjson".format(
output_fname = "{}_{}-{}.part{}.ndjson".format(
metadata["type"],
metadata["start_date"].strftime("%Y%m%d"),
metadata["end_date"].strftime("%Y%m%d")
metadata["end_date"].strftime("%Y%m%d"),
part_number
)
return output_fname

def transform_block(
input_json: typing.IO,
dataset_identifier: str,
metadata: dict,
block_size: int=10000
):
block_size: int=10000):
"""
A generator function which yields a block of transformed JSON records.
@@ -307,13 +310,16 @@ def write_file_to_json_dataset(
dataset_identifier: str,
metadata: dict,
workflow_run_properties: dict,
delete_upon_successful_upload: bool=True) -> str:
delete_upon_successful_upload: bool=True,
file_size_limit: float=1e8) -> list:
"""
Write a JSON from a zipfile to a JSON dataset.
Write JSON from a zipfile to a JSON dataset.
Metadata fields derived from the file basename are inserted as top-level fields,
other fields are transformed (see `transform_json`). The resulting NDJSON is written
to a JSON dataset in S3.
other fields are transformed (see `transform_json`). The resulting NDJSON is written
to a JSON dataset in S3. Depending on `file_size_limit`, data from a single
JSON file may be split across one or more NDJSON "part" files in the JSON dataset.
See ETL-519 for more information.
Args:
z (zipfile.Zipfile): The zip archive as provided by the data provider.
@@ -324,9 +330,11 @@ def write_file_to_json_dataset(
delete_upon_successful_upload (bool): Whether to delete the local
copy of the JSON file after uploading to S3. Set to False
during testing.
file_size_limit (float): The approximate maximum file size in bytes
before writing to another part file.
Returns:
output_path (str) The local path the file was written to.
list: The local paths of the part files that were uploaded to S3.
"""
s3_client = boto3.client("s3")
os.makedirs(dataset_identifier, exist_ok=True)
@@ -336,35 +344,89 @@ def write_file_to_json_dataset(
else:
s3_metadata["start_date"] = metadata["start_date"].isoformat()
s3_metadata["end_date"] = metadata["end_date"].isoformat()
output_filename = get_output_filename(metadata=metadata)
output_path = os.path.join(dataset_identifier, output_filename)
data = []
part_number = 0
output_path = get_part_path(
metadata=metadata,
part_number=part_number,
dataset_identifier=dataset_identifier,
touch=True
)
with z.open(json_path, "r") as input_json:
with open(output_path, "w+") as f_out:
for transformed_block in transform_block(
input_json=input_json,
dataset_identifier=dataset_identifier,
metadata=metadata
):
current_output_path = output_path
for transformed_block in transform_block(
input_json=input_json,
dataset_identifier=dataset_identifier,
metadata=metadata
):
current_file_size = os.path.getsize(current_output_path)
if current_file_size > file_size_limit:
part_number += 1
print(f"!!! File is too large, creating new part {part_number}")
current_output_path = get_part_path(
metadata=metadata,
part_number=part_number,
dataset_identifier=dataset_identifier,
touch=True
)
with open(current_output_path, "a") as f_out:
for transformed_record in transformed_block:
f_out.write("{}\n".format(json.dumps(transformed_record)))
s3_output_key = os.path.join(
workflow_run_properties["namespace"],
workflow_run_properties["json_prefix"],
f"dataset={dataset_identifier}",
output_filename
)
logger.debug("Output Key: %s", s3_output_key)
with open(output_path, "rb") as f_in:
response = s3_client.put_object(
Body = f_in,
Bucket = workflow_run_properties["json_bucket"],
Key = s3_output_key,
Metadata = s3_metadata
uploaded_files = []
for part_file in os.listdir(dataset_identifier):
output_path = os.path.join(dataset_identifier, part_file)
s3_output_key = os.path.join(
workflow_run_properties["namespace"],
workflow_run_properties["json_prefix"],
f"dataset={dataset_identifier}",
part_file
)
logger.debug(
"Uploading %s to %s",
output_path,
s3_output_key
)
logger.debug("S3 Put object response: %s", json.dumps(response))
if delete_upon_successful_upload:
os.remove(output_path)
with open(output_path, "rb") as f_in:
response = s3_client.put_object(
Body = f_in,
Bucket = workflow_run_properties["json_bucket"],
Key = s3_output_key,
Metadata = s3_metadata
)
uploaded_files.append(output_path)
logger.debug("S3 Put object response: %s", json.dumps(response))
if delete_upon_successful_upload:
os.remove(output_path)
return uploaded_files
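
For orientation, a hedged sketch of how the updated function might be invoked, using only the parameter names and fixture shapes visible in this diff and its tests; the import path, archive name, namespace, and bucket values are placeholders:

import datetime
import zipfile

from src.glue.jobs import s3_to_json  # assumed import path

sample_metadata = {
    "type": "FitbitIntradayCombined",
    "start_date": datetime.datetime(2023, 1, 12),
    "end_date": datetime.datetime(2023, 1, 14),
}
workflow_run_properties = {
    "namespace": "my-namespace",   # placeholder
    "json_prefix": "raw-json",
    "json_bucket": "json-bucket",  # placeholder
}
with zipfile.ZipFile("export.zip") as z:  # placeholder archive
    uploaded_files = s3_to_json.write_file_to_json_dataset(
        z=z,
        json_path="FitbitIntradayCombined_20230112-20230114.json",
        dataset_identifier="FitbitIntradayCombined",
        metadata=sample_metadata,
        workflow_run_properties=workflow_run_properties,
        file_size_limit=1e6,  # ~1 MB per part, as in the new test below
    )
# Each returned path is a local .partN.ndjson file that was also uploaded to S3.
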

def get_part_path(metadata: dict, part_number: int, dataset_identifier: str, touch: bool):
"""
A helper function for `write_file_to_json_dataset`.
Returns a part path where data can be written. Optionally, creates an
empty file at that path.
Args:
metadata (dict): Metadata derived from the file basename.
part_number (int): Which part we need a file name for.
dataset_identifier (str): The data type of `json_path`.
touch (bool): Whether to create an empty file at the part path
Returns:
str: A new part path
Raises:
FileExistsError: If touch is True and a file already exists at
the part path.
"""
output_filename = get_output_filename(
metadata=metadata,
part_number=part_number
)
output_path = os.path.join(dataset_identifier, output_filename)
if touch:
with open(output_path, "x") as initial_file:
# "x" mode creates the file and raises FileExistsError if it already exists
pass
return output_path
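
A hedged illustration of the exclusive-create behaviour that `get_part_path` documents above: open(..., "x") creates the file and raises FileExistsError if it already exists. The import path and the pre-created dataset directory below are assumptions made for the example:

import datetime
import os

from src.glue.jobs import s3_to_json  # assumed import path

sample_metadata = {
    "type": "FitbitDevices",
    "start_date": None,
    "end_date": datetime.datetime(2023, 1, 14),
}
os.makedirs("FitbitDevices", exist_ok=True)
first_part = s3_to_json.get_part_path(
    metadata=sample_metadata,
    part_number=0,
    dataset_identifier="FitbitDevices",
    touch=True,
)  # creates FitbitDevices/FitbitDevices_20230114.part0.ndjson
try:
    s3_to_json.get_part_path(
        metadata=sample_metadata,
        part_number=0,
        dataset_identifier="FitbitDevices",
        touch=True,
    )
except FileExistsError:
    pass  # the part file already exists, so "x" mode refuses to recreate it
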

def get_metadata(basename: str) -> dict:
90 changes: 82 additions & 8 deletions tests/test_s3_to_json.py
@@ -419,17 +419,23 @@ def test_get_output_filename_generic(self):
"start_date": datetime.datetime(2022, 1, 12, 0, 0),
"end_date": datetime.datetime(2023, 1, 14, 0, 0),
}
output_filename = s3_to_json.get_output_filename(metadata=sample_metadata)
assert output_filename == "FitbitDevices_20220112-20230114.ndjson"
output_filename = s3_to_json.get_output_filename(
metadata=sample_metadata,
part_number=0
)
assert output_filename == "FitbitDevices_20220112-20230114.part0.ndjson"

def test_get_output_filename_no_start_date(self):
sample_metadata = {
"type": "EnrolledParticipants",
"start_date": None,
"end_date": datetime.datetime(2023, 1, 14, 0, 0),
}
output_filename = s3_to_json.get_output_filename(metadata=sample_metadata)
assert output_filename == "EnrolledParticipants_20230114.ndjson"
output_filename = s3_to_json.get_output_filename(
metadata=sample_metadata,
part_number=0
)
assert output_filename == "EnrolledParticipants_20230114.part0.ndjson"

def test_get_output_filename_subtype(self):
sample_metadata = {
@@ -438,8 +444,11 @@ def test_get_output_filename_subtype(self):
"start_date": datetime.datetime(2022, 1, 12, 0, 0),
"end_date": datetime.datetime(2023, 1, 14, 0, 0),
}
output_filename = s3_to_json.get_output_filename(metadata=sample_metadata)
assert output_filename == "HealthKitV2Samples_Weight_20220112-20230114.ndjson"
output_filename = s3_to_json.get_output_filename(
metadata=sample_metadata,
part_number=0
)
assert output_filename == "HealthKitV2Samples_Weight_20220112-20230114.part0.ndjson"

def test_write_file_to_json_dataset_delete_local_copy(self, s3_obj, namespace, monkeypatch):
monkeypatch.setattr("boto3.client", lambda x: MockAWSClient())
@@ -457,14 +466,15 @@ def test_write_file_to_json_dataset_delete_local_copy(self, s3_obj, namespace, m
"json_bucket": "json-bucket",
}
with zipfile.ZipFile(io.BytesIO(s3_obj["Body"])) as z:
output_file = s3_to_json.write_file_to_json_dataset(
output_files = s3_to_json.write_file_to_json_dataset(
z=z,
json_path="HealthKitV2Samples_Weight_20230112-20230114.json",
dataset_identifier="HealthKitV2Samples",
metadata=sample_metadata["Metadata"],
workflow_run_properties=workflow_run_properties,
delete_upon_successful_upload=True,
)
output_file = output_files[0]

assert not os.path.exists(output_file)

@@ -486,14 +496,15 @@ def test_write_file_to_json_dataset_record_consistency(self, s3_obj, namespace,
with z.open("FitbitDevices_20230114.json", "r") as fitbit_data:
input_line_cnt = len(fitbit_data.readlines())

output_file = s3_to_json.write_file_to_json_dataset(
output_files = s3_to_json.write_file_to_json_dataset(
z=z,
json_path="FitbitDevices_20230114.json",
dataset_identifier="FitbitDevices",
metadata=sample_metadata["Metadata"],
workflow_run_properties=workflow_run_properties,
delete_upon_successful_upload=False,
)
output_file = output_files[0]

with open(output_file, "r") as f_out:
output_line_cnt = 0
@@ -507,6 +518,69 @@ def test_write_file_to_json_dataset_record_consistency(self, s3_obj, namespace,
output_line_cnt += 1
# gets line count of input json and exported json and checks the two
assert input_line_cnt == output_line_cnt
os.remove(output_file)

def test_write_file_to_json_dataset_multiple_parts(self, s3_obj, namespace, monkeypatch):
monkeypatch.setattr("boto3.client", lambda x: MockAWSClient())
sample_metadata = {
"type": "FitbitIntradayCombined",
"start_date": datetime.datetime(2022, 1, 12, 0, 0),
"end_date": datetime.datetime(2023, 1, 14, 0, 0)
}
workflow_run_properties = {
"namespace": namespace,
"json_prefix": "raw-json",
"json_bucket": "json-bucket",
}
with zipfile.ZipFile(io.BytesIO(s3_obj["Body"])) as z:
json_path = "FitbitIntradayCombined_20230112-20230114.json"
with z.open(json_path, "r") as fitbit_data:
input_line_cnt = len(fitbit_data.readlines())
output_files = s3_to_json.write_file_to_json_dataset(
z=z,
json_path=json_path,
dataset_identifier=sample_metadata["type"],
metadata=sample_metadata,
workflow_run_properties=workflow_run_properties,
delete_upon_successful_upload=False,
file_size_limit=1e6
)
output_line_count = 0
for output_file in output_files:
with open(output_file, "r") as f_out:
for json_line in f_out:
output_line_count += 1
os.remove(output_file)
assert input_line_cnt == output_line_count

def test_get_part_path_no_touch(self):
sample_metadata = {
"type": "FitbitDevices",
"start_date": None,
"end_date": datetime.datetime(2023, 1, 14, 0, 0)
}
part_path = s3_to_json.get_part_path(
metadata=sample_metadata,
part_number=0,
dataset_identifier=sample_metadata["type"],
touch=False
)
assert part_path == "FitbitDevices/FitbitDevices_20230114.part0.ndjson"

def test_get_part_path_touch(self):
sample_metadata = {
"type": "FitbitDevices",
"start_date": None,
"end_date": datetime.datetime(2023, 1, 14, 0, 0)
}
part_path = s3_to_json.get_part_path(
metadata=sample_metadata,
part_number=0,
dataset_identifier=sample_metadata["type"],
touch=True
)
assert os.path.exists(part_path)
os.remove(part_path)

def test_get_metadata_startdate_enddate(self, json_file_basenames_dict):
basename = json_file_basenames_dict["HealthKitV2Samples_Deleted"]