From ccb1e3132c25d075a61d0d2f6335f313d4705e43 Mon Sep 17 00:00:00 2001 From: Ayush Dattagupta Date: Thu, 24 Oct 2024 19:30:32 -0700 Subject: [PATCH 1/6] Add codepath for computing buckets without int conversion Signed-off-by: Ayush Dattagupta --- nemo_curator/modules/fuzzy_dedup.py | 62 +++++++++++++++---- .../utils/fuzzy_dedup_utils/io_utils.py | 13 ++++ 2 files changed, 62 insertions(+), 13 deletions(-) diff --git a/nemo_curator/modules/fuzzy_dedup.py b/nemo_curator/modules/fuzzy_dedup.py index 63576516c..ddb3fdc1c 100644 --- a/nemo_curator/modules/fuzzy_dedup.py +++ b/nemo_curator/modules/fuzzy_dedup.py @@ -47,6 +47,7 @@ from nemo_curator.utils.fuzzy_dedup_utils.id_mapping import int_ids_to_str from nemo_curator.utils.fuzzy_dedup_utils.io_utils import ( aggregated_anchor_docs_with_bk_read, + check_empty_buckets, get_restart_offsets, update_restart_offsets, ) @@ -198,6 +199,7 @@ def __init__( num_hashes: int, num_buckets: int, buckets_per_shuffle: int = 1, + buckets_as_int: bool = False, logger: Union[logging.LoggerAdapter, str] = "./", id_fields: Union[str, list] = "id", minhash_field: str = "_minhash_signature", @@ -228,6 +230,7 @@ def __init__( self.bucket_ranges = self._generate_bucket_ranges( self.num_buckets, self.num_hashes ) + self.buckets_as_int = buckets_as_int if cache_dir is None: raise ValueError( @@ -320,6 +323,8 @@ def lsh( """ Computes buckets and writes them as parquet files to the write_path """ + buckets_isempty = True + meta = self._minhash_to_bucket_meta(df) df = df.map_partitions( self.minhash_to_buckets, @@ -343,17 +348,19 @@ def lsh( ).map_partitions(lambda x: x[x["_bucket_id"].duplicated(keep=False)]) df2 = df2.reset_index(drop=True) - df2, end_id = self.bucket_id_to_int( - df2, bucket_col_name="_bucket_id", start_id=bucket_start_id - ) - # If bucketing return empty dataframe - if end_id < bucket_start_id: - continue - bucket_start_id = end_id + 1 + if self.buckets_as_int: + df2, end_id = self.bucket_id_to_int( + df2, bucket_col_name="_bucket_id", start_id=bucket_start_id + ) + # If bucketing return empty dataframe + if end_id < bucket_start_id: + continue + bucket_start_id = end_id + 1 + buckets_isempty = False # Workaround for dtype mismatches with empty partitions - dtypes = df2.dtypes.to_dict() - df2 = df2.map_partitions(lambda x: x.astype(dtypes)) + # dtypes = df2.dtypes.to_dict() + # df2 = df2.map_partitions(lambda x: x.astype(dtypes)) if i == 0: if os.path.exists(write_path): @@ -362,9 +369,29 @@ def lsh( ) df2.to_parquet(write_path, write_index=False, overwrite=True) else: - df2.to_parquet(write_path, write_index=False, append=True) + df2.to_parquet( + write_path, + write_index=False, + overwrite=buckets_isempty, + append=not buckets_isempty, + ) - self._logger.info(f"Wrote data for buckets: {value_vars}") + if os.path.exists(write_path) and buckets_isempty: + buckets_isempty = check_empty_buckets(write_path) + + if buckets_isempty: + self._logger.info( + f"No duplicate documents found for buckets: {value_vars}" + ) + else: + self._logger.info(f"Wrote data for buckets: {value_vars}") + + if buckets_isempty: + self._logger.info("No duplicate documents found during LSH") + import shutil + + shutil.rmtree(write_path) + return buckets_isempty def __call__(self, dataset: DocumentDataset) -> DocumentDataset: df = dataset.df @@ -372,11 +399,12 @@ def __call__(self, dataset: DocumentDataset) -> DocumentDataset: write_path = os.path.join(self.cache_dir, "_buckets.parquet") t0 = time.time() with performance_report_if_with_ts_suffix(self.profile_dir, 
f"lsh-profile"): - self.lsh(write_path=write_path, df=df) + empty_result = self.lsh(write_path=write_path, df=df) self._logger.info( f"Time taken for LSH = {time.time() - t0}s and output written at {write_path}" ) - + if empty_result: + return None buckets_df = dask_cudf.read_parquet(write_path, split_row_groups=False) return DocumentDataset(buckets_df) @@ -425,6 +453,8 @@ def __init__( num_hashes=self.config.num_hashes, num_buckets=self.config.num_buckets, buckets_per_shuffle=self.config.buckets_per_shuffle, + # Only convert buckets to int if we are running false positive check + buckets_as_int=self.config.false_positive_check, logger=self._logger, id_fields=[self.config.id_field], profile_dir=self.config.profile_dir, @@ -494,6 +524,11 @@ def __call__(self, dataset: DocumentDataset): minhashLSH = Sequential([self.minhash, self.lsh]) buckets_df = minhashLSH(dataset) print(f"Stage{stage_num}: Minhash + LSH complete!") + if buckets_df is None: + print( + f"Stage{stage_num}: No potential duplicate documents found during LSH" + ) + return None stage_num += 1 if self.config.false_positive_check: @@ -677,6 +712,7 @@ def buckets_to_edges( def __call__(self, dataset: DocumentDataset) -> DocumentDataset: buckets_df = dataset.df + self._logger.info(f"Starting conversion of LSH Buckets to Graph Edgelist") if len(self.id_fields) > 1: buckets_df = buckets_df.map_partitions( BucketsToEdges._combine_multiple_ids, diff --git a/nemo_curator/utils/fuzzy_dedup_utils/io_utils.py b/nemo_curator/utils/fuzzy_dedup_utils/io_utils.py index e6f126209..93484c92f 100644 --- a/nemo_curator/utils/fuzzy_dedup_utils/io_utils.py +++ b/nemo_curator/utils/fuzzy_dedup_utils/io_utils.py @@ -201,3 +201,16 @@ def strip_trailing_sep(path: str): Strips a path string of trailing path seperators like `/` if any. """ return path.rstrip(os.path.sep) + + +def check_empty_buckets(bucket_path): + """ + Inspects parquet metadata of the buckets dataset to check if it's an empty dataset. + """ + from pyarrow.dataset import dataset + + ds = dataset(bucket_path, format="parquet") + for fragment in ds.get_fragments(): + if fragment.metadata.num_rows > 0: + return False + return True From 30f383cc54b966af7a37ba4cb97434bbb5b24e72 Mon Sep 17 00:00:00 2001 From: Ayush Dattagupta Date: Wed, 13 Nov 2024 00:29:05 -0800 Subject: [PATCH 2/6] Refactor write logic into its own method Signed-off-by: Ayush Dattagupta --- nemo_curator/modules/fuzzy_dedup.py | 102 +++++++++++++++++++--------- 1 file changed, 69 insertions(+), 33 deletions(-) diff --git a/nemo_curator/modules/fuzzy_dedup.py b/nemo_curator/modules/fuzzy_dedup.py index f774efce8..ebe40bfa2 100644 --- a/nemo_curator/modules/fuzzy_dedup.py +++ b/nemo_curator/modules/fuzzy_dedup.py @@ -319,11 +319,18 @@ def lsh( self, write_path: str, df: dask_cudf.DataFrame, - ) -> None: + ) -> bool: """ - Computes buckets and writes them as parquet files to the write_path + Computes hash buckets for the DataFrame and writes them as parquet files to the specified path. + + Parameters: + - write_path (str): The directory path to write parquet files. + - df (dask_cudf.DataFrame): The input DataFrame with minhashes to be bucketed. + Returns: + are_buckets_empty: True if buckets were empty (no duplicates found), False otherwise. 
""" - buckets_isempty = True + wrote_buckets = False + are_buckets_empty = True meta = self._minhash_to_bucket_meta(df) df = df.map_partitions( @@ -333,12 +340,14 @@ def lsh( ) bucket_start_id = 0 for i in range(0, self.num_buckets, self.buckets_per_shuffle): - value_vars = [ + bucket_columns = [ f"_bucket_{i}" for i in range(i, min(self.num_buckets, i + self.buckets_per_shuffle)) ] df2 = df.melt( - id_vars=self.id_fields, value_name="_bucket_id", value_vars=value_vars + id_vars=self.id_fields, + value_name="_bucket_id", + value_vars=bucket_columns, )[self.id_fields + ["_bucket_id"]] df2 = df2.shuffle( @@ -348,50 +357,77 @@ def lsh( ).map_partitions(lambda x: x[x["_bucket_id"].duplicated(keep=False)]) df2 = df2.reset_index(drop=True) + # Buckets to Int if self.buckets_as_int: df2, end_id = self.bucket_id_to_int( df2, bucket_col_name="_bucket_id", start_id=bucket_start_id ) # If bucketing return empty dataframe if end_id < bucket_start_id: + self._logger.info( + f"No duplicate documents found for buckets: {bucket_columns}" + ) continue bucket_start_id = end_id + 1 - buckets_isempty = False + are_buckets_empty = False # Workaround for dtype mismatches with empty partitions # dtypes = df2.dtypes.to_dict() # df2 = df2.map_partitions(lambda x: x.astype(dtypes)) + wrote_buckets, are_buckets_empty = self._write_bucket_parquet( + df2, + write_path, + wrote_buckets, + are_buckets_empty, + bucket_columns, + ) - if i == 0: - if os.path.exists(write_path): - warnings.warn( - f"Output path {write_path} already exists and will be overwritten" - ) - df2.to_parquet(write_path, write_index=False, overwrite=True) - else: - df2.to_parquet( - write_path, - write_index=False, - overwrite=buckets_isempty, - append=not buckets_isempty, - ) + if are_buckets_empty: + self._logger.info("No duplicate documents found during LSH") + if os.path.exists(write_path): + import shutil - if os.path.exists(write_path) and buckets_isempty: - buckets_isempty = check_empty_buckets(write_path) + shutil.rmtree(write_path) - if buckets_isempty: - self._logger.info( - f"No duplicate documents found for buckets: {value_vars}" - ) - else: - self._logger.info(f"Wrote data for buckets: {value_vars}") + return are_buckets_empty - if buckets_isempty: - self._logger.info("No duplicate documents found during LSH") - import shutil + def _write_bucket_parquet( + self, + df: dask_cudf.DataFrame, + write_path: str, + wrote_buckets: bool, + are_buckets_empty: bool, + buckets_to_write: List[str], + ) -> tuple[bool, bool]: + """ + Utility function to write the bucketed data to parquet + handling cases of overwriting and appending as needed. 
+ """ + if not wrote_buckets: + if os.path.exists(write_path): + warnings.warn( + f"Output path {write_path} already exists and will be overwritten" + ) + df.to_parquet(write_path, write_index=False, overwrite=True) + else: + df.to_parquet( + write_path, + write_index=False, + overwrite=are_buckets_empty, + append=not are_buckets_empty, + ) + # Only check if buckets written so far are empty + if are_buckets_empty: + are_buckets_empty = check_empty_buckets(write_path) + wrote_buckets = True - shutil.rmtree(write_path) - return buckets_isempty + if are_buckets_empty: + self._logger.info( + f"No duplicate documents found for buckets: {buckets_to_write}" + ) + else: + self._logger.info(f"Wrote data for buckets: {buckets_to_write}") + return wrote_buckets, are_buckets_empty def __call__(self, dataset: DocumentDataset) -> DocumentDataset: df = dataset.df @@ -412,7 +448,7 @@ def __call__(self, dataset: DocumentDataset) -> DocumentDataset: class FuzzyDuplicates: def __init__( self, - config: FuzzyDuplicatesConfig, + config: FuzzyDulicatesConfig, logger: Union[logging.LoggerAdapter, str] = "./", ): """ From d7a2617de533c47013cce829a5aa93840ccc15e8 Mon Sep 17 00:00:00 2001 From: Ayush Dattagupta Date: Wed, 13 Nov 2024 00:30:25 -0800 Subject: [PATCH 3/6] Update cli script Signed-off-by: Ayush Dattagupta --- nemo_curator/scripts/fuzzy_deduplication/minhash_lsh.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/nemo_curator/scripts/fuzzy_deduplication/minhash_lsh.py b/nemo_curator/scripts/fuzzy_deduplication/minhash_lsh.py index c2fb80100..7ad17f2bb 100644 --- a/nemo_curator/scripts/fuzzy_deduplication/minhash_lsh.py +++ b/nemo_curator/scripts/fuzzy_deduplication/minhash_lsh.py @@ -71,6 +71,7 @@ def main(args): id_fields=["dataset_id", "doc_id"], profile_dir=args.profile_path, minhash_field=minhash_field, + buckets_as_int=args.bucket_id_as_int, logger=logger, ) @@ -118,6 +119,11 @@ def attach_args(): help="Output directory where minhashes will be written. " "Each Parquet file consists of document and bucket IDs.", ) + parser.add_argument( + "--bucket-id-as-int", + action="store_true", + help="Convert bucket IDs to integers. 
Required if running false positive check.", + ) return parser From 954a0430a9cfb4fa28b8b495a060a29898b8981f Mon Sep 17 00:00:00 2001 From: Ayush Dattagupta Date: Wed, 13 Nov 2024 00:30:45 -0800 Subject: [PATCH 4/6] Add tests Signed-off-by: Ayush Dattagupta --- tests/test_fuzzy_dedup.py | 102 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 102 insertions(+) diff --git a/tests/test_fuzzy_dedup.py b/tests/test_fuzzy_dedup.py index 72bb92835..5bb0a29ec 100644 --- a/tests/test_fuzzy_dedup.py +++ b/tests/test_fuzzy_dedup.py @@ -71,6 +71,23 @@ def large_fuzzy_dedup_data(): return DocumentDataset(df) +@pytest.fixture +def no_duplicates_fuzzy_dedup_data(): + df = cudf.DataFrame( + { + "id": [1, 2, 3, 4], + "text": [ + "A test string", + "Very different thing", + "Something completely else that doesn't match", + "The quick black cat jumps over the lazy dog", + ], + } + ) + df = dask_cudf.from_cudf(df, 2) + return DocumentDataset(df) + + @pytest.fixture def shuffle_fail_fuzzy_dedup_data(): df = cudf.DataFrame( @@ -224,6 +241,65 @@ def test_multiple_id_cols(self, tmpdir): ) assert_eq(expected_df, docs_list, check_index=False) + @pytest.mark.parametrize("buckets_as_int", [True, False]) + def test_no_duplicates(self, tmpdir, buckets_as_int): + minhash_df = cudf.DataFrame( + { + "id": [1, 2, 3, 4, 5], + "minhash_sig": [ + [1, 2, 1, 2, 1], + [2, 3, 3, 4, 5], + [3, 4, 5, 5, 6], + [4, 8, 7, 6, 7], + [5, 10, 9, 7, 8], + ], + } + ) + minhash_dataset = DocumentDataset(dask_cudf.from_cudf(minhash_df, 2)) + + lsh = LSH( + cache_dir=tmpdir, + num_hashes=5, + num_buckets=5, + buckets_per_shuffle=1, + id_fields="id", + minhash_field="minhash_sig", + buckets_as_int=buckets_as_int, + ) + buckets = lsh(minhash_dataset) + assert buckets is None + assert "_buckets.parquet" not in os.listdir(tmpdir) + + @pytest.mark.parametrize("buckets_as_int", [True, False]) + def test_partial_overlap(self, tmpdir, buckets_as_int): + minhash_df = cudf.DataFrame( + { + "id": [1, 2, 3], + "minhash_sig": [ + [1, 2, 1, 1, 1], + [2, 3, 1, 2, 2], + [3, 4, 2, 3, 1], + ], + } + ) + minhash_dataset = DocumentDataset(dask_cudf.from_cudf(minhash_df, 2)) + + lsh = LSH( + cache_dir=tmpdir, + num_hashes=5, + num_buckets=5, + buckets_per_shuffle=1, + id_fields="id", + minhash_field="minhash_sig", + buckets_as_int=buckets_as_int, + ) + buckets = lsh(minhash_dataset) + assert len(buckets) == 4 + assert buckets.df["_bucket_id"].nunique().compute() == 2 + assert_eq( + buckets.df["id"], cudf.Series([1, 2, 1, 3], name="id"), check_index=False + ) + @pytest.mark.gpu class TestFuzzyDuplicates: @@ -469,6 +545,32 @@ def test_shuffle_fail_fuzzy_dedup_data( expected_df = expected_df.sort_values() assert_eq(expected_df, result_df, check_index=False) + @pytest.mark.parametrize("false_positive_check", [True, False]) + def test_fuzzy_dedup_no_duplicates( + self, no_duplicates_fuzzy_dedup_data, tmpdir, false_positive_check + ): + # Dedup might fail when indices per partition do not start from 0 + no_duplicates_fuzzy_dedup_data.df = ( + no_duplicates_fuzzy_dedup_data.df.reset_index(drop=True) + ) + config = FuzzyDuplicatesConfig( + cache_dir=tmpdir, + id_field="id", + text_field="text", + seed=42, + char_ngrams=5, + num_buckets=10, + hashes_per_bucket=1, + use_64_bit_hash=False, + buckets_per_shuffle=5, + false_positive_check=false_positive_check, + num_anchors=2, + jaccard_threshold=0.39, + ) + fuzzy_duplicates = FuzzyDuplicates(config=config) + result = fuzzy_duplicates(no_duplicates_fuzzy_dedup_data) + assert result is None + class TestFuzzyDuplicatesConfig: 
def test_bad_inputs(self, tmpdir): From 3b51aadc56cd7c1fe0e7a3ef0135845207bc8050 Mon Sep 17 00:00:00 2001 From: Ayush Dattagupta Date: Wed, 13 Nov 2024 00:32:31 -0800 Subject: [PATCH 5/6] Update docs Signed-off-by: Ayush Dattagupta --- docs/user-guide/gpudeduplication.rst | 1 + 1 file changed, 1 insertion(+) diff --git a/docs/user-guide/gpudeduplication.rst b/docs/user-guide/gpudeduplication.rst index 7970dfe3c..88c4a6d0f 100644 --- a/docs/user-guide/gpudeduplication.rst +++ b/docs/user-guide/gpudeduplication.rst @@ -314,6 +314,7 @@ steps (all scripts are included in the `nemo_curator/scripts/fuzzy_deduplication --num-bands num_bands \ --buckets-per-shuffle 1 `#Value between [1-num_bands]. Higher is better but might lead to OOM` \ --log-dir ./ + # --bucket-id-as-int `#Flag to use integer IDs for buckets if running false positive check.` # --scheduler-file /path/to/file.json 3. False Positive Check (optional): If skipping this step, proceed to the :ref:`skip fp check section `. From 8dbc48a5d9ea765ee5ecbea6d0d02f8bb956c9a4 Mon Sep 17 00:00:00 2001 From: Ayush Dattagupta Date: Fri, 15 Nov 2024 16:04:58 -0800 Subject: [PATCH 6/6] Update fuzzy_deduplication example Signed-off-by: Ayush Dattagupta --- examples/fuzzy_deduplication.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/examples/fuzzy_deduplication.py b/examples/fuzzy_deduplication.py index 40f3fae25..b7da2470c 100644 --- a/examples/fuzzy_deduplication.py +++ b/examples/fuzzy_deduplication.py @@ -81,6 +81,11 @@ def main(args): fuzzy_dup = FuzzyDuplicates(logger=log_dir, config=fuzzy_dedup_config) duplicates = fuzzy_dup(dataset=input_dataset) + if duplicates is None: + print("No duplicates found") + print(f"Time taken:{time.time() - t0}s") + return + # By default all duplicate id's and the group they belong to are included in the result # keep 1 document from each group of duplcates and mark the others to remove # https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.duplicated.html @@ -95,7 +100,7 @@ def main(args): ) ] write_to_disk(result, output_dir, output_type=filetype) - print(time.time() - t0) + print(f"Time taken:{time.time() - t0}s") def attach_args(
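Usage sketch (illustrative only, not part of the patch series itself): with these changes, bucket IDs are converted to integers only when buckets_as_int / --bucket-id-as-int is set (needed for the false positive check), and both LSH and FuzzyDuplicates return None instead of an empty result when LSH finds no potential duplicates. The snippet below shows how a caller might handle that return value; it assumes a running Dask-CUDA cluster and mirrors the fixtures and config values used in the tests above, so the parameter choices are examples rather than recommendations.

    import cudf
    import dask_cudf

    from nemo_curator import FuzzyDuplicates, FuzzyDuplicatesConfig
    from nemo_curator.datasets import DocumentDataset

    # Small example dataset, mirroring the no-duplicates fixture in the tests above.
    df = cudf.DataFrame(
        {
            "id": [1, 2, 3, 4],
            "text": [
                "A test string",
                "Very different thing",
                "Something completely else that doesn't match",
                "The quick black cat jumps over the lazy dog",
            ],
        }
    )
    dataset = DocumentDataset(dask_cudf.from_cudf(df, 2))

    # Config values taken from the test_fuzzy_dedup_no_duplicates test above.
    # With false_positive_check=False, bucket IDs are left as-is (no int conversion).
    config = FuzzyDuplicatesConfig(
        cache_dir="./fuzzy_dedup_cache",  # hypothetical cache directory
        id_field="id",
        text_field="text",
        seed=42,
        char_ngrams=5,
        num_buckets=10,
        hashes_per_bucket=1,
        use_64_bit_hash=False,
        buckets_per_shuffle=5,
        false_positive_check=False,
        num_anchors=2,
        jaccard_threshold=0.39,
    )
    fuzzy_dup = FuzzyDuplicates(config=config)
    duplicates = fuzzy_dup(dataset=dataset)

    if duplicates is None:
        # New behaviour introduced by this series: no _buckets.parquet is written
        # and the pipeline short-circuits, so there is nothing to remove.
        print("No duplicates found")
    else:
        # Otherwise the result holds document IDs and their duplicate group,
        # and removal proceeds as in examples/fuzzy_deduplication.py.
        print(duplicates.df.head())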