diff --git a/integration/kubeflow/README.md b/integration/kubeflow/README.md
new file mode 100644
index 00000000000..da1e9fe6336
--- /dev/null
+++ b/integration/kubeflow/README.md
@@ -0,0 +1,37 @@
+# Demo - Using Fluid in Kubeflow Pipelines v2
+This demo wraps Fluid's operations as KFP v2 components that handle dataset creation, runtime creation, and cache preheating to accelerate model training (a simple CNN for [Fashion MNIST](https://www.kaggle.com/datasets/zalando-research/fashionmnist)).
+
+## Prerequisites
+
+### Installation
+- [Fluid](https://github.com/fluid-cloudnative/fluid)
+
+- [Kubeflow Pipelines v2](https://www.kubeflow.org/docs/components/pipelines/v2/)
+
+Please refer to the [Fluid installation guide](https://github.com/fluid-cloudnative/fluid/blob/master/docs/zh/userguide/install.md) and the [KFP v2 installation guide](https://www.kubeflow.org/docs/components/pipelines/v2/installation/quickstart/) to install Fluid and KFP v2.
+
+### Dataset
+- [Fashion MNIST](https://www.kaggle.com/datasets/zalando-research/fashionmnist)
+
+Upload `fashion-mnist_train.csv` and `fashion-mnist_test.csv` to Amazon S3 (or any S3-compatible storage, such as [MinIO](https://min.io/)) and deploy [s3-secret.yaml](./s3-secret.yaml) to provide the access credentials.
+
+### RBAC
+Because the KFP components need to access and modify Fluid resources, deploy [rbac.yaml](./rbac.yaml) in advance to grant the required permissions.
+
+
+## Demo
+The sample consists of dataset creation, AlluxioRuntime creation, data preloading, and model training.
+
+The pipeline exposes the following parameters:
+- batch_size: int
+- dataset_name: str
+- epochs: int
+- learning_rate: float
+- mount_point: str
+- mount_s3_endpoint: str
+- mount_s3_region: str
+- namespace: str (for now, this should be the namespace in which your KFP is deployed)
+
+To run the sample, upload [train-cnn-for-fashion-mnist-pipline.yaml](./pipline-yaml/train-cnn-for-fashion-mnist-pipline.yaml) to your pipeline dashboard UI and fill in these parameters, or submit it programmatically with the KFP SDK client (sketched at the end of [simple_cnn_sample.py](./simple_cnn_sample.py)).
+
+Most importantly, this is just an example of packaging Fluid operations into KFP components; develop your own KFP components according to your needs.
\ No newline at end of file
diff --git a/integration/kubeflow/component-yaml/cleanup-dataset-and-alluxioruntime.yaml b/integration/kubeflow/component-yaml/cleanup-dataset-and-alluxioruntime.yaml
new file mode 100644
index 00000000000..f74a28db381
--- /dev/null
+++ b/integration/kubeflow/component-yaml/cleanup-dataset-and-alluxioruntime.yaml
@@ -0,0 +1,77 @@
+# PIPELINE DEFINITION
+# Name: cleanup-dataset-and-alluxio-runtime
+# Inputs:
+#    dataset_name: str
+#    namespace: str
+components:
+  comp-cleanup-dataset-and-alluxio-runtime:
+    executorLabel: exec-cleanup-dataset-and-alluxio-runtime
+    inputDefinitions:
+      parameters:
+        dataset_name:
+          parameterType: STRING
+        namespace:
+          parameterType: STRING
+deploymentSpec:
+  executors:
+    exec-cleanup-dataset-and-alluxio-runtime:
+      container:
+        args:
+        - --executor_input
+        - '{{$}}'
+        - --function_to_execute
+        - cleanup_dataset_and_alluxio_runtime
+        command:
+        - sh
+        - -c
+        - "\nif ! 
[ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\ + \ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\ + \ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.4.0'\ + \ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' &&\ + \ python3 -m pip install --quiet --no-warn-script-location 'git+https://github.com/fluid-cloudnative/fluid-client-python.git'\ + \ && \"$0\" \"$@\"\n" + - sh + - -ec + - 'program_path=$(mktemp -d) + + + printf "%s" "$0" > "$program_path/ephemeral_component.py" + + _KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@" + + ' + - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\ + \ *\n\ndef cleanup_dataset_and_alluxio_runtime(dataset_name: str, namespace:\ + \ str):\n import logging\n from fluid import FluidClient\n\n fluid_client\ + \ = FluidClient()\n fluid_client.delete_runtime(name=dataset_name, namespace=namespace,\ + \ runtime_type=\"alluxio\", wait_until_cleaned_up=True)\n fluid_client.delete_dataset(name=dataset_name,\ + \ namespace=namespace, wait_until_cleaned_up=True)\n\n logging.info(f\"\ + Cleanup Dataset and AlluxioRuntime \\\"{namespace}/{dataset_name}\\\" successfully!\"\ + )\n\n" + image: python:3.7 +pipelineInfo: + name: cleanup-dataset-and-alluxio-runtime +root: + dag: + tasks: + cleanup-dataset-and-alluxio-runtime: + cachingOptions: + enableCache: true + componentRef: + name: comp-cleanup-dataset-and-alluxio-runtime + inputs: + parameters: + dataset_name: + componentInputParameter: dataset_name + namespace: + componentInputParameter: namespace + taskInfo: + name: cleanup-dataset-and-alluxio-runtime + inputDefinitions: + parameters: + dataset_name: + parameterType: STRING + namespace: + parameterType: STRING +schemaVersion: 2.1.0 +sdkVersion: kfp-2.4.0 diff --git a/integration/kubeflow/component-yaml/cleanup-preheat-operation.yaml b/integration/kubeflow/component-yaml/cleanup-preheat-operation.yaml new file mode 100644 index 00000000000..25dfa45bb1d --- /dev/null +++ b/integration/kubeflow/component-yaml/cleanup-preheat-operation.yaml @@ -0,0 +1,75 @@ +# PIPELINE DEFINITION +# Name: cleanup-preheat-operation +# Inputs: +# dataset_name: str +# namespace: str +components: + comp-cleanup-preheat-operation: + executorLabel: exec-cleanup-preheat-operation + inputDefinitions: + parameters: + dataset_name: + parameterType: STRING + namespace: + parameterType: STRING +deploymentSpec: + executors: + exec-cleanup-preheat-operation: + container: + args: + - --executor_input + - '{{$}}' + - --function_to_execute + - cleanup_preheat_operation + command: + - sh + - -c + - "\nif ! 
[ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\ + \ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\ + \ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.4.0'\ + \ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' &&\ + \ python3 -m pip install --quiet --no-warn-script-location 'git+https://github.com/fluid-cloudnative/fluid-client-python.git'\ + \ && \"$0\" \"$@\"\n" + - sh + - -ec + - 'program_path=$(mktemp -d) + + + printf "%s" "$0" > "$program_path/ephemeral_component.py" + + _KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@" + + ' + - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\ + \ *\n\ndef cleanup_preheat_operation(dataset_name: str, namespace: str):\n\ + \ import logging\n from fluid import FluidClient\n\n fluid_client\ + \ = FluidClient()\n fluid_client.delete_data_operation(name=\"%s-loader\"\ + \ % dataset_name, data_op_type=\"dataload\", namespace=namespace)\n logging.info(\"\ + Cleanup preheat dataset operation successfully!\")\n\n" + image: python:3.7 +pipelineInfo: + name: cleanup-preheat-operation +root: + dag: + tasks: + cleanup-preheat-operation: + cachingOptions: + enableCache: true + componentRef: + name: comp-cleanup-preheat-operation + inputs: + parameters: + dataset_name: + componentInputParameter: dataset_name + namespace: + componentInputParameter: namespace + taskInfo: + name: cleanup-preheat-operation + inputDefinitions: + parameters: + dataset_name: + parameterType: STRING + namespace: + parameterType: STRING +schemaVersion: 2.1.0 +sdkVersion: kfp-2.4.0 diff --git a/integration/kubeflow/component-yaml/create-alluxioruntime.yaml b/integration/kubeflow/component-yaml/create-alluxioruntime.yaml new file mode 100644 index 00000000000..953db9f2579 --- /dev/null +++ b/integration/kubeflow/component-yaml/create-alluxioruntime.yaml @@ -0,0 +1,85 @@ +# PIPELINE DEFINITION +# Name: create-alluxio-runtime +# Inputs: +# dataset_name: str +# namespace: str +components: + comp-create-alluxio-runtime: + executorLabel: exec-create-alluxio-runtime + inputDefinitions: + parameters: + dataset_name: + parameterType: STRING + namespace: + parameterType: STRING +deploymentSpec: + executors: + exec-create-alluxio-runtime: + container: + args: + - --executor_input + - '{{$}}' + - --function_to_execute + - create_alluxio_runtime + command: + - sh + - -c + - "\nif ! 
[ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\ + \ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\ + \ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.4.0'\ + \ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' &&\ + \ python3 -m pip install --quiet --no-warn-script-location 'git+https://github.com/fluid-cloudnative/fluid-client-python.git'\ + \ && \"$0\" \"$@\"\n" + - sh + - -ec + - 'program_path=$(mktemp -d) + + + printf "%s" "$0" > "$program_path/ephemeral_component.py" + + _KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@" + + ' + - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\ + \ *\n\ndef create_alluxio_runtime(dataset_name: str, namespace: str):\n\ + \ import logging\n from fluid import AlluxioRuntime, AlluxioRuntimeSpec,\ + \ models, FluidClient\n from kubernetes import client as k8s_client\n\ + \n fluid_client = FluidClient()\n\n FLUID_GROUP = \"data.fluid.io\"\ + \n FLUID_VERSION = \"v1alpha1\"\n\n replicas = 1\n\n # This is\ + \ the simplest configuration for AlluxioRuntime, you can change the AlluxioRuntime\ + \ according to your needs\n alluxio_runtime = AlluxioRuntime(\n \ + \ api_version=\"%s/%s\" % (FLUID_GROUP, FLUID_VERSION),\n kind=\"\ + AlluxioRuntime\",\n metadata=k8s_client.V1ObjectMeta(\n \ + \ name=dataset_name,\n namespace=namespace\n ),\n \ + \ spec=AlluxioRuntimeSpec(\n replicas=replicas,\n \ + \ tieredstore=models.TieredStore([models.Level('0.95', '0.7', 'MEM',\ + \ '/dev/shm', '2Gi', volume_type=None)])\n )\n )\n\n fluid_client.create_runtime(alluxio_runtime)\n\ + \n\n logging.info(f\"Runtime \\\"{alluxio_runtime.metadata.namespace}/{alluxio_runtime.metadata.name}\\\ + \" created successfully\")\n\n" + image: python:3.7 +pipelineInfo: + name: create-alluxio-runtime +root: + dag: + tasks: + create-alluxio-runtime: + cachingOptions: + enableCache: true + componentRef: + name: comp-create-alluxio-runtime + inputs: + parameters: + dataset_name: + componentInputParameter: dataset_name + namespace: + componentInputParameter: namespace + taskInfo: + name: create-alluxio-runtime + inputDefinitions: + parameters: + dataset_name: + parameterType: STRING + namespace: + parameterType: STRING +schemaVersion: 2.1.0 +sdkVersion: kfp-2.4.0 diff --git a/integration/kubeflow/component-yaml/create-s3-dataset.yaml b/integration/kubeflow/component-yaml/create-s3-dataset.yaml new file mode 100644 index 00000000000..e01b47b3ab4 --- /dev/null +++ b/integration/kubeflow/component-yaml/create-s3-dataset.yaml @@ -0,0 +1,121 @@ +# PIPELINE DEFINITION +# Name: create-s3-dataset +# Inputs: +# dataset_name: str +# mount_point: str +# mount_s3_endpoint: str +# mount_s3_region: str +# namespace: str +components: + comp-create-s3-dataset: + executorLabel: exec-create-s3-dataset + inputDefinitions: + parameters: + dataset_name: + parameterType: STRING + mount_point: + parameterType: STRING + mount_s3_endpoint: + parameterType: STRING + mount_s3_region: + parameterType: STRING + namespace: + parameterType: STRING +deploymentSpec: + executors: + exec-create-s3-dataset: + container: + args: + - --executor_input + - '{{$}}' + - --function_to_execute + - create_s3_dataset + command: + - sh + - -c + - "\nif ! 
[ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\ + \ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\ + \ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.4.0'\ + \ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' &&\ + \ python3 -m pip install --quiet --no-warn-script-location 'git+https://github.com/fluid-cloudnative/fluid-client-python.git'\ + \ && \"$0\" \"$@\"\n" + - sh + - -ec + - 'program_path=$(mktemp -d) + + + printf "%s" "$0" > "$program_path/ephemeral_component.py" + + _KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@" + + ' + - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\ + \ *\n\ndef create_s3_dataset(dataset_name: str, namespace: str, mount_point:\ + \ str, mount_s3_endpoint: str, mount_s3_region: str):\n import logging\n\ + \ import fluid\n from kubernetes import client\n fluid_client =\ + \ fluid.FluidClient()\n\n FLUID_GROUP = \"data.fluid.io\"\n FLUID_VERSION\ + \ = \"v1alpha1\"\n\n # This is an sample which use some pre-defined options.\n\ + \ # Users can change these code customily\n dataset = fluid.Dataset(\n\ + \ api_version=\"%s/%s\" % (FLUID_GROUP, FLUID_VERSION),\n \ + \ kind=\"Dataset\",\n metadata=client.V1ObjectMeta(\n \ + \ name=dataset_name,\n namespace=namespace\n ),\n \ + \ spec=fluid.DatasetSpec(\n mounts=[\n fluid.Mount(\n\ + \ mount_point=mount_point,\n name=dataset_name,\n\ + \ options={\n \"alluxio.underfs.s3.endpoint\"\ + : mount_s3_endpoint,\n \"alluxio.underfs.s3.endpoint.region\"\ + : mount_s3_region,\n \"alluxio.underfs.s3.disable.dns.buckets\"\ + : \"true\",\n \"alluxio.underfs.s3.disable.inherit.acl\"\ + : \"false\"\n },\n encrypt_options=[\n\ + \ {\n \"name\": \"aws.accessKeyId\"\ + ,\n \"valueFrom\": {\n \ + \ \"secretKeyRef\": {\n \"name\":\ + \ \"s3-secret\",\n \"key\": \"aws.accessKeyId\"\ + \n }\n }\n \ + \ },\n {\n \ + \ \"name\": \"aws.secretKey\",\n \"valueFrom\"\ + : {\n \"secretKeyRef\": {\n \ + \ \"name\": \"s3-secret\",\n \ + \ \"key\": \"aws.secretKey\"\n }\n \ + \ }\n }\n \ + \ ]\n )\n ]\n )\n )\n\n fluid_client.create_dataset(dataset)\n\ + \n logging.info(f\"Dataset \\\"{dataset.metadata.namespace}/{dataset.metadata.name}\\\ + \" created successfully\")\n\n" + image: python:3.7 +pipelineInfo: + name: create-s3-dataset +root: + dag: + tasks: + create-s3-dataset: + cachingOptions: + enableCache: true + componentRef: + name: comp-create-s3-dataset + inputs: + parameters: + dataset_name: + componentInputParameter: dataset_name + mount_point: + componentInputParameter: mount_point + mount_s3_endpoint: + componentInputParameter: mount_s3_endpoint + mount_s3_region: + componentInputParameter: mount_s3_region + namespace: + componentInputParameter: namespace + taskInfo: + name: create-s3-dataset + inputDefinitions: + parameters: + dataset_name: + parameterType: STRING + mount_point: + parameterType: STRING + mount_s3_endpoint: + parameterType: STRING + mount_s3_region: + parameterType: STRING + namespace: + parameterType: STRING +schemaVersion: 2.1.0 +sdkVersion: kfp-2.4.0 diff --git a/integration/kubeflow/component-yaml/preheat-dataset.yaml b/integration/kubeflow/component-yaml/preheat-dataset.yaml new file mode 100644 index 00000000000..145adb92a31 --- /dev/null +++ b/integration/kubeflow/component-yaml/preheat-dataset.yaml @@ -0,0 +1,82 @@ +# PIPELINE DEFINITION +# Name: preheat-dataset +# Inputs: +# dataset_name: str +# namespace: str 
+components: + comp-preheat-dataset: + executorLabel: exec-preheat-dataset + inputDefinitions: + parameters: + dataset_name: + parameterType: STRING + namespace: + parameterType: STRING +deploymentSpec: + executors: + exec-preheat-dataset: + container: + args: + - --executor_input + - '{{$}}' + - --function_to_execute + - preheat_dataset + command: + - sh + - -c + - "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\ + \ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\ + \ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.4.0'\ + \ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' &&\ + \ python3 -m pip install --quiet --no-warn-script-location 'git+https://github.com/fluid-cloudnative/fluid-client-python.git'\ + \ && \"$0\" \"$@\"\n" + - sh + - -ec + - 'program_path=$(mktemp -d) + + + printf "%s" "$0" > "$program_path/ephemeral_component.py" + + _KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@" + + ' + - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\ + \ *\n\ndef preheat_dataset(dataset_name: str, namespace: str):\n import\ + \ logging\n from fluid import DataLoad, DataLoadSpec, FluidClient\n \ + \ from kubernetes import client as k8s_client\n\n fluid_client = FluidClient()\n\ + \n FLUID_GROUP = \"data.fluid.io\"\n FLUID_VERSION = \"v1alpha1\"\n\ + \n dataload = DataLoad(\n api_version=\"%s/%s\" % (FLUID_GROUP,\ + \ FLUID_VERSION),\n kind=\"DataLoad\",\n metadata=k8s_client.V1ObjectMeta(\n\ + \ name=\"%s-loader\" % dataset_name,\n namespace=namespace\n\ + \ ),\n spec=DataLoadSpec(\n dataset={\n \ + \ \"name\": dataset_name,\n \"namespace\": namespace\n\ + \ }\n )\n )\n\n fluid_client.create_data_operation(data_op=dataload,\ + \ wait=True)\n\n logging.info(f\"Load Dataset \\\"{namespace}/{dataset_name}\\\ + \" successfully\")\n\n" + image: python:3.7 +pipelineInfo: + name: preheat-dataset +root: + dag: + tasks: + preheat-dataset: + cachingOptions: + enableCache: true + componentRef: + name: comp-preheat-dataset + inputs: + parameters: + dataset_name: + componentInputParameter: dataset_name + namespace: + componentInputParameter: namespace + taskInfo: + name: preheat-dataset + inputDefinitions: + parameters: + dataset_name: + parameterType: STRING + namespace: + parameterType: STRING +schemaVersion: 2.1.0 +sdkVersion: kfp-2.4.0 diff --git a/integration/kubeflow/fluid_components.py b/integration/kubeflow/fluid_components.py new file mode 100644 index 00000000000..ed3f494fa08 --- /dev/null +++ b/integration/kubeflow/fluid_components.py @@ -0,0 +1,168 @@ +# Copyright 2024 The Fluid Authors. + +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at + +# http://www.apache.org/licenses/LICENSE-2.0 + +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from kfp import dsl, compiler + +# Create a Fluid dataset which contains data in S3. 
+@dsl.component(packages_to_install=['git+https://github.com/fluid-cloudnative/fluid-client-python.git'])
+def create_s3_dataset(dataset_name: str, namespace: str, mount_point: str, mount_s3_endpoint: str, mount_s3_region: str):
+    import logging
+    import fluid
+    from kubernetes import client
+    fluid_client = fluid.FluidClient()
+
+    FLUID_GROUP = "data.fluid.io"
+    FLUID_VERSION = "v1alpha1"
+
+    # This is a sample that uses some pre-defined options.
+    # Users can customize this code as needed.
+    dataset = fluid.Dataset(
+        api_version="%s/%s" % (FLUID_GROUP, FLUID_VERSION),
+        kind="Dataset",
+        metadata=client.V1ObjectMeta(
+            name=dataset_name,
+            namespace=namespace
+        ),
+        spec=fluid.DatasetSpec(
+            mounts=[
+                fluid.Mount(
+                    mount_point=mount_point,
+                    name=dataset_name,
+                    options={
+                        "alluxio.underfs.s3.endpoint": mount_s3_endpoint,
+                        "alluxio.underfs.s3.endpoint.region": mount_s3_region,
+                        "alluxio.underfs.s3.disable.dns.buckets": "true",
+                        "alluxio.underfs.s3.disable.inherit.acl": "false"
+                    },
+                    encrypt_options=[
+                        {
+                            "name": "aws.accessKeyId",
+                            "valueFrom": {
+                                "secretKeyRef": {
+                                    "name": "s3-secret",
+                                    "key": "aws.accessKeyId"
+                                }
+                            }
+                        },
+                        {
+                            "name": "aws.secretKey",
+                            "valueFrom": {
+                                "secretKeyRef": {
+                                    "name": "s3-secret",
+                                    "key": "aws.secretKey"
+                                }
+                            }
+                        }
+                    ]
+                )
+            ]
+        )
+    )
+
+    fluid_client.create_dataset(dataset)
+
+    logging.info(f"Dataset \"{dataset.metadata.namespace}/{dataset.metadata.name}\" created successfully")
+
+# Deploy a simple AlluxioRuntime
+@dsl.component(packages_to_install=['git+https://github.com/fluid-cloudnative/fluid-client-python.git'])
+def create_alluxio_runtime(dataset_name: str, namespace: str):
+    import logging
+    from fluid import AlluxioRuntime, AlluxioRuntimeSpec, models, FluidClient
+    from kubernetes import client as k8s_client
+
+    fluid_client = FluidClient()
+
+    FLUID_GROUP = "data.fluid.io"
+    FLUID_VERSION = "v1alpha1"
+
+    replicas = 1
+
+    # This is the simplest configuration for AlluxioRuntime; change it according to your needs.
+    alluxio_runtime = AlluxioRuntime(
+        api_version="%s/%s" % (FLUID_GROUP, FLUID_VERSION),
+        kind="AlluxioRuntime",
+        metadata=k8s_client.V1ObjectMeta(
+            name=dataset_name,
+            namespace=namespace
+        ),
+        spec=AlluxioRuntimeSpec(
+            replicas=replicas,
+            tieredstore=models.TieredStore([models.Level('0.95', '0.7', 'MEM', '/dev/shm', '2Gi', volume_type=None)])
+        )
+    )
+
+    fluid_client.create_runtime(alluxio_runtime)
+
+
+    logging.info(f"Runtime \"{alluxio_runtime.metadata.namespace}/{alluxio_runtime.metadata.name}\" created successfully")
+
+# Preheat the dataset with the given name and namespace
+@dsl.component(packages_to_install=['git+https://github.com/fluid-cloudnative/fluid-client-python.git'])
+def preheat_dataset(dataset_name: str, namespace: str):
+    import logging
+    from fluid import DataLoad, DataLoadSpec, FluidClient
+    from kubernetes import client as k8s_client
+
+    fluid_client = FluidClient()
+
+    FLUID_GROUP = "data.fluid.io"
+    FLUID_VERSION = "v1alpha1"
+
+    dataload = DataLoad(
+        api_version="%s/%s" % (FLUID_GROUP, FLUID_VERSION),
+        kind="DataLoad",
+        metadata=k8s_client.V1ObjectMeta(
+            name="%s-loader" % dataset_name,
+            namespace=namespace
+        ),
+        spec=DataLoadSpec(
+            dataset={
+                "name": dataset_name,
+                "namespace": namespace
+            }
+        )
+    )
+
+    fluid_client.create_data_operation(data_op=dataload, wait=True)
+
+    logging.info(f"Load Dataset \"{namespace}/{dataset_name}\" successfully")
+
+# Clean up the dataset along with the corresponding AlluxioRuntime
+@dsl.component(packages_to_install=['git+https://github.com/fluid-cloudnative/fluid-client-python.git'])
+def cleanup_dataset_and_alluxio_runtime(dataset_name: str, namespace: str):
+    import logging
+    from fluid import FluidClient
+
+    fluid_client = FluidClient()
+    fluid_client.delete_runtime(name=dataset_name, namespace=namespace, runtime_type="alluxio", wait_until_cleaned_up=True)
+    fluid_client.delete_dataset(name=dataset_name, namespace=namespace, wait_until_cleaned_up=True)
+
+    logging.info(f"Cleanup Dataset and AlluxioRuntime \"{namespace}/{dataset_name}\" successfully!")
+
+# Clean up the preheat dataset operation
+@dsl.component(packages_to_install=['git+https://github.com/fluid-cloudnative/fluid-client-python.git'])
+def cleanup_preheat_operation(dataset_name: str, namespace: str):
+    import logging
+    from fluid import FluidClient
+
+    fluid_client = FluidClient()
+    fluid_client.delete_data_operation(name="%s-loader" % dataset_name, data_op_type="dataload", namespace=namespace)
+    logging.info("Cleanup preheat dataset operation successfully!")
+
+# Re-run this file whenever you change the code above to re-generate the components' YAML files.
+compiler.Compiler().compile(create_s3_dataset, "./component-yaml/create-s3-dataset.yaml")
+compiler.Compiler().compile(create_alluxio_runtime, "./component-yaml/create-alluxioruntime.yaml")
+compiler.Compiler().compile(preheat_dataset, "./component-yaml/preheat-dataset.yaml")
+compiler.Compiler().compile(cleanup_dataset_and_alluxio_runtime, "./component-yaml/cleanup-dataset-and-alluxioruntime.yaml")
+compiler.Compiler().compile(cleanup_preheat_operation, "./component-yaml/cleanup-preheat-operation.yaml")
\ No newline at end of file
diff --git a/integration/kubeflow/pipline-yaml/train-cnn-for-fashion-mnist-pipline.yaml b/integration/kubeflow/pipline-yaml/train-cnn-for-fashion-mnist-pipline.yaml
new file mode 100644
index 00000000000..a649f9b1512
--- /dev/null
+++ b/integration/kubeflow/pipline-yaml/train-cnn-for-fashion-mnist-pipline.yaml
@@ -0,0 +1,519 @@
+# PIPELINE DEFINITION
+# Name: train-cnn-for-fashion-mnist
+# Inputs:
+#    batch_size: int
+#    dataset_name: str
+#    epochs: int
+#    learning_rate: float
+#    mount_point: str
+#    mount_s3_endpoint: str
+#    mount_s3_region: str
+#    namespace: str
+components:
+  comp-cleanup-dataset-and-alluxio-runtime:
+    executorLabel: exec-cleanup-dataset-and-alluxio-runtime
+    inputDefinitions:
+      parameters:
+        dataset_name:
+          parameterType: STRING
+        namespace:
+          parameterType: STRING
+  comp-cleanup-preheat-operation:
+    executorLabel: exec-cleanup-preheat-operation
+    inputDefinitions:
+      parameters:
+        dataset_name:
+          parameterType: STRING
+        namespace:
+          parameterType: STRING
+  comp-create-alluxio-runtime:
+    executorLabel: exec-create-alluxio-runtime
+    inputDefinitions:
+      parameters:
+        dataset_name:
+          parameterType: STRING
+        namespace:
+          parameterType: STRING
+  comp-create-s3-dataset:
+    executorLabel: exec-create-s3-dataset
+    inputDefinitions:
+      parameters:
+        dataset_name:
+          parameterType: STRING
+        mount_point:
+          parameterType: STRING
+        mount_s3_endpoint:
+          parameterType: STRING
+        mount_s3_region:
+          parameterType: STRING
+        namespace:
+          parameterType: STRING
+  comp-preheat-dataset:
+    executorLabel: exec-preheat-dataset
+    inputDefinitions:
+      parameters:
+        dataset_name:
+          parameterType: STRING
+        namespace:
+          parameterType: STRING
+  comp-train-simple-cnn:
+    executorLabel: exec-train-simple-cnn
+    inputDefinitions:
+      parameters:
+        batch_size:
+          parameterType: NUMBER_INTEGER
+        data_root_path:
+          parameterType: STRING
+        dataset_name:
+          
parameterType: STRING + epochs: + parameterType: NUMBER_INTEGER + learning_rate: + parameterType: NUMBER_DOUBLE +deploymentSpec: + executors: + exec-cleanup-dataset-and-alluxio-runtime: + container: + args: + - --executor_input + - '{{$}}' + - --function_to_execute + - cleanup_dataset_and_alluxio_runtime + command: + - sh + - -c + - "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\ + \ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\ + \ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.4.0'\ + \ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' &&\ + \ python3 -m pip install --quiet --no-warn-script-location 'git+https://github.com/fluid-cloudnative/fluid-client-python.git'\ + \ && \"$0\" \"$@\"\n" + - sh + - -ec + - 'program_path=$(mktemp -d) + + + printf "%s" "$0" > "$program_path/ephemeral_component.py" + + _KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@" + + ' + - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\ + \ *\n\ndef cleanup_dataset_and_alluxio_runtime(dataset_name: str, namespace:\ + \ str):\n import logging\n from fluid import FluidClient\n\n fluid_client\ + \ = FluidClient()\n fluid_client.delete_runtime(name=dataset_name, namespace=namespace,\ + \ runtime_type=\"alluxio\", wait_until_cleaned_up=True)\n fluid_client.delete_dataset(name=dataset_name,\ + \ namespace=namespace, wait_until_cleaned_up=True)\n\n logging.info(f\"\ + Cleanup Dataset and AlluxioRuntime \\\"{namespace}/{dataset_name}\\\" successfully!\"\ + )\n\n" + image: python:3.7 + exec-cleanup-preheat-operation: + container: + args: + - --executor_input + - '{{$}}' + - --function_to_execute + - cleanup_preheat_operation + command: + - sh + - -c + - "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\ + \ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\ + \ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.4.0'\ + \ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' &&\ + \ python3 -m pip install --quiet --no-warn-script-location 'git+https://github.com/fluid-cloudnative/fluid-client-python.git'\ + \ && \"$0\" \"$@\"\n" + - sh + - -ec + - 'program_path=$(mktemp -d) + + + printf "%s" "$0" > "$program_path/ephemeral_component.py" + + _KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@" + + ' + - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\ + \ *\n\ndef cleanup_preheat_operation(dataset_name: str, namespace: str):\n\ + \ import logging\n from fluid import FluidClient\n\n fluid_client\ + \ = FluidClient()\n fluid_client.delete_data_operation(name=\"%s-loader\"\ + \ % dataset_name, data_op_type=\"dataload\", namespace=namespace)\n logging.info(\"\ + Cleanup preheat dataset operation successfully!\")\n\n" + image: python:3.7 + exec-create-alluxio-runtime: + container: + args: + - --executor_input + - '{{$}}' + - --function_to_execute + - create_alluxio_runtime + command: + - sh + - -c + - "\nif ! 
[ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\ + \ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\ + \ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.4.0'\ + \ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' &&\ + \ python3 -m pip install --quiet --no-warn-script-location 'git+https://github.com/fluid-cloudnative/fluid-client-python.git'\ + \ && \"$0\" \"$@\"\n" + - sh + - -ec + - 'program_path=$(mktemp -d) + + + printf "%s" "$0" > "$program_path/ephemeral_component.py" + + _KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@" + + ' + - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\ + \ *\n\ndef create_alluxio_runtime(dataset_name: str, namespace: str):\n\ + \ import logging\n from fluid import AlluxioRuntime, AlluxioRuntimeSpec,\ + \ models, FluidClient\n from kubernetes import client as k8s_client\n\ + \n fluid_client = FluidClient()\n\n FLUID_GROUP = \"data.fluid.io\"\ + \n FLUID_VERSION = \"v1alpha1\"\n\n replicas = 1\n\n # This is\ + \ the simplest configuration for AlluxioRuntime, you can change the AlluxioRuntime\ + \ according to your needs\n alluxio_runtime = AlluxioRuntime(\n \ + \ api_version=\"%s/%s\" % (FLUID_GROUP, FLUID_VERSION),\n kind=\"\ + AlluxioRuntime\",\n metadata=k8s_client.V1ObjectMeta(\n \ + \ name=dataset_name,\n namespace=namespace\n ),\n \ + \ spec=AlluxioRuntimeSpec(\n replicas=replicas,\n \ + \ tieredstore=models.TieredStore([models.Level('0.95', '0.7', 'MEM',\ + \ '/dev/shm', '2Gi', volume_type=None)])\n )\n )\n\n fluid_client.create_runtime(alluxio_runtime)\n\ + \n\n logging.info(f\"Runtime \\\"{alluxio_runtime.metadata.namespace}/{alluxio_runtime.metadata.name}\\\ + \" created successfully\")\n\n" + image: python:3.7 + exec-create-s3-dataset: + container: + args: + - --executor_input + - '{{$}}' + - --function_to_execute + - create_s3_dataset + command: + - sh + - -c + - "\nif ! 
[ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\ + \ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\ + \ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.4.0'\ + \ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' &&\ + \ python3 -m pip install --quiet --no-warn-script-location 'git+https://github.com/fluid-cloudnative/fluid-client-python.git'\ + \ && \"$0\" \"$@\"\n" + - sh + - -ec + - 'program_path=$(mktemp -d) + + + printf "%s" "$0" > "$program_path/ephemeral_component.py" + + _KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@" + + ' + - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\ + \ *\n\ndef create_s3_dataset(dataset_name: str, namespace: str, mount_point:\ + \ str, mount_s3_endpoint: str, mount_s3_region: str):\n import logging\n\ + \ import fluid\n from kubernetes import client\n fluid_client =\ + \ fluid.FluidClient()\n\n FLUID_GROUP = \"data.fluid.io\"\n FLUID_VERSION\ + \ = \"v1alpha1\"\n\n # This is an sample which use some pre-defined options.\n\ + \ # Users can change these code customily\n dataset = fluid.Dataset(\n\ + \ api_version=\"%s/%s\" % (FLUID_GROUP, FLUID_VERSION),\n \ + \ kind=\"Dataset\",\n metadata=client.V1ObjectMeta(\n \ + \ name=dataset_name,\n namespace=namespace\n ),\n \ + \ spec=fluid.DatasetSpec(\n mounts=[\n fluid.Mount(\n\ + \ mount_point=mount_point,\n name=dataset_name,\n\ + \ options={\n \"alluxio.underfs.s3.endpoint\"\ + : mount_s3_endpoint,\n \"alluxio.underfs.s3.endpoint.region\"\ + : mount_s3_region,\n \"alluxio.underfs.s3.disable.dns.buckets\"\ + : \"true\",\n \"alluxio.underfs.s3.disable.inherit.acl\"\ + : \"false\"\n },\n encrypt_options=[\n\ + \ {\n \"name\": \"aws.accessKeyId\"\ + ,\n \"valueFrom\": {\n \ + \ \"secretKeyRef\": {\n \"name\":\ + \ \"s3-secret\",\n \"key\": \"aws.accessKeyId\"\ + \n }\n }\n \ + \ },\n {\n \ + \ \"name\": \"aws.secretKey\",\n \"valueFrom\"\ + : {\n \"secretKeyRef\": {\n \ + \ \"name\": \"s3-secret\",\n \ + \ \"key\": \"aws.secretKey\"\n }\n \ + \ }\n }\n \ + \ ]\n )\n ]\n )\n )\n\n fluid_client.create_dataset(dataset)\n\ + \n logging.info(f\"Dataset \\\"{dataset.metadata.namespace}/{dataset.metadata.name}\\\ + \" created successfully\")\n\n" + image: python:3.7 + exec-preheat-dataset: + container: + args: + - --executor_input + - '{{$}}' + - --function_to_execute + - preheat_dataset + command: + - sh + - -c + - "\nif ! 
[ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\ + \ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\ + \ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.4.0'\ + \ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' &&\ + \ python3 -m pip install --quiet --no-warn-script-location 'git+https://github.com/fluid-cloudnative/fluid-client-python.git'\ + \ && \"$0\" \"$@\"\n" + - sh + - -ec + - 'program_path=$(mktemp -d) + + + printf "%s" "$0" > "$program_path/ephemeral_component.py" + + _KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@" + + ' + - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\ + \ *\n\ndef preheat_dataset(dataset_name: str, namespace: str):\n import\ + \ logging\n from fluid import DataLoad, DataLoadSpec, FluidClient\n \ + \ from kubernetes import client as k8s_client\n\n fluid_client = FluidClient()\n\ + \n FLUID_GROUP = \"data.fluid.io\"\n FLUID_VERSION = \"v1alpha1\"\n\ + \n dataload = DataLoad(\n api_version=\"%s/%s\" % (FLUID_GROUP,\ + \ FLUID_VERSION),\n kind=\"DataLoad\",\n metadata=k8s_client.V1ObjectMeta(\n\ + \ name=\"%s-loader\" % dataset_name,\n namespace=namespace\n\ + \ ),\n spec=DataLoadSpec(\n dataset={\n \ + \ \"name\": dataset_name,\n \"namespace\": namespace\n\ + \ }\n )\n )\n\n fluid_client.create_data_operation(data_op=dataload,\ + \ wait=True)\n\n logging.info(f\"Load Dataset \\\"{namespace}/{dataset_name}\\\ + \" successfully\")\n\n" + image: python:3.7 + exec-train-simple-cnn: + container: + args: + - --executor_input + - '{{$}}' + - --function_to_execute + - train_simple_cnn + command: + - sh + - -c + - "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\ + \ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\ + \ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.4.0'\ + \ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' &&\ + \ python3 -m pip install --quiet --no-warn-script-location 'pandas' &&\ + \ \"$0\" \"$@\"\n" + - sh + - -ec + - 'program_path=$(mktemp -d) + + + printf "%s" "$0" > "$program_path/ephemeral_component.py" + + _KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@" + + ' + - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\ + \ *\n\ndef train_simple_cnn(dataset_name: str, data_root_path: str, batch_size:\ + \ int, epochs: int, learning_rate: float):\n import logging\n\n logging.info(\"\ + Training a simple CNN...\")\n\n import os\n import pandas as pd\n\ + \ import torch\n import torch.nn as nn\n import torch.optim as\ + \ optim\n import torch.nn.init as init\n from torchvision import transforms\n\ + \ from torch.utils.data import DataLoader, Dataset, random_split \n\ + \n dataset_path = os.path.join(data_root_path, dataset_name)\n logging.info(\"\ + Dataset file location: \" + dataset_path)\n\n # Check Dataset Path\n\ + \ logging.info(\"Check dataset\")\n for dirname, _, filenames in os.walk(dataset_path):\n\ + \ for filename in filenames:\n logging.info(os.path.join(dirname,\ + \ filename))\n\n # Prepare Data\n logging.info(\"Start load dataset...\"\ + )\n train_data=pd.read_csv(f\"{dataset_path}/fashion-mnist_train.csv\"\ + )\n test_data=pd.read_csv(f\"{dataset_path}/fashion-mnist_test.csv\"\ + )\n logging.info(\"Load dataset successfully!\")\n\n class 
CustomDataset(Dataset):\n\ + \n def __init__(self,dataframe,transform=None):\n self.dataframe=dataframe\n\ + \ self.transform=transform\n\n\n def __len__(self):\n\n\ + \ return len(self.dataframe)\n\n def __getitem__(self,idx):\n\ + \ label = self.dataframe.iloc[idx, 0]\n image_data\ + \ = self.dataframe.iloc[idx, 1:].values.astype('uint8').reshape((28, 28,\ + \ 1))\n\n if(self.transform):\n image=self.transform(image_data)\n\ + \n return image,label\n\n transform = transforms.Compose([transforms.ToTensor()])\n\ + \n train_dataset=CustomDataset(train_data,transform=transform)\n test_dataset=CustomDataset(test_data,\ + \ transform=transform)\n\n train_size=int(0.8*len(train_dataset))\n \ + \ valid_size=len(train_dataset)-train_size\n\n train_dataset,valid_dataset=random_split(train_dataset,[train_size,valid_size])\n\ + \ train_loader=DataLoader(train_dataset,batch_size=batch_size,shuffle=True)\n\ + \ valid_loader=DataLoader(valid_dataset,batch_size=batch_size)\n test_loader=DataLoader(test_dataset,batch_size=batch_size)\n\ + \n # Build a simple CNN\n class CNN(nn.Module):\n\n def __init__(self,num_classes):\n\ + \ super(CNN, self).__init__()\n self.feature = nn.Sequential(\n\ + \ nn.Conv2d(1,24,kernel_size=3,padding=1),\n \ + \ nn.ReLU(inplace=True),\n nn.MaxPool2d(kernel_size=2),\n\ + \ nn.Conv2d(24,128,kernel_size=3,padding=1),\n \ + \ nn.ReLU(inplace=True),\n nn.MaxPool2d(kernel_size=2)\n\ + \ )\n self.classifier = nn.Sequential(\n \ + \ nn.Linear(128*7*7,48),\n nn.ReLU(inplace=True),\n\ + \ nn.Linear(48,num_classes)\n )\n\n def\ + \ forward(self,x):\n x = self.feature(x)\n x = x.view(x.size(0),-1)\n\ + \ x = self.classifier(x)\n return x \n\n # Train\ + \ this CNN\n def train(model,train_loader,optimizer,criterion, device):\n\ + \ model.train()\n train_loss=0\n correct=0\n \ + \ total=0\n\n for images,labels in train_loader:\n images,labels\ + \ =images.to(device),labels.to(device)\n\n optimizer.zero_grad()\n\ + \ outputs=model(images)\n loss=criterion(outputs,labels)\n\ + \ loss.backward()\n optimizer.step()\n\n \ + \ train_loss+=loss.item()\n _,predicted=outputs.max(1)\n \ + \ total+=labels.size(0)\n correct+=predicted.eq(labels).sum().item()\n\ + \n train_accuracy=100*correct/total\n train_loss/=len(train_loader)\n\ + \ return train_loss,train_accuracy\n\n def validate(model,valid_loader,criterion,device):\n\ + \ model.eval()\n val_loss=0\n correct=0\n total=0\n\ + \n with torch.no_grad():\n for images,labels in valid_loader:\n\ + \ images,labels=images.to(device),labels.to(device)\n\n \ + \ outputs=model(images)\n loss=criterion(outputs,labels)\n\ + \n val_loss+=loss.item()\n _,predicted=outputs.max(1)\n\ + \ total+=labels.size(0)\n correct+=predicted.eq(labels).sum().item()\n\ + \n val_accuracy = 100.0 * correct / total\n val_loss\ + \ /= len(valid_loader)\n return val_loss, val_accuracy\n\n def\ + \ test(model, test_loader, device):\n model.eval()\n correct=0\n\ + \ total=0\n\n with torch.no_grad():\n for images,labels\ + \ in test_loader:\n images,labels=images.to(device),labels.to(device)\n\ + \n outputs=model(images)\n _,predicted=outputs.max(1)\n\ + \ total+=labels.size(0)\n correct+=predicted.eq(labels).sum().item()\n\ + \n test_accuracy = 100.0 * correct / total\n\n return\ + \ test_accuracy\n\n device = torch.device(\"cuda\" if torch.cuda.is_available()\ + \ else \"cpu\")\n cnn = CNN(10).to(device)\n\n criterion = nn.CrossEntropyLoss()\n\ + \ optimizer = optim.Adam(cnn.parameters(),lr=learning_rate)\n\n train_accuracy=[]\n\ + \ validation_accuracy=[]\n train_losses=[]\n 
validation_losses=[]\n\ + \n logging.info(\"Begin training...\")\n for epoch in range(epochs):\n\ + \ train_loss, train_acc = train(cnn, train_loader, optimizer, criterion,\ + \ device)\n val_loss, val_acc = validate(cnn, valid_loader, criterion,\ + \ device)\n\n train_accuracy.append(train_acc)\n validation_accuracy.append(val_acc)\n\ + \ train_losses.append(train_loss)\n validation_losses.append(val_loss)\n\ + \n\n logging.info(f\"Epoch {epoch+1}/{epochs}: Train Loss: {train_loss:.4f},\ + \ Validation Loss: {val_loss:.4f} Train Accuracy: {train_acc:.2f}%, Validation\ + \ Accuracy: {val_acc:.2f}%\")\n\n logging.info(\"Test the CNN...\")\n\ + \ test_acc = test(cnn, test_loader, device)\n logging.info(f\"Final\ + \ Test Accuracy: {test_acc:.2f}%\")\n\n" + image: bitnami/pytorch:2.1.2 +pipelineInfo: + name: train-cnn-for-fashion-mnist +root: + dag: + tasks: + cleanup-dataset-and-alluxio-runtime: + cachingOptions: {} + componentRef: + name: comp-cleanup-dataset-and-alluxio-runtime + dependentTasks: + - train-simple-cnn + inputs: + parameters: + dataset_name: + componentInputParameter: dataset_name + namespace: + componentInputParameter: namespace + taskInfo: + name: cleanup-dataset-and-alluxio-runtime + triggerPolicy: + strategy: ALL_UPSTREAM_TASKS_COMPLETED + cleanup-preheat-operation: + cachingOptions: {} + componentRef: + name: comp-cleanup-preheat-operation + dependentTasks: + - train-simple-cnn + inputs: + parameters: + dataset_name: + componentInputParameter: dataset_name + namespace: + componentInputParameter: namespace + taskInfo: + name: cleanup-preheat-operation + triggerPolicy: + strategy: ALL_UPSTREAM_TASKS_COMPLETED + create-alluxio-runtime: + cachingOptions: {} + componentRef: + name: comp-create-alluxio-runtime + dependentTasks: + - create-s3-dataset + inputs: + parameters: + dataset_name: + componentInputParameter: dataset_name + namespace: + componentInputParameter: namespace + taskInfo: + name: create-alluxio-runtime + create-s3-dataset: + cachingOptions: {} + componentRef: + name: comp-create-s3-dataset + inputs: + parameters: + dataset_name: + componentInputParameter: dataset_name + mount_point: + componentInputParameter: mount_point + mount_s3_endpoint: + componentInputParameter: mount_s3_endpoint + mount_s3_region: + componentInputParameter: mount_s3_region + namespace: + componentInputParameter: namespace + taskInfo: + name: create-s3-dataset + preheat-dataset: + cachingOptions: {} + componentRef: + name: comp-preheat-dataset + dependentTasks: + - create-alluxio-runtime + inputs: + parameters: + dataset_name: + componentInputParameter: dataset_name + namespace: + componentInputParameter: namespace + taskInfo: + name: preheat-dataset + train-simple-cnn: + cachingOptions: {} + componentRef: + name: comp-train-simple-cnn + dependentTasks: + - preheat-dataset + inputs: + parameters: + batch_size: + componentInputParameter: batch_size + data_root_path: + runtimeValue: + constant: /datasets + dataset_name: + componentInputParameter: dataset_name + epochs: + componentInputParameter: epochs + learning_rate: + componentInputParameter: learning_rate + taskInfo: + name: train-simple-cnn + inputDefinitions: + parameters: + batch_size: + parameterType: NUMBER_INTEGER + dataset_name: + parameterType: STRING + epochs: + parameterType: NUMBER_INTEGER + learning_rate: + parameterType: NUMBER_DOUBLE + mount_point: + parameterType: STRING + mount_s3_endpoint: + parameterType: STRING + mount_s3_region: + parameterType: STRING + namespace: + parameterType: STRING +schemaVersion: 2.1.0 
+sdkVersion: kfp-2.4.0
+---
+platforms:
+  kubernetes:
+    deploymentSpec:
+      executors:
+        exec-train-simple-cnn:
+          pvcMount:
+          - componentInputParameter: dataset_name
+            mountPath: /datasets
diff --git a/integration/kubeflow/rbac.yaml b/integration/kubeflow/rbac.yaml
new file mode 100644
index 00000000000..31339108043
--- /dev/null
+++ b/integration/kubeflow/rbac.yaml
@@ -0,0 +1,29 @@
+apiVersion: rbac.authorization.k8s.io/v1
+kind: ClusterRole
+metadata:
+  name: kubeflow-fluid
+rules:
+  - apiGroups:
+      - data.fluid.io
+    resources:
+      - datasets
+      - datasets/status
+      - alluxioruntimes
+      - alluxioruntimes/status
+      - dataloads
+      - dataloads/status
+    verbs:
+      - '*'
+---
+apiVersion: rbac.authorization.k8s.io/v1
+kind: ClusterRoleBinding
+metadata:
+  name: kubeflow-fluid-clusterrolebinding
+roleRef:
+  apiGroup: rbac.authorization.k8s.io
+  kind: ClusterRole
+  name: kubeflow-fluid
+subjects:
+  - kind: ServiceAccount
+    name: pipeline-runner
+    namespace: kubeflow
\ No newline at end of file
diff --git a/integration/kubeflow/s3-secret.yaml b/integration/kubeflow/s3-secret.yaml
new file mode 100644
index 00000000000..1c656157a63
--- /dev/null
+++ b/integration/kubeflow/s3-secret.yaml
@@ -0,0 +1,8 @@
+apiVersion: v1
+kind: Secret
+metadata:
+  name: s3-secret
+  namespace:
+stringData:
+  aws.accessKeyId:
+  aws.secretKey:
\ No newline at end of file
diff --git a/integration/kubeflow/simple_cnn_sample.py b/integration/kubeflow/simple_cnn_sample.py
new file mode 100644
index 00000000000..491d9f33fe2
--- /dev/null
+++ b/integration/kubeflow/simple_cnn_sample.py
@@ -0,0 +1,267 @@
+# Copyright 2024 The Fluid Authors.
+
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+
+#     http://www.apache.org/licenses/LICENSE-2.0
+
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from kfp import dsl, components, kubernetes, compiler
+
+# Load Fluid components
+create_s3_dataset = components.load_component_from_file('./component-yaml/create-s3-dataset.yaml')
+create_alluxio_runtime = components.load_component_from_file('./component-yaml/create-alluxioruntime.yaml')
+preheat_dataset = components.load_component_from_file('./component-yaml/preheat-dataset.yaml')
+cleanup_preheat_operation = components.load_component_from_file('./component-yaml/cleanup-preheat-operation.yaml')
+cleanup_dataset_and_alluxio_runtime = components.load_component_from_file('./component-yaml/cleanup-dataset-and-alluxioruntime.yaml')
+
+# The component to train a simple CNN on Fashion MNIST.
+# In a production environment, it is better not to use packages_to_install,
+# since the KFP SDK installs these dependencies at task runtime.
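+# (Instead, consider building dependencies such as pandas into a custom base image.)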
+# More details in https://www.kubeflow.org/docs/components/pipelines/v2/components/containerized-python-components/ +@dsl.component( + base_image='bitnami/pytorch:2.1.2', + packages_to_install=['pandas'] +) +def train_simple_cnn(dataset_name: str, data_root_path: str, batch_size: int, epochs: int, learning_rate: float): + import logging + + logging.info("Training a simple CNN...") + + import os + import pandas as pd + import torch + import torch.nn as nn + import torch.optim as optim + import torch.nn.init as init + from torchvision import transforms + from torch.utils.data import DataLoader, Dataset, random_split + + dataset_path = os.path.join(data_root_path, dataset_name) + logging.info("Dataset file location: " + dataset_path) + + # Check Dataset Path + logging.info("Check dataset") + for dirname, _, filenames in os.walk(dataset_path): + for filename in filenames: + logging.info(os.path.join(dirname, filename)) + + # Prepare Data + logging.info("Start load dataset...") + train_data=pd.read_csv(f"{dataset_path}/fashion-mnist_train.csv") + test_data=pd.read_csv(f"{dataset_path}/fashion-mnist_test.csv") + logging.info("Load dataset successfully!") + + class CustomDataset(Dataset): + + def __init__(self,dataframe,transform=None): + self.dataframe=dataframe + self.transform=transform + + + def __len__(self): + + return len(self.dataframe) + + def __getitem__(self,idx): + label = self.dataframe.iloc[idx, 0] + image_data = self.dataframe.iloc[idx, 1:].values.astype('uint8').reshape((28, 28, 1)) + + if(self.transform): + image=self.transform(image_data) + + return image,label + + transform = transforms.Compose([transforms.ToTensor()]) + + train_dataset=CustomDataset(train_data,transform=transform) + test_dataset=CustomDataset(test_data, transform=transform) + + train_size=int(0.8*len(train_dataset)) + valid_size=len(train_dataset)-train_size + + train_dataset,valid_dataset=random_split(train_dataset,[train_size,valid_size]) + train_loader=DataLoader(train_dataset,batch_size=batch_size,shuffle=True) + valid_loader=DataLoader(valid_dataset,batch_size=batch_size) + test_loader=DataLoader(test_dataset,batch_size=batch_size) + + # Build a simple CNN + class CNN(nn.Module): + + def __init__(self,num_classes): + super(CNN, self).__init__() + self.feature = nn.Sequential( + nn.Conv2d(1,24,kernel_size=3,padding=1), + nn.ReLU(inplace=True), + nn.MaxPool2d(kernel_size=2), + nn.Conv2d(24,128,kernel_size=3,padding=1), + nn.ReLU(inplace=True), + nn.MaxPool2d(kernel_size=2) + ) + self.classifier = nn.Sequential( + nn.Linear(128*7*7,48), + nn.ReLU(inplace=True), + nn.Linear(48,num_classes) + ) + + def forward(self,x): + x = self.feature(x) + x = x.view(x.size(0),-1) + x = self.classifier(x) + return x + + # Train this CNN + def train(model,train_loader,optimizer,criterion, device): + model.train() + train_loss=0 + correct=0 + total=0 + + for images,labels in train_loader: + images,labels =images.to(device),labels.to(device) + + optimizer.zero_grad() + outputs=model(images) + loss=criterion(outputs,labels) + loss.backward() + optimizer.step() + + train_loss+=loss.item() + _,predicted=outputs.max(1) + total+=labels.size(0) + correct+=predicted.eq(labels).sum().item() + + train_accuracy=100*correct/total + train_loss/=len(train_loader) + return train_loss,train_accuracy + + def validate(model,valid_loader,criterion,device): + model.eval() + val_loss=0 + correct=0 + total=0 + + with torch.no_grad(): + for images,labels in valid_loader: + images,labels=images.to(device),labels.to(device) + + outputs=model(images) 
+                loss=criterion(outputs,labels)
+
+                val_loss+=loss.item()
+                _,predicted=outputs.max(1)
+                total+=labels.size(0)
+                correct+=predicted.eq(labels).sum().item()
+
+        val_accuracy = 100.0 * correct / total
+        val_loss /= len(valid_loader)
+        return val_loss, val_accuracy
+
+    def test(model, test_loader, device):
+        model.eval()
+        correct=0
+        total=0
+
+        with torch.no_grad():
+            for images,labels in test_loader:
+                images,labels=images.to(device),labels.to(device)
+
+                outputs=model(images)
+                _,predicted=outputs.max(1)
+                total+=labels.size(0)
+                correct+=predicted.eq(labels).sum().item()
+
+        test_accuracy = 100.0 * correct / total
+
+        return test_accuracy
+
+    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
+    cnn = CNN(10).to(device)
+
+    criterion = nn.CrossEntropyLoss()
+    optimizer = optim.Adam(cnn.parameters(),lr=learning_rate)
+
+    train_accuracy=[]
+    validation_accuracy=[]
+    train_losses=[]
+    validation_losses=[]
+
+    logging.info("Begin training...")
+    for epoch in range(epochs):
+        train_loss, train_acc = train(cnn, train_loader, optimizer, criterion, device)
+        val_loss, val_acc = validate(cnn, valid_loader, criterion, device)
+
+        train_accuracy.append(train_acc)
+        validation_accuracy.append(val_acc)
+        train_losses.append(train_loss)
+        validation_losses.append(val_loss)
+
+
+        logging.info(f"Epoch {epoch+1}/{epochs}: Train Loss: {train_loss:.4f}, Validation Loss: {val_loss:.4f} Train Accuracy: {train_acc:.2f}%, Validation Accuracy: {val_acc:.2f}%")
+
+    logging.info("Test the CNN...")
+    test_acc = test(cnn, test_loader, device)
+    logging.info(f"Final Test Accuracy: {test_acc:.2f}%")
+
+@dsl.pipeline(name='train-cnn-for-fashion-mnist')
+def train_cnn_for_fashion_mnist_pipeline(dataset_name: str, namespace: str, mount_s3_endpoint: str, mount_s3_region: str, mount_point: str, batch_size: int, epochs: int, learning_rate: float):
+    # the dataset's mount path during training
+    mount_path = '/datasets'
+    # prepare the dataset
+    create_dataset_op = create_s3_dataset(dataset_name=dataset_name, namespace=namespace, mount_point=mount_point, mount_s3_endpoint=mount_s3_endpoint, mount_s3_region=mount_s3_region)
+    create_alluxio_runtime_op = create_alluxio_runtime(dataset_name=dataset_name, namespace=namespace)
+    preheat_dataset_op = preheat_dataset(dataset_name=dataset_name, namespace=namespace)
+    # disable caching
+    create_dataset_op.set_caching_options(False)
+    create_alluxio_runtime_op.set_caching_options(False)
+    preheat_dataset_op.set_caching_options(False)
+    # train the CNN
+    train_simple_cnn_op = train_simple_cnn(dataset_name=dataset_name, data_root_path=mount_path, batch_size=batch_size, epochs=epochs, learning_rate=learning_rate)
+    train_simple_cnn_op.set_caching_options(False)
+    # mount the dataset PVC to the training component
+    kubernetes.mount_pvc(task=train_simple_cnn_op, pvc_name=dataset_name, mount_path=mount_path)
+    # define the dependencies between components
+    create_alluxio_runtime_op.after(create_dataset_op)
+    preheat_dataset_op.after(create_alluxio_runtime_op)
+    train_simple_cnn_op.after(preheat_dataset_op)
+    # clean up the dataset and the preheat operation
+    cleanup_dataset_and_alluxio_runtime_op = cleanup_dataset_and_alluxio_runtime(dataset_name=dataset_name, namespace=namespace)
+    cleanup_dataset_and_alluxio_runtime_op.set_caching_options(False)
+    cleanup_dataset_and_alluxio_runtime_op.after(train_simple_cnn_op)
+    cleanup_dataset_and_alluxio_runtime_op.ignore_upstream_failure()
+    cleanup_preheat_operation_op = cleanup_preheat_operation(dataset_name=dataset_name, namespace=namespace)
+    cleanup_preheat_operation_op.set_caching_options(False)
+    cleanup_preheat_operation_op.after(train_simple_cnn_op)
+    cleanup_preheat_operation_op.ignore_upstream_failure()
+
+
+# Compile into IR YAML
+# Re-run this file to re-generate the YAML file whenever you change the code above.
+compiler.Compiler().compile(train_cnn_for_fashion_mnist_pipeline, './pipline-yaml/train-cnn-for-fashion-mnist-pipline.yaml')
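+
+# To submit the compiled pipeline programmatically instead of uploading it through
+# the dashboard UI, a sketch along these lines should work. The host URL and all
+# argument values below are placeholders; substitute your own KFP endpoint and settings.
+#
+#   import kfp
+#
+#   client = kfp.Client(host='http://<your-kfp-endpoint>')
+#   client.create_run_from_pipeline_package(
+#       './pipline-yaml/train-cnn-for-fashion-mnist-pipline.yaml',
+#       arguments={
+#           'dataset_name': 'fashion-mnist',
+#           'namespace': 'kubeflow',
+#           'mount_point': 's3://<bucket>/<path-to-dataset>',
+#           'mount_s3_endpoint': '<s3-endpoint>',
+#           'mount_s3_region': '<s3-region>',
+#           'batch_size': 64,
+#           'epochs': 2,
+#           'learning_rate': 0.001,
+#       })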