Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DBT model instrumentation #11268

Draft
wants to merge 12 commits into
base: main
Choose a base branch
from
15 changes: 15 additions & 0 deletions core/dbt/context/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
# These modules are added to the context. Consider alternative
# approaches which will extend well to potentially many modules
import pytz
from opentelemetry import context, trace

import dbt.flags as flags_module
from dbt import tracking, utils
Expand Down Expand Up @@ -86,12 +87,26 @@ def get_itertools_module_context() -> Dict[str, Any]:
return {name: getattr(itertools, name) for name in context_exports}


def get_otel_trace_module_context() -> Dict[str, Dict[str, Any]]:
context_exports = trace.__all__
return {name: getattr(trace, name) for name in context_exports}


def get_otel_context_module_context() -> Dict[str, Dict[str, Any]]:
context_exports = context.__all__
return {name: getattr(context, name) for name in context_exports}


def get_context_modules() -> Dict[str, Dict[str, Any]]:
return {
"pytz": get_pytz_module_context(),
"datetime": get_datetime_module_context(),
"re": get_re_module_context(),
"itertools": get_itertools_module_context(),
"opentelemetry": {
"trace": get_otel_trace_module_context(),
"context": get_otel_context_module_context(),
},
}


Expand Down
126 changes: 66 additions & 60 deletions core/dbt/task/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
from multiprocessing.pool import ThreadPool
from typing import AbstractSet, Any, Dict, Iterable, List, Optional, Set, Tuple, Type

from opentelemetry import context, trace

from dbt import tracking, utils
from dbt.adapters.base import BaseAdapter, BaseRelation
from dbt.adapters.capability import Capability
Expand Down Expand Up @@ -328,7 +330,6 @@ def execute(self, model, manifest):
)

hook_ctx = self.adapter.pre_model_hook(context_config)

return self._execute_model(hook_ctx, context_config, model, context, materialization_macro)


Expand Down Expand Up @@ -887,77 +888,82 @@ def safe_run_hooks(
failed = False
num_hooks = len(ordered_hooks)

for idx, hook in enumerate(ordered_hooks, 1):
with log_contextvars(node_info=hook.node_info):
hook.index = idx
hook_name = f"{hook.package_name}.{hook_type}.{hook.index - 1}"
execution_time = 0.0
timing: List[TimingInfo] = []
failures = 1

if not failed:
with collect_timing_info("compile", timing.append):
sql = self.get_hook_sql(
adapter, hook, hook.index, num_hooks, extra_context
if num_hooks == 0:
return status

tracer = trace.get_tracer("dbt-runner")
with tracer.start_as_current_span(hook_type, context=context.get_current()) as _:
for idx, hook in enumerate(ordered_hooks, 1):
with log_contextvars(node_info=hook.node_info):
hook.index = idx
hook_name = f"{hook.package_name}.{hook_type}.{hook.index - 1}"
execution_time = 0.0
timing: List[TimingInfo] = []
failures = 1

if not failed:
with collect_timing_info("compile", timing.append):
sql = self.get_hook_sql(
adapter, hook, hook.index, num_hooks, extra_context
)

started_at = timing[0].started_at or datetime.utcnow()
hook.update_event_status(
started_at=started_at.isoformat(), node_status=RunningStatus.Started
)

started_at = timing[0].started_at or datetime.utcnow()
hook.update_event_status(
started_at=started_at.isoformat(), node_status=RunningStatus.Started
fire_event(
LogHookStartLine(
statement=hook_name,
index=hook.index,
total=num_hooks,
node_info=hook.node_info,
)
)

with collect_timing_info("execute", timing.append):
status, message = get_execution_status(sql, adapter)

finished_at = timing[1].completed_at or datetime.utcnow()
hook.update_event_status(finished_at=finished_at.isoformat())
execution_time = (finished_at - started_at).total_seconds()
failures = 0 if status == RunStatus.Success else 1

if status == RunStatus.Success:
message = f"{hook_name} passed"
else:
message = f"{hook_name} failed, error:\n {message}"
failed = True
else:
status = RunStatus.Skipped
message = f"{hook_name} skipped"

hook.update_event_status(node_status=status)

self.node_results.append(
RunResult(
status=status,
thread_id="main",
timing=timing,
message=message,
adapter_response={},
execution_time=execution_time,
failures=failures,
node=hook,
)
)

fire_event(
LogHookStartLine(
LogHookEndLine(
statement=hook_name,
status=status,
index=hook.index,
total=num_hooks,
execution_time=execution_time,
node_info=hook.node_info,
)
)

with collect_timing_info("execute", timing.append):
status, message = get_execution_status(sql, adapter)

finished_at = timing[1].completed_at or datetime.utcnow()
hook.update_event_status(finished_at=finished_at.isoformat())
execution_time = (finished_at - started_at).total_seconds()
failures = 0 if status == RunStatus.Success else 1

if status == RunStatus.Success:
message = f"{hook_name} passed"
else:
message = f"{hook_name} failed, error:\n {message}"
failed = True
else:
status = RunStatus.Skipped
message = f"{hook_name} skipped"

hook.update_event_status(node_status=status)

self.node_results.append(
RunResult(
status=status,
thread_id="main",
timing=timing,
message=message,
adapter_response={},
execution_time=execution_time,
failures=failures,
node=hook,
)
)

fire_event(
LogHookEndLine(
statement=hook_name,
status=status,
index=hook.index,
total=num_hooks,
execution_time=execution_time,
node_info=hook.node_info,
)
)

if hook_type == RunHookType.Start and ordered_hooks:
fire_event(Formatting(""))

Expand Down
37 changes: 32 additions & 5 deletions core/dbt/task/runnable.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@
from pathlib import Path
from typing import AbstractSet, Dict, Iterable, List, Optional, Set, Tuple, Type, Union

from opentelemetry import context, trace
from opentelemetry.trace import SpanContext, StatusCode

import dbt.exceptions
import dbt.tracking
import dbt.utils
Expand Down Expand Up @@ -91,6 +94,7 @@ def __init__(self, args: Flags, config: RuntimeConfig, manifest: Manifest) -> No
self.previous_defer_state: Optional[PreviousState] = None
self.run_count: int = 0
self.started_at: float = 0
self._node_span_context_mapping: Dict[str, SpanContext] = {}

if self.args.state:
self.previous_state = PreviousState(
Expand Down Expand Up @@ -222,14 +226,32 @@ def get_runner(self, node) -> BaseRunner:

return cls(self.config, adapter, node, run_count, num_nodes)

def call_runner(self, runner: BaseRunner) -> RunResult:
with log_contextvars(node_info=runner.node.node_info):
def call_runner(self, runner: BaseRunner, parent_context=None) -> RunResult:
tracer = trace.get_tracer("dbt-runner")
node_info = runner.node.node_info
if parent_context is None:
parent_context = context.get_current()
model_span = tracer.start_span(node_info["unique_id"], context=parent_context)
ctx = trace.set_span_in_context(model_span)
token = context.attach(ctx)
self._node_span_context_mapping[node_info["unique_id"]] = model_span.get_span_context()
if hasattr(runner.node.depends_on, "nodes"):
for parent_node in runner.node.depends_on.nodes:
if parent_node in self._node_span_context_mapping:
try:
model_span.add_link(
self._node_span_context_mapping[parent_node],
{"model_name": parent_node},
)
except Exception:
pass
with log_contextvars(node_info=node_info):
runner.node.update_event_status(
started_at=datetime.utcnow().isoformat(), node_status=RunningStatus.Started
)
fire_event(
NodeStart(
node_info=runner.node.node_info,
node_info=node_info,
)
)
try:
Expand All @@ -242,10 +264,14 @@ def call_runner(self, runner: BaseRunner) -> RunResult:
result = None
thread_exception = e
finally:

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In Java I know that the resource (in try-with-resource pattern) is closed before calling finally (would be the span in this case). Is this the case in Python as well?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

but this try-catch-finally is different inner scope within with block correct?

if result.status in (NodeStatus.Error, NodeStatus.Fail, NodeStatus.PartialSuccess):
model_span.set_status(StatusCode.ERROR)
context.detach(token)
model_span.end()
if result is not None:
fire_event(
NodeFinished(
node_info=runner.node.node_info,
node_info=node_info,

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why this change?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I had to refer some attributes in node_info at different place and using full name seems lengthy and difficult to read hence shortened it.

run_result=result.to_msg_dict(),
)
)
Expand All @@ -256,7 +282,7 @@ def call_runner(self, runner: BaseRunner) -> RunResult:
GenericExceptionOnRun(
unique_id=runner.node.unique_id,
exc=str(thread_exception),
node_info=runner.node.node_info,
node_info=node_info,

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto.

)
)

Expand Down Expand Up @@ -304,6 +330,7 @@ def _submit(self, pool, args, callback):

This does still go through the callback path for result collection.
"""
args.append(context.get_current())
if self.config.args.single_threaded:
callback(self.call_runner(*args))
else:
Expand Down
5 changes: 3 additions & 2 deletions dev-requirements.txt
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
git+https://github.com/dbt-labs/dbt-adapters.git@main#subdirectory=dbt-adapters
git+https://github.com/sfc-gh-vguttha/dbt-adapters.git@vguttha-add-telemetry#subdirectory=dbt-adapters
git+https://github.com/dbt-labs/dbt-adapters.git@main#subdirectory=dbt-tests-adapter
git+https://github.com/dbt-labs/dbt-common.git@main
git+https://github.com/dbt-labs/dbt-adapters.git@main#subdirectory=dbt-postgres
git+https://github.com/sfc-gh-vguttha/dbt-adapters.git@vguttha-add-telemetry#subdirectory=dbt-snowflake
# black must match what's in .pre-commit-config.yaml to be sure local env matches CI
black==24.3.0
bumpversion
Expand Down Expand Up @@ -38,3 +38,4 @@ types-pytz
types-requests
types-setuptools
mocker
opentelemetry-api>=1.23.0
Loading