diff --git a/docs/user-guide/distributeddataclassification.rst b/docs/user-guide/distributeddataclassification.rst index e35117e13..8c9f6c006 100644 --- a/docs/user-guide/distributeddataclassification.rst +++ b/docs/user-guide/distributeddataclassification.rst @@ -32,37 +32,23 @@ classification helps mitigate biases and inaccuracies that may arise from poorly Usage ----------------------------------------- -NeMo Curator provides a base class ``DistributedDataClassifier`` that can be extended to fit your specfic model. +NeMo Curator provides a base class ``DistributedDataClassifier`` that can be extended to fit your specific model. The only requirement is that the model can fit on a single GPU. We have also provided two subclasses that focus on domain and quality classification. Let's see how ``DomainClassifier`` works in a small excerpt taken from ``examples/domain_classifier_example.py``: .. code-block:: python - labels = [ - "Adult", - "Arts_and_Entertainment", - "Autos_and_Vehicles", - ..., - "Shopping", - "Sports", - "Travel_and_Transportation", - ] - - model_path = "pytorch_model_file.pth" - files = get_all_files_paths_under("books_dataset/") input_dataset = DocumentDataset.read_json(files, backend="cudf", add_filename=True) - domain_classifier = DomainClassifier( - model_path=model_path, - labels=labels, - filter_by=["Games", "Sports"], - ) + domain_classifier = DomainClassifier(filter_by=["Games", "Sports"]) result_dataset = domain_classifier(dataset=input_dataset) result_dataset.to_json("games_and_sports/", write_to_filename=True) +In the above excerpt, the domain classifier is obtained directly from `HuggingFace <https://huggingface.co/nvidia/domain-classifier>`_. + This module functions very similarly to the ``ScoreFilter`` module. The key differences is that it operates on the GPU instead of the CPU. Therefore, the Dask cluster must be started as a GPU one. 
diff --git a/examples/domain_classifier_example.py b/examples/domain_classifier_example.py index d4709822d..78d4612c8 100644 --- a/examples/domain_classifier_example.py +++ b/examples/domain_classifier_example.py @@ -13,7 +13,6 @@ # limitations under the License. import argparse -import os import time from nemo_curator import DomainClassifier @@ -25,37 +24,6 @@ def main(args): global_st = time.time() - labels = [ - "Adult", - "Arts_and_Entertainment", - "Autos_and_Vehicles", - "Beauty_and_Fitness", - "Books_and_Literature", - "Business_and_Industrial", - "Computers_and_Electronics", - "Finance", - "Food_and_Drink", - "Games", - "Health", - "Hobbies_and_Leisure", - "Home_and_Garden", - "Internet_and_Telecom", - "Jobs_and_Education", - "Law_and_Government", - "News", - "Online_Communities", - "People_and_Society", - "Pets_and_Animals", - "Real_Estate", - "Science", - "Sensitive_Subjects", - "Shopping", - "Sports", - "Travel_and_Transportation", - ] - - model_path = "/path/to/pytorch_model_file.pth" - # Input can be a string or list input_file_path = "/path/to/data" output_file_path = "./" @@ -66,11 +34,7 @@ def main(args): input_file_path, backend="cudf", add_filename=True ) - domain_classifier = DomainClassifier( - model_path=model_path, - labels=labels, - filter_by=["Games", "Sports"], - ) + domain_classifier = DomainClassifier(filter_by=["Games", "Sports"]) result_dataset = domain_classifier(dataset=input_dataset) result_dataset.to_json(output_file_dir=output_file_path, write_to_filename=True) diff --git a/examples/quality_classifier_example.py b/examples/quality_classifier_example.py index 277200c05..6d13f9df9 100644 --- a/examples/quality_classifier_example.py +++ b/examples/quality_classifier_example.py @@ -13,7 +13,6 @@ # limitations under the License. 
import argparse -import os import time from nemo_curator import QualityClassifier @@ -25,7 +24,6 @@ def main(args): global_st = time.time() - labels = ["High", "Medium", "Low"] model_path = "/path/to/pytorch_model_file.pth" # Input can be a string or list @@ -40,7 +38,6 @@ def main(args): quality_classifier = QualityClassifier( model_path=model_path, - labels=labels, filter_by=["High", "Medium"], ) result_dataset = quality_classifier(dataset=input_dataset) diff --git a/nemo_curator/modules/distributed_data_classifier.py b/nemo_curator/modules/distributed_data_classifier.py index 6bb975eec..1e2abb237 100644 --- a/nemo_curator/modules/distributed_data_classifier.py +++ b/nemo_curator/modules/distributed_data_classifier.py @@ -17,18 +17,20 @@ os.environ["RAPIDS_NO_INITIALIZE"] = "1" from abc import ABC, abstractmethod from dataclasses import dataclass +from typing import List import torch import torch.nn as nn from crossfit import op from crossfit.backend.torch.hf.model import HFModel -from packaging import version -from transformers import AutoConfig, AutoModel -from transformers import __version__ as TRANSFORMERS_VERSION +from huggingface_hub import PyTorchModelHubMixin +from transformers import AutoConfig, AutoModel, AutoTokenizer from transformers.models.deberta_v2 import DebertaV2TokenizerFast from nemo_curator.datasets import DocumentDataset +DOMAIN_IDENTIFIER = "nvidia/domain-classifier" + @dataclass class DomainModelConfig: @@ -44,9 +46,15 @@ class QualityModelConfig: max_len = 512 -class CustomModel(nn.Module): +# TODO: Remove this class after Quality Model is uploaded to HuggingFace +class NCCustomModel(nn.Module): def __init__( - self, config, out_dim, config_path=None, pretrained=False, autocast=False + self, + config: dataclass, + out_dim: int, + config_path: str = None, + pretrained: bool = False, + autocast: bool = False, ): super().__init__() self.config = config @@ -56,10 +64,12 @@ def __init__( ) else: self.config = torch.load(config_path) + if 
pretrained: self.model = AutoModel.from_pretrained(config.model, config=self.config) else: self.model = AutoModel(self.config) + self.fc_dropout = nn.Dropout(config.fc_dropout) self.fc = nn.Linear(self.config.hidden_size, out_dim) self._init_weights(self.fc) @@ -97,6 +107,32 @@ def forward(self, batch): return self._forward(batch) +class HFCustomModel(nn.Module, PyTorchModelHubMixin): + def __init__(self, config: dataclass): + super(HFCustomModel, self).__init__() + self.model = AutoModel.from_pretrained(config["base_model"]) + self.dropout = nn.Dropout(config["fc_dropout"]) + self.fc = nn.Linear(self.model.config.hidden_size, len(config["id2label"])) + + def _forward(self, batch): + features = self.model( + batch["input_ids"], batch["attention_mask"] + ).last_hidden_state + dropped = self.dropout(features) + outputs = self.fc(dropped) + return torch.softmax(outputs[:, 0, :], dim=1) + + def forward(self, batch): + if self.autocast: + with torch.autocast(device_type="cuda"): + return self._forward(batch) + else: + return self._forward(batch) + + def set_autocast(self, autocast): + self.autocast = autocast + + class DistributedDataClassifier(ABC): """Abstract class for running multi-node multi-GPU data classification""" @@ -149,6 +185,9 @@ def _filter_documents( raise TypeError("filter_by must be a string or list type") + def get_labels(self) -> List[str]: + return self.labels + def _run_classifier_helper( df: "dask_cudf.DataFrame", @@ -180,6 +219,7 @@ def _run_classifier_helper( keep_cols=columns_to_keep_list, ) df = classifier_pipe(df) + # TODO: Make crossfit handle this cleanly # to prevent the labeler from dropping the prob_internal_col # and combine it into a single step @@ -188,6 +228,7 @@ def _run_classifier_helper( keep_cols=columns_to_keep_list + [prob_internal_col], ) df = labeling_pipe(df) + if keep_prob: df = df.rename( columns={prob_internal_col: prob_col, pred_internal_col: label_col}, @@ -195,41 +236,27 @@ def _run_classifier_helper( else: df = 
df.rename(columns={pred_internal_col: label_col}) df = df.drop(columns=[prob_internal_col]) + return df class DomainModel(HFModel): - def __init__(self, config, out_dim=None, model_path=None, autocast=False): + def __init__(self, config: dataclass, autocast: bool = False): self.config = config - self.out_dim = out_dim - self.model_path = model_path self.autocast = autocast super().__init__(self.config.model) def load_model(self, device="cuda"): - model = CustomModel( - self.config, - out_dim=self.out_dim, - config_path=None, - pretrained=True, - autocast=self.autocast, - ) + model = HFCustomModel.from_pretrained(DOMAIN_IDENTIFIER) + model.set_autocast(self.autocast) model = model.to(device) - if os.path.exists(self.model_path): - sd = torch.load(os.path.join(self.model_path), map_location="cpu") - sd = {k[7:] if k.startswith("module.") else k: sd[k] for k in sd.keys()} - if version.parse(TRANSFORMERS_VERSION) >= version.parse("4.31.0"): - sd.pop("model.embeddings.position_ids", None) - model.load_state_dict(sd, strict=True) - else: - raise ValueError(f"Model path {self.model_path} does not exist") return model.eval() def load_tokenizer(self): - return DebertaV2TokenizerFast.from_pretrained(self.config.model) + return AutoTokenizer.from_pretrained(DOMAIN_IDENTIFIER) def load_config(self): - return AutoConfig.from_pretrained(self.path_or_name) + return AutoConfig.from_pretrained(DOMAIN_IDENTIFIER) class QualityModel(HFModel): @@ -241,7 +268,7 @@ def __init__(self, config, out_dim=None, model_path=None, autocast=False): super().__init__(self.config.model) def load_model(self, device="cuda"): - model = CustomModel( + model = NCCustomModel( self.config, out_dim=self.out_dim, config_path=None, @@ -249,6 +276,7 @@ def load_model(self, device="cuda"): autocast=self.autocast, ) model = model.to(device) + if os.path.exists(self.model_path): sd = torch.load(self.model_path, map_location="cpu") if "model_state_dict" in sd: @@ -257,8 +285,8 @@ def load_model(self, 
device="cuda"): model.load_state_dict(sd, strict=True) else: raise ValueError(f"Model path {self.model_path} does not exist") - model.eval() - return model + + return model.eval() def load_tokenizer(self): return DebertaV2TokenizerFast.from_pretrained(self.config.model) @@ -270,35 +298,28 @@ def load_config(self): class DomainClassifier(DistributedDataClassifier): def __init__( self, - model_path, - labels, filter_by=None, batch_size=256, - out_dim=None, pred_column="domain_pred", prob_column=None, max_chars=2000, device_type="cuda", autocast=True, ): - if out_dim is None: - out_dim = len(labels) + config = AutoConfig.from_pretrained(DOMAIN_IDENTIFIER) self.prob_column = prob_column + self.labels = list(config.label2id.keys()) + self.out_dim = len(self.labels) - model = DomainModel( - config=DomainModelConfig, - out_dim=out_dim, - model_path=model_path, - autocast=autocast, - ) + model = DomainModel(config=DomainModelConfig, autocast=autocast) super().__init__( model=model, - labels=labels, + labels=self.labels, filter_by=filter_by, batch_size=batch_size, - out_dim=out_dim, + out_dim=self.out_dim, pred_column=pred_column, max_chars=max_chars, device_type=device_type, @@ -324,37 +345,39 @@ class QualityClassifier(DistributedDataClassifier): def __init__( self, model_path, - labels, + num_labels=3, filter_by=None, batch_size=256, - out_dim=None, pred_column="quality_pred", prob_column="quality_prob", max_chars=6000, device_type="cuda", autocast=True, ): - if len(labels) == 2: - out_dim = 1 # Binary classification + if num_labels == 3: + self.labels = ["High", "Medium", "Low"] + self.out_dim = num_labels # Multiclass classification + elif num_labels == 2: + self.labels = ["Medium_High", "Low"] + self.out_dim = 1 # Binary classification else: - if out_dim is None: - out_dim = len(labels) # Multiclass classification + raise ValueError("num_labels must be 2 or 3") self.prob_column = prob_column model = QualityModel( config=QualityModelConfig, - out_dim=out_dim, + 
out_dim=self.out_dim, model_path=model_path, autocast=autocast, ) super().__init__( model=model, - labels=labels, + labels=self.labels, filter_by=filter_by, batch_size=batch_size, - out_dim=out_dim, + out_dim=self.out_dim, pred_column=pred_column, max_chars=max_chars, device_type=device_type, diff --git a/nemo_curator/scripts/domain_classifier_inference.py b/nemo_curator/scripts/domain_classifier_inference.py index f1ad1d497..59aa5fd7a 100644 --- a/nemo_curator/scripts/domain_classifier_inference.py +++ b/nemo_curator/scripts/domain_classifier_inference.py @@ -29,35 +29,6 @@ def main(): - labels = [ - "Adult", - "Arts_and_Entertainment", - "Autos_and_Vehicles", - "Beauty_and_Fitness", - "Books_and_Literature", - "Business_and_Industrial", - "Computers_and_Electronics", - "Finance", - "Food_and_Drink", - "Games", - "Health", - "Hobbies_and_Leisure", - "Home_and_Garden", - "Internet_and_Telecom", - "Jobs_and_Education", - "Law_and_Government", - "News", - "Online_Communities", - "People_and_Society", - "Pets_and_Animals", - "Real_Estate", - "Science", - "Sensitive_Subjects", - "Shopping", - "Sports", - "Travel_and_Transportation", - ] - args = ArgumentHelper.parse_distributed_classifier_args().parse_args() print(f"Arguments parsed = {args}", flush=True) max_chars = 2000 @@ -90,11 +61,8 @@ def main(): add_filename = True domain_classifier = DomainClassifier( - model_path=args.model_path, - labels=labels, max_chars=max_chars, batch_size=args.batch_size, - out_dim=len(labels), autocast=args.autocast, ) diff --git a/nemo_curator/scripts/quality_classifier_inference.py b/nemo_curator/scripts/quality_classifier_inference.py index 4f28c3845..c3260bff5 100644 --- a/nemo_curator/scripts/quality_classifier_inference.py +++ b/nemo_curator/scripts/quality_classifier_inference.py @@ -26,28 +26,10 @@ warnings.filterwarnings("ignore") -def get_labels(num_labels): - """ - This function returns a list of quality labels, depending on how many labels the user expects. 
- - Args: - num_labels: An integer representing the number of possible classification labels. - Returns: - A list of label names. - - """ - if num_labels == 3: - labels = ["High", "Medium", "Low"] - elif num_labels == 2: - labels = ["Medium_High", "Low"] - return labels - - def main(): parser = ArgumentHelper.parse_distributed_classifier_args() parser.add_argument("--num-labels", type=int, default=3) args = parser.parse_args() - labels = get_labels(args.num_labels) print(f"Arguments parsed = {args}", flush=True) max_chars = 6000 @@ -79,12 +61,11 @@ def main(): add_filename = True classifier = QualityClassifier( - model_path=args.model_path, + model_path=args.pretrained_model_name_or_path, + num_labels=args.num_labels, max_chars=max_chars, - labels=labels, batch_size=args.batch_size, autocast=args.autocast, - out_dim=len(labels), ) for file_batch_id, i in enumerate(range(0, len(input_files), files_per_run)): diff --git a/nemo_curator/utils/script_utils.py b/nemo_curator/utils/script_utils.py index 799cf45f5..66613b6ea 100644 --- a/nemo_curator/utils/script_utils.py +++ b/nemo_curator/utils/script_utils.py @@ -283,10 +283,10 @@ def add_arg_text_ddf_blocksize(self): def add_arg_model_path(self, help="The path to the model file"): self.parser.add_argument( - "--model-path", + "--pretrained-model-name-or-path", type=str, help=help, - required=True, + required=False, ) def add_arg_autocaset(self, help="Whether to use autocast or not"): diff --git a/tutorials/distributed_data_classification/distributed_data_classification.ipynb b/tutorials/distributed_data_classification/distributed_data_classification.ipynb index 81c7a6811..a98b80034 100644 --- a/tutorials/distributed_data_classification/distributed_data_classification.ipynb +++ b/tutorials/distributed_data_classification/distributed_data_classification.ipynb @@ -20,7 +20,8 @@ "name": "stdout", "output_type": "stream", "text": [ - "env: PYTHONWARNINGS=ignore\n" + "env: PYTHONWARNINGS=ignore\n", + "env: 
DASK_DATAFRAME__QUERY_PLANNING=False\n" ] } ], @@ -28,6 +29,7 @@ "# Silence Warnings (HuggingFace internal warnings)\n", "\n", "%env PYTHONWARNINGS=ignore\n", + "%env DASK_DATAFRAME__QUERY_PLANNING=False\n", "import warnings\n", "warnings.filterwarnings(\"ignore\")" ] @@ -38,9 +40,7 @@ "metadata": {}, "outputs": [], "source": [ - "from dask_cuda import LocalCUDACluster\n", - "from dask.distributed import Client\n", - "from nemo_curator import DomainClassifier, QualityClassifier\n", + "from nemo_curator import DomainClassifier, QualityClassifier, get_client\n", "from nemo_curator.datasets import DocumentDataset\n", "import cudf\n", "import dask_cudf" @@ -52,8 +52,7 @@ "metadata": {}, "outputs": [], "source": [ - "cluster = LocalCUDACluster(rmm_async=True, rmm_pool_size=\"1GB\")\n", - "client = Client(cluster)" + "client = get_client(cluster_type=\"gpu\")" ] }, { @@ -70,7 +69,6 @@ "outputs": [], "source": [ "output_file_path = \"output_data_dir/\"\n", - "domain_model_path = \"domain_model.pth\"\n", "quality_model_path = \"quality_model.pth\"" ] }, @@ -128,47 +126,11 @@ "outputs": [], "source": [ "if classifier_type == \"DomainClassifier\":\n", - " domain_labels = [\n", - " \"Adult\",\n", - " \"Arts_and_Entertainment\",\n", - " \"Autos_and_Vehicles\",\n", - " \"Beauty_and_Fitness\",\n", - " \"Books_and_Literature\",\n", - " \"Business_and_Industrial\",\n", - " \"Computers_and_Electronics\",\n", - " \"Finance\",\n", - " \"Food_and_Drink\",\n", - " \"Games\",\n", - " \"Health\",\n", - " \"Hobbies_and_Leisure\",\n", - " \"Home_and_Garden\",\n", - " \"Internet_and_Telecom\",\n", - " \"Jobs_and_Education\",\n", - " \"Law_and_Government\",\n", - " \"News\",\n", - " \"Online_Communities\",\n", - " \"People_and_Society\",\n", - " \"Pets_and_Animals\",\n", - " \"Real_Estate\",\n", - " \"Science\",\n", - " \"Sensitive_Subjects\",\n", - " \"Shopping\",\n", - " \"Sports\",\n", - " \"Travel_and_Transportation\",\n", - " ]\n", - "\n", - " classifier = DomainClassifier(\n", - " 
model_path=domain_model_path,\n", - " labels=domain_labels,\n", - " batch_size=1024,\n", - " )\n", + " classifier = DomainClassifier(batch_size=1024)\n", "\n", "elif classifier_type == \"QualityClassifier\":\n", - " quality_labels = [\"High\", \"Medium\", \"Low\"]\n", - "\n", " classifier = QualityClassifier(\n", " model_path=quality_model_path,\n", - " labels=quality_labels,\n", " batch_size=1024,\n", " )\n", "\n", @@ -201,7 +163,7 @@ "name": "stderr", "output_type": "stream", "text": [ - "GPU: 0, Part: 0: 100%|██████████| 10/10 [00:02<00:00, 3.62it/s]" + "GPU: 0, Part: 0: 100%|██████████| 10/10 [00:04<00:00, 2.12it/s]\n" ] }, { @@ -209,15 +171,8 @@ "output_type": "stream", "text": [ "Writing to disk complete for 1 partitions\n", - "CPU times: user 578 ms, sys: 429 ms, total: 1.01 s\n", - "Wall time: 9.91 s\n" - ] - }, - { - "name": "stderr", - "output_type": "stream", - "text": [ - "GPU: 0, Part: 0: 100%|██████████| 10/10 [00:03<00:00, 3.30it/s]\n" + "CPU times: user 393 ms, sys: 244 ms, total: 638 ms\n", + "Wall time: 6.04 s\n" ] } ], @@ -327,22 +282,6 @@ "output_dataset = DocumentDataset.read_json(output_file_path, backend=\"cudf\", add_filename=write_to_filename)\n", "output_dataset.df.head()" ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "# Remove the Output File(s)" - ] - }, - { - "cell_type": "code", - "execution_count": 10, - "metadata": {}, - "outputs": [], - "source": [ - "!rm -rf $output_file_path" - ] } ], "metadata": {