diff --git a/examples/hello-world/ml-to-fl/README.md b/examples/hello-world/ml-to-fl/README.md index cf01c38dfc..1b7a9b9745 100644 --- a/examples/hello-world/ml-to-fl/README.md +++ b/examples/hello-world/ml-to-fl/README.md @@ -39,8 +39,11 @@ Utilizing the ```ClientAPILauncherExecutor```, this option offers flexibility in * Communication via CellPipe (default) * Communication via FilePipe ( no capability to stream experiment track log metrics) This configuration is ideal for scenarios requiring multi-GPU or distributed PyTorch training. - Choose the option best suited to your specific requirements and workflow preferences. +These implementations can be easily configured using the JobAPI's ScriptRunner. +By default, the ```InProcessClientAPIExecutor``` is used, however setting `launch_external_process=True` uses the ```ClientAPILauncherExecutor``` +with pre-configured CellPipes for communication and metrics streaming. + Note: Avoid install TensorFlow and PyTorch on the same virtual environment due to library conflicts. diff --git a/examples/hello-world/ml-to-fl/np/README.md b/examples/hello-world/ml-to-fl/np/README.md index 1de78e0e5a..96ae0bc051 100644 --- a/examples/hello-world/ml-to-fl/np/README.md +++ b/examples/hello-world/ml-to-fl/np/README.md @@ -1,22 +1,6 @@ # NVFlare Client API -We will demonstrate how to send back model parameters or model parameters differences in different approaches in the following examples: - - 1. [Send model parameters back to the NVFlare server](#send-model-parameters-back-to-the-nvflare-server) - 2. [Send model parameters differences back to the NVFlare server](#send-model-parameters-differences-back-to-the-nvflare-server) - - -By default, the "SubprocessLauncher" is going to launch the script once for a job. - -If your data setup is taking a long time, you don't want to launch the whole training script every round. -(This implies that the dataset will be loaded again every round and all the cache will be lost for each round). - -On the other hand, if your system is very resource limited, and you don't want the training process to live throughout the whole -job training, you can use "launch_once=False". - -We demonstrate how to launch training script once and have training script keeps exchanging training parameters with NVFlare: - - 1. [Launch once for the whole job](#launch-once-for-the-whole-job) +In this example we use simple numpy scripts to showcase the Client API with the ScriptRunner in both in-process and sub-process settings. ## Software Requirements @@ -26,170 +10,88 @@ Please install the requirements first, it is suggested to install inside a virtu pip install -r requirements.txt ``` -Please also configure the job templates folder: - -```bash -nvflare config -jt ../../../../job_templates/ -nvflare job list_templates -``` - ## Minimum Hardware Requirements 1 CPU -## Send model parameters back to the NVFlare server +## In-process Client API -We use the mock training script in [./code/train_full.py](./code/train_full.py) -And we send back the FLModel with "params_type"="FULL" in [./code/train_full.py](./code/train_full.py) +The default mode of the `ScriptRunner` with `launch_external_process=False` uses the `InProcessClientAPIExecutor` for in-process script execution. +With the `InProcessClientAPIExecutor`, the client training script operates within the same process as the NVFlare Client job. +This provides benefits with efficient shared the memory usage and a simple configuration useful for development or single GPU use cases. -To send back the whole model parameters, we need to make sure the "params_transfer_type" is also "FULL". +### Send model parameters back to the NVFlare server -Let reuse the job templates from [sag_np](../../../../job_templates/sag_np/): - -```bash -nvflare job create -force -j ./jobs/np_param_full_transfer_full -w sag_np -sd ./code/ \ --f config_fed_client.conf app_script=train_full.py params_transfer_type=FULL launch_once=false -``` +We use the mock training script in [./src/train_full.py](./src/train_full.py) +And we send back the FLModel with "params_type"="FULL" in [./src/train_full.py](./src/train_full.py) -Then we can run it using the NVFlare Simulator: +After we modify our training script, we can create a job using the ScriptRunner: [np_client_api_job.py](./np_client_api_job.py). +(Please refer to [FedJob API](https://nvflare.readthedocs.io/en/main/programming_guide/fed_job_api.html) for more details on formulating a job) ```bash -nvflare simulator -n 2 -t 2 ./jobs/np_param_full_transfer_full -w np_param_full_transfer_full_workspace +python3 np_client_api_job.py --script src/train_full.py ``` -## Send model parameters differences back to the NVFlare server - -There are two ways to send model parameters differences back to the NVFlare server: - -1. Send the full parameters in training script, change params_transfer_type to "DIFF" -2. Calculate the parameters differences in training script and send it back via "flare.send" +### Send model parameters differences back to the NVFlare server -For the first way, we can reuse the mock training script [./code/train_full.py](./code/train_full.py) - -But we need to pass different parameters when creating job: - -```bash -nvflare job create -force -j ./jobs/np_param_full_transfer_diff -w sag_np -sd ./code/ \ --f config_fed_client.conf app_script=train_full.py params_transfer_type=DIFF launch_once=false \ --f config_fed_server.conf expected_data_kind=WEIGHT_DIFF -``` - -By setting "params_transfer_type=DIFF" we are using the NVFlare built-in parameter difference method to calculate differences. - -Then we can run it using the NVFlare Simulator: - -```bash -nvflare simulator -n 2 -t 2 ./jobs/np_param_full_transfer_diff -w np_param_full_transfer_diff_workspace -``` - -For the second way, we write a new mock training script that calculate the model difference and send it back: [./code/train_diff.py](./code/train_diff.py) +We can send model parameter differences back to the NVFlare server by calculating the parameters differences and sending it back: [./src/train_diff.py](./src/train_diff.py) Note that we set the "params_type" to DIFF when creating flare.FLModel. -Then we create the job using the following command: - -```bash -nvflare job create -force -j ./jobs/np_param_diff_transfer_full -w sag_np -sd ./code/ \ --f config_fed_client.conf app_script=train_diff.py launch_once=false \ --f config_fed_server.conf expected_data_kind=WEIGHT_DIFF -``` - -The "params_transfer_type" is "FULL", means that we DO NOT calculate the difference again using the NVFlare built-in parameter difference method. - Then we can run it using the NVFlare Simulator: ```bash -nvflare simulator -n 2 -t 2 ./jobs/np_param_diff_transfer_full -w np_param_diff_transfer_full_workspace +python3 np_client_api_job.py --script src/train_diff.py ``` -## Launch once for the whole job +### Metrics streaming -In some training scenarios, the data loading is taking a lot of time. -And throughout the whole training job, we only want to load/set up the data once. - -In that case, we could use the "launch_once" option of "SubprocessLauncher" and wraps our training script into a loop. +Sometimes we want to stream the training progress to the server. -We wrap the [./code/train_full.py](./code/train_full.py) into a loop: [./code/train_loop.py](./code/train_loop.py) +We have several ways of doing that: -Then we can create the job: + - `SummaryWriter` mimics Tensorboard `SummaryWriter`'s `add_scalar`, `add_scalars` method + - `WandBWriter` mimics Weights And Biases's `log` method + - `MLflowWriter` mimics MLflow's tracking api + - `flare.log` is the underlying common pattern that can be directly used as well, you need to figure out the + corresponding `AnalyticsDataType` for your value -```bash -nvflare job create -force -j ./jobs/np_loop -w sag_np -sd ./code/ \ --f config_fed_client.conf app_script=train_loop.py params_transfer_type=FULL launch_once=true -``` +In this example we use `MLflowWriter` in [./src/train_metrics.py](./src/train_metrics.py) and configure a corresponding `MLflowReceiver` in the job script [np_client_api_job.py](np_client_api_job.py) Then we can run it using the NVFlare Simulator: ```bash -nvflare simulator -n 2 -t 2 ./jobs/np_loop -w np_loop_workspace +python3 np_client_api_job.py --script src/train_metrics.py ``` -## Data exchange mechanism - -The underlying communication between the external process and NVFlare client is facilitated by the `Pipe` class. - -Two distinct types of `Pipe` are implemented: +After the experiment is finished, you can view the results by running the the mlflow command: `mlflow ui --port 5000` inside the directory `/tmp/nvflare/jobs/workdir/server/simulate_job/`. -1. FilePipe: - - The `FilePipe` utilizes the file system for communication, involving read and write operations to a file. - - Suitable when the NVFlare client and the external system/process share a common file system. - - Ideal for scenarios where data exchange frequency is not high; however, it may not be efficient for high-frequency exchanges. +Please refer to MLflow examples and documentation for more information. -2. CellPipe: - - The `CellPipe` leverages the `Cell` from NVFlare's foundation layer (f3) for communication. - This allows it to make use of drivers from the f3 layer, such as TCP, GRPC, HTTP, and any customized drivers. - - Recommended for scenarios with a high frequency of data exchange (for example metrics logging) - or when the file system is beyond your control. +## Sub-process Client API -You can also implement your own `Pipe`, please refer to https://github.com/NVIDIA/NVFlare/blob/main/nvflare/fuel/utils/pipe/pipe.py +The `ScriptRunner` with `launch_external_process=True` uses the `ClientAPILauncherExecutor` for external process script execution. +This configuration is ideal for scenarios requiring multi-GPU or distributed PyTorch training. -So far, we have demonstrated how to use the `FilePipe`. -The following example illustrates how to use the `CellPipe`. +### Launching the script -First, let's create the job using the sag_np_cell_pipe template - -```bash -nvflare job create -force -j ./jobs/np_loop_cell_pipe -w sag_np_cell_pipe -sd ./code/ \ --f config_fed_client.conf app_script=train_loop.py params_transfer_type=FULL launch_once=true -``` +When launching a script in an external process, it is launched once for the entire job. +We must ensure our training script [./src/train_full.py](./src/train_full.py) is in a loop to support this. Then we can run it using the NVFlare Simulator: ```bash -nvflare simulator -n 2 -t 2 ./jobs/np_loop_cell_pipe -w np_loop_cell_pipe_workspace +python3 np_client_api_job.py --script src/train_full.py --launch ``` -## Launch once for the whole job and with metrics streaming - -Sometimes we want to stream the training progress to the server. - -We have several ways of doing that: - - - `SummaryWriter` mimics Tensorboard `SummaryWriter`'s `add_scalar`, `add_scalars` method - - `WandBWriter` mimics Weights And Biases's `log` method - - `MLflowWriter` mimics MLflow's tracking api - - `flare.log` is the underlying common pattern that can be directly used as well, you need to figure out the - corresponding `AnalyticsDataType` for your value - -We showcase `MLflowWriter` in [./code/train_metrics.py](./code/train_metrics.py) +### Metrics streaming -After that, we can set up the job using the sag_np_metrics template: - -```bash -nvflare job create -force -j ./jobs/np_metrics -w sag_np_metrics -sd ./code/ \ --f config_fed_client.conf app_script=train_metrics.py params_transfer_type=DIFF launch_once=true \ --f config_fed_server.conf expected_data_kind=WEIGHT_DIFF -``` +In this example we use `MLflowWriter` in [./src/train_metrics.py](./src/train_metrics.py) and configure a corresponding `MLflowReceiver` in the job script [np_client_api_job.py](np_client_api_job.py) -Once the job is set up, we can run it using the NVFlare Simulator: +Then we can run it using the NVFlare Simulator: ```bash -nvflare simulator -n 2 -t 2 ./jobs/np_metrics -w np_metrics_workspace +python3 np_client_api_job.py --script src/train_metrics.py --launch ``` - -Keep in mind that the difference between sag_np_cell_pipe and sag_np_metrics is the -addition of components like "metrics_pipe," "metric_relayer," and "event_to_fed." -These components allow values from an external process to be sent back to the server. - diff --git a/examples/hello-world/ml-to-fl/np/code/train_full.py b/examples/hello-world/ml-to-fl/np/code/train_full.py deleted file mode 100755 index 7566f49779..0000000000 --- a/examples/hello-world/ml-to-fl/np/code/train_full.py +++ /dev/null @@ -1,59 +0,0 @@ -# Copyright (c) 2023, NVIDIA CORPORATION. All rights reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import copy - -import nvflare.client as flare - - -def train(input_arr): - output_arr = copy.deepcopy(input_arr) - # mock training with plus 1 - return output_arr + 1 - - -def evaluate(input_arr): - # mock evaluation metrics - return 100 - - -def main(): - # initializes NVFlare interface - flare.init() - - # get model from NVFlare - input_model = flare.receive() - print(f"received weights is: {input_model.params}") - - # get system information - sys_info = flare.system_info() - print(f"system info is: {sys_info}") - - input_numpy_array = input_model.params["numpy_key"] - - # training - output_numpy_array = train(input_numpy_array) - - # evaluation - metrics = evaluate(input_numpy_array) - - # send back the model - print(f"send back: {output_numpy_array}") - flare.send( - flare.FLModel(params={"numpy_key": output_numpy_array}, params_type="FULL", metrics={"accuracy": metrics}) - ) - - -if __name__ == "__main__": - main() diff --git a/examples/hello-world/ml-to-fl/np/np_client_api_job.py b/examples/hello-world/ml-to-fl/np/np_client_api_job.py new file mode 100644 index 0000000000..fb4b8b57d5 --- /dev/null +++ b/examples/hello-world/ml-to-fl/np/np_client_api_job.py @@ -0,0 +1,78 @@ +# Copyright (c) 2024, NVIDIA CORPORATION. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import argparse + +from nvflare import FedJob +from nvflare.app_common.np.np_model_persistor import NPModelPersistor +from nvflare.app_common.workflows.fedavg import FedAvg +from nvflare.app_opt.tracking.mlflow.mlflow_receiver import MLflowReceiver +from nvflare.job_config.script_runner import FrameworkType, ScriptRunner + + +def define_parser(): + parser = argparse.ArgumentParser() + parser.add_argument("--n_clients", type=int, default=2) + parser.add_argument("--num_rounds", type=int, default=5) + parser.add_argument("--script", type=str, default="src/train_full.py") + parser.add_argument("--launch", action=argparse.BooleanOptionalAction, default=False) + + return parser.parse_args() + + +def main(): + # define local parameters + args = define_parser() + + n_clients = args.n_clients + num_rounds = args.num_rounds + script = args.script + launch = args.launch + + job = FedJob(name="np_client_api") + + persistor_id = job.to_server(NPModelPersistor(), "persistor") + + # Define the controller workflow and send to server + controller = FedAvg(num_clients=n_clients, num_rounds=num_rounds, persistor_id=persistor_id) + job.to_server(controller) + + # Add MLflow Receiver for metrics streaming + if script == "src/train_metrics.py": + receiver = MLflowReceiver( + tracking_uri="file:///tmp/nvflare/jobs/workdir/server/simulate_job/mlruns", + kwargs={ + "experiment_name": "nvflare-fedavg-np-experiment", + "run_name": "nvflare-fedavg-np-with-mlflow", + "experiment_tags": {"mlflow.note.content": "## **NVFlare FedAvg Numpy experiment with MLflow**"}, + "run_tags": {"mlflow.note.content": "## Federated Experiment tracking with MLflow.\n"}, + }, + artifact_location="artifacts", + events=["fed.analytix_log_stats"], + ) + job.to_server(receiver) + + executor = ScriptRunner( + script=script, + launch_external_process=launch, + framework=FrameworkType.NUMPY, + ) + job.to_clients(executor) + + # job.export_job("/tmp/nvflare/jobs/job_config") + job.simulator_run("/tmp/nvflare/jobs/workdir", n_clients=n_clients, gpu="0") + + +if __name__ == "__main__": + main() diff --git a/examples/hello-world/ml-to-fl/np/requirements.txt b/examples/hello-world/ml-to-fl/np/requirements.txt index e4605852b5..ec750098b4 100644 --- a/examples/hello-world/ml-to-fl/np/requirements.txt +++ b/examples/hello-world/ml-to-fl/np/requirements.txt @@ -1 +1 @@ -nvflare~=2.4.0rc +nvflare~=2.5.0rc diff --git a/examples/hello-world/ml-to-fl/np/code/train_diff.py b/examples/hello-world/ml-to-fl/np/src/train_diff.py similarity index 55% rename from examples/hello-world/ml-to-fl/np/code/train_diff.py rename to examples/hello-world/ml-to-fl/np/src/train_diff.py index c0b6ebd648..341b8e5a6d 100755 --- a/examples/hello-world/ml-to-fl/np/code/train_diff.py +++ b/examples/hello-world/ml-to-fl/np/src/train_diff.py @@ -32,28 +32,30 @@ def main(): # initializes NVFlare interface flare.init() - # get model from NVFlare - input_model = flare.receive() - print(f"received weights is: {input_model.params}") + while flare.is_running(): - # get system information - sys_info = flare.system_info() - print(f"system info is: {sys_info}") + # get model from NVFlare + input_model = flare.receive() + print(f"received weights is: {input_model.params}") - input_numpy_array = input_model.params["numpy_key"] + # get system information + sys_info = flare.system_info() + print(f"system info is: {sys_info}") - # training - output_numpy_array = train(input_numpy_array) + input_numpy_array = input_model.params["numpy_key"] - # evaluation - metrics = evaluate(input_numpy_array) + # training + output_numpy_array = train(input_numpy_array) - # calculate difference here - diff = output_numpy_array - input_numpy_array + # evaluation + metrics = evaluate(input_numpy_array) - # send back the model difference - print(f"send back: {diff}") - flare.send(flare.FLModel(params={"numpy_key": diff}, params_type="DIFF", metrics={"accuracy": metrics})) + # calculate difference here + diff = output_numpy_array - input_numpy_array + + # send back the model difference + print(f"send back: {diff}") + flare.send(flare.FLModel(params={"numpy_key": diff}, params_type="DIFF", metrics={"accuracy": metrics})) if __name__ == "__main__": diff --git a/examples/hello-world/ml-to-fl/np/code/train_loop.py b/examples/hello-world/ml-to-fl/np/src/train_full.py similarity index 75% rename from examples/hello-world/ml-to-fl/np/code/train_loop.py rename to examples/hello-world/ml-to-fl/np/src/train_full.py index e809268a57..8ffb8bdd26 100755 --- a/examples/hello-world/ml-to-fl/np/code/train_loop.py +++ b/examples/hello-world/ml-to-fl/np/src/train_full.py @@ -32,14 +32,16 @@ def main(): # initializes NVFlare interface flare.init() - # get system information - sys_info = flare.system_info() - print(f"system info is: {sys_info}", flush=True) - while flare.is_running(): + + # get model from NVFlare input_model = flare.receive() print(f"received weights is: {input_model.params}", flush=True) + # get system information + sys_info = flare.system_info() + print(f"system info is: {sys_info}") + input_numpy_array = input_model.params["numpy_key"] # training @@ -48,19 +50,10 @@ def main(): # evaluation metrics = evaluate(input_numpy_array) - sys_info = flare.system_info() - print(f"system info is: {sys_info}", flush=True) - print(f"finish round: {input_model.current_round}", flush=True) - # send back the model print(f"send back: {output_numpy_array}", flush=True) flare.send( - flare.FLModel( - params={"numpy_key": output_numpy_array}, - params_type="FULL", - metrics={"accuracy": metrics}, - current_round=input_model.current_round, - ) + flare.FLModel(params={"numpy_key": output_numpy_array}, params_type="FULL", metrics={"accuracy": metrics}) ) diff --git a/examples/hello-world/ml-to-fl/np/code/train_metrics.py b/examples/hello-world/ml-to-fl/np/src/train_metrics.py similarity index 97% rename from examples/hello-world/ml-to-fl/np/code/train_metrics.py rename to examples/hello-world/ml-to-fl/np/src/train_metrics.py index 4bbdd9128e..c3448a8d15 100755 --- a/examples/hello-world/ml-to-fl/np/code/train_metrics.py +++ b/examples/hello-world/ml-to-fl/np/src/train_metrics.py @@ -28,12 +28,12 @@ def train(input_arr, current_round, epochs=3): for i in range(epochs): for j in range(num_of_batches): global_step = current_round * num_of_batches * epochs + i * num_of_batches + j - print(f"logging record: {global_step}") writer.log_metric( key="global_step", value=global_step, step=global_step, ) + print(f"logged records from epoch: {i}") # mock training with plus 1 output_arr += 1 # assume each epoch takes 1 seconds diff --git a/examples/hello-world/ml-to-fl/pt/README.md b/examples/hello-world/ml-to-fl/pt/README.md index 2e0bb074f9..30c81b7f36 100644 --- a/examples/hello-world/ml-to-fl/pt/README.md +++ b/examples/hello-world/ml-to-fl/pt/README.md @@ -4,8 +4,7 @@ We will demonstrate how to transform an existing DL code into an FL application 1. [Show a baseline training script](#the-baseline) 2. [How to modify an existing training script using DL2FL Client API](#transform-cifar10-dl-training-code-to-fl-including-best-model-selection-using-client-api) - 3. [How to modify a structured script using DL2FL decorator](#the-decorator-use-case) - 4. [How to modify a PyTorch Lightning script using DL2FL Lightning Client API](#transform-cifar10-pytorch-lightning-training-code-to-fl-with-nvflare-client-lightning-integration-api) + 3. [How to modify a PyTorch Lightning script using DL2FL Lightning Client API](#transform-cifar10-pytorch-lightning-training-code-to-fl-with-nvflare-client-lightning-integration-api) If you have multi GPU please refer to the following examples: @@ -20,13 +19,6 @@ Please install the requirements first, it is suggested to install inside a virtu pip install -r requirements.txt ``` -Please also configure the job templates folder: - -```bash -nvflare config -jt ../../../../job_templates/ -nvflare job list_templates -``` - ## Minimum Hardware Requirements Each example has different requirements: @@ -35,7 +27,6 @@ Each example has different requirements: | ------------ | -------------------- | | [Show a baseline training script](#the-baseline) | 1 CPU or 1 GPU* | | [How to modify an existing training script using DL2FL Client API](#transform-cifar10-dl-training-code-to-fl-including-best-model-selection-using-client-api) | 1 CPU or 1 GPU* | -| [How to modify a structured script using DL2FL decorator](#the-decorator-use-case) | 1 CPU or 1 GPU* | | [How to modify a PyTorch Lightning script using DL2FL Lightning Client API](#transform-cifar10-pytorch-lightning-training-code-to-fl-with-nvflare-client-lightning-integration-api) | 1 CPU or 1 GPU* | | [How to modify a PyTorch DDP training script using DL2FL Client API](#transform-cifar10-pytorch--ddp-training-code-to-fl-using-client-api) | 2 GPUs | | [How to modify a PyTorch Lightning DDP training script using DL2FL Lightning Client API](#transform-cifar10-pytorch-lightning--ddp-training-code-to-fl-with-nvflare-client-lightning-integration-api) | 2 CPUs or 2 GPUs** | @@ -46,17 +37,17 @@ Each example has different requirements: ## The baseline -We take a CIFAR10 example directly from [PyTorch website](https://github.com/pytorch/tutorials/blob/main/beginner_source/blitz/cifar10_tutorial.py) and do the following cleanups to get [cifar10_original.py](./code/cifar10_original.py): +We take a CIFAR10 example directly from [PyTorch website](https://github.com/pytorch/tutorials/blob/main/beginner_source/blitz/cifar10_tutorial.py) and do the following cleanups to get [cifar10_original.py](./src/cifar10_original.py): 1. Remove the comments -2. Move the definition of Convolutional Neural Network to [net.py](./code/net.py) +2. Move the definition of Convolutional Neural Network to [net.py](./src/net.py) 3. Wrap the whole code inside a main method (https://docs.python.org/3/library/multiprocessing.html#the-spawn-and-forkserver-start-methods) 4. Add the ability to run on GPU to speed up the training process (optional) You can run the baseline using ```bash -python3 ./code/cifar10_original.py +python3 ./src/cifar10_original.py ``` It will run for 2 epochs. @@ -99,90 +90,32 @@ We made the following changes: Optional: Change the data path to an absolute path and use ```./prepare_data.sh``` to download data -The modified code can be found in [./code/cifar10_fl.py](./code/cifar10_fl.py) - -After we modify our training script, we need to put it into a [job structure](https://nvflare.readthedocs.io/en/latest/real_world_fl/job.html) so that NVFlare system knows how to deploy and run the job. - -Please refer to [JOB CLI tutorial](../../../tutorials/job_cli.ipynb) on how to generate a job easily from our existing job templates. +The modified code can be found in [./src/cifar10_fl.py](./src/cifar10_fl.py) -We will use the in-process client API, we choose the [sag_pt in_proc job template](../../../../job_templates/sag_pt_in_proc) and run the following command to create the job: - -```bash -nvflare job create -force -j ./jobs/client_api -w sag_pt_in_proc -sd ./code/ \ - -f config_fed_client.conf app_script=cifar10_fl.py -``` +After we modify our training script, we can create a job using the in-process ScriptRunner: [pt_client_api_job.py](pt_client_api_job.py). +(Please refer to [FedJob API](https://nvflare.readthedocs.io/en/main/programming_guide/fed_job_api.html) for more details on formulating a job) Then we can run it using the NVFlare Simulator: ```bash bash ./prepare_data.sh -nvflare simulator -n 2 -t 2 ./jobs/client_api -w client_api_workspace +python3 pt_client_api_job.py --script src/cifar10_fl.py ``` Congratulations! You have finished an FL training! -## The Decorator use case - -The above case shows how you can change an existing DL code to FL. - -Usually, people have already put their codes into "train", "evaluate", and "test" methods, so they can reuse them. -In that case, the NVFlare DL2FL decorator is the way to go. - -To structure the code, we make the following changes to [./code/cifar10_original.py](./code/cifar10_original.py): - -1. Wrap training logic into a ``train`` method -2. Wrap evaluation logic into an ``evaluate`` method -3. Call train method and evaluate method - -The result is [./code/cifar10_structured_original.py](./code/cifar10_structured_original.py) - -To modify this structured code to be used in FL. -We made the following changes: - -1. Import NVFlare Client API: ```import nvflare.client as flare``` -2. Initialize NVFlare Client API: ```flare.init()``` -3. Modify the ``train`` method: - - Decorate with ```@flare.train``` - - Take additional argument in the beginning - - Load the received aggregated/global model weights into the model structure: ```net.load_state_dict(input_model.params)``` - - Return an FLModel object -4. Add an ```fl_evaluate``` method: - - Decorate with ```@flare.evaluate``` - - The first argument is input FLModel - - Return a float number of metric -5. Receive aggregated/global FLModel from NVFlare side each round: ```input_model = flare.receive()``` -6. Call ```fl_evaluate``` method before training to get metrics on the received aggregated/global model - -Optional: Change the data path to an absolute path and use ```./prepare_data.sh``` to download data - -The modified code can be found in [./code/cifar10_structured_fl.py](./code/cifar10_structured_fl.py) - - -We choose the [sag_pt job template](../../../../job_templates/sag_pt) and run the following command to create the job: - -```bash -nvflare job create -force -j ./jobs/decorator -w sag_pt -sd ./code/ -f config_fed_client.conf app_script=cifar10_structured_fl.py -``` - -Then we can run it using the NVFlare simulator: - -```bash -bash ./prepare_data.sh -nvflare simulator -n 2 -t 2 ./jobs/decorator -w decorator_workspace -``` - ## Transform CIFAR10 PyTorch Lightning training code to FL with NVFLARE Client lightning integration API If you are using [PyTorch Lightning](https://lightning.ai/) to write your training scripts, you can use our NVFlare lightning client API to convert it into FL. -Given a CIFAR10 PyTorch Lightning code example: [./code/cifar10_lightning_original.py](./code/cifar10_lightning_original.py). -Notice we wrap the [Net class](./code/net.py) into LightningModule: [LitNet class](./code/lit_net.py) +Given a CIFAR10 PyTorch Lightning code example: [./src/cifar10_lightning_original.py](./src/cifar10_lightning_original.py). +Notice we wrap the [Net class](./src/net.py) into LightningModule: [LitNet class](./src/lit_net.py) You can run it using ```bash -python3 ./code/cifar10_lightning_original.py +python3 ./src/cifar10_lightning_original.py ``` @@ -193,39 +126,32 @@ To transform the existing code to FL training code, we made the following change 3. Receive aggregated/global FLModel from NVFlare side each round: ```input_model = flare.receive()``` 4. Call trainer.evaluate() method to evaluate newly received aggregated/global model. The resulting evaluation metric will be used for the best model selection -The modified code can be found in [./code/cifar10_lightning_fl.py](./code/cifar10_lightning_fl.py) +The modified code can be found in [./src/cifar10_lightning_fl.py](./src/cifar10_lightning_fl.py) -Then we can create the job using sag_pt_in_proc template: +After we modify our training script, we can create a job using the in-process ScriptRunner: [pt_client_api_job.py](pt_client_api_job.py). -```bash -nvflare job create -force -j ./jobs/lightning -w sag_pt_in_proc -sd ./code/ \ - -f config_fed_client.conf app_script=cifar10_lightning_fl.py \ - -f config_fed_server.conf key_metric=val_acc_epoch model_class_path=lit_net.LitNet -``` - -Note that we pass the "key_metric"="val_acc_epoch" (this name originates from the code [here](./code/lit_net.py#L58)) +Note that for PyTorch Lightning we pass the "key_metric"="val_acc_epoch" (this name originates from the code [here](./src/lit_net.py#L58)) which means the validation accuracy for that epoch. -And we use "lit_net.LitNet" instead of "net.Net" for model class. +And we use `lit_net.LitNet` instead of `net.Net` for model class. Then we run it using the NVFlare simulator: ```bash bash ./prepare_data.sh -nvflare simulator -n 2 -t 2 ./jobs/lightning -w lightning_workspace +python3 pt_client_api_job.py --script src/cifar10_lightning_fl.py --key_metric val_acc_epoch ``` - ## Transform CIFAR10 PyTorch + DDP training code to FL using Client API -We follow the official [PyTorch documentation](https://pytorch.org/tutorials/intermediate/ddp_tutorial.html#initialize-ddp-with-torch-distributed-run-torchrun) and write a [./code/cifar10_ddp_original.py](./code/cifar10_ddp_original.py). +We follow the official [PyTorch documentation](https://pytorch.org/tutorials/intermediate/ddp_tutorial.html#initialize-ddp-with-torch-distributed-run-torchrun) and write a [./src/cifar10_ddp_original.py](./src/cifar10_ddp_original.py). Note that we wrap the evaluation logic into a method for better usability. It can be run using the torch distributed run: ```bash -python3 -m torch.distributed.run --nnodes=1 --nproc_per_node=2 --master_port=6666 ./code/cifar10_ddp_original.py +python3 -m torch.distributed.run --nnodes=1 --nproc_per_node=2 --master_port=6666 ./src/cifar10_ddp_original.py ``` To modify this multi-GPU code to be used in FL. @@ -243,30 +169,20 @@ Note that we only do flare receive and send on the first process (rank 0). Because all the worker processes launched by torch distributed will have the same model in the end, we don't need to send duplicate models back. -The modified code can be found in [./code/cifar10_ddp_fl.py](./code/cifar10_ddp_fl.py) - -In this example, we are going to to use a different job template, where we leverage Client API with sub-process launcher -instead of in-process launcher in other examples. Here is the command we use to create the job: +The modified code can be found in [./src/cifar10_ddp_fl.py](./src/cifar10_ddp_fl.py) -```bash -nvflare job create -force -j ./jobs/client_api_ddp -w sag_pt_deploy_map -sd ./code/ \ - -f app_1/config_fed_client.conf script="python3 -m torch.distributed.run --nnodes\=1 --nproc_per_node\=2 --master_port\=7777 custom/cifar10_ddp_fl.py" \ - -f app_2/config_fed_client.conf script="python3 -m torch.distributed.run --nnodes\=1 --nproc_per_node\=2 --master_port\=8888 custom/cifar10_ddp_fl.py" -``` +After we modify our training script, we can create a job using the ex-process ScriptRunner and set the command to the torch.distributed.run: [pt_client_api_job.py](pt_client_api_job.py). - -Then we run it using the NVFlare simulator: +Then we run it using the NVFlare simulator with the ```torch.distributed.run``` with different ports: ```bash bash ./prepare_data.sh -nvflare simulator -n 2 -t 2 ./jobs/client_api_ddp -w client_api_ddp_workspace +python3 pt_client_api_job.py --script src/cifar10_ddp_fl.py --launch --launch_command 'python3 -m torch.distributed.run --nnodes\=1 --nproc_per_node\=2 --master_port\={PORT}' --ports 7777,8888 ``` - This will start 2 clients and each client will start 2 worker processes. -Note that you might need to change the "master_port" in the "config_fed_client.conf" - if those ports are already taken on your machine. +Note that you might need to change the ports if they are already taken on your machine. ## Transform CIFAR10 PyTorch Lightning + ddp training code to FL with NVFLARE Client lightning integration API @@ -276,32 +192,26 @@ show how to convert multi GPU training as well. We just need to change the Trainer initialize to add extra options: `strategy="ddp", devices=2` -The modified Lightning + DPP code can be found in [./code/cifar10_lightning_ddp_original.py](./code/cifar10_lightning_ddp_original.py) +The modified Lightning + DPP code can be found in [./src/cifar10_lightning_ddp_original.py](./src/cifar10_lightning_ddp_original.py) You can execute it using: ```bash -python3 ./code/cifar10_lightning_ddp_original.py +python3 ./src/cifar10_lightning_ddp_original.py ``` -The modified FL code can be found in [./code/cifar10_lightning_ddp_fl.py](./code/cifar10_lightning_ddp_fl.py) - -Then we can create the job using sag_pt template: +The modified FL code can be found in [./src/cifar10_lightning_ddp_fl.py](./src/cifar10_lightning_ddp_fl.py) -```bash -nvflare job create -force -j ./jobs/lightning_ddp -w sag_pt -sd ./code/ \ - -f config_fed_client.conf app_script=cifar10_lightning_ddp_fl.py \ - -f config_fed_server.conf key_metric=val_acc_epoch model_class_path=lit_net.LitNet -``` +After we modify our training script, we can create a job using the ScriptRunner to launch our script: [pt_client_api_job.py](pt_client_api_job.py). -Note that we pass the "key_metric"="val_acc_epoch" (this name originates from the code [here](./code/lit_net.py#L58)) +Note that we pass the "key_metric"="val_acc_epoch" (this name originates from the code [here](./src/lit_net.py#L58)) which means the validation accuracy for that epoch. -And we use "lit_net.LitNet" instead of "net.Net" for model class. +And we use `lit_net.LitNet` instead of `net.Net` for model class. Then we run it using the NVFlare simulator: ```bash bash ./prepare_data.sh -nvflare simulator -n 2 -t 2 ./jobs/lightning_ddp -w lightning_ddp_workspace +python3 pt_client_api_job.py --script src/cifar10_lightning_ddp_fl.py --key_metric val_acc_epoch --launch ``` diff --git a/examples/hello-world/ml-to-fl/pt/code/cifar10_structured_fl.py b/examples/hello-world/ml-to-fl/pt/code/cifar10_structured_fl.py deleted file mode 100644 index b92b37cb54..0000000000 --- a/examples/hello-world/ml-to-fl/pt/code/cifar10_structured_fl.py +++ /dev/null @@ -1,139 +0,0 @@ -# Copyright (c) 2023, NVIDIA CORPORATION. All rights reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import torch -import torch.nn as nn -import torch.optim as optim -import torchvision -import torchvision.transforms as transforms -from net import Net - -# (1) import nvflare client API -import nvflare.client as flare - -# (optional) set a fix place so we don't need to download everytime -DATASET_PATH = "/tmp/nvflare/data" -# (optional) We change to use GPU to speed things up. -# if you want to use CPU, change DEVICE="cpu" -DEVICE = "cuda:0" -PATH = "./cifar_net.pth" - - -def main(): - transform = transforms.Compose([transforms.ToTensor(), transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))]) - - batch_size = 4 - - trainset = torchvision.datasets.CIFAR10(root=DATASET_PATH, train=True, download=True, transform=transform) - trainloader = torch.utils.data.DataLoader(trainset, batch_size=batch_size, shuffle=True, num_workers=2) - - testset = torchvision.datasets.CIFAR10(root=DATASET_PATH, train=False, download=True, transform=transform) - testloader = torch.utils.data.DataLoader(testset, batch_size=batch_size, shuffle=False, num_workers=2) - - net = Net() - - # (2) initializes NVFlare client API - flare.init() - - # (3) decorates with flare.train and load model from the first argument - # wraps training logic into a method - @flare.train - def train(input_model=None, total_epochs=2, lr=0.001): - net.load_state_dict(input_model.params) - - criterion = nn.CrossEntropyLoss() - optimizer = optim.SGD(net.parameters(), lr=lr, momentum=0.9) - - # (optional) use GPU to speed things up - net.to(DEVICE) - # (optional) calculate total steps - steps = total_epochs * len(trainloader) - - for epoch in range(total_epochs): # loop over the dataset multiple times - - running_loss = 0.0 - for i, data in enumerate(trainloader, 0): - # get the inputs; data is a list of [inputs, labels] - # (optional) use GPU to speed things up - inputs, labels = data[0].to(DEVICE), data[1].to(DEVICE) - - # zero the parameter gradients - optimizer.zero_grad() - - # forward + backward + optimize - outputs = net(inputs) - loss = criterion(outputs, labels) - loss.backward() - optimizer.step() - - # print statistics - running_loss += loss.item() - if i % 2000 == 1999: # print every 2000 mini-batches - print(f"[{epoch + 1}, {i + 1:5d}] loss: {running_loss / 2000:.3f}") - running_loss = 0.0 - - print("Finished Training") - - torch.save(net.state_dict(), PATH) - - # (4) construct trained FL model - output_model = flare.FLModel(params=net.cpu().state_dict(), meta={"NUM_STEPS_CURRENT_ROUND": steps}) - return output_model - - # (5) decorates with flare.evaluate and load model from the first argument - @flare.evaluate - def fl_evaluate(input_model=None): - return evaluate(input_weights=input_model.params) - - # wraps evaluate logic into a method - def evaluate(input_weights): - net.load_state_dict(input_weights) - # (optional) use GPU to speed things up - net.to(DEVICE) - - correct = 0 - total = 0 - # since we're not training, we don't need to calculate the gradients for our outputs - with torch.no_grad(): - for data in testloader: - # (optional) use GPU to speed things up - images, labels = data[0].to(DEVICE), data[1].to(DEVICE) - # calculate outputs by running images through the network - outputs = net(images) - # the class with the highest energy is what we choose as prediction - _, predicted = torch.max(outputs.data, 1) - total += labels.size(0) - correct += (predicted == labels).sum().item() - - # return evaluation metrics - return 100 * correct // total - - while flare.is_running(): - # (6) receives FLModel from NVFlare - input_model = flare.receive() - print(f"current_round={input_model.current_round}") - - # (7) call fl_evaluate method before training - # to evaluate on the received/aggregated model - global_metric = fl_evaluate(input_model) - print(f"Accuracy of the global model on the 10000 test images: {global_metric} %") - # call train method - train(input_model, total_epochs=2, lr=0.001) - # call evaluate method - metric = evaluate(input_weights=torch.load(PATH)) - print(f"Accuracy of the trained model on the 10000 test images: {metric} %") - - -if __name__ == "__main__": - main() diff --git a/examples/hello-world/ml-to-fl/pt/code/cifar10_structured_original.py b/examples/hello-world/ml-to-fl/pt/code/cifar10_structured_original.py deleted file mode 100644 index 0f2d8f7f89..0000000000 --- a/examples/hello-world/ml-to-fl/pt/code/cifar10_structured_original.py +++ /dev/null @@ -1,109 +0,0 @@ -# Copyright (c) 2023, NVIDIA CORPORATION. All rights reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import torch -import torch.nn as nn -import torch.optim as optim -import torchvision -import torchvision.transforms as transforms -from net import Net - -# (optional) set a fix place so we don't need to download everytime -DATASET_PATH = "/tmp/nvflare/data" -# (optional) We change to use GPU to speed things up. -# if you want to use CPU, change DEVICE="cpu" -DEVICE = "cuda:0" -PATH = "./cifar_net.pth" - - -def main(): - transform = transforms.Compose([transforms.ToTensor(), transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))]) - - batch_size = 4 - - trainset = torchvision.datasets.CIFAR10(root=DATASET_PATH, train=True, download=True, transform=transform) - trainloader = torch.utils.data.DataLoader(trainset, batch_size=batch_size, shuffle=True, num_workers=2) - - testset = torchvision.datasets.CIFAR10(root=DATASET_PATH, train=False, download=True, transform=transform) - testloader = torch.utils.data.DataLoader(testset, batch_size=batch_size, shuffle=False, num_workers=2) - - net = Net() - - # wraps training logic into a method - def train(total_epochs=2, lr=0.001): - criterion = nn.CrossEntropyLoss() - optimizer = optim.SGD(net.parameters(), lr=lr, momentum=0.9) - - # (optional) use GPU to speed things up - net.to(DEVICE) - - for epoch in range(total_epochs): # loop over the dataset multiple times - - running_loss = 0.0 - for i, data in enumerate(trainloader, 0): - # get the inputs; data is a list of [inputs, labels] - # (optional) use GPU to speed things up - inputs, labels = data[0].to(DEVICE), data[1].to(DEVICE) - - # zero the parameter gradients - optimizer.zero_grad() - - # forward + backward + optimize - outputs = net(inputs) - loss = criterion(outputs, labels) - loss.backward() - optimizer.step() - - # print statistics - running_loss += loss.item() - if i % 2000 == 1999: # print every 2000 mini-batches - print(f"[{epoch + 1}, {i + 1:5d}] loss: {running_loss / 2000:.3f}") - running_loss = 0.0 - - print("Finished Training") - - torch.save(net.state_dict(), PATH) - - # wraps evaluate logic into a method - def evaluate(input_weights): - net.load_state_dict(input_weights) - # (optional) use GPU to speed things up - net.to(DEVICE) - - correct = 0 - total = 0 - # since we're not training, we don't need to calculate the gradients for our outputs - with torch.no_grad(): - for data in testloader: - # (optional) use GPU to speed things up - images, labels = data[0].to(DEVICE), data[1].to(DEVICE) - # calculate outputs by running images through the network - outputs = net(images) - # the class with the highest energy is what we choose as prediction - _, predicted = torch.max(outputs.data, 1) - total += labels.size(0) - correct += (predicted == labels).sum().item() - - # return evaluation metrics - return 100 * correct // total - - # call train method - train(total_epochs=2, lr=0.001) - # call evaluate method - metric = evaluate(input_weights=torch.load(PATH)) - print(f"Accuracy of the trained model on the 10000 test images: {metric} %") - - -if __name__ == "__main__": - main() diff --git a/examples/hello-world/ml-to-fl/pt/pt_client_api_job.py b/examples/hello-world/ml-to-fl/pt/pt_client_api_job.py new file mode 100644 index 0000000000..35a22e03ad --- /dev/null +++ b/examples/hello-world/ml-to-fl/pt/pt_client_api_job.py @@ -0,0 +1,71 @@ +# Copyright (c) 2024, NVIDIA CORPORATION. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import argparse + +from src.lit_net import LitNet +from src.net import Net + +from nvflare.app_opt.pt.job_config.fed_avg import FedAvgJob +from nvflare.job_config.script_runner import FrameworkType, ScriptRunner + + +def define_parser(): + parser = argparse.ArgumentParser() + parser.add_argument("--n_clients", type=int, default=2) + parser.add_argument("--num_rounds", type=int, default=5) + parser.add_argument("--script", type=str, default="src/cifar10_fl.py") + parser.add_argument("--key_metric", type=str, default="accuracy") + parser.add_argument("--launch", action=argparse.BooleanOptionalAction, default=False) + parser.add_argument("--launch_command", type=str, default="python3 -u") + parser.add_argument("--ports", type=str, default="7777,8888") + + return parser.parse_args() + + +def main(): + # define local parameters + args = define_parser() + + n_clients = args.n_clients + num_rounds = args.num_rounds + script = args.script + key_metric = args.key_metric + launch = args.launch + launch_command = args.launch_command + ports = args.ports.split(",") + + job = FedAvgJob( + name="pt_client_api", + n_clients=n_clients, + num_rounds=num_rounds, + key_metric=key_metric, + initial_model=LitNet() if "lightning" in script else Net(), + ) + + for i in range(n_clients): + executor = ScriptRunner( + script=script, + launch_external_process=launch, + command=launch_command.replace("{PORT}", ports[i]), + framework=FrameworkType.PYTORCH, + ) + job.to(executor, f"site-{i+1}") + + # job.export_job("/tmp/nvflare/jobs/job_config") + job.simulator_run("/tmp/nvflare/jobs/workdir", gpu="0") + + +if __name__ == "__main__": + main() diff --git a/examples/hello-world/ml-to-fl/pt/requirements.txt b/examples/hello-world/ml-to-fl/pt/requirements.txt index ea496a9976..aa8e937270 100644 --- a/examples/hello-world/ml-to-fl/pt/requirements.txt +++ b/examples/hello-world/ml-to-fl/pt/requirements.txt @@ -1,4 +1,4 @@ -nvflare~=2.4.0rc +nvflare~=2.5.0rc torch torchvision jsonargparse[signatures]>=4.17.0 diff --git a/examples/hello-world/ml-to-fl/pt/code/cifar10_ddp_fl.py b/examples/hello-world/ml-to-fl/pt/src/cifar10_ddp_fl.py similarity index 100% rename from examples/hello-world/ml-to-fl/pt/code/cifar10_ddp_fl.py rename to examples/hello-world/ml-to-fl/pt/src/cifar10_ddp_fl.py diff --git a/examples/hello-world/ml-to-fl/pt/code/cifar10_ddp_original.py b/examples/hello-world/ml-to-fl/pt/src/cifar10_ddp_original.py similarity index 100% rename from examples/hello-world/ml-to-fl/pt/code/cifar10_ddp_original.py rename to examples/hello-world/ml-to-fl/pt/src/cifar10_ddp_original.py diff --git a/examples/hello-world/ml-to-fl/pt/code/cifar10_fl.py b/examples/hello-world/ml-to-fl/pt/src/cifar10_fl.py similarity index 100% rename from examples/hello-world/ml-to-fl/pt/code/cifar10_fl.py rename to examples/hello-world/ml-to-fl/pt/src/cifar10_fl.py diff --git a/examples/hello-world/ml-to-fl/pt/code/cifar10_lightning_ddp_fl.py b/examples/hello-world/ml-to-fl/pt/src/cifar10_lightning_ddp_fl.py similarity index 100% rename from examples/hello-world/ml-to-fl/pt/code/cifar10_lightning_ddp_fl.py rename to examples/hello-world/ml-to-fl/pt/src/cifar10_lightning_ddp_fl.py diff --git a/examples/hello-world/ml-to-fl/pt/code/cifar10_lightning_ddp_original.py b/examples/hello-world/ml-to-fl/pt/src/cifar10_lightning_ddp_original.py similarity index 100% rename from examples/hello-world/ml-to-fl/pt/code/cifar10_lightning_ddp_original.py rename to examples/hello-world/ml-to-fl/pt/src/cifar10_lightning_ddp_original.py diff --git a/examples/hello-world/ml-to-fl/pt/code/cifar10_lightning_fl.py b/examples/hello-world/ml-to-fl/pt/src/cifar10_lightning_fl.py similarity index 100% rename from examples/hello-world/ml-to-fl/pt/code/cifar10_lightning_fl.py rename to examples/hello-world/ml-to-fl/pt/src/cifar10_lightning_fl.py diff --git a/examples/hello-world/ml-to-fl/pt/code/cifar10_lightning_original.py b/examples/hello-world/ml-to-fl/pt/src/cifar10_lightning_original.py similarity index 100% rename from examples/hello-world/ml-to-fl/pt/code/cifar10_lightning_original.py rename to examples/hello-world/ml-to-fl/pt/src/cifar10_lightning_original.py diff --git a/examples/hello-world/ml-to-fl/pt/code/cifar10_original.py b/examples/hello-world/ml-to-fl/pt/src/cifar10_original.py similarity index 100% rename from examples/hello-world/ml-to-fl/pt/code/cifar10_original.py rename to examples/hello-world/ml-to-fl/pt/src/cifar10_original.py diff --git a/examples/hello-world/ml-to-fl/pt/code/lit_net.py b/examples/hello-world/ml-to-fl/pt/src/lit_net.py similarity index 99% rename from examples/hello-world/ml-to-fl/pt/code/lit_net.py rename to examples/hello-world/ml-to-fl/pt/src/lit_net.py index aaf150a4c5..01df5eedde 100644 --- a/examples/hello-world/ml-to-fl/pt/code/lit_net.py +++ b/examples/hello-world/ml-to-fl/pt/src/lit_net.py @@ -16,8 +16,8 @@ import torch.nn as nn import torch.optim as optim -from net import Net from pytorch_lightning import LightningModule +from src.net import Net from torchmetrics import Accuracy NUM_CLASSES = 10 diff --git a/examples/hello-world/ml-to-fl/pt/code/net.py b/examples/hello-world/ml-to-fl/pt/src/net.py similarity index 100% rename from examples/hello-world/ml-to-fl/pt/code/net.py rename to examples/hello-world/ml-to-fl/pt/src/net.py diff --git a/examples/hello-world/ml-to-fl/tf/README.md b/examples/hello-world/ml-to-fl/tf/README.md index 2cdd292f86..e40ce4c721 100644 --- a/examples/hello-world/ml-to-fl/tf/README.md +++ b/examples/hello-world/ml-to-fl/tf/README.md @@ -14,13 +14,6 @@ Please install the requirements first, it is suggested to install inside a virtu pip install -r requirements.txt ``` -Please also configure the job templates folder: - -```bash -nvflare config -jt ../../../../job_templates/ -nvflare job list_templates -``` - ## Minimum Hardware Requirements | Example name | minimum requirements | @@ -50,12 +43,12 @@ This can be achieved using the `-gpu` argument during simulation, e.g., `nvflare ## Transform CIFAR10 TensorFlow training code to FL with NVFLARE Client API -Given a TensorFlow CIFAR10 example: [./code/cifar10_tf_original.py](./code/cifar10_tf_original.py). +Given a TensorFlow CIFAR10 example: [./src/cifar10_tf_original.py](./src/cifar10_tf_original.py). You can run it using ```bash -python3 ./code/cifar10_tf_original.py +python3 ./src/cifar10_tf_original.py ``` To transform the existing code to FL training code, we made the following changes: @@ -70,57 +63,40 @@ To transform the existing code to FL training code, we made the following change Notice that we need to get / load the model weights as a ``dict`` of arrays because we want to reuse existing NVFlare components. -The modified code can be found here: [./code/cifar10_tf_fl.py](./code/cifar10_tf_fl.py), [./code/tf_net.py](./code/tf_net.py). - -After we modify our training script, we need to put it into a [job structure](https://nvflare.readthedocs.io/en/latest/real_world_fl/job.html) so that NVFlare system knows how to deploy and run the job. - -Please refer to [JOB CLI tutorial](../../../tutorials/job_cli.ipynb) on how to generate a job easily from our existing job templates. - - -We choose the [tensorflow job template](../../../../job_templates/sag_tf/) and run the following command to create the job: +The modified code can be found here: [./src/cifar10_tf_fl.py](./src/cifar10_tf_fl.py), [./src/tf_net.py](./src/tf_net.py). -```bash -nvflare job create -force -j ./jobs/tensorflow -w sag_tf -sd ./code/ -f config_fed_client.conf app_script=cifar10_tf_fl.py -``` +After we modify our training script, we can create a job using the in-process ScriptRunner: [tf_client_api_job.py](tf_client_api_job.py). +(Please refer to [FedJob API](https://nvflare.readthedocs.io/en/main/programming_guide/fed_job_api.html) for more details on formulating a job) Then we can run the job using the simulator: ```bash bash ./prepare_data.sh -TF_FORCE_GPU_ALLOW_GROWTH=true TF_GPU_ALLOCATOR=cuda_malloc_async nvflare simulator -n 2 -t 2 ./jobs/tensorflow -w tensorflow_workspace +TF_FORCE_GPU_ALLOW_GROWTH=true TF_GPU_ALLOCATOR=cuda_malloc_async python3 tf_client_api_job.py --script src/cifar10_tf_fl.py ``` ## Transform CIFAR10 TensorFlow multi GPU training code to FL with NVFLARE Client API Following the [official documentation](https://www.tensorflow.org/guide/keras/distributed_training#single-host_multi-device_synchronous_training), we modified the single -device TensorFlow CIFAR10 example: [./code/cifar10_tf_original.py](./code/cifar10_tf_original.py) to -a multi-device version: [./code/cifar10_tf_multi_gpu_original.py](./code/cifar10_tf_multi_gpu_original.py) +device TensorFlow CIFAR10 example: [./src/cifar10_tf_original.py](./src/cifar10_tf_original.py) to +a multi-device version: [./src/cifar10_tf_multi_gpu_original.py](./src/cifar10_tf_multi_gpu_original.py) You can run it using ```bash -python3 ./code/cifar10_tf_multi_gpu_original.py +python3 ./src/cifar10_tf_multi_gpu_original.py ``` To transform the existing multi-gpu code to FL training code, we can apply the same changes as in [single GPU case](#transform-cifar10-tensorflow-training-code-to-fl-with-nvflare-client-api). -The modified code can be found here: [./code/cifar10_tf_multi_gpu_fl.py](./code/cifar10_tf_multi_gpu_fl.py). - -After we modify our training script, we need to put it into a [job structure](https://nvflare.readthedocs.io/en/latest/real_world_fl/job.html) so that NVFlare system knows how to deploy and run the job. - -Please refer to [JOB CLI tutorial](../../../tutorials/job_cli.ipynb) on how to generate a job easily from our existing job templates. +The modified code can be found here: [./src/cifar10_tf_multi_gpu_fl.py](./src/cifar10_tf_multi_gpu_fl.py). - -We choose the [tensorflow job template](../../../../job_templates/sag_tf/) and run the following command to create the job: - -```bash -nvflare job create -force -j ./jobs/tensorflow_multi_gpu -w sag_tf -sd ./code/ -f config_fed_client.conf app_script=cifar10_tf_multi_gpu_fl.py -``` +After we modify our training script, we can create a job using the ScriptRunner to launch our script: [tf_client_api_job.py](tf_client_api_job.py). Then we can run the job using the simulator: ```bash bash ./prepare_data.sh -TF_FORCE_GPU_ALLOW_GROWTH=true TF_GPU_ALLOCATOR=cuda_malloc_async nvflare simulator -n 2 -t 2 ./jobs/tensorflow_multi_gpu -w tensorflow_multi_gpu_workspace +TF_FORCE_GPU_ALLOW_GROWTH=true TF_GPU_ALLOCATOR=cuda_malloc_async python3 tf_client_api_job.py --script src/cifar10_tf_multi_gpu_fl.py --launch ``` diff --git a/examples/hello-world/ml-to-fl/tf/requirements.txt b/examples/hello-world/ml-to-fl/tf/requirements.txt index 8f8b6bc27b..3df9484525 100644 --- a/examples/hello-world/ml-to-fl/tf/requirements.txt +++ b/examples/hello-world/ml-to-fl/tf/requirements.txt @@ -1,2 +1,2 @@ -nvflare~=2.4.0rc +nvflare~=2.5.0rc tensorflow diff --git a/examples/hello-world/ml-to-fl/tf/code/cifar10_tf_fl.py b/examples/hello-world/ml-to-fl/tf/src/cifar10_tf_fl.py similarity index 98% rename from examples/hello-world/ml-to-fl/tf/code/cifar10_tf_fl.py rename to examples/hello-world/ml-to-fl/tf/src/cifar10_tf_fl.py index 6a0d40c793..15512a2eb8 100644 --- a/examples/hello-world/ml-to-fl/tf/code/cifar10_tf_fl.py +++ b/examples/hello-world/ml-to-fl/tf/src/cifar10_tf_fl.py @@ -29,6 +29,7 @@ def main(): train_images, test_images = train_images / 255.0, test_images / 255.0 model = TFNet() + model.build(input_shape=(None, 32, 32, 3)) model.compile( optimizer="adam", loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True), metrics=["accuracy"] ) diff --git a/examples/hello-world/ml-to-fl/tf/code/cifar10_tf_multi_gpu_fl.py b/examples/hello-world/ml-to-fl/tf/src/cifar10_tf_multi_gpu_fl.py similarity index 98% rename from examples/hello-world/ml-to-fl/tf/code/cifar10_tf_multi_gpu_fl.py rename to examples/hello-world/ml-to-fl/tf/src/cifar10_tf_multi_gpu_fl.py index 94edbfa309..ead10362a9 100644 --- a/examples/hello-world/ml-to-fl/tf/code/cifar10_tf_multi_gpu_fl.py +++ b/examples/hello-world/ml-to-fl/tf/src/cifar10_tf_multi_gpu_fl.py @@ -34,6 +34,7 @@ def main(): with strategy.scope(): model = TFNet() + model.build(input_shape=(None, 32, 32, 3)) model.compile( optimizer="adam", loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True), metrics=["accuracy"] ) diff --git a/examples/hello-world/ml-to-fl/tf/code/cifar10_tf_multi_gpu_original.py b/examples/hello-world/ml-to-fl/tf/src/cifar10_tf_multi_gpu_original.py similarity index 100% rename from examples/hello-world/ml-to-fl/tf/code/cifar10_tf_multi_gpu_original.py rename to examples/hello-world/ml-to-fl/tf/src/cifar10_tf_multi_gpu_original.py diff --git a/examples/hello-world/ml-to-fl/tf/code/cifar10_tf_original.py b/examples/hello-world/ml-to-fl/tf/src/cifar10_tf_original.py similarity index 100% rename from examples/hello-world/ml-to-fl/tf/code/cifar10_tf_original.py rename to examples/hello-world/ml-to-fl/tf/src/cifar10_tf_original.py diff --git a/examples/hello-world/ml-to-fl/tf/code/tf_net.py b/examples/hello-world/ml-to-fl/tf/src/tf_net.py similarity index 82% rename from examples/hello-world/ml-to-fl/tf/code/tf_net.py rename to examples/hello-world/ml-to-fl/tf/src/tf_net.py index 83feaef613..95bf8ab462 100644 --- a/examples/hello-world/ml-to-fl/tf/code/tf_net.py +++ b/examples/hello-world/ml-to-fl/tf/src/tf_net.py @@ -16,9 +16,11 @@ class TFNet(models.Sequential): - def __init__(self): + def __init__(self, input_shape=(None, 32, 32, 3)): super().__init__() - self.add(layers.Input(shape=(32, 32, 3))) + self._input_shape = input_shape + # Do not specify input as we will use delayed built only during runtime of the model + # self.add(layers.Input(shape=(32, 32, 3))) self.add(layers.Conv2D(32, (3, 3), activation="relu")) self.add(layers.MaxPooling2D((2, 2))) self.add(layers.Conv2D(64, (3, 3), activation="relu")) diff --git a/examples/hello-world/ml-to-fl/tf/tf_client_api_job.py b/examples/hello-world/ml-to-fl/tf/tf_client_api_job.py new file mode 100644 index 0000000000..7683c9c8a3 --- /dev/null +++ b/examples/hello-world/ml-to-fl/tf/tf_client_api_job.py @@ -0,0 +1,61 @@ +# Copyright (c) 2024, NVIDIA CORPORATION. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import argparse + +from src.tf_net import TFNet + +from nvflare.app_opt.tf.job_config.fed_avg import FedAvgJob +from nvflare.job_config.script_runner import FrameworkType, ScriptRunner + + +def define_parser(): + parser = argparse.ArgumentParser() + parser.add_argument("--n_clients", type=int, default=2) + parser.add_argument("--num_rounds", type=int, default=5) + parser.add_argument("--script", type=str, default="src/cifar10_tf_fl.py") + parser.add_argument("--launch", action=argparse.BooleanOptionalAction, default=False) + + return parser.parse_args() + + +def main(): + # define local parameters + args = define_parser() + + n_clients = args.n_clients + num_rounds = args.num_rounds + script = args.script + launch = args.launch + + job = FedAvgJob( + name="tf_client_api_in_process", + n_clients=n_clients, + num_rounds=num_rounds, + initial_model=TFNet(input_shape=(None, 32, 32, 3)), + ) + + executor = ScriptRunner( + script=script, + launch_external_process=launch, + framework=FrameworkType.TENSORFLOW, + ) + job.to_clients(executor) + + # job.export_job("/tmp/nvflare/jobs/job_config") + job.simulator_run("/tmp/nvflare/jobs/workdir", n_clients=n_clients, gpu="0") + + +if __name__ == "__main__": + main()