Skip to content

Commit

Permalink
fix asset checks + step launchers (#19443)
Browse files Browse the repository at this point in the history
https://dagsterlabs.slack.com/archives/C058314NT5H/p1705693260012779

Populate step launched instances with ASSET_CHECK_EVALUATION_PLANNED
events. The alternative would be to raise the restriction that
ASSET_CHECK_EVALUATION events are preceded by planned events, either
globally or just for step launcher instances. I think this approach is
workable.
  • Loading branch information
johannkm authored Jan 30, 2024
1 parent 85184e5 commit ccc7c48
Show file tree
Hide file tree
Showing 3 changed files with 81 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -931,9 +931,9 @@ def is_in_graph_asset(self) -> bool:
@property
def is_asset_check_step(self) -> bool:
"""Whether this step corresponds to an asset check."""
node_handle = self.node_handle
return (
self.job_def.asset_layer.asset_checks_defs_by_node_handle.get(node_handle) is not None
self.job_def.asset_layer.asset_checks_defs_by_node_handle.get(self.node_handle)
is not None
)

def set_data_version(self, asset_key: AssetKey, data_version: "DataVersion") -> None:
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import logging
import os
import pickle
import shutil
Expand All @@ -13,7 +14,7 @@
from dagster._core.definitions.resource_definition import dagster_maintained_resource, resource
from dagster._core.definitions.step_launcher import StepLauncher, StepRunRef
from dagster._core.errors import raise_execution_interrupts
from dagster._core.events import DagsterEvent
from dagster._core.events import AssetCheckEvaluationPlanned, DagsterEvent, DagsterEventType
from dagster._core.events.log import EventLogEntry
from dagster._core.execution.api import create_execution_plan
from dagster._core.execution.context.system import StepExecutionContext
Expand Down Expand Up @@ -253,6 +254,27 @@ def run_step_from_ref(
partitions_def_name=partitions_def.name, partition_keys=[step_context.partition_key]
)

# Note: this patches ASSET_CHECK_EVALUATION_PLANNED events into the mini event log present
# on external steps. This is necessary because we test that ASSET_CHECK_EVALUATION events
# have corresponding ASSET_CHECK_EVALUATION_PLANNED events.
for output in step_context.step.step_outputs:
asset_check_key = check.not_none(output.properties).asset_check_key
if asset_check_key:
event = DagsterEvent(
event_type_value=DagsterEventType.ASSET_CHECK_EVALUATION_PLANNED.value,
job_name=step_context.job_name,
message=(
f"{step_context.job_name} intends to execute asset check {asset_check_key.name} on"
f" asset {asset_check_key.asset_key.to_string()}"
),
event_specific_data=AssetCheckEvaluationPlanned(
asset_check_key.asset_key,
check_name=asset_check_key.name,
),
step_key=step_context.step.key,
)
instance.report_dagster_event(event, step_run_ref.run_id, logging.DEBUG)

# The step should be forced to run locally with respect to the remote process that this step
# context is being deserialized in
return dagster_event_sequence_for_step(step_context, force_local_execution=True)
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,14 @@

import pytest
from dagster import (
AssetCheckResult,
AssetKey,
AssetsDefinition,
DynamicOut,
DynamicOutput,
Failure,
Field,
Output,
ResourceDefinition,
RetryPolicy,
RetryRequested,
Expand All @@ -29,6 +31,8 @@
resource,
with_resources,
)
from dagster._core.definitions.asset_check_spec import AssetCheckSpec
from dagster._core.definitions.asset_graph import AssetGraph
from dagster._core.definitions.cacheable_assets import (
AssetsDefinitionCacheableData,
CacheableAssetsDefinition,
Expand Down Expand Up @@ -69,7 +73,7 @@ def make_run_config(scratch_dir: str, resource_set: str):
else:
step_launcher_resource_keys = ["second_step_launcher"]
return deep_merge_dicts(
RUN_CONFIG_BASE,
RUN_CONFIG_BASE if resource_set != "no_base" else {},
{
"resources": merge_dicts(
{"io_manager": {"config": {"base_dir": scratch_dir}}},
Expand Down Expand Up @@ -301,14 +305,42 @@ def sleepy_job():
return sleepy_job


def initialize_step_context(scratch_dir: str, instance: DagsterInstance) -> IStepContext:
def define_asset_check_job():
@asset(
check_specs=[
AssetCheckSpec(
asset="asset1",
name="check1",
)
],
resource_defs={
"second_step_launcher": local_external_step_launcher,
"io_manager": fs_io_manager,
},
)
def asset1():
yield Output(1)
yield AssetCheckResult(passed=True)

return define_asset_job(name="asset_check_job", selection=[asset1]).resolve(
asset_graph=AssetGraph.from_assets([asset1])
)


def initialize_step_context(
scratch_dir: str,
instance: DagsterInstance,
job_def_fn=define_basic_job_external,
resource_set="external",
step_name="return_two",
) -> IStepContext:
run = DagsterRun(
job_name="foo_job",
run_id=str(uuid.uuid4()),
run_config=make_run_config(scratch_dir, "external"),
run_config=make_run_config(scratch_dir, resource_set),
)

recon_job = reconstructable(define_basic_job_external)
recon_job = reconstructable(job_def_fn)

plan = create_execution_plan(recon_job, run.run_config)

Expand All @@ -325,7 +357,7 @@ def initialize_step_context(scratch_dir: str, instance: DagsterInstance) -> ISte
job_context = initialization_manager.get_context()

step_context = job_context.for_step(
plan.get_step_by_key("return_two"), # type: ignore
plan.get_step_by_key(step_name), # type: ignore
KnownExecutionState(),
)
return step_context
Expand Down Expand Up @@ -363,6 +395,25 @@ def test_local_external_step_launcher():
assert DagsterEventType.STEP_FAILURE not in event_types


def test_asset_check_step_launcher():
with tempfile.TemporaryDirectory() as tmpdir:
with DagsterInstance.ephemeral() as instance:
step_context = initialize_step_context(
tmpdir,
instance,
job_def_fn=define_asset_check_job,
resource_set="no_base",
step_name="asset1",
)

step_launcher = LocalExternalStepLauncher(tmpdir)
events = list(step_launcher.launch_step(step_context))
event_types = [event.event_type for event in events]
assert DagsterEventType.STEP_START in event_types
assert DagsterEventType.STEP_SUCCESS in event_types
assert DagsterEventType.STEP_FAILURE not in event_types


@pytest.mark.parametrize("resource_set", ["external", "internal_and_external"])
def test_job(resource_set):
if resource_set == "external":
Expand Down

0 comments on commit ccc7c48

Please sign in to comment.