diff --git a/src/glue/jobs/s3_to_json.py b/src/glue/jobs/s3_to_json.py
index b54969ab..9ac48599 100644
--- a/src/glue/jobs/s3_to_json.py
+++ b/src/glue/jobs/s3_to_json.py
@@ -225,7 +225,7 @@ def transform_json(
                     obj[sub_prop_name] = array_of_obj
     return json_obj

-def get_output_filename(metadata: dict) -> str:
+def get_output_filename(metadata: dict, part_number: int) -> str:
     """
     Get a formatted file name.

@@ -238,27 +238,31 @@ def get_output_filename(metadata: dict) -> str:

     Args:
         metadata (dict): Metadata derived from the file basename.
+        part_number (int): Which part we need a file name for.

     Return:
         str: A formatted file name.
     """
     if metadata["type"] in DATA_TYPES_WITH_SUBTYPE:
-        output_fname = "{}_{}_{}-{}.ndjson".format(
+        output_fname = "{}_{}_{}-{}.part{}.ndjson".format(
             metadata["type"],
             metadata["subtype"],
             metadata["start_date"].strftime("%Y%m%d"),
-            metadata["end_date"].strftime("%Y%m%d")
+            metadata["end_date"].strftime("%Y%m%d"),
+            part_number
         )
     elif metadata["start_date"] is None:
-        output_fname = "{}_{}.ndjson".format(
+        output_fname = "{}_{}.part{}.ndjson".format(
             metadata["type"],
-            metadata["end_date"].strftime("%Y%m%d")
+            metadata["end_date"].strftime("%Y%m%d"),
+            part_number
         )
     else:
-        output_fname = "{}_{}-{}.ndjson".format(
+        output_fname = "{}_{}-{}.part{}.ndjson".format(
             metadata["type"],
             metadata["start_date"].strftime("%Y%m%d"),
-            metadata["end_date"].strftime("%Y%m%d")
+            metadata["end_date"].strftime("%Y%m%d"),
+            part_number
         )
     return output_fname

@@ -266,8 +270,7 @@ def transform_block(
     input_json: typing.IO,
     dataset_identifier: str,
     metadata: dict,
-    block_size: int=10000
-):
+    block_size: int=10000):
     """
     A generator function which yields a block of transformed JSON records.

@@ -307,13 +310,16 @@ def write_file_to_json_dataset(
     dataset_identifier: str,
     metadata: dict,
     workflow_run_properties: dict,
-    delete_upon_successful_upload: bool=True) -> str:
+    delete_upon_successful_upload: bool=True,
+    file_size_limit: float=1e8) -> list:
     """
-    Write a JSON from a zipfile to a JSON dataset.
+    Write JSON from a zipfile to a JSON dataset.

     Metadata fields derived from the file basename are inserted as top-level fields,
-    other fields are transformed (see `transform_json`). The resulting NDJSON is written
-    to a JSON dataset in S3.
+    other fields are transformed (see `transform_json`). The resulting NDJSON(s) are written
+    to a JSON dataset in S3. Depending on the `file_size_limit`, data from a single
+    JSON file may be written to the JSON dataset as one or more NDJSON "part" files.
+    See ETL-519 for more information.

     Args:
         z (zipfile.Zipfile): The zip archive as provided by the data provider.
@@ -324,9 +330,11 @@ def write_file_to_json_dataset(
         delete_upon_successful_upload (bool): Whether to delete the local
             copy of the JSON file after uploading to S3. Set to False
             during testing.
+        file_size_limit (float): The approximate maximum file size in bytes
+            before writing to another part file.

     Returns:
-        output_path (str) The local path the file was written to.
+        list: A list of local paths of the files that were uploaded to S3.
     """
     s3_client = boto3.client("s3")
     os.makedirs(dataset_identifier, exist_ok=True)
@@ -336,35 +344,89 @@ def write_file_to_json_dataset(
     else:
         s3_metadata["start_date"] = metadata["start_date"].isoformat()
     s3_metadata["end_date"] = metadata["end_date"].isoformat()
-    output_filename = get_output_filename(metadata=metadata)
-    output_path = os.path.join(dataset_identifier, output_filename)
-    data = []
+    part_number = 0
+    output_path = get_part_path(
+        metadata=metadata,
+        part_number=part_number,
+        dataset_identifier=dataset_identifier,
+        touch=True
+    )
     with z.open(json_path, "r") as input_json:
-        with open(output_path, "w+") as f_out:
-            for transformed_block in transform_block(
-                input_json=input_json,
-                dataset_identifier=dataset_identifier,
-                metadata=metadata
-            ):
+        current_output_path = output_path
+        for transformed_block in transform_block(
+            input_json=input_json,
+            dataset_identifier=dataset_identifier,
+            metadata=metadata
+        ):
+            current_file_size = os.path.getsize(current_output_path)
+            if current_file_size > file_size_limit:
+                part_number += 1
+                logger.info("File size limit reached, creating part %s", part_number)
+                current_output_path = get_part_path(
+                    metadata=metadata,
+                    part_number=part_number,
+                    dataset_identifier=dataset_identifier,
+                    touch=True
+                )
+            with open(current_output_path, "a") as f_out:
                 for transformed_record in transformed_block:
                     f_out.write("{}\n".format(json.dumps(transformed_record)))
-    s3_output_key = os.path.join(
-        workflow_run_properties["namespace"],
-        workflow_run_properties["json_prefix"],
-        f"dataset={dataset_identifier}",
-        output_filename
-    )
-    logger.debug("Output Key: %s", s3_output_key)
-    with open(output_path, "rb") as f_in:
-        response = s3_client.put_object(
-            Body = f_in,
-            Bucket = workflow_run_properties["json_bucket"],
-            Key = s3_output_key,
-            Metadata = s3_metadata
+    uploaded_files = []
+    for part_file in os.listdir(dataset_identifier):
+        output_path = os.path.join(dataset_identifier, part_file)
+        s3_output_key = os.path.join(
+            workflow_run_properties["namespace"],
+            workflow_run_properties["json_prefix"],
+            f"dataset={dataset_identifier}",
+            part_file
+        )
+        logger.debug(
+            "Uploading %s to %s",
+            output_path,
+            s3_output_key
        )
-    logger.debug("S3 Put object response: %s", json.dumps(response))
-    if delete_upon_successful_upload:
-        os.remove(output_path)
+        with open(output_path, "rb") as f_in:
+            response = s3_client.put_object(
+                Body = f_in,
+                Bucket = workflow_run_properties["json_bucket"],
+                Key = s3_output_key,
+                Metadata = s3_metadata
+            )
+        uploaded_files.append(output_path)
+        logger.debug("S3 Put object response: %s", json.dumps(response))
+        if delete_upon_successful_upload:
+            os.remove(output_path)
+    return uploaded_files
+
+def get_part_path(metadata: dict, part_number: int, dataset_identifier: str, touch: bool) -> str:
+    """
+    A helper function for `write_file_to_json_dataset`.
+
+    This function returns a part path where we can write data. Optionally,
+    it creates an empty file at that path.
+
+    Args:
+        metadata (dict): Metadata derived from the file basename.
+        part_number (int): Which part we need a file name for.
+        dataset_identifier (str): The data type of `json_path`.
+        touch (bool): Whether to create an empty file at the part path.
+
+    Returns:
+        str: A new part path.
+
+    Raises:
+        FileExistsError: If touch is True and a file already exists at
+            the part path.
+ """ + output_filename = get_output_filename( + metadata=metadata, + part_number=part_number + ) + output_path = os.path.join(dataset_identifier, output_filename) + if touch: + with open(output_path, "x") as initial_file: + # create file + pass return output_path def get_metadata(basename: str) -> dict: diff --git a/tests/test_s3_to_json.py b/tests/test_s3_to_json.py index dfc6a550..242c9ce0 100644 --- a/tests/test_s3_to_json.py +++ b/tests/test_s3_to_json.py @@ -419,8 +419,11 @@ def test_get_output_filename_generic(self): "start_date": datetime.datetime(2022, 1, 12, 0, 0), "end_date": datetime.datetime(2023, 1, 14, 0, 0), } - output_filename = s3_to_json.get_output_filename(metadata=sample_metadata) - assert output_filename == "FitbitDevices_20220112-20230114.ndjson" + output_filename = s3_to_json.get_output_filename( + metadata=sample_metadata, + part_number=0 + ) + assert output_filename == "FitbitDevices_20220112-20230114.part0.ndjson" def test_get_output_filename_no_start_date(self): sample_metadata = { @@ -428,8 +431,11 @@ def test_get_output_filename_no_start_date(self): "start_date": None, "end_date": datetime.datetime(2023, 1, 14, 0, 0), } - output_filename = s3_to_json.get_output_filename(metadata=sample_metadata) - assert output_filename == "EnrolledParticipants_20230114.ndjson" + output_filename = s3_to_json.get_output_filename( + metadata=sample_metadata, + part_number=0 + ) + assert output_filename == "EnrolledParticipants_20230114.part0.ndjson" def test_get_output_filename_subtype(self): sample_metadata = { @@ -438,8 +444,11 @@ def test_get_output_filename_subtype(self): "start_date": datetime.datetime(2022, 1, 12, 0, 0), "end_date": datetime.datetime(2023, 1, 14, 0, 0), } - output_filename = s3_to_json.get_output_filename(metadata=sample_metadata) - assert output_filename == "HealthKitV2Samples_Weight_20220112-20230114.ndjson" + output_filename = s3_to_json.get_output_filename( + metadata=sample_metadata, + part_number=0 + ) + assert output_filename == "HealthKitV2Samples_Weight_20220112-20230114.part0.ndjson" def test_write_file_to_json_dataset_delete_local_copy(self, s3_obj, namespace, monkeypatch): monkeypatch.setattr("boto3.client", lambda x: MockAWSClient()) @@ -457,7 +466,7 @@ def test_write_file_to_json_dataset_delete_local_copy(self, s3_obj, namespace, m "json_bucket": "json-bucket", } with zipfile.ZipFile(io.BytesIO(s3_obj["Body"])) as z: - output_file = s3_to_json.write_file_to_json_dataset( + output_files = s3_to_json.write_file_to_json_dataset( z=z, json_path="HealthKitV2Samples_Weight_20230112-20230114.json", dataset_identifier="HealthKitV2Samples", @@ -465,6 +474,7 @@ def test_write_file_to_json_dataset_delete_local_copy(self, s3_obj, namespace, m workflow_run_properties=workflow_run_properties, delete_upon_successful_upload=True, ) + output_file = output_files[0] assert not os.path.exists(output_file) @@ -486,7 +496,7 @@ def test_write_file_to_json_dataset_record_consistency(self, s3_obj, namespace, with z.open("FitbitDevices_20230114.json", "r") as fitbit_data: input_line_cnt = len(fitbit_data.readlines()) - output_file = s3_to_json.write_file_to_json_dataset( + output_files = s3_to_json.write_file_to_json_dataset( z=z, json_path="FitbitDevices_20230114.json", dataset_identifier="FitbitDevices", @@ -494,6 +504,7 @@ def test_write_file_to_json_dataset_record_consistency(self, s3_obj, namespace, workflow_run_properties=workflow_run_properties, delete_upon_successful_upload=False, ) + output_file = output_files[0] with open(output_file, "r") as f_out: 
                 output_line_cnt = 0
@@ -507,6 +518,69 @@ def test_write_file_to_json_dataset_record_consistency(self, s3_obj, namespace,
                     output_line_cnt += 1
             # gets line count of input json and exported json and checks the two
             assert input_line_cnt == output_line_cnt
+            os.remove(output_file)
+
+    def test_write_file_to_json_dataset_multiple_parts(self, s3_obj, namespace, monkeypatch):
+        monkeypatch.setattr("boto3.client", lambda x: MockAWSClient())
+        sample_metadata = {
+            "type": "FitbitIntradayCombined",
+            "start_date": datetime.datetime(2022, 1, 12, 0, 0),
+            "end_date": datetime.datetime(2023, 1, 14, 0, 0)
+        }
+        workflow_run_properties = {
+            "namespace": namespace,
+            "json_prefix": "raw-json",
+            "json_bucket": "json-bucket",
+        }
+        with zipfile.ZipFile(io.BytesIO(s3_obj["Body"])) as z:
+            json_path = "FitbitIntradayCombined_20230112-20230114.json"
+            with z.open(json_path, "r") as fitbit_data:
+                input_line_cnt = len(fitbit_data.readlines())
+            output_files = s3_to_json.write_file_to_json_dataset(
+                z=z,
+                json_path=json_path,
+                dataset_identifier=sample_metadata["type"],
+                metadata=sample_metadata,
+                workflow_run_properties=workflow_run_properties,
+                delete_upon_successful_upload=False,
+                file_size_limit=1e6
+            )
+            output_line_count = 0
+            for output_file in output_files:
+                with open(output_file, "r") as f_out:
+                    for json_line in f_out:
+                        output_line_count += 1
+                os.remove(output_file)
+            assert input_line_cnt == output_line_count
+
+    def test_get_part_path_no_touch(self):
+        sample_metadata = {
+            "type": "FitbitDevices",
+            "start_date": None,
+            "end_date": datetime.datetime(2023, 1, 14, 0, 0)
+        }
+        part_path = s3_to_json.get_part_path(
+            metadata=sample_metadata,
+            part_number=0,
+            dataset_identifier=sample_metadata["type"],
+            touch=False
+        )
+        assert part_path == "FitbitDevices/FitbitDevices_20230114.part0.ndjson"
+
+    def test_get_part_path_touch(self):
+        sample_metadata = {
+            "type": "FitbitDevices",
+            "start_date": None,
+            "end_date": datetime.datetime(2023, 1, 14, 0, 0)
+        }
+        part_path = s3_to_json.get_part_path(
+            metadata=sample_metadata,
+            part_number=0,
+            dataset_identifier=sample_metadata["type"],
+            touch=True
+        )
+        assert os.path.exists(part_path)
+        os.remove(part_path)

     def test_get_metadata_startdate_enddate(self, json_file_basenames_dict):
         basename = json_file_basenames_dict["HealthKitV2Samples_Deleted"]
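
Usage sketch (not part of the patch): the snippet below illustrates how the new part-file behavior of write_file_to_json_dataset might be exercised, mirroring the test_write_file_to_json_dataset_multiple_parts test added above. The archive name, namespace, and bucket values are placeholders, and the import path depends on how the Glue job module is packaged; only write_file_to_json_dataset, get_metadata, and file_size_limit come from the diff.

    import zipfile

    import s3_to_json  # import path depends on how the Glue job module is packaged

    with zipfile.ZipFile("pilot_data.zip") as z:  # placeholder archive name
        json_path = "FitbitIntradayCombined_20230112-20230114.json"
        uploaded_files = s3_to_json.write_file_to_json_dataset(
            z=z,
            json_path=json_path,
            dataset_identifier="FitbitIntradayCombined",
            metadata=s3_to_json.get_metadata(json_path),  # json_path here is already a basename
            workflow_run_properties={
                "namespace": "main",           # placeholder values
                "json_prefix": "raw-json",
                "json_bucket": "json-bucket",
            },
            delete_upon_successful_upload=False,
            file_size_limit=1e6,  # ~1 MB parts instead of the 1e8 (~100 MB) default
        )
        # Each returned path is a local part file that was also uploaded, e.g.
        # FitbitIntradayCombined/FitbitIntradayCombined_20230112-20230114.part0.ndjson
        for path in uploaded_files:
            print(path)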