feat(datasource): remove deleted columns and update column type on metadata refresh #10619
```diff
@@ -34,17 +34,6 @@ const props = {
   onChange: () => {},
 };
 
-const extraColumn = {
-  column_name: 'new_column',
-  type: 'VARCHAR(10)',
-  description: null,
-  filterable: true,
-  verbose_name: null,
-  is_dttm: false,
-  expression: '',
-  groupby: true,
-};
-
 const DATASOURCE_ENDPOINT = 'glob:*/datasource/external_metadata/*';
 
 describe('DatasourceEditor', () => {
@@ -85,11 +74,65 @@ describe('DatasourceEditor', () => {
     });
   });
 
-  it('merges columns', () => {
+  it('to add, remove and modify columns accordingly', () => {
+    const columns = [
+      {
+        name: 'ds',
+        type: 'DATETIME',
+        nullable: true,
+        default: '',
+        primary_key: false,
+      },
+      {
+        name: 'gender',
+        type: 'VARCHAR(32)',
+        nullable: true,
+        default: '',
+        primary_key: false,
+      },
+      {
+        name: 'new_column',
+        type: 'VARCHAR(10)',
+        nullable: true,
+        default: '',
+        primary_key: false,
+      },
+    ];
+
     const numCols = props.datasource.columns.length;
     expect(inst.state.databaseColumns).toHaveLength(numCols);
-    inst.mergeColumns([extraColumn]);
-    expect(inst.state.databaseColumns).toHaveLength(numCols + 1);
+    inst.updateColumns(columns);
+    expect(inst.state.databaseColumns).toEqual(
+      expect.arrayContaining([
+        {
+          type: 'DATETIME',
+          description: null,
+          filterable: false,
+          verbose_name: null,
+          is_dttm: true,
+          expression: '',
+          groupby: false,
+          column_name: 'ds',
+        },
```
Author comment on lines +107 to +116: This column is unchanged.
```diff
+        {
+          type: 'VARCHAR(32)',
+          description: null,
+          filterable: true,
+          verbose_name: null,
+          is_dttm: false,
+          expression: '',
+          groupby: true,
+          column_name: 'gender',
+        },
```
Author comment on lines +117 to +126: This type is updated from […]
```diff
+        expect.objectContaining({
+          column_name: 'new_column',
+          type: 'VARCHAR(10)',
+        }),
```
Author comment on lines +127 to +130: This is a new column.
```diff
+      ]),
+    );
+    expect(inst.state.databaseColumns).not.toEqual(
+      expect.arrayContaining([expect.objectContaining({ name: 'name' })]),
+    );
```
Author comment on lines +133 to +135: This is one of the removed columns in the original metadata.
```diff
   });
 
   it('renders isSqla fields', () => {
```
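For reference, the `columns` fixture above uses the raw shape returned by the external metadata endpoint (`name`, `type`, `nullable`, `default`, `primary_key`), while the editor state holds Superset column objects keyed by `column_name`. The contrast, sketched as Python literals purely for illustration (values copied from the fixtures above; a generated `id` and other editable fields are omitted):

```python
# One entry in the payload returned by the external metadata endpoint
# (same shape as the `columns` fixture in the spec above).
metadata_column = {
    "name": "new_column",
    "type": "VARCHAR(10)",
    "nullable": True,
    "default": "",
    "primary_key": False,
}

# One entry in the editor's `databaseColumns` state after updateColumns()
# (same keys as the objects asserted in the spec above).
database_column = {
    "column_name": "new_column",
    "type": "VARCHAR(10)",
    "groupby": True,
    "filterable": True,
}
```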
```diff
@@ -172,7 +172,7 @@ function ColumnCollectionTable({
     ) : (
       v
     ),
-    type: d => <Label bsStyle="s">{d}</Label>,
+    type: d => <Label>{d}</Label>,
     is_dttm: checkboxGenerator,
     filterable: checkboxGenerator,
     groupby: checkboxGenerator,
@@ -289,29 +289,58 @@ export class DatasourceEditor extends React.PureComponent {
     this.validate(this.onChange);
   }
 
-  mergeColumns(cols) {
-    let { databaseColumns } = this.state;
-    let hasChanged;
-    const currentColNames = databaseColumns.map(col => col.column_name);
+  updateColumns(cols) {
```
Author comment: Method was renamed to reflect that metadata is updated, not only merged.
```diff
+    const { databaseColumns } = this.state;
+    const databaseColumnNames = cols.map(col => col.name);
+    const currentCols = databaseColumns.reduce(
+      (agg, col) => ({
+        ...agg,
+        [col.column_name]: col,
+      }),
+      {},
+    );
+    const finalColumns = [];
+    const results = {
+      added: [],
+      modified: [],
+      removed: databaseColumns
+        .map(col => col.column_name)
+        .filter(col => !databaseColumnNames.includes(col)),
+    };
     cols.forEach(col => {
-      if (currentColNames.indexOf(col.name) < 0) {
-        // Adding columns
-        databaseColumns = databaseColumns.concat([
-          {
-            id: shortid.generate(),
-            column_name: col.name,
-            type: col.type,
-            groupby: true,
-            filterable: true,
-          },
-        ]);
-        hasChanged = true;
+      const currentCol = currentCols[col.name];
+      if (!currentCol) {
+        // new column
+        finalColumns.push({
+          id: shortid.generate(),
+          column_name: col.name,
+          type: col.type,
+          groupby: true,
+          filterable: true,
+        });
+        results.added.push(col.name);
+      } else if (currentCol.type !== col.type) {
+        // modified column
+        finalColumns.push({
+          ...currentCol,
+          type: col.type,
+        });
+        results.modified.push(col.name);
+      } else {
+        // unchanged
+        finalColumns.push(currentCol);
       }
     });
-    if (hasChanged) {
-      this.setColumns({ databaseColumns });
+    if (
+      results.added.length ||
+      results.modified.length ||
+      results.removed.length
+    ) {
+      this.setColumns({ databaseColumns: finalColumns });
     }
+    return results;
   }
 
   syncMetadata() {
     const { datasource } = this.state;
     // Handle carefully when the schema is empty
@@ -326,7 +355,19 @@
     SupersetClient.get({ endpoint })
       .then(({ json }) => {
-        this.mergeColumns(json);
+        const results = this.updateColumns(json);
+        if (results.modified.length)
+          this.props.addSuccessToast(
+            t('Modified columns: %s', results.modified.join(', ')),
+          );
+        if (results.removed.length)
+          this.props.addSuccessToast(
+            t('Removed columns: %s', results.removed.join(', ')),
+          );
+        if (results.added.length)
+          this.props.addSuccessToast(
+            t('New columns added: %s', results.added.join(', ')),
+          );
         this.props.addSuccessToast(t('Metadata has been synced'));
         this.setState({ metadataLoading: false });
       })
```
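Both the frontend `updateColumns` above and the backend `fetch_metadata` below apply the same idea: index the existing columns by name, classify each incoming column as added, type-modified, or unchanged, and treat anything left over as removed. A minimal, self-contained sketch of that diff in Python (the helper name and the old column types are illustrative, not Superset code):

```python
from typing import Dict, List, Tuple


def diff_columns(
    existing: Dict[str, str], incoming: List[Tuple[str, str]]
) -> Dict[str, List[str]]:
    """Classify incoming (name, type) pairs against existing {name: type} columns."""
    incoming_names = {name for name, _ in incoming}
    results: Dict[str, List[str]] = {
        "added": [],
        "modified": [],
        # anything we knew about that the database no longer reports is removed
        "removed": [name for name in existing if name not in incoming_names],
    }
    for name, type_ in incoming:
        if name not in existing:
            results["added"].append(name)
        elif existing[name] != type_:
            results["modified"].append(name)
    return results


# Mirrors the spec above: 'name' disappears, 'gender' changes type, and
# 'new_column' is new. The old types on the left are made up for the example.
print(
    diff_columns(
        {"ds": "DATETIME", "gender": "VARCHAR(16)", "name": "VARCHAR(255)"},
        [("ds", "DATETIME"), ("gender", "VARCHAR(32)"), ("new_column", "VARCHAR(10)")],
    )
)
# {'added': ['new_column'], 'modified': ['gender'], 'removed': ['name']}
```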
```diff
@@ -16,6 +16,7 @@
 # under the License.
 import logging
 from collections import OrderedDict
+from dataclasses import dataclass, field
 from datetime import datetime, timedelta
 from typing import Any, Dict, Hashable, List, NamedTuple, Optional, Tuple, Union
@@ -82,6 +83,13 @@ class QueryStringExtended(NamedTuple):
     sql: str
 
 
+@dataclass
+class MetadataResult:
+    added: List[str] = field(default_factory=list)
+    removed: List[str] = field(default_factory=list)
+    modified: List[str] = field(default_factory=list)
+
+
 class AnnotationDatasource(BaseDatasource):
     """ Dummy object so we can query annotations using 'Viz' objects just like
     regular datasources.
```
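`MetadataResult` is a plain dataclass whose three lists default to empty, so callers can build it incrementally and check it field by field. A small standalone usage sketch (copying the dataclass above, with made-up data):

```python
from dataclasses import dataclass, field
from typing import List


@dataclass
class MetadataResult:
    added: List[str] = field(default_factory=list)
    removed: List[str] = field(default_factory=list)
    modified: List[str] = field(default_factory=list)


# default_factory gives every instance its own lists, so results from
# different refreshes never share state.
result = MetadataResult(removed=["name"])
result.added.append("new_column")

if result.added or result.removed or result.modified:
    print(result)
# MetadataResult(added=['new_column'], removed=['name'], modified=[])
```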
|
|
```diff
@@ -1230,10 +1238,15 @@ def mutator(df: pd.DataFrame) -> None:
     def get_sqla_table_object(self) -> Table:
         return self.database.get_table(self.table_name, schema=self.schema)
 
-    def fetch_metadata(self, commit: bool = True) -> None:
-        """Fetches the metadata for the table and merges it in"""
+    def fetch_metadata(self, commit: bool = True) -> MetadataResult:
+        """
+        Fetches the metadata for the table and merges it in
+
+        :param commit: should the changes be committed or not.
+        :return: Tuple with lists of added, removed and modified column names.
+        """
         try:
-            table_ = self.get_sqla_table_object()
+            new_table = self.get_sqla_table_object()
         except SQLAlchemyError:
             raise QueryObjectValidationError(
                 _(
@@ -1247,35 +1260,46 @@ def fetch_metadata(self, commit: bool = True) -> None:
         any_date_col = None
         db_engine_spec = self.database.db_engine_spec
         db_dialect = self.database.get_dialect()
-        dbcols = (
-            db.session.query(TableColumn)
-            .filter(TableColumn.table == self)
-            .filter(or_(TableColumn.column_name == col.name for col in table_.columns))
-        )
```
Author comment: I'm not sure what the intent here was, but I don't see any scenario where we want to bring in columns from another table with the same column names.
```diff
+        old_columns = db.session.query(TableColumn).filter(TableColumn.table == self)
+
+        old_columns_by_name = {col.column_name: col for col in old_columns}
+        results = MetadataResult(
+            removed=[
+                col
+                for col in old_columns_by_name
+                if col not in {col.name for col in new_table.columns}
+            ]
+        )
-        dbcols = {dbcol.column_name: dbcol for dbcol in dbcols}
 
-        for col in table_.columns:
+        # clear old columns before adding modified columns back
+        self.columns = []
+        for col in new_table.columns:
             try:
                 datatype = db_engine_spec.column_datatype_to_string(
                     col.type, db_dialect
                 )
             except Exception as ex:  # pylint: disable=broad-except
                 datatype = "UNKNOWN"
-                logger.error("Unrecognized data type in %s.%s", table_, col.name)
+                logger.error("Unrecognized data type in %s.%s", new_table, col.name)
                 logger.exception(ex)
-            dbcol = dbcols.get(col.name, None)
-            if not dbcol:
-                dbcol = TableColumn(column_name=col.name, type=datatype, table=self)
-                dbcol.is_dttm = dbcol.is_temporal
-                db_engine_spec.alter_new_orm_column(dbcol)
+            old_column = old_columns_by_name.get(col.name, None)
+            if not old_column:
+                results.added.append(col.name)
+                new_column = TableColumn(
+                    column_name=col.name, type=datatype, table=self
+                )
+                new_column.is_dttm = new_column.is_temporal
+                db_engine_spec.alter_new_orm_column(new_column)
             else:
-                dbcol.type = datatype
-            dbcol.groupby = True
-            dbcol.filterable = True
-            self.columns.append(dbcol)
-            if not any_date_col and dbcol.is_temporal:
+                new_column = old_column
+                if new_column.type != datatype:
+                    results.modified.append(col.name)
+                new_column.type = datatype
+            new_column.groupby = True
+            new_column.filterable = True
+            self.columns.append(new_column)
+            if not any_date_col and new_column.is_temporal:
                 any_date_col = col.name
 
         metrics.append(
             SqlMetric(
                 metric_name="count",
@@ -1294,6 +1318,7 @@ def fetch_metadata(self, commit: bool = True) -> None:
         db.session.merge(self)
         if commit:
             db.session.commit()
+        return results
 
     @classmethod
     def import_obj(
```
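With `fetch_metadata` now returning a `MetadataResult`, a backend caller can report exactly what changed, mirroring the frontend toasts. A hedged sketch of such a caller (the function and the messaging hook are hypothetical, not Superset's actual API):

```python
def refresh_table_metadata(table, add_success_message) -> None:
    """Sync a table's columns and report what changed (illustrative only)."""
    # `table` is assumed to expose fetch_metadata() as in the diff above;
    # `add_success_message` stands in for whatever messaging the caller uses.
    results = table.fetch_metadata(commit=True)
    if results.modified:
        add_success_message("Modified columns: " + ", ".join(results.modified))
    if results.removed:
        add_success_message("Removed columns: " + ", ".join(results.removed))
    if results.added:
        add_success_message("New columns added: " + ", ".join(results.added))
    add_success_message("Metadata has been synced")
```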
Comment: This is the correct schema coming back from the API (compare with `extraColumn` above).