Change aqueduct_demo name to Demo #1289

Merged: 6 commits, May 5, 2023
2 changes: 1 addition & 1 deletion examples/churn_prediction/Customer Churn Prediction.ipynb
@@ -796,7 +796,7 @@
}
],
"source": [
"warehouse = client.resource(name=\"aqueduct_demo\")\n",
"warehouse = client.resource(name=\"Demo\")\n",
"\n",
"# customers_table is an Aqueduct TableArtifact, which is a wrapper around\n",
"# a Pandas DataFrame. A TableArtifact can be used as argument to any operator\n",
@@ -1169,7 +1169,7 @@
}
],
"source": [
"demodb = client.resource(\"aqueduct_demo\")\n",
"demodb = client.resource(\"Demo\")\n",
"\n",
"# mpg_data is an Aqueduct TableArtifact, which is a wrapper around\n",
"# a Pandas DataFrame. A TableArtifact can be used as argument to any operator\n",
@@ -917,7 +917,7 @@
}
],
"source": [
"demo_db = client.resource(\"aqueduct_demo\")\n",
"demo_db = client.resource(\"Demo\")\n",
"raw_data = demo_db.sql(\"select * from house_prices;\")\n",
"\n",
"filled_data = fill_missing_data(raw_data)\n",
6 changes: 3 additions & 3 deletions examples/mpg-regressor/Predicting MPG.ipynb
@@ -52,7 +52,7 @@
"id": "3ec5e482",
"metadata": {},
"source": [
"Once we have our client, the first thing we'll do is load our data. Aqueduct has the ability to connect to most common databases and storage systems (check out the Resources page on the Aqueduct UI). Here, we'll load a connection to the default `aqueduct_demo` database, which comes preloaded with a number of [common datasets](https://docs.aqueducthq.com/example-workflows/demo-data-warehouse). \n",
"Once we have our client, the first thing we'll do is load our data. Aqueduct has the ability to connect to most common databases and storage systems (check out the Resources page on the Aqueduct UI). Here, we'll load a connection to the default `Demo` database, which comes preloaded with a number of [common datasets](https://docs.aqueducthq.com/example-workflows/demo-data-warehouse). \n",
"\n",
"Once we have a connection to the demo DB, we can run a SQL query to retrieve our base data."
]
@@ -73,7 +73,7 @@
}
],
"source": [
"demodb = client.resource(\"aqueduct_demo\")\n",
"demodb = client.resource(\"Demo\")\n",
"\n",
"# mpg_data is an Aqueduct TableArtifact, which is a wrapper around\n",
"# a Pandas DataFrame. A TableArtifact can be used as argument to any operator\n",
@@ -1735,7 +1735,7 @@
"id": "7f4f4c46",
"metadata": {},
"source": [
"The last line above calls `.save()` on the `predicted_mpg` table. This tells Aqueduct that the results of `predicted_mpg` should be written to a database (in this case the `aqueduct_demo` DB we accessed earlier) into a table called `predicted_mpg`.\n",
"The last line above calls `.save()` on the `predicted_mpg` table. This tells Aqueduct that the results of `predicted_mpg` should be written to a database (in this case the `Demo` DB we accessed earlier) into a table called `predicted_mpg`.\n",
"\n",
"Now that we've defined our pipeline, we can call `.get()` on `predicted_mpg` to ensure that the pipeline executed successfully. Here, we can verify that our `predicted_mpg` matches what we computed locally:"
]
2 changes: 1 addition & 1 deletion examples/sentiment-analysis/Sentiment Model.ipynb
@@ -78,7 +78,7 @@
"metadata": {},
"outputs": [],
"source": [
"warehouse = client.resource(\"aqueduct_demo\")\n",
"warehouse = client.resource(\"Demo\")\n",
"\n",
"# reviews_table is an Aqueduct TableArtifact, which is a wrapper around\n",
"# a Pandas DataFrame. A TableArtifact can be used as argument to any operator\n",
2 changes: 1 addition & 1 deletion examples/tutorials/Parameters Tutorial.ipynb
@@ -58,7 +58,7 @@
"metadata": {},
"outputs": [],
"source": [
"db = client.resource(\"aqueduct_demo\")\n",
"db = client.resource(\"Demo\")\n",
"\n",
"# reviews_table is an Aqueduct TableArtifact, which is a wrapper around\n",
"# a Pandas DataFrame. A TableArtifact can be used as argument to any operator\n",
4 changes: 2 additions & 2 deletions examples/tutorials/Quickstart Tutorial.ipynb
@@ -60,7 +60,7 @@
"---\n",
"### Accessing Data\n",
"\n",
"The base data for our workflow is the [hotel reviews](https://docs.aqueducthq.com/integrations/aqueduct-demo-integration) dataset in the pre-built aqueduct_demo that comes with the Aqueduct server. This code does two things -- (1) it loads a connection to the demo database, and (2) it runs a SQL query against that DB and returns a pointer to the resulting dataset."
"The base data for our workflow is the [hotel reviews](https://docs.aqueducthq.com/integrations/aqueduct-demo-integration) dataset in the pre-built Demo that comes with the Aqueduct server. This code does two things -- (1) it loads a connection to the demo database, and (2) it runs a SQL query against that DB and returns a pointer to the resulting dataset."
]
},
{
@@ -222,7 +222,7 @@
}
],
"source": [
"demo_db = client.resource(\"aqueduct_demo\")\n",
"demo_db = client.resource(\"Demo\")\n",
"reviews_table = demo_db.sql(\"select * from hotel_reviews;\")\n",
"\n",
"# You will see the type of `reviews_table` is an Aqueduct TableArtifact.\n",
@@ -64,7 +64,7 @@
"metadata": {},
"outputs": [],
"source": [
"demodb = client.resource(\"aqueduct_demo\")\n",
"demodb = client.resource(\"Demo\")\n",
"\n",
"# wines is an Aqueduct TableArtifact, which is a wrapper around\n",
"# a Pandas DataFrame. A TableArtifact can be used as argument to any operator\n",
2 changes: 1 addition & 1 deletion integration_tests/no_concurrency/sdk/utils.py
@@ -2,4 +2,4 @@


def get_integration_name() -> str:
return "aqueduct_demo"
return "Demo"
2 changes: 1 addition & 1 deletion integration_tests/notebook/imported_function.ipynb
@@ -49,7 +49,7 @@
}
],
"source": [
"warehouse = client.resource(name=\"aqueduct_demo\")\n",
"warehouse = client.resource(name=\"Demo\")\n",
"customers_table = warehouse.sql(query=\"SELECT * FROM customers;\")\n",
"print(type(customers_table))"
]
2 changes: 1 addition & 1 deletion integration_tests/notebook/util_dependency.ipynb
@@ -45,7 +45,7 @@
"metadata": {},
"outputs": [],
"source": [
"warehouse = client.resource(name=\"aqueduct_demo\")\n",
"warehouse = client.resource(name=\"Demo\")\n",
"customers_table = warehouse.sql(query=\"SELECT * FROM customers;\")\n",
"print(type(customers_table))"
]
2 changes: 1 addition & 1 deletion integration_tests/sdk/aqueduct_tests/integration_test.py
@@ -40,7 +40,7 @@ def test_invalid_connect_integration(client):
with pytest.raises(
InvalidUserActionException, match="An integration with this name already exists."
):
client.connect_resource("aqueduct_demo", "SQLite", config)
client.connect_resource("Demo", "SQLite", config)

# Service is invalid.
with pytest.raises(
4 changes: 4 additions & 0 deletions integration_tests/sdk/conftest.py
@@ -122,6 +122,10 @@ def data_integration(request, pytestconfig, client):
"Skipped. Tests are only running against data integration %s." % cmdline_data_flag
)

# Translate aqueduct_demo -> Demo integration.
if request.param == "aqueduct_demo":
return client.resource("Demo")

return client.resource(request.param)


4 changes: 3 additions & 1 deletion integration_tests/sdk/setup_integration.py
@@ -107,7 +107,7 @@ def _add_missing_artifacts(
if len(missing_names) == 0:
return

demo = client.resource("aqueduct_demo")
demo = client.resource("Demo")
artifacts: List[BaseArtifact] = []
for table_name in missing_names:
data = _fetch_demo_data(demo, table_name)
@@ -205,6 +205,8 @@ def setup_data_integrations(client: Client, filter_to: Optional[str] = None) ->
# No need to do any setup for the demo db.
if "aqueduct_demo" in data_integrations:
data_integrations.remove("aqueduct_demo")
if "Demo" in data_integrations:
data_integrations.remove("Demo")

if len(data_integrations) == 0:
return
6 changes: 3 additions & 3 deletions manual_qa_tests/README.md
@@ -25,12 +25,12 @@ To run with more flexibility, configure the following commandline flags:
* **Workflow Details** Page: Each page should reflect the **workflow description**. Pay attention to any noted **sidesheets** behaviors in the description.
* **Integration** Page:
* There should be **1** *Cloud* integration, **11** *Data* integrations, **6** *Compute* integrations, and **2** *Notifications* integrations.
* If you are not using any additional integrations, `aqueduct_demo` should be the only available one. If you have passed in slack arguments to `initialize.py`, there should be two.
* If you are not using any additional integrations, `Demo` should be the only available one. If you have passed in slack arguments to `initialize.py`, there should be two.
* **Integration Details** Page:
* In the **Workflows** section of the `aqueduct_demo` page:
* In the **Workflows** section of the `Demo` page:
* There should be **13** workflows if using `--example-notebooks`
* There should be **7** workflows if **not** using `--example-notebooks`
* If you are using `aqueduct_demo`, there should be **8** tables in **Data** section.
* If you are using `Demo`, there should be **8** tables in **Data** section.
* **Data** Page: There should be **7** data rows available.
* **Slack channel**:
* If Slack flags are set, there should be **15** new notifications.
2 changes: 1 addition & 1 deletion manual_qa_tests/initialize.py
@@ -69,7 +69,7 @@ def deploy_flow(name, deploy_fn, api_key, address, data_integration) -> None:
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("--addr", default="localhost:8080")
parser.add_argument("--data-integration", default="aqueduct_demo")
parser.add_argument("--data-integration", default="Demo")
parser.add_argument("--api-key", default="")
# parser.add_argument("-q", "--quiet", action="store_true")
parser.add_argument("--example-notebooks", action="store_true")
28 changes: 25 additions & 3 deletions sdk/aqueduct/client.py
@@ -119,6 +119,10 @@ def get_apikey() -> str:
exit(1)


DEPRECATED_AQUEDUCT_DEMO_DB_NAME = "aqueduct_demo"
AQUEDUCT_DEMO_DB_NAME = "Demo"


class Client:
"""This class allows users to interact with flows on their Aqueduct cluster."""

@@ -306,12 +310,19 @@ def delete_resource(
The name of the integration to delete.
"""
existing_integrations = globals.__GLOBAL_API_CLIENT__.list_resources()

# If the user uses the deprecated demo name, and there isn't a resource for this, that means they actually
# want to use the new demo name.
if (
name == DEPRECATED_AQUEDUCT_DEMO_DB_NAME
and DEPRECATED_AQUEDUCT_DEMO_DB_NAME not in existing_integrations.keys()
) or name == AQUEDUCT_DEMO_DB_NAME:
raise InvalidUserActionException("Cannot delete the Aqueduct demo database: %s" % name)
if name not in existing_integrations.keys():
raise InvalidIntegrationException("Not connected to integration %s!" % name)

globals.__GLOBAL_API_CLIENT__.delete_integration(existing_integrations[name].id)

# Update the connected integrations cached on this object.
globals.__GLOBAL_API_CLIENT__.delete_integration(existing_integrations[name].id)
self._connected_integrations = globals.__GLOBAL_API_CLIENT__.list_resources()

def list_integrations(self) -> Dict[str, ResourceInfo]:
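
Review note: the guard added to `delete_resource` above can be read in isolation as the sketch below. It is a minimal illustration, not the SDK code: `check_deletable` and the two exception classes are hypothetical stand-ins for the SDK's `InvalidUserActionException` and `InvalidIntegrationException`, and the set of existing names is passed in rather than fetched from the API client.

```python
from typing import Iterable

DEPRECATED_DEMO_NAME = "aqueduct_demo"
DEMO_NAME = "Demo"


class InvalidUserAction(Exception):
    """Hypothetical stand-in for the SDK's InvalidUserActionException."""


class NotConnected(Exception):
    """Hypothetical stand-in for the SDK's InvalidIntegrationException."""


def check_deletable(name: str, existing: Iterable[str]) -> None:
    """Raise if `name` may not be deleted; return None otherwise."""
    existing_names = set(existing)

    # The deprecated name only refers to the built-in demo DB when no
    # resource with that exact name is currently registered.
    targets_demo = name == DEMO_NAME or (
        name == DEPRECATED_DEMO_NAME and DEPRECATED_DEMO_NAME not in existing_names
    )
    if targets_demo:
        raise InvalidUserAction("Cannot delete the Aqueduct demo database: %s" % name)

    if name not in existing_names:
        raise NotConnected("Not connected to integration %s!" % name)
```

Under this reading, a resource that is literally registered as `aqueduct_demo` passes the guard, while the same name is treated as an alias for `Demo` when no such resource exists.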
@@ -385,8 +396,19 @@ def resource(
incompatible type.
"""
self._connected_integrations = globals.__GLOBAL_API_CLIENT__.list_resources()
connected_names = self._connected_integrations.keys()

if name == DEPRECATED_AQUEDUCT_DEMO_DB_NAME:
# If the user uses the deprecated demo name, and there isn't a resource for this, that means they actually
# want to use the new demo name. We implicitly convert this for them, with a warning.
if DEPRECATED_AQUEDUCT_DEMO_DB_NAME not in connected_names:
logger().warning(
"`%s` is the deprecated name for the aqueduct demo db. Please use `%s` instead."
% (DEPRECATED_AQUEDUCT_DEMO_DB_NAME, AQUEDUCT_DEMO_DB_NAME)
)
name = AQUEDUCT_DEMO_DB_NAME

if name not in self._connected_integrations.keys():
if name not in connected_names:
raise InvalidIntegrationException("Not connected to integration %s!" % name)

integration_info = self._connected_integrations[name]
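
Review note: the name fallback in `resource()` above amounts to the following standalone sketch. `resolve_resource_name` is a hypothetical helper, the standard `logging` module stands in for the SDK's `logger()`, and `LookupError` stands in for `InvalidIntegrationException`; none of these names come from the PR.

```python
import logging
from typing import Iterable

logger = logging.getLogger(__name__)

DEPRECATED_DEMO_NAME = "aqueduct_demo"
DEMO_NAME = "Demo"


def resolve_resource_name(name: str, connected: Iterable[str]) -> str:
    """Map the deprecated demo name to the new one when it is safe to do so."""
    connected_names = set(connected)

    # Only rewrite the name if no resource is registered under the
    # deprecated name; otherwise the caller means that resource.
    if name == DEPRECATED_DEMO_NAME and DEPRECATED_DEMO_NAME not in connected_names:
        logger.warning(
            "`%s` is the deprecated name for the aqueduct demo db. Please use `%s` instead.",
            DEPRECATED_DEMO_NAME,
            DEMO_NAME,
        )
        name = DEMO_NAME

    if name not in connected_names:
        # Stand-in for InvalidIntegrationException in the SDK.
        raise LookupError("Not connected to integration %s!" % name)
    return name
```

Existing notebooks that still call `client.resource("aqueduct_demo")` would keep working under this logic, at the cost of a warning on each call.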
2 changes: 0 additions & 2 deletions sdk/aqueduct/constants/enums.py
@@ -61,7 +61,6 @@ class ServiceType(str, Enum, metaclass=MetaEnum):
MARIADB = "MariaDB"
SQLSERVER = "SQL Server"
BIGQUERY = "BigQuery"
AQUEDUCTDEMO = "Aqueduct Demo"
GITHUB = "Github"
SALESFORCE = "Salesforce"
GOOGLE_SHEETS = "Google Sheets"
@@ -93,7 +92,6 @@ class RelationalDBServices(str, Enum, metaclass=MetaEnum):
MARIADB = "MariaDB"
SQLSERVER = "SQL Server"
BIGQUERY = "BigQuery"
AQUEDUCTDEMO = "Aqueduct Demo"
SQLITE = "SQLite"
ATHENA = "Athena"

2 changes: 1 addition & 1 deletion sdk/aqueduct/github.py
@@ -201,7 +201,7 @@ def query(
```
Then to use it:
```
warehouse = aqueduct_client.resource(name="aqueduct_demo")
warehouse = aqueduct_client.resource(name="Demo")
gh = aqueduct_client.github(repo=<repo_name>, branch=<branch_name>)
reviews = warehouse.sql(
query=gh.query(path="queries/hotel.sql")
1 change: 0 additions & 1 deletion sdk/aqueduct/models/integration.py
@@ -54,7 +54,6 @@ def is_relational(self) -> bool:
ServiceType.MARIADB,
ServiceType.SQLSERVER,
ServiceType.BIGQUERY,
ServiceType.AQUEDUCTDEMO,
ServiceType.SQLITE,
ServiceType.ATHENA,
]
1 change: 0 additions & 1 deletion sdk/aqueduct/resources/sql.py
@@ -64,7 +64,6 @@ def list_tables(self) -> pd.DataFrame:

if self.type() in [
ServiceType.POSTGRES,
ServiceType.AQUEDUCTDEMO,
ServiceType.REDSHIFT,
]:
list_tables_query = LIST_TABLES_QUERY_PG
6 changes: 6 additions & 0 deletions src/golang/cmd/server/handler/connect_integration.go
@@ -123,6 +123,12 @@ func (h *ConnectIntegrationHandler) Prepare(r *http.Request) (interface{}, int,
return nil, http.StatusBadRequest, errors.New("Resource name is not provided")
}

// On startup, we currently enforce that such a resource does not exist by forcibly deleting it.
// Therefore, we don't want users to be able to create a resource with this name.
if name == shared.DeprecatedDemoDBResourceName && service == shared.Sqlite {
return nil, http.StatusBadRequest, errors.Newf("%s is a reserved name for SQLite resources.", shared.DeprecatedDemoDBResourceName)
}

if service == shared.Github || service == shared.GoogleSheets {
return nil, http.StatusBadRequest, errors.Newf("%s integration type is currently not supported", service)
}
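
Review note: the order of checks in this handler hunk can be summarized with a small Python sketch (Python is used only to match the other notes; the handler itself is Go). `validate_connect_request` is a hypothetical function, and the string values `"aqueduct_demo"`, `"SQLite"`, `"Github"`, and `"Google Sheets"` are assumed to match `shared.DeprecatedDemoDBResourceName` and the corresponding `shared` service constants, based on the SDK enum shown earlier in this diff.

```python
from http import HTTPStatus
from typing import Optional, Tuple

DEPRECATED_DEMO_NAME = "aqueduct_demo"  # assumed value of shared.DeprecatedDemoDBResourceName
SQLITE = "SQLite"                       # assumed value of shared.Sqlite
UNSUPPORTED = {"Github", "Google Sheets"}  # assumed values of shared.Github / shared.GoogleSheets


def validate_connect_request(name: str, service: str) -> Tuple[int, Optional[str]]:
    """Return (status, error message); the message is None when the request is acceptable."""
    if not name:
        return HTTPStatus.BAD_REQUEST, "Resource name is not provided"

    # The server deletes a SQLite resource with the deprecated name on startup,
    # so creating one is rejected up front.
    if name == DEPRECATED_DEMO_NAME and service == SQLITE:
        return (
            HTTPStatus.BAD_REQUEST,
            "%s is a reserved name for SQLite resources." % DEPRECATED_DEMO_NAME,
        )

    if service in UNSUPPORTED:
        return HTTPStatus.BAD_REQUEST, "%s integration type is currently not supported" % service

    return HTTPStatus.OK, None
```

Note that the reservation is scoped to SQLite: in this sketch, as in the hunk, a non-SQLite resource named `aqueduct_demo` is still accepted.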
1 change: 1 addition & 0 deletions src/golang/cmd/server/server/aqueduct_server.go
@@ -135,6 +135,7 @@ func NewAqServer(environment aq_context.ServerEnvironment, externalIP string, po
log.Fatal(err)
}

// If the deprecated demo db name still exists in the database, we delete it in this check too.
demoDBConnected, aqEngineConnected, err := CheckBuiltinIntegrations(ctx, s, accountOrganizationId)
if err != nil {
db.Close()
16 changes: 13 additions & 3 deletions src/golang/cmd/server/server/test_account.go
@@ -44,6 +44,8 @@ func CreateTestAccount(
}

// CheckBuiltinIntegrations returns whether the builtin demo and compute integrations already exist.
// If we notice that the deprecated demo integration exists, we delete it. We expect the caller to add
// the appropriate demo integration with `ConnectBuiltinDemoDBIntegration()` next.
func CheckBuiltinIntegrations(ctx context.Context, s *AqServer, orgID string) (bool, bool, error) {
integrations, err := s.IntegrationRepo.GetByOrg(
context.Background(),
@@ -57,10 +59,18 @@ func CheckBuiltinIntegrations(ctx context.Context, s *AqServer, orgID string) (b
demoConnected := false
engineConnected := false
for _, integrationObject := range integrations {
if integrationObject.Name == shared.DemoDbIntegrationName {
if integrationObject.Name == shared.DeprecatedDemoDBResourceName && integrationObject.Service == shared.Sqlite {
if err := s.IntegrationRepo.Delete(
ctx,
integrationObject.ID,
s.Database,
); err != nil {
return false, false, errors.Newf("Unable to delete deprecated demo integration: %v", err)
}
continue
} else if integrationObject.Name == shared.DemoDbIntegrationName {
demoConnected = true
}
if integrationObject.Name == shared.AqueductComputeIntegrationName {
} else if integrationObject.Name == shared.AqueductComputeIntegrationName {
engineConnected = true
}
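
Review note: the startup loop in `CheckBuiltinIntegrations` can be modeled roughly as below. This is a Python sketch of the Go control flow under stated assumptions: `Integration` and `check_builtin_integrations` are hypothetical, `"Demo"` is assumed to be the value of `shared.DemoDbIntegrationName`, and `COMPUTE_NAME` is a placeholder because the value of `shared.AqueductComputeIntegrationName` does not appear in the diff. Unlike the Go code, which deletes the deprecated row inside the loop, the sketch only collects the IDs that would be deleted.

```python
from dataclasses import dataclass
from typing import Iterable, List, Tuple

DEPRECATED_DEMO_NAME = "aqueduct_demo"
DEMO_NAME = "Demo"                 # assumed value of shared.DemoDbIntegrationName
COMPUTE_NAME = "compute-builtin"   # placeholder; real constant value not shown in the diff
SQLITE = "SQLite"


@dataclass
class Integration:
    id: str
    name: str
    service: str


def check_builtin_integrations(
    integrations: Iterable[Integration],
) -> Tuple[bool, bool, List[str]]:
    """Return (demo_connected, engine_connected, ids_to_delete)."""
    demo_connected = False
    engine_connected = False
    ids_to_delete: List[str] = []

    for integration in integrations:
        if integration.name == DEPRECATED_DEMO_NAME and integration.service == SQLITE:
            # Deprecated built-in demo DB: schedule it for removal and skip it.
            ids_to_delete.append(integration.id)
        elif integration.name == DEMO_NAME:
            demo_connected = True
        elif integration.name == COMPUTE_NAME:
            engine_connected = True

    return demo_connected, engine_connected, ids_to_delete
```

If only the deprecated row exists, `demo_connected` stays false, which matches the comment in the hunk that the caller is expected to add the new demo integration next.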

2 changes: 1 addition & 1 deletion src/golang/lib/job/k8s.go
@@ -364,7 +364,7 @@ func mapJobTypeToDockerImage(spec Spec, launchGpu bool, cudaVersion operator.Cud

func mapIntegrationServiceToDockerImage(service shared.Service) (string, error) {
switch service {
case shared.Postgres, shared.Redshift, shared.AqueductDemo:
case shared.Postgres, shared.Redshift:
return PostgresConnectorDockerImage, nil
case shared.Snowflake:
return SnowflakeConnectorDockerImage, nil
2 changes: 1 addition & 1 deletion src/golang/lib/job/lambda.go
@@ -258,7 +258,7 @@ func mapIntegrationServiceToLambdaFunction(service shared.Service) (string, erro
switch service {
case shared.Snowflake:
return lambda_utils.SnowflakeLambdaFunction, nil
case shared.Postgres, shared.Redshift, shared.AqueductDemo:
case shared.Postgres, shared.Redshift:
return lambda_utils.PostgresLambdaFunction, nil
case shared.BigQuery:
return lambda_utils.BigQueryLambdaFunction, nil
2 changes: 1 addition & 1 deletion src/golang/lib/models/shared/operator/connector/extract.go
@@ -36,7 +36,7 @@ func (e *Extract) UnmarshalJSON(data []byte) error {
// Initialize correct destination struct for this operator's Extract.Parameters
var params ExtractParams
switch e.Service {
case shared.Postgres, shared.AqueductDemo:
case shared.Postgres:
params = &PostgresExtractParams{}
case shared.Athena:
params = &AthenaExtractParams{}
2 changes: 1 addition & 1 deletion src/golang/lib/models/shared/operator/connector/load.go
@@ -37,7 +37,7 @@ func (l *Load) UnmarshalJSON(data []byte) error {
// Initialize correct destination struct for this operator's Load.Parameters
var params LoadParams
switch l.Service {
case shared.Postgres, shared.AqueductDemo:
case shared.Postgres:
params = &PostgresLoadParams{}
case shared.Snowflake:
params = &SnowflakeLoadParams{}