From 776b86cac86c80463d64f11200748c67c4084a8b Mon Sep 17 00:00:00 2001 From: Dmitry Samsonov Date: Thu, 2 Mar 2017 17:08:08 +0200 Subject: [PATCH 1/5] Do not fail on version check on old Druid --- superset/connectors/druid/models.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/superset/connectors/druid/models.py b/superset/connectors/druid/models.py index edd7ec100972..f1f0dcfba9d9 100644 --- a/superset/connectors/druid/models.py +++ b/superset/connectors/druid/models.py @@ -120,6 +120,11 @@ def get_druid_version(self): 'http://{obj.coordinator_host}:{obj.coordinator_port}/status' ).format(obj=self) return json.loads(requests.get(endpoint).text)['version'] + ver = json.loads(requests.get(endpoint).text)['version'] + if ver is None: + return "0" + else: + return ver def refresh_datasources( self, From 930281406d3affeefde52972f66a5f0fcdc17c91 Mon Sep 17 00:00:00 2001 From: Dmitry Samsonov Date: Fri, 3 Mar 2017 19:27:05 +0200 Subject: [PATCH 2/5] Fix metrics for Druid v0.6 --- superset/connectors/druid/models.py | 62 ++++++++++++++++++++--------- 1 file changed, 43 insertions(+), 19 deletions(-) diff --git a/superset/connectors/druid/models.py b/superset/connectors/druid/models.py index f1f0dcfba9d9..78c0f39f251f 100644 --- a/superset/connectors/druid/models.py +++ b/superset/connectors/druid/models.py @@ -212,13 +212,14 @@ def refresh(self, datasource_names, merge_flag, refreshAll): if datatype == 'hyperUnique' or datatype == 'thetaSketch': col_obj.count_distinct = True # Allow sum/min/max for long or double - if datatype == 'LONG' or datatype == 'DOUBLE': + if datatype == 'LONG' or datatype == 'DOUBLE' or datatype == 'FLOAT': + col_obj.avg = True col_obj.sum = True col_obj.min = True col_obj.max = True col_obj.type = datatype col_obj.datasource = datasource - datasource.generate_metrics_for(col_objs_list) + datasource.generate_metrics_for(col_objs_list, cluster.druid_version) session.commit() @property @@ -272,7 +273,7 @@ def dimension_spec(self): if self.dimension_spec_json: return json.loads(self.dimension_spec_json) - def get_metrics(self): + def get_metrics(self, ver): metrics = {} metrics['count'] = DruidMetric( metric_name='count', @@ -298,18 +299,38 @@ def get_metrics(self): ) if self.avg and self.is_num: - mt = corrected_type.lower() + 'Avg' - name = 'avg__' + self.column_name - metrics[name] = DruidMetric( - metric_name=name, - metric_type='avg', - verbose_name='AVG({})'.format(self.column_name), - json=json.dumps({ - 'type': mt, 'name': name, 'fieldName': self.column_name}), - ) + if ver > '0': + mt = corrected_type.lower() + 'Avg' + name = 'avg__' + self.column_name + metrics[name] = DruidMetric( + metric_name=name, + metric_type='avg', + verbose_name='AVG({})'.format(self.column_name), + json=json.dumps({ + 'type': mt, 'name': name, 'fieldName': self.column_name}), + )) + else: + name = 'avg__' + self.column_name + metrics.append(DruidMetric( + metric_name=name, + metric_type='postagg', + verbose_name='AVG({})'.format(self.column_name), + json=json.dumps({ + 'type': 'arithmetic', + 'name': name, + 'fn': '/', + 'fields': [ + { 'type': 'fieldAccess', 'fieldName': 'sum__' + self.column_name }, + { 'type': 'fieldAccess', 'fieldName': 'count' } + ] + }) + )) if self.min and self.is_num: - mt = corrected_type.lower() + 'Min' + if ver > '0': + mt = corrected_type.lower() + 'Min' + else: + mt = 'min' name = 'min__' + self.column_name metrics[name] = DruidMetric( metric_name=name, @@ -319,7 +340,10 @@ def get_metrics(self): 'type': mt, 'name': name, 'fieldName': self.column_name}), ) if self.max and self.is_num: - mt = corrected_type.lower() + 'Max' + if ver > '0': + mt = corrected_type.lower() + 'Max' + else: + mt = 'max' name = 'max__' + self.column_name metrics[name] = DruidMetric( metric_name=name, @@ -353,9 +377,9 @@ def get_metrics(self): ) return metrics - def generate_metrics(self): + def generate_metrics(self, ver): """Generate metrics based on the column metadata""" - metrics = self.get_metrics() + metrics = self.get_metrics(ver) dbmetrics = ( db.session.query(DruidMetric) .filter(DruidMetric.datasource_id == self.datasource_id) @@ -645,12 +669,12 @@ def latest_metadata(self): return segment_metadata[-1]['columns'] def generate_metrics(self): - self.generate_metrics_for(self.columns) + self.generate_metrics_for(self.columns, self.cluster.druid_version) - def generate_metrics_for(self, columns): + def generate_metrics_for(self, columns, ver): metrics = {} for col in columns: - metrics.update(col.get_metrics()) + metrics.update(col.get_metrics(ver)) dbmetrics = ( db.session.query(DruidMetric) .filter(DruidMetric.datasource_id == self.id) From 53250e477332fe9049d6bf256a8031b2a1d782e2 Mon Sep 17 00:00:00 2001 From: Dmitry Samsonov Date: Thu, 9 Mar 2017 15:19:12 +0200 Subject: [PATCH 3/5] Compare Druid version with something more reasonable --- superset/connectors/druid/models.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/superset/connectors/druid/models.py b/superset/connectors/druid/models.py index 78c0f39f251f..f6a345e83b76 100644 --- a/superset/connectors/druid/models.py +++ b/superset/connectors/druid/models.py @@ -299,7 +299,7 @@ def get_metrics(self, ver): ) if self.avg and self.is_num: - if ver > '0': + if ver >= '0.7.': mt = corrected_type.lower() + 'Avg' name = 'avg__' + self.column_name metrics[name] = DruidMetric( @@ -327,7 +327,7 @@ def get_metrics(self, ver): )) if self.min and self.is_num: - if ver > '0': + if ver >= '0.7.': mt = corrected_type.lower() + 'Min' else: mt = 'min' @@ -340,7 +340,7 @@ def get_metrics(self, ver): 'type': mt, 'name': name, 'fieldName': self.column_name}), ) if self.max and self.is_num: - if ver > '0': + if ver >= '0.7.': mt = corrected_type.lower() + 'Max' else: mt = 'max' From c7dbaa1cae6c64d615d8befb8131ae2be9bca886 Mon Sep 17 00:00:00 2001 From: Dmitry Samsonov Date: Thu, 9 Mar 2017 16:00:14 +0200 Subject: [PATCH 4/5] Style fixes --- superset/connectors/druid/models.py | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/superset/connectors/druid/models.py b/superset/connectors/druid/models.py index f6a345e83b76..df6ae9f69ae8 100644 --- a/superset/connectors/druid/models.py +++ b/superset/connectors/druid/models.py @@ -307,7 +307,9 @@ def get_metrics(self, ver): metric_type='avg', verbose_name='AVG({})'.format(self.column_name), json=json.dumps({ - 'type': mt, 'name': name, 'fieldName': self.column_name}), + 'type': mt, + 'name': name, + 'fieldName': self.column_name}) )) else: name = 'avg__' + self.column_name @@ -320,8 +322,10 @@ def get_metrics(self, ver): 'name': name, 'fn': '/', 'fields': [ - { 'type': 'fieldAccess', 'fieldName': 'sum__' + self.column_name }, - { 'type': 'fieldAccess', 'fieldName': 'count' } + {'type': 'fieldAccess', + 'fieldName': 'sum__' + self.column_name}, + {'type': 'fieldAccess', + 'fieldName': 'count'} ] }) )) From 1704ee4d0b1226fc0bef9fc01ff6cc985509fd28 Mon Sep 17 00:00:00 2001 From: Dmitry Samsonov Date: Fri, 2 Feb 2018 13:31:01 +0200 Subject: [PATCH 5/5] Fix Druid version detection --- superset/connectors/druid/models.py | 1 - 1 file changed, 1 deletion(-) diff --git a/superset/connectors/druid/models.py b/superset/connectors/druid/models.py index df6ae9f69ae8..0a51b4c6fdc7 100644 --- a/superset/connectors/druid/models.py +++ b/superset/connectors/druid/models.py @@ -119,7 +119,6 @@ def get_druid_version(self): endpoint = ( 'http://{obj.coordinator_host}:{obj.coordinator_port}/status' ).format(obj=self) - return json.loads(requests.get(endpoint).text)['version'] ver = json.loads(requests.get(endpoint).text)['version'] if ver is None: return "0"