Skip to content

Commit

Permalink
Eng 1345 create api endpoint to get tables associated for workflow_id (
Browse files Browse the repository at this point in the history
…#216)

* Add route url

* first draft

* Draft 2 -- custom sql

* SQL return all operators

* distinct load list

* lint

* Custom response struct

* package-lock update

* lint

* table_artifact merge conflict

* lint

* black

* Right version of black

* ENG-1358 Set up backend integration tests & include in Github Actions (#214)
  • Loading branch information
eunice-chan authored Jul 19, 2022
1 parent ef15abb commit df986f3
Show file tree
Hide file tree
Showing 19 changed files with 449 additions and 61 deletions.
9 changes: 8 additions & 1 deletion .github/workflows/integration-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -67,14 +67,21 @@ jobs:
- name: Fetch the API key
run: echo "API_KEY=$(aqueduct apikey)" >> $GITHUB_ENV

- name: Run the Integration Tests
- name: Run the SDK Integration Tests
timeout-minutes: 10
working-directory: integration_tests/sdk
env:
SERVER_ADDRESS: localhost:8080
INTEGRATION: aqueduct_demo
run: pytest . -rP --publish -n 1

- name: Run the Backend Integration Tests
timeout-minutes: 10
working-directory: integration_tests/backend
env:
SERVER_ADDRESS: localhost:8080
run: pytest . -rP -n 1

- uses: actions/upload-artifact@v3
if: always()
with:
Expand Down
29 changes: 29 additions & 0 deletions integration_tests/backend/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
# Backend Integration Tests

These tests are run against the Aqueduct backend to check that the endpoints' reads and outputs are as expected.

The `setup_class` sets up all the workflows which are read by each test. When all the tests in the suite are done, the workflows set up in `setup_class` are deleted in the `teardown_class`.

## Creating Tests
The workflows run during test setup are all the Python files in the `setup/` folder. Each Python file is called in the format `{python_file} {api_key} {server_address}`. At the top, you can parse those arguments like so:
```
import sys
api_key, server_address = sys.argv[1], sys.argv[2]
```
After that, you can write a workflow as you would do as a typical user.
At the very end, the tests **require** you to print the flow id (e.g. `print(flow.id())`). This is parsed by the suite setup function and saved to a list of flow ids. At the end of testing, the teardown function will iterate through the flow ids and delete the associated workflows.

## Usage

Running all the tests in this repo:
`API_KEY=<your api key> SERVER_ADDRESS=<your server's address> pytest . -rP`

Running all the tests in a single file:
- `<your env variables> pytest <path to test file> -rP`

Running a specific test:
- `<your env variables> pytest . -rP -k '<specific test name>'`

Running tests in parallel, with concurrency 5:
- Install pytest-xdist
- `<your env variables> pytest . -rP -n 5`
18 changes: 18 additions & 0 deletions integration_tests/backend/conftest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
import os

import pytest

import aqueduct

API_KEY_ENV_NAME = "API_KEY"
SERVER_ADDR_ENV_NAME = "SERVER_ADDRESS"


def pytest_configure(config):
    """Load backend credentials from the environment into pytest globals.

    Reads the API key and server address env vars and exposes them as
    ``pytest.api_key`` / ``pytest.server_address`` so every test module can
    reach them. Fails fast if either variable is missing.
    """
    for attr, env_name in (
        ("api_key", API_KEY_ENV_NAME),
        ("server_address", SERVER_ADDR_ENV_NAME),
    ):
        setattr(pytest, attr, os.getenv(env_name))

    if pytest.api_key is None or pytest.server_address is None:
        raise Exception(
            "Test Setup Error: api_key and server_address must be set as environmental variables."
        )
64 changes: 64 additions & 0 deletions integration_tests/backend/setup/changing_saves.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
import sys

api_key, server_address = sys.argv[1], sys.argv[2]

###
# Workflow that loads a table from the `aqueduct_demo` integration and is
# deployed four times in a row, each time saving to a different target:
#   1. `table_1` in append mode
#   2. `table_1` in replace mode
#   3. `table_1` in append mode again
#   4. `table_2` in append mode
# The backend test suite parses the flow id printed on the last line.
###

import aqueduct

name = "Test: Changing Saves"
client = aqueduct.Client(api_key, server_address)
integration = client.integration(name="aqueduct_demo")

# One (table, update_mode) pair per successive deployment of the workflow.
SAVE_CONFIGS = [
    ("table_1", "append"),
    ("table_1", "replace"),
    ("table_1", "append"),
    ("table_2", "append"),
]

flow = None
for table_name, update_mode in SAVE_CONFIGS:
    table = integration.sql(query="SELECT * FROM wine;")
    table.save(integration.config(table=table_name, update_mode=update_mode))
    flow = client.publish_flow(
        name=name,
        artifacts=[table],
    )

# Required by the suite: the last printed token must be the flow id so the
# setup harness can collect it for teardown.
print(flow.id())
76 changes: 76 additions & 0 deletions integration_tests/backend/test_backend.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
import os
import subprocess
import sys
from pathlib import Path

import pytest
import requests

import aqueduct


class TestBackend:
    """Integration tests that exercise the backend's HTTP endpoints directly.

    ``setup_class`` runs every script under ``setup/`` to publish the
    workflows the tests read; ``teardown_class`` deletes those workflows.
    Each setup script must print its flow id as the final output token.
    """

    GET_WORKFLOW_TABLES_TEMPLATE = "/api/workflow/%s/tables"
    WORKFLOW_PATH = Path(__file__).parent / "setup"

    @classmethod
    def setup_class(cls):
        cls.client = aqueduct.Client(pytest.api_key, pytest.server_address)
        cls.flows = {}

        workflow_files = [
            f
            for f in os.listdir(cls.WORKFLOW_PATH)
            if os.path.isfile(os.path.join(cls.WORKFLOW_PATH, f))
        ]
        for workflow in workflow_files:
            # Use the interpreter running the tests rather than whatever
            # "python3" resolves to on PATH, so the setup scripts see the
            # same environment (installed aqueduct SDK, etc.).
            proc = subprocess.Popen(
                [
                    sys.executable,
                    os.path.join(cls.WORKFLOW_PATH, workflow),
                    pytest.api_key,
                    pytest.server_address,
                ],
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
            )
            out, err = proc.communicate()
            out = out.decode("utf-8")
            err = err.decode("utf-8")
            if err:
                # Any stderr output is treated as a setup failure.
                raise Exception(f"Could not run workflow {workflow}.\n\n{err}")
            # The flow id is required to be the last token the script prints.
            cls.flows[workflow] = out.strip().split()[-1]

    @classmethod
    def teardown_class(cls):
        # Delete every workflow published by setup_class.
        for flow in cls.flows:
            cls.client.delete_flow(cls.flows[flow])

    @classmethod
    def get_response_class(cls, endpoint, additional_headers=None):
        """GET `endpoint` with the api-key header and return the raw response.

        `additional_headers`, when given, is merged over the base headers.
        (Defaults to None instead of a shared mutable ``{}`` default.)
        """
        headers = {"api-key": pytest.api_key}
        if additional_headers:
            headers.update(additional_headers)
        url = cls.client._api_client.construct_full_url(endpoint)
        return requests.get(url, headers=headers)

    def test_endpoint_getworkflowtables(self):
        endpoint = self.GET_WORKFLOW_TABLES_TEMPLATE % self.flows["changing_saves.py"]
        data = self.get_response_class(endpoint).json()["table_details"]

        # changing_saves.py deploys four times but produces only three
        # distinct (table_name, update_mode) pairs.
        assert len(data) == 3

        data_set = {
            ("table_1", "append"),
            ("table_1", "replace"),
            ("table_2", "append"),
        }
        assert {(item["table_name"], item["update_mode"]) for item in data} == data_set

        # All saves went through the same integration / service.
        assert len({item["integration_id"] for item in data}) == 1
        assert len({item["service"] for item in data}) == 1
8 changes: 6 additions & 2 deletions scripts/install_local.py
Original file line number Diff line number Diff line change
Expand Up @@ -129,9 +129,13 @@ def execute_command(args, cwd=None):
# directory /home/ec2-user/SageMaker exists. This is hacky but we couldn't find a better
# solution at the moment.
if isdir(join(os.sep, "home", "ec2-user", "SageMaker")):
shutil.copytree(join(cwd, "src", "ui", "app" ,"dist", "sagemaker"), ui_directory, dirs_exist_ok=True)
shutil.copytree(
join(cwd, "src", "ui", "app", "dist", "sagemaker"), ui_directory, dirs_exist_ok=True
)
else:
shutil.copytree(join(cwd, "src", "ui", "app" ,"dist", "default"), ui_directory, dirs_exist_ok=True)
shutil.copytree(
join(cwd, "src", "ui", "app", "dist", "default"), ui_directory, dirs_exist_ok=True
)

# Install the local SDK.
if args.update_sdk:
Expand Down
53 changes: 29 additions & 24 deletions sdk/aqueduct/api_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,10 @@ def __init__(self, api_key: str, aqueduct_address: str):
self.api_key = api_key
self.aqueduct_address = aqueduct_address

# Clean URL
if self.aqueduct_address.endswith("/"):
self.aqueduct_address = self.aqueduct_address[:-1]

# If a dummy client is initialized, don't perform validation.
if self.api_key == "" and self.aqueduct_address == "":
Logger.logger.info(
Expand All @@ -98,9 +102,16 @@ def __init__(self, api_key: str, aqueduct_address: str):
else:
self.use_https = self._test_connection_protocol(try_http=True, try_https=True)

def _construct_full_url(self, route_suffix: str, use_https: bool) -> str:
def construct_base_url(self, use_https: Optional[bool] = None) -> str:
if use_https is None:
use_https = self.use_https
protocol_prefix = self.HTTPS_PREFIX if use_https else self.HTTP_PREFIX
return "%s%s%s" % (protocol_prefix, self.aqueduct_address, route_suffix)
return "%s%s" % (protocol_prefix, self.aqueduct_address)

def construct_full_url(self, route_suffix: str, use_https: Optional[bool] = None) -> str:
if use_https is None:
use_https = self.use_https
return "%s%s" % (self.construct_base_url(use_https), route_suffix)

def _test_connection_protocol(self, try_http: bool, try_https: bool) -> bool:
"""Returns whether the connection uses https. Raises an exception if unable to connect at all.
Expand All @@ -111,7 +122,7 @@ def _test_connection_protocol(self, try_http: bool, try_https: bool) -> bool:

if try_https:
try:
url = self._construct_full_url(self.LIST_INTEGRATIONS_ROUTE, use_https=True)
url = self.construct_full_url(self.LIST_INTEGRATIONS_ROUTE, use_https=True)
self._test_url(url)
return True
except Exception as e:
Expand All @@ -121,7 +132,7 @@ def _test_connection_protocol(self, try_http: bool, try_https: bool) -> bool:

if try_http:
try:
url = self._construct_full_url(self.LIST_INTEGRATIONS_ROUTE, use_https=False)
url = self.construct_full_url(self.LIST_INTEGRATIONS_ROUTE, use_https=False)
self._test_url(url)
return False
except Exception as e:
Expand All @@ -147,7 +158,7 @@ def url_prefix(self) -> str:
return self.HTTPS_PREFIX if self.use_https else self.HTTP_PREFIX

def list_integrations(self) -> Dict[str, IntegrationInfo]:
url = self._construct_full_url(self.LIST_INTEGRATIONS_ROUTE, self.use_https)
url = self.construct_full_url(self.LIST_INTEGRATIONS_ROUTE)
headers = utils.generate_auth_headers(self.api_key)
resp = requests.get(url, headers=headers)
utils.raise_errors(resp)
Expand All @@ -162,22 +173,22 @@ def list_integrations(self) -> Dict[str, IntegrationInfo]:
}

def list_github_repos(self) -> List[str]:
url = self._construct_full_url(self.LIST_GITHUB_REPO_ROUTE, self.use_https)
url = self.construct_full_url(self.LIST_GITHUB_REPO_ROUTE)
headers = utils.generate_auth_headers(self.api_key)

resp = requests.get(url, headers=headers)
return [x for x in resp.json()["repos"]]

def list_github_branches(self, repo_url: str) -> List[str]:
url = self._construct_full_url(self.LIST_GITHUB_BRANCH_ROUTE, self.use_https)
url = self.construct_full_url(self.LIST_GITHUB_BRANCH_ROUTE)
headers = utils.generate_auth_headers(self.api_key)
headers["github-repo"] = repo_url

resp = requests.get(url, headers=headers)
return [x for x in resp.json()["branches"]]

def list_tables(self, limit: int) -> List[Tuple[str, str]]:
url = self._construct_full_url(self.LIST_TABLES_ROUTE, self.use_https)
url = self.construct_full_url(self.LIST_TABLES_ROUTE)
headers = utils.generate_auth_headers(self.api_key)
headers["limit"] = str(limit)
resp = requests.get(url, headers=headers)
Expand Down Expand Up @@ -212,7 +223,7 @@ def preview(
if file:
files[str(op.id)] = io.BytesIO(file)

url = self._construct_full_url(self.PREVIEW_ROUTE, self.use_https)
url = self.construct_full_url(self.PREVIEW_ROUTE)
resp = requests.post(url, headers=headers, data=body, files=files)
utils.raise_errors(resp)

Expand Down Expand Up @@ -241,7 +252,7 @@ def register_workflow(
if file:
files[str(op.id)] = io.BytesIO(file)

url = self._construct_full_url(self.REGISTER_WORKFLOW_ROUTE, self.use_https)
url = self.construct_full_url(self.REGISTER_WORKFLOW_ROUTE)
resp = requests.post(url, headers=headers, data=body, files=files)
utils.raise_errors(resp)

Expand All @@ -253,9 +264,7 @@ def refresh_workflow(
serialized_params: Optional[str] = None,
) -> None:
headers = utils.generate_auth_headers(self.api_key)
url = self._construct_full_url(
self.REFRESH_WORKFLOW_ROUTE_TEMPLATE % flow_id, self.use_https
)
url = self.construct_full_url(self.REFRESH_WORKFLOW_ROUTE_TEMPLATE % flow_id)

body = {}
if serialized_params is not None:
Expand All @@ -266,23 +275,21 @@ def refresh_workflow(

def delete_workflow(self, flow_id: str) -> None:
headers = utils.generate_auth_headers(self.api_key)
url = self._construct_full_url(
self.DELETE_WORKFLOW_ROUTE_TEMPLATE % flow_id, self.use_https
)
url = self.construct_full_url(self.DELETE_WORKFLOW_ROUTE_TEMPLATE % flow_id)
response = requests.post(url, headers=headers)
utils.raise_errors(response)

def get_workflow(self, flow_id: str) -> GetWorkflowResponse:
headers = utils.generate_auth_headers(self.api_key)
url = self._construct_full_url(self.GET_WORKFLOW_ROUTE_TEMPLATE % flow_id, self.use_https)
url = self.construct_full_url(self.GET_WORKFLOW_ROUTE_TEMPLATE % flow_id)
resp = requests.get(url, headers=headers)
utils.raise_errors(resp)
workflow_response = GetWorkflowResponse(**resp.json())
return workflow_response

def list_workflows(self) -> List[ListWorkflowResponseEntry]:
headers = utils.generate_auth_headers(self.api_key)
url = self._construct_full_url(self.LIST_WORKFLOWS_ROUTE, self.use_https)
url = self.construct_full_url(self.LIST_WORKFLOWS_ROUTE)
response = requests.get(url, headers=headers)
utils.raise_errors(response)

Expand All @@ -291,8 +298,8 @@ def list_workflows(self) -> List[ListWorkflowResponseEntry]:
def get_artifact_result_data(self, dag_result_id: str, artifact_id: str) -> str:
"""Returns an empty string if the artifact failed to be computed."""
headers = utils.generate_auth_headers(self.api_key)
url = self._construct_full_url(
self.GET_ARTIFACT_RESULT_TEMPLATE % (dag_result_id, artifact_id), self.use_https
url = self.construct_full_url(
self.GET_ARTIFACT_RESULT_TEMPLATE % (dag_result_id, artifact_id)
)
resp = requests.get(url, headers=headers)
utils.raise_errors(resp)
Expand All @@ -318,7 +325,7 @@ def get_node_positions(
Two mappings of UUIDs to positions, structured as a dictionary with the keys "x" and "y".
The first mapping is for operators and the second is for artifacts.
"""
url = self._construct_full_url(self.NODE_POSITION_ROUTE, self.use_https)
url = self.construct_full_url(self.NODE_POSITION_ROUTE)
headers = {
**utils.generate_auth_headers(self.api_key),
}
Expand All @@ -332,8 +339,6 @@ def get_node_positions(

def export_serialized_function(self, operator: Operator) -> bytes:
headers = utils.generate_auth_headers(self.api_key)
operator_url = self._construct_full_url(
self.EXPORT_FUNCTION_ROUTE % str(operator.id), self.use_https
)
operator_url = self.construct_full_url(self.EXPORT_FUNCTION_ROUTE % str(operator.id))
operator_resp = requests.get(operator_url, headers=headers)
return operator_resp.content
Loading

0 comments on commit df986f3

Please sign in to comment.