diff --git a/airflow/providers/databricks/hooks/databricks.py b/airflow/providers/databricks/hooks/databricks.py index e56b15a99b33f..14447a09d422d 100644 --- a/airflow/providers/databricks/hooks/databricks.py +++ b/airflow/providers/databricks/hooks/databricks.py @@ -24,6 +24,7 @@ """ import time from time import sleep +from typing import Dict from urllib.parse import urlparse import requests @@ -135,14 +136,18 @@ def __init__( ) -> None: super().__init__() self.databricks_conn_id = databricks_conn_id - self.databricks_conn = None self.timeout_seconds = timeout_seconds if retry_limit < 1: raise ValueError('Retry limit must be greater than equal to 1') self.retry_limit = retry_limit self.retry_delay = retry_delay - self.aad_tokens = {} + self.aad_tokens: Dict[str, dict] = {} self.aad_timeout_seconds = 10 + self.databricks_conn = self.get_connection(self.databricks_conn_id) + if 'host' in self.databricks_conn.extra_dejson: + self.host = self._parse_host(self.databricks_conn.extra_dejson['host']) + else: + self.host = self._parse_host(self.databricks_conn.host) @staticmethod def _parse_host(host: str) -> str: @@ -229,7 +234,7 @@ def _get_aad_token(self, resource: str) -> str: f'Response: {e.response.content}, Status Code: {e.response.status_code}' ) - self._log_request_error(attempt_num, e) + self._log_request_error(attempt_num, e.strerror) if attempt_num == self.retry_limit: raise AirflowException(f'API requests to Azure failed {self.retry_limit} times. Giving up.') @@ -300,14 +305,6 @@ def _do_api_call(self, endpoint_info, json): """ method, endpoint = endpoint_info - if self.databricks_conn is None: - self.databricks_conn = self.get_connection(self.databricks_conn_id) - - if 'host' in self.databricks_conn.extra_dejson: - self.host = self._parse_host(self.databricks_conn.extra_dejson['host']) - else: - self.host = self._parse_host(self.databricks_conn.host) - url = f'https://{self.host}/{endpoint}' aad_headers = self._get_aad_headers()