diff --git a/.changes/unreleased/Under the Hood-20240503-162655.yaml b/.changes/unreleased/Under the Hood-20240503-162655.yaml new file mode 100644 index 00000000..1d2946f0 --- /dev/null +++ b/.changes/unreleased/Under the Hood-20240503-162655.yaml @@ -0,0 +1,6 @@ +kind: Under the Hood +body: Add query recording for adapters which use SQLConnectionManager +time: 2024-05-03T16:26:55.350916-04:00 +custom: + Author: peterallenwebb + Issue: "195" diff --git a/dbt/adapters/record.py b/dbt/adapters/record.py new file mode 100644 index 00000000..5204f59c --- /dev/null +++ b/dbt/adapters/record.py @@ -0,0 +1,67 @@ +import dataclasses +from io import StringIO +import json +import re +from typing import Any, Optional, Mapping + +from agate import Table + +from dbt_common.events.contextvars import get_node_info +from dbt_common.record import Record, Recorder + +from dbt.adapters.contracts.connection import AdapterResponse + + +@dataclasses.dataclass +class QueryRecordParams: + sql: str + auto_begin: bool = False + fetch: bool = False + limit: Optional[int] = None + node_unique_id: Optional[str] = None + + def __post_init__(self) -> None: + if self.node_unique_id is None: + node_info = get_node_info() + self.node_unique_id = node_info["unique_id"] if node_info else "" + + @staticmethod + def _clean_up_sql(sql: str) -> str: + sql = re.sub(r"--.*?\n", "", sql) # Remove single-line comments (--) + sql = re.sub(r"/\*.*?\*/", "", sql, flags=re.DOTALL) # Remove multi-line comments (/* */) + return sql.replace(" ", "").replace("\n", "") + + def _matches(self, other: "QueryRecordParams") -> bool: + return self.node_unique_id == other.node_unique_id and self._clean_up_sql( + self.sql + ) == self._clean_up_sql(other.sql) + + +@dataclasses.dataclass +class QueryRecordResult: + adapter_response: Optional["AdapterResponse"] + table: Optional[Table] + + def _to_dict(self) -> Any: + buf = StringIO() + self.table.to_json(buf) # type: ignore + + return { + "adapter_response": self.adapter_response.to_dict(), # type: ignore + "table": buf.getvalue(), + } + + @classmethod + def _from_dict(cls, dct: Mapping) -> "QueryRecordResult": + return QueryRecordResult( + adapter_response=AdapterResponse.from_dict(dct["adapter_response"]), + table=Table.from_object(json.loads(dct["table"])), + ) + + +class QueryRecord(Record): + params_cls = QueryRecordParams + result_cls = QueryRecordResult + + +Recorder.register_record_type(QueryRecord) diff --git a/dbt/adapters/sql/connections.py b/dbt/adapters/sql/connections.py index 78cd3c9b..9adaafce 100644 --- a/dbt/adapters/sql/connections.py +++ b/dbt/adapters/sql/connections.py @@ -5,6 +5,7 @@ from dbt_common.events.contextvars import get_node_info from dbt_common.events.functions import fire_event from dbt_common.exceptions import DbtInternalError, NotImplementedError +from dbt_common.record import record_function from dbt_common.utils import cast_to_str from dbt.adapters.base import BaseConnectionManager @@ -19,6 +20,7 @@ SQLQuery, SQLQueryStatus, ) +from dbt.adapters.record import QueryRecord if TYPE_CHECKING: import agate @@ -143,6 +145,7 @@ def get_result_from_cursor(cls, cursor: Any, limit: Optional[int]) -> "agate.Tab return table_from_data_flat(data, column_names) + @record_function(QueryRecord, method=True, tuple_result=True) def execute( self, sql: str, diff --git a/docs/README.md b/docs/README.md new file mode 100644 index 00000000..a1096d54 --- /dev/null +++ b/docs/README.md @@ -0,0 +1,5 @@ +## Documentation + +The documentation is divided into the following sub-folders: +* arch: Architecture Decision Records (ADRs) which explain and justify major architectural decisions +* guides: Informal documents which describe the code or our development practices at a high level diff --git a/docs/guides/record_replay.md b/docs/guides/record_replay.md new file mode 100644 index 00000000..670bb843 --- /dev/null +++ b/docs/guides/record_replay.md @@ -0,0 +1,27 @@ +# Supporting Record/Replay in Adapters + +This document describes how to implement support for dbt's Record/Replay Subsystem for adapters. Before reading it, make sure you understand the fundamental ideas behind Record/Replay, which are [documented in the dbt-common repo](https://github.com/dbt-labs/dbt-common/blob/docs/guides/record_replay.md). + +## Recording and Replaying Warehouse Interaction + +The goal of the Record/Replay Subsystem is to record all interactions between dbt and external systems, of which the data warehouse is the most obvious. Since, warehouse interaction is mediated by adapters, full Record/Replay support requires that adapters record all interactions they have with the warehouse. (It also requires that they record access to the local filesystem or external service, if that is access is not mediated by dbt itself. This includes authentication steps, opening and closing connections, beginning and ending transactions, and so forth.) + +In practice, this means that any request sent to the warehouse must be recorded, along with the corresponding response. If this is done correctly, as described in the document linked in the intro, the Record portion of the Record/Replay subsystem should work as expected. + +At the time of this writing, there is only an incomplete implementation of this goal, which can be found in `dbt-adapters/dbt/adapters/record.py`. + +There are some important things to notice about this implementation. First, the QueryRecordResult class provides custom serialization methods `to_dict()` and `from_dict()`. This is necessary because the `AdapterResponse` and `Agate.Table` types cannot be automatically converted to and from JSON by the dataclass library, and JSON is the format used to persist recordings to disk and reload them for replay. + +Another important feature is that `QueryRecordParams` implements the `_matches()` method. This method allows `dbt-adapters` to customize the way that the Record/Replay determines whether a query issued by dbt matches a previously recorded query. In this case, the method performs a comparison which attempts to ignore comments and whitespace which would not affect query behavior. + +## Misc. Notes and Suggestions + +Currently, support for recording data warehouse interaction is very rudimentary, however, even rudimentary support is valuable and we should be concentrating on extending it in a way that adds the most value with the least work. Usefulness, rather than perfection, is the initial goal. + +Picking the right functions to record, at the right level of abstraction, will probably be the most important part of carrying this work forward. + +Not every interaction with an external system has to be recorded in full detail, and authentication might prove to be a place where we exclude sensitive secrets from the recording. For example, since replay will not actually be communicating with the warehouse, it may be possible to exclude passwords and auth keys from the parameters recorded, and to exclude auth tokens from the results. + +In addition to adding an appropriate decorator to functions which communicate with external systems, you should check those functions for side-effects. Since the function's calls will be mocked out in replay mode, those side-effects will not be carried out during replay. At present, we are focusing on support for recording and comparing recordings, but this is worth keeping in mind. + +The current implementation records which dbt node issues a query, and uses that information to ensure a match during replay. The same node should issue the same query. A better model might be to monitor which connection issued which query, and associate the same connection with open/close operations, transaction starts/stops and so forth.