Skip to content

Commit

Permalink
done?
Browse files Browse the repository at this point in the history
  • Loading branch information
kenxu95 committed Apr 18, 2023
1 parent cebcdfd commit b7004e1
Show file tree
Hide file tree
Showing 3 changed files with 12 additions and 7 deletions.
14 changes: 7 additions & 7 deletions sdk/aqueduct/artifacts/preview.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,13 @@ def preview_artifacts(
Returns a list of artifacts, each corresponding to one of the provided `target_artifact_ids`, in
the same order.
"""
global_engine_config: Optional[EngineConfig] = None
if globals.__GLOBAL_CONFIG__.engine is not None:
global_engine_config = generate_engine_config(
globals.__GLOBAL_API_CLIENT__.list_integrations(),
globals.__GLOBAL_CONFIG__.engine,
)

subgraph = apply_deltas_to_dag(
dag,
deltas=[
Expand All @@ -49,13 +56,6 @@ def preview_artifacts(
],
make_copy=True,
)

global_engine_config: Optional[EngineConfig] = None
if globals.__GLOBAL_CONFIG__.engine is not None:
global_engine_config = generate_engine_config(
globals.__GLOBAL_API_CLIENT__.list_integrations(),
globals.__GLOBAL_CONFIG__.engine,
)
subgraph.set_engine_config(global_engine_config=global_engine_config)

engine_statuses = globals.__GLOBAL_API_CLIENT__.get_dynamic_engine_status_by_dag(dag=subgraph)
Expand Down
1 change: 1 addition & 0 deletions sdk/aqueduct/utils/integration_validation.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from aqueduct.models.execution_state import ExecutionState


# TODO: maybe move this to artifacts/?
def validate_integration_is_connected(name: str, exec_state: Optional[ExecutionState]) -> None:
"""Method used to determine if this integration was successfully connected to or not.
If not successfully connected (or pending), we will raise an Exception.
Expand Down
4 changes: 4 additions & 0 deletions sdk/aqueduct/utils/utils.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import uuid
from typing import Any, Dict, List, Optional, Tuple, Union

from aqueduct.utils.integration_validation import validate_integration_is_connected

from croniter import croniter

from aqueduct.constants.enums import ArtifactType, RuntimeType, ServiceType, TriggerType
Expand Down Expand Up @@ -161,6 +163,8 @@ def generate_engine_config(
)

integration = integrations[integration_name]
validate_integration_is_connected(integration_name, integration.exec_state)

if integration.service == ServiceType.AIRFLOW:
return EngineConfig(
type=RuntimeType.AIRFLOW,
Expand Down

0 comments on commit b7004e1

Please sign in to comment.