update ml-to-fl examples with new apis
SYangster committed Aug 23, 2024
1 parent d7f1010 commit 22874b8
Showing 31 changed files with 329 additions and 636 deletions.
5 changes: 4 additions & 1 deletion examples/hello-world/ml-to-fl/README.md
@@ -39,8 +39,11 @@ Utilizing the ```ClientAPILauncherExecutor```, this option offers flexibility in
* Communication via CellPipe (default)
* Communication via FilePipe (no capability to stream experiment tracking log metrics)
This configuration is ideal for scenarios requiring multi-GPU or distributed PyTorch training.


Choose the option best suited to your specific requirements and workflow preferences.

These implementations can be easily configured using the JobAPI's ScriptRunner.
By default, the ```InProcessClientAPIExecutor``` is used; however, setting `launch_external_process=True` uses the ```ClientAPILauncherExecutor```
with pre-configured CellPipes for communication and metrics streaming.
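
For reference, a minimal sketch of selecting between the two executors (the script path `src/train.py` is a placeholder; adjust to your setup):

```python
from nvflare.job_config.script_runner import ScriptRunner

# Default: in-process execution via the InProcessClientAPIExecutor
runner = ScriptRunner(script="src/train.py")

# External process execution via the ClientAPILauncherExecutor,
# with pre-configured CellPipes for communication and metrics streaming
runner = ScriptRunner(script="src/train.py", launch_external_process=True)
```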

Note: Avoid installing TensorFlow and PyTorch in the same virtual environment due to library conflicts.
172 changes: 37 additions & 135 deletions examples/hello-world/ml-to-fl/np/README.md
@@ -1,22 +1,6 @@
# NVFlare Client API

We will demonstrate different approaches for sending model parameters or model parameter differences back to the NVFlare server in the following examples:

1. [Send model parameters back to the NVFlare server](#send-model-parameters-back-to-the-nvflare-server)
2. [Send model parameters differences back to the NVFlare server](#send-model-parameters-differences-back-to-the-nvflare-server)


By default, the "SubprocessLauncher" launches the script once per job.

If your data setup takes a long time, you don't want to relaunch the whole training script every round.
(Relaunching implies that the dataset is loaded again every round and all caches are lost each round.)

On the other hand, if your system is very resource-limited and you don't want the training process to live throughout the whole
job, you can use "launch_once=False".

We demonstrate how to launch the training script once and have it keep exchanging training parameters with NVFlare:

1. [Launch once for the whole job](#launch-once-for-the-whole-job)
In this example we use simple numpy scripts to showcase the Client API with the ScriptRunner in both in-process and sub-process settings.

## Software Requirements

@@ -26,170 +10,88 @@ Please install the requirements first; it is suggested to install inside a virtual environment:
pip install -r requirements.txt
```

Please also configure the job templates folder:

```bash
nvflare config -jt ../../../../job_templates/
nvflare job list_templates
```

## Minimum Hardware Requirements

1 CPU


## Send model parameters back to the NVFlare server
## In-process Client API

We use the mock training script in [./code/train_full.py](./code/train_full.py), which sends back the FLModel with "params_type"="FULL".
The default mode of the `ScriptRunner` with `launch_external_process=False` uses the `InProcessClientAPIExecutor` for in-process script execution.
With the `InProcessClientAPIExecutor`, the client training script operates within the same process as the NVFlare Client job.
This provides the benefits of efficient shared memory usage and a simple configuration, useful for development or single-GPU use cases.

To send back the whole model parameters, we need to make sure the "params_transfer_type" is also "FULL".
### Send model parameters back to the NVFlare server

Let's reuse the job template from [sag_np](../../../../job_templates/sag_np/):

```bash
nvflare job create -force -j ./jobs/np_param_full_transfer_full -w sag_np -sd ./code/ \
-f config_fed_client.conf app_script=train_full.py params_transfer_type=FULL launch_once=false
```
We use the mock training script in [./src/train_full.py](./src/train_full.py), which sends back the FLModel with "params_type"="FULL".
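
As a rough sketch of the pattern such a script follows (the actual contents of [./src/train_full.py](./src/train_full.py) may differ; the `+ 1.0` update is a stand-in for real training):

```python
import nvflare.client as flare

flare.init()

while flare.is_running():
    input_model = flare.receive()  # FLModel holding the current global params

    # mock "training": perturb the received parameters
    new_params = {k: v + 1.0 for k, v in input_model.params.items()}

    # send the full parameter set back, marked as params_type="FULL"
    flare.send(flare.FLModel(params=new_params, params_type="FULL"))
```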

Then we can run it using the NVFlare Simulator:
After we modify our training script, we can create a job using the ScriptRunner: [np_client_api_job.py](./np_client_api_job.py).
(Please refer to [FedJob API](https://nvflare.readthedocs.io/en/main/programming_guide/fed_job_api.html) for more details on formulating a job)

```bash
nvflare simulator -n 2 -t 2 ./jobs/np_param_full_transfer_full -w np_param_full_transfer_full_workspace
python3 np_client_api_job.py --script src/train_full.py
```

## Send model parameters differences back to the NVFlare server

There are two ways to send model parameters differences back to the NVFlare server:

1. Send the full parameters in the training script and change params_transfer_type to "DIFF"
2. Calculate the parameter differences in the training script and send them back via "flare.send"
### Send model parameters differences back to the NVFlare server

For the first way, we can reuse the mock training script [./code/train_full.py](./code/train_full.py)

But we need to pass different parameters when creating the job:

```bash
nvflare job create -force -j ./jobs/np_param_full_transfer_diff -w sag_np -sd ./code/ \
-f config_fed_client.conf app_script=train_full.py params_transfer_type=DIFF launch_once=false \
-f config_fed_server.conf expected_data_kind=WEIGHT_DIFF
```

By setting "params_transfer_type=DIFF" we are using the NVFlare built-in parameter difference method to calculate differences.

Then we can run it using the NVFlare Simulator:

```bash
nvflare simulator -n 2 -t 2 ./jobs/np_param_full_transfer_diff -w np_param_full_transfer_diff_workspace
```

For the second way, we write a new mock training script that calculates the model difference and sends it back: [./code/train_diff.py](./code/train_diff.py)
We can send model parameter differences back to the NVFlare server by calculating the parameter differences and sending them back: [./src/train_diff.py](./src/train_diff.py)

Note that we set the "params_type" to DIFF when creating flare.FLModel.
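
A sketch of this diff-based variant (see [./src/train_diff.py](./src/train_diff.py) for the actual script; the update step is again a placeholder):

```python
import nvflare.client as flare

flare.init()

while flare.is_running():
    input_model = flare.receive()
    params = input_model.params

    # mock "training": perturb the received parameters
    new_params = {k: v + 1.0 for k, v in params.items()}

    # compute the difference ourselves and mark it as params_type="DIFF"
    param_diff = {k: new_params[k] - params[k] for k in params}
    flare.send(flare.FLModel(params=param_diff, params_type="DIFF"))
```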

Then we create the job using the following command:

```bash
nvflare job create -force -j ./jobs/np_param_diff_transfer_full -w sag_np -sd ./code/ \
-f config_fed_client.conf app_script=train_diff.py launch_once=false \
-f config_fed_server.conf expected_data_kind=WEIGHT_DIFF
```

The "params_transfer_type" is "FULL", means that we DO NOT calculate the difference again using the NVFlare built-in parameter difference method.

Then we can run it using the NVFlare Simulator:

```bash
nvflare simulator -n 2 -t 2 ./jobs/np_param_diff_transfer_full -w np_param_diff_transfer_full_workspace
python3 np_client_api_job.py --script src/train_diff.py
```

## Launch once for the whole job
### Metrics streaming

In some training scenarios, data loading takes a lot of time.
Throughout the whole training job, we only want to load/set up the data once.

In that case, we could use the "launch_once" option of "SubprocessLauncher" and wrap our training script in a loop.
Sometimes we want to stream the training progress to the server.

We wrap the [./code/train_full.py](./code/train_full.py) into a loop: [./code/train_loop.py](./code/train_loop.py)
We have several ways of doing that:

Then we can create the job:
- `SummaryWriter` mimics Tensorboard `SummaryWriter`'s `add_scalar` and `add_scalars` methods
- `WandBWriter` mimics Weights and Biases' `log` method
- `MLflowWriter` mimics MLflow's tracking API
- `flare.log` is the underlying common pattern that can be used directly as well; you need to determine the
corresponding `AnalyticsDataType` for your value

```bash
nvflare job create -force -j ./jobs/np_loop -w sag_np -sd ./code/ \
-f config_fed_client.conf app_script=train_loop.py params_transfer_type=FULL launch_once=true
```
In this example we use `MLflowWriter` in [./src/train_metrics.py](./src/train_metrics.py) and configure a corresponding `MLflowReceiver` in the job script [np_client_api_job.py](np_client_api_job.py).
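
A sketch of the metric-logging side of such a script (the loss values are placeholders; see [./src/train_metrics.py](./src/train_metrics.py) for the actual script):

```python
import nvflare.client as flare
from nvflare.client.tracking import MLflowWriter

flare.init()
writer = MLflowWriter()

for step in range(10):
    mock_loss = 1.0 / (step + 1)  # placeholder for a real training loss
    writer.log_metric(key="loss", value=mock_loss, step=step)
```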

Then we can run it using the NVFlare Simulator:

```bash
nvflare simulator -n 2 -t 2 ./jobs/np_loop -w np_loop_workspace
python3 np_client_api_job.py --script src/train_metrics.py
```

## Data exchange mechanism

The underlying communication between the external process and NVFlare client is facilitated by the `Pipe` class.

Two distinct types of `Pipe` are implemented:
After the experiment is finished, you can view the results by running the mlflow command `mlflow ui --port 5000` inside the directory `/tmp/nvflare/jobs/workdir/server/simulate_job/`.

1. FilePipe:
- The `FilePipe` utilizes the file system for communication, involving read and write operations to a file.
- Suitable when the NVFlare client and the external system/process share a common file system.
- Ideal for scenarios where data exchange frequency is not high; however, it may not be efficient for high-frequency exchanges.
Please refer to MLflow examples and documentation for more information.

2. CellPipe:
- The `CellPipe` leverages the `Cell` from NVFlare's foundation layer (f3) for communication.
This allows it to make use of drivers from the f3 layer, such as TCP, GRPC, HTTP, and any customized drivers.

- Recommended for scenarios with a high frequency of data exchange (for example, metrics logging)
or when the file system is beyond your control.
## Sub-process Client API

You can also implement your own `Pipe`; please refer to https://github.com/NVIDIA/NVFlare/blob/main/nvflare/fuel/utils/pipe/pipe.py
The `ScriptRunner` with `launch_external_process=True` uses the `ClientAPILauncherExecutor` for external process script execution.
This configuration is ideal for scenarios requiring multi-GPU or distributed PyTorch training.

So far, we have demonstrated how to use the `FilePipe`.
The following example illustrates how to use the `CellPipe`.
### Launching the script

First, let's create the job using the sag_np_cell_pipe template:

```bash
nvflare job create -force -j ./jobs/np_loop_cell_pipe -w sag_np_cell_pipe -sd ./code/ \
-f config_fed_client.conf app_script=train_loop.py params_transfer_type=FULL launch_once=true
```
When launching a script in an external process, it is launched once for the entire job.
We must ensure our training script [./src/train_full.py](./src/train_full.py) is in a loop to support this.

Then we can run it using the NVFlare Simulator:

```bash
nvflare simulator -n 2 -t 2 ./jobs/np_loop_cell_pipe -w np_loop_cell_pipe_workspace
python3 np_client_api_job.py --script src/train_full.py --launch
```

## Launch once for the whole job and with metrics streaming

Sometimes we want to stream the training progress to the server.

We have several ways of doing that:

- `SummaryWriter` mimics Tensorboard `SummaryWriter`'s `add_scalar` and `add_scalars` methods
- `WandBWriter` mimics Weights and Biases' `log` method
- `MLflowWriter` mimics MLflow's tracking API
- `flare.log` is the underlying common pattern that can be used directly as well; you need to determine the
corresponding `AnalyticsDataType` for your value

We showcase `MLflowWriter` in [./code/train_metrics.py](./code/train_metrics.py)
### Metrics streaming

After that, we can set up the job using the sag_np_metrics template:

```bash
nvflare job create -force -j ./jobs/np_metrics -w sag_np_metrics -sd ./code/ \
-f config_fed_client.conf app_script=train_metrics.py params_transfer_type=DIFF launch_once=true \
-f config_fed_server.conf expected_data_kind=WEIGHT_DIFF
```
In this example we use `MLflowWriter` in [./src/train_metrics.py](./src/train_metrics.py) and configure a corresponding `MLflowReceiver` in the job script [np_client_api_job.py](np_client_api_job.py).

Once the job is set up, we can run it using the NVFlare Simulator:
Then we can run it using the NVFlare Simulator:

```bash
nvflare simulator -n 2 -t 2 ./jobs/np_metrics -w np_metrics_workspace
python3 np_client_api_job.py --script src/train_metrics.py --launch
```

Keep in mind that the difference between sag_np_cell_pipe and sag_np_metrics is the
addition of components like "metrics_pipe," "metric_relayer," and "event_to_fed."
These components allow values from an external process to be sent back to the server.

59 changes: 0 additions & 59 deletions examples/hello-world/ml-to-fl/np/code/train_full.py

This file was deleted.

78 changes: 78 additions & 0 deletions examples/hello-world/ml-to-fl/np/np_client_api_job.py
@@ -0,0 +1,78 @@
# Copyright (c) 2024, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import argparse

from nvflare import FedJob
from nvflare.app_common.np.np_model_persistor import NPModelPersistor
from nvflare.app_common.workflows.fedavg import FedAvg
from nvflare.app_opt.tracking.mlflow.mlflow_receiver import MLflowReceiver
from nvflare.job_config.script_runner import FrameworkType, ScriptRunner


def define_parser():
    parser = argparse.ArgumentParser()
    parser.add_argument("--n_clients", type=int, default=2)
    parser.add_argument("--num_rounds", type=int, default=5)
    parser.add_argument("--script", type=str, default="src/train_full.py")
    parser.add_argument("--launch", action=argparse.BooleanOptionalAction, default=False)

    return parser.parse_args()


def main():
    # define local parameters
    args = define_parser()

    n_clients = args.n_clients
    num_rounds = args.num_rounds
    script = args.script
    launch = args.launch

    job = FedJob(name="np_client_api")

    persistor_id = job.to_server(NPModelPersistor(), "persistor")

    # Define the controller workflow and send to server
    controller = FedAvg(num_clients=n_clients, num_rounds=num_rounds, persistor_id=persistor_id)
    job.to_server(controller)

    # Add MLflow Receiver for metrics streaming
    if script == "src/train_metrics.py":
        receiver = MLflowReceiver(
            tracking_uri="file:///tmp/nvflare/jobs/workdir/server/simulate_job/mlruns",
            kwargs={
                "experiment_name": "nvflare-fedavg-np-experiment",
                "run_name": "nvflare-fedavg-np-with-mlflow",
                "experiment_tags": {"mlflow.note.content": "## **NVFlare FedAvg Numpy experiment with MLflow**"},
                "run_tags": {"mlflow.note.content": "## Federated Experiment tracking with MLflow.\n"},
            },
            artifact_location="artifacts",
            events=["fed.analytix_log_stats"],
        )
        job.to_server(receiver)

    executor = ScriptRunner(
        script=script,
        launch_external_process=launch,
        framework=FrameworkType.NUMPY,
    )
    job.to_clients(executor)

    # job.export_job("/tmp/nvflare/jobs/job_config")
    job.simulator_run("/tmp/nvflare/jobs/workdir", n_clients=n_clients, gpu="0")


if __name__ == "__main__":
    main()
2 changes: 1 addition & 1 deletion examples/hello-world/ml-to-fl/np/requirements.txt
@@ -1 +1 @@
nvflare~=2.4.0rc
nvflare~=2.5.0rc
