diff --git a/examples/blend_and_shuffle.py b/examples/blend_and_shuffle.py index e070d5d2a..63fd5955d 100644 --- a/examples/blend_and_shuffle.py +++ b/examples/blend_and_shuffle.py @@ -31,7 +31,10 @@ def main(args): client = get_client(args, args.device) # Blend the datasets - datasets = [DocumentDataset.read_json(path) for path in dataset_paths] + datasets = [ + DocumentDataset.read_json(path, input_meta=args.input_meta) + for path in dataset_paths + ] blended_dataset = nc.blend_datasets(target_size, datasets, dataset_weights) shuffle = nc.Shuffle(seed=42) @@ -46,6 +49,14 @@ def attach_args( formatter_class=argparse.ArgumentDefaultsHelpFormatter ), ): + parser.add_argument( + "--input-meta", + type=str, + default=None, + help="A dictionary containing the json object field names and their " + "corresponding data types.", + ) + return add_distributed_args(parser) diff --git a/examples/distributed_data_classification_examples/domain_api_example.py b/examples/distributed_data_classification_examples/domain_api_example.py index ad2fa1c8b..3a6871bd5 100644 --- a/examples/distributed_data_classification_examples/domain_api_example.py +++ b/examples/distributed_data_classification_examples/domain_api_example.py @@ -62,7 +62,7 @@ def main(args): client = get_client(args, cluster_type=args.device) input_dataset = DocumentDataset.read_json( - input_file_path, backend="cudf", add_filename=True + input_file_path, backend="cudf", add_filename=True, input_meta=args.input_meta ) domain_classifier = DomainClassifier( @@ -134,6 +134,13 @@ def attach_args( default="gpu", help="Device to run the script on. Either 'cpu' or 'gpu'.", ) + parser.add_argument( + "--input-meta", + type=str, + default=None, + help="A dictionary containing the json object field names and their " + "corresponding data types.", + ) return parser diff --git a/examples/exact_deduplication.py b/examples/exact_deduplication.py index b722bf582..67b6b6e63 100644 --- a/examples/exact_deduplication.py +++ b/examples/exact_deduplication.py @@ -40,7 +40,9 @@ def main(args): client.run(pre_imports) t0 = time.time() - input_dataset = DocumentDataset.read_json(dataset_dir, backend=backend) + input_dataset = DocumentDataset.read_json( + dataset_dir, backend=backend, input_meta=args.input_meta + ) exact_dup = ExactDuplicates( logger=log_dir, @@ -79,6 +81,14 @@ def attach_args( formatter_class=argparse.ArgumentDefaultsHelpFormatter ), ): + parser.add_argument( + "--input-meta", + type=str, + default=None, + help="A dictionary containing the json object field names and their " + "corresponding data types.", + ) + return add_distributed_args(parser) diff --git a/examples/fuzzy_deduplication.py b/examples/fuzzy_deduplication.py index d74fd775c..1081ab27d 100644 --- a/examples/fuzzy_deduplication.py +++ b/examples/fuzzy_deduplication.py @@ -59,8 +59,7 @@ def main(args): ) elif filetype == "jsonl": input_dataset = DocumentDataset.read_json( - dataset_dir, - backend=backend, + dataset_dir, backend=backend, input_meta=args.input_meta ) fuzzy_dedup_config = FuzzyDuplicatesConfig( @@ -102,6 +101,14 @@ def attach_args( formatter_class=argparse.ArgumentDefaultsHelpFormatter ), ): + parser.add_argument( + "--input-meta", + type=str, + default=None, + help="A dictionary containing the json object field names and their " + "corresponding data types.", + ) + return add_distributed_args(parser) diff --git a/examples/identify_languages_and_fix_unicode.py b/examples/identify_languages_and_fix_unicode.py index a95dc6905..942a0a46e 100644 --- 
a/examples/identify_languages_and_fix_unicode.py +++ b/examples/identify_languages_and_fix_unicode.py @@ -27,9 +27,15 @@ from nemo_curator.utils.script_utils import add_distributed_args -def load_dataset(input_data_dir): +def load_dataset(input_data_dir: str, input_meta: str = None): files = list(get_all_files_paths_under(input_data_dir)) - raw_data = read_data(files, file_type="jsonl", backend="pandas", add_filename=True) + raw_data = read_data( + files, + file_type="jsonl", + backend="pandas", + add_filename=True, + input_meta=input_meta, + ) dataset = DocumentDataset(raw_data) return dataset @@ -52,7 +58,9 @@ def main(args): client = get_client(args, args.device) # Filter data - multilingual_dataset = load_dataset(multilingual_data_path) + multilingual_dataset = load_dataset( + input_data_dir=multilingual_data_path, input_meta=args.input_meta + ) language_id_pipeline = nc.ScoreFilter( FastTextLangId(model_path), score_field=language_field, score_type="object" ) @@ -74,7 +82,7 @@ def main(args): lang_data_path = os.path.join(language_separated_output_path, target_language) if not os.path.exists(lang_data_path): raise RuntimeError(f"Dataset did not have language: {target_language}") - lang_data = load_dataset(lang_data_path) + lang_data = load_dataset(input_data_dir=lang_data_path, input_meta=args.input_meta) cleaner = nc.Modify(UnicodeReformatter()) cleaned_data = cleaner(lang_data) @@ -88,6 +96,14 @@ def attach_args( formatter_class=argparse.ArgumentDefaultsHelpFormatter ), ): + parser.add_argument( + "--input-meta", + type=str, + default=None, + help="A dictionary containing the json object field names and their " + "corresponding data types.", + ) + return add_distributed_args(parser) diff --git a/nemo_curator/datasets/doc_dataset.py b/nemo_curator/datasets/doc_dataset.py index a97aa1969..e0425744c 100644 --- a/nemo_curator/datasets/doc_dataset.py +++ b/nemo_curator/datasets/doc_dataset.py @@ -12,6 +12,8 @@ # See the License for the specific language governing permissions and # limitations under the License. +from typing import List, Union + import dask.dataframe as dd from nemo_curator.utils.distributed_utils import read_data, write_to_disk @@ -36,10 +38,11 @@ def persist(self): @classmethod def read_json( cls, - input_files, - backend="pandas", - files_per_partition=1, - add_filename=False, + input_files: Union[str, List[str]], + backend: str = "pandas", + files_per_partition: int = 1, + add_filename: bool = False, + input_meta: str = None, ): return cls( _read_json_or_parquet( @@ -48,6 +51,7 @@ def read_json( backend=backend, files_per_partition=files_per_partition, add_filename=add_filename, + input_meta=input_meta, ) ) @@ -77,16 +81,16 @@ def read_pickle( files_per_partition=1, add_filename=False, ): - raw_data = read_data( - input_files=input_files, - file_type="pickle", - backend=backend, - files_per_partition=files_per_partition, - add_filename=add_filename, + return cls( + read_data( + input_files=input_files, + file_type="pickle", + backend=backend, + files_per_partition=files_per_partition, + add_filename=add_filename, + ) ) - return cls(raw_data) - def to_json( self, output_file_dir, @@ -128,11 +132,12 @@ def to_pickle( def _read_json_or_parquet( - input_files, - file_type, - backend, - files_per_partition, - add_filename, + input_files: Union[str, List[str]], + file_type: str, + backend: str, + files_per_partition: int, + add_filename: bool, + input_meta: str = None, ): """ `input_files` may be a list or a string type.
@@ -162,6 +167,7 @@ def _read_json_or_parquet( backend=backend, files_per_partition=files_per_partition, add_filename=add_filename, + input_meta=input_meta, ) # List of directories @@ -178,6 +184,7 @@ def _read_json_or_parquet( backend=backend, files_per_partition=files_per_partition, add_filename=add_filename, + input_meta=input_meta, ) dfs.append(df) @@ -200,6 +207,7 @@ def _read_json_or_parquet( backend=backend, files_per_partition=files_per_partition, add_filename=add_filename, + input_meta=input_meta, ) else: diff --git a/nemo_curator/distributed_data_classification/verify_results.py b/nemo_curator/distributed_data_classification/verify_results.py index da79d6921..76efec1fa 100644 --- a/nemo_curator/distributed_data_classification/verify_results.py +++ b/nemo_curator/distributed_data_classification/verify_results.py @@ -13,6 +13,7 @@ # limitations under the License. import argparse +import ast import os import pandas as pd @@ -27,30 +28,39 @@ def parse_args(): """ parser = argparse.ArgumentParser(description="Run verification") + parser.add_argument( "--results_file_path", type=str, - help="The path of the input files", required=True, + help="The path of the input files", ) parser.add_argument( "--expected_results_file_path", type=str, - help="The path of the expected_result file", required=True, + help="The path of the expected_result file", ) parser.add_argument( "--results_pred_column", type=str, - help="The prediction column name for the input files", default="pred", + help="The prediction column name for the input files", ) parser.add_argument( "--expected_pred_column", type=str, - help="The prediction column name for the expected_result file", default="pred", + help="The prediction column name for the expected_result file", ) + parser.add_argument( + "--input-meta", + type=str, + default=None, + help="A dictionary containing the json object field names and their " + "corresponding data types.", + ) + return parser.parse_args() @@ -122,10 +132,11 @@ def verify_same_dataframe( def verify_results( - results_file_path, - expected_results_file_path, - results_pred_column, - expected_pred_column, + results_file_path: str, + expected_results_file_path: str, + results_pred_column: str, + expected_pred_column: str, + input_meta: str = None, ): """ This function compares an input file with its expected result file. @@ -138,7 +149,10 @@ def verify_results( expected_pred_column: The prediction column name for the expected_result file. 
""" - expected_df = pd.read_json(expected_results_file_path, lines=True) + if input_meta: + input_meta = ast.literal_eval(input_meta) + + expected_df = pd.read_json(expected_results_file_path, lines=True, dtype=input_meta) expected_df = expected_df.sort_values(by=["text"]).reset_index(drop=True) expected_counts = expected_df[expected_pred_column].value_counts().to_dict() @@ -150,7 +164,10 @@ def verify_results( ] got_paths = [p for p in os.scandir(results_file_path)] - got_df = [pd.read_json(path, lines=True)[expected_columns] for path in got_paths] + got_df = [ + pd.read_json(path, lines=True, dtype=input_meta)[expected_columns] + for path in got_paths + ] got_df = pd.concat(got_df, ignore_index=True) got_df = got_df.sort_values(by=["text"]).reset_index(drop=True) got_counts = got_df[results_pred_column].value_counts().to_dict() @@ -172,6 +189,7 @@ def main(): args.expected_results_file_path, args.results_pred_column, args.expected_pred_column, + args.input_meta, ) diff --git a/nemo_curator/download/doc_builder.py b/nemo_curator/download/doc_builder.py index 8bdf3e308..0ef3a63f3 100644 --- a/nemo_curator/download/doc_builder.py +++ b/nemo_curator/download/doc_builder.py @@ -15,7 +15,7 @@ import importlib import os from abc import ABC, abstractmethod -from typing import List, Tuple +from typing import Dict, List, Tuple import dask.dataframe as dd import pandas as pd @@ -111,6 +111,7 @@ def _download_and_extract_single_partition( output_type: str, keep_raw_download: bool, force_download: bool, + input_meta: str = None, ) -> pd.DataFrame: url, output_path = paths @@ -158,6 +159,7 @@ def download_and_extract( output_type: str = "jsonl", keep_raw_download=False, force_download=False, + input_meta: str = None, ) -> DocumentDataset: """ Downloads and extracts a dataset into a format accepted by the NeMo Curator @@ -174,6 +176,7 @@ def download_and_extract( keep_raw_download: Whether to keep the pre-extracted download file. force_download: If False, will skip processing all files in output_paths that already exist and directly read from them instead. + input_meta: A dictionary with the json object field names and data types. 
Returns: A DocumentDataset of the downloaded data @@ -190,8 +193,9 @@ def download_and_extract( extractor=extractor, output_type=output_type, keep_raw_download=keep_raw_download, - force_download=force_download, + force_download=force_download, enforce_metadata=False, + input_meta=input_meta, meta=output_format, ) diff --git a/nemo_curator/scripts/download_and_extract.py b/nemo_curator/scripts/download_and_extract.py index 4b47a1888..ee9351497 100644 --- a/nemo_curator/scripts/download_and_extract.py +++ b/nemo_curator/scripts/download_and_extract.py @@ -15,7 +15,7 @@ import argparse import os -from nemo_curator.download import batch_download, download_and_extract +from nemo_curator.download.doc_builder import batch_download, download_and_extract from nemo_curator.utils.config_utils import build_downloader from nemo_curator.utils.distributed_utils import get_client from nemo_curator.utils.file_utils import ( @@ -73,6 +73,7 @@ def main(args): output_format, keep_raw_download=args.keep_downloaded_files, force_download=args.overwrite_existing_json, + input_meta=args.input_meta, ) # Sample to trigger the dask computation @@ -116,6 +117,13 @@ def attach_args( required=False, help="Path to input data directory", ) + parser.add_argument( + "--input-meta", + type=str, + default=None, + help="A dictionary containing the json object field names and their " + "corresponding data types.", + ) parser.add_argument( "--output-json-dir", type=str, diff --git a/nemo_curator/scripts/fuzzy_deduplication/jaccard_shuffle.py b/nemo_curator/scripts/fuzzy_deduplication/jaccard_shuffle.py index f0bd555dc..55ce21330 100644 --- a/nemo_curator/scripts/fuzzy_deduplication/jaccard_shuffle.py +++ b/nemo_curator/scripts/fuzzy_deduplication/jaccard_shuffle.py @@ -48,6 +48,7 @@ def main(args): blocksize=args.text_ddf_blocksize, id_column=args.input_json_id_field, text_column=args.input_json_text_field, + input_meta=args.input_meta, ) print( "Graph creation for get_text_ddf_from_json_path_with_blocksize" " complete.", @@ -86,6 +87,13 @@ def attach_args(parser=None): type=str, help="The directory containing anchor docs with bk files", ) + parser.add_argument( + "--input-meta", + type=str, + default=None, + help="A dictionary containing the json object field names and their " + "corresponding data types.", + ) parser.add_argument( "--text-ddf-blocksize", type=int, @@ -115,6 +123,7 @@ def attach_args(parser=None): type=int, help="The number of bucket parts to process per worker per batch", ) + return parser diff --git a/nemo_curator/scripts/fuzzy_deduplication/map_buckets.py b/nemo_curator/scripts/fuzzy_deduplication/map_buckets.py index 5640d9bd3..7f182aaef 100644 --- a/nemo_curator/scripts/fuzzy_deduplication/map_buckets.py +++ b/nemo_curator/scripts/fuzzy_deduplication/map_buckets.py @@ -34,6 +34,7 @@ def get_anchor_and_output_map_info( input_bucket_field, input_id_field, input_text_field, + input_meta, ): """ Get anchor docs with bucket info @@ -53,6 +54,7 @@ def get_anchor_and_output_map_info( blocksize=text_ddf_blocksize, id_column=input_id_field, text_column=input_text_field, + input_meta=input_meta, ) ddf_bk = get_bucket_ddf_from_parquet_path( input_bucket_path=input_bucket_path, num_workers=num_workers ) @@ -79,6 +81,13 @@ def attach_args(parser=None): type=str, help="The directory containing bucket information files", ) + parser.add_argument( + "--input-meta", + type=str, + default=None, + help="A dictionary containing the json object field names and their " + "corresponding data types.", + ) parser.add_argument( 
"--text-ddf-blocksize", type=int, @@ -116,6 +125,7 @@ def jaccard_get_output_map_workflow( input_bucket_field, input_id_field, input_text_field, + input_meta, ): """ Workflow for jaccard shuffle @@ -140,6 +150,7 @@ def jaccard_get_output_map_workflow( input_bucket_field, input_id_field, input_text_field, + input_meta=input_meta, ) ddf_anchor_docs_with_bk.to_parquet( output_anchor_docs_with_bk_path, @@ -171,6 +182,7 @@ def main(args): args.input_bucket_field, args.input_json_id_field, args.input_json_text_field, + args.input_meta, ) et = time.time() print(f"Bucket Mapping time taken = {et-st} s") diff --git a/nemo_curator/utils/distributed_utils.py b/nemo_curator/utils/distributed_utils.py index 2d7dc9213..7ec250a0c 100644 --- a/nemo_curator/utils/distributed_utils.py +++ b/nemo_curator/utils/distributed_utils.py @@ -13,13 +13,14 @@ # limitations under the License. from __future__ import annotations +import ast import os os.environ["RAPIDS_NO_INITIALIZE"] = "1" import warnings from contextlib import nullcontext from pathlib import Path -from typing import Union +from typing import List, Union import dask.dataframe as dd import pandas as pd @@ -176,7 +177,7 @@ def _enable_spilling(): def read_single_partition( - files, backend="cudf", filetype="jsonl", add_filename=False + files, backend="cudf", filetype="jsonl", add_filename=False, input_meta: str = None ) -> Union[cudf.DataFrame, pd.DataFrame]: """ This function reads a file with cuDF, sorts the columns of the DataFrame @@ -186,6 +187,8 @@ files: The path to the jsonl files to read. backend: The backend to use for reading the data. Either "cudf" or "pandas". add_filename: Whether to add a "filename" column to the DataFrame. + input_meta: A dictionary with the json object field names and data types. + Returns: A cudf DataFrame or a pandas DataFrame. @@ -206,6 +209,9 @@ else: raise RuntimeError("Could not read data, please check file type") + if input_meta: + read_kwargs["dtype"] = ast.literal_eval(input_meta) + if add_filename: read_files_one_at_a_time = True else: @@ -252,10 +258,11 @@ def read_pandas_pickle(file, add_filename=False) -> pd.DataFrame: def read_data( input_files, - file_type="pickle", - backend="cudf", - files_per_partition=1, - add_filename=False, + file_type: str = "pickle", + backend: str = "cudf", + files_per_partition: int = 1, + add_filename: bool = False, + input_meta: str = None, ) -> Union[dd.DataFrame, dask_cudf.DataFrame]: """ This function can read multiple data formats and returns a Dask-cuDF DataFrame. @@ -266,6 +273,7 @@ backend: The backend to use for reading the data. files_per_partition: The number of files to read per partition. add_filename: Whether to add a "filename" column to the DataFrame. + input_meta: A dictionary with the json object field names and data types. Returns: A Dask-cuDF or a Dask-pandas DataFrame. @@ -297,6 +305,7 @@ filetype=file_type, backend=backend, add_filename=add_filename, + input_meta=input_meta, enforce_metadata=False, ) else: diff --git a/nemo_curator/utils/fuzzy_dedup_utils/io_utils.py b/nemo_curator/utils/fuzzy_dedup_utils/io_utils.py index 105021bda..4d6ffb088 100644 --- a/nemo_curator/utils/fuzzy_dedup_utils/io_utils.py +++ b/nemo_curator/utils/fuzzy_dedup_utils/io_utils.py @@ -12,6 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License.
+import ast import os from glob import glob @@ -26,31 +27,41 @@ # TODO: # Combine this with # nemo_curator.distributed_utils.read_cudf_jsonl -def read_json_func(files, engine="cudf", include_path_column=False, columns=None): +def _read_json_func( + files, engine="cudf", include_path_column=False, columns=None, input_meta=None +): """ Reads multiple Json Lines files into a cuDF dataframe with an additional `path` column denoting the path of the input file. """ + + if input_meta: + input_meta = ast.literal_eval(input_meta) + if not include_path_column: if columns: - return cudf.read_json(files, engine="cudf", lines=True)[columns] + return cudf.read_json(files, engine="cudf", lines=True, dtype=input_meta)[ + columns + ] else: - return cudf.read_json(files, engine="cudf", lines=True) + return cudf.read_json(files, engine="cudf", lines=True, dtype=input_meta) dfs = [] for file in files: if columns: - df = cudf.read_json(file, engine=engine, lines=True)[columns] + df = cudf.read_json(file, engine=engine, lines=True, dtype=input_meta)[ + columns + ] else: - df = cudf.read_json(file, engine=engine, lines=True) + df = cudf.read_json(file, engine=engine, lines=True, dtype=input_meta) df["path"] = file dfs.append(df) return cudf.concat(dfs, ignore_index=True) def get_text_ddf_from_json_path_with_blocksize( - input_data_paths, num_files, blocksize, id_column, text_column + input_data_paths, num_files, blocksize, id_column, text_column, input_meta ): data_paths = [ entry.path for data_path in input_data_paths for entry in os.scandir(data_path) @@ -71,7 +82,11 @@ def get_text_ddf_from_json_path_with_blocksize( ) filepaths_ls = chunk_files(data_paths, blocksize) text_ddf = dd.from_map( - read_json_func, filepaths_ls, columns=list(meta_df.columns), meta=meta_df + _read_json_func, + filepaths_ls, + columns=list(meta_df.columns), + input_meta=input_meta, + meta=meta_df, ) text_ddf = text_ddf.map_partitions( convert_str_id_to_int, diff --git a/tutorials/peft-curation/main.py b/tutorials/peft-curation/main.py index 9210d9f89..47d30cbd0 100644 --- a/tutorials/peft-curation/main.py +++ b/tutorials/peft-curation/main.py @@ -100,7 +100,9 @@ def run_curation_pipeline(args: Any, jsonl_fp: str) -> str: """ client = get_client(args, args.device) print(f" Running the curation pipeline on '{jsonl_fp}'...") - orig_dataset = DocumentDataset.read_json(jsonl_fp, add_filename=True) + orig_dataset = DocumentDataset.read_json( + jsonl_fp, add_filename=True, input_meta=args.input_meta + ) dataset = orig_dataset redact_pii_subject = partial(redact_pii, text_field="subject") @@ -163,7 +165,15 @@ def run_curation_pipeline(args: Any, jsonl_fp: str) -> str: def main(): parser = argparse.ArgumentParser() parser = add_distributed_args(parser) + parser.add_argument( + "--input-meta", + type=str, + default=None, + help="A dictionary containing the json object field names and their " + "corresponding data types.", + ) args = parser.parse_args() + # Limit the total number of workers to ensure we don't run out of memory. 
args.n_workers = min(args.n_workers, 8) diff --git a/tutorials/tinystories/main.py b/tutorials/tinystories/main.py index 1fbbba35c..75928d784 100644 --- a/tutorials/tinystories/main.py +++ b/tutorials/tinystories/main.py @@ -181,7 +181,9 @@ def run_curation_pipeline(args: Any, jsonl_dir: str) -> None: if fp.endswith(".jsonl") ] print("Reading the data...") - orig_dataset = DocumentDataset.read_json(files, add_filename=True) + orig_dataset = DocumentDataset.read_json( + files, add_filename=True, input_meta=args.input_meta + ) dataset = orig_dataset curation_steps = Sequential( @@ -214,6 +216,13 @@ def run_curation_pipeline(args: Any, jsonl_dir: str) -> None: def main(): parser = argparse.ArgumentParser() parser = add_distributed_args(parser) + parser.add_argument( + "--input-meta", + type=str, + default=None, + help="A dictionary containing the json object field names and their " + "corresponding data types.", + ) args = parser.parse_args() # Limit the total number of workers to ensure we don't run out of memory. args.n_workers = min(args.n_workers, 8) @@ -225,6 +234,7 @@ def main(): os.makedirs(JSONL_ROOT_DIR) jsonl_val_dir = download_and_convert_to_jsonl() + run_curation_pipeline(args, jsonl_val_dir)
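
Reviewer note: a minimal usage sketch of the new `input_meta` parameter introduced by this patch. The dataset directory and field names below are illustrative assumptions, not part of the PR; the behavior shown is only what the diff adds, i.e. `input_meta` is a string-encoded dictionary that `read_single_partition` parses with `ast.literal_eval` and forwards to the JSON reader as its `dtype` mapping.

    # Hypothetical sketch: "my_dataset/" and the field names are made-up examples.
    from nemo_curator.datasets import DocumentDataset

    dataset = DocumentDataset.read_json(
        "my_dataset/",          # directory of JSONL files (assumed to exist)
        backend="pandas",
        # String-encoded dict, parsed with ast.literal_eval inside
        # read_single_partition and passed to read_json as dtype.
        input_meta='{"id": "str", "text": "str"}',
    )
    print(dataset.df.dtypes)    # .df is assumed to expose the underlying Dask DataFrame

The command-line scripts accept the same string through the new flag, e.g. --input-meta '{"id": "str", "text": "str"}'.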