Add codepath for computing buckets without int conversion #326

Open · wants to merge 10 commits into base: main
1 change: 1 addition & 0 deletions docs/user-guide/gpudeduplication.rst
@@ -314,6 +314,7 @@ steps (all scripts are included in the `nemo_curator/scripts/fuzzy_deduplication
--num-bands num_bands \
--buckets-per-shuffle 1 `#Value between [1-num_bands]. Higher is better but might lead to OOM` \
--log-dir ./
# --bucket-id-as-int `#Flag to use integer IDs for buckets if running false positive check.`
# --scheduler-file /path/to/file.json

3. False Positive Check (optional): If skipping this step, proceed to the :ref:`skip fp check section <fuzzydup_nofp>`.
7 changes: 6 additions & 1 deletion examples/fuzzy_deduplication.py
@@ -81,6 +81,11 @@ def main(args):
fuzzy_dup = FuzzyDuplicates(logger=log_dir, config=fuzzy_dedup_config)
duplicates = fuzzy_dup(dataset=input_dataset)

if duplicates is None:
print("No duplicates found")
print(f"Time taken:{time.time() - t0}s")
return

# By default all duplicate id's and the group they belong to are included in the result
# keep 1 document from each group of duplicates and mark the others to remove
# https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.duplicated.html
@@ -95,7 +100,7 @@
)
]
write_to_disk(result, output_dir, output_type=filetype)
print(time.time() - t0)
print(f"Time taken:{time.time() - t0}s")


def attach_args(
120 changes: 96 additions & 24 deletions nemo_curator/modules/fuzzy_dedup.py
@@ -48,6 +48,7 @@
from nemo_curator.utils.fuzzy_dedup_utils.id_mapping import int_ids_to_str
from nemo_curator.utils.fuzzy_dedup_utils.io_utils import (
aggregated_anchor_docs_with_bk_read,
check_empty_buckets,
get_restart_offsets,
update_restart_offsets,
)
@@ -261,6 +262,7 @@ def __init__(
num_hashes: int,
num_buckets: int,
buckets_per_shuffle: int = 1,
buckets_as_int: bool = False,
Collaborator: What do you think about calling this false_positive_check on the user-facing side? I'm fine with then doing something like self.buckets_as_int = false_positive_check and referring to it as self.buckets_as_int everywhere else, but from a user's perspective I think it would make it a little clearer how to set this parameter.

Collaborator (author): I think it's a good suggestion. We can update the docstrings to indicate that it writes out data in the format required by the false positive check if set to True.
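
A minimal sketch of the proposed mapping, assuming the rename is adopted (the class body below is illustrative only, not the PR's actual code):

# Hypothetical sketch: expose false_positive_check to users and keep
# buckets_as_int as the internal flag name.
class LSH:
    def __init__(self, num_hashes: int, num_buckets: int, false_positive_check: bool = False):
        self.num_hashes = num_hashes
        self.num_buckets = num_buckets
        # Integer bucket IDs are only needed when the false positive check
        # will consume the LSH output downstream.
        self.buckets_as_int = false_positive_check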

logger: Union[logging.LoggerAdapter, str] = "./",
id_fields: Union[str, list] = "id",
minhash_field: str = "_minhash_signature",
@@ -291,6 +293,7 @@ def __init__(
self.bucket_ranges = self._generate_bucket_ranges(
self.num_buckets, self.num_hashes
)
self.buckets_as_int = buckets_as_int

if cache_dir is None:
raise ValueError(
@@ -379,10 +382,19 @@ def lsh(
self,
write_path: str,
df: dask_cudf.DataFrame,
) -> None:
) -> bool:
"""
Computes buckets and writes them as parquet files to the write_path
Computes hash buckets for the DataFrame and writes them as parquet files to the specified path.

Parameters:
- write_path (str): The directory path to write parquet files.
- df (dask_cudf.DataFrame): The input DataFrame with minhashes to be bucketed.
Returns:
are_buckets_empty: True if buckets were empty (no duplicates found), False otherwise.
"""
wrote_buckets = False
are_buckets_empty = True

meta = self._minhash_to_bucket_meta(df)
df = df.map_partitions(
self.minhash_to_buckets,
@@ -391,12 +403,14 @@
)
bucket_start_id = 0
for i in range(0, self.num_buckets, self.buckets_per_shuffle):
value_vars = [
bucket_columns = [
f"_bucket_{i}"
for i in range(i, min(self.num_buckets, i + self.buckets_per_shuffle))
]
df2 = df.melt(
id_vars=self.id_fields, value_name="_bucket_id", value_vars=value_vars
id_vars=self.id_fields,
value_name="_bucket_id",
value_vars=bucket_columns,
)[self.id_fields + ["_bucket_id"]]

df2 = df2.shuffle(
@@ -406,40 +420,90 @@
).map_partitions(lambda x: x[x["_bucket_id"].duplicated(keep=False)])

df2 = df2.reset_index(drop=True)
df2, end_id = self.bucket_id_to_int(
df2, bucket_col_name="_bucket_id", start_id=bucket_start_id
)
# If bucketing returned an empty dataframe
if end_id < bucket_start_id:
continue
bucket_start_id = end_id + 1
# Buckets to Int
if self.buckets_as_int:
df2, end_id = self.bucket_id_to_int(
df2, bucket_col_name="_bucket_id", start_id=bucket_start_id
)
# If bucketing returned an empty dataframe
if end_id < bucket_start_id:
self._logger.info(
f"No duplicate documents found for buckets: {bucket_columns}"
)
continue
bucket_start_id = end_id + 1
are_buckets_empty = False

# Workaround for dtype mismatches with empty partitions
dtypes = df2.dtypes.to_dict()
df2 = df2.map_partitions(lambda x: x.astype(dtypes))
# dtypes = df2.dtypes.to_dict()
# df2 = df2.map_partitions(lambda x: x.astype(dtypes))
wrote_buckets, are_buckets_empty = self._write_bucket_parquet(
df2,
write_path,
wrote_buckets,
are_buckets_empty,
bucket_columns,
)

if i == 0:
if os.path.exists(write_path):
warnings.warn(
f"Output path {write_path} already exists and will be overwritten"
)
df2.to_parquet(write_path, write_index=False, overwrite=True)
else:
df2.to_parquet(write_path, write_index=False, append=True)
if are_buckets_empty:
self._logger.info("No duplicate documents found during LSH")
if os.path.exists(write_path):
import shutil

shutil.rmtree(write_path)

return are_buckets_empty
Collaborator (author): Variable for tracking if all the buckets were empty.


def _write_bucket_parquet(
Collaborator (author): Reviewers, please take a look at this logic. I've tried to cover most edge cases.

Collaborator: The only case I could think of was whether we ever have to worry about scalability here?

Collaborator (author): There is a non-zero cost to checking whether the buckets are empty or not. I've tried to write check_empty_buckets so that it breaks on the first file where it finds non-empty data, but this might be slow for large network-based filesystems. It should, however, be faster than the current approach of persisting the data first and converting to int.

Once a non-empty bucket is detected, that state is carried through the subsequent iterations, so the check is skipped in future iterations.

self,
df: dask_cudf.DataFrame,
write_path: str,
wrote_buckets: bool,
are_buckets_empty: bool,
buckets_to_write: List[str],
) -> tuple[bool, bool]:
"""
Utility function to write the bucketed data to parquet
handling cases of overwriting and appending as needed.
"""
if not wrote_buckets:
if os.path.exists(write_path):
warnings.warn(
f"Output path {write_path} already exists and will be overwritten"
)
df.to_parquet(write_path, write_index=False, overwrite=True)
else:
df.to_parquet(
write_path,
write_index=False,
overwrite=are_buckets_empty,
append=not are_buckets_empty,
)
# Only check if buckets written so far are empty
if are_buckets_empty:
are_buckets_empty = check_empty_buckets(write_path)
Collaborator (author): The reason we need to do this in the first place is that there's no way to know whether we're writing out an empty dataframe unless we persist it, or write it out, check the metadata, and then overwrite on the next iteration.

wrote_buckets = True

self._logger.info(f"Wrote data for buckets: {value_vars}")
if are_buckets_empty:
self._logger.info(
f"No duplicate documents found for buckets: {buckets_to_write}"
)
else:
self._logger.info(f"Wrote data for buckets: {buckets_to_write}")
return wrote_buckets, are_buckets_empty

def __call__(self, dataset: DocumentDataset) -> DocumentDataset:
df = dataset.df

write_path = os.path.join(self.cache_dir, "_buckets.parquet")
t0 = time.time()
with performance_report_if_with_ts_suffix(self.profile_dir, "lsh-profile"):
self.lsh(write_path=write_path, df=df)
empty_result = self.lsh(write_path=write_path, df=df)
self._logger.info(
f"Time taken for LSH = {time.time() - t0}s and output written at {write_path}"
)

if empty_result:
return None
buckets_df = dask_cudf.read_parquet(write_path, split_row_groups=False)
return DocumentDataset(buckets_df)

@@ -488,6 +552,8 @@ def __init__(
num_hashes=self.config.num_hashes,
num_buckets=self.config.num_buckets,
buckets_per_shuffle=self.config.buckets_per_shuffle,
# Only convert buckets to int if we are running false positive check
buckets_as_int=self.config.false_positive_check,
logger=self._logger,
id_fields=[self.config.id_field],
profile_dir=self.config.profile_dir,
@@ -556,6 +622,11 @@ def __call__(self, dataset: DocumentDataset):
minhashLSH = Sequential([self.minhash, self.lsh])
buckets_df = minhashLSH(dataset)
print(f"Stage{stage_num}: Minhash + LSH complete!")
if buckets_df is None:
print(
f"Stage{stage_num}: No potential duplicate documents found during LSH"
)
return None
Collaborator: Should this return None or an empty DocumentDataset with no IDs?

Collaborator (author): I prefer returning None. Empty DocumentDatasets might lead to unexplained errors downstream that are tougher to debug/understand. Happy to hear counterpoints. One thing that comes out of this is that I might update examples/FuzzyDedup.py to handle the case where the result returned is None.

Collaborator: Makes sense, but then for Sequential I think we might want to handle that behavior too?

Collaborator (author): I haven't seen Sequential being used directly with FuzzyDuplicates, since the results cannot be processed downstream by any of the other modules without first being used to filter out the duplicates. I'm not sure how to handle this use case. But longer term, we would probably want to add a FuzzyDeduplicate class that calls FuzzyDuplicates and also handles removal.

stage_num += 1

if self.config.false_positive_check:
@@ -740,6 +811,7 @@ def buckets_to_edges(

def __call__(self, dataset: DocumentDataset) -> DocumentDataset:
buckets_df = dataset.df
self._logger.info(f"Starting conversion of LSH Buckets to Graph Edgelist")
if len(self.id_fields) > 1:
buckets_df = buckets_df.map_partitions(
BucketsToEdges._combine_multiple_ids,
6 changes: 6 additions & 0 deletions nemo_curator/scripts/fuzzy_deduplication/minhash_lsh.py
@@ -71,6 +71,7 @@ def main(args):
id_fields=["dataset_id", "doc_id"],
profile_dir=args.profile_path,
minhash_field=minhash_field,
buckets_as_int=args.bucket_id_as_int,
logger=logger,
)

@@ -118,6 +119,11 @@ def attach_args():
help="Output directory where minhashes will be written. "
"Each Parquet file consists of document and bucket IDs.",
)
parser.add_argument(
"--bucket-id-as-int",
action="store_true",
help="Convert bucket IDs to integers. Required if running false positive check.",
)

return parser

13 changes: 13 additions & 0 deletions nemo_curator/utils/fuzzy_dedup_utils/io_utils.py
@@ -202,3 +202,16 @@ def strip_trailing_sep(path: str):
Strips a path string of trailing path separators like `/` if any.
"""
return path.rstrip(os.path.sep)


def check_empty_buckets(bucket_path):
"""
Inspects parquet metadata of the buckets dataset to check if it's an empty dataset.
"""
from pyarrow.dataset import dataset

ds = dataset(bucket_path, format="parquet")
for fragment in ds.get_fragments():
if fragment.metadata.num_rows > 0:
return False
Comment on lines +213 to +216

Collaborator (author): This logic can probably be simplified by using a global metadata file when writing out the parquet dataset (write_metadata_file=True). However, this had some issues in 24.10 (rapidsai/cudf#17177) and is only fixed in 24.12. Will open an issue to simplify this method once that's merged in.

return True
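
A minimal sketch of that simplification, assuming the buckets dataset is written with write_metadata_file=True so a dataset-level _metadata file exists (the function name below is hypothetical, not part of this PR):

import os

import pyarrow.parquet as pq


def check_empty_buckets_from_metadata(bucket_path: str) -> bool:
    """
    Checks emptiness from the global _metadata file instead of iterating
    over individual parquet fragments.
    """
    metadata = pq.read_metadata(os.path.join(bucket_path, "_metadata"))
    return metadata.num_rows == 0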
102 changes: 102 additions & 0 deletions tests/test_fuzzy_dedup.py
@@ -71,6 +71,23 @@ def large_fuzzy_dedup_data():
return DocumentDataset(df)


@pytest.fixture
def no_duplicates_fuzzy_dedup_data():
df = cudf.DataFrame(
{
"id": [1, 2, 3, 4],
"text": [
"A test string",
"Very different thing",
"Something completely else that doesn't match",
"The quick black cat jumps over the lazy dog",
],
}
)
df = dask_cudf.from_cudf(df, 2)
return DocumentDataset(df)


@pytest.fixture
def shuffle_fail_fuzzy_dedup_data():
df = cudf.DataFrame(
@@ -224,6 +241,65 @@ def test_multiple_id_cols(self, tmpdir):
)
assert_eq(expected_df, docs_list, check_index=False)

@pytest.mark.parametrize("buckets_as_int", [True, False])
def test_no_duplicates(self, tmpdir, buckets_as_int):
minhash_df = cudf.DataFrame(
{
"id": [1, 2, 3, 4, 5],
"minhash_sig": [
[1, 2, 1, 2, 1],
[2, 3, 3, 4, 5],
[3, 4, 5, 5, 6],
[4, 8, 7, 6, 7],
[5, 10, 9, 7, 8],
],
}
)
minhash_dataset = DocumentDataset(dask_cudf.from_cudf(minhash_df, 2))

lsh = LSH(
cache_dir=tmpdir,
num_hashes=5,
num_buckets=5,
buckets_per_shuffle=1,
id_fields="id",
minhash_field="minhash_sig",
buckets_as_int=buckets_as_int,
)
buckets = lsh(minhash_dataset)
assert buckets is None
assert "_buckets.parquet" not in os.listdir(tmpdir)

@pytest.mark.parametrize("buckets_as_int", [True, False])
def test_partial_overlap(self, tmpdir, buckets_as_int):
minhash_df = cudf.DataFrame(
{
"id": [1, 2, 3],
"minhash_sig": [
[1, 2, 1, 1, 1],
[2, 3, 1, 2, 2],
[3, 4, 2, 3, 1],
],
}
)
minhash_dataset = DocumentDataset(dask_cudf.from_cudf(minhash_df, 2))

lsh = LSH(
cache_dir=tmpdir,
num_hashes=5,
num_buckets=5,
buckets_per_shuffle=1,
id_fields="id",
minhash_field="minhash_sig",
buckets_as_int=buckets_as_int,
)
buckets = lsh(minhash_dataset)
assert len(buckets) == 4
assert buckets.df["_bucket_id"].nunique().compute() == 2
assert_eq(
buckets.df["id"], cudf.Series([1, 2, 1, 3], name="id"), check_index=False
)


@pytest.mark.gpu
class TestFuzzyDuplicates:
@@ -469,6 +545,32 @@ def test_shuffle_fail_fuzzy_dedup_data(
expected_df = expected_df.sort_values()
assert_eq(expected_df, result_df, check_index=False)

@pytest.mark.parametrize("false_positive_check", [True, False])
def test_fuzzy_dedup_no_duplicates(
self, no_duplicates_fuzzy_dedup_data, tmpdir, false_positive_check
):
# Dedup might fail when indices per partition do not start from 0
no_duplicates_fuzzy_dedup_data.df = (
no_duplicates_fuzzy_dedup_data.df.reset_index(drop=True)
)
config = FuzzyDuplicatesConfig(
cache_dir=tmpdir,
id_field="id",
text_field="text",
seed=42,
char_ngrams=5,
num_buckets=10,
hashes_per_bucket=1,
use_64_bit_hash=False,
buckets_per_shuffle=5,
false_positive_check=false_positive_check,
num_anchors=2,
jaccard_threshold=0.39,
)
fuzzy_duplicates = FuzzyDuplicates(config=config)
result = fuzzy_duplicates(no_duplicates_fuzzy_dedup_data)
assert result is None


class TestFuzzyDuplicatesConfig:
def test_bad_inputs(self, tmpdir):