Update ml-to-fl examples with new APIs #2836

Merged 5 commits on Aug 24, 2024
5 changes: 4 additions & 1 deletion examples/hello-world/ml-to-fl/README.md
@@ -39,8 +39,11 @@ Utilizing the ```ClientAPILauncherExecutor```, this option offers flexibility in
* Communication via CellPipe (default)
* Communication via FilePipe (no capability to stream experiment tracking metrics)
This configuration is ideal for scenarios requiring multi-GPU or distributed PyTorch training.


Choose the option best suited to your specific requirements and workflow preferences.

These implementations can be easily configured using the JobAPI's ScriptRunner.
By default, the ```InProcessClientAPIExecutor``` is used; setting `launch_external_process=True` switches to the ```ClientAPILauncherExecutor```
with pre-configured CellPipes for communication and metrics streaming.
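As a rough sketch of the two configurations (the import path matches the one used in this PR's job script; treat the exact arguments as illustrative):

```python
from nvflare.job_config.script_runner import ScriptRunner

# In-process (default): the script runs via the InProcessClientAPIExecutor
runner = ScriptRunner(script="src/train_full.py")

# Sub-process: the script runs via the ClientAPILauncherExecutor
# with pre-configured CellPipes for communication and metrics streaming
runner_ext = ScriptRunner(script="src/train_full.py", launch_external_process=True)
```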

Note: Avoid installing TensorFlow and PyTorch in the same virtual environment due to library conflicts.
170 changes: 37 additions & 133 deletions examples/hello-world/ml-to-fl/np/README.md
@@ -1,22 +1,6 @@
# NVFlare Client API

The following examples demonstrate different approaches to sending model parameters, or model parameter differences, back to the NVFlare server:

1. [Send model parameters back to the NVFlare server](#send-model-parameters-back-to-the-nvflare-server)
2. [Send model parameters differences back to the NVFlare server](#send-model-parameters-differences-back-to-the-nvflare-server)


By default, the "SubprocessLauncher" launches the script once per job.

If your data setup takes a long time, you don't want to launch the whole training script every round
(that would reload the dataset and lose all caches each round).

On the other hand, if your system is very resource-limited and you don't want the training process to live
throughout the whole job, you can use "launch_once=False".

We demonstrate how to launch the training script once and have it keep exchanging training parameters with NVFlare:

1. [Launch once for the whole job](#launch-once-for-the-whole-job)
In this example we use simple numpy scripts to showcase the Client API with the ScriptRunner in both in-process and sub-process settings.

## Software Requirements

@@ -26,170 +10,90 @@ Please install the requirements first; it is suggested to install inside a virtual environment:
pip install -r requirements.txt
```

Please also configure the job templates folder:

```bash
nvflare config -jt ../../../../job_templates/
nvflare job list_templates
```

## Minimum Hardware Requirements

1 CPU


## Send model parameters back to the NVFlare server

We use the mock training script in [./code/train_full.py](./code/train_full.py),
which sends back the FLModel with "params_type"="FULL".

To send back the whole model parameters, we need to make sure the "params_transfer_type" is also "FULL".

Let's reuse the job template from [sag_np](../../../../job_templates/sag_np/):

```bash
nvflare job create -force -j ./jobs/np_param_full_transfer_full -w sag_np -sd ./code/ \
-f config_fed_client.conf app_script=train_full.py params_transfer_type=FULL launch_once=false
```

Then we can run it using the NVFlare Simulator:
## In-process Client API

```bash
nvflare simulator -n 2 -t 2 ./jobs/np_param_full_transfer_full -w np_param_full_transfer_full_workspace
```
The default mode of the `ScriptRunner` with `launch_external_process=False` uses the `InProcessClientAPIExecutor` for in-process script execution.
With the `InProcessClientAPIExecutor`, the client training script operates within the same process as the NVFlare Client job.
This provides benefits such as efficient shared-memory usage and a simple configuration, useful for development or single-GPU use cases.
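The training scripts follow the Client API's receive/train/send loop. The sketch below mirrors that structure with a small stand-in object instead of the real `nvflare.client` module, so it can run standalone; in an actual script you would `import nvflare.client as flare` and the framework would drive the rounds.

```python
from types import SimpleNamespace

import numpy as np


class FakeFlare:
    """Stand-in for nvflare.client that feeds three rounds of parameters."""

    def __init__(self, rounds=3):
        self.rounds = rounds
        self.round = 0
        self.sent = []

    def init(self):
        pass

    def is_running(self):
        return self.round < self.rounds

    def receive(self):
        self.round += 1
        return SimpleNamespace(params={"numpy_key": np.zeros((3, 3))}, current_round=self.round)

    def send(self, model):
        self.sent.append(model)


flare = FakeFlare()
flare.init()  # in a real script: flare.init() from nvflare.client
while flare.is_running():
    input_model = flare.receive()           # global model from the server
    # Mock "training": add 1 to every received weight
    new_params = {k: v + 1 for k, v in input_model.params.items()}
    flare.send(SimpleNamespace(params=new_params, params_type="FULL"))

print(len(flare.sent))  # 3
```

In a real Client API script, `flare.send` takes an `flare.FLModel`; the stand-in only preserves the control flow.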

## Send model parameters differences back to the NVFlare server
### Send model parameters back to the NVFlare server

There are two ways to send model parameters differences back to the NVFlare server:
We use the mock training script in [./src/train_full.py](./src/train_full.py),
which sends back the FLModel with "params_type"="FULL".

1. Send the full parameters in training script, change params_transfer_type to "DIFF"
2. Calculate the parameters differences in training script and send it back via "flare.send"
After we modify our training script, we can create a job using the ScriptRunner: [np_client_api_job.py](./np_client_api_job.py).
(Please refer to [FedJob API](https://nvflare.readthedocs.io/en/main/programming_guide/fed_job_api.html) for more details on formulating a job)

For the first way, we can reuse the mock training script [./code/train_full.py](./code/train_full.py)

But we need to pass different parameters when creating job:
Then we can run the job using the simulator with the Job API. (This is equivalent to using the CLI command `nvflare simulator <job_folder>`)

```bash
nvflare job create -force -j ./jobs/np_param_full_transfer_diff -w sag_np -sd ./code/ \
-f config_fed_client.conf app_script=train_full.py params_transfer_type=DIFF launch_once=false \
-f config_fed_server.conf expected_data_kind=WEIGHT_DIFF
python3 np_client_api_job.py --script src/train_full.py
```

By setting "params_transfer_type=DIFF" we are using the NVFlare built-in parameter difference method to calculate differences.
Note: We can instead export the job configuration to use in other modes with the flag `--export_config`.

Then we can run it using the NVFlare Simulator:
### Send model parameters differences back to the NVFlare server

```bash
nvflare simulator -n 2 -t 2 ./jobs/np_param_full_transfer_diff -w np_param_full_transfer_diff_workspace
```

For the second way, we write a new mock training script that calculate the model difference and send it back: [./code/train_diff.py](./code/train_diff.py)
We can send model parameter differences back to the NVFlare server by calculating the parameter differences and sending them back: [./src/train_diff.py](./src/train_diff.py)

Note that we set the "params_type" to DIFF when creating flare.FLModel.
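The diff computation itself is plain array arithmetic. A minimal numpy sketch (the surrounding `flare.receive`/`flare.send` calls are omitted, and the key name is just illustrative):

```python
import numpy as np


def compute_param_diff(new_params, original_params):
    """Weight differences to send back with params_type DIFF."""
    return {key: new_params[key] - original_params[key] for key in new_params}


# Illustrative parameters: what was received vs. what training produced
original = {"numpy_key": np.zeros((3, 3))}
trained = {"numpy_key": np.full((3, 3), 1.5)}

diff = compute_param_diff(trained, original)
print(diff["numpy_key"][0][0])  # 1.5
```

The resulting dict is what you would place in `flare.FLModel(params=diff, params_type="DIFF")` before calling `flare.send`.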

Then we create the job using the following command:

```bash
nvflare job create -force -j ./jobs/np_param_diff_transfer_full -w sag_np -sd ./code/ \
-f config_fed_client.conf app_script=train_diff.py launch_once=false \
-f config_fed_server.conf expected_data_kind=WEIGHT_DIFF
```

The "params_transfer_type" is "FULL", which means that we do NOT calculate the difference again using the NVFlare built-in parameter difference method.

Then we can run it using the NVFlare Simulator:

```bash
nvflare simulator -n 2 -t 2 ./jobs/np_param_diff_transfer_full -w np_param_diff_transfer_full_workspace
python3 np_client_api_job.py --script src/train_diff.py
```

## Launch once for the whole job
### Metrics streaming

In some training scenarios, data loading takes a lot of time,
and we only want to load/set up the data once for the whole training job.
Sometimes we want to stream the training progress to the server.

In that case, we can use the "launch_once" option of "SubprocessLauncher" and wrap our training script in a loop.
We have several ways of doing that:

We wrap the [./code/train_full.py](./code/train_full.py) into a loop: [./code/train_loop.py](./code/train_loop.py)
- `SummaryWriter` mimics TensorBoard `SummaryWriter`'s `add_scalar` and `add_scalars` methods
- `WandBWriter` mimics Weights & Biases' `log` method
- `MLflowWriter` mimics MLflow's tracking API
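The three writer flavors expose framework-familiar call shapes. A small recording stub stands in for the real writers here (which NVFlare provides through its client tracking module) so the call patterns can be shown standalone; the signatures below follow the frameworks being mimicked and are illustrative:

```python
class RecordingWriter:
    """Stub capturing the call shapes of the three metric-writer flavors."""

    def __init__(self):
        self.events = []

    # SummaryWriter-style (TensorBoard)
    def add_scalar(self, tag, scalar, global_step=None):
        self.events.append(("tb", tag, scalar, global_step))

    # WandBWriter-style (Weights & Biases)
    def log(self, metrics, step=None):
        self.events.append(("wandb", metrics, step))

    # MLflowWriter-style
    def log_metric(self, key, value, step=None):
        self.events.append(("mlflow", key, value, step))


writer = RecordingWriter()
writer.add_scalar("loss", 0.25, global_step=1)  # TensorBoard-like call
writer.log({"accuracy": 0.9}, step=1)           # W&B-like call
writer.log_metric("loss", 0.25, step=1)         # MLflow-like call
print(len(writer.events))  # 3
```

Whichever flavor you pick, the values are streamed through NVFlare events to a matching receiver on the server side.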

Then we can create the job:

```bash
nvflare job create -force -j ./jobs/np_loop -w sag_np -sd ./code/ \
-f config_fed_client.conf app_script=train_loop.py params_transfer_type=FULL launch_once=true
```
In this example we use `MLflowWriter` in [./src/train_metrics.py](./src/train_metrics.py) and configure a corresponding `MLflowReceiver` in the job script [np_client_api_job.py](np_client_api_job.py).

Then we can run it using the NVFlare Simulator:

```bash
nvflare simulator -n 2 -t 2 ./jobs/np_loop -w np_loop_workspace
python3 np_client_api_job.py --script src/train_metrics.py
```

## Data exchange mechanism

The underlying communication between the external process and NVFlare client is facilitated by the `Pipe` class.
After the experiment is finished, you can view the results by running the mlflow command `mlflow ui --port 5000` inside the directory `/tmp/nvflare/jobs/workdir/server/simulate_job/`.

Two distinct types of `Pipe` are implemented:
Please refer to MLflow examples and documentation for more information.

1. FilePipe:
- The `FilePipe` utilizes the file system for communication, involving read and write operations to a file.
- Suitable when the NVFlare client and the external system/process share a common file system.
- Ideal for scenarios where data exchange frequency is not high; however, it may not be efficient for high-frequency exchanges.

2. CellPipe:
- The `CellPipe` leverages the `Cell` from NVFlare's foundation layer (f3) for communication.
This allows it to make use of drivers from the f3 layer, such as TCP, GRPC, HTTP, and any customized drivers.
## Sub-process Client API

- Recommended for scenarios with a high frequency of data exchange (for example metrics logging)
or when the file system is beyond your control.
The `ScriptRunner` with `launch_external_process=True` uses the `ClientAPILauncherExecutor` for external process script execution.
This configuration is ideal for scenarios requiring multi-GPU or distributed PyTorch training.

You can also implement your own `Pipe`; please refer to https://github.com/NVIDIA/NVFlare/blob/main/nvflare/fuel/utils/pipe/pipe.py
### Launching the script

So far, we have demonstrated how to use the `FilePipe`.
The following example illustrates how to use the `CellPipe`.

First, let's create the job using the sag_np_cell_pipe template

```bash
nvflare job create -force -j ./jobs/np_loop_cell_pipe -w sag_np_cell_pipe -sd ./code/ \
-f config_fed_client.conf app_script=train_loop.py params_transfer_type=FULL launch_once=true
```
When launching a script in an external process, it is launched once for the entire job.
We must ensure our training script [./src/train_full.py](./src/train_full.py) is in a loop to support this.

Then we can run it using the NVFlare Simulator:

```bash
nvflare simulator -n 2 -t 2 ./jobs/np_loop_cell_pipe -w np_loop_cell_pipe_workspace
python3 np_client_api_job.py --script src/train_full.py --launch_process
```

## Launch once for the whole job and with metrics streaming
### Metrics streaming

Sometimes we want to stream the training progress to the server.
In this example we use `MLflowWriter` in [./src/train_metrics.py](./src/train_metrics.py) and configure a corresponding `MLflowReceiver` in the job script [np_client_api_job.py](np_client_api_job.py).

We have several ways of doing that:

- `SummaryWriter` mimics Tensorboard `SummaryWriter`'s `add_scalar`, `add_scalars` method
- `WandBWriter` mimics Weights And Biases's `log` method
- `MLflowWriter` mimics MLflow's tracking api
- `flare.log` is the underlying common pattern that can also be used directly; you need to determine the
corresponding `AnalyticsDataType` for your value

We showcase `MLflowWriter` in [./code/train_metrics.py](./code/train_metrics.py)

After that, we can set up the job using the sag_np_metrics template:

```bash
nvflare job create -force -j ./jobs/np_metrics -w sag_np_metrics -sd ./code/ \
-f config_fed_client.conf app_script=train_metrics.py params_transfer_type=DIFF launch_once=true \
-f config_fed_server.conf expected_data_kind=WEIGHT_DIFF
```

Once the job is set up, we can run it using the NVFlare Simulator:
Then we can run it using the NVFlare Simulator:

```bash
nvflare simulator -n 2 -t 2 ./jobs/np_metrics -w np_metrics_workspace
python3 np_client_api_job.py --script src/train_metrics.py --launch_process
```

Keep in mind that the difference between sag_np_cell_pipe and sag_np_metrics is the
addition of components like "metrics_pipe," "metric_relayer," and "event_to_fed."
These components allow values from an external process to be sent back to the server.

59 changes: 0 additions & 59 deletions examples/hello-world/ml-to-fl/np/code/train_full.py

This file was deleted.

82 changes: 82 additions & 0 deletions examples/hello-world/ml-to-fl/np/np_client_api_job.py
@@ -0,0 +1,82 @@
# Copyright (c) 2024, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import argparse

from nvflare import FedJob
from nvflare.app_common.np.np_model_persistor import NPModelPersistor
from nvflare.app_common.workflows.fedavg import FedAvg
from nvflare.app_opt.tracking.mlflow.mlflow_receiver import MLflowReceiver
from nvflare.job_config.script_runner import FrameworkType, ScriptRunner


def define_parser():
    parser = argparse.ArgumentParser()
    parser.add_argument("--n_clients", type=int, default=2)
    parser.add_argument("--num_rounds", type=int, default=5)
    parser.add_argument("--script", type=str, default="src/train_full.py")
    parser.add_argument("--launch_process", action=argparse.BooleanOptionalAction, default=False)
    parser.add_argument("--export_config", action=argparse.BooleanOptionalAction, default=False)

    return parser.parse_args()


def main():
    # define local parameters
    args = define_parser()

    n_clients = args.n_clients
    num_rounds = args.num_rounds
    script = args.script
    launch_process = args.launch_process
    export_config = args.export_config

    job = FedJob(name="np_client_api")

    persistor_id = job.to_server(NPModelPersistor(), "persistor")

    # Define the controller workflow and send to server
    controller = FedAvg(num_clients=n_clients, num_rounds=num_rounds, persistor_id=persistor_id)
    job.to_server(controller)

    # Add MLflow Receiver for metrics streaming
    if script == "src/train_metrics.py":
        receiver = MLflowReceiver(
            tracking_uri="file:///tmp/nvflare/jobs/workdir/server/simulate_job/mlruns",
            kwargs={
                "experiment_name": "nvflare-fedavg-np-experiment",
                "run_name": "nvflare-fedavg-np-with-mlflow",
                "experiment_tags": {"mlflow.note.content": "## **NVFlare FedAvg Numpy experiment with MLflow**"},
                "run_tags": {"mlflow.note.content": "## Federated Experiment tracking with MLflow.\n"},
            },
            artifact_location="artifacts",
            events=["fed.analytix_log_stats"],
        )
        job.to_server(receiver)

    executor = ScriptRunner(
        script=script,
        launch_external_process=launch_process,
        framework=FrameworkType.NUMPY,
    )
    job.to_clients(executor)

    if export_config:
        job.export_job("/tmp/nvflare/jobs/job_config")
    else:
        job.simulator_run("/tmp/nvflare/jobs/workdir", n_clients=n_clients, gpu="0")


if __name__ == "__main__":
    main()