From b7004e10b46712a4bb78662c81d4ad53f950382b Mon Sep 17 00:00:00 2001 From: kenxu95 Date: Mon, 17 Apr 2023 19:08:29 -0400 Subject: [PATCH] done? --- sdk/aqueduct/artifacts/preview.py | 14 +++++++------- sdk/aqueduct/utils/integration_validation.py | 1 + sdk/aqueduct/utils/utils.py | 4 ++++ 3 files changed, 12 insertions(+), 7 deletions(-) diff --git a/sdk/aqueduct/artifacts/preview.py b/sdk/aqueduct/artifacts/preview.py index de46fa107..805d1a10f 100644 --- a/sdk/aqueduct/artifacts/preview.py +++ b/sdk/aqueduct/artifacts/preview.py @@ -36,6 +36,13 @@ def preview_artifacts( Returns a list of artifacts, each corresponding to one of the provided `target_artifact_ids`, in the same order. """ + global_engine_config: Optional[EngineConfig] = None + if globals.__GLOBAL_CONFIG__.engine is not None: + global_engine_config = generate_engine_config( + globals.__GLOBAL_API_CLIENT__.list_integrations(), + globals.__GLOBAL_CONFIG__.engine, + ) + subgraph = apply_deltas_to_dag( dag, deltas=[ @@ -49,13 +56,6 @@ def preview_artifacts( ], make_copy=True, ) - - global_engine_config: Optional[EngineConfig] = None - if globals.__GLOBAL_CONFIG__.engine is not None: - global_engine_config = generate_engine_config( - globals.__GLOBAL_API_CLIENT__.list_integrations(), - globals.__GLOBAL_CONFIG__.engine, - ) subgraph.set_engine_config(global_engine_config=global_engine_config) engine_statuses = globals.__GLOBAL_API_CLIENT__.get_dynamic_engine_status_by_dag(dag=subgraph) diff --git a/sdk/aqueduct/utils/integration_validation.py b/sdk/aqueduct/utils/integration_validation.py index 2a5d9e649..b8350744a 100644 --- a/sdk/aqueduct/utils/integration_validation.py +++ b/sdk/aqueduct/utils/integration_validation.py @@ -6,6 +6,7 @@ from aqueduct.models.execution_state import ExecutionState +# TODO: maybe move this to artifacts/? def validate_integration_is_connected(name: str, exec_state: Optional[ExecutionState]) -> None: """Method used to determine if this integration was successfully connected to or not. If not successfully connected (or pending), we will raise an Exception. diff --git a/sdk/aqueduct/utils/utils.py b/sdk/aqueduct/utils/utils.py index 309967cca..8f1820001 100644 --- a/sdk/aqueduct/utils/utils.py +++ b/sdk/aqueduct/utils/utils.py @@ -1,6 +1,8 @@ import uuid from typing import Any, Dict, List, Optional, Tuple, Union +from aqueduct.utils.integration_validation import validate_integration_is_connected + from croniter import croniter from aqueduct.constants.enums import ArtifactType, RuntimeType, ServiceType, TriggerType @@ -161,6 +163,8 @@ def generate_engine_config( ) integration = integrations[integration_name] + validate_integration_is_connected(integration_name, integration.exec_state) + if integration.service == ServiceType.AIRFLOW: return EngineConfig( type=RuntimeType.AIRFLOW,