Skip to content

Commit

Permalink
Integrate Fluid with Kubeflow Pipline (#3694)
Browse files Browse the repository at this point in the history
* add create alluxioruntime kfp

Signed-off-by: wang-mask <[email protected]>

* add rbac

Signed-off-by: wang-mask <[email protected]>

* add more components and a simple CNN sample

Signed-off-by: ZhangXiaozheng <[email protected]>

* add readme

Signed-off-by: wang-mask <[email protected]>

* add more notes to the README

Signed-off-by: ZhangXiaozheng <[email protected]>

---------

Signed-off-by: wang-mask <[email protected]>
Signed-off-by: ZhangXiaozheng <[email protected]>
Co-authored-by: wang-mask <[email protected]>
  • Loading branch information
zhang-x-z and wang-mask committed Jan 21, 2024
1 parent 5e13f2c commit d98b4ca
Show file tree
Hide file tree
Showing 11 changed files with 1,445 additions and 0 deletions.
37 changes: 37 additions & 0 deletions integration/kubeflow/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
# Demo - Using FLuid in Kubeflow Pipelines v2
This is a demo that wraps Fluid'operations as KFP v2 components to complete the processes of dataset creation, runtime creation, and cache preheating to accelerate model training (a simple CNN model for [Fashion MNIST](https://www.kaggle.com/datasets/zalando-research/fashionmnist)).

## Prerequisites

### Installation
- [Fluid](https://github.com/fluid-cloudnative/fluid)

- [Kubeflow Pipelines v2](https://www.kubeflow.org/docs/components/pipelines/v2/)

Please refer to the [Fluid installation guide](https://github.com/fluid-cloudnative/fluid/blob/master/docs/zh/userguide/install.md) and [KFP v2 installation guide](https://www.kubeflow.org/docs/components/pipelines/v2/installation/quickstart/) to complete the installation of Fluid and KFP v2.

### Dataset
- [Fashion MNIST](https://www.kaggle.com/datasets/zalando-research/fashionmnist)

You should upload `fashion-mnist_train.csv` and `fashion-mnist_test.csv` to your Amazon S3 (or any S3-compatible storage, such as [MinIO](https://min.io/)) and deploy the [s3-secret.yaml](./s3-secret.yaml) to provide the access key.

### RBAC
Because KFP components require access or modification permissions to Fluid resources, it is necessary to deploy [rbac.yaml](./rbac.yaml) in advance to grant permissions.


## Demo
The sample is composed of dataset creation, alluxioruntime creation, data preloading ,and model training.

The simple pipeline provides the following parameters:
- batch_size: int
- dataset_name: str
- epochs: int
- learning_rate: float
- mount_point: str
- mount_s3_endpoint: str
- mount_s3_region: str
- namespace: str (For now, this value should be the namespace in which you deploy your KFP)

If you want to run the sample, you can upload the [train-cnn-for-fashion-mnist-pipline.yaml](./pipline-yaml/train-cnn-for-fashion-mnist-pipline.yaml) to your pipeline dashboard UI and fill in these parameters.

Most importantly, this is just an example of packaging Fluid operations into KFP components, and users need to develop KFP components according to their own needs.
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
# PIPELINE DEFINITION
# Name: cleanup-dataset-and-alluxio-runtime
# Inputs:
# dataset_name: str
# namespace: str
components:
comp-cleanup-dataset-and-alluxio-runtime:
executorLabel: exec-cleanup-dataset-and-alluxio-runtime
inputDefinitions:
parameters:
dataset_name:
parameterType: STRING
namespace:
parameterType: STRING
deploymentSpec:
executors:
exec-cleanup-dataset-and-alluxio-runtime:
container:
args:
- --executor_input
- '{{$}}'
- --function_to_execute
- cleanup_dataset_and_alluxio_runtime
command:
- sh
- -c
- "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\
\ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\
\ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.4.0'\
\ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' &&\
\ python3 -m pip install --quiet --no-warn-script-location 'git+https://github.com/fluid-cloudnative/fluid-client-python.git'\
\ && \"$0\" \"$@\"\n"
- sh
- -ec
- 'program_path=$(mktemp -d)
printf "%s" "$0" > "$program_path/ephemeral_component.py"
_KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@"
'
- "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\
\ *\n\ndef cleanup_dataset_and_alluxio_runtime(dataset_name: str, namespace:\
\ str):\n import logging\n from fluid import FluidClient\n\n fluid_client\
\ = FluidClient()\n fluid_client.delete_runtime(name=dataset_name, namespace=namespace,\
\ runtime_type=\"alluxio\", wait_until_cleaned_up=True)\n fluid_client.delete_dataset(name=dataset_name,\
\ namespace=namespace, wait_until_cleaned_up=True)\n\n logging.info(f\"\
Cleanup Dataset and AlluxioRuntime \\\"{namespace}/{dataset_name}\\\" successfully!\"\
)\n\n"
image: python:3.7
pipelineInfo:
name: cleanup-dataset-and-alluxio-runtime
root:
dag:
tasks:
cleanup-dataset-and-alluxio-runtime:
cachingOptions:
enableCache: true
componentRef:
name: comp-cleanup-dataset-and-alluxio-runtime
inputs:
parameters:
dataset_name:
componentInputParameter: dataset_name
namespace:
componentInputParameter: namespace
taskInfo:
name: cleanup-dataset-and-alluxio-runtime
inputDefinitions:
parameters:
dataset_name:
parameterType: STRING
namespace:
parameterType: STRING
schemaVersion: 2.1.0
sdkVersion: kfp-2.4.0
75 changes: 75 additions & 0 deletions integration/kubeflow/component-yaml/cleanup-preheat-operation.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
# PIPELINE DEFINITION
# Name: cleanup-preheat-operation
# Inputs:
# dataset_name: str
# namespace: str
components:
comp-cleanup-preheat-operation:
executorLabel: exec-cleanup-preheat-operation
inputDefinitions:
parameters:
dataset_name:
parameterType: STRING
namespace:
parameterType: STRING
deploymentSpec:
executors:
exec-cleanup-preheat-operation:
container:
args:
- --executor_input
- '{{$}}'
- --function_to_execute
- cleanup_preheat_operation
command:
- sh
- -c
- "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\
\ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\
\ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.4.0'\
\ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' &&\
\ python3 -m pip install --quiet --no-warn-script-location 'git+https://github.com/fluid-cloudnative/fluid-client-python.git'\
\ && \"$0\" \"$@\"\n"
- sh
- -ec
- 'program_path=$(mktemp -d)
printf "%s" "$0" > "$program_path/ephemeral_component.py"
_KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@"
'
- "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\
\ *\n\ndef cleanup_preheat_operation(dataset_name: str, namespace: str):\n\
\ import logging\n from fluid import FluidClient\n\n fluid_client\
\ = FluidClient()\n fluid_client.delete_data_operation(name=\"%s-loader\"\
\ % dataset_name, data_op_type=\"dataload\", namespace=namespace)\n logging.info(\"\
Cleanup preheat dataset operation successfully!\")\n\n"
image: python:3.7
pipelineInfo:
name: cleanup-preheat-operation
root:
dag:
tasks:
cleanup-preheat-operation:
cachingOptions:
enableCache: true
componentRef:
name: comp-cleanup-preheat-operation
inputs:
parameters:
dataset_name:
componentInputParameter: dataset_name
namespace:
componentInputParameter: namespace
taskInfo:
name: cleanup-preheat-operation
inputDefinitions:
parameters:
dataset_name:
parameterType: STRING
namespace:
parameterType: STRING
schemaVersion: 2.1.0
sdkVersion: kfp-2.4.0
85 changes: 85 additions & 0 deletions integration/kubeflow/component-yaml/create-alluxioruntime.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
# PIPELINE DEFINITION
# Name: create-alluxio-runtime
# Inputs:
# dataset_name: str
# namespace: str
components:
comp-create-alluxio-runtime:
executorLabel: exec-create-alluxio-runtime
inputDefinitions:
parameters:
dataset_name:
parameterType: STRING
namespace:
parameterType: STRING
deploymentSpec:
executors:
exec-create-alluxio-runtime:
container:
args:
- --executor_input
- '{{$}}'
- --function_to_execute
- create_alluxio_runtime
command:
- sh
- -c
- "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\
\ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\
\ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.4.0'\
\ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' &&\
\ python3 -m pip install --quiet --no-warn-script-location 'git+https://github.com/fluid-cloudnative/fluid-client-python.git'\
\ && \"$0\" \"$@\"\n"
- sh
- -ec
- 'program_path=$(mktemp -d)
printf "%s" "$0" > "$program_path/ephemeral_component.py"
_KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@"
'
- "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\
\ *\n\ndef create_alluxio_runtime(dataset_name: str, namespace: str):\n\
\ import logging\n from fluid import AlluxioRuntime, AlluxioRuntimeSpec,\
\ models, FluidClient\n from kubernetes import client as k8s_client\n\
\n fluid_client = FluidClient()\n\n FLUID_GROUP = \"data.fluid.io\"\
\n FLUID_VERSION = \"v1alpha1\"\n\n replicas = 1\n\n # This is\
\ the simplest configuration for AlluxioRuntime, you can change the AlluxioRuntime\
\ according to your needs\n alluxio_runtime = AlluxioRuntime(\n \
\ api_version=\"%s/%s\" % (FLUID_GROUP, FLUID_VERSION),\n kind=\"\
AlluxioRuntime\",\n metadata=k8s_client.V1ObjectMeta(\n \
\ name=dataset_name,\n namespace=namespace\n ),\n \
\ spec=AlluxioRuntimeSpec(\n replicas=replicas,\n \
\ tieredstore=models.TieredStore([models.Level('0.95', '0.7', 'MEM',\
\ '/dev/shm', '2Gi', volume_type=None)])\n )\n )\n\n fluid_client.create_runtime(alluxio_runtime)\n\
\n\n logging.info(f\"Runtime \\\"{alluxio_runtime.metadata.namespace}/{alluxio_runtime.metadata.name}\\\
\" created successfully\")\n\n"
image: python:3.7
pipelineInfo:
name: create-alluxio-runtime
root:
dag:
tasks:
create-alluxio-runtime:
cachingOptions:
enableCache: true
componentRef:
name: comp-create-alluxio-runtime
inputs:
parameters:
dataset_name:
componentInputParameter: dataset_name
namespace:
componentInputParameter: namespace
taskInfo:
name: create-alluxio-runtime
inputDefinitions:
parameters:
dataset_name:
parameterType: STRING
namespace:
parameterType: STRING
schemaVersion: 2.1.0
sdkVersion: kfp-2.4.0
Loading

0 comments on commit d98b4ca

Please sign in to comment.