From 3ebc807ace0647382adfcb72ba81c402888fb100 Mon Sep 17 00:00:00 2001 From: Ayush Dattagupta Date: Wed, 27 Nov 2024 11:27:16 -0800 Subject: [PATCH] Add codepath for computing buckets without int conversion (#326) * Add codepath for computing buckets without int conversion Signed-off-by: Ayush Dattagupta * Refactor write logic into its own method Signed-off-by: Ayush Dattagupta * Update cli script Signed-off-by: Ayush Dattagupta * Add tests Signed-off-by: Ayush Dattagupta * Update docs Signed-off-by: Ayush Dattagupta * Update fuzzy_deduplication example Signed-off-by: Ayush Dattagupta * Address reviews Signed-off-by: Ayush Dattagupta * update docs Signed-off-by: Ayush Dattagupta * Update arg name in tests Signed-off-by: Ayush Dattagupta --------- Signed-off-by: Ayush Dattagupta --- docs/user-guide/gpudeduplication.rst | 1 + examples/fuzzy_deduplication.py | 7 +- nemo_curator/modules/fuzzy_dedup.py | 120 ++++++++++++++---- .../fuzzy_deduplication/minhash_lsh.py | 6 + .../utils/fuzzy_dedup_utils/io_utils.py | 13 ++ tests/test_fuzzy_dedup.py | 102 +++++++++++++++ 6 files changed, 223 insertions(+), 26 deletions(-) diff --git a/docs/user-guide/gpudeduplication.rst b/docs/user-guide/gpudeduplication.rst index 7970dfe3c..990783feb 100644 --- a/docs/user-guide/gpudeduplication.rst +++ b/docs/user-guide/gpudeduplication.rst @@ -314,6 +314,7 @@ steps (all scripts are included in the `nemo_curator/scripts/fuzzy_deduplication --num-bands num_bands \ --buckets-per-shuffle 1 `#Value between [1-num_bands]. Higher is better but might lead to OOM` \ --log-dir ./ + # --false-positive-check `#Writes bucket ID's in a format required for the false positive check` # --scheduler-file /path/to/file.json 3. False Positive Check (optional): If skipping this step, proceed to the :ref:`skip fp check section `. diff --git a/examples/fuzzy_deduplication.py b/examples/fuzzy_deduplication.py index 40f3fae25..b7da2470c 100644 --- a/examples/fuzzy_deduplication.py +++ b/examples/fuzzy_deduplication.py @@ -81,6 +81,11 @@ def main(args): fuzzy_dup = FuzzyDuplicates(logger=log_dir, config=fuzzy_dedup_config) duplicates = fuzzy_dup(dataset=input_dataset) + if duplicates is None: + print("No duplicates found") + print(f"Time taken:{time.time() - t0}s") + return + # By default all duplicate id's and the group they belong to are included in the result # keep 1 document from each group of duplcates and mark the others to remove # https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.duplicated.html @@ -95,7 +100,7 @@ def main(args): ) ] write_to_disk(result, output_dir, output_type=filetype) - print(time.time() - t0) + print(f"Time taken:{time.time() - t0}s") def attach_args( diff --git a/nemo_curator/modules/fuzzy_dedup.py b/nemo_curator/modules/fuzzy_dedup.py index ec642322f..2986a88f6 100644 --- a/nemo_curator/modules/fuzzy_dedup.py +++ b/nemo_curator/modules/fuzzy_dedup.py @@ -48,6 +48,7 @@ from nemo_curator.utils.fuzzy_dedup_utils.id_mapping import int_ids_to_str from nemo_curator.utils.fuzzy_dedup_utils.io_utils import ( aggregated_anchor_docs_with_bk_read, + check_empty_buckets, get_restart_offsets, update_restart_offsets, ) @@ -261,6 +262,7 @@ def __init__( num_hashes: int, num_buckets: int, buckets_per_shuffle: int = 1, + false_positive_check: bool = False, logger: Union[logging.LoggerAdapter, str] = "./", id_fields: Union[str, list] = "id", minhash_field: str = "_minhash_signature", @@ -275,8 +277,9 @@ def __init__( num_buckets: Number of bands/buckets to create from the minhash signature. Hashes_per_signature = num_hashes / num_buckets buckets_per_shuffle: Number of bands/buckets to shuffle concurrently. - Larger values process larger batches by processing multiple bands but might lead to memory pressures and related errors. + false_positive_check: bool + If True, writes out buckets in a format compatible with downstream false positive check. logger: Existing logger to log to, or a path to a log directory. id_field: Columns in the Dataset denoting document ID. minhash_field: Column in the Dataset denoting minhash signature. @@ -291,6 +294,7 @@ def __init__( self.bucket_ranges = self._generate_bucket_ranges( self.num_buckets, self.num_hashes ) + self.buckets_as_int = false_positive_check if cache_dir is None: raise ValueError( @@ -379,10 +383,19 @@ def lsh( self, write_path: str, df: dask_cudf.DataFrame, - ) -> None: + ) -> bool: """ - Computes buckets and writes them as parquet files to the write_path + Computes hash buckets for the DataFrame and writes them as parquet files to the specified path. + + Parameters: + - write_path (str): The directory path to write parquet files. + - df (dask_cudf.DataFrame): The input DataFrame with minhashes to be bucketed. + Returns: + are_buckets_empty: True if buckets were empty (no duplicates found), False otherwise. """ + wrote_buckets = False + are_buckets_empty = True + meta = self._minhash_to_bucket_meta(df) df = df.map_partitions( self.minhash_to_buckets, @@ -391,12 +404,14 @@ def lsh( ) bucket_start_id = 0 for i in range(0, self.num_buckets, self.buckets_per_shuffle): - value_vars = [ + bucket_columns = [ f"_bucket_{i}" for i in range(i, min(self.num_buckets, i + self.buckets_per_shuffle)) ] df2 = df.melt( - id_vars=self.id_fields, value_name="_bucket_id", value_vars=value_vars + id_vars=self.id_fields, + value_name="_bucket_id", + value_vars=bucket_columns, )[self.id_fields + ["_bucket_id"]] df2 = df2.shuffle( @@ -406,28 +421,75 @@ def lsh( ).map_partitions(lambda x: x[x["_bucket_id"].duplicated(keep=False)]) df2 = df2.reset_index(drop=True) - df2, end_id = self.bucket_id_to_int( - df2, bucket_col_name="_bucket_id", start_id=bucket_start_id + # Buckets to Int + if self.buckets_as_int: + df2, end_id = self.bucket_id_to_int( + df2, bucket_col_name="_bucket_id", start_id=bucket_start_id + ) + # If bucketing return empty dataframe + if end_id < bucket_start_id: + self._logger.info( + f"No duplicate documents found for buckets: {bucket_columns}" + ) + continue + bucket_start_id = end_id + 1 + are_buckets_empty = False + + wrote_buckets, are_buckets_empty = self._write_bucket_parquet( + df2, + write_path, + wrote_buckets, + are_buckets_empty, + bucket_columns, ) - # If bucketing return empty dataframe - if end_id < bucket_start_id: - continue - bucket_start_id = end_id + 1 - # Workaround for dtype mismatches with empty partitions - dtypes = df2.dtypes.to_dict() - df2 = df2.map_partitions(lambda x: x.astype(dtypes)) + if are_buckets_empty: + self._logger.info("No duplicate documents found during LSH") + if os.path.exists(write_path): + import shutil - if i == 0: - if os.path.exists(write_path): - warnings.warn( - f"Output path {write_path} already exists and will be overwritten" - ) - df2.to_parquet(write_path, write_index=False, overwrite=True) - else: - df2.to_parquet(write_path, write_index=False, append=True) + shutil.rmtree(write_path) - self._logger.info(f"Wrote data for buckets: {value_vars}") + return are_buckets_empty + + def _write_bucket_parquet( + self, + df: dask_cudf.DataFrame, + write_path: str, + wrote_buckets: bool, + are_buckets_empty: bool, + buckets_to_write: List[str], + ) -> tuple[bool, bool]: + """ + Utility function to write the bucketed data to parquet + handling cases of overwriting and appending as needed. + """ + if not wrote_buckets: + if os.path.exists(write_path): + warnings.warn( + f"Output path {write_path} already exists and will be overwritten" + ) + df.to_parquet(write_path, write_index=False, overwrite=True) + else: + df.to_parquet( + write_path, + write_index=False, + overwrite=are_buckets_empty, + append=not are_buckets_empty, + ignore_divisions=True, + ) + # Only check if buckets written so far are empty + if are_buckets_empty: + are_buckets_empty = check_empty_buckets(write_path) + wrote_buckets = True + + if are_buckets_empty: + self._logger.info( + f"No duplicate documents found for buckets: {buckets_to_write}" + ) + else: + self._logger.info(f"Wrote data for buckets: {buckets_to_write}") + return wrote_buckets, are_buckets_empty def __call__(self, dataset: DocumentDataset) -> DocumentDataset: df = dataset.df @@ -435,11 +497,12 @@ def __call__(self, dataset: DocumentDataset) -> DocumentDataset: write_path = os.path.join(self.cache_dir, "_buckets.parquet") t0 = time.time() with performance_report_if_with_ts_suffix(self.profile_dir, "lsh-profile"): - self.lsh(write_path=write_path, df=df) + empty_result = self.lsh(write_path=write_path, df=df) self._logger.info( f"Time taken for LSH = {time.time() - t0}s and output written at {write_path}" ) - + if empty_result: + return None buckets_df = dask_cudf.read_parquet(write_path, split_row_groups=False) return DocumentDataset(buckets_df) @@ -488,6 +551,7 @@ def __init__( num_hashes=self.config.num_hashes, num_buckets=self.config.num_buckets, buckets_per_shuffle=self.config.buckets_per_shuffle, + false_positive_check=self.config.false_positive_check, logger=self._logger, id_fields=[self.config.id_field], profile_dir=self.config.profile_dir, @@ -556,6 +620,11 @@ def __call__(self, dataset: DocumentDataset): minhashLSH = Sequential([self.minhash, self.lsh]) buckets_df = minhashLSH(dataset) print(f"Stage{stage_num}: Minhash + LSH complete!") + if buckets_df is None: + print( + f"Stage{stage_num}: No potential duplicate documents found during LSH" + ) + return None stage_num += 1 if self.config.false_positive_check: @@ -740,6 +809,7 @@ def buckets_to_edges( def __call__(self, dataset: DocumentDataset) -> DocumentDataset: buckets_df = dataset.df + self._logger.info(f"Starting conversion of LSH Buckets to Graph Edgelist") if len(self.id_fields) > 1: buckets_df = buckets_df.map_partitions( BucketsToEdges._combine_multiple_ids, diff --git a/nemo_curator/scripts/fuzzy_deduplication/minhash_lsh.py b/nemo_curator/scripts/fuzzy_deduplication/minhash_lsh.py index c2fb80100..3312c4f9d 100644 --- a/nemo_curator/scripts/fuzzy_deduplication/minhash_lsh.py +++ b/nemo_curator/scripts/fuzzy_deduplication/minhash_lsh.py @@ -71,6 +71,7 @@ def main(args): id_fields=["dataset_id", "doc_id"], profile_dir=args.profile_path, minhash_field=minhash_field, + false_positive_check=args.false_positive_check, logger=logger, ) @@ -118,6 +119,11 @@ def attach_args(): help="Output directory where minhashes will be written. " "Each Parquet file consists of document and bucket IDs.", ) + parser.add_argument( + "--false-positive-check", + action="store_true", + help="Converts LSH buckets to integers required for running the false positive check", + ) return parser diff --git a/nemo_curator/utils/fuzzy_dedup_utils/io_utils.py b/nemo_curator/utils/fuzzy_dedup_utils/io_utils.py index 4722beaa8..ea7e1ad6c 100644 --- a/nemo_curator/utils/fuzzy_dedup_utils/io_utils.py +++ b/nemo_curator/utils/fuzzy_dedup_utils/io_utils.py @@ -202,3 +202,16 @@ def strip_trailing_sep(path: str): Strips a path string of trailing path seperators like `/` if any. """ return path.rstrip(os.path.sep) + + +def check_empty_buckets(bucket_path): + """ + Inspects parquet metadata of the buckets dataset to check if it's an empty dataset. + """ + from pyarrow.dataset import dataset + + ds = dataset(bucket_path, format="parquet") + for fragment in ds.get_fragments(): + if fragment.metadata.num_rows > 0: + return False + return True diff --git a/tests/test_fuzzy_dedup.py b/tests/test_fuzzy_dedup.py index 72bb92835..45953b6af 100644 --- a/tests/test_fuzzy_dedup.py +++ b/tests/test_fuzzy_dedup.py @@ -71,6 +71,23 @@ def large_fuzzy_dedup_data(): return DocumentDataset(df) +@pytest.fixture +def no_duplicates_fuzzy_dedup_data(): + df = cudf.DataFrame( + { + "id": [1, 2, 3, 4], + "text": [ + "A test string", + "Very different thing", + "Something completely else that doesn't match", + "The quick black cat jumps over the lazy dog", + ], + } + ) + df = dask_cudf.from_cudf(df, 2) + return DocumentDataset(df) + + @pytest.fixture def shuffle_fail_fuzzy_dedup_data(): df = cudf.DataFrame( @@ -224,6 +241,65 @@ def test_multiple_id_cols(self, tmpdir): ) assert_eq(expected_df, docs_list, check_index=False) + @pytest.mark.parametrize("false_positive_check", [True, False]) + def test_no_duplicates(self, tmpdir, false_positive_check): + minhash_df = cudf.DataFrame( + { + "id": [1, 2, 3, 4, 5], + "minhash_sig": [ + [1, 2, 1, 2, 1], + [2, 3, 3, 4, 5], + [3, 4, 5, 5, 6], + [4, 8, 7, 6, 7], + [5, 10, 9, 7, 8], + ], + } + ) + minhash_dataset = DocumentDataset(dask_cudf.from_cudf(minhash_df, 2)) + + lsh = LSH( + cache_dir=tmpdir, + num_hashes=5, + num_buckets=5, + buckets_per_shuffle=1, + id_fields="id", + minhash_field="minhash_sig", + false_positive_check=false_positive_check, + ) + buckets = lsh(minhash_dataset) + assert buckets is None + assert "_buckets.parquet" not in os.listdir(tmpdir) + + @pytest.mark.parametrize("false_positive_check", [True, False]) + def test_partial_overlap(self, tmpdir, false_positive_check): + minhash_df = cudf.DataFrame( + { + "id": [1, 2, 3], + "minhash_sig": [ + [1, 2, 1, 1, 1], + [2, 3, 1, 2, 2], + [3, 4, 2, 3, 1], + ], + } + ) + minhash_dataset = DocumentDataset(dask_cudf.from_cudf(minhash_df, 2)) + + lsh = LSH( + cache_dir=tmpdir, + num_hashes=5, + num_buckets=5, + buckets_per_shuffle=1, + id_fields="id", + minhash_field="minhash_sig", + false_positive_check=false_positive_check, + ) + buckets = lsh(minhash_dataset) + assert len(buckets) == 4 + assert buckets.df["_bucket_id"].nunique().compute() == 2 + assert_eq( + buckets.df["id"], cudf.Series([1, 2, 1, 3], name="id"), check_index=False + ) + @pytest.mark.gpu class TestFuzzyDuplicates: @@ -469,6 +545,32 @@ def test_shuffle_fail_fuzzy_dedup_data( expected_df = expected_df.sort_values() assert_eq(expected_df, result_df, check_index=False) + @pytest.mark.parametrize("false_positive_check", [True, False]) + def test_fuzzy_dedup_no_duplicates( + self, no_duplicates_fuzzy_dedup_data, tmpdir, false_positive_check + ): + # Dedup might fail when indices per partition do not start from 0 + no_duplicates_fuzzy_dedup_data.df = ( + no_duplicates_fuzzy_dedup_data.df.reset_index(drop=True) + ) + config = FuzzyDuplicatesConfig( + cache_dir=tmpdir, + id_field="id", + text_field="text", + seed=42, + char_ngrams=5, + num_buckets=10, + hashes_per_bucket=1, + use_64_bit_hash=False, + buckets_per_shuffle=5, + false_positive_check=false_positive_check, + num_anchors=2, + jaccard_threshold=0.39, + ) + fuzzy_duplicates = FuzzyDuplicates(config=config) + result = fuzzy_duplicates(no_duplicates_fuzzy_dedup_data) + assert result is None + class TestFuzzyDuplicatesConfig: def test_bad_inputs(self, tmpdir):