diff --git a/notebook/agentchat_groupchat_RAG.ipynb b/notebook/agentchat_groupchat_RAG.ipynb index 293f46373595..7a8209a90103 100644 --- a/notebook/agentchat_groupchat_RAG.ipynb +++ b/notebook/agentchat_groupchat_RAG.ipynb @@ -48,14 +48,14 @@ }, { "cell_type": "code", - "execution_count": 7, + "execution_count": 3, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ - "gpt-35-turbo\n" + "LLM models: ['gpt-35-turbo', 'gpt-35-turbo-0613']\n" ] } ], @@ -70,7 +70,7 @@ " },\n", ")\n", "\n", - "print(config_list[0][\"model\"])" + "print(\"LLM models: \", [config_list[i][\"model\"] for i in range(len(config_list))])" ] }, { @@ -88,11 +88,11 @@ " \"api_key\": \"\",\n", " }, # OpenAI API endpoint for gpt-4\n", " {\n", - " \"engine\": \"gpt-35-turbo\", \n", - " \"model\": \"gpt-35-turbo\",\n", + " \"engine\": \"gpt-35-turbo-0613\", \n", + " \"model\": \"gpt-35-turbo-0613\", # 0613 or newer is needed to use functions\n", " \"api_base\": \"\", \n", " \"api_type\": \"azure\", \n", - " \"api_version\": \"2023-08-01-preview\", # must be newer than 2023-07-01-preview to use functions\n", + " \"api_version\": \"2023-07-01-preview\", # 2023-07-01-preview or newer is needed to use functions\n", " \"api_key\": \"\"\n", " }\n", "]\n", @@ -113,18 +113,19 @@ }, { "cell_type": "code", - "execution_count": 16, + "execution_count": 5, "metadata": {}, "outputs": [], "source": [ - "from autogen.agentchat.contrib.retrieve_assistant_agent import RetrieveAssistantAgent\n", "from autogen.agentchat.contrib.retrieve_user_proxy_agent import RetrieveUserProxyAgent\n", + "from autogen import AssistantAgent\n", "import chromadb\n", "\n", "llm_config = {\n", " \"request_timeout\": 60,\n", " \"seed\": 42,\n", " \"config_list\": config_list,\n", + " \"temperature\": 0,\n", "}\n", "\n", "autogen.ChatCompletion.start_logging()\n", @@ -133,9 +134,9 @@ "boss = autogen.UserProxyAgent(\n", " name=\"Boss\",\n", " is_termination_msg=termination_msg,\n", + " 
human_input_mode=\"TERMINATE\",\n", " system_message=\"The boss who ask questions and give tasks.\",\n", - " code_execution_config={\"last_n_messages\": 2, \"work_dir\": \"groupchat\"},\n", - " human_input_mode=\"TERMINATE\"\n", + " code_execution_config=False, # we don't want to execute code in this case.\n", ")\n", "\n", "boss_aid = RetrieveUserProxyAgent(\n", @@ -153,10 +154,10 @@ " \"collection_name\": \"groupchat\",\n", " \"get_or_create\": True,\n", " },\n", - " code_execution_config={\"last_n_messages\": 2, \"work_dir\": \"groupchat\"},\n", + " code_execution_config=False, # we don't want to execute code in this case.\n", ")\n", "\n", - "coder = RetrieveAssistantAgent(\n", + "coder = AssistantAgent(\n", " name=\"Senior_Python_Engineer\",\n", " is_termination_msg=termination_msg,\n", " system_message=\"You are a senior python engineer. Reply `TERMINATE` in the end when everything is done.\",\n", @@ -179,7 +180,16 @@ "\n", "PROBLEM = \"How to use spark for parallel training in FLAML? 
Give me sample code.\"\n", "\n", + "\n", + "def _reset_agents():\n", + " boss.reset()\n", + " boss_aid.reset()\n", + " coder.reset()\n", + " pm.reset()\n", + " reviewer.reset()\n", + "\n", "def rag_chat():\n", + " _reset_agents()\n", " groupchat = autogen.GroupChat(\n", " agents=[boss_aid, coder, pm, reviewer], messages=[], max_round=12\n", " )\n", @@ -194,6 +204,7 @@ "\n", "\n", "def norag_chat():\n", + " _reset_agents()\n", " groupchat = autogen.GroupChat(\n", " agents=[boss, coder, pm, reviewer], messages=[], max_round=12\n", " )\n", @@ -207,6 +218,7 @@ "\n", "\n", "def call_rag_chat():\n", + " _reset_agents()\n", " # In this case, we will have multiple user proxy agents and we don't initiate the chat\n", " # with RAG user proxy agent.\n", " # In order to use RAG user proxy agent, we need to wrap RAG agents in a function and call\n", @@ -220,10 +232,8 @@ " _, ret_msg = boss_aid._generate_retrieve_user_reply(message)\n", " else:\n", " ret_msg = boss_aid.generate_init_message(message, n_results=n_results)\n", - " print(ret_msg, message)\n", " return ret_msg if ret_msg else message\n", " \n", - " boss_aid._code_execution_config = False # Disable code execution for boss_aid since it only retrieves content.\n", " boss_aid.human_input_mode = \"NEVER\" # Disable human input for boss_aid since it only retrieves content.\n", " \n", " llm_config = {\n", @@ -236,7 +246,7 @@ " \"properties\": {\n", " \"message\": {\n", " \"type\": \"string\",\n", - " \"description\": \"The message to query for content.\",\n", + " \"description\": \"Refined message which keeps the original meaning and can be used to retrieve content for code generation and question answering.\",\n", " }\n", " },\n", " \"required\": [\"message\"],\n", @@ -251,6 +261,21 @@ " coder.llm_config.update(llm_config)\n", " pm.llm_config.update(llm_config)\n", " reviewer.llm_config.update(llm_config)\n", + " coder.register_function(\n", + " function_map={\n", + " \"retrieve_content\": retrieve_content,\n", + " }\n", + 
" )\n", + " pm.register_function(\n", + " function_map={\n", + " \"retrieve_content\": retrieve_content,\n", + " }\n", + " )\n", + " reviewer.register_function(\n", + " function_map={\n", + " \"retrieve_content\": retrieve_content,\n", + " }\n", + " )\n", " boss.register_function(\n", " function_map={\n", " \"retrieve_content\": retrieve_content,\n", @@ -282,7 +307,7 @@ }, { "cell_type": "code", - "execution_count": 4, + "execution_count": 6, "metadata": {}, "outputs": [ { @@ -299,59 +324,112 @@ "--------------------------------------------------------------------------------\n", "\u001b[33mSenior_Python_Engineer\u001b[0m (to chat_manager):\n", "\n", - "To use Spark for parallel training in FLAML, first ensure that you have a Spark cluster set up and running. Then, you can use the `SparkExecutor` class provided by FLAML to execute the training jobs in parallel across multiple nodes in the cluster.\n", + "To use Spark for parallel training in FLAML, you can use the `SparkTrials` class provided by FLAML. 
Here is a sample code:\n", "\n", - "Here is a sample code snippet that demonstrates how to use Spark to run a parallel search for the best hyperparameters for a decision tree classifier:\n", - "\n", - "```\n", + "```python\n", "from flaml import AutoML\n", - "from pyspark.sql import SparkSession\n", - "from flaml.model import SparkExecutor\n", - "from sklearn.datasets import load_breast_cancer\n", - "\n", - "spark = SparkSession.builder.appName(\"FLAML\").getOrCreate()\n", + "from flaml.data import load_credit\n", + "from flaml.model import SparkTrials\n", "\n", - "data = load_breast_cancer(as_frame=True)\n", - "X_df = data['data']\n", - "y_df = data['target']\n", + "# Load data\n", + "X_train, y_train, X_test, y_test = load_credit()\n", "\n", - "# specify the search space\n", - "params = {\n", - " \"n_estimators\": [100, 400],\n", - " \"max_depth\": [6, 10, 20],\n", - " \"max_features\": [\"auto\", \"sqrt\"],\n", + "# Define the search space\n", + "search_space = {\n", + " \"n_estimators\": {\"domain\": range(10, 100)},\n", + " \"max_depth\": {\"domain\": range(6, 10)},\n", + " \"learning_rate\": {\"domain\": (0.01, 0.1, 1)},\n", "}\n", "\n", - "# create the AutoML object, configured to use Spark\n", + "# Create an AutoML instance with SparkTrials\n", "automl = AutoML(\n", - " search_space=params,\n", - " estimator=\"decision_tree\",\n", + " search_space=search_space,\n", " task=\"classification\",\n", - " n_jobs=-1,\n", - " executor=SparkExecutor(num_workers=2),\n", - " spark=spark\n", + " n_jobs=1,\n", + " ensemble_size=0,\n", + " max_time=60,\n", + " trials=SparkTrials(parallelism=2),\n", ")\n", "\n", - "# perform the hyperparameter search in parallel\n", - "best_model = automl.fit(X_train=X_df, y_train=y_df)\n", - "\n", - "# print the best hyperparameters and their performance\n", - "print(\"Best hyperparameters:\", best_model.config)\n", - "print(\"Best validation score:\", best_model.best_loss)\n", + "# Train the model\n", + "automl.fit(X_train=X_train, 
y_train=y_train)\n", "\n", - "spark.stop()\n", + "# Evaluate the model\n", + "print(\"Best model:\", automl.best_model)\n", + "print(\"Best hyperparameters:\", automl.best_config)\n", + "print(\"Test accuracy:\", automl.score(X_test, y_test))\n", "\n", + "# Terminate\n", "TERMINATE\n", "```\n", "\n", - "In this example, the `SparkExecutor` is configured to use two worker nodes (`num_workers=2`). This means that FLAML will execute parallel training jobs on two different nodes in the Spark cluster. You can adjust this parameter to control the number of nodes used for the parallel training.\n", - "\n", - "Note that you also need to pass the SparkSession object to the AutoML object using the `spark` parameter. This enables FLAML to use the Spark cluster for parallel training.\n", + "In this code, we first load the credit dataset. Then, we define the search space for the hyperparameters. We create an `AutoML` instance with `SparkTrials` as the `trials` parameter. We set the `parallelism` parameter to 2 to use 2 Spark workers for parallel training. Finally, we fit the model and evaluate it.\n", "\n", "--------------------------------------------------------------------------------\n", "\u001b[33mCode_Reviewer\u001b[0m (to chat_manager):\n", "\n", - "TERMINATE\n", + "Great! This code looks good to me.\n", + "\n", + "--------------------------------------------------------------------------------\n", + "\u001b[33mProduct_Manager\u001b[0m (to chat_manager):\n", + "\n", + "Thank you! 
Let me know if you have any other questions.\n", + "\n", + "--------------------------------------------------------------------------------\n", + "\u001b[31m\n", + ">>>>>>>> USING AUTO REPLY...\u001b[0m\n", + "\u001b[33mBoss\u001b[0m (to chat_manager):\n", + "\n", + "\n", + "\n", + "--------------------------------------------------------------------------------\n", + "\u001b[31m\n", + ">>>>>>>> USING AUTO REPLY...\u001b[0m\n", + "\u001b[33mBoss\u001b[0m (to chat_manager):\n", + "\n", + "\n", + "\n", + "--------------------------------------------------------------------------------\n", + "\u001b[31m\n", + ">>>>>>>> USING AUTO REPLY...\u001b[0m\n", + "\u001b[33mBoss\u001b[0m (to chat_manager):\n", + "\n", + "\n", + "\n", + "--------------------------------------------------------------------------------\n", + "\u001b[31m\n", + ">>>>>>>> USING AUTO REPLY...\u001b[0m\n", + "\u001b[33mBoss\u001b[0m (to chat_manager):\n", + "\n", + "\n", + "\n", + "--------------------------------------------------------------------------------\n", + "\u001b[31m\n", + ">>>>>>>> USING AUTO REPLY...\u001b[0m\n", + "\u001b[33mBoss\u001b[0m (to chat_manager):\n", + "\n", + "\n", + "\n", + "--------------------------------------------------------------------------------\n", + "\u001b[31m\n", + ">>>>>>>> USING AUTO REPLY...\u001b[0m\n", + "\u001b[33mBoss\u001b[0m (to chat_manager):\n", + "\n", + "\n", + "\n", + "--------------------------------------------------------------------------------\n", + "\u001b[31m\n", + ">>>>>>>> USING AUTO REPLY...\u001b[0m\n", + "\u001b[33mBoss\u001b[0m (to chat_manager):\n", + "\n", + "\n", + "\n", + "--------------------------------------------------------------------------------\n", + "\u001b[31m\n", + ">>>>>>>> USING AUTO REPLY...\u001b[0m\n", + "\u001b[33mBoss\u001b[0m (to chat_manager):\n", + "\n", + "\n", "\n", "--------------------------------------------------------------------------------\n" ] @@ -371,7 +449,7 @@ }, { "cell_type": "code", - 
"execution_count": 5, + "execution_count": 7, "metadata": {}, "outputs": [ { @@ -900,46 +978,64 @@ "--------------------------------------------------------------------------------\n", "\u001b[33mSenior_Python_Engineer\u001b[0m (to chat_manager):\n", "\n", - "To use spark for parallel training in FLAML, here is sample code\n", + "To use Spark for parallel training in FLAML, you can activate Spark as the parallel backend during parallel tuning in both AutoML and Hyperparameter Tuning, by setting the `use_spark` to `true`. FLAML will dispatch your job to the distributed Spark backend using `joblib-spark`. \n", + "\n", + "Here is an example code snippet for using parallel Spark jobs:\n", "\n", "```python\n", "import flaml\n", - "from flaml.data import load_spark\n", - "# load data as spark df\n", - "spark = load_spark()\n", - "df = spark.read.csv('path/to/data.csv', header=True, inferSchema=True)\n", + "automl_experiment = flaml.AutoML()\n", + "automl_settings = {\n", + " \"metric\": \"r2\",\n", + " \"task\": \"regression\",\n", + " \"use_spark\": True,\n", + " \"estimator_list\": [\n", + " \"lgbm\",\n", + " \"rf\",\n", + " \"xgboost\",\n", + " \"extra_tree\",\n", + " \"xgb_limitdepth\",\n", + " ],\n", + "}\n", + "automl_experiment.fit(X_train=train_x, y_train=train_y, **automl_settings)\n", + "```\n", "\n", - "automl=flaml.AutoML()\n", - "settings = {\n", - " \"time_budget\": 120, # search time limit in seconds\n", - " \"metric\": 'r2', # evaluation metric\n", - " \"task\": 'regression', # task type\n", - " \"log_file_name\": 'multinode.log', # flaml log file\n", - " \"n_splits\": 3,\n", - " \"estimator_list\": ['lgbm', 'rf', 'xgboost', 'extratree', 'lightgbm'],\n", - " \"use_spark\": True, # when using spark for parallel learning\n", - " \"num_samples\": 10,# number of trials\n", - " \"max_iters\": 100,# maximum number of iterations\n", - " }\n", - " \n", - "automl_settings.update(settings)\n", - "\n", - "# train model\n", - "X_train, X_test, y_train, y_test = 
automl.fit(\n", - " X_train=df,\n", - " y_train='target_variable',\n", - " **settings,\n", - ")\n", + "Note that you should not set `use_spark` to `true` when applying AutoML and Tuning for Spark Data. This is because only SparkML models will be used for Spark Data in AutoML and Tuning. As SparkML models run in parallel, there is no need to distribute them with `use_spark` again.\n", + "\n", + "You can also use Spark ML estimators for AutoML. FLAML integrates estimators based on Spark ML models. These models are trained in parallel using Spark, so we called them Spark estimators. To use these models, you first need to organize your data in the required format.\n", + "\n", + "Here is an example code snippet for Spark Data:\n", + "\n", + "```python\n", + "import pandas as pd\n", + "from flaml.automl.spark.utils import to_pandas_on_spark\n", + "# Creating a dictionary\n", + "data = {\"Square_Feet\": [800, 1200, 1800, 1500, 850],\n", + " \"Age_Years\": [20, 15, 10, 7, 25],\n", + " \"Price\": [100000, 200000, 300000, 240000, 120000]}\n", "\n", - "# performers rating : dict, performer_id -> performer_rating\n", - "performers_rating = automl.leaderboard(by='cv')\n", - "print(performers_rating)\n", + "# Creating a pandas DataFrame\n", + "dataframe = pd.DataFrame(data)\n", + "label = \"Price\"\n", + "\n", + "# Convert to pandas-on-spark dataframe\n", + "psdf = to_pandas_on_spark(dataframe)\n", "```\n", - " \n", - "Please note that you need to change `path/to/data.csv` to the path of your data file and `target_variable` to your target variable. The data file should be stored in a distributed file system in your Spark cluster.\n", "\n", + "To use Spark ML models you need to format your data appropriately. Specifically, use `VectorAssembler` to merge all feature columns into a single vector column.\n", "\n", - "I hope this helps. 
Please let me know if you have further questions.\n", + "Here is an example of how to use it:\n", + "```python\n", + "from pyspark.ml.feature import VectorAssembler\n", + "columns = psdf.columns\n", + "feature_cols = [col for col in columns if col != label]\n", + "featurizer = VectorAssembler(inputCols=feature_cols, outputCol=\"features\")\n", + "psdf = featurizer.transform(psdf.to_spark(index_col=\"index\"))[\"index\", \"features\"]\n", + "```\n", + "\n", + "Later in conducting the experiment, use your pandas-on-spark data like non-spark data and pass them using `X_train, y_train` or `dataframe, label`.\n", + "\n", + "You can also plot the optimization process using `plotly` by providing your optimized `flaml.AutoML` or `flaml.tune.tune.ExperimentAnalysis` object as input. Optional parameters can be added using keyword arguments. Available plotting functions include `plot_optimization_history`, `plot_feature_importance`, `plot_parallel_coordinate`, `plot_contour`, `plot_edf`, `plot_timeline`, and `plot_slice`.\n", "\n", "--------------------------------------------------------------------------------\n", "\u001b[33mProduct_Manager\u001b[0m (to chat_manager):\n", @@ -949,7 +1045,24 @@ "--------------------------------------------------------------------------------\n", "\u001b[33mCode_Reviewer\u001b[0m (to chat_manager):\n", "\n", - "Great, I'm glad I could help! Let me know if you have any other questions or concerns.\n", + "Is there anything else you need help with?\n", + "\n", + "--------------------------------------------------------------------------------\n", + "\u001b[31m\n", + ">>>>>>>> USING AUTO REPLY...\u001b[0m\n", + "\u001b[33mBoss_Assistant\u001b[0m (to chat_manager):\n", + "\n", + "\n", + "\n", + "--------------------------------------------------------------------------------\n", + "\u001b[33mProduct_Manager\u001b[0m (to chat_manager):\n", + "\n", + "No, that's all. 
Thank you for your help!\n", + "\n", + "--------------------------------------------------------------------------------\n", + "\u001b[33mCode_Reviewer\u001b[0m (to chat_manager):\n", + "\n", + "You're welcome! Don't hesitate to ask if you have any more questions in the future. Have a great day!\n", "\n", "--------------------------------------------------------------------------------\n", "\u001b[31m\n", @@ -961,7 +1074,24 @@ "--------------------------------------------------------------------------------\n", "\u001b[33mSenior_Python_Engineer\u001b[0m (to chat_manager):\n", "\n", - "TERMINATE\n", + "Have a great day too!\n", + "\n", + "--------------------------------------------------------------------------------\n", + "\u001b[33mProduct_Manager\u001b[0m (to chat_manager):\n", + "\n", + "Thank you!\n", + "\n", + "--------------------------------------------------------------------------------\n", + "\u001b[33mCode_Reviewer\u001b[0m (to chat_manager):\n", + "\n", + "You're welcome!\n", + "\n", + "--------------------------------------------------------------------------------\n", + "\u001b[31m\n", + ">>>>>>>> USING AUTO REPLY...\u001b[0m\n", + "\u001b[33mBoss_Assistant\u001b[0m (to chat_manager):\n", + "\n", + "\n", "\n", "--------------------------------------------------------------------------------\n" ] @@ -977,12 +1107,12 @@ "metadata": {}, "source": [ "### Call RetrieveUserProxyAgent while init chat with another user proxy agent\n", - "Sometimes, we want to use RetrieveUserProxyAgent in the group chat, but we don't want to init the chat with RetrieveUserProxyAgent." + "Sometimes, there might be a need to use RetrieveUserProxyAgent in group chat without initializing the chat with it. In such scenarios, it becomes essential to create a function that wraps the RAG agents and allows them to be called from other agents." 
] }, { "cell_type": "code", - "execution_count": 17, + "execution_count": 8, "metadata": {}, "outputs": [ { @@ -1002,130 +1132,353 @@ "\u001b[32m***** Suggested function Call: retrieve_content *****\u001b[0m\n", "Arguments: \n", "{\n", - " \"message\": \"spark_sample_code\"\n", + " \"message\": \"How to use spark for parallel training in FLAML?\"\n", "}\n", "\u001b[32m*****************************************************\u001b[0m\n", "\n", "--------------------------------------------------------------------------------\n", - "\u001b[33mCode_Reviewer\u001b[0m (to chat_manager):\n", + "\u001b[35m\n", + ">>>>>>>> EXECUTING FUNCTION retrieve_content...\u001b[0m\n", + "doc_ids: [['doc_0', 'doc_1', 'doc_4']]\n", + "\u001b[32mAdding doc_id doc_0 to context.\u001b[0m\n", + "\u001b[32mAdding doc_id doc_1 to context.\u001b[0m\n", + "\u001b[32mAdding doc_id doc_4 to context.\u001b[0m\n", + "\u001b[33mSenior_Python_Engineer\u001b[0m (to chat_manager):\n", "\n", "\u001b[32m***** Response from calling function \"retrieve_content\" *****\u001b[0m\n", - "Error: Function retrieve_content not found.\n", - "\u001b[32m*************************************************************\u001b[0m\n", + "You're a retrieve augmented coding assistant. You answer user's questions based on your own knowledge and the\n", + "context provided by the user.\n", + "If you can't answer the question with or without the current context, you should reply exactly `UPDATE CONTEXT`.\n", + "For code generation, you must obey the following rules:\n", + "Rule 1. You MUST NOT install any packages because all the packages needed are already installed.\n", + "Rule 2. 
You must follow the formats below to write your code:\n", + "```language\n", + "# your code\n", + "```\n", "\n", - "--------------------------------------------------------------------------------\n", - "\u001b[33mSenior_Python_Engineer\u001b[0m (to chat_manager):\n", + "User's question is: How to use spark for parallel training in FLAML?\n", + "\n", + "Context is: # Integrate - Spark\n", + "\n", + "FLAML has integrated Spark for distributed training. There are two main aspects of integration with Spark:\n", + "- Use Spark ML estimators for AutoML.\n", + "- Use Spark to run training in parallel spark jobs.\n", + "\n", + "## Spark ML Estimators\n", + "\n", + "FLAML integrates estimators based on Spark ML models. These models are trained in parallel using Spark, so we called them Spark estimators. To use these models, you first need to organize your data in the required format.\n", + "\n", + "### Data\n", + "\n", + "For Spark estimators, AutoML only consumes Spark data. FLAML provides a convenient function `to_pandas_on_spark` in the `flaml.automl.spark.utils` module to convert your data into a pandas-on-spark (`pyspark.pandas`) dataframe/series, which Spark estimators require.\n", + "\n", + "This utility function takes data in the form of a `pandas.Dataframe` or `pyspark.sql.Dataframe` and converts it into a pandas-on-spark dataframe. It also takes `pandas.Series` or `pyspark.sql.Dataframe` and converts it into a [pandas-on-spark](https://spark.apache.org/docs/latest/api/python/user_guide/pandas_on_spark/index.html) series. If you pass in a `pyspark.pandas.Dataframe`, it will not make any changes.\n", + "\n", + "This function also accepts optional arguments `index_col` and `default_index_type`.\n", + "- `index_col` is the column name to use as the index, default is None.\n", + "- `default_index_type` is the default index type, default is \"distributed-sequence\". 
More info about default index type could be found on Spark official [documentation](https://spark.apache.org/docs/latest/api/python/user_guide/pandas_on_spark/options.html#default-index-type)\n", "\n", - "To use Spark for parallel training in FLAML, you can follow the sample code below:\n", + "Here is an example code snippet for Spark Data:\n", "\n", "```python\n", - "from flaml import AutoML\n", - "from pyspark.sql import SparkSession\n", + "import pandas as pd\n", + "from flaml.automl.spark.utils import to_pandas_on_spark\n", + "# Creating a dictionary\n", + "data = {\"Square_Feet\": [800, 1200, 1800, 1500, 850],\n", + " \"Age_Years\": [20, 15, 10, 7, 25],\n", + " \"Price\": [100000, 200000, 300000, 240000, 120000]}\n", + "\n", + "# Creating a pandas DataFrame\n", + "dataframe = pd.DataFrame(data)\n", + "label = \"Price\"\n", + "\n", + "# Convert to pandas-on-spark dataframe\n", + "psdf = to_pandas_on_spark(dataframe)\n", + "```\n", + "\n", + "To use Spark ML models you need to format your data appropriately. 
Specifically, use [`VectorAssembler`](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.ml.feature.VectorAssembler.html) to merge all feature columns into a single vector column.\n", + "\n", + "Here is an example of how to use it:\n", + "```python\n", + "from pyspark.ml.feature import VectorAssembler\n", + "columns = psdf.columns\n", + "feature_cols = [col for col in columns if col != label]\n", + "featurizer = VectorAssembler(inputCols=feature_cols, outputCol=\"features\")\n", + "psdf = featurizer.transform(psdf.to_spark(index_col=\"index\"))[\"index\", \"features\"]\n", + "```\n", "\n", - "# Initialize SparkSession\n", - "spark = SparkSession.builder.appName(\"FLAML-Spark\").getOrCreate()\n", + "Later in conducting the experiment, use your pandas-on-spark data like non-spark data and pass them using `X_train, y_train` or `dataframe, label`.\n", "\n", - "# Create DataFrame using Spark\n", - "data = spark.read.format(\"libsvm\").load(\"data.libsvm\")\n", + "### Estimators\n", + "#### Model List\n", + "- `lgbm_spark`: The class for fine-tuning Spark version LightGBM models, using [SynapseML](https://microsoft.github.io/SynapseML/docs/features/lightgbm/about/) API.\n", "\n", - "# Split the data into training and validation sets\n", - "train_data, val_data = data.randomSplit([0.7, 0.3], seed=42)\n", + "#### Usage\n", + "First, prepare your data in the required format as described in the previous section.\n", "\n", - "# Convert DataFrame to Pandas DataFrame for FLAML\n", - "train_data = train_data.toPandas()\n", - "val_data = val_data.toPandas()\n", + "By including the models you intend to try in the `estimators_list` argument to `flaml.automl`, FLAML will start trying configurations for these models. 
If your input is Spark data, FLAML will also use estimators with the `_spark` postfix by default, even if you haven't specified them.\n", "\n", - "# Initialize FLAML\n", - "automl = AutoML()\n", + "Here is an example code snippet using SparkML models in AutoML:\n", "\n", - "# Set the search space\n", + "```python\n", + "import flaml\n", + "# prepare your data in pandas-on-spark format as we previously mentioned\n", + "\n", + "automl = flaml.AutoML()\n", + "settings = {\n", + " \"time_budget\": 30,\n", + " \"metric\": \"r2\",\n", + " \"estimator_list\": [\"lgbm_spark\"], # this setting is optional\n", + " \"task\": \"regression\",\n", + "}\n", + "\n", + "automl.fit(\n", + " dataframe=psdf,\n", + " label=label,\n", + " **settings,\n", + ")\n", + "```\n", + "\n", + "\n", + "[Link to notebook](https://github.com/microsoft/FLAML/blob/main/notebook/automl_bankrupt_synapseml.ipynb) | [Open in colab](https://colab.research.google.com/github/microsoft/FLAML/blob/main/notebook/automl_bankrupt_synapseml.ipynb)\n", + "\n", + "## Parallel Spark Jobs\n", + "You can activate Spark as the parallel backend during parallel tuning in both [AutoML](/docs/Use-Cases/Task-Oriented-AutoML#parallel-tuning) and [Hyperparameter Tuning](/docs/Use-Cases/Tune-User-Defined-Function#parallel-tuning), by setting the `use_spark` to `true`. FLAML will dispatch your job to the distributed Spark backend using [`joblib-spark`](https://github.com/joblib/joblib-spark).\n", + "\n", + "Please note that you should not set `use_spark` to `true` when applying AutoML and Tuning for Spark Data. This is because only SparkML models will be used for Spark Data in AutoML and Tuning. As SparkML models run in parallel, there is no need to distribute them with `use_spark` again.\n", + "\n", + "All the Spark-related arguments are stated below. 
These arguments are available in both Hyperparameter Tuning and AutoML:\n", + "\n", + "\n", + "- `use_spark`: boolean, default=False | Whether to use spark to run the training in parallel spark jobs. This can be used to accelerate training on large models and large datasets, but will incur more overhead in time and thus slow down training in some cases. GPU training is not supported yet when use_spark is True. For Spark clusters, by default, we will launch one trial per executor. However, sometimes we want to launch more trials than the number of executors (e.g., local mode). In this case, we can set the environment variable `FLAML_MAX_CONCURRENT` to override the detected `num_executors`. The final number of concurrent trials will be the minimum of `n_concurrent_trials` and `num_executors`.\n", + "- `n_concurrent_trials`: int, default=1 | The number of concurrent trials. When n_concurrent_trials > 1, FLAML performes parallel tuning.\n", + "- `force_cancel`: boolean, default=False | Whether to forcely cancel Spark jobs if the search time exceeded the time budget. 
Spark jobs include parallel tuning jobs and Spark-based model training jobs.\n", + "\n", + "An example code snippet for using parallel Spark jobs:\n", + "```python\n", + "import flaml\n", + "automl_experiment = flaml.AutoML()\n", "automl_settings = {\n", - " \"time_budget\": 60, # total running time in seconds\n", - " \"task\": \"classification\", # task type\n", - " \"metric\": \"accuracy\", # metric to optimize\n", - " \"estimator_list\": [\"lgbm\", \"rf\", \"xgboost\"], # list of ML learners\n", - " \"spark_mode\": True, # enable Spark mode\n", - " \"n_sampling\": 1 # number of data subsets for FLAML\n", + " \"time_budget\": 30,\n", + " \"metric\": \"r2\",\n", + " \"task\": \"regression\",\n", + " \"n_concurrent_trials\": 2,\n", + " \"use_spark\": True,\n", + " \"force_cancel\": True, # Activating the force_cancel option can immediately halt Spark jobs once they exceed the allocated time_budget.\n", "}\n", "\n", - "# Train with Spark parallel mode\n", - "automl.fit(train_data=train_data, **automl_settings)\n", + "automl.fit(\n", + " dataframe=dataframe,\n", + " label=label,\n", + " **automl_settings,\n", + ")\n", + "```\n", + "\n", "\n", - "# Predict with the best model\n", - "y_pred = automl.predict(val_data)\n", + "[Link to notebook](https://github.com/microsoft/FLAML/blob/main/notebook/integrate_spark.ipynb) | [Open in colab](https://colab.research.google.com/github/microsoft/FLAML/blob/main/notebook/integrate_spark.ipynb)\n", "\n", - "# Check the best model performance\n", - "best_model = automl.best_model # best ML model\n", - "best_config = automl.best_config # best model configuration\n", - "best_loss = automl.best_loss # best loss\n", - "best_metric = automl.best_metric # best metric\n", "\n", - "# Terminate SparkSession\n", - "spark.stop()\n", + "```python\n", + "import flaml\n", + "# for flaml.tune\n", + "with mlflow.start_run(run_name=f\"spark_auto_trials_1686631558\"):\n", + " analysis = flaml.tune.run(\n", + " func_to_tune,\n", + " params,\n", + " 
metric=\"r2\",\n", + " mode=\"max\",\n", + " mlflow_exp_name=\"test_doc\",\n", + " use_spark=True,\n", + " )\n", + "\n", + "# for flaml.automl\n", + "automl_experiment = flaml.AutoML()\n", + "automl_settings = {\n", + " \"metric\": \"r2\",\n", + " \"task\": \"regression\",\n", + " \"use_spark\": True,\n", + " \"mlflow_exp_name\": \"test_doc\",\n", + " \"estimator_list\": [\n", + " \"lgbm\",\n", + " \"rf\",\n", + " \"xgboost\",\n", + " \"extra_tree\",\n", + " \"xgb_limitdepth\",\n", + " ], # catboost does not yet support mlflow autologging\n", + "}\n", + "with mlflow.start_run(run_name=f\"automl_spark_trials_1686631579\"):\n", + " automl_experiment.fit(X_train=train_x, y_train=train_y, **automl_settings)\n", "```\n", "\n", - "In this code, the `SparkSession` is initialized to enable Spark functionalities. The data is read as a Spark DataFrame and then split into training and validation sets. Afterwards, the data is converted to Pandas DataFrames as FLAML currently supports Pandas DataFrames. FLAML is then initialized, and the search space and settings are defined. Finally, parallel training with Spark is executed using the `fit` method, and the best model is used for prediction.\n", "\n", - "Note: Make sure you have FLAML and the necessary Spark dependencies installed in your environment.\n", "\n", - "Let me know if you need any further assistance!\n", + "### Results\n", + "*Tune Autolog Trials on MLFlow UI*\n", + "\n", + "\n", + "![Tune Autolog Trials on MLFlow UI](Images/tune_trials.png)\n", + "\n", + "\n", + "*AutoML Autolog Trials on MLFlow UI*\n", + "\n", + "\n", + "![AutoML Autolog Trials on MLFlow UI](Images/automl_trials.png)\n", + "\n", + "\n", + "### Differences Between Auto and Manual Logging\n", + "Autologging is managed by MLFlow, while manual logging is maintained by FLAML.\n", + "\n", + "\n", + "#### Details of Manual Logging\n", + "FLAML logs general artifacts for AutoML tasks. 
Specifically, we log these artifacts:\n", + "\n", + "**`flaml.tune`**\n", + "\n", + "\n", + "![Manual Log Example for Tuning](Images/manual_log_tune.png)\n", + "\n", + "\n", + "- We create a parent run to log the best metric and the best configuration for the entire tuning process.\n", + "- For each trial, we create a child run to log the metric specific to the tune function and the configuration for that trial.\n", + "\n", + "**`flaml.automl`**\n", + "\n", + "\n", + "![Manual Log Example for AutoML](Images/manual_log_automl.png)\n", + "\n", + "\n", + "- We create a parent run to log the results of the experiment. This includes:\n", + " - The configuration of this model.\n", + " - The `best_validation_loss` produced by this model.\n", + " - The `best_iteration` to identify the point at which this model was found.\n", + "- For each state (a specific learner with different hyperparameters), we record the best trial for this model. This includes:\n", + " - The configuration of the best trial.\n", + " - The `validation_loss` the best trial produces.\n", + " - The `iter_count` to identify how many trials we have conducted for this state.\n", + " - The `pred_time`, which is the time cost of predicting test data for this model.\n", + " - The `wall_clock_time`, which is the time cost of this state.\n", + " - The `sample_size` to show how much data we sampled in this state.\n", + "Note that we also added this information to the autolog AutoML run.\n", + "\n", + "\n", + "#### Details of Autologging\n", + "Autolog artifacts typically include model parameters, model files, and runtime metrics like the following:\n", + "\n", + "\n", + "![Autolog Example](Images/autolog_example.png)\n", + "\n", + "\n", + "Artifacts can differ among various machine learning libraries. 
More detailed information can be found [here](https://mlflow.org/docs/latest/tracking.html#automatic-logging).\n", + "\n", + "\n", + "\n", + "\n", + "## Plot Experiment Result\n", + "The `flaml.visualization` module provides utility functions for plotting the optimization process using [plotly](https://plotly.com/python/). Leveraging `plotly`, users can interactively explore experiment results. To use these plotting functions, simply provide your optimized `flaml.AutoML` or `flaml.tune.tune.ExperimentAnalysis` object as input. Optional parameters can be added using keyword arguments.\n", + "\n", + "Available plotting functions:\n", + "- `plot_optimization_history`: Plot optimization history of all trials in the experiment.\n", + "- `plot_feature_importance`: Plot importance for each feature in the dataset.\n", + "- `plot_parallel_coordinate`: Plot the high-dimensional parameter relationships in the experiment.\n", + "- `plot_contour`: Plot the parameter relationship as contour plot in the experiment.\n", + "- `plot_edf`: Plot the objective value EDF (empirical distribution function) of the experiment.\n", + "- `plot_timeline`: Plot the timeline of the experiment.\n", + "- `plot_slice`: Plot the parameter relationship as slice plot in a study.\n", + "\n", + "### Figure Examples\n", + "![Plot Examples](Images/plot_samples.png)\n", + "\n", + "Check out our example [notebook](../../notebook/trident/automl_plot.ipynb) for a preview of all interactive plots.\n", + "\n", + "\n", + "\n", + "\u001b[32m*************************************************************\u001b[0m\n", "\n", "--------------------------------------------------------------------------------\n", - "\u001b[33mCode_Reviewer\u001b[0m (to chat_manager):\n", + "\u001b[33mProduct_Manager\u001b[0m (to chat_manager):\n", "\n", - "The code to use Spark for parallel training in FLAML is as follows:\n", + "To use Spark for parallel training in FLAML, you can follow these steps:\n", + "\n", + "1. 
Prepare your data in the required format. FLAML only consumes Spark data for Spark estimators. You can use the `to_pandas_on_spark` function from the `flaml.automl.spark.utils` module to convert your data into a pandas-on-spark dataframe. Here's an example:\n", "\n", "```python\n", - "from flaml import AutoML\n", - "from pyspark.sql import SparkSession\n", + "import pandas as pd\n", + "from flaml.automl.spark.utils import to_pandas_on_spark\n", "\n", - "# Initialize SparkSession\n", - "spark = SparkSession.builder.appName(\"FLAML-Spark\").getOrCreate()\n", + "# Create a dictionary\n", + "data = {\n", + " \"Square_Feet\": [800, 1200, 1800, 1500, 850],\n", + " \"Age_Years\": [20, 15, 10, 7, 25],\n", + " \"Price\": [100000, 200000, 300000, 240000, 120000]\n", + "}\n", "\n", - "# Create DataFrame using Spark\n", - "data = spark.read.format(\"libsvm\").load(\"data.libsvm\")\n", + "# Create a pandas DataFrame\n", + "dataframe = pd.DataFrame(data)\n", + "label = \"Price\"\n", "\n", - "# Split the data into training and validation sets\n", - "train_data, val_data = data.randomSplit([0.7, 0.3], seed=42)\n", + "# Convert to pandas-on-spark dataframe\n", + "psdf = to_pandas_on_spark(dataframe)\n", + "```\n", "\n", - "# Convert DataFrame to Pandas DataFrame for FLAML\n", - "train_data = train_data.toPandas()\n", - "val_data = val_data.toPandas()\n", + "2. Use the Spark ML estimators in FLAML. FLAML integrates estimators based on Spark ML models. You can include the models you want to try in the `estimator_list` argument when creating an instance of `flaml.AutoML`. 
Here's an example:\n", "\n", - "# Initialize FLAML\n", - "automl = AutoML()\n", + "```python\n", + "import flaml\n", "\n", - "# Set the search space\n", - "automl_settings = {\n", - " \"time_budget\": 60, # total running time in seconds\n", - " \"task\": \"classification\", # task type\n", - " \"metric\": \"accuracy\", # metric to optimize\n", - " \"estimator_list\": [\"lgbm\", \"rf\", \"xgboost\"], # list of ML learners\n", - " \"spark_mode\": True, # enable Spark mode\n", - " \"n_sampling\": 1 # number of data subsets for FLAML\n", + "automl = flaml.AutoML()\n", + "settings = {\n", + " \"time_budget\": 30,\n", + " \"metric\": \"r2\",\n", + " \"estimator_list\": [\"lgbm_spark\"], # Optional: specify the Spark ML estimator\n", + " \"task\": \"regression\"\n", "}\n", "\n", - "# Train with Spark parallel mode\n", - "automl.fit(train_data=train_data, **automl_settings)\n", + "automl.fit(\n", + " dataframe=psdf,\n", + " label=label,\n", + " **settings\n", + ")\n", + "```\n", + "\n", + "3. Activate Spark as the parallel backend. You can set the `use_spark` parameter to `True` to activate Spark as the parallel backend during parallel tuning. FLAML will dispatch your job to the distributed Spark backend using `joblib-spark`. 
Here's an example:\n", "\n", - "# Predict with the best model\n", - "y_pred = automl.predict(val_data)\n", + "```python\n", + "import flaml\n", "\n", - "# Check the best model performance\n", - "best_model = automl.best_model # best ML model\n", - "best_config = automl.best_config # best model configuration\n", - "best_loss = automl.best_loss # best loss\n", - "best_metric = automl.best_metric # best metric\n", + "automl_experiment = flaml.AutoML()\n", + "automl_settings = {\n", + " \"time_budget\": 30,\n", + " \"metric\": \"r2\",\n", + " \"task\": \"regression\",\n", + " \"n_concurrent_trials\": 2,\n", + " \"use_spark\": True,\n", + " \"force_cancel\": True # Optional: force cancel Spark jobs if time budget is exceeded\n", + "}\n", "\n", - "# Terminate SparkSession\n", - "spark.stop()\n", + "automl_experiment.fit(\n", + " dataframe=dataframe,\n", + " label=label,\n", + " **automl_settings\n", + ")\n", "```\n", "\n", - "This code initializes the `SparkSession` to enable Spark functionalities. The data is read as a Spark DataFrame and then split into training and validation sets. Afterward, the data is converted to Pandas DataFrames as FLAML currently supports Pandas DataFrames. FLAML is then initialized, and the search space and settings are defined. Finally, parallel training with Spark is executed using the `fit` method, and the best model is used for prediction.\n", + "These are the steps to use Spark for parallel training in FLAML. Let me know if you need any further assistance!\n", + "\n", + "--------------------------------------------------------------------------------\n", + "\u001b[33mCode_Reviewer\u001b[0m (to chat_manager):\n", + "\n", + "Great! You now have the steps to use Spark for parallel training in FLAML. 
If you have any more questions, feel free to ask.\n", "\n", - "Please ensure that you have FLAML and the necessary Spark dependencies installed in your environment.\n", + "--------------------------------------------------------------------------------\n", + "\u001b[31m\n", + ">>>>>>>> USING AUTO REPLY...\u001b[0m\n", + "\u001b[33mBoss\u001b[0m (to chat_manager):\n", + "\n", + "\n", + "\n", + "--------------------------------------------------------------------------------\n", + "\u001b[33mSenior_Python_Engineer\u001b[0m (to chat_manager):\n", "\n", - "Let me know if you need any further assistance!\n", "TERMINATE\n", "\n", "--------------------------------------------------------------------------------\n", @@ -1137,13 +1490,6 @@ "source": [ "call_rag_chat()" ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [] } ], "metadata": {