Skip to content

Commit

Permalink
Eng 1086 create syntax sugar for sql queries (#103)
Browse files Browse the repository at this point in the history
* first commit

* integration test

* test

* style

* update on test and enable regex filtering out tag

* stlye

* change variable name to tag

* make regex constant and add doc

* style fix

* style fix

Co-authored-by: Ubuntu <[email protected]>
  • Loading branch information
Fanjia-Yan and Ubuntu authored Jun 16, 2022
1 parent 45572fe commit 7f75de0
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 1 deletion.
10 changes: 10 additions & 0 deletions integration_tests/sdk/sql_integration_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,3 +37,13 @@ def test_invalid_destination_integration(client):
output_artifact.save(
config=db.config(table=generate_table_name(), update_mode=LoadUpdateMode.REPLACE)
)


def test_sql_today_tag(client):
db = client.integration(name=get_integration_name())
sql_artifact_today = db.sql(query="select * from hotel_reviews where review_date = {{today}}")
assert sql_artifact_today.get().empty
sql_artifact_not_today = db.sql(
query="select * from hotel_reviews where review_date < {{today}}"
)
assert len(sql_artifact_not_today.get()) == 100
Original file line number Diff line number Diff line change
@@ -1,11 +1,18 @@
from typing import List

import pandas as pd
import re
from datetime import date
from sqlalchemy import engine, inspect
from sqlalchemy.exc import SQLAlchemyError

from aqueduct_executor.operators.connectors.tabular import connector, extract, load

# Regular Expression that matches any substring apperance with
# "{{ }}" and a word inside with optional space in front or after
# Potential Matches: "{{today}}", "{{ today }}""
TAG_PATTERN = r"{{[\s+]*\w+[\s+]*}}"


class RelationalConnector(connector.TabularConnector):
def __init__(self, conn_engine: engine.Engine):
Expand All @@ -24,7 +31,15 @@ def discover(self) -> List[str]:
return inspect(self.engine).get_table_names()

def extract(self, params: extract.RelationalParams) -> pd.DataFrame:
df = pd.read_sql(params.query, con=self.engine)
query = params.query
matches = re.findall(TAG_PATTERN, query)
for match in matches:
tag = match.strip(" " "{}")
if tag == "today":
today_python = date.today()
today_sql = "'" + today_python.strftime("%Y-%m-%d") + "'"
query = query.replace("{{today}}", today_sql)
df = pd.read_sql(query, con=self.engine)
return df

def load(self, params: load.RelationalParams, df: pd.DataFrame) -> None:
Expand Down

0 comments on commit 7f75de0

Please sign in to comment.