@manager.command
def worker():
    """Starts a Caravel worker for async query load

    Placeholder command: the worker is not implemented yet, so invoking it
    always raises ``NotImplementedError``.
    """
    message = "# TODO! @b.kyryliuk"
    raise NotImplementedError(message)
# Celery takes its settings object through ``config_source``; an unknown
# keyword such as ``celery_config`` is accepted silently and never applied.
celery_app = celery.Celery(config_source=config.get('CELERY_CONFIG'))


@celery_app.task
def get_sql_results(database_id, sql, async_=False):
    """Run ``sql`` on a Caravel database and return the result as HTML.

    :param database_id: primary key of the ``models.Database`` to query
    :param sql: SQL text to execute; surrounding whitespace and ``;`` are
        stripped and the statement is wrapped in ``SELECT * ... LIMIT``
        when the ``SQL_MAX_ROW`` setting is truthy
    :param async_: reserved for asynchronous execution, currently unused.
        Spelled with a trailing underscore because ``async`` is a reserved
        word from Python 3.7 on
    :returns: an HTML table on success, an HTML error block when the query
        fails, or an empty string when no database matches ``database_id``
    :raises utils.CaravelSecurityException: when the current user lacks the
        ``all_datasource_access`` permission
    """
    # TODO @b.kyryliuk handle async
    # handle models.Queries (userid, sql, timestamps, status)
    # index on userid, state, start_ddtm

    # NOTE(review): this is a module-level function, so there is no ``self``
    # to reach the security manager through. The check assumes the
    # module-level ``appbuilder`` object is in scope here -- confirm.
    if not appbuilder.sm.has_access(
            'all_datasource_access', 'all_datasource_access'):
        raise utils.CaravelSecurityException(_(
            "This view requires the `all_datasource_access` permission"))

    session = db.session()
    mydb = session.query(models.Database).filter_by(id=database_id).first()

    content = ""
    if mydb:
        eng = mydb.get_sqla_engine()
        # ``config`` is the Flask config mapping: settings are read by key,
        # attribute access would raise AttributeError.
        max_rows = config.get('SQL_MAX_ROW')
        if max_rows:
            sql = sql.strip().strip(';')
            qry = (
                select('*')
                .select_from(TextAsFrom(text(sql), ['*']).alias('inner_qry'))
                .limit(max_rows)
            )
            sql = str(qry.compile(eng, compile_kwargs={"literal_binds": True}))
        try:
            df = pd.read_sql_query(sql=sql, con=eng)
            content = df.to_html(
                index=False,
                na_rep='',
                classes=(
                    "dataframe table table-striped table-bordered "
                    "table-condensed sql_results").split(' '))
        except Exception as e:
            # Deliberately broad: any driver/database failure is shown to
            # the user instead of surfacing as a server error.
            # ``e.message`` only exists on Python 2 exceptions.
            message = getattr(e, 'message', None) or str(e)
            # NOTE(review): the markup around the placeholder was stripped
            # from the reviewed text; the alert wrapper below is a
            # reconstruction -- confirm against the front-end.
            # The message may echo user-supplied SQL, so it is escaped.
            content = (
                '<div class="alert alert-danger">'
                "{}</div>"
            ).format(Markup.escape(message))
    session.commit()
    # Callers (e.g. the ``runsql`` view) send this value back as the response.
    return content
na_rep='', - classes=( - "dataframe table table-striped table-bordered " - "table-condensed sql_results").split(' ')) - except Exception as e: - content = ( - '
' - "{}
" - ).format(e.message) - session.commit() + # TODO @b.kyryliuk handle async + content = get_sql_results(database_id, sql) + # get_sql_results.async(database_id, sql) return content + @expose("/async_sql_status/") + def async_sql_status(self, userid): + #TODO @b.kyryliuk + return + @has_access @expose("/refresh_datasources/") def refresh_datasources(self): diff --git a/setup.py b/setup.py index 127f57e28d84..b80de6bff600 100644 --- a/setup.py +++ b/setup.py @@ -18,6 +18,7 @@ install_requires=[ 'alembic>=0.8.5, <0.9.0', 'babel==2.3.4', + 'celery==3.1.23', 'cryptography>=1.1.1, <2.0.0', 'flask-appbuilder>=1.7.1, <2.0.0', 'Flask-BabelPkg==0.9.6',