Add Partition On Logic (#519)
* add partition_on logic

Signed-off-by: Vibhu Jawa <vjawa@nvidia.com>

* Add Docstring based on Sarah's review

Signed-off-by: Vibhu Jawa <vjawa@nvidia.com>

* Apply Praateek's suggestion and skip the test using pytest.mark.gpu

Signed-off-by: Vibhu Jawa <vjawa@nvidia.com>

* Apply Praateek's suggestion and force index=False

Signed-off-by: Vibhu Jawa <vjawa@nvidia.com>

---------

Signed-off-by: Vibhu Jawa <vjawa@nvidia.com>
VibhuJawa authored Feb 6, 2025
1 parent 97aa372 commit ca30808
Showing 3 changed files with 223 additions and 9 deletions.
44 changes: 42 additions & 2 deletions nemo_curator/datasets/doc_dataset.py
@@ -160,16 +160,36 @@ def to_json(
output_path: str,
write_to_filename: Union[bool, str] = False,
keep_filename_column: bool = False,
partition_on: Optional[str] = None,
):
"""
See nemo_curator.utils.distributed_utils.write_to_disk docstring for parameters.
Writes the dataset to the specified path in JSONL format.
If `write_to_filename` is True, the DataFrame is expected to have a column
that specifies the filename for each document. This column can be named
`file_name` by default, or a custom name if `write_to_filename` is a string.
Args:
output_path (str): The directory or file path where the dataset will be written.
write_to_filename (Union[bool, str]): Determines how filenames are handled.
- If True, uses the `file_name` column in the DataFrame to determine filenames.
- If a string, uses that string as the column name for filenames.
- If False, writes all data to the specified `output_path`.
keep_filename_column (bool): If True, retains the filename column in the output.
If False, the filename column is dropped from the output.
partition_on (Optional[str]): The column name used to partition the data.
If specified, data is partitioned based on unique values in this column,
with each partition written to a separate directory.
For more details, refer to the `write_to_disk` function in
`nemo_curator.utils.distributed_utils`.
"""
write_to_disk(
df=self.df,
output_path=output_path,
write_to_filename=write_to_filename,
keep_filename_column=keep_filename_column,
partition_on=partition_on,
output_type="jsonl",
)
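
A minimal usage sketch of the new `partition_on` argument, mirroring the tests added below (the column names, the output path, and the `from nemo_curator.datasets import DocumentDataset` import are illustrative assumptions):

import dask.dataframe as dd
import pandas as pd

from nemo_curator.datasets import DocumentDataset

df = pd.DataFrame({"id": [1, 2, 3], "category": ["A", "B", "A"]})
dataset = DocumentDataset(dd.from_pandas(df, npartitions=1))

# Writes JSONL part files under output/category=A/ and output/category=B/
dataset.to_json(output_path="output", partition_on="category")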

@@ -178,16 +198,36 @@ def to_parquet(
output_path: str,
write_to_filename: Union[bool, str] = False,
keep_filename_column: bool = False,
partition_on: Optional[str] = None,
):
"""
See nemo_curator.utils.distributed_utils.write_to_disk docstring for parameters.
Writes the dataset to the specified path in Parquet format.
If `write_to_filename` is True, the DataFrame is expected to have a column
that specifies the filename for each document. This column can be named
`file_name` by default, or a custom name if `write_to_filename` is a string.
Args:
output_path (str): The directory or file path where the dataset will be written.
write_to_filename (Union[bool, str]): Determines how filenames are handled.
- If True, uses the `file_name` column in the DataFrame to determine filenames.
- If a string, uses that string as the column name for filenames.
- If False, writes all data to the specified `output_path`.
keep_filename_column (bool): If True, retains the filename column in the output.
If False, the filename column is dropped from the output.
partition_on (Optional[str]): The column name used to partition the data.
If specified, data is partitioned based on unique values in this column,
with each partition written to a separate directory.
For more details, refer to the `write_to_disk` function in
`nemo_curator.utils.distributed_utils`.
"""
write_to_disk(
df=self.df,
output_path=output_path,
write_to_filename=write_to_filename,
keep_filename_column=keep_filename_column,
partition_on=partition_on,
output_type="parquet",
)
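
A similar sketch for the Parquet path, which round-trips through dd.read_parquet the same way the new tests do (names and paths are illustrative):

import dask.dataframe as dd
import pandas as pd

from nemo_curator.datasets import DocumentDataset

df = pd.DataFrame({"id": [1, 2], "category": ["A", "B"], "value": [10, 20]})
dataset = DocumentDataset(dd.from_pandas(df, npartitions=1))
dataset.to_parquet(output_path="output_parquet", partition_on="category")

# Hive-style directories (output_parquet/category=A/, .../category=B/) are
# produced; the partition column is reconstructed from the paths on read.
loaded = dd.read_parquet("output_parquet").compute()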

64 changes: 57 additions & 7 deletions nemo_curator/utils/distributed_utils.py
@@ -748,6 +748,7 @@ def single_partition_write_with_filename(
orient="records",
lines=True,
force_ascii=False,
index=False, # Only index=False is supported for orient="records"
)
else:
# See open issue here: https://github.com/rapidsai/cudf/issues/15211
@@ -759,6 +760,7 @@ def single_partition_write_with_filename(
orient="records",
lines=True,
force_ascii=False,
index=False, # Only index=False is supported for orient="records"
)

elif output_type == "parquet":
@@ -843,6 +845,7 @@ def write_to_disk(
write_to_filename: Union[bool, str] = False,
keep_filename_column: bool = False,
output_type: str = "jsonl",
partition_on: Optional[str] = None,
):
"""
This function writes a Dask DataFrame to the specified file path.
@@ -857,6 +860,9 @@
If str, uses that as the filename column to write to.
keep_filename_column: Boolean representing whether to keep or drop the filename column, if it exists.
output_type: The type of output file to write. Can be "jsonl" or "parquet".
partition_on: The column name to partition the data on.
If specified, the data will be partitioned based on the unique values in this column,
and each partition will be written to a separate directory
"""

filename_col = _resolve_filename_col(write_to_filename)
@@ -879,6 +885,11 @@
f"write_using_filename is True but no {filename_col} column found in DataFrame"
)

if partition_on is not None and write_to_filename:
raise ValueError(
"Cannot use both partition_on and write_to_filename parameters simultaneously. "
)

if is_cudf_type(df):
import cudf
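
For reference, a hedged sketch of calling the utility directly rather than through DocumentDataset (the import path follows the file shown above; the DataFrame and output path are illustrative):

import dask.dataframe as dd
import pandas as pd

from nemo_curator.utils.distributed_utils import write_to_disk

ddf = dd.from_pandas(
    pd.DataFrame({"id": [1, 2], "category": ["A", "B"]}), npartitions=1
)

# Partitioned JSONL output: one subdirectory per unique category value.
write_to_disk(
    df=ddf,
    output_path="out",
    output_type="jsonl",
    partition_on="category",
)

# Passing write_to_filename together with partition_on raises the ValueError
# added in the hunk above.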

@@ -904,7 +915,12 @@
# output_path is a directory
else:
if output_type == "jsonl" or output_type == "parquet":
_write_to_jsonl_or_parquet(df, output_path, output_type)
_write_to_jsonl_or_parquet(
df,
output_path=output_path,
output_type=output_type,
partition_on=partition_on,
)
elif output_type == "bitext":
if write_to_filename:
os.makedirs(output_path, exist_ok=True)
@@ -938,16 +954,50 @@ def _write_to_jsonl_or_parquet(
df,
output_path: str,
output_type: Literal["jsonl", "parquet"] = "jsonl",
partition_on: Optional[str] = None,
):
if output_type == "jsonl":
if is_cudf_type(df):
# See open issue here: https://github.com/rapidsai/cudf/issues/15211
# df.to_json(output_path, orient="records", lines=True, engine="cudf", force_ascii=False)
df.to_json(output_path, orient="records", lines=True, force_ascii=False)
if partition_on is not None:
unique_values = (
df[partition_on]
.unique()
.to_backend(backend="pandas")
.compute()
.to_list()
)
for value in unique_values:
os.makedirs(output_path, exist_ok=True)
partition_output_path = os.path.join(
output_path, f"{partition_on}={value}"
)
df[df[partition_on] == value].to_json(
partition_output_path,
orient="records",
lines=True,
force_ascii=False,
index=False, # Only index=False is supported for orient="records"
)
else:
df.to_json(output_path, orient="records", lines=True, force_ascii=False)
if is_cudf_type(df):
# See open issue here: https://github.com/rapidsai/cudf/issues/15211
# df.to_json(output_path, orient="records", lines=True, engine="cudf", force_ascii=False)
df.to_json(
output_path,
orient="records",
lines=True,
force_ascii=False,
index=False,
) # Only index=False is supported for orient="records"
else:
df.to_json(
output_path,
orient="records",
lines=True,
force_ascii=False,
index=False,
) # Only index=False is supported for orient="records"
elif output_type == "parquet":
df.to_parquet(output_path, write_index=False)
df.to_parquet(output_path, write_index=False, partition_on=partition_on)
else:
raise ValueError(f"Unknown output type: {output_type}")
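
A standalone sketch of the manual Hive-style JSONL partitioning implemented above, using a plain pandas-backed Dask DataFrame (all names here are illustrative):

import os

import dask.dataframe as dd
import pandas as pd

ddf = dd.from_pandas(
    pd.DataFrame({"id": [1, 2, 3], "category": ["A", "B", "A"]}), npartitions=1
)
partition_on, output_path = "category", "out_jsonl"

for value in ddf[partition_on].unique().compute().to_list():
    os.makedirs(output_path, exist_ok=True)
    # Filter to one partition value and write it to its own directory.
    ddf[ddf[partition_on] == value].to_json(
        os.path.join(output_path, f"{partition_on}={value}"),
        orient="records",
        lines=True,
        force_ascii=False,
        index=False,  # orient="records" only supports index=False
    )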

124 changes: 124 additions & 0 deletions tests/test_io.py
@@ -293,3 +293,127 @@ def test_write_single_jsonl_file(self, tmp_path):

result = DocumentDataset.read_json(output_path)
assert json_df.equals(result.df.compute())


class TestPartitionOn:
def test_partition_on_and_write_to_filename_error(self, tmp_path):
"""Verify that using partition_on and write_to_filename together raises an error."""
df = pd.DataFrame(
{
"id": [1, 2, 3],
"file_name": ["f1", "f1", "f1"],
"category": ["A", "B", "A"],
}
)
ddf = dd.from_pandas(df, npartitions=1)
dataset = DocumentDataset(ddf)
with pytest.raises(
ValueError,
match="Cannot use both partition_on and write_to_filename parameters simultaneously.",
):
dataset.to_json(
output_path=str(tmp_path / "output"),
write_to_filename=True, # Intentionally provided to trigger the error
partition_on="category",
)

@pytest.mark.parametrize(
"backend", ["pandas", pytest.param("cudf", marks=pytest.mark.gpu)]
)
@pytest.mark.parametrize(
"category_values",
[
["A", "B", "A", "B"],
[10, 20, 10, 20],
[1.0, 2.0, 1.0, 2.0],
],
)
def test_write_to_disk_with_partition_on_jsonl(
self, tmp_path, backend, category_values
):
"""
Test writing a partitioned JSONL dataset.
The function is expected to create subdirectories in the output directory
with names of the form 'category=<value>' for each unique partition column value.
"""
df = pd.DataFrame(
{"id": [1, 2, 3, 4], "category": category_values, "value": [10, 20, 30, 40]}
)
ddf = dd.from_pandas(df, npartitions=2)
ddf = ddf.to_backend(backend)
output_dir = tmp_path / "output_jsonl"
dataset = DocumentDataset(ddf)
dataset.to_json(output_path=str(output_dir), partition_on="category")
# Check that the output directory contains subdirectories for each partition.
# Unique partition values (as strings) to be used in the directory names.
unique_partitions = {str(x) for x in category_values}
for part in unique_partitions:
expected_dir = output_dir / f"category={part}"
assert expected_dir.exists(), f"Expected directory {expected_dir} not found"

# For each partition directory, load the JSONL files and verify that all records have the correct partition value.
# (Here we assume the files are written with extension ".part")
for part_dir in output_dir.glob("category=*"):
# The partition value is taken from the directory name.
partition_value = part_dir.name.split("=")[-1]
jsonl_files = list(part_dir.glob("*.part"))
assert (
jsonl_files
), f"No JSONL files found in partition directory {part_dir}"
for file in jsonl_files:
with open(file, "r") as f:
for line in f:
record = json.loads(line)
if "category" in record:
# Compare as strings, to work with both integer and string partition values.
assert (
str(record["category"]) == partition_value
), f"Record partition value {record['category']} does not match directory {partition_value}"

@pytest.mark.parametrize(
"backend", ["pandas", pytest.param("cudf", marks=pytest.mark.gpu)]
)
@pytest.mark.parametrize(
"category_values",
[
["A", "B", "A", "B"],
[10, 20, 10, 20],
[1.0, 2.0, 1.0, 2.0],
],
)
def test_write_to_disk_with_partition_on_parquet(
self, tmp_path, backend, category_values
):
"""
Test writing a partitioned Parquet dataset.
The test writes a DataFrame partitioned on the 'category' column and then reads it back
using dd.read_parquet. The output is compared (after sorting) to the original DataFrame.
"""

df = pd.DataFrame(
{"id": [1, 2, 3, 4], "category": category_values, "value": [10, 20, 30, 40]}
)
ddf = dd.from_pandas(df, npartitions=2)
ddf = ddf.to_backend(backend)
output_dir = tmp_path / "output_parquet"
dataset = DocumentDataset(ddf)
dataset.to_parquet(output_path=str(output_dir), partition_on="category")

# Check that the output directory contains subdirectories for each partition.
# Unique partition values (as strings) to be used in the directory names.
unique_partitions = {str(x) for x in category_values}
for part in unique_partitions:
expected_dir = output_dir / f"category={part}"
assert expected_dir.exists(), f"Expected directory {expected_dir} not found"

ddf_loaded = dd.read_parquet(str(output_dir))
df_loaded = ddf_loaded.compute().reset_index(drop=True)
df_loaded["category"] = df_loaded["category"].astype(df["category"].dtype)
# To ensure a fair comparison, sort the dataframes by 'id' and reindex.
pd.testing.assert_frame_equal(
df.sort_values("id").reset_index(drop=True),
df_loaded.sort_values("id").reset_index(drop=True)[df.columns],
check_dtype=False,
)
