Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ENG-1358 Set up backend integration tests & include in Github Actions #214

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion .github/workflows/integration-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -67,14 +67,21 @@ jobs:
- name: Fetch the API key
run: echo "API_KEY=$(aqueduct apikey)" >> $GITHUB_ENV

- name: Run the Integration Tests
- name: Run the SDK Integration Tests
timeout-minutes: 10
working-directory: integration_tests/sdk
env:
SERVER_ADDRESS: localhost:8080
INTEGRATION: aqueduct_demo
run: pytest . -rP --publish -n 1

- name: Run the Backend Integration Tests
timeout-minutes: 10
working-directory: integration_tests/backend
env:
SERVER_ADDRESS: localhost:8080
run: pytest . -rP -n 1

- uses: actions/upload-artifact@v3
if: always()
with:
Expand Down
29 changes: 29 additions & 0 deletions integration_tests/backend/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
# Backend Integration Tests

These tests are run against the Aqueduct backend to check the endpoints' reads and outputs are as expected.

The `setup_class` sets up all the workflows which are read by each test. When all the tests in the suite are done, the workflows set up in `setup_class` are deleted in the `teardown_class`.

## Creating Tests
The workflows ran in setup tests are all the Python files in the `setup/` folder. Each Python file is called in the format `{python_file} {api_key} {server_address}`. At the top, you can parse those arguments like so:
```
import sys
api_key, server_address = sys.argv[1], sys.argv[2]
```
After that, you can write a workflow as you would do as a typical user.
At the very end, the tests **require** you to print the flow id (e.g. `print(flow.id())`). This is parsed by the suite setup function and saved to a list of flow ids. At the end of testing, the teardown function will iterate through the flow ids and delete the associated workflows.

## Usage

Running all the tests in this repo:
`API_KEY=<your api key> SERVER_ADDRESS=<your server's address> pytest . -rP`

Running all the tests in a single file:
- `<your env variables> pytest <path to test file> -rP`

Running a specific test:
- `<your env variables> pytest . -rP -k '<specific test name>'`

Running tests in parallel, with concurrency 5:
- Install pytest-xdist
- `<your env variables> pytest . -rP -n 5`
18 changes: 18 additions & 0 deletions integration_tests/backend/conftest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
import os

import pytest

import aqueduct

API_KEY_ENV_NAME = "API_KEY"
SERVER_ADDR_ENV_NAME = "SERVER_ADDRESS"


def pytest_configure(config):
pytest.api_key = os.getenv(API_KEY_ENV_NAME)
pytest.server_address = os.getenv(SERVER_ADDR_ENV_NAME)

if pytest.api_key is None or pytest.server_address is None:
raise Exception(
"Test Setup Error: api_key and server_address must be set as environmental variables."
)
64 changes: 64 additions & 0 deletions integration_tests/backend/setup/changing_saves.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
import sys

api_key, server_address = sys.argv[1], sys.argv[2]

###
# Workflow that loads a table from the `aqueduct_demo` then saves it to `table_1` in append mode.
# This save operator is then replaced by one that saves to `table_1` in replace mode.
# In the next deployment of this run, it saves to `table_1` in append mode.
# In the last deployment, it saves to `table_2` in append mode.
###

import aqueduct

name = "Test: Changing Saves"
client = aqueduct.Client(api_key, server_address)
integration = client.integration(name="aqueduct_demo")

###

table = integration.sql(query="SELECT * FROM wine;")

table.save(integration.config(table="table_1", update_mode="append"))

flow = client.publish_flow(
name=name,
artifacts=[table],
)

###

table = integration.sql(query="SELECT * FROM wine;")

table.save(integration.config(table="table_1", update_mode="replace"))

flow = client.publish_flow(
name=name,
artifacts=[table],
)

###

table = integration.sql(query="SELECT * FROM wine;")

table.save(integration.config(table="table_1", update_mode="append"))

flow = client.publish_flow(
name=name,
artifacts=[table],
)

###

table = integration.sql(query="SELECT * FROM wine;")

table.save(integration.config(table="table_2", update_mode="append"))

flow = client.publish_flow(
name=name,
artifacts=[table],
)

###

print(flow.id())
76 changes: 76 additions & 0 deletions integration_tests/backend/test_backend.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
import os
import subprocess
import sys
from pathlib import Path

import pytest
import requests

import aqueduct


class TestBackend:
GET_WORKFLOW_TABLES_TEMPLATE = "/api/workflow/%s/tables"
WORKFLOW_PATH = Path(__file__).parent / "setup"

@classmethod
def setup_class(cls):
cls.client = aqueduct.Client(pytest.api_key, pytest.server_address)
cls.flows = {}

workflow_files = [
f
for f in os.listdir(cls.WORKFLOW_PATH)
if os.path.isfile(os.path.join(cls.WORKFLOW_PATH, f))
]
for workflow in workflow_files:
proc = subprocess.Popen(
[
"python3",
os.path.join(cls.WORKFLOW_PATH, workflow),
pytest.api_key,
pytest.server_address,
],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
)
out, err = proc.communicate()
out = out.decode("utf-8")
err = err.decode("utf-8")
if err:
raise Exception(f"Could not run workflow {workflow}.\n\n{err}")
else:
cls.flows[workflow] = out.strip().split()[-1]

@classmethod
def teardown_class(cls):
for flow in cls.flows:
cls.client.delete_flow(cls.flows[flow])

@classmethod
def get_response_class(cls, endpoint, additional_headers={}):
headers = {"api-key": pytest.api_key}
headers.update(additional_headers)
url = cls.client._api_client.construct_full_url(endpoint)
r = requests.get(url, headers=headers)
return r

def test_endpoint_getworkflowtables(self):
endpoint = self.GET_WORKFLOW_TABLES_TEMPLATE % self.flows["changing_saves.py"]
data = self.get_response_class(endpoint).json()["table_details"]

assert len(data) == 3

# table_name, update_mode
data_set = set(
[
("table_1", "append"),
("table_1", "replace"),
("table_2", "append"),
]
)
assert set([(item["table_name"], item["update_mode"]) for item in data]) == data_set

# Check all in same integration
assert len(set([item["integration_id"] for item in data])) == 1
assert len(set([item["service"] for item in data])) == 1
53 changes: 29 additions & 24 deletions sdk/aqueduct/api_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,10 @@ def __init__(self, api_key: str, aqueduct_address: str):
self.api_key = api_key
self.aqueduct_address = aqueduct_address

# Clean URL
if self.aqueduct_address.endswith("/"):
self.aqueduct_address = self.aqueduct_address[:-1]

# If a dummy client is initialized, don't perform validation.
if self.api_key == "" and self.aqueduct_address == "":
Logger.logger.info(
Expand All @@ -98,9 +102,16 @@ def __init__(self, api_key: str, aqueduct_address: str):
else:
self.use_https = self._test_connection_protocol(try_http=True, try_https=True)

def _construct_full_url(self, route_suffix: str, use_https: bool) -> str:
def construct_base_url(self, use_https: Optional[bool] = None) -> str:
if use_https is None:
use_https = self.use_https
protocol_prefix = self.HTTPS_PREFIX if use_https else self.HTTP_PREFIX
return "%s%s%s" % (protocol_prefix, self.aqueduct_address, route_suffix)
return "%s%s" % (protocol_prefix, self.aqueduct_address)

def construct_full_url(self, route_suffix: str, use_https: Optional[bool] = None) -> str:
if use_https is None:
use_https = self.use_https
return "%s%s" % (self.construct_base_url(use_https), route_suffix)

def _test_connection_protocol(self, try_http: bool, try_https: bool) -> bool:
"""Returns whether the connection uses https. Raises an exception if unable to connect at all.
Expand All @@ -111,7 +122,7 @@ def _test_connection_protocol(self, try_http: bool, try_https: bool) -> bool:

if try_https:
try:
url = self._construct_full_url(self.LIST_INTEGRATIONS_ROUTE, use_https=True)
url = self.construct_full_url(self.LIST_INTEGRATIONS_ROUTE, use_https=True)
self._test_url(url)
return True
except Exception as e:
Expand All @@ -121,7 +132,7 @@ def _test_connection_protocol(self, try_http: bool, try_https: bool) -> bool:

if try_http:
try:
url = self._construct_full_url(self.LIST_INTEGRATIONS_ROUTE, use_https=False)
url = self.construct_full_url(self.LIST_INTEGRATIONS_ROUTE, use_https=False)
self._test_url(url)
return False
except Exception as e:
Expand All @@ -147,7 +158,7 @@ def url_prefix(self) -> str:
return self.HTTPS_PREFIX if self.use_https else self.HTTP_PREFIX

def list_integrations(self) -> Dict[str, IntegrationInfo]:
url = self._construct_full_url(self.LIST_INTEGRATIONS_ROUTE, self.use_https)
url = self.construct_full_url(self.LIST_INTEGRATIONS_ROUTE)
headers = utils.generate_auth_headers(self.api_key)
resp = requests.get(url, headers=headers)
utils.raise_errors(resp)
Expand All @@ -162,22 +173,22 @@ def list_integrations(self) -> Dict[str, IntegrationInfo]:
}

def list_github_repos(self) -> List[str]:
url = self._construct_full_url(self.LIST_GITHUB_REPO_ROUTE, self.use_https)
url = self.construct_full_url(self.LIST_GITHUB_REPO_ROUTE)
headers = utils.generate_auth_headers(self.api_key)

resp = requests.get(url, headers=headers)
return [x for x in resp.json()["repos"]]

def list_github_branches(self, repo_url: str) -> List[str]:
url = self._construct_full_url(self.LIST_GITHUB_BRANCH_ROUTE, self.use_https)
url = self.construct_full_url(self.LIST_GITHUB_BRANCH_ROUTE)
headers = utils.generate_auth_headers(self.api_key)
headers["github-repo"] = repo_url

resp = requests.get(url, headers=headers)
return [x for x in resp.json()["branches"]]

def list_tables(self, limit: int) -> List[Tuple[str, str]]:
url = self._construct_full_url(self.LIST_TABLES_ROUTE, self.use_https)
url = self.construct_full_url(self.LIST_TABLES_ROUTE)
headers = utils.generate_auth_headers(self.api_key)
headers["limit"] = str(limit)
resp = requests.get(url, headers=headers)
Expand Down Expand Up @@ -212,7 +223,7 @@ def preview(
if file:
files[str(op.id)] = io.BytesIO(file)

url = self._construct_full_url(self.PREVIEW_ROUTE, self.use_https)
url = self.construct_full_url(self.PREVIEW_ROUTE)
resp = requests.post(url, headers=headers, data=body, files=files)
utils.raise_errors(resp)

Expand Down Expand Up @@ -241,7 +252,7 @@ def register_workflow(
if file:
files[str(op.id)] = io.BytesIO(file)

url = self._construct_full_url(self.REGISTER_WORKFLOW_ROUTE, self.use_https)
url = self.construct_full_url(self.REGISTER_WORKFLOW_ROUTE)
resp = requests.post(url, headers=headers, data=body, files=files)
utils.raise_errors(resp)

Expand All @@ -253,9 +264,7 @@ def refresh_workflow(
serialized_params: Optional[str] = None,
) -> None:
headers = utils.generate_auth_headers(self.api_key)
url = self._construct_full_url(
self.REFRESH_WORKFLOW_ROUTE_TEMPLATE % flow_id, self.use_https
)
url = self.construct_full_url(self.REFRESH_WORKFLOW_ROUTE_TEMPLATE % flow_id)

body = {}
if serialized_params is not None:
Expand All @@ -266,23 +275,21 @@ def refresh_workflow(

def delete_workflow(self, flow_id: str) -> None:
headers = utils.generate_auth_headers(self.api_key)
url = self._construct_full_url(
self.DELETE_WORKFLOW_ROUTE_TEMPLATE % flow_id, self.use_https
)
url = self.construct_full_url(self.DELETE_WORKFLOW_ROUTE_TEMPLATE % flow_id)
response = requests.post(url, headers=headers)
utils.raise_errors(response)

def get_workflow(self, flow_id: str) -> GetWorkflowResponse:
headers = utils.generate_auth_headers(self.api_key)
url = self._construct_full_url(self.GET_WORKFLOW_ROUTE_TEMPLATE % flow_id, self.use_https)
url = self.construct_full_url(self.GET_WORKFLOW_ROUTE_TEMPLATE % flow_id)
resp = requests.get(url, headers=headers)
utils.raise_errors(resp)
workflow_response = GetWorkflowResponse(**resp.json())
return workflow_response

def list_workflows(self) -> List[ListWorkflowResponseEntry]:
headers = utils.generate_auth_headers(self.api_key)
url = self._construct_full_url(self.LIST_WORKFLOWS_ROUTE, self.use_https)
url = self.construct_full_url(self.LIST_WORKFLOWS_ROUTE)
response = requests.get(url, headers=headers)
utils.raise_errors(response)

Expand All @@ -291,8 +298,8 @@ def list_workflows(self) -> List[ListWorkflowResponseEntry]:
def get_artifact_result_data(self, dag_result_id: str, artifact_id: str) -> str:
"""Returns an empty string if the artifact failed to be computed."""
headers = utils.generate_auth_headers(self.api_key)
url = self._construct_full_url(
self.GET_ARTIFACT_RESULT_TEMPLATE % (dag_result_id, artifact_id), self.use_https
url = self.construct_full_url(
self.GET_ARTIFACT_RESULT_TEMPLATE % (dag_result_id, artifact_id)
)
resp = requests.get(url, headers=headers)
utils.raise_errors(resp)
Expand All @@ -318,7 +325,7 @@ def get_node_positions(
Two mappings of UUIDs to positions, structured as a dictionary with the keys "x" and "y".
The first mapping is for operators and the second is for artifacts.
"""
url = self._construct_full_url(self.NODE_POSITION_ROUTE, self.use_https)
url = self.construct_full_url(self.NODE_POSITION_ROUTE)
headers = {
**utils.generate_auth_headers(self.api_key),
}
Expand All @@ -332,8 +339,6 @@ def get_node_positions(

def export_serialized_function(self, operator: Operator) -> bytes:
headers = utils.generate_auth_headers(self.api_key)
operator_url = self._construct_full_url(
self.EXPORT_FUNCTION_ROUTE % str(operator.id), self.use_https
)
operator_url = self.construct_full_url(self.EXPORT_FUNCTION_ROUTE % str(operator.id))
operator_resp = requests.get(operator_url, headers=headers)
return operator_resp.content
3 changes: 1 addition & 2 deletions sdk/aqueduct/aqueduct_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -337,8 +337,7 @@ def publish_flow(
flow_id = self._api_client.register_workflow(dag).id

url = generate_ui_url(
self._api_client.url_prefix(),
self._api_client.aqueduct_address,
self._api_client.construct_base_url(),
str(flow_id),
)
print("Url: ", url)
Expand Down
Loading