Skip to content

Commit

Permalink
Add codepath for computing buckets without int conversion (NVIDIA#326)
Browse files Browse the repository at this point in the history
* Add codepath for computing buckets without int conversion

Signed-off-by: Ayush Dattagupta <[email protected]>

* Refactor write logic into its own method

Signed-off-by: Ayush Dattagupta <[email protected]>

* Update cli script

Signed-off-by: Ayush Dattagupta <[email protected]>

* Add tests

Signed-off-by: Ayush Dattagupta <[email protected]>

* Update docs

Signed-off-by: Ayush Dattagupta <[email protected]>

* Update fuzzy_deduplication example

Signed-off-by: Ayush Dattagupta <[email protected]>

* Address reviews

Signed-off-by: Ayush Dattagupta <[email protected]>

* update docs

Signed-off-by: Ayush Dattagupta <[email protected]>

* Update arg name in tests

Signed-off-by: Ayush Dattagupta <[email protected]>

---------

Signed-off-by: Ayush Dattagupta <[email protected]>
  • Loading branch information
ayushdg authored Nov 27, 2024
1 parent a024652 commit 3ebc807
Show file tree
Hide file tree
Showing 6 changed files with 223 additions and 26 deletions.
1 change: 1 addition & 0 deletions docs/user-guide/gpudeduplication.rst
Original file line number Diff line number Diff line change
Expand Up @@ -314,6 +314,7 @@ steps (all scripts are included in the `nemo_curator/scripts/fuzzy_deduplication
--num-bands num_bands \
--buckets-per-shuffle 1 `#Value between [1-num_bands]. Higher is better but might lead to OOM` \
--log-dir ./
# --false-positive-check `#Writes bucket ID's in a format required for the false positive check`
# --scheduler-file /path/to/file.json
3. False Positive Check (optional): If skipping this step, proceed to the :ref:`skip fp check section <fuzzydup_nofp>`.
Expand Down
7 changes: 6 additions & 1 deletion examples/fuzzy_deduplication.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,11 @@ def main(args):
fuzzy_dup = FuzzyDuplicates(logger=log_dir, config=fuzzy_dedup_config)
duplicates = fuzzy_dup(dataset=input_dataset)

if duplicates is None:
print("No duplicates found")
print(f"Time taken:{time.time() - t0}s")
return

# By default all duplicate id's and the group they belong to are included in the result
# keep 1 document from each group of duplcates and mark the others to remove
# https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.duplicated.html
Expand All @@ -95,7 +100,7 @@ def main(args):
)
]
write_to_disk(result, output_dir, output_type=filetype)
print(time.time() - t0)
print(f"Time taken:{time.time() - t0}s")


def attach_args(
Expand Down
120 changes: 95 additions & 25 deletions nemo_curator/modules/fuzzy_dedup.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
from nemo_curator.utils.fuzzy_dedup_utils.id_mapping import int_ids_to_str
from nemo_curator.utils.fuzzy_dedup_utils.io_utils import (
aggregated_anchor_docs_with_bk_read,
check_empty_buckets,
get_restart_offsets,
update_restart_offsets,
)
Expand Down Expand Up @@ -261,6 +262,7 @@ def __init__(
num_hashes: int,
num_buckets: int,
buckets_per_shuffle: int = 1,
false_positive_check: bool = False,
logger: Union[logging.LoggerAdapter, str] = "./",
id_fields: Union[str, list] = "id",
minhash_field: str = "_minhash_signature",
Expand All @@ -275,8 +277,9 @@ def __init__(
num_buckets: Number of bands/buckets to create from the minhash signature.
Hashes_per_signature = num_hashes / num_buckets
buckets_per_shuffle: Number of bands/buckets to shuffle concurrently.
Larger values process larger batches by processing multiple bands
but might lead to memory pressures and related errors.
false_positive_check: bool
If True, writes out buckets in a format compatible with downstream false positive check.
logger: Existing logger to log to, or a path to a log directory.
id_field: Columns in the Dataset denoting document ID.
minhash_field: Column in the Dataset denoting minhash signature.
Expand All @@ -291,6 +294,7 @@ def __init__(
self.bucket_ranges = self._generate_bucket_ranges(
self.num_buckets, self.num_hashes
)
self.buckets_as_int = false_positive_check

if cache_dir is None:
raise ValueError(
Expand Down Expand Up @@ -379,10 +383,19 @@ def lsh(
self,
write_path: str,
df: dask_cudf.DataFrame,
) -> None:
) -> bool:
"""
Computes buckets and writes them as parquet files to the write_path
Computes hash buckets for the DataFrame and writes them as parquet files to the specified path.
Parameters:
- write_path (str): The directory path to write parquet files.
- df (dask_cudf.DataFrame): The input DataFrame with minhashes to be bucketed.
Returns:
are_buckets_empty: True if buckets were empty (no duplicates found), False otherwise.
"""
wrote_buckets = False
are_buckets_empty = True

meta = self._minhash_to_bucket_meta(df)
df = df.map_partitions(
self.minhash_to_buckets,
Expand All @@ -391,12 +404,14 @@ def lsh(
)
bucket_start_id = 0
for i in range(0, self.num_buckets, self.buckets_per_shuffle):
value_vars = [
bucket_columns = [
f"_bucket_{i}"
for i in range(i, min(self.num_buckets, i + self.buckets_per_shuffle))
]
df2 = df.melt(
id_vars=self.id_fields, value_name="_bucket_id", value_vars=value_vars
id_vars=self.id_fields,
value_name="_bucket_id",
value_vars=bucket_columns,
)[self.id_fields + ["_bucket_id"]]

df2 = df2.shuffle(
Expand All @@ -406,40 +421,88 @@ def lsh(
).map_partitions(lambda x: x[x["_bucket_id"].duplicated(keep=False)])

df2 = df2.reset_index(drop=True)
df2, end_id = self.bucket_id_to_int(
df2, bucket_col_name="_bucket_id", start_id=bucket_start_id
# Buckets to Int
if self.buckets_as_int:
df2, end_id = self.bucket_id_to_int(
df2, bucket_col_name="_bucket_id", start_id=bucket_start_id
)
# If bucketing return empty dataframe
if end_id < bucket_start_id:
self._logger.info(
f"No duplicate documents found for buckets: {bucket_columns}"
)
continue
bucket_start_id = end_id + 1
are_buckets_empty = False

wrote_buckets, are_buckets_empty = self._write_bucket_parquet(
df2,
write_path,
wrote_buckets,
are_buckets_empty,
bucket_columns,
)
# If bucketing return empty dataframe
if end_id < bucket_start_id:
continue
bucket_start_id = end_id + 1

# Workaround for dtype mismatches with empty partitions
dtypes = df2.dtypes.to_dict()
df2 = df2.map_partitions(lambda x: x.astype(dtypes))
if are_buckets_empty:
self._logger.info("No duplicate documents found during LSH")
if os.path.exists(write_path):
import shutil

if i == 0:
if os.path.exists(write_path):
warnings.warn(
f"Output path {write_path} already exists and will be overwritten"
)
df2.to_parquet(write_path, write_index=False, overwrite=True)
else:
df2.to_parquet(write_path, write_index=False, append=True)
shutil.rmtree(write_path)

self._logger.info(f"Wrote data for buckets: {value_vars}")
return are_buckets_empty

def _write_bucket_parquet(
self,
df: dask_cudf.DataFrame,
write_path: str,
wrote_buckets: bool,
are_buckets_empty: bool,
buckets_to_write: List[str],
) -> tuple[bool, bool]:
"""
Utility function to write the bucketed data to parquet
handling cases of overwriting and appending as needed.
"""
if not wrote_buckets:
if os.path.exists(write_path):
warnings.warn(
f"Output path {write_path} already exists and will be overwritten"
)
df.to_parquet(write_path, write_index=False, overwrite=True)
else:
df.to_parquet(
write_path,
write_index=False,
overwrite=are_buckets_empty,
append=not are_buckets_empty,
ignore_divisions=True,
)
# Only check if buckets written so far are empty
if are_buckets_empty:
are_buckets_empty = check_empty_buckets(write_path)
wrote_buckets = True

if are_buckets_empty:
self._logger.info(
f"No duplicate documents found for buckets: {buckets_to_write}"
)
else:
self._logger.info(f"Wrote data for buckets: {buckets_to_write}")
return wrote_buckets, are_buckets_empty

def __call__(self, dataset: DocumentDataset) -> DocumentDataset:
df = dataset.df

write_path = os.path.join(self.cache_dir, "_buckets.parquet")
t0 = time.time()
with performance_report_if_with_ts_suffix(self.profile_dir, "lsh-profile"):
self.lsh(write_path=write_path, df=df)
empty_result = self.lsh(write_path=write_path, df=df)
self._logger.info(
f"Time taken for LSH = {time.time() - t0}s and output written at {write_path}"
)

if empty_result:
return None
buckets_df = dask_cudf.read_parquet(write_path, split_row_groups=False)
return DocumentDataset(buckets_df)

Expand Down Expand Up @@ -488,6 +551,7 @@ def __init__(
num_hashes=self.config.num_hashes,
num_buckets=self.config.num_buckets,
buckets_per_shuffle=self.config.buckets_per_shuffle,
false_positive_check=self.config.false_positive_check,
logger=self._logger,
id_fields=[self.config.id_field],
profile_dir=self.config.profile_dir,
Expand Down Expand Up @@ -556,6 +620,11 @@ def __call__(self, dataset: DocumentDataset):
minhashLSH = Sequential([self.minhash, self.lsh])
buckets_df = minhashLSH(dataset)
print(f"Stage{stage_num}: Minhash + LSH complete!")
if buckets_df is None:
print(
f"Stage{stage_num}: No potential duplicate documents found during LSH"
)
return None
stage_num += 1

if self.config.false_positive_check:
Expand Down Expand Up @@ -740,6 +809,7 @@ def buckets_to_edges(

def __call__(self, dataset: DocumentDataset) -> DocumentDataset:
buckets_df = dataset.df
self._logger.info(f"Starting conversion of LSH Buckets to Graph Edgelist")
if len(self.id_fields) > 1:
buckets_df = buckets_df.map_partitions(
BucketsToEdges._combine_multiple_ids,
Expand Down
6 changes: 6 additions & 0 deletions nemo_curator/scripts/fuzzy_deduplication/minhash_lsh.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ def main(args):
id_fields=["dataset_id", "doc_id"],
profile_dir=args.profile_path,
minhash_field=minhash_field,
false_positive_check=args.false_positive_check,
logger=logger,
)

Expand Down Expand Up @@ -118,6 +119,11 @@ def attach_args():
help="Output directory where minhashes will be written. "
"Each Parquet file consists of document and bucket IDs.",
)
parser.add_argument(
"--false-positive-check",
action="store_true",
help="Converts LSH buckets to integers required for running the false positive check",
)

return parser

Expand Down
13 changes: 13 additions & 0 deletions nemo_curator/utils/fuzzy_dedup_utils/io_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -202,3 +202,16 @@ def strip_trailing_sep(path: str):
Strips a path string of trailing path seperators like `/` if any.
"""
return path.rstrip(os.path.sep)


def check_empty_buckets(bucket_path):
"""
Inspects parquet metadata of the buckets dataset to check if it's an empty dataset.
"""
from pyarrow.dataset import dataset

ds = dataset(bucket_path, format="parquet")
for fragment in ds.get_fragments():
if fragment.metadata.num_rows > 0:
return False
return True
102 changes: 102 additions & 0 deletions tests/test_fuzzy_dedup.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,23 @@ def large_fuzzy_dedup_data():
return DocumentDataset(df)


@pytest.fixture
def no_duplicates_fuzzy_dedup_data():
df = cudf.DataFrame(
{
"id": [1, 2, 3, 4],
"text": [
"A test string",
"Very different thing",
"Something completely else that doesn't match",
"The quick black cat jumps over the lazy dog",
],
}
)
df = dask_cudf.from_cudf(df, 2)
return DocumentDataset(df)


@pytest.fixture
def shuffle_fail_fuzzy_dedup_data():
df = cudf.DataFrame(
Expand Down Expand Up @@ -224,6 +241,65 @@ def test_multiple_id_cols(self, tmpdir):
)
assert_eq(expected_df, docs_list, check_index=False)

@pytest.mark.parametrize("false_positive_check", [True, False])
def test_no_duplicates(self, tmpdir, false_positive_check):
minhash_df = cudf.DataFrame(
{
"id": [1, 2, 3, 4, 5],
"minhash_sig": [
[1, 2, 1, 2, 1],
[2, 3, 3, 4, 5],
[3, 4, 5, 5, 6],
[4, 8, 7, 6, 7],
[5, 10, 9, 7, 8],
],
}
)
minhash_dataset = DocumentDataset(dask_cudf.from_cudf(minhash_df, 2))

lsh = LSH(
cache_dir=tmpdir,
num_hashes=5,
num_buckets=5,
buckets_per_shuffle=1,
id_fields="id",
minhash_field="minhash_sig",
false_positive_check=false_positive_check,
)
buckets = lsh(minhash_dataset)
assert buckets is None
assert "_buckets.parquet" not in os.listdir(tmpdir)

@pytest.mark.parametrize("false_positive_check", [True, False])
def test_partial_overlap(self, tmpdir, false_positive_check):
minhash_df = cudf.DataFrame(
{
"id": [1, 2, 3],
"minhash_sig": [
[1, 2, 1, 1, 1],
[2, 3, 1, 2, 2],
[3, 4, 2, 3, 1],
],
}
)
minhash_dataset = DocumentDataset(dask_cudf.from_cudf(minhash_df, 2))

lsh = LSH(
cache_dir=tmpdir,
num_hashes=5,
num_buckets=5,
buckets_per_shuffle=1,
id_fields="id",
minhash_field="minhash_sig",
false_positive_check=false_positive_check,
)
buckets = lsh(minhash_dataset)
assert len(buckets) == 4
assert buckets.df["_bucket_id"].nunique().compute() == 2
assert_eq(
buckets.df["id"], cudf.Series([1, 2, 1, 3], name="id"), check_index=False
)


@pytest.mark.gpu
class TestFuzzyDuplicates:
Expand Down Expand Up @@ -469,6 +545,32 @@ def test_shuffle_fail_fuzzy_dedup_data(
expected_df = expected_df.sort_values()
assert_eq(expected_df, result_df, check_index=False)

@pytest.mark.parametrize("false_positive_check", [True, False])
def test_fuzzy_dedup_no_duplicates(
self, no_duplicates_fuzzy_dedup_data, tmpdir, false_positive_check
):
# Dedup might fail when indices per partition do not start from 0
no_duplicates_fuzzy_dedup_data.df = (
no_duplicates_fuzzy_dedup_data.df.reset_index(drop=True)
)
config = FuzzyDuplicatesConfig(
cache_dir=tmpdir,
id_field="id",
text_field="text",
seed=42,
char_ngrams=5,
num_buckets=10,
hashes_per_bucket=1,
use_64_bit_hash=False,
buckets_per_shuffle=5,
false_positive_check=false_positive_check,
num_anchors=2,
jaccard_threshold=0.39,
)
fuzzy_duplicates = FuzzyDuplicates(config=config)
result = fuzzy_duplicates(no_duplicates_fuzzy_dedup_data)
assert result is None


class TestFuzzyDuplicatesConfig:
def test_bad_inputs(self, tmpdir):
Expand Down

0 comments on commit 3ebc807

Please sign in to comment.