Skip to content

ENG-2640: MySQL periodic integration test #1179

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 13 commits into from
Apr 14, 2023
10 changes: 5 additions & 5 deletions .github/workflows/mypy.yml
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ jobs:
python3 -m pip install mypy==1.0.0 pydantic pymongo

- name: Install Type Stub Libraries
run : |
python3 -m pip install types-croniter types-requests types-PyYAML types-setuptools
run: |
python3 -m pip install types-croniter types-requests types-PyYAML types-setuptools types-PyMySQL

- name: mypy sdk
working-directory: sdk
Expand All @@ -55,9 +55,9 @@ jobs:
python3 -m pip install mypy pydantic pymongo

- name: Install Type Stub Libraries
run : |
python3 -m pip install types-croniter types-requests types-PyYAML types-setuptools
run: |
python3 -m pip install types-croniter types-requests types-PyYAML types-setuptools types-PyMySQL

- name: mypy executor
working-directory: src/python
run: |
Expand Down
19 changes: 18 additions & 1 deletion .github/workflows/periodic-integration-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,23 @@ jobs:
timeout-minutes: 60
name: SDK Integration Tests against Data Connectors
services:
mysql:
image: mysql:8.0
env:
MYSQL_USER: aqueduct
MYSQL_PASSWORD: Password123!
MYSQL_DATABASE: aqueducttest
MYSQL_ROOT_PASSWORD: Password123!
ports:
# Maps tcp port 3306 on service container to the host
- 3306:3306
options: >-
--name=mysql
--health-cmd="mysqladmin ping"
--health-interval=10s
--health-timeout=5s
--health-retries=3

postgres:
image: postgres:15
env:
Expand All @@ -127,7 +144,7 @@ jobs:
with:
aws_access_key_id: ${{ secrets.KENNY_AWS_ACCESS_KEY_ID }}
aws_secret_access_key: ${{ secrets.KENNY_AWS_SECRET_ACCESS_KEY }}
s3_test_config_path: periodic-data-integration-test-config.yml
s3_test_config_path: periodic-data-integration-test-config-agiron123.yml
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Change back to normal test-config when done with this PR.


- name: Install any data connector packages
run: |
Expand Down
1 change: 1 addition & 0 deletions integration_tests/sdk/aqueduct_tests/save.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ def save(
update_mode = LoadUpdateMode.REPLACE

if isinstance(integration, RelationalDBIntegration):
print('Saving artifact to table ' + str(name))
integration.save(artifact, name, update_mode)

elif isinstance(integration, S3Integration):
Expand Down
20 changes: 19 additions & 1 deletion integration_tests/sdk/setup_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ def _parse_credentials_file() -> Dict[str, Any]:

def _fetch_demo_data(demo: RelationalDBIntegration, table_name: str) -> pd.DataFrame:
df = demo.table(table_name)
print('retrieved demo table data')

# Certain tables in our demo db read out some unexpected tokens that
# we need to remove before saving into other databases. The unsanitized version
Expand Down Expand Up @@ -101,8 +102,10 @@ def _add_missing_artifacts(
"""
# Force name comparisons to be case-insensitive.
existing_names = [elem.lower() for elem in existing_names]
print('existing_names: ' + str(existing_names))

needed_names = set(demo_db_tables())
print('needed_names: ' + str(needed_names))
already_set_up_names = needed_names.intersection(existing_names)
missing_names = needed_names.difference(already_set_up_names)
if len(missing_names) == 0:
Expand All @@ -111,6 +114,7 @@ def _add_missing_artifacts(
demo = client.integration("aqueduct_demo")
artifacts: List[BaseArtifact] = []
for table_name in missing_names:
print('Missing table name: ' + str(table_name))
data = _fetch_demo_data(demo, table_name)
data_param = client.create_param(generate_object_name(), default=data)

Expand Down Expand Up @@ -164,9 +168,20 @@ def _setup_postgres_db():
_execute_command(["aqueduct", "install", "postgres"])


def _setup_mysql_db():
    """Provision the MySQL client dependencies through the aqueduct CLI installer."""
    print("Setting up MySQL database ...")
    cmd = ["aqueduct", "install", "mysql"]
    _execute_command(cmd)


def _setup_relational_data(client: Client, db: RelationalDBIntegration) -> None:
    """Backfill `db` with any demo tables it does not yet contain.

    Args:
        client: The Aqueduct client used to create parameter artifacts.
        db: The relational integration to inspect and populate.
    """
    # Find all the tables that already exist. Every backend's list_tables()
    # query aliases its table-name column to "tablename" (e.g. the MySQL
    # query selects `table_name AS tablename`), so a single key works for
    # Postgres, MySQL, and the other relational integrations alike.
    #
    # NOTE: removed leftover debug prints here — one of them concatenated a
    # str with the DataFrame returned by list_tables(), which raises
    # TypeError at runtime — along with a duplicate, conflicting assignment
    # of `existing_table_names` that keyed on "TABLE_NAME".
    existing_table_names = set(db.list_tables()["tablename"])
    _add_missing_artifacts(client, db, existing_table_names)


Expand Down Expand Up @@ -219,6 +234,9 @@ def setup_data_integrations(client: Client, filter_to: Optional[str] = None) ->
if integration_config["type"] == ServiceType.POSTGRES:
_setup_postgres_db()

if integration_config["type"] == ServiceType.MYSQL:
_setup_mysql_db()

client.connect_integration(
integration_name,
integration_config["type"],
Expand Down
6 changes: 3 additions & 3 deletions regression_tests/tests/gitbook/test_docs.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
import argparse
import os
import re
import time
import argparse
import subprocess
import time
from glob import glob
from pathlib import Path
from typing import NamedTuple, List
from typing import List, NamedTuple

start = time.time()

Expand Down
6 changes: 5 additions & 1 deletion sdk/aqueduct/integrations/sql_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@

LIST_TABLES_QUERY_PG = "SELECT tablename, tableowner FROM pg_catalog.pg_tables WHERE schemaname != 'pg_catalog' AND schemaname != 'information_schema';"
LIST_TABLES_QUERY_SNOWFLAKE = "SELECT table_name AS \"tablename\", table_owner AS \"tableowner\" FROM information_schema.tables WHERE table_schema != 'INFORMATION_SCHEMA' AND table_type = 'BASE TABLE';"
LIST_TABLES_QUERY_MYSQL = "SELECT table_name FROM INFORMATION_SCHEMA.TABLES WHERE table_type = 'BASE TABLE' AND table_schema NOT IN ('mysql', 'sys', 'performance_schema');"
LIST_TABLES_QUERY_MYSQL = "SELECT table_name AS tablename FROM INFORMATION_SCHEMA.TABLES WHERE table_type = 'BASE TABLE' AND table_schema NOT IN ('mysql', 'sys', 'performance_schema');"
LIST_TABLES_QUERY_MARIADB = "SELECT table_name AS \"tablename\" FROM INFORMATION_SCHEMA.TABLES WHERE table_type = 'BASE TABLE' AND table_schema NOT IN ('mysql', 'sys', 'performance_schema');"

LIST_TABLES_QUERY_SQLSERVER = (
Expand All @@ -53,6 +53,10 @@ def list_tables(self) -> pd.DataFrame:
Returns:
pd.DataFrame of available tables.
"""

print('hello from list tables finally')

#This is the actual list_tables implementation, not the save one.
if self.type() in [ServiceType.BIGQUERY, ServiceType.SNOWFLAKE]:
# Use the list integration objects endpoint instead of
# providing a hardcoded SQL query to execute
Expand Down
22 changes: 9 additions & 13 deletions src/python/aqueduct_executor/operators/connectors/data/execute.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import platform
import sys
from typing import Any

Expand Down Expand Up @@ -267,28 +268,23 @@ def setup_connector(
from aqueduct_executor.operators.connectors.data.sql_server import ( # type: ignore
SqlServerConnector as OpConnector,
)
elif connector_name == common.Name.MYSQL:
elif connector_name == common.Name.MYSQL or connector_name == common.Name.MARIA_DB:
try:
# Use pythonic mysql library to fix crossplatform compatibility issues.
# MySQLdb is a C-based library
import pymysql

# Implementation can be found here: https://github.com/PyMySQL/PyMySQL/blob/main/pymysql/__init__.py
pymysql.install_as_MySQLdb()
import MySQLdb
except:
raise MissingConnectorDependencyException(
"Unable to initialize the MySQL connector. Have you run `aqueduct install mysql`?"
"Unable to initialize the MySQL\/MariaDB connector. Have you run `aqueduct install mysql`?"
)

from aqueduct_executor.operators.connectors.data.mysql import ( # type: ignore
MySqlConnector as OpConnector,
)
elif connector_name == common.Name.MARIA_DB:
try:
import MySQLdb
except:
raise MissingConnectorDependencyException(
"Unable to initialize the MariaDB connector. Have you run `aqueduct install mariadb`?"
)

from aqueduct_executor.operators.connectors.data.maria_db import ( # type: ignore
MariaDbConnector as OpConnector,
)
elif connector_name == common.Name.AZURE_SQL:
try:
import pyodbc
Expand Down
58 changes: 36 additions & 22 deletions src/python/bin/aqueduct
Original file line number Diff line number Diff line change
Expand Up @@ -187,13 +187,9 @@ def download_server_binaries(architecture):
with open(os.path.join(server_directory, "bin/server"), "wb") as f:
_download_file(os.path.join(s3_server_prefix, f"bin/{architecture}/server"), f, "Server")
with open(os.path.join(server_directory, "bin/executor"), "wb") as f:
_download_file(
os.path.join(s3_server_prefix, f"bin/{architecture}/executor"), f, "Executor"
)
_download_file(os.path.join(s3_server_prefix, f"bin/{architecture}/executor"), f, "Executor")
with open(os.path.join(server_directory, "bin/migrator"), "wb") as f:
_download_file(
os.path.join(s3_server_prefix, f"bin/{architecture}/migrator"), f, "Migrator"
)
_download_file(os.path.join(s3_server_prefix, f"bin/{architecture}/migrator"), f, "Migrator")

print("Downloading integration set up scripts...")
with open(os.path.join(server_directory, "bin/start-function-executor.sh"), "wb") as f:
Expand Down Expand Up @@ -388,25 +384,26 @@ def get_address(expose):
)
if ec2_ip.status_code != 404: # User is in EC2 instance.
expose_ip = ec2_ip.content.decode("utf-8")
else:
else:
# Assume is Google Cloud
metadata_flavor = {'Metadata-Flavor': 'Google'}
gcp_ip = requests.get('http://169.254.169.254/computeMetadata/v1/instance/network-interfaces/0/access-configs/0/external-ip', headers = metadata_flavor, timeout=0.25)
gcp_ip = requests.get('http://169.254.169.254/computeMetadata/v1/instance/network-interfaces/0/access-configs/0/external-ip', headers=metadata_flavor, timeout=0.25)
if gcp_ip.status_code != 404:
expose_ip = gcp_ip.text
else:
# Assume is Azure
azure_ip = requests.get('http://169.254.169.254/metadata/instance/network/interface/0/ipv4/ipAddress/0/publicIpAddress?api-version=2017-08-01&format=text', headers = {'Metadata': 'true'}, timeout=0.25)
azure_ip = requests.get('http://169.254.169.254/metadata/instance/network/interface/0/ipv4/ipAddress/0/publicIpAddress?api-version=2017-08-01&format=text', headers={'Metadata': 'true'}, timeout=0.25)
if azure_ip.status_code != 404:
expose_ip = azure_ip.text
else:
# Default
expose_ip = "<IP_ADDRESS>"
except: # If you're not running on EC2, this will return an error.
expose_ip = "<IP_ADDRESS>"

return expose_ip


def generate_welcome_message(addr, port):
apikey = get_apikey()

Expand Down Expand Up @@ -457,7 +454,7 @@ def start(addr, expose, port, verbose, env, disable_usage_stats):

if expose:
command.append("--expose")

if verbose:
command.append("--verbose")

Expand All @@ -467,24 +464,31 @@ def start(addr, expose, port, verbose, env, disable_usage_stats):
popen_handle = execute_command_nonblocking(command)
return popen_handle, server_port


def install_mongodb():
    """Install the pinned pymongo client into the current Python environment."""
    package = "pymongo%s" % PYMONGO_VERSION_BOUND
    execute_command([sys.executable, "-m", "pip", "install", package])


def install_postgres():
    """Install the pinned psycopg2-binary driver for Postgres connectivity."""
    package = "psycopg2-binary%s" % PSYCOPG2_VERSION_BOUND
    execute_command([sys.executable, "-m", "pip", "install", package])


def install_bigquery():
    """Install the pinned google-cloud-bigquery client library."""
    package = "google-cloud-bigquery%s" % BIGQUERY_VERSION_BOUND
    execute_command([sys.executable, "-m", "pip", "install", package])


def install_snowflake():
    """Install the pinned snowflake-sqlalchemy dialect package."""
    package = "snowflake-sqlalchemy%s" % SNOWFLAKE_VERSION_BOUND
    execute_command([sys.executable, "-m", "pip", "install", package])


def install_s3():
    """Install the pinned pyarrow package used for S3 data serialization."""
    package = "pyarrow%s" % PYARROW_VERSION_BOUND
    execute_command([sys.executable, "-m", "pip", "install", package])


def install_athena():
    """Install the pinned awswrangler package for Athena connectivity."""
    package = "awswrangler%s" % AWS_WRANGLER_VERSION_BOUND
    execute_command([sys.executable, "-m", "pip", "install", package])


def install_mysql():
system = platform.system()
if system == "Linux":
Expand All @@ -506,22 +510,26 @@ def install_mysql():
print("Unsupported distribution:", distro.id())
elif system == "Darwin":
cmd = ["brew", "install", "mysql"]
architecture = subprocess.Popen(["which", "-a", "brew"], stdout=subprocess.PIPE ).communicate()[0]
if architecture.startswith(b"/opt/homebrew"): # Using arm verison of brew
architecture = subprocess.Popen(
["which", "-a", "brew"], stdout=subprocess.PIPE).communicate()[0]
# Using arm verison of brew
if architecture.startswith(b"/opt/homebrew"):
cmd = ["arch", "-arm64", *cmd]
execute_command(cmd)
else:
print("Unsupported operating system:", system)

execute_command([sys.executable, "-m", "pip", "install", "mysqlclient%s" % MYSQL_CLIENT_VERSION_BOUND])

execute_command(["pip", "install", "PyMySQL"])
execute_command([sys.executable, "-m", "pip", "install",
"mysqlclient%s" % MYSQL_CLIENT_VERSION_BOUND])

def install_sqlserver():
system = platform.system()
if system == "Linux":
if distro.id() == "ubuntu":
execute_command(
["bash", os.path.join(server_directory, "bin", "install_sqlserver_ubuntu.sh")]
["bash", os.path.join(
server_directory, "bin", "install_sqlserver_ubuntu.sh")]
)
else:
print("Unsupported distribution:", distro.id())
Expand All @@ -548,7 +556,8 @@ def install_sqlserver():
else:
print("Unsupported operating system:", system)

execute_command([sys.executable, "-m", "pip", "install", "pyodbc%s" % PYODBC_VERSION_BOUND])
execute_command([sys.executable, "-m", "pip", "install",
"pyodbc%s" % PYODBC_VERSION_BOUND])


def install(system):
Expand Down Expand Up @@ -695,9 +704,12 @@ if __name__ == "__main__":
help="Supported integrations: postgres, redshift, mysql, mariadb, sqlserver, azuresql, s3, athena, snowflake, bigquery.",
)

apikey_args = subparsers.add_parser("apikey", help="Display your Aqueduct API key.")
clear_args = subparsers.add_parser("clear", help="Erase your Aqueduct installation.")
version_args = subparsers.add_parser("version", help="Retrieve the package version number.")
apikey_args = subparsers.add_parser(
"apikey", help="Display your Aqueduct API key.")
clear_args = subparsers.add_parser(
"clear", help="Erase your Aqueduct installation.")
version_args = subparsers.add_parser(
"version", help="Retrieve the package version number.")

storage_args = subparsers.add_parser(
"storage",
Expand Down Expand Up @@ -761,7 +773,8 @@ if __name__ == "__main__":
if terminated:
print("Server terminated due to an error.")
else:
welcome_message, url = generate_welcome_message(addr, server_port)
welcome_message, url = generate_welcome_message(
addr, server_port)
print(welcome_message)

if not args.expose:
Expand All @@ -777,7 +790,8 @@ if __name__ == "__main__":
"aqueduct ui and aqueduct server have been deprecated; please use aqueduct start to run both the UI and backend servers"
)
elif args.command == "install":
install(args.system[0]) # argparse makes this an array so only pass in value [0].
# argparse makes this an array so only pass in value [0].
install(args.system[0])
elif args.command == "ui":
print(
"aqueduct ui and aqueduct server have been deprecated; please use aqueduct start to run both the UI and backend servers"
Expand Down