diff --git a/superset/connectors/druid/models.py b/superset/connectors/druid/models.py
index edd7ec100972..0a51b4c6fdc7 100644
--- a/superset/connectors/druid/models.py
+++ b/superset/connectors/druid/models.py
@@ -119,7 +119,13 @@ def get_druid_version(self):
         endpoint = (
             'http://{obj.coordinator_host}:{obj.coordinator_port}/status'
         ).format(obj=self)
-        return json.loads(requests.get(endpoint).text)['version']
+        ver = json.loads(requests.get(endpoint).text)['version']
+        # Very old Druid builds may report a null version; normalize it
+        # to '0' so downstream version comparisons always get a string.
+        if ver is None:
+            return '0'
+        else:
+            return ver
 
     def refresh_datasources(
             self,
@@ -207,13 +213,14 @@ def refresh(self, datasource_names, merge_flag, refreshAll):
                     if datatype == 'hyperUnique' or datatype == 'thetaSketch':
                         col_obj.count_distinct = True
-                    # Allow sum/min/max for long or double
-                    if datatype == 'LONG' or datatype == 'DOUBLE':
+                    # Allow sum/avg/min/max for numeric column types
+                    if datatype in ('LONG', 'DOUBLE', 'FLOAT'):
+                        col_obj.avg = True
                         col_obj.sum = True
                         col_obj.min = True
                         col_obj.max = True
                     col_obj.type = datatype
                     col_obj.datasource = datasource
-                datasource.generate_metrics_for(col_objs_list)
+                datasource.generate_metrics_for(col_objs_list, cluster.druid_version)
         session.commit()
 
     @property
@@ -267,7 +274,12 @@ def dimension_spec(self):
         if self.dimension_spec_json:
             return json.loads(self.dimension_spec_json)
 
-    def get_metrics(self):
+    def get_metrics(self, ver='0.7'):
+        # Druid >= 0.7 uses type-specific aggregators (longSum, doubleMin,
+        # ...). Compare versions numerically, not lexically, so that e.g.
+        # '0.10.0' is not treated as older than '0.7'.
+        has_typed_aggs = tuple(
+            int(p) for p in ver.split('.')[:2] if p.isdigit()) >= (0, 7)
         metrics = {}
         metrics['count'] = DruidMetric(
             metric_name='count',
@@ -293,18 +305,41 @@ def get_metrics(self):
                 json=json.dumps({
                     'type': mt, 'name': name, 'fieldName': self.column_name}),
             )
         if self.avg and self.is_num:
-            mt = corrected_type.lower() + 'Avg'
-            name = 'avg__' + self.column_name
-            metrics[name] = DruidMetric(
-                metric_name=name,
-                metric_type='avg',
-                verbose_name='AVG({})'.format(self.column_name),
-                json=json.dumps({
-                    'type': mt, 'name': name, 'fieldName': self.column_name}),
-            )
+            name = 'avg__' + self.column_name
+            if has_typed_aggs:
+                # Native typed avg aggregator (longAvg / doubleAvg).
+                mt = corrected_type.lower() + 'Avg'
+                metrics[name] = DruidMetric(
+                    metric_name=name,
+                    metric_type='avg',
+                    verbose_name='AVG({})'.format(self.column_name),
+                    json=json.dumps({
+                        'type': mt,
+                        'name': name,
+                        'fieldName': self.column_name}),
+                )
+            else:
+                # Pre-0.7 Druid has no typed avg aggregator; emulate AVG as
+                # a sum/count arithmetic post-aggregation (needs sum__<col>).
+                metrics[name] = DruidMetric(
+                    metric_name=name,
+                    metric_type='postagg',
+                    verbose_name='AVG({})'.format(self.column_name),
+                    json=json.dumps({
+                        'type': 'arithmetic',
+                        'name': name,
+                        'fn': '/',
+                        'fields': [
+                            {'type': 'fieldAccess',
+                             'fieldName': 'sum__' + self.column_name},
+                            {'type': 'fieldAccess',
+                             'fieldName': 'count'}
+                        ]
+                    })
+                )
         if self.min and self.is_num:
-            mt = corrected_type.lower() + 'Min'
+            mt = corrected_type.lower() + 'Min' if has_typed_aggs else 'min'
             name = 'min__' + self.column_name
             metrics[name] = DruidMetric(
                 metric_name=name,
@@ -314,7 +349,7 @@ def get_metrics(self):
                     'type': mt, 'name': name, 'fieldName': self.column_name}),
             )
         if self.max and self.is_num:
-            mt = corrected_type.lower() + 'Max'
+            mt = corrected_type.lower() + 'Max' if has_typed_aggs else 'max'
             name = 'max__' + self.column_name
             metrics[name] = DruidMetric(
                 metric_name=name,
@@ -348,9 +383,9 @@ def get_metrics(self):
         )
         return metrics
 
-    def generate_metrics(self):
+    def generate_metrics(self, ver='0.7'):
         """Generate metrics based on the column metadata"""
-        metrics = self.get_metrics()
+        metrics = self.get_metrics(ver)
         dbmetrics = (
             db.session.query(DruidMetric)
             .filter(DruidMetric.datasource_id == self.datasource_id)
@@ -640,12 +675,12 @@ def latest_metadata(self):
         return segment_metadata[-1]['columns']
 
     def generate_metrics(self):
-        self.generate_metrics_for(self.columns)
+        self.generate_metrics_for(self.columns, self.cluster.druid_version)
 
-    def generate_metrics_for(self, columns):
+    def generate_metrics_for(self, columns, ver='0.7'):
         metrics = {}
         for col in columns:
-            metrics.update(col.get_metrics())
+            metrics.update(col.get_metrics(ver))
         dbmetrics = (
             db.session.query(DruidMetric)
             .filter(DruidMetric.datasource_id == self.id)