Skip to content

Commit

Permalink
Pivoting into sqla instead of sql
Browse files Browse the repository at this point in the history
  • Loading branch information
mistercrunch committed Aug 6, 2015
1 parent f39b241 commit b70b270
Show file tree
Hide file tree
Showing 3 changed files with 67 additions and 46 deletions.
71 changes: 62 additions & 9 deletions app/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,10 @@ def __repr__(self):
def get_sqla_engine(self):
    """Build and return a SQLAlchemy engine for this record's connection URI."""
    return create_engine(self.sqlalchemy_uri)

def get_table(self, table_name):
    """Reflect and return the SQLAlchemy ``Table`` named *table_name*.

    Column metadata is autoloaded from the live database connection
    provided by ``get_sqla_engine()``, so the returned object mirrors
    the table's current schema.

    :param table_name: name of the table to reflect
    :returns: a reflected ``sqlalchemy.Table``
    """
    # NOTE(review): the scraped diff interleaved the old zero-arg
    # signature with the new one; this is the coherent post-commit form.
    meta = MetaData()
    return sqlaTable(
        table_name, meta,
        autoload=True,
        autoload_with=self.get_sqla_engine())

Expand Down Expand Up @@ -109,7 +109,26 @@ def query(
"ds >= '{from_dttm_iso}'",
"ds < '{to_dttm_iso}'"
]
for col, op, eq in filter:
if op in ('in', 'not in'):
l = ["'{}'".format(s) for s in eq.split(",")]
l = ", ".join(l)
op = op.upper()
where_clause.append(
"{col} {op} ({l})".format(**locals())
)
where_clause = " AND\n".join(where_clause).format(**locals())
if timeseries_limit:
limiting_join = """
JOIN (
SELECT {groupby_exprs}
FROM {self.table_name}
GROUP BY {groupby_exprs}
ORDER BY {metric} DESC
LIMIT {timeseries_limit}
) z ON
"""

sql = """
SELECT
{select_exprs}
Expand Down Expand Up @@ -326,17 +345,51 @@ def query(
granularity=granularity,
intervals= from_dttm.isoformat() + '/' + to_dttm.isoformat(),
)
if filter:
qry['filter'] = filter
if limit_spec:
qry['limit_spec'] = limit_spec
filters = None
for col, op, eq in filter:
cond = None
if op == '==':
cond = Dimension(col)==eq
elif op == '!=':
cond = ~(Dimension(col)==eq)
elif op in ('in', 'not in'):
fields = []
splitted = eq.split(',')
if len(splitted) > 1:
for s in eq.split(','):
s = s.strip()
fields.append(Filter.build_filter(Dimension(col)==s))
cond = Filter(type="or", fields=fields)
else:
cond = Dimension(col)==eq
if op == 'not in':
cond = ~cond
if filters:
filters = Filter(type="and", fields=[
Filter.build_filter(cond),
Filter.build_filter(filters)
])
else:
filters = cond

if filters:
qry['filter'] = filters

client = self.cluster.get_pydruid_client()
orig_filters = filters
if timeseries_limit:
# Limit on the number of timeseries, doing a two-phase query
pre_qry = deepcopy(qry)
pre_qry['granularity'] = "all"
client.groupby(**qry)
pre_qry['limit_spec'] = {
"type": "default",
"limit": timeseries_limit,
"columns": [{
"dimension": metrics[0] if metrics else self.metrics[0],
"direction": "descending",
}],
}
client.groupby(**pre_qry)
df = client.export_pandas()
if not df is None and not df.empty:
dims = qry['dimensions']
Expand All @@ -354,12 +407,12 @@ def query(

if filters:
ff = Filter(type="or", fields=filters)
if not filter:
if not orig_filters:
qry['filter'] = ff
else:
qry['filter'] = Filter(type="and", fields=[
Filter.build_filter(ff),
Filter.build_filter(filter)])
Filter.build_filter(orig_filters)])
qry['limit_spec'] = None

client.groupby(**qry)
Expand Down
2 changes: 1 addition & 1 deletion app/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ class DatabaseView(ModelView, DeleteMixin):
class TableView(ModelView, DeleteMixin):
datamodel = SQLAInterface(models.Table)
list_columns = ['table_link', 'database']
add_columns = ['table_name', 'database']
add_columns = ['table_name', 'database', 'default_endpoint']
edit_columns = add_columns
related_views = [TableColumnInlineView, SqlMetricInlineView]

Expand Down
40 changes: 4 additions & 36 deletions app/viz.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,36 +90,13 @@ def form_class(self):
def query_filters(self):
    """Collect filter conditions from the submitted form.

    The form exposes up to 9 numbered filter rows as the fields
    ``flt_col_N``, ``flt_op_N`` and ``flt_eq_N``; only rows where all
    three are present (and truthy) are kept.

    :returns: list of ``(column, operator, value)`` tuples; backend
        code (SQL or Druid) is responsible for turning these into
        engine-specific predicates.
    """
    # NOTE(review): the scraped diff interleaved the old pydruid
    # Filter-building implementation with the new tuple-collecting one;
    # this is the coherent post-commit form.
    args = self.form_data
    filters = []
    for i in range(1, 10):
        col = args.get("flt_col_" + str(i))
        op = args.get("flt_op_" + str(i))
        eq = args.get("flt_eq_" + str(i))
        if col and op and eq:
            filters.append((col, op, eq))
    return filters

def bake_query(self):
Expand Down Expand Up @@ -150,18 +127,9 @@ def query_obj(self):
'to_dttm': to_dttm,
'groupby': groupby,
'metrics': metrics,
'limit_spec': {
"type": "default",
"limit": limit,
"columns": [{
"dimension": metrics[0] if metrics else self.metrics[0],
"direction": "descending",
}],
},
'filter': self.query_filters(),
'timeseries_limit': limit,
}
filters = self.query_filters()
if filters:
d['filter'] = filters
return d

def df_prep(self):
Expand Down

0 comments on commit b70b270

Please sign in to comment.