-
Notifications
You must be signed in to change notification settings - Fork 16.5k
Authentication: Enable user impersonation for Superset to HiveServer2 using hive.server2.proxy.user (a.fernandez) #3652
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -13,6 +13,7 @@ | |
| from future.standard_library import install_aliases | ||
| from copy import copy | ||
| from datetime import datetime, date | ||
| from copy import deepcopy | ||
|
|
||
| import pandas as pd | ||
| import sqlalchemy as sqla | ||
|
|
@@ -47,6 +48,7 @@ | |
| stats_logger = config.get('STATS_LOGGER') | ||
| metadata = Model.metadata # pylint: disable=no-member | ||
|
|
||
| PASSWORD_MASK = "X" * 10 | ||
|
|
||
| def set_related_perm(mapper, connection, target): # noqa | ||
| src_class = target.cls_model | ||
|
|
@@ -581,30 +583,56 @@ def backend(self): | |
| url = make_url(self.sqlalchemy_uri_decrypted) | ||
| return url.get_backend_name() | ||
|
|
||
| @classmethod | ||
| def get_password_masked_url_from_uri(cls, uri): | ||
| url = make_url(uri) | ||
| return cls.get_password_masked_url(url) | ||
|
|
||
| @classmethod | ||
| def get_password_masked_url(cls, url): | ||
| url_copy = deepcopy(url) | ||
| if url_copy.password is not None and url_copy.password != PASSWORD_MASK: | ||
| url_copy.password = PASSWORD_MASK | ||
| return url_copy | ||
|
|
||
| def set_sqlalchemy_uri(self, uri): | ||
| password_mask = "X" * 10 | ||
| conn = sqla.engine.url.make_url(uri) | ||
| if conn.password != password_mask and not self.custom_password_store: | ||
| if conn.password != PASSWORD_MASK and not self.custom_password_store: | ||
| # do not over-write the password with the password mask | ||
| self.password = conn.password | ||
| conn.password = password_mask if conn.password else None | ||
| conn.password = PASSWORD_MASK if conn.password else None | ||
| self.sqlalchemy_uri = str(conn) # hides the password | ||
|
|
||
| def get_effective_user(self, url, user_name=None): | ||
|
||
| """ | ||
| Get the effective user, especially during impersonation. | ||
| :param url: SQL Alchemy URL object | ||
| :param user_name: Default username | ||
| :return: The effective username | ||
| """ | ||
| effective_username = None | ||
| if self.impersonate_user: | ||
| effective_username = url.username | ||
| if user_name: | ||
| effective_username = user_name | ||
| elif hasattr(g, 'user') and g.user.username: | ||
| effective_username = g.user.username | ||
| return effective_username | ||
|
|
||
| def get_sqla_engine(self, schema=None, nullpool=False, user_name=None): | ||
|
||
| extra = self.get_extra() | ||
| uri = make_url(self.sqlalchemy_uri_decrypted) | ||
| url = make_url(self.sqlalchemy_uri_decrypted) | ||
| params = extra.get('engine_params', {}) | ||
| if nullpool: | ||
| params['poolclass'] = NullPool | ||
| uri = self.db_engine_spec.adjust_database_uri(uri, schema) | ||
| if self.impersonate_user: | ||
| eff_username = uri.username | ||
| if user_name: | ||
| eff_username = user_name | ||
| elif hasattr(g, 'user') and g.user.username: | ||
| eff_username = g.user.username | ||
| uri.username = eff_username | ||
| return create_engine(uri, **params) | ||
| url = self.db_engine_spec.adjust_database_uri(url, schema) | ||
| effective_username = self.get_effective_user(url, user_name) | ||
| self.db_engine_spec.modify_url_for_impersonation(url, self.impersonate_user, effective_username) | ||
|
|
||
| masked_url = self.get_password_masked_url(url) | ||
| logging.info("Database.get_sqla_engine(). Masked URL: {0}".format(masked_url)) | ||
|
||
|
|
||
| return create_engine(url, **params) | ||
|
|
||
| def get_reserved_words(self): | ||
| return self.get_sqla_engine().dialect.preparer.reserved_words | ||
|
|
@@ -688,6 +716,10 @@ def db_engine_spec(self): | |
| return db_engine_specs.engines.get( | ||
| self.backend, db_engine_specs.BaseEngineSpec) | ||
|
|
||
| @classmethod | ||
| def get_db_engine_spec_for_backend(cls, backend): | ||
| return db_engine_specs.engines.get(backend, db_engine_specs.BaseEngineSpec) | ||
|
|
||
| def grains(self): | ||
| """Defines time granularity database-specific expressions. | ||
|
|
||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -168,6 +168,7 @@ def handle_error(msg): | |
| session.merge(query) | ||
| session.commit() | ||
| logging.info("Set query to 'running'") | ||
| conn = None | ||
|
||
| try: | ||
| engine = database.get_sqla_engine( | ||
| schema=query.schema, nullpool=not ctask.request.called_directly, user_name=user_name) | ||
|
|
@@ -183,20 +184,23 @@ def handle_error(msg): | |
| data = db_engine_spec.fetch_data(cursor, query.limit) | ||
| except SoftTimeLimitExceeded as e: | ||
| logging.exception(e) | ||
| conn.close() | ||
| if conn is not None: | ||
| conn.close() | ||
| return handle_error( | ||
| "SQL Lab timeout. This environment's policy is to kill queries " | ||
| "after {} seconds.".format(SQLLAB_TIMEOUT)) | ||
| except Exception as e: | ||
| logging.exception(e) | ||
| conn.close() | ||
| if conn is not None: | ||
| conn.close() | ||
| return handle_error(db_engine_spec.extract_error_message(e)) | ||
|
|
||
| logging.info("Fetching cursor description") | ||
| cursor_description = cursor.description | ||
|
|
||
| conn.commit() | ||
| conn.close() | ||
| if conn is not None: | ||
| conn.commit() | ||
| conn.close() | ||
|
|
||
| if query.status == utils.QueryStatus.STOPPED: | ||
| return json.dumps( | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -276,13 +276,15 @@ def test_misc(self): | |
| assert self.get_resp('/health') == "OK" | ||
| assert self.get_resp('/ping') == "OK" | ||
|
|
||
| def test_testconn(self): | ||
| def test_testconn(self, username='admin'): | ||
| self.login(username=username) | ||
| database = self.get_main_database(db.session) | ||
|
|
||
| # validate that the endpoint works with the password-masked sqlalchemy uri | ||
| data = json.dumps({ | ||
| 'uri': database.safe_sqlalchemy_uri(), | ||
| 'name': 'main' | ||
| 'name': 'main', | ||
| 'impersonate_user': False | ||
|
||
| }) | ||
| response = self.client.post('/superset/testconn', data=data, content_type='application/json') | ||
| assert response.status_code == 200 | ||
|
|
@@ -291,7 +293,8 @@ def test_testconn(self): | |
| # validate that the endpoint works with the decrypted sqlalchemy uri | ||
| data = json.dumps({ | ||
| 'uri': database.sqlalchemy_uri_decrypted, | ||
| 'name': 'main' | ||
| 'name': 'main', | ||
| 'impersonate_user': False | ||
| }) | ||
| response = self.client.post('/superset/testconn', data=data, content_type='application/json') | ||
| assert response.status_code == 200 | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Base class has methods for how to modify a URI and URL object for impersonation