
Commit b075ea9

nfx and dmoore247 authored and committed
Debug logs to print only the first 96 bytes of SQL query by default, tunable by debug_truncate_bytes SDK configuration property (#859)
Looking through debug logs might sometimes be challenging due to the amount of information. This PR truncates SQL queries in debug logs by default.
1 parent ed3365e, commit b075ea9
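The 96-byte default can be adjusted through the SDK's `debug_truncate_bytes` configuration property. A minimal sketch of how a user might raise the limit, assuming the property is accepted as a `WorkspaceClient` keyword argument; the profile name and value are illustrative:

import logging

from databricks.sdk import WorkspaceClient

# Surface DEBUG-level messages so the truncated SQL statements show up in the output.
logging.basicConfig(level=logging.DEBUG)

# Illustrative values: raise the truncation limit from the 96-byte default to 1 KiB.
ws = WorkspaceClient(profile="my-profile", debug_truncate_bytes=1024)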

File tree: 4 files changed, +95 -85 lines


src/databricks/labs/ucx/framework/crawlers.py

+37 -27

@@ -10,6 +10,7 @@
 from databricks.sdk import WorkspaceClient
 from databricks.sdk.errors import (
     BadRequest,
+    DatabricksError,
     DataLoss,
     NotFound,
     PermissionDenied,
@@ -32,11 +33,11 @@ class DataclassInstance(Protocol):
 
 class SqlBackend(ABC):
     @abstractmethod
-    def execute(self, sql):
+    def execute(self, sql: str) -> None:
         raise NotImplementedError
 
     @abstractmethod
-    def fetch(self, sql) -> Iterator[Any]:
+    def fetch(self, sql: str) -> Iterator[Any]:
         raise NotImplementedError
 
     @abstractmethod
@@ -91,19 +92,29 @@ def _filter_none_rows(cls, rows, klass):
             results.append(row)
         return results
 
+    @staticmethod
+    def _only_n_bytes(j: str, num_bytes: int = 96) -> str:
+        diff = len(j.encode('utf-8')) - num_bytes
+        if diff > 0:
+            return f"{j[:num_bytes]}... ({diff} more bytes)"
+        return j
+
 
 class StatementExecutionBackend(SqlBackend):
     def __init__(self, ws: WorkspaceClient, warehouse_id, *, max_records_per_batch: int = 1000):
         self._sql = StatementExecutionExt(ws.api_client)
         self._warehouse_id = warehouse_id
         self._max_records_per_batch = max_records_per_batch
+        debug_truncate_bytes = ws.config.debug_truncate_bytes
+        # while unit-testing, this value will contain a mock
+        self._debug_truncate_bytes = debug_truncate_bytes if isinstance(debug_truncate_bytes, int) else 96
 
-    def execute(self, sql):
-        logger.debug(f"[api][execute] {sql}")
+    def execute(self, sql: str) -> None:
+        logger.debug(f"[api][execute] {self._only_n_bytes(sql, self._debug_truncate_bytes)}")
         self._sql.execute(self._warehouse_id, sql)
 
-    def fetch(self, sql) -> Iterator[Row]:
-        logger.debug(f"[api][fetch] {sql}")
+    def fetch(self, sql: str) -> Iterator[Row]:
+        logger.debug(f"[api][fetch] {self._only_n_bytes(sql, self._debug_truncate_bytes)}")
         return self._sql.execute_fetch_all(self._warehouse_id, sql)
 
     def save_table(self, full_name: str, rows: Sequence[DataclassInstance], klass: Dataclass, mode="append"):
@@ -146,7 +157,7 @@ def _row_to_sql(row, fields):
 
 
 class RuntimeBackend(SqlBackend):
-    def __init__(self):
+    def __init__(self, debug_truncate_bytes=96):
         # pylint: disable-next=import-error,import-outside-toplevel
         from pyspark.sql.session import SparkSession  # type: ignore[import-not-found]
 
@@ -155,24 +166,23 @@ def __init__(self):
            raise RuntimeError(msg)
 
         self._spark = SparkSession.builder.getOrCreate()
+        self._debug_truncate_bytes = debug_truncate_bytes
 
-    def execute(self, sql):
-        logger.debug(f"[spark][execute] {sql}")
+    def execute(self, sql: str) -> None:
+        logger.debug(f"[spark][execute] {self._only_n_bytes(sql, self._debug_truncate_bytes)}")
         try:
-            immediate_response = self._spark.sql(sql)
-        except Exception as e:  # pylint: disable=broad-exception-caught
+            self._spark.sql(sql)
+        except Exception as e:
             error_message = str(e)
-            self._raise_spark_sql_exceptions(error_message)
-        return immediate_response
+            raise self._api_error_from_spark_error(error_message) from None
 
-    def fetch(self, sql) -> Iterator[Row]:
-        logger.debug(f"[spark][fetch] {sql}")
+    def fetch(self, sql: str) -> Iterator[Row]:
+        logger.debug(f"[spark][fetch] {self._only_n_bytes(sql, self._debug_truncate_bytes)}")
         try:
-            fetch_query_response = self._spark.sql(sql).collect()
-        except Exception as e:  # pylint: disable=broad-exception-caught
+            return self._spark.sql(sql).collect()
+        except Exception as e:
            error_message = str(e)
-            self._raise_spark_sql_exceptions(error_message)
-        return fetch_query_response
+            raise self._api_error_from_spark_error(error_message) from None
 
     def save_table(self, full_name: str, rows: Sequence[DataclassInstance], klass: Dataclass, mode: str = "append"):
         rows = self._filter_none_rows(rows, klass)
@@ -185,20 +195,20 @@ def save_table(self, full_name: str, rows: Sequence[DataclassInstance], klass: D
         df.write.saveAsTable(full_name, mode=mode)
 
     @staticmethod
-    def _raise_spark_sql_exceptions(error_message: str):
+    def _api_error_from_spark_error(error_message: str) -> DatabricksError:
         if "SCHEMA_NOT_FOUND" in error_message:
-            raise NotFound(error_message) from None
+            return NotFound(error_message)
         if "TABLE_OR_VIEW_NOT_FOUND" in error_message:
-            raise NotFound(error_message) from None
+            return NotFound(error_message)
         if "DELTA_TABLE_NOT_FOUND" in error_message:
-            raise NotFound(error_message) from None
+            return NotFound(error_message)
         if "DELTA_MISSING_TRANSACTION_LOG" in error_message:
-            raise DataLoss(error_message) from None
+            return DataLoss(error_message)
         if "PARSE_SYNTAX_ERROR" in error_message:
-            raise BadRequest(error_message) from None
+            return BadRequest(error_message)
         if "Operation not allowed" in error_message:
-            raise PermissionDenied(error_message) from None
-        raise Unknown(error_message) from None
+            return PermissionDenied(error_message)
+        return Unknown(error_message)
 
 
 class CrawlerBase(Generic[Result]):
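To see what the truncated log lines look like, here is a small standalone sketch that mirrors the `_only_n_bytes` helper above; the sample query is made up. The helper counts UTF-8 bytes but slices by character position, so for plain-ASCII SQL the kept prefix is exactly `num_bytes` bytes long:

def _only_n_bytes(j: str, num_bytes: int = 96) -> str:
    # Same logic as the static helper added to SqlBackend above.
    diff = len(j.encode("utf-8")) - num_bytes
    if diff > 0:
        return f"{j[:num_bytes]}... ({diff} more bytes)"
    return j

sql = "SELECT catalog, database, name, object_type, table_format, location FROM hive_metastore.ucx.tables WHERE object_type = 'EXTERNAL'"
print(_only_n_bytes(sql))               # 96-character prefix followed by "... (33 more bytes)"
print(_only_n_bytes("SHOW DATABASES"))  # short statements pass through unchanged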

src/databricks/labs/ucx/runtime.py

+40 -23

@@ -42,6 +42,13 @@ def setup_tacl(_: WorkspaceConfig):
     """(Optimization) Starts `tacl` job cluster in parallel to crawling tables."""
 
 
+def _must_truncate_bytes(cfg: WorkspaceConfig) -> int:
+    if not cfg.connect:
+        return 96
+    truncate_bytes = cfg.connect.debug_truncate_bytes
+    return truncate_bytes if truncate_bytes else 96
+
+
 @task("assessment", depends_on=[crawl_tables, setup_tacl], job_cluster="tacl")
 def crawl_grants(cfg: WorkspaceConfig):
     """Scans the previously created Delta table named `$inventory_database.tables` and issues a `SHOW GRANTS`
@@ -52,9 +59,9 @@ def crawl_grants(cfg: WorkspaceConfig):
 
     Note: This job runs on a separate cluster (named `tacl`) as it requires the proper configuration to have the Table
     ACLs enabled and available for retrieval."""
-    backend = RuntimeBackend()
-    tables = TablesCrawler(backend, cfg.inventory_database)
-    udfs = UdfsCrawler(backend, cfg.inventory_database)
+    sql_backend = RuntimeBackend(debug_truncate_bytes=_must_truncate_bytes(cfg))
+    tables = TablesCrawler(sql_backend, cfg.inventory_database)
+    udfs = UdfsCrawler(sql_backend, cfg.inventory_database)
     grants = GrantsCrawler(tables, udfs)
     grants.snapshot()
 
@@ -65,8 +72,8 @@ def estimate_table_size_for_migration(cfg: WorkspaceConfig):
     "synced". These tables will have to be cloned in the migration process.
     Assesses the size of these tables and create `$inventory_database.table_size` table to list these sizes.
     The table size is a factor in deciding whether to clone these tables."""
-    backend = RuntimeBackend()
-    table_size = TableSizeCrawler(backend, cfg.inventory_database)
+    sql_backend = RuntimeBackend(debug_truncate_bytes=_must_truncate_bytes(cfg))
+    table_size = TableSizeCrawler(sql_backend, cfg.inventory_database)
     table_size.snapshot()
 
 
@@ -79,7 +86,8 @@ def crawl_mounts(cfg: WorkspaceConfig):
     The assessment involves scanning the workspace to compile a list of all existing mount points and subsequently
     storing this information in the `$inventory.mounts` table. This is crucial for planning the migration."""
     ws = WorkspaceClient(config=cfg.to_databricks_config())
-    mounts = Mounts(backend=RuntimeBackend(), ws=ws, inventory_database=cfg.inventory_database)
+    sql_backend = RuntimeBackend(debug_truncate_bytes=_must_truncate_bytes(cfg))
+    mounts = Mounts(backend=sql_backend, ws=ws, inventory_database=cfg.inventory_database)
     mounts.snapshot()
 
 
@@ -94,7 +102,8 @@ def guess_external_locations(cfg: WorkspaceConfig):
     - Scanning all these locations to identify folders that can act as shared path prefixes
     - These identified external locations will be created subsequently prior to the actual table migration"""
     ws = WorkspaceClient(config=cfg.to_databricks_config())
-    crawler = ExternalLocations(ws, RuntimeBackend(), cfg.inventory_database)
+    sql_backend = RuntimeBackend(debug_truncate_bytes=_must_truncate_bytes(cfg))
+    crawler = ExternalLocations(ws, sql_backend, cfg.inventory_database)
     crawler.snapshot()
 
 
@@ -110,7 +119,8 @@ def assess_jobs(cfg: WorkspaceConfig):
     - Clusters referencing DBFS locations in one or more config options
     """
     ws = WorkspaceClient(config=cfg.to_databricks_config())
-    crawler = JobsCrawler(ws, RuntimeBackend(), cfg.inventory_database)
+    sql_backend = RuntimeBackend(debug_truncate_bytes=_must_truncate_bytes(cfg))
+    crawler = JobsCrawler(ws, sql_backend, cfg.inventory_database)
     crawler.snapshot()
 
 
@@ -126,7 +136,8 @@ def assess_clusters(cfg: WorkspaceConfig):
     - Clusters referencing DBFS locations in one or more config options
     """
     ws = WorkspaceClient(config=cfg.to_databricks_config())
-    crawler = ClustersCrawler(ws, RuntimeBackend(), cfg.inventory_database)
+    sql_backend = RuntimeBackend(debug_truncate_bytes=_must_truncate_bytes(cfg))
+    crawler = ClustersCrawler(ws, sql_backend, cfg.inventory_database)
     crawler.snapshot()
 
 
@@ -142,7 +153,8 @@ def assess_pipelines(cfg: WorkspaceConfig):
     Subsequently, a list of all the pipelines with matching configurations are stored in the
     `$inventory.pipelines` table."""
     ws = WorkspaceClient(config=cfg.to_databricks_config())
-    crawler = PipelinesCrawler(ws, RuntimeBackend(), cfg.inventory_database)
+    sql_backend = RuntimeBackend(debug_truncate_bytes=_must_truncate_bytes(cfg))
+    crawler = PipelinesCrawler(ws, sql_backend, cfg.inventory_database)
     crawler.snapshot()
 
 
@@ -159,7 +171,8 @@ def assess_azure_service_principals(cfg: WorkspaceConfig):
     in the `$inventory.azure_service_principals` table."""
     ws = WorkspaceClient(config=cfg.to_databricks_config())
     if ws.config.is_azure:
-        crawler = AzureServicePrincipalCrawler(ws, RuntimeBackend(), cfg.inventory_database)
+        sql_backend = RuntimeBackend(debug_truncate_bytes=_must_truncate_bytes(cfg))
+        crawler = AzureServicePrincipalCrawler(ws, sql_backend, cfg.inventory_database)
         crawler.snapshot()
 
 
@@ -171,7 +184,8 @@ def assess_global_init_scripts(cfg: WorkspaceConfig):
     It looks in:
     - the list of all the global init scripts are saved in the `$inventory.azure_service_principals` table."""
     ws = WorkspaceClient(config=cfg.to_databricks_config())
-    crawler = GlobalInitScriptCrawler(ws, RuntimeBackend(), cfg.inventory_database)
+    sql_backend = RuntimeBackend(debug_truncate_bytes=_must_truncate_bytes(cfg))
+    crawler = GlobalInitScriptCrawler(ws, sql_backend, cfg.inventory_database)
     crawler.snapshot()
 
 
@@ -183,8 +197,9 @@ def workspace_listing(cfg: WorkspaceConfig):
     It uses multi-threading to parallelize the listing process to speed up execution on big workspaces.
     It accepts starting path as the parameter defaulted to the root path '/'."""
     ws = WorkspaceClient(config=cfg.to_databricks_config())
+    sql_backend = RuntimeBackend(debug_truncate_bytes=_must_truncate_bytes(cfg))
     crawler = WorkspaceListing(
-        ws, RuntimeBackend(), cfg.inventory_database, num_threads=cfg.num_threads, start_path=cfg.workspace_start_path
+        ws, sql_backend, cfg.inventory_database, num_threads=cfg.num_threads, start_path=cfg.workspace_start_path
     )
     crawler.snapshot()
 
@@ -197,9 +212,10 @@ def crawl_permissions(cfg: WorkspaceConfig):
     This is the first step for the _group migration_ process, which is continued in the `migrate-groups` workflow.
     This step includes preparing Legacy Table ACLs for local group migration."""
     ws = WorkspaceClient(config=cfg.to_databricks_config())
+    sql_backend = RuntimeBackend(debug_truncate_bytes=_must_truncate_bytes(cfg))
     permission_manager = PermissionManager.factory(
         ws,
-        RuntimeBackend(),
+        sql_backend,
         cfg.inventory_database,
         num_threads=cfg.num_threads,
         workspace_start_path=cfg.workspace_start_path,
@@ -211,7 +227,7 @@ def crawl_permissions(cfg: WorkspaceConfig):
 @task("assessment")
 def crawl_groups(cfg: WorkspaceConfig):
     """Scans all groups for the local group migration scope"""
-    sql_backend = RuntimeBackend()
+    sql_backend = RuntimeBackend(debug_truncate_bytes=_must_truncate_bytes(cfg))
     ws = WorkspaceClient(config=cfg.to_databricks_config())
     group_manager = GroupManager(
         sql_backend,
@@ -251,7 +267,7 @@ def assessment_report(_: WorkspaceConfig):
 @task("migrate-groups", depends_on=[crawl_groups])
 def rename_workspace_local_groups(cfg: WorkspaceConfig):
     """Renames workspace local groups by adding `ucx-renamed-` prefix."""
-    sql_backend = RuntimeBackend()
+    sql_backend = RuntimeBackend(debug_truncate_bytes=_must_truncate_bytes(cfg))
     ws = WorkspaceClient(config=cfg.to_databricks_config())
     verify_has_metastore = VerifyHasMetastore(ws)
     if verify_has_metastore.verify_metastore():
@@ -275,7 +291,7 @@ def rename_workspace_local_groups(cfg: WorkspaceConfig):
 def reflect_account_groups_on_workspace(cfg: WorkspaceConfig):
     """Adds matching account groups to this workspace. The matching account level group(s) must preexist(s) for this
     step to be successful. This process does not create the account level group(s)."""
-    sql_backend = RuntimeBackend()
+    sql_backend = RuntimeBackend(debug_truncate_bytes=_must_truncate_bytes(cfg))
     ws = WorkspaceClient(config=cfg.to_databricks_config())
     group_manager = GroupManager(
         sql_backend,
@@ -302,10 +318,10 @@ def apply_permissions_to_account_groups(cfg: WorkspaceConfig):
     permissions, Secret Scopes, Notebooks, Directories, Repos, Files.
 
     See [interactive tutorial here](https://app.getreprise.com/launch/myM3VNn/)."""
-    backend = RuntimeBackend()
+    sql_backend = RuntimeBackend(debug_truncate_bytes=_must_truncate_bytes(cfg))
     ws = WorkspaceClient(config=cfg.to_databricks_config())
     group_manager = GroupManager(
-        backend,
+        sql_backend,
         ws,
         cfg.inventory_database,
         cfg.include_group_names,
@@ -323,7 +339,7 @@ def apply_permissions_to_account_groups(cfg: WorkspaceConfig):
 
     permission_manager = PermissionManager.factory(
         ws,
-        backend,
+        sql_backend,
         cfg.inventory_database,
         num_threads=cfg.num_threads,
         workspace_start_path=cfg.workspace_start_path,
@@ -336,10 +352,10 @@ def delete_backup_groups(cfg: WorkspaceConfig):
     """Last step of the group migration process. Removes all workspace-level backup groups, along with their
     permissions. Execute this workflow only after you've confirmed that workspace-local migration worked
     successfully for all the groups involved."""
-    backend = RuntimeBackend()
+    sql_backend = RuntimeBackend(debug_truncate_bytes=_must_truncate_bytes(cfg))
     ws = WorkspaceClient(config=cfg.to_databricks_config())
     group_manager = GroupManager(
-        backend,
+        sql_backend,
         ws,
         cfg.inventory_database,
         cfg.include_group_names,
@@ -356,7 +372,8 @@ def delete_backup_groups(cfg: WorkspaceConfig):
 def destroy_schema(cfg: WorkspaceConfig):
     """This _clean-up_ workflow allows to removes the `$inventory` database, with all the inventory tables created by
     the previous workflow runs. Use this to reset the entire state and start with the assessment step again."""
-    RuntimeBackend().execute(f"DROP DATABASE {cfg.inventory_database} CASCADE")
+    sql_backend = RuntimeBackend(debug_truncate_bytes=_must_truncate_bytes(cfg))
+    sql_backend.execute(f"DROP DATABASE {cfg.inventory_database} CASCADE")
 
 
 def main(*argv):
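`_must_truncate_bytes` resolves the limit in a fixed order: it uses `cfg.connect.debug_truncate_bytes` when a connect config is present and the value is set, and falls back to 96 otherwise. A rough illustration with stand-in dataclasses; the real `connect` attribute is the SDK's `Config`, and the fake classes below exist only for this sketch:

from dataclasses import dataclass


@dataclass
class FakeConnect:
    # Stand-in for databricks.sdk.core.Config, used only in this sketch.
    debug_truncate_bytes: int | None = None


@dataclass
class FakeWorkspaceConfig:
    # Stand-in for the ucx WorkspaceConfig passed to the tasks above.
    connect: FakeConnect | None = None


def _must_truncate_bytes(cfg) -> int:
    if not cfg.connect:
        return 96
    truncate_bytes = cfg.connect.debug_truncate_bytes
    return truncate_bytes if truncate_bytes else 96


assert _must_truncate_bytes(FakeWorkspaceConfig()) == 96                      # no connect config
assert _must_truncate_bytes(FakeWorkspaceConfig(FakeConnect())) == 96         # value not set
assert _must_truncate_bytes(FakeWorkspaceConfig(FakeConnect(2048))) == 2048   # explicit override wins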

tests/unit/framework/mocks.py

+3 -2

@@ -9,16 +9,17 @@
 
 
 class MockBackend(SqlBackend):
-    def __init__(self, *, fails_on_first: dict | None = None, rows: dict | None = None):
+    def __init__(self, *, fails_on_first: dict | None = None, rows: dict | None = None, debug_truncate_bytes=96):
         self._fails_on_first = fails_on_first
         if not rows:
             rows = {}
         self._rows = rows
         self._save_table: list[tuple[str, Sequence[DataclassInstance], str]] = []
+        self._debug_truncate_bytes = debug_truncate_bytes
         self.queries: list[str] = []
 
     def _sql(self, sql: str):
-        logger.debug(f"Mock backend.sql() received SQL: {sql}")
+        logger.debug(f"Mock backend.sql() received SQL: {self._only_n_bytes(sql, self._debug_truncate_bytes)}")
         seen_before = sql in self.queries
         self.queries.append(sql)
         if not seen_before and self._fails_on_first is not None: