Now enabling multi-cluster, connection info managed in UI
mistercrunch committed Jul 30, 2015
1 parent 6032daf commit 374802e
Showing 6 changed files with 88 additions and 58 deletions.
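The gist: Druid connection settings move out of config.py and into a new Cluster model, so several Druid clusters can be registered and edited from the admin UI. Datasources now belong to a cluster, and both metadata refreshes and pydruid queries go through the owning cluster's connection info. A minimal sketch of the new flow (run inside the app context; the cluster name is hypothetical, and the host/port values mirror the defaults removed from config.py):

from app import db, models

# Register a cluster, equivalent to filling in the new "Clusters" admin form
cluster = models.Cluster(
    cluster_name='main',  # hypothetical
    coordinator_host='0.0.0.0', coordinator_port='8081',
    coordinator_endpoint='druid/coordinator/v1',
    broker_host='0.0.0.0', broker_port='8084',
    broker_endpoint='druid/v2')
db.session.add(cluster)
db.session.commit()

# Pull datasource and column metadata from that cluster's coordinator
cluster.refresh_datasources()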
2 changes: 2 additions & 0 deletions app/__init__.py
@@ -20,4 +20,6 @@ class MyIndexView(IndexView):
     app, db.session, base_template='panoramix/base.html',
     indexview=MyIndexView)
 
+get_session = appbuilder.get_session
+
 from app import views
71 changes: 48 additions & 23 deletions app/models.py
@@ -1,15 +1,15 @@
 from flask.ext.appbuilder import Model
-from datetime import datetime, timedelta
-from flask.ext.appbuilder.models.mixins import AuditMixin, FileColumn, ImageColumn
-from flask.ext.appbuilder.security.sqla.models import User
 from pydruid import client
+from datetime import timedelta
+from flask.ext.appbuilder.models.mixins import AuditMixin, FileColumn
 from sqlalchemy import Column, Integer, String, ForeignKey, Text, Boolean, DateTime
 from sqlalchemy.orm import relationship
-from app import db, utils
+from app import get_session
 from dateutil.parser import parse
-import json
 
 
-client = utils.get_pydruid_client()
+import logging
+import json
+import requests
 
 
@@ -24,6 +24,27 @@ class Cluster(Model, AuditMixin):
     broker_endpoint = Column(String(256))
     metadata_last_refreshed = Column(DateTime)
 
+    def __repr__(self):
+        return self.cluster_name
+
+    def get_pydruid_client(self):
+        cli = client.PyDruid(
+            "http://{0}:{1}/".format(self.broker_host, self.broker_port),
+            self.broker_endpoint)
+        return cli
+
+    def refresh_datasources(self):
+        endpoint = (
+            "http://{self.coordinator_host}:{self.coordinator_port}/"
+            "{self.coordinator_endpoint}/datasources"
+        ).format(self=self)
+        datasources = json.loads(requests.get(endpoint).text)
+        for datasource in datasources:
+            #try:
+            Datasource.sync_to_db(datasource, self)
+            #except Exception as e:
+            #    logging.exception(e)
+            #    logging.error("Failed at syncing " + datasource)
 
 class Datasource(Model, AuditMixin):
     __tablename__ = 'datasources'
@@ -60,15 +81,15 @@ def get_metric_obj(self, metric_name):
             if m.metric_name == metric_name
         ][0]
 
-    @classmethod
-    def latest_metadata(cls, name):
-        results = client.time_boundary(datasource=name)
+    def latest_metadata(self):
+        client = self.cluster.get_pydruid_client()
+        results = client.time_boundary(datasource=self.datasource_name)
         max_time = results[0]['result']['minTime']
         max_time = parse(max_time)
         intervals = (max_time - timedelta(seconds=1)).isoformat() + '/'
         intervals += (max_time + timedelta(seconds=1)).isoformat()
         segment_metadata = client.segment_metadata(
-            datasource=name,
+            datasource=self.datasource_name,
             intervals=intervals)
         if segment_metadata:
             return segment_metadata[-1]['columns']
@@ -78,31 +99,35 @@ def generate_metrics(self):
             col.generate_metrics()
 
     @classmethod
-    def sync_to_db(cls, name):
-        datasource = db.session.query(cls).filter_by(datasource_name=name).first()
+    def sync_to_db(cls, name, cluster):
+        session = get_session()
+        datasource = session.query(cls).filter_by(datasource_name=name).first()
         if not datasource:
-            db.session.add(cls(datasource_name=name))
-        cols = cls.latest_metadata(name)
+            datasource = cls(datasource_name=name)
+            session.add(datasource)
+        datasource.cluster = cluster
+
+        cols = datasource.latest_metadata()
         if not cols:
             return
         for col in cols:
             col_obj = (
-                db.session
+                session
                 .query(Column)
                 .filter_by(datasource_name=name, column_name=col)
                 .first()
             )
             datatype = cols[col]['type']
             if not col_obj:
                 col_obj = Column(datasource_name=name, column_name=col)
-                db.session.add(col_obj)
+                session.add(col_obj)
             if datatype == "STRING":
                 col_obj.groupby = True
                 col_obj.filterable = True
             if col_obj:
                 col_obj.type = cols[col]['type']
                 col_obj.generate_metrics()
-        db.session.commit()
+        #session.commit()
 
     @property
     def column_names(self):
@@ -171,8 +196,7 @@ def generate_metrics(self):
             metric_name='count',
             verbose_name='COUNT(*)',
             metric_type='count',
-            json=json.dumps({
-                'type': 'count', 'name': 'count'})
+            json=json.dumps({'type': 'count', 'name': 'count'})
         ))
 
         if self.sum and self.isnum:
@@ -217,14 +241,15 @@ def generate_metrics(self):
                 'name': name,
                 'fieldNames': [self.column_name]})
             ))
+        session = get_session()
         for metric in metrics:
             m = (
-                db.session.query(M)
+                session.query(M)
                 .filter(M.datasource_name==self.datasource_name)
                 .filter(M.metric_name==metric.metric_name)
                 .first()
             )
             metric.datasource_name = self.datasource_name
             if not m:
-                db.session.add(metric)
-        db.session.commit()
+                session.add(metric)
+        session.commit()
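A note on the sync path above: Datasource.sync_to_db now takes the owning cluster and goes through the Flask-AppBuilder scoped session (get_session) instead of db.session, and its own trailing commit is commented out, so the caller is expected to commit (the refresh view in app/views.py below does). A hypothetical direct call, with a made-up datasource name:

from app import get_session, models

session = get_session()
cluster = session.query(models.Cluster).first()
models.Datasource.sync_to_db('wikipedia_edits', cluster)  # hypothetical name
session.commit()  # sync_to_db's own commit is commented out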
8 changes: 2 additions & 6 deletions app/utils.py
@@ -1,13 +1,9 @@
 import config
-from datetime import timedelta, datetime
+from datetime import datetime
 import parsedatetime
+from app import db
 
 
-def get_pydruid_client():
-    from pydruid import client
-    return client.PyDruid(
-        "http://{0}:{1}/".format(config.DRUID_HOST, config.DRUID_PORT),
-        config.DRUID_BASE_ENDPOINT)
 
 
 def parse_human_datetime(s):
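utils.get_pydruid_client, which read the global DRUID_* settings, is gone; the per-cluster Cluster.get_pydruid_client method added in models.py above replaces it. A small sketch of the replacement call (the datasource name is made up):

from app import db, models

cluster = db.session.query(models.Cluster).first()
client = cluster.get_pydruid_client()
results = client.time_boundary(datasource='wikipedia_edits')  # hypothetical name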
51 changes: 33 additions & 18 deletions app/views.py
@@ -1,11 +1,11 @@
-from datetime import timedelta
+from datetime import datetime
 import logging
 import json
 
 from flask import request, redirect, flash, Response
 from flask.ext.appbuilder.models.sqla.interface import SQLAInterface
 from flask.ext.appbuilder import ModelView, CompactCRUDMixin, BaseView, expose
-from app import appbuilder, db, models, viz, utils, app
+from app import appbuilder, db, models, viz, utils, app, get_session
 from flask.ext.appbuilder.security.decorators import has_access, permission_name
 import config
 from pydruid.client import doublesum
@@ -62,13 +62,32 @@ class MetricInlineView(CompactCRUDMixin, ModelView):
 appbuilder.add_view_no_menu(MetricInlineView)
 
 
+class ClusterModelView(ModelView, DeleteMixin):
+    datamodel = SQLAInterface(models.Cluster)
+    add_columns = [
+        'cluster_name',
+        'coordinator_host', 'coordinator_port', 'coordinator_endpoint',
+        'broker_host', 'broker_port', 'broker_endpoint',
+    ]
+    edit_columns = add_columns
+    list_columns = ['cluster_name', 'metadata_last_refreshed']
+
+appbuilder.add_view(
+    ClusterModelView,
+    "Clusters",
+    icon="fa-server",
+    category="Admin",
+    category_icon='fa-envelope')
+
+
 class DatasourceModelView(ModelView, DeleteMixin):
     datamodel = SQLAInterface(models.Datasource)
-    list_columns = ['datasource_link', 'owner', 'is_featured', 'is_hidden']
+    list_columns = [
+        'datasource_link', 'cluster', 'owner', 'is_featured', 'is_hidden']
     related_views = [ColumnInlineView, MetricInlineView]
     edit_columns = [
-        'datasource_name', 'description', 'owner', 'is_featured', 'is_hidden',
-        'default_endpoint']
+        'datasource_name', 'cluster', 'description', 'owner',
+        'is_featured', 'is_hidden', 'default_endpoint']
     page_size = 100
     base_order = ('datasource_name', 'asc')
 
@@ -129,19 +148,15 @@ def datasource(self, datasource_name):
     @permission_name('refresh_datasources')
     @expose("/refresh_datasources/")
     def refresh_datasources(self):
-        import requests
-        endpoint = (
-            "http://{COORDINATOR_HOST}:{COORDINATOR_PORT}/"
-            "{COORDINATOR_BASE_ENDPOINT}/datasources"
-        ).format(**config.__dict__)
-        datasources = json.loads(requests.get(endpoint).text)
-        for datasource in datasources:
-            try:
-                models.Datasource.sync_to_db(datasource)
-            except Exception as e:
-                logging.exception(e)
-                logging.error("Failed at syncing " + datasource)
-        flash("Refreshed metadata from Druid!", 'info')
+        session = db.session()
+        for cluster in session.query(models.Cluster).all():
+            cluster.refresh_datasources()
+            cluster.metadata_last_refreshed = datetime.now()
+            flash(
+                "Refreshed metadata from cluster "
+                "[" + cluster.cluster_name + "]",
+                'info')
+        session.commit()
         return redirect("/datasourcemodelview/list/")
 
     @expose("/autocomplete/<datasource>/<column>/")
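The refresh endpoint now iterates over every registered cluster rather than reading a single coordinator from config. The same loop as a standalone sketch, e.g. for a cron job (assumes a Flask app context; mirrors the view body above):

from datetime import datetime
from app import db, models

session = db.session()
for cluster in session.query(models.Cluster).all():
    cluster.refresh_datasources()
    cluster.metadata_last_refreshed = datetime.now()
session.commit()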
6 changes: 3 additions & 3 deletions app/viz.py
@@ -164,12 +164,12 @@ def query_obj(self):
         return d
 
     def bake_query(self):
-        client = utils.get_pydruid_client()
+        client = self.datasource.cluster.get_pydruid_client()
         client.groupby(**self.query_obj())
         return client.export_pandas()
 
     def get_query(self):
-        client = utils.get_pydruid_client()
+        client = self.datasource.cluster.get_pydruid_client()
         client.groupby(**self.query_obj())
         return client.query_dict
 
@@ -265,7 +265,7 @@ def bake_query(self):
"""
Doing a 2 phase query where we limit the number of series.
"""
client = utils.get_pydruid_client()
client = self.datasource.cluster.get_pydruid_client()
qry = self.query_obj()
orig_filter = qry['filter'] if 'filter' in qry else ''
qry['granularity'] = "all"
8 changes: 0 additions & 8 deletions config.py
@@ -14,14 +14,6 @@
 #---------------------------------------------------------
 ROW_LIMIT = 5000
 
-DRUID_HOST = '0.0.0.0'
-DRUID_PORT = '8084'
-DRUID_BASE_ENDPOINT = 'druid/v2'
-
-COORDINATOR_HOST = '0.0.0.0'
-COORDINATOR_PORT = '8081'
-COORDINATOR_BASE_ENDPOINT = 'druid/coordinator/v1'
-
 PANORAMIX_WEBSERVER_PORT = 8088
 #---------------------------------------------------------
 