diff --git a/docs/user-guide/index.rst b/docs/user-guide/index.rst index 1b0e63165..327b0d223 100644 --- a/docs/user-guide/index.rst +++ b/docs/user-guide/index.rst @@ -30,6 +30,9 @@ :ref:`NeMo Curator on Kubernetes ` Demonstration of how to run the NeMo Curator on a Dask Cluster deployed on top of Kubernetes +:ref:`NeMo Curator with NeMo SDK ` + Example of how to use NeMo Curator with NeMo SDK to run on various platforms + `Tutorials `__ To get started, you can explore the NeMo Curator GitHub repository and follow the available tutorials and notebooks. These resources cover various aspects of data curation, including training from scratch and Parameter-Efficient Fine-Tuning (PEFT). @@ -49,3 +52,4 @@ personalidentifiableinformationidentificationandremoval.rst distributeddataclassification.rst kubernetescurator.rst + nemosdk.rst diff --git a/docs/user-guide/nemosdk.rst b/docs/user-guide/nemosdk.rst new file mode 100644 index 000000000..dbf78c17a --- /dev/null +++ b/docs/user-guide/nemosdk.rst @@ -0,0 +1,127 @@ +.. _data-curator-nemo-sdk: + +====================================== +NeMo Curator with NeMo SDK +====================================== +----------------------------------------- +NeMo SDK +----------------------------------------- + +The NeMo SDK is a general purpose tool for configuring and executing Python functions and scripts acrosss various computing environments. +It is used across the NeMo Framework for managing machine learning experiments. +One of the key features of the NeMo SDK is the ability to run code locally or on platforms like SLURM with minimal changes. + +----------------------------------------- +Usage +----------------------------------------- + +We recommend getting slightly familiar with NeMo SDK before jumping into this. The documentation can be found here. + +Let's walk through the example usage for how you can launch a slurm job using `examples/launch_slurm.py `_. + +.. code-block:: python + + + import nemo_sdk as sdk + from nemo_sdk.core.execution import SlurmExecutor + + from nemo_curator.nemo_sdk import SlurmJobConfig + + @sdk.factory + def nemo_curator_slurm_executor() -> SlurmExecutor: + """ + Configure the following function with the details of your SLURM cluster + """ + return SlurmExecutor( + job_name_prefix="nemo-curator", + account="my-account", + nodes=2, + exclusive=True, + time="04:00:00", + container_image="nvcr.io/nvidia/nemo:dev", + container_mounts=["/path/on/machine:/path/in/container"], + ) + +First, we need to define a factory that can produce a ``SlurmExecutor``. +This exectuor is where you define all your cluster parameters. Note: NeMo SDK only supports running on SLURM clusters with `Pyxis `_ right now. +After this, there is the main function + +.. code-block:: python + + # Path to NeMo-Curator/examples/slurm/container_entrypoint.sh on the SLURM cluster + container_entrypoint = "/cluster/path/slurm/container_entrypoint.sh" + # The NeMo Curator command to run + curator_command = "text_cleaning --input-data-dir=/path/to/data --output-clean-dir=/path/to/output" + curator_job = SlurmJobConfig( + job_dir="/home/user/jobs", + container_entrypoint=container_entrypoint, + script_command=curator_command, + ) + +First, we need to specify the path to `examples/slurm/container-entrypoint.sh `_ on the cluster. +This shell script is responsible for setting up the Dask cluster on Slurm and will be the main script run. +Therefore, we need to define the path to it. + +Second, we need to establish the NeMo Curator script we want to run. +This can be a command line utility like ``text_cleaning`` we have above, or it can be your own custom script ran with ``python path/to/script.py`` + + +Finally, we combine all of these into a ``SlurmJobConfig``. This config has many options for configuring the Dask cluster. +We'll highlight a couple of important ones: + +* ``device="cpu"`` determines the type of Dask cluster to initialize. If you are using GPU modules, please set this equal to ``"gpu"``. +* ``interface="etho0"`` specifies the network interface to use for communication within the Dask cluster. It will likely be different for your Slurm cluster, so please modify as needed. You can determine what interfaces are available by running the following function on your cluster. + + .. code-block:: python + + from nemo_curator import get_network_interfaces + + print(get_network_interfaces()) + +.. code-block:: python + + executor = sdk.resolve(SlurmExecutor, "nemo_curator_slurm_executor") + with sdk.Experiment("example_nemo_curator_exp", executor=executor) as exp: + exp.add(curator_job.to_script(), tail_logs=True) + exp.run(detach=False) + +After configuring the job, we can finally run it. +First, we use the sdk to resolve our custom factory. +Next, we use it to begin an experiment named "example_nemo_curator_exp" running on our Slurm exectuor. + +``exp.add(curator_job.to_script(), tail_logs=True)`` adds the NeMo Curator script to be part of the experiment. +It converts the ``SlurmJobConfig`` to a ``sdk.Script``. +This ``curator_job.to_script()`` has two important parameters. +* ``add_scheduler_file=True`` +* ``add_device=True`` + +Both of these modify the command specified in ``curator_command``. +Setting both to ``True`` (the default) transforms the original command from: + +.. code-block:: bash + + # Original command + text_cleaning \ + --input-data-dir=/path/to/data \ + --output-clean-dir=/path/to/output + +to: + +.. code-block:: bash + + # Modified commmand + text_cleaning \ + --input-data-dir=/path/to/data \ + --output-clean-dir=/path/to/output \ + --scheduler-file=/path/to/scheduler/file \ + --device="cpu" + + +As you can see, ``add_scheduler_file=True`` causes ``--scheduler-file=/path/to/scheduer/file`` to be appended to the command, and ``add_device=True`` causes ``--device="cpu"`` (or whatever the device is set to) to be appended. +``/path/to/scheduer/file`` is determined by ``SlurmJobConfig``, and ``device`` is what the user specified in the ``device`` parameter previously. + +The scheduler file argument is necessary to connect to the Dask cluster on Slurm. +All NeMo Curator scripts accept both arguments, so the default is to automatically add them. +If your script is configured differently, feel free to turn these off. + +The final line ``exp.run(detach=False)`` starts the experiment on the Slurm cluster. \ No newline at end of file diff --git a/examples/nemo_sdk/launch_slurm.py b/examples/nemo_sdk/launch_slurm.py new file mode 100644 index 000000000..8bceb9702 --- /dev/null +++ b/examples/nemo_sdk/launch_slurm.py @@ -0,0 +1,56 @@ +# Copyright (c) 2024, NVIDIA CORPORATION. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import nemo_sdk as sdk +from nemo_sdk.core.execution import SlurmExecutor + +from nemo_curator.nemo_sdk import SlurmJobConfig + + +@sdk.factory +def nemo_curator_slurm_executor() -> SlurmExecutor: + """ + Configure the following function with the details of your SLURM cluster + """ + return SlurmExecutor( + job_name_prefix="nemo-curator", + account="my-account", + nodes=2, + exclusive=True, + time="04:00:00", + container_image="nvcr.io/nvidia/nemo:dev", + container_mounts=["/path/on/machine:/path/in/container"], + ) + + +def main(): + # Path to NeMo-Curator/examples/slurm/container_entrypoint.sh on the SLURM cluster + container_entrypoint = "/cluster/path/slurm/container_entrypoint.sh" + # The NeMo Curator command to run + # This command can be susbstituted with any NeMo Curator command + curator_command = "text_cleaning --input-data-dir=/path/to/data --output-clean-dir=/path/to/output" + curator_job = SlurmJobConfig( + job_dir="/home/user/jobs", + container_entrypoint=container_entrypoint, + script_command=curator_command, + ) + + executor = sdk.resolve(SlurmExecutor, "nemo_curator_slurm_executor") + with sdk.Experiment("example_nemo_curator_exp", executor=executor) as exp: + exp.add(curator_job.to_script(), tail_logs=True) + exp.run(detach=False) + + +if __name__ == "__main__": + main() diff --git a/examples/slurm/container-entrypoint.sh b/examples/slurm/container-entrypoint.sh index 8bc6a9a39..e6e143b3d 100755 --- a/examples/slurm/container-entrypoint.sh +++ b/examples/slurm/container-entrypoint.sh @@ -16,6 +16,12 @@ # Start the scheduler on the rank 0 node if [[ -z "$SLURM_NODEID" ]] || [[ $SLURM_NODEID == 0 ]]; then + # Make the directories needed + echo "Making log directory $LOGDIR" + mkdir -p $LOGDIR + echo "Making profile directory $PROFILESDIR" + mkdir -p $PROFILESDIR + echo "Starting scheduler" if [[ $DEVICE == 'cpu' ]]; then dask scheduler \ @@ -58,7 +64,7 @@ fi sleep 60 if [[ -z "$SLURM_NODEID" ]] || [[ $SLURM_NODEID == 0 ]]; then - echo "Starting $SCRIPT_PATH" + echo "Starting $SCRIPT_COMMAND" bash -c "$SCRIPT_COMMAND" touch $DONE_MARKER fi diff --git a/examples/slurm/start-slurm.sh b/examples/slurm/start-slurm.sh index ab4074657..9e684298b 100644 --- a/examples/slurm/start-slurm.sh +++ b/examples/slurm/start-slurm.sh @@ -28,7 +28,8 @@ export BASE_JOB_DIR=`pwd`/nemo-curator-jobs export JOB_DIR=$BASE_JOB_DIR/$SLURM_JOB_ID -# Logging information +# Directory for Dask cluster communication and logging +# Must be paths inside the container that are accessible across nodes export LOGDIR=$JOB_DIR/logs export PROFILESDIR=$JOB_DIR/profiles export SCHEDULER_FILE=$LOGDIR/scheduler.json @@ -74,9 +75,6 @@ export DASK_DATAFRAME__QUERY_PLANNING=False # End easy customization # ================================================================= -mkdir -p $LOGDIR -mkdir -p $PROFILESDIR - # Start the container srun \ --container-mounts=${MOUNTS} \ diff --git a/nemo_curator/__init__.py b/nemo_curator/__init__.py index 9f1029316..80af4d698 100644 --- a/nemo_curator/__init__.py +++ b/nemo_curator/__init__.py @@ -41,7 +41,7 @@ NemoDeployClient, OpenAIClient, ) -from .utils.distributed_utils import get_client +from .utils.distributed_utils import get_client, get_network_interfaces # Dask will automatically convert the list score type # to a string without this option. diff --git a/nemo_curator/nemo_sdk/__init__.py b/nemo_curator/nemo_sdk/__init__.py new file mode 100644 index 000000000..fe4cd0291 --- /dev/null +++ b/nemo_curator/nemo_sdk/__init__.py @@ -0,0 +1,17 @@ +# Copyright (c) 2024, NVIDIA CORPORATION. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from .slurm import SlurmJobConfig + +__all__ = ["SlurmJobConfig"] diff --git a/nemo_curator/nemo_sdk/slurm.py b/nemo_curator/nemo_sdk/slurm.py new file mode 100644 index 000000000..ccefaffd5 --- /dev/null +++ b/nemo_curator/nemo_sdk/slurm.py @@ -0,0 +1,110 @@ +# Copyright (c) 2024, NVIDIA CORPORATION. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from dataclasses import dataclass +from typing import Dict + +from nemo_curator.utils.import_utils import safe_import + +sdk = safe_import("nemo_sdk") + + +@dataclass +class SlurmJobConfig: + """ + Configuration for running a NeMo Curator script on a SLURM cluster using + NeMo SDK + + Args: + job_dir: The base directory where all the files related to setting up + the Dask cluster for NeMo Curator will be written + container_entrypoint: A path to the container-entrypoint.sh script + on the cluster. container-entrypoint.sh is found in the repo + here: https://github.com/NVIDIA/NeMo-Curator/blob/main/examples/slurm/container-entrypoint.sh + script_command: The NeMo Curator CLI tool to run. Pass any additional arguments + needed directly in this string. + device: The type of script that will be running, and therefore the type + of Dask cluster that will be created. Must be either "cpu" or "gpu". + interface: The network interface the Dask cluster will communicate over. + Use nemo_curator.get_network_interfaces() to get a list of available ones. + protocol: The networking protocol to use. Can be either "tcp" or "ucx". + Setting to "ucx" is recommended for GPU jobs if your cluster supports it. + cpu_worker_memory_limit: The maximum memory per process that a Dask worker can use. + "5GB" or "5000M" are examples. "0" means no limit. + rapids_no_initialize: Will delay or disable the CUDA context creation of RAPIDS libraries, + allowing for improved compatibility with UCX-enabled clusters and preventing runtime warnings. + cudf_spill: Enables automatic spilling (and “unspilling”) of buffers from device to host to + enable out-of-memory computation, i.e., computing on objects that occupy more memory + than is available on the GPU. + rmm_scheduler_pool_size: Sets a small pool of GPU memory for message transfers when + the scheduler is using ucx + rmm_worker_pool_size: The amount of GPU memory each GPU worker process may use. + Recommended to set at 80-90% of available GPU memory. 72GiB is good for A100/H100 + libcudf_cufile_policy: Allows reading/writing directly from storage to GPU. + """ + + job_dir: str + container_entrypoint: str + script_command: str + device: str = "cpu" + interface: str = "eth0" + protocol: str = "tcp" + cpu_worker_memory_limit: str = "0" + rapids_no_initialize: str = "1" + cudf_spill: str = "1" + rmm_scheduler_pool_size: str = "1GB" + rmm_worker_pool_size: str = "72GiB" + libcudf_cufile_policy: str = "OFF" + + def to_script(self, add_scheduler_file: bool = True, add_device: bool = True): + """ + Converts to a script object executable by NeMo SDK + Args: + add_scheduler_file: Automatically appends a '--scheduler-file' argument to the + script_command where the value is job_dir/logs/scheduler.json. All + scripts included in NeMo Curator accept and require this argument to scale + properly on SLURM clusters. + add_device: Automatically appends a '--device' argument to the script_command + where the value is the member variable of device. All scripts included in + NeMo Curator accept and require this argument. + Returns: + A NeMo SDK Script that will intialize a Dask cluster, and run the specified command. + It is designed to be executed on a SLURM cluster + """ + env_vars = self._build_env_vars() + + if add_scheduler_file: + env_vars[ + "SCRIPT_COMMAND" + ] += f" --scheduler-file={env_vars['SCHEDULER_FILE']}" + if add_device: + env_vars["SCRIPT_COMMAND"] += f" --device={env_vars['DEVICE']}" + + # Surround the command in quotes so the variable gets set properly + env_vars["SCRIPT_COMMAND"] = f"\"{env_vars['SCRIPT_COMMAND']}\"" + + return sdk.Script(path=self.container_entrypoint, env=env_vars) + + def _build_env_vars(self) -> Dict[str, str]: + env_vars = vars(self) + # Convert to uppercase to match container_entrypoint.sh + env_vars = {key.upper(): val for key, val in env_vars.items()} + + env_vars["LOGDIR"] = f"{self.job_dir}/logs" + env_vars["PROFILESDIR"] = f"{self.job_dir}/profiles" + env_vars["SCHEDULER_FILE"] = f"{env_vars['LOGDIR']}/scheduler.json" + env_vars["SCHEDULER_LOG"] = f"{env_vars['LOGDIR']}/scheduler.log" + env_vars["DONE_MARKER"] = f"{env_vars['LOGDIR']}/done.txt" + + return env_vars diff --git a/nemo_curator/utils/distributed_utils.py b/nemo_curator/utils/distributed_utils.py index 629cc387e..ef69963cf 100644 --- a/nemo_curator/utils/distributed_utils.py +++ b/nemo_curator/utils/distributed_utils.py @@ -21,11 +21,12 @@ import warnings from contextlib import nullcontext from pathlib import Path -from typing import Union +from typing import List, Union import dask.dataframe as dd import numpy as np import pandas as pd +import psutil from dask.distributed import Client, LocalCluster, get_worker, performance_report from nemo_curator.utils.gpu_utils import GPU_INSTALL_STRING, is_cudf_type @@ -611,3 +612,13 @@ def seed_all(seed: int = 42): # Ensure deterministic behavior for CUDA algorithms torch.backends.cudnn.deterministic = True torch.backends.cudnn.benchmark = False + + +def get_network_interfaces() -> List[str]: + """ + Gets a list of all valid network interfaces on a machine + + Returns: + A list of all valid network interfaces on a machine + """ + return list(psutil.net_if_addrs().keys()) diff --git a/setup.py b/setup.py index 63ec91035..185fb3afe 100644 --- a/setup.py +++ b/setup.py @@ -21,7 +21,7 @@ setup( name="nemo_curator", - version="0.3.0", + version="0.4.0", description="Scalable Data Preprocessing Tool for " "Training Large Language Models", long_description=long_description, @@ -34,6 +34,7 @@ classifiers=[ "Development Status :: 3 - Alpha", "Programming Language :: Python :: 3", + "Programming Language :: Python :: 3.10", ], packages=find_packages(), python_requires=">=3.10, <3.11",