Removal logic for fuzzy / exact (no class abstraction) (#509)
praateekmahajan authored Feb 8, 2025
1 parent b318d61 commit f642628
Showing 10 changed files with 496 additions and 77 deletions.
36 changes: 27 additions & 9 deletions docs/user-guide/gpudeduplication.rst
@@ -63,14 +63,19 @@ After ensuring your dataset has a unique ID field (or creating one with the code
from nemo_curator.datasets import DocumentDataset
# Initialize the deduplication object
ExactDups = ExactDuplicates(id_field="my_id", text_field="text")
exact_duplicates = ExactDuplicates(
id_field="my_id",
text_field="text",
perform_removal=True,
cache_dir="/path/to/dedup_outputs", # Recommended to specify a cache_dir if perform_removal=True
)
dataset = DocumentDataset.read_parquet(
input_files="/path/to/parquet/data",
backend="cudf", # or "pandas" for CPU
)
duplicate_docs = ExactDups(dataset)
# Users who have specified perform_removal=False can split the two steps as follows
duplicate_docs = exact_duplicates.identify_duplicates(dataset)
"""
Sample output:
@@ -82,9 +87,14 @@ After ensuring your dataset has a unique ID field (or creating one with the code
107 doc_prefix-52271 0f763a2937d57b9d96bf9f220e55f2bd
"""
deduplicated_dataset = exact_duplicates.remove(dataset, duplicate_docs)
# Users who have specified perform_removal=True can get the deduplicated dataset directly as follows
# deduplicated_dataset = exact_duplicates(dataset)
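For readers skimming the diff, the one-call path referenced in the comment above assembles from the pieces already shown; a minimal sketch (paths are placeholders, not part of the commit):

from nemo_curator.datasets import DocumentDataset
from nemo_curator.modules import ExactDuplicates

# perform_removal=True identifies and drops duplicates in a single call.
exact_duplicates = ExactDuplicates(
    id_field="my_id",
    text_field="text",
    perform_removal=True,
    cache_dir="/path/to/dedup_outputs",  # recommended when perform_removal=True
)

dataset = DocumentDataset.read_parquet(
    input_files="/path/to/parquet/data",
    backend="cudf",  # or "pandas" for CPU
)

# Equivalent to exact_duplicates.identify_duplicates(dataset) followed by
# exact_duplicates.remove(dataset, duplicate_docs).
deduplicated_dataset = exact_duplicates(dataset)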
.. tip::
A more comprehensive example, including how to remove documents from a corpus using the list of
duplicate IDs generated from the exact deduplication step above, can be found in `examples/exact_deduplication.py <https://github.com/NVIDIA/NeMo-Curator/blob/main/examples/exact_deduplication.py>`_.
A more comprehensive example can be found in `examples/exact_deduplication.py <https://github.com/NVIDIA/NeMo-Curator/blob/main/examples/exact_deduplication.py>`_.

""""""""""""
CLI Utility
@@ -187,6 +197,7 @@ Python API
cache_dir="/path/to/dedup_outputs", # must be cleared between runs
id_field="my_id",
text_field="text",
perform_removal=False, # determines whether the deduplicated dataset or the IDs of duplicates are returned
seed=42,
char_ngrams=24,
num_buckets=20,
@@ -203,6 +214,7 @@ Python API
cache_dir: /path/to/dedup_outputs
id_field: my_id
text_field: text
perform_removal: False
seed: 42
char_ngrams: 24
num_buckets: 20
@@ -226,14 +238,15 @@ Python API
from nemo_curator.datasets import DocumentDataset
# Initialize the deduplication object
FuzzyDups = FuzzyDuplicates(config=config, logger="./")
fuzzy_duplicates = FuzzyDuplicates(config=config, logger="./")
dataset = DocumentDataset.read_json(
input_files="/path/to/jsonl/data",
backend="cudf", # FuzzyDuplicates only supports datasets with the cuDF backend.
)
duplicate_docs = FuzzyDups(dataset)
# Users who have specified perform_removal=False can split the two steps as follows
duplicate_docs = fuzzy_duplicates.identify_duplicates(dataset)
"""
Sample output:
my_id group
@@ -244,10 +257,15 @@ Python API
4 doc_prefix-42050 154
"""
deduplicated_dataset = fuzzy_duplicates.remove(dataset, duplicate_docs)
# Users who have specified perform_removal=True can get the deduplicated dataset directly as follows
# deduplicated_dataset = fuzzy_duplicates(dataset)
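Likewise, the fuzzy one-call path assembles as follows; a minimal sketch using the config fields shown above (the FuzzyDuplicates import path is assumed to mirror ExactDuplicates, and paths are placeholders):

from nemo_curator.datasets import DocumentDataset
from nemo_curator.modules import FuzzyDuplicates  # import path assumed to mirror ExactDuplicates
from nemo_curator.modules.config import FuzzyDuplicatesConfig

config = FuzzyDuplicatesConfig(
    cache_dir="/path/to/dedup_outputs",  # must be cleared between runs
    id_field="my_id",
    text_field="text",
    perform_removal=True,  # identify and drop duplicates in one call
    seed=42,
    char_ngrams=24,
    num_buckets=20,
    false_positive_check=False,
)
fuzzy_duplicates = FuzzyDuplicates(config=config, logger="./")

dataset = DocumentDataset.read_json(
    input_files="/path/to/jsonl/data",
    backend="cudf",  # FuzzyDuplicates only supports the cuDF backend
)

# Equivalent to fuzzy_duplicates.identify_duplicates(dataset) followed by
# fuzzy_duplicates.remove(dataset, duplicate_docs).
deduplicated_dataset = fuzzy_duplicates(dataset)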
.. tip::

- A more comprehensive example for the above, including how to remove documents from a corpus using the list of
duplicate IDs generated from fuzzy deduplication, can be found in `examples/fuzzy_deduplication.py <https://github.com/NVIDIA/NeMo-Curator/blob/main/examples/fuzzy_deduplication.py>`_.
- A comprehensive example can be found in `examples/fuzzy_deduplication.py <https://github.com/NVIDIA/NeMo-Curator/blob/main/examples/fuzzy_deduplication.py>`_.
- The default values of ``num_buckets`` and ``hashes_per_bucket`` are set to find documents with an approximately Jaccard similarity of 0.8 or above.
- Higher ``buckets_per_shuffle`` values can improve performance but may lead to out-of-memory errors.
- Setting the ``false_positive_check`` flag to ``False`` is ideal for optimal performance.
32 changes: 14 additions & 18 deletions examples/exact_deduplication.py
@@ -17,8 +17,7 @@

from nemo_curator.datasets import DocumentDataset
from nemo_curator.modules import ExactDuplicates
from nemo_curator.utils.distributed_utils import get_client, read_data, write_to_disk
from nemo_curator.utils.file_utils import get_all_files_paths_under
from nemo_curator.utils.distributed_utils import get_client, write_to_disk
from nemo_curator.utils.script_utils import ArgumentHelper


@@ -40,36 +39,33 @@ def main(args):
client.run(pre_imports)

t0 = time.time()
input_dataset = DocumentDataset.read_json(dataset_dir, backend=backend)
input_dataset = DocumentDataset.read_json(
dataset_dir, backend=backend, blocksize="1GiB", files_per_partition=None
)

exact_dup = ExactDuplicates(
logger=log_dir,
id_field=dataset_id_field,
text_field=dataset_text_field,
# Decides whether the output of the module is the deduplicated dataset or the IDs of the duplicates
# If True, you should set cache_dir for a performance improvement
perform_removal=False,
# cache_dir=output_dir # Optionally write the output to disk
)

duplicates = exact_dup(dataset=input_dataset)
# When perform_removal=False, it will only call .identify_duplicates() and return the list of duplicate IDs.
# When perform_removal=True, exact_dup returns the dataset with the duplicates removed.
# It does this by calling .identify_duplicates() and .remove() in sequence.
duplicates = exact_dup(
dataset=input_dataset
) # or exact_dup.identify_duplicates(input_dataset)

# If caching, result is a path to the output dataset.
if isinstance(duplicates, str):
duplicates = DocumentDataset.read_parquet(duplicates, backend=backend)

# It's easy to apply dataframe operations to the dataset by using the underlying df.

# By default all duplicate id's are included in the result
# keep 1 document from each group of duplcates and mark the others to remove
# https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.duplicated.html
docs_to_remove = duplicates.df.map_partitions(
lambda x: x[x._hashes.duplicated(keep="first")]
)

# When there are few duplicates we can compute the results to a list and use `isin`.
result = input_dataset.df[
~input_dataset.df[dataset_id_field].isin(
docs_to_remove[dataset_id_field].compute()
)
]
result = exact_dup.remove(input_dataset, duplicates)
write_to_disk(result, output_dir, output_type="parquet")
print(time.time() - t0)
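# A minimal follow-on sketch (not part of this commit): the deduplicated
# parquet output written above can be read back as a DocumentDataset for
# downstream curation steps; `backend` is reused from earlier in the script.
deduped = DocumentDataset.read_parquet(input_files=output_dir, backend=backend)
print(f"Documents remaining after exact dedup: {len(deduped.df)}")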

24 changes: 10 additions & 14 deletions examples/fuzzy_deduplication.py
@@ -68,6 +68,8 @@ def main(args):
cache_dir=cache_dir,
id_field=dataset_id_field,
text_field=dataset_text_field,
# Decides whether the output of the module is a deduplicated dataset or the IDs of the duplicates
perform_removal=False,
seed=42,
char_ngrams=24,
num_buckets=20,
@@ -77,26 +79,20 @@
false_positive_check=False,
)
fuzzy_dup = FuzzyDuplicates(logger=log_dir, config=fuzzy_dedup_config)
duplicates = fuzzy_dup(dataset=input_dataset)

# When perform_removal=False, it will only call .identify_duplicates() and return the list of duplicate IDs.
# When perform_removal=True, fuzzy_dup returns the dataset with the duplicates removed.
# It does this by calling .identify_duplicates() and .remove() in sequence.
duplicates = fuzzy_dup(
dataset=input_dataset
) # or fuzzy_dup.identify_duplicates(input_dataset)

if duplicates is None:
print("No duplicates found")
print(f"Time taken:{time.time() - t0}s")
return

# By default all duplicate id's and the group they belong to are included in the result
# keep 1 document from each group of duplcates and mark the others to remove
# https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.duplicated.html
docs_to_remove = duplicates.df.map_partitions(
lambda x: x[x.group.duplicated(keep="first")]
)

# When there are few duplicates we can compute the results to a list and use `isin`.
result = input_dataset.df[
~input_dataset.df[dataset_id_field].isin(
docs_to_remove[dataset_id_field].compute()
)
]
result = fuzzy_dup.remove(input_dataset, duplicates)
write_to_disk(result, output_dir, output_type=filetype)
print(f"Time taken:{time.time() - t0}s")

8 changes: 8 additions & 0 deletions nemo_curator/modules/config.py
@@ -44,6 +44,8 @@ class FuzzyDuplicatesConfig(BaseConfig):
but might lead to memory pressures and related errors.
id_field: Column in the Dataset denoting document ID.
text_field: Column in the Dataset denoting document content.
perform_removal: Boolean value to specify whether calling the module should remove the duplicates from
the original dataset, or return the list of IDs denoting duplicates.
profile_dir: str, Default None
If specified, the directory to write the Dask profile
cache_dir: str, Default None
@@ -64,6 +66,7 @@
profile_dir: Optional[str] = None
id_field: str = "id"
text_field: str = "text"
perform_removal: bool = False

# Minhash + LSH Config
seed: int = 42
@@ -131,6 +134,11 @@ def __post_init__(self):
if not 1 <= self.buckets_per_shuffle <= self.num_buckets:
raise ValueError("Buckets per shuffle must be between [1, num_buckets]")

if not self.perform_removal:
warnings.warn(
"In future releases (starting with 0.8.0) the default will be True."
)


@dataclass
class SemDedupConfig(BaseConfig):
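Because the new __post_init__ check warns whenever perform_removal is False (whether left at the default or set explicitly), configs that only identify duplicates will now emit that message. A small sketch of the behavior (not part of the commit; field values are placeholders):

import warnings

from nemo_curator.modules.config import FuzzyDuplicatesConfig

with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    FuzzyDuplicatesConfig(
        cache_dir="/path/to/dedup_cache",  # placeholder path
        id_field="my_id",
        text_field="text",
        perform_removal=False,  # triggers the warning added above
    )

print([str(w.message) for w in caught])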
53 changes: 45 additions & 8 deletions nemo_curator/modules/exact_dedup.py
@@ -18,7 +18,6 @@
import time
import warnings
from contextlib import nullcontext
from datetime import datetime
from hashlib import md5
from typing import Optional, Union

@@ -31,6 +30,7 @@
from nemo_curator.log import create_logger
from nemo_curator.modules.base import BaseModule
from nemo_curator.utils.distributed_utils import performance_report_if_with_ts_suffix
from nemo_curator.utils.duplicates_removal import remove_duplicates
from nemo_curator.utils.gpu_utils import is_cudf_type


@@ -45,6 +45,7 @@ def __init__(
id_field: str = "id",
text_field: str = "text",
hash_method: str = "md5",
perform_removal: bool = False,
profile_dir: Optional[str] = None,
cache_dir: Optional[str] = None,
):
@@ -66,9 +67,17 @@ def __init__(
raise ValueError(
f"{hash_method} not in supported hash_methods. Choose a hash_method from {self.SUPPORTED_HASHES}"
)

self.hash_method = hash_method
self.id_field = id_field
self.text_field = text_field
self.perform_removal = perform_removal
if not self.perform_removal:
warnings.warn(
"In future releases (starting with 0.8.0) the default will be True."
)
if self.perform_removal and cache_dir is None:
warnings.warn("cache_dir is recommended to remove duplicates.")
if cache_dir is None and profile_dir is not None:
warnings.warn(
"cache_dir for intermediate outputs is required to generate profiles"
@@ -137,7 +146,7 @@ def hash_documents(
# TODO: Generalize by using self.hash_method
return df.apply(lambda x: md5(x.encode()).hexdigest())

def call(self, dataset: DocumentDataset) -> Union[DocumentDataset, str]:
def identify_duplicates(self, dataset: DocumentDataset) -> DocumentDataset:
"""
Find document IDs for exact duplicates in a given DocumentDataset
Parameters
@@ -168,10 +177,38 @@ def call(self, dataset: DocumentDataset) -> Union[DocumentDataset, str]:
self._logger.info(
f"Time taken for Exact Dedup Computation = {time.time() - t0}s and output written at {write_path}"
)
if is_cudf_type(result):
import dask_cudf
backend = "cudf" if is_cudf_type(result) else "pandas"
return DocumentDataset.read_parquet(
write_path,
backend=backend,
# We read with files_per_partition=1 so that groups are read whole (and are not split across partitions)
files_per_partition=1,
blocksize=None,
)

result_dataset = dask_cudf.read_parquet(write_path, split_row_groups=False)
else:
result_dataset = dd.read_parquet(write_path)
return DocumentDataset(result_dataset)
def remove(
self, dataset: DocumentDataset, duplicates_to_remove: Optional[DocumentDataset]
) -> DocumentDataset:
"""
Remove exact duplicates from a given DocumentDataset
Parameters
----------
dataset: DocumentDataset
The input dataset from which to remove exact duplicates
duplicates_to_remove: Optional[DocumentDataset]
The dataset of duplicate document IDs (as returned by identify_duplicates) to remove from the input
Returns
-------
DocumentDataset containing only non-duplicate documents
"""
result = remove_duplicates(
left=dataset.df,
duplicates=duplicates_to_remove.df,
id_field=self.id_field,
group_field="_hashes",
)
return DocumentDataset(result)

def call(self, dataset: DocumentDataset) -> DocumentDataset:
duplicates = self.identify_duplicates(dataset)
if self.perform_removal:
return self.remove(dataset, duplicates)
return duplicates
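The remove_duplicates helper imported from nemo_curator.utils.duplicates_removal is not part of this commit view. Judging from the example code it replaces (a per-group duplicated(keep="first") followed by an ID filter), its expected behavior is roughly the sketch below; the function body here is an assumption, not the library's implementation:

def remove_duplicates_sketch(left, duplicates, id_field, group_field):
    """Keep one document per duplicate group and drop the rest (assumed semantics)."""
    # Mark every document after the first one in its group. identify_duplicates()
    # reads its output with files_per_partition=1, so a group never spans
    # partitions and a per-partition `duplicated` check is safe.
    to_remove = duplicates.map_partitions(
        lambda df: df[df[group_field].duplicated(keep="first")]
    )[[id_field]].rename(columns={id_field: "_remove_id"})

    # Anti-join: keep only rows of `left` whose ID was not marked for removal.
    merged = left.merge(to_remove, how="left", left_on=id_field, right_on="_remove_id")
    return merged[merged["_remove_id"].isna()].drop(columns=["_remove_id"])


# Assumed usage, mirroring ExactDuplicates.remove() above:
# deduped_df = remove_duplicates_sketch(dataset.df, duplicates.df, "my_id", "_hashes")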