From 6b0bf5b7d5ada7522a953df2f3a435ab5a77b127 Mon Sep 17 00:00:00 2001 From: kenxu95 Date: Wed, 5 Apr 2023 16:27:07 -0700 Subject: [PATCH 1/4] context switch --- examples/churn_prediction/temp.py | 202 ++++++++++++++++++ manual_qa_tests/initialize.py | 21 +- .../workflows/check_status_test.py | 3 + 3 files changed, 222 insertions(+), 4 deletions(-) create mode 100644 examples/churn_prediction/temp.py diff --git a/examples/churn_prediction/temp.py b/examples/churn_prediction/temp.py new file mode 100644 index 000000000..214f863aa --- /dev/null +++ b/examples/churn_prediction/temp.py @@ -0,0 +1,202 @@ +print("Cell 0") +import pandas as pd +import numpy as np +import aqueduct as aq + +# Read some customer data from the Aqueduct repo. +customers_table = pd.read_csv( + "https://raw.githubusercontent.com/aqueducthq/aqueduct/main/examples/churn_prediction/data/customers.csv" +) +churn_table = pd.read_csv( + "https://raw.githubusercontent.com/aqueducthq/aqueduct/main/examples/churn_prediction/data/churn_data.csv" +) +pd.merge(customers_table, churn_table, on="cust_id").head() + + +print("Cell 1") +# The @op decorator here allows Aqueduct to run this function as +# a part of an Aqueduct workflow. It tells Aqueduct that when +# we execute this function, we're defining a step in the workflow. +# While the results can be retrieved immediately, nothing is +# published until we call `publish_flow()` below. +@aq.op +def log_featurize(cust: pd.DataFrame) -> pd.DataFrame: + """ + log_featurize takes in customer data from the Aqueduct customers table + and log normalizes the numerical columns using the numpy.log function. + It skips the cust_id, using_deep_learning, and using_dbt columns because + these are not numerical columns that require regularization. + + log_featurize adds all the log-normalized values into new columns, and + maintains the original values as-is. In addition to the original company_size + column, log_featurize will add a log_company_size column. + """ + features = cust.copy() + skip_cols = ["cust_id", "using_deep_learning", "using_dbt"] + + for col in features.columns.difference(skip_cols): + features["log_" + col] = np.log(features[col] + 1.0) + + return features.drop(columns="cust_id") + + +print("Cell 2") +# Calling `.local()` on an @op-annotated function allows us to execute the +# function locally for testing purposes. When a function is called with +# `.local()`, Aqueduct does not capture the function execution as a part of +# the definition of a workflow. +features_table = log_featurize.local(customers_table) +features_table.head() + + +print("Cell 3") +from sklearn.linear_model import LogisticRegression + +linear_model = LogisticRegression(max_iter=10000) +linear_model.fit(features_table, churn_table["churn"]) + + +print("Cell 4") +from sklearn.tree import DecisionTreeClassifier + +decision_tree_model = DecisionTreeClassifier(max_depth=10, min_samples_split=3) +decision_tree_model.fit(features_table, churn_table["churn"]) + + +print("Cell 5") +@aq.op +def predict_linear(features_table): + """ + Generates predictions using the logistic regression model and + returns a new DataFrame with a column called linear that has + the likelihood of the customer churning. + """ + return pd.DataFrame({"linear": linear_model.predict_proba(features_table)[:, 1]}) + +@aq.op +def predict_tree(features_table): + """ + Generates predictions using the decision tree model and + returns a new DataFrame with a column called tree that has + the likelihood of the customer churning. + """ + return pd.DataFrame({"tree": decision_tree_model.predict_proba(features_table)[:, 1]}) + +@aq.op +def predict_ensemble(customers_table, linear_pred_table, tree_pred_table): + """ + predict_ensemble combines the results from our logistic regression + and decision tree models by taking the average of the two models' + probabilities that a user might churn. The resulting average is + then assigned into the `prob_churn` column on the customers_table. + """ + return customers_table.assign(prob_churn=linear_pred_table.join(tree_pred_table).mean(axis=1)) + + +print("Cell 6") +features_table = log_featurize.local(customers_table) +linear_pred_table = predict_linear.local(features_table) +tree_pred_table = predict_tree.local(features_table) +churn_table = predict_ensemble.local(customers_table, linear_pred_table, tree_pred_table) + + +print("Cell 7") +churn_table.head() + + +print("Cell 8") +# If you're running your notebook on a separate machine from your +# Aqueduct server, change this to the address of your Aqueduct server. +address = "localhost:8080" + +# If you're running youre notebook on a separate machine from your +# Aqueduct server, you will have to copy your API key here rather than +# using `get_apikey()`. +api_key = "09LOAH7CW3MDUVGQF5JP62BRK1ZX8INS" +client = aq.Client(api_key, address) + + +print("Cell 9") +warehouse = client.integration(name="aqueduct_demo") + +# customers_table is an Aqueduct TableArtifact, which is a wrapper around +# a Pandas DataFrame. A TableArtifact can be used as argument to any operator +# in a workflow; you can also call .get() on a TableArtifact to retrieve +# the underlying DataFrame and interact with it directly. +customers_table = warehouse.sql(query="SELECT * FROM customers;") +print(type(customers_table)) + + +print("Cell 10") +# This gets the head of the underlying DataFrame. Note that you can't +# pass a DataFrame as an argument to a workflow; you must use the Aqueduct +# TableArtifact! +customers_table.get().head() + + +print("Cell 11") +features_table = log_featurize(customers_table) +print(type(features_table)) + + +print("Cell 12") +features_table.get().head() + + +print("Cell 13") +linear_pred_table = predict_linear(features_table) +tree_pred_table = predict_tree(features_table) +churn_table = predict_ensemble(customers_table, linear_pred_table, tree_pred_table) + + +print("Cell 14") +churn_table.get().head() + + +print("Cell 15") +@aq.check(description="Ensuring valid probabilities.") +def valid_probabilities(df: pd.DataFrame): + return (df["prob_churn"] >= 0) & (df["prob_churn"] <= 1) + + +print("Cell 16") +check_result = valid_probabilities(churn_table) + + +print("Cell 17") +# Use Aqueduct's built-in mean metric to calculate the average value of `prob_churn`. +# Calling .get() on the metric will retrieve the current value. +avg_pred_churn_metric = churn_table.mean("prob_churn") +avg_pred_churn_metric.get() + + +print("Cell 18") +# Bounds on metrics ensure that the metric stays within a valid range. +# In this case, we'd ideally like churn to be between .1 and .3, and we +# know something's gone wrong if it's above .4. +avg_pred_churn_metric.bound(lower=0.1) +avg_pred_churn_metric.bound(upper=0.3) +avg_pred_churn_metric.bound(upper=0.4, severity="error") + + +print("Cell 19") +# This tells Aqueduct to save the results in churn_table +# back to the demo DB we configured earlier. +# NOTE: At this point, no data is actually saved! This is just +# part of a workflow spec that will be executed once the workflow +# is published in the next cell. +warehouse.save(churn_table, table_name="pred_churn", update_mode="replace") + + +print("Cell 20") +# This publishes all of the logic needed to create churn_table +# and avg_pred_churn_metric to Aqueduct. The URL below will +# take you to the Aqueduct UI, which will show you the status +# of your workflow runs and allow you to inspect them. +churn_flow = client.publish_flow( + name="Demo Churn Ensemble", + artifacts=[churn_table, avg_pred_churn_metric], + # Uncomment the following line to schedule on a hourly basis. + # schedule=aq.hourly(), +) +print(churn_flow.id()) \ No newline at end of file diff --git a/manual_qa_tests/initialize.py b/manual_qa_tests/initialize.py index ee2e769f0..056d50d79 100644 --- a/manual_qa_tests/initialize.py +++ b/manual_qa_tests/initialize.py @@ -1,7 +1,8 @@ import argparse - import deploy_example + from aqueduct.constants.enums import NotificationLevel +from multiprocessing import Process from notification import connect_slack from wait_for_flows import wait_for_all_flows_to_complete from workflows import ( @@ -16,6 +17,8 @@ warning_bad_check, ) +from workflows.check_status_test import hello + import aqueduct as aq # when adding new deployments, keep the order of `fail`, `warning`, and `succeed` @@ -47,6 +50,12 @@ TEMP_NOTEBOOK_PATH = "temp.py" RUN_NOTEBOOK_SCRIPT = "examples/run_notebook.py" + +def deploy_flow(name, deploy_fn, api_key, address, data_integration) -> None: + print(f"Deploying {name}...") + client = aq.Client(api_key, address) + deploy_fn(client, data_integration) + if __name__ == "__main__": parser = argparse.ArgumentParser() parser.add_argument("--addr", default="localhost:8080") @@ -63,7 +72,6 @@ args = parser.parse_args() api_key = args.api_key if args.api_key else aq.get_apikey() - client = aq.Client(api_key, args.addr) if args.slack_token and args.slack_channel: connect_slack( @@ -89,9 +97,14 @@ ) if not args.example_notebooks_only and not args.demo_container_notebooks_only: + processes = [] for pkg in WORKFLOW_PKGS: - print(f"Deploying {pkg.NAME}...") - pkg.deploy(client, args.data_integration) + p = Process(target=deploy_flow, args=(pkg.NAME, pkg.deploy, api_key, args.addr, args.data_integration)) + processes.append(p) + p.start() + + for p in processes: + p.join() if args.wait_to_complete: wait_for_all_flows_to_complete(client) diff --git a/manual_qa_tests/workflows/check_status_test.py b/manual_qa_tests/workflows/check_status_test.py index 7de362c0c..851225d18 100644 --- a/manual_qa_tests/workflows/check_status_test.py +++ b/manual_qa_tests/workflows/check_status_test.py @@ -70,3 +70,6 @@ def deploy(client, integration_name): pass_level_error_artf, ], ) + +def hello(): + return "hello" \ No newline at end of file From 43c55139c4f9c9d4fda50717bc26a5d29e871451 Mon Sep 17 00:00:00 2001 From: kenxu95 Date: Thu, 6 Apr 2023 12:16:43 -0700 Subject: [PATCH 2/4] done --- manual_qa_tests/initialize.py | 46 +++++++++++++------ .../workflows/check_status_test.py | 3 -- 2 files changed, 31 insertions(+), 18 deletions(-) diff --git a/manual_qa_tests/initialize.py b/manual_qa_tests/initialize.py index 056d50d79..84034a3a1 100644 --- a/manual_qa_tests/initialize.py +++ b/manual_qa_tests/initialize.py @@ -17,8 +17,6 @@ warning_bad_check, ) -from workflows.check_status_test import hello - import aqueduct as aq # when adding new deployments, keep the order of `fail`, `warning`, and `succeed` @@ -51,11 +49,23 @@ RUN_NOTEBOOK_SCRIPT = "examples/run_notebook.py" +def deploy_example_notebook(deploy_fn, dir_path, notebook_name, api_key, address) -> None: + print(f"Deploying example notebooks {notebook_name}...") + deploy_fn( + dir_path, + notebook_name, + TEMP_NOTEBOOK_PATH, + address, + api_key, + ) + + def deploy_flow(name, deploy_fn, api_key, address, data_integration) -> None: print(f"Deploying {name}...") client = aq.Client(api_key, address) deploy_fn(client, data_integration) + if __name__ == "__main__": parser = argparse.ArgumentParser() parser.add_argument("--addr", default="localhost:8080") @@ -69,9 +79,11 @@ def deploy_flow(name, deploy_fn, api_key, address, data_integration) -> None: parser.add_argument("--slack-channel", default="") parser.add_argument("--notification-level", default="success") parser.add_argument("--wait-to-complete", action="store_true") + parser.add_argument("--single-threaded", action="store_true") args = parser.parse_args() api_key = args.api_key if args.api_key else aq.get_apikey() + client = aq.Client(api_key, args.addr) if args.slack_token and args.slack_channel: connect_slack( @@ -81,28 +93,32 @@ def deploy_flow(name, deploy_fn, api_key, address, data_integration) -> None: NotificationLevel(args.notification_level), ) + # This is only populated in the default multi-process case. + processes = [] + if args.example_notebooks or args.example_notebooks_only or args.demo_container_notebooks_only: notebooks = DEMO_NOTEBOOKS_PATHS if not args.demo_container_notebooks_only: notebooks += ADDITIONAL_EXAMPLE_NOTEBOOKS_PATHS for example_path in notebooks: - print(f"Deploying example notebooks {example_path[1]}...") - deploy_example.deploy( - example_path[0], - example_path[1], - TEMP_NOTEBOOK_PATH, - args.addr, - api_key, - ) + if args.single_threaded: + deploy_example_notebook(deploy_example.deploy, example_path[0], example_path[1], api_key, args.addr) + else: + p = Process(target=deploy_example_notebook, args=(deploy_example.deploy, example_path[0], example_path[1], api_key, args.addr)) + processes.append(p) + p.start() if not args.example_notebooks_only and not args.demo_container_notebooks_only: - processes = [] for pkg in WORKFLOW_PKGS: - p = Process(target=deploy_flow, args=(pkg.NAME, pkg.deploy, api_key, args.addr, args.data_integration)) - processes.append(p) - p.start() - + if args.single_threaded: + deploy_flow(pkg.NAME, pkg.deploy, api_key, args.addr, args.data_integration) + else: + p = Process(target=deploy_flow, args=(pkg.NAME, pkg.deploy, api_key, args.addr, args.data_integration)) + processes.append(p) + p.start() + + if len(processes) > 0: for p in processes: p.join() diff --git a/manual_qa_tests/workflows/check_status_test.py b/manual_qa_tests/workflows/check_status_test.py index 851225d18..7de362c0c 100644 --- a/manual_qa_tests/workflows/check_status_test.py +++ b/manual_qa_tests/workflows/check_status_test.py @@ -70,6 +70,3 @@ def deploy(client, integration_name): pass_level_error_artf, ], ) - -def hello(): - return "hello" \ No newline at end of file From 3958a9e7010e3dc3d797f9dc94df32c43b2aeab1 Mon Sep 17 00:00:00 2001 From: kenxu95 Date: Thu, 6 Apr 2023 12:16:53 -0700 Subject: [PATCH 3/4] lint --- examples/churn_prediction/temp.py | 3 ++- manual_qa_tests/initialize.py | 24 +++++++++++++++++++----- 2 files changed, 21 insertions(+), 6 deletions(-) diff --git a/examples/churn_prediction/temp.py b/examples/churn_prediction/temp.py index 214f863aa..d630478aa 100644 --- a/examples/churn_prediction/temp.py +++ b/examples/churn_prediction/temp.py @@ -1,6 +1,7 @@ print("Cell 0") -import pandas as pd import numpy as np +import pandas as pd + import aqueduct as aq # Read some customer data from the Aqueduct repo. diff --git a/manual_qa_tests/initialize.py b/manual_qa_tests/initialize.py index 84034a3a1..7c068a809 100644 --- a/manual_qa_tests/initialize.py +++ b/manual_qa_tests/initialize.py @@ -1,8 +1,8 @@ import argparse -import deploy_example +from multiprocessing import Process +import deploy_example from aqueduct.constants.enums import NotificationLevel -from multiprocessing import Process from notification import connect_slack from wait_for_flows import wait_for_all_flows_to_complete from workflows import ( @@ -103,9 +103,20 @@ def deploy_flow(name, deploy_fn, api_key, address, data_integration) -> None: for example_path in notebooks: if args.single_threaded: - deploy_example_notebook(deploy_example.deploy, example_path[0], example_path[1], api_key, args.addr) + deploy_example_notebook( + deploy_example.deploy, example_path[0], example_path[1], api_key, args.addr + ) else: - p = Process(target=deploy_example_notebook, args=(deploy_example.deploy, example_path[0], example_path[1], api_key, args.addr)) + p = Process( + target=deploy_example_notebook, + args=( + deploy_example.deploy, + example_path[0], + example_path[1], + api_key, + args.addr, + ), + ) processes.append(p) p.start() @@ -114,7 +125,10 @@ def deploy_flow(name, deploy_fn, api_key, address, data_integration) -> None: if args.single_threaded: deploy_flow(pkg.NAME, pkg.deploy, api_key, args.addr, args.data_integration) else: - p = Process(target=deploy_flow, args=(pkg.NAME, pkg.deploy, api_key, args.addr, args.data_integration)) + p = Process( + target=deploy_flow, + args=(pkg.NAME, pkg.deploy, api_key, args.addr, args.data_integration), + ) processes.append(p) p.start() From 927a14459ae6c1add7a48f2ee5357686db30a8c3 Mon Sep 17 00:00:00 2001 From: kenxu95 Date: Thu, 6 Apr 2023 12:18:49 -0700 Subject: [PATCH 4/4] done --- examples/churn_prediction/temp.py | 203 ------------------------------ 1 file changed, 203 deletions(-) delete mode 100644 examples/churn_prediction/temp.py diff --git a/examples/churn_prediction/temp.py b/examples/churn_prediction/temp.py deleted file mode 100644 index d630478aa..000000000 --- a/examples/churn_prediction/temp.py +++ /dev/null @@ -1,203 +0,0 @@ -print("Cell 0") -import numpy as np -import pandas as pd - -import aqueduct as aq - -# Read some customer data from the Aqueduct repo. -customers_table = pd.read_csv( - "https://raw.githubusercontent.com/aqueducthq/aqueduct/main/examples/churn_prediction/data/customers.csv" -) -churn_table = pd.read_csv( - "https://raw.githubusercontent.com/aqueducthq/aqueduct/main/examples/churn_prediction/data/churn_data.csv" -) -pd.merge(customers_table, churn_table, on="cust_id").head() - - -print("Cell 1") -# The @op decorator here allows Aqueduct to run this function as -# a part of an Aqueduct workflow. It tells Aqueduct that when -# we execute this function, we're defining a step in the workflow. -# While the results can be retrieved immediately, nothing is -# published until we call `publish_flow()` below. -@aq.op -def log_featurize(cust: pd.DataFrame) -> pd.DataFrame: - """ - log_featurize takes in customer data from the Aqueduct customers table - and log normalizes the numerical columns using the numpy.log function. - It skips the cust_id, using_deep_learning, and using_dbt columns because - these are not numerical columns that require regularization. - - log_featurize adds all the log-normalized values into new columns, and - maintains the original values as-is. In addition to the original company_size - column, log_featurize will add a log_company_size column. - """ - features = cust.copy() - skip_cols = ["cust_id", "using_deep_learning", "using_dbt"] - - for col in features.columns.difference(skip_cols): - features["log_" + col] = np.log(features[col] + 1.0) - - return features.drop(columns="cust_id") - - -print("Cell 2") -# Calling `.local()` on an @op-annotated function allows us to execute the -# function locally for testing purposes. When a function is called with -# `.local()`, Aqueduct does not capture the function execution as a part of -# the definition of a workflow. -features_table = log_featurize.local(customers_table) -features_table.head() - - -print("Cell 3") -from sklearn.linear_model import LogisticRegression - -linear_model = LogisticRegression(max_iter=10000) -linear_model.fit(features_table, churn_table["churn"]) - - -print("Cell 4") -from sklearn.tree import DecisionTreeClassifier - -decision_tree_model = DecisionTreeClassifier(max_depth=10, min_samples_split=3) -decision_tree_model.fit(features_table, churn_table["churn"]) - - -print("Cell 5") -@aq.op -def predict_linear(features_table): - """ - Generates predictions using the logistic regression model and - returns a new DataFrame with a column called linear that has - the likelihood of the customer churning. - """ - return pd.DataFrame({"linear": linear_model.predict_proba(features_table)[:, 1]}) - -@aq.op -def predict_tree(features_table): - """ - Generates predictions using the decision tree model and - returns a new DataFrame with a column called tree that has - the likelihood of the customer churning. - """ - return pd.DataFrame({"tree": decision_tree_model.predict_proba(features_table)[:, 1]}) - -@aq.op -def predict_ensemble(customers_table, linear_pred_table, tree_pred_table): - """ - predict_ensemble combines the results from our logistic regression - and decision tree models by taking the average of the two models' - probabilities that a user might churn. The resulting average is - then assigned into the `prob_churn` column on the customers_table. - """ - return customers_table.assign(prob_churn=linear_pred_table.join(tree_pred_table).mean(axis=1)) - - -print("Cell 6") -features_table = log_featurize.local(customers_table) -linear_pred_table = predict_linear.local(features_table) -tree_pred_table = predict_tree.local(features_table) -churn_table = predict_ensemble.local(customers_table, linear_pred_table, tree_pred_table) - - -print("Cell 7") -churn_table.head() - - -print("Cell 8") -# If you're running your notebook on a separate machine from your -# Aqueduct server, change this to the address of your Aqueduct server. -address = "localhost:8080" - -# If you're running youre notebook on a separate machine from your -# Aqueduct server, you will have to copy your API key here rather than -# using `get_apikey()`. -api_key = "09LOAH7CW3MDUVGQF5JP62BRK1ZX8INS" -client = aq.Client(api_key, address) - - -print("Cell 9") -warehouse = client.integration(name="aqueduct_demo") - -# customers_table is an Aqueduct TableArtifact, which is a wrapper around -# a Pandas DataFrame. A TableArtifact can be used as argument to any operator -# in a workflow; you can also call .get() on a TableArtifact to retrieve -# the underlying DataFrame and interact with it directly. -customers_table = warehouse.sql(query="SELECT * FROM customers;") -print(type(customers_table)) - - -print("Cell 10") -# This gets the head of the underlying DataFrame. Note that you can't -# pass a DataFrame as an argument to a workflow; you must use the Aqueduct -# TableArtifact! -customers_table.get().head() - - -print("Cell 11") -features_table = log_featurize(customers_table) -print(type(features_table)) - - -print("Cell 12") -features_table.get().head() - - -print("Cell 13") -linear_pred_table = predict_linear(features_table) -tree_pred_table = predict_tree(features_table) -churn_table = predict_ensemble(customers_table, linear_pred_table, tree_pred_table) - - -print("Cell 14") -churn_table.get().head() - - -print("Cell 15") -@aq.check(description="Ensuring valid probabilities.") -def valid_probabilities(df: pd.DataFrame): - return (df["prob_churn"] >= 0) & (df["prob_churn"] <= 1) - - -print("Cell 16") -check_result = valid_probabilities(churn_table) - - -print("Cell 17") -# Use Aqueduct's built-in mean metric to calculate the average value of `prob_churn`. -# Calling .get() on the metric will retrieve the current value. -avg_pred_churn_metric = churn_table.mean("prob_churn") -avg_pred_churn_metric.get() - - -print("Cell 18") -# Bounds on metrics ensure that the metric stays within a valid range. -# In this case, we'd ideally like churn to be between .1 and .3, and we -# know something's gone wrong if it's above .4. -avg_pred_churn_metric.bound(lower=0.1) -avg_pred_churn_metric.bound(upper=0.3) -avg_pred_churn_metric.bound(upper=0.4, severity="error") - - -print("Cell 19") -# This tells Aqueduct to save the results in churn_table -# back to the demo DB we configured earlier. -# NOTE: At this point, no data is actually saved! This is just -# part of a workflow spec that will be executed once the workflow -# is published in the next cell. -warehouse.save(churn_table, table_name="pred_churn", update_mode="replace") - - -print("Cell 20") -# This publishes all of the logic needed to create churn_table -# and avg_pred_churn_metric to Aqueduct. The URL below will -# take you to the Aqueduct UI, which will show you the status -# of your workflow runs and allow you to inspect them. -churn_flow = client.publish_flow( - name="Demo Churn Ensemble", - artifacts=[churn_table, avg_pred_churn_metric], - # Uncomment the following line to schedule on a hourly basis. - # schedule=aq.hourly(), -) -print(churn_flow.id()) \ No newline at end of file