diff --git a/requirements.txt b/requirements.txt index 4babe3d7bf3d..1b15a19ed104 100644 --- a/requirements.txt +++ b/requirements.txt @@ -21,7 +21,7 @@ croniter==0.3.31 # via apache-superset (setup.py) cryptography==2.8 # via apache-superset (setup.py) decorator==4.4.1 # via retry defusedxml==0.6.0 # via python3-openid -flask-appbuilder==2.3.0 +flask-appbuilder==2.3.0 # via apache-superset (setup.py) flask-babel==1.0.0 # via flask-appbuilder flask-caching==1.8.0 flask-compress==1.4.0 diff --git a/superset/commands/exceptions.py b/superset/commands/exceptions.py index 61a18ebdc33f..fc79bab684ed 100644 --- a/superset/commands/exceptions.py +++ b/superset/commands/exceptions.py @@ -25,7 +25,10 @@ class CommandException(SupersetException): """ Common base class for Command exceptions. """ - pass + def __repr__(self): + if self._exception: + return repr(self._exception) + return super().__repr__() class CommandInvalidError(CommandException): diff --git a/superset/connectors/sqla/models.py b/superset/connectors/sqla/models.py index aaa4252e7f33..947551cf5bf2 100644 --- a/superset/connectors/sqla/models.py +++ b/superset/connectors/sqla/models.py @@ -1073,7 +1073,7 @@ def mutator(df: pd.DataFrame) -> None: def get_sqla_table_object(self) -> Table: return self.database.get_table(self.table_name, schema=self.schema) - def fetch_metadata(self) -> None: + def fetch_metadata(self, commit=True) -> None: """Fetches the metadata for the table and merges it in""" try: table = self.get_sqla_table_object() @@ -1086,7 +1086,6 @@ def fetch_metadata(self) -> None: ).format(self.table_name) ) - M = SqlMetric metrics = [] any_date_col = None db_engine_spec = self.database.db_engine_spec @@ -1123,7 +1122,7 @@ def fetch_metadata(self) -> None: any_date_col = col.name metrics.append( - M( + SqlMetric( metric_name="count", verbose_name="COUNT(*)", metric_type="count", @@ -1134,7 +1133,8 @@ def fetch_metadata(self) -> None: self.main_dttm_col = any_date_col self.add_missing_metrics(metrics) 
db.session.merge(self) - db.session.commit() + if commit: + db.session.commit() @classmethod def import_obj(cls, i_datasource, import_time=None) -> int: diff --git a/superset/datasets/api.py b/superset/datasets/api.py index 2e12629c42f1..0a2f1fe7e20e 100644 --- a/superset/datasets/api.py +++ b/superset/datasets/api.py @@ -30,8 +30,10 @@ DatasetForbiddenError, DatasetInvalidError, DatasetNotFoundError, + DatasetRefreshFailedError, DatasetUpdateFailedError, ) +from superset.datasets.commands.refresh import RefreshDatasetCommand from superset.datasets.commands.update import UpdateDatasetCommand from superset.datasets.schemas import DatasetPostSchema, DatasetPutSchema from superset.views.base import DatasourceFilter @@ -49,9 +51,12 @@ class DatasetRestApi(BaseSupersetModelRestApi): allow_browser_login = True class_permission_name = "TableModelView" - include_route_methods = RouteMethod.REST_MODEL_VIEW_CRUD_SET | {RouteMethod.RELATED} + include_route_methods = ( + RouteMethod.REST_MODEL_VIEW_CRUD_SET | {RouteMethod.RELATED} | {"refresh"} + ) list_columns = [ + "database_name", "changed_by_name", "changed_by_url", "changed_by.username", @@ -79,6 +84,8 @@ class DatasetRestApi(BaseSupersetModelRestApi): "template_params", "owners.id", "owners.username", + "columns", + "metrics", ] add_model_schema = DatasetPostSchema() edit_model_schema = DatasetPutSchema() @@ -97,6 +104,8 @@ class DatasetRestApi(BaseSupersetModelRestApi): "is_sqllab_view", "template_params", "owners", + "columns", + "metrics", ] openapi_spec_tag = "Datasets" @@ -268,3 +277,49 @@ def delete(self, pk: int) -> Response: # pylint: disable=arguments-differ except DatasetDeleteFailedError as e: logger.error(f"Error deleting model {self.__class__.__name__}: {e}") return self.response_422(message=str(e)) + + @expose("//refresh", methods=["PUT"]) + @protect() + @safe + def refresh(self, pk: int) -> Response: # pylint: disable=invalid-name + """Refresh a Dataset + --- + put: + description: >- + Refreshes and 
updates columns of a dataset + parameters: + - in: path + schema: + type: integer + name: pk + responses: + 200: + description: Dataset refresh + content: + application/json: + schema: + type: object + properties: + message: + type: string + 401: + $ref: '#/components/responses/401' + 403: + $ref: '#/components/responses/403' + 404: + $ref: '#/components/responses/404' + 422: + $ref: '#/components/responses/422' + 500: + $ref: '#/components/responses/500' + """ + try: + RefreshDatasetCommand(g.user, pk).run() + return self.response(200, message="OK") + except DatasetNotFoundError: + return self.response_404() + except DatasetForbiddenError: + return self.response_403() + except DatasetRefreshFailedError as e: + logger.error(f"Error refreshing dataset {self.__class__.__name__}: {e}") + return self.response_422(message=str(e)) diff --git a/superset/datasets/commands/create.py b/superset/datasets/commands/create.py index 466e35dd7f81..71d622569e3a 100644 --- a/superset/datasets/commands/create.py +++ b/superset/datasets/commands/create.py @@ -19,6 +19,7 @@ from flask_appbuilder.security.sqla.models import User from marshmallow import ValidationError +from sqlalchemy.exc import SQLAlchemyError from superset.commands.base import BaseCommand from superset.commands.utils import populate_owners @@ -31,6 +32,7 @@ TableNotFoundValidationError, ) from superset.datasets.dao import DatasetDAO +from superset.extensions import db, security_manager logger = logging.getLogger(__name__) @@ -43,9 +45,23 @@ def __init__(self, user: User, data: Dict): def run(self): self.validate() try: - dataset = DatasetDAO.create(self._properties) - except DAOCreateFailedError as e: - logger.exception(e.exception) + # Creates SqlaTable (Dataset) + dataset = DatasetDAO.create(self._properties, commit=False) + # Updates columns and metrics from the dataset + dataset.fetch_metadata(commit=False) + # Add datasource access permission + security_manager.add_permission_view_menu( + "datasource_access", 
dataset.get_perm() + ) + # Add schema access permission if exists + if dataset.schema: + security_manager.add_permission_view_menu( + "schema_access", dataset.schema_perm + ) + db.session.commit() + except (SQLAlchemyError, DAOCreateFailedError) as e: + logger.exception(e) + db.session.rollback() raise DatasetCreateFailedError() return dataset diff --git a/superset/datasets/commands/delete.py b/superset/datasets/commands/delete.py index 85837f96f7fb..668d63a90248 100644 --- a/superset/datasets/commands/delete.py +++ b/superset/datasets/commands/delete.py @@ -18,6 +18,7 @@ from typing import Optional from flask_appbuilder.security.sqla.models import User +from sqlalchemy.exc import SQLAlchemyError from superset.commands.base import BaseCommand from superset.connectors.sqla.models import SqlaTable @@ -29,6 +30,7 @@ ) from superset.datasets.dao import DatasetDAO from superset.exceptions import SupersetSecurityException +from superset.extensions import db, security_manager from superset.views.base import check_ownership logger = logging.getLogger(__name__) @@ -43,9 +45,14 @@ def __init__(self, user: User, model_id: int): def run(self): self.validate() try: - dataset = DatasetDAO.delete(self._model) - except DAODeleteFailedError as e: - logger.exception(e.exception) + dataset = DatasetDAO.delete(self._model, commit=False) + security_manager.del_permission_view_menu( + "datasource_access", dataset.get_perm() + ) + db.session.commit() + except (SQLAlchemyError, DAODeleteFailedError) as e: + logger.exception(e) + db.session.rollback() raise DatasetDeleteFailedError() return dataset diff --git a/superset/datasets/commands/exceptions.py b/superset/datasets/commands/exceptions.py index a6d0ed7deda3..35c6106da35b 100644 --- a/superset/datasets/commands/exceptions.py +++ b/superset/datasets/commands/exceptions.py @@ -57,6 +57,68 @@ def __init__(self, table_name: str): ) +class DatasetColumnNotFoundValidationError(ValidationError): + """ + Marshmallow validation error when 
dataset column for update does not exist + """ + + def __init__(self): + super().__init__(_("One or more columns do not exist"), field_names=["columns"]) + + +class DatasetColumnsDuplicateValidationError(ValidationError): + """ + Marshmallow validation error when dataset columns have a duplicate on the list + """ + + def __init__(self): + super().__init__( + _("One or more columns are duplicated"), field_names=["columns"] + ) + + +class DatasetColumnsExistsValidationError(ValidationError): + """ + Marshmallow validation error when dataset columns already exist + """ + + def __init__(self): + super().__init__( + _("One or more columns already exist"), field_names=["columns"] + ) + + +class DatasetMetricsNotFoundValidationError(ValidationError): + """ + Marshmallow validation error when dataset metric for update does not exist + """ + + def __init__(self): + super().__init__(_("One or more metrics do not exist"), field_names=["metrics"]) + + +class DatasetMetricsDuplicateValidationError(ValidationError): + """ + Marshmallow validation error when dataset metrics have a duplicate on the list + """ + + def __init__(self): + super().__init__( + _("One or more metrics are duplicated"), field_names=["metrics"] + ) + + +class DatasetMetricsExistsValidationError(ValidationError): + """ + Marshmallow validation error when dataset metrics already exist + """ + + def __init__(self): + super().__init__( + _("One or more metrics already exist"), field_names=["metrics"] + ) + + class TableNotFoundValidationError(ValidationError): """ Marshmallow validation error when a table does not exist on the database @@ -99,5 +161,9 @@ class DatasetDeleteFailedError(DeleteFailedError): message = _("Dataset could not be deleted.") +class DatasetRefreshFailedError(UpdateFailedError): + message = _("Dataset could not be updated.") + + class DatasetForbiddenError(ForbiddenError): message = _("Changing this dataset is forbidden") diff --git a/superset/datasets/commands/refresh.py 
b/superset/datasets/commands/refresh.py new file mode 100644 index 000000000000..2c58cc4b2afc --- /dev/null +++ b/superset/datasets/commands/refresh.py @@ -0,0 +1,61 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +import logging +from typing import Optional + +from flask_appbuilder.security.sqla.models import User + +from superset.commands.base import BaseCommand +from superset.connectors.sqla.models import SqlaTable +from superset.datasets.commands.exceptions import ( + DatasetForbiddenError, + DatasetNotFoundError, + DatasetRefreshFailedError, +) +from superset.datasets.dao import DatasetDAO +from superset.exceptions import SupersetSecurityException +from superset.views.base import check_ownership + +logger = logging.getLogger(__name__) + + +class RefreshDatasetCommand(BaseCommand): + def __init__(self, user: User, model_id: int): + self._actor = user + self._model_id = model_id + self._model: Optional[SqlaTable] = None + + def run(self): + self.validate() + try: + # Updates columns and metrics from the dataset + self._model.fetch_metadata() + except Exception as e: + logger.exception(e) + raise DatasetRefreshFailedError() + return self._model + + def validate(self) -> None: + # Validate/populate model exists + self._model = 
DatasetDAO.find_by_id(self._model_id) + if not self._model: + raise DatasetNotFoundError() + # Check ownership + try: + check_ownership(self._model) + except SupersetSecurityException: + raise DatasetForbiddenError() diff --git a/superset/datasets/commands/update.py b/superset/datasets/commands/update.py index 05a0b96664bd..0b40952836ce 100644 --- a/superset/datasets/commands/update.py +++ b/superset/datasets/commands/update.py @@ -15,6 +15,7 @@ # specific language governing permissions and limitations # under the License. import logging +from collections import Counter from typing import Dict, List, Optional from flask_appbuilder.security.sqla.models import User @@ -26,9 +27,15 @@ from superset.dao.exceptions import DAOUpdateFailedError from superset.datasets.commands.exceptions import ( DatabaseChangeValidationError, + DatasetColumnNotFoundValidationError, + DatasetColumnsDuplicateValidationError, + DatasetColumnsExistsValidationError, DatasetExistsValidationError, DatasetForbiddenError, DatasetInvalidError, + DatasetMetricsDuplicateValidationError, + DatasetMetricsExistsValidationError, + DatasetMetricsNotFoundValidationError, DatasetNotFoundError, DatasetUpdateFailedError, ) @@ -84,7 +91,64 @@ def validate(self) -> None: self._properties["owners"] = owners except ValidationError as e: exceptions.append(e) + + # Validate columns + columns = self._properties.get("columns") + if columns: + self._validate_columns(columns, exceptions) + + # Validate metrics + metrics = self._properties.get("metrics") + if metrics: + self._validate_metrics(metrics, exceptions) + if exceptions: exception = DatasetInvalidError() exception.add_list(exceptions) raise exception + + def _validate_columns(self, columns: List[Dict], exceptions: List[ValidationError]): + # Validate duplicates on data + if self._get_duplicates(columns, "column_name"): + exceptions.append(DatasetColumnsDuplicateValidationError()) + else: + # validate invalid id's + columns_ids: List[int] = [ + column["id"] for 
column in columns if "id" in column + ] + if not DatasetDAO.validate_columns_exist(self._model_id, columns_ids): + exceptions.append(DatasetColumnNotFoundValidationError()) + # validate new column names uniqueness + columns_names: List[str] = [ + column["column_name"] for column in columns if "id" not in column + ] + if not DatasetDAO.validate_columns_uniqueness( + self._model_id, columns_names + ): + exceptions.append(DatasetColumnsExistsValidationError()) + + def _validate_metrics(self, metrics: List[Dict], exceptions: List[ValidationError]): + if self._get_duplicates(metrics, "metric_name"): + exceptions.append(DatasetMetricsDuplicateValidationError()) + else: + # validate invalid id's + metrics_ids: List[int] = [ + metric["id"] for metric in metrics if "id" in metric + ] + if not DatasetDAO.validate_metrics_exist(self._model_id, metrics_ids): + exceptions.append(DatasetMetricsNotFoundValidationError()) + # validate new metric names uniqueness + metric_names: List[str] = [ + metric["metric_name"] for metric in metrics if "id" not in metric + ] + if not DatasetDAO.validate_metrics_uniqueness(self._model_id, metric_names): + exceptions.append(DatasetMetricsExistsValidationError()) + + @staticmethod + def _get_duplicates(data: List[Dict], key: str): + duplicates = [ + name + for name, count in Counter([item[key] for item in data]).items() + if count > 1 + ] + return duplicates diff --git a/superset/datasets/dao.py b/superset/datasets/dao.py index 7e08ce8c0c99..7a35eaed4ce8 100644 --- a/superset/datasets/dao.py +++ b/superset/datasets/dao.py @@ -15,18 +15,13 @@ # specific language governing permissions and limitations # under the License. 
import logging -from typing import Dict, Optional +from typing import Dict, List, Optional from flask import current_app -from flask_appbuilder.models.sqla.interface import SQLAInterface from sqlalchemy.exc import SQLAlchemyError -from superset.commands.exceptions import ( - CreateFailedError, - DeleteFailedError, - UpdateFailedError, -) -from superset.connectors.sqla.models import SqlaTable +from superset.connectors.sqla.models import SqlaTable, SqlMetric, TableColumn +from superset.dao.base import BaseDAO from superset.extensions import db from superset.models.core import Database from superset.views.base import DatasourceFilter @@ -34,7 +29,10 @@ logger = logging.getLogger(__name__) -class DatasetDAO: +class DatasetDAO(BaseDAO): + model_cls = SqlaTable + base_filter = DatasourceFilter + @staticmethod def get_owner_by_id(owner_id: int) -> Optional[object]: return ( @@ -79,47 +77,108 @@ def validate_update_uniqueness( return not db.session.query(dataset_query.exists()).scalar() @staticmethod - def find_by_id(model_id: int) -> SqlaTable: - data_model = SQLAInterface(SqlaTable, db.session) - query = db.session.query(SqlaTable) - query = DatasourceFilter("id", data_model).apply(query, None) - return query.filter_by(id=model_id).one_or_none() + def validate_columns_exist(dataset_id: int, columns_ids: List[int]) -> bool: + dataset_query = ( + db.session.query(TableColumn.id).filter( + TableColumn.table_id == dataset_id, TableColumn.id.in_(columns_ids) + ) + ).all() + return len(columns_ids) == len(dataset_query) @staticmethod - def create(properties: Dict, commit=True) -> Optional[SqlaTable]: - model = SqlaTable() - for key, value in properties.items(): - setattr(model, key, value) - try: - db.session.add(model) - if commit: - db.session.commit() - except SQLAlchemyError as e: # pragma: no cover - db.session.rollback() - raise CreateFailedError(exception=e) - return model + def validate_columns_uniqueness(dataset_id: int, columns_names: List[str]) -> bool: + 
dataset_query = ( + db.session.query(TableColumn.id).filter( + TableColumn.table_id == dataset_id, + TableColumn.column_name.in_(columns_names), + ) + ).all() + return len(dataset_query) == 0 @staticmethod - def update(model: SqlaTable, properties: Dict, commit=True) -> Optional[SqlaTable]: - for key, value in properties.items(): - setattr(model, key, value) - try: - db.session.merge(model) - if commit: - db.session.commit() - except SQLAlchemyError as e: # pragma: no cover - db.session.rollback() - raise UpdateFailedError(exception=e) - return model + def validate_metrics_exist(dataset_id: int, metrics_ids: List[int]) -> bool: + dataset_query = ( + db.session.query(SqlMetric.id).filter( + SqlMetric.table_id == dataset_id, SqlMetric.id.in_(metrics_ids) + ) + ).all() + return len(metrics_ids) == len(dataset_query) @staticmethod - def delete(model: SqlaTable, commit=True): - try: - db.session.delete(model) - if commit: - db.session.commit() - except SQLAlchemyError as e: # pragma: no cover - logger.error(f"Failed to delete dataset: {e}") - db.session.rollback() - raise DeleteFailedError(exception=e) - return model + def validate_metrics_uniqueness(dataset_id: int, metrics_names: List[str]) -> bool: + dataset_query = ( + db.session.query(SqlMetric.id).filter( + SqlMetric.table_id == dataset_id, + SqlMetric.metric_name.in_(metrics_names), + ) + ).all() + return len(dataset_query) == 0 + + @classmethod + def update( + cls, model: SqlaTable, properties: Dict, commit=True + ) -> Optional[SqlaTable]: + """ + Updates a Dataset model on the metadata DB + """ + if "columns" in properties: + new_columns = list() + for column in properties.get("columns", []): + if column.get("id"): + column_obj = db.session.query(TableColumn).get(column.get("id")) + column_obj = DatasetDAO.update_column( + column_obj, column, commit=commit + ) + else: + column_obj = DatasetDAO.create_column(column, commit=commit) + new_columns.append(column_obj) + properties["columns"] = new_columns + + if 
"metrics" in properties: + new_metrics = list() + for metric in properties.get("metrics", []): + if metric.get("id"): + metric_obj = db.session.query(SqlMetric).get(metric.get("id")) + metric_obj = DatasetDAO.update_metric( + metric_obj, metric, commit=commit + ) + else: + metric_obj = DatasetDAO.create_metric(metric, commit=commit) + new_metrics.append(metric_obj) + properties["metrics"] = new_metrics + + return super().update(model, properties, commit=commit) + + @classmethod + def update_column( + cls, model: TableColumn, properties: Dict, commit=True + ) -> Optional[TableColumn]: + return DatasetColumnDAO.update(model, properties, commit=commit) + + @classmethod + def create_column(cls, properties: Dict, commit=True) -> Optional[TableColumn]: + """ + Creates a Dataset model on the metadata DB + """ + return DatasetColumnDAO.create(properties, commit=commit) + + @classmethod + def update_metric( + cls, model: SqlMetric, properties: Dict, commit=True + ) -> Optional[SqlMetric]: + return DatasetMetricDAO.update(model, properties, commit=commit) + + @classmethod + def create_metric(cls, properties: Dict, commit=True) -> Optional[SqlMetric]: + """ + Creates a Dataset model on the metadata DB + """ + return DatasetMetricDAO.create(properties, commit=commit) + + +class DatasetColumnDAO(BaseDAO): + model_cls = TableColumn + + +class DatasetMetricDAO(BaseDAO): + model_cls = SqlMetric diff --git a/superset/datasets/schemas.py b/superset/datasets/schemas.py index 370550da619c..b5b74e56d63f 100644 --- a/superset/datasets/schemas.py +++ b/superset/datasets/schemas.py @@ -14,11 +14,54 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. 
+import re -from marshmallow import fields, Schema +from flask_babel import lazy_gettext as _ +from marshmallow import fields, Schema, ValidationError from marshmallow.validate import Length +def validate_python_date_format(value): + regex = re.compile( + r""" + ^( + epoch_s|epoch_ms| + (?P%Y(-%m(-%d)?)?)([\sT](?P