From cebcdfd7a33d881dd092727209baa4b42f24bd6f Mon Sep 17 00:00:00 2001 From: kenxu95 Date: Mon, 17 Apr 2023 18:58:33 -0400 Subject: [PATCH] made the change for all data integrations --- sdk/aqueduct/error.py | 8 +++++ .../integrations/dynamic_k8s_integration.py | 6 ++++ .../integrations/google_sheets_integration.py | 4 +++ .../integrations/integration_decorators.py | 13 ++++++++ .../integrations/mongodb_integration.py | 6 ++++ sdk/aqueduct/integrations/s3_integration.py | 4 +++ .../integrations/salesforce_integration.py | 5 ++++ sdk/aqueduct/integrations/sql_integration.py | 18 ++++++++--- sdk/aqueduct/models/integration.py | 25 +++++++++++++--- sdk/aqueduct/utils/integration_validation.py | 30 +++++++++++++++++++ 10 files changed, 111 insertions(+), 8 deletions(-) create mode 100644 sdk/aqueduct/integrations/integration_decorators.py create mode 100644 sdk/aqueduct/utils/integration_validation.py diff --git a/sdk/aqueduct/error.py b/sdk/aqueduct/error.py index b29217030..471afd775 100644 --- a/sdk/aqueduct/error.py +++ b/sdk/aqueduct/error.py @@ -135,3 +135,11 @@ class ClientValidationError(Error): # Exception raised when requirements.txt file is missing class RequirementsMissingError(Error): pass + +# Exception raised when the user attempts to use an integration that has failed to connect. +class IntegrationFailedToConnect(Error): + pass + +# Exception raised when the user attempts to use an integration that is still connecting. 
+class IntegrationConnectionInProgress(Error): + pass \ No newline at end of file diff --git a/sdk/aqueduct/integrations/dynamic_k8s_integration.py b/sdk/aqueduct/integrations/dynamic_k8s_integration.py index e9a04e2ea..3286485a0 100644 --- a/sdk/aqueduct/integrations/dynamic_k8s_integration.py +++ b/sdk/aqueduct/integrations/dynamic_k8s_integration.py @@ -1,6 +1,8 @@ from typing import Dict, Union from aqueduct import globals +from aqueduct.integrations.integration_decorators import validate_is_connected + from aqueduct.constants.enums import K8sClusterActionType, K8sClusterStatusType from aqueduct.error import InvalidIntegrationException, InvalidUserArgumentException from aqueduct.integrations.connect_config import DynamicK8sConfig @@ -40,6 +42,7 @@ class DynamicK8sIntegration(Integration): def __init__(self, metadata: IntegrationInfo): self._metadata = metadata + @validate_is_connected() def status(self) -> str: """Get the current status of the dynamic Kubernetes cluster.""" engine_statuses = globals.__GLOBAL_API_CLIENT__.get_dynamic_engine_status( @@ -50,6 +53,7 @@ def status(self) -> str: return engine_statuses[self._metadata.name].status.value + @validate_is_connected() def create( self, config_delta: Union[Dict[str, Union[int, str]], DynamicK8sConfig] = {} ) -> None: @@ -92,6 +96,7 @@ def create( config_delta=config_delta, ) + @validate_is_connected() def update(self, config_delta: Union[Dict[str, Union[int, str]], DynamicK8sConfig]) -> None: """Update the dynamic Kubernetes cluster. This can only be done when the cluster is in Active status. @@ -134,6 +139,7 @@ def update(self, config_delta: Union[Dict[str, Union[int, str]], DynamicK8sConfi config_delta=config_delta, ) + @validate_is_connected() def delete(self, force: bool = False) -> None: """Deletes the dynamic Kubernetes cluster if it is running, ignoring the keepalive period. 
diff --git a/sdk/aqueduct/integrations/google_sheets_integration.py b/sdk/aqueduct/integrations/google_sheets_integration.py index 13345663b..b44d0e1fe 100644 --- a/sdk/aqueduct/integrations/google_sheets_integration.py +++ b/sdk/aqueduct/integrations/google_sheets_integration.py @@ -1,5 +1,7 @@ from typing import Optional +from aqueduct.integrations.integration_decorators import validate_is_connected + from aqueduct.artifacts.base_artifact import BaseArtifact from aqueduct.artifacts.table_artifact import TableArtifact from aqueduct.constants.enums import ArtifactType, GoogleSheetsSaveMode @@ -29,6 +31,7 @@ def __init__(self, dag: DAG, metadata: IntegrationInfo): self._dag = dag self._metadata = metadata + @validate_is_connected() def spreadsheet( self, spreadsheet_id: str, @@ -96,6 +99,7 @@ def spreadsheet( artifact_id=output_artifact_id, ) + @validate_is_connected() def save( self, artifact: BaseArtifact, diff --git a/sdk/aqueduct/integrations/integration_decorators.py b/sdk/aqueduct/integrations/integration_decorators.py new file mode 100644 index 000000000..b44fd12ed --- /dev/null +++ b/sdk/aqueduct/integrations/integration_decorators.py @@ -0,0 +1,13 @@ +from aqueduct.utils.integration_validation import validate_integration_is_connected + + +def validate_is_connected(): + """This decorator, when used on an Integration class method, ensures that the integration is + connected before allowing the method to be called.""" + + def decorator(method): + def wrapper(self, *args, **kwargs): + validate_integration_is_connected(self.name(), self._metadata.exec_state) + return method(self, *args, **kwargs) + return wrapper + return decorator diff --git a/sdk/aqueduct/integrations/mongodb_integration.py b/sdk/aqueduct/integrations/mongodb_integration.py index 2964fda4f..58ba70534 100644 --- a/sdk/aqueduct/integrations/mongodb_integration.py +++ b/sdk/aqueduct/integrations/mongodb_integration.py @@ -1,6 +1,8 @@ import json from typing import Any, Dict, List, Optional +from 
aqueduct.integrations.integration_decorators import validate_is_connected + from aqueduct import globals from aqueduct.artifacts import preview as artifact_utils from aqueduct.artifacts.base_artifact import BaseArtifact @@ -32,6 +34,7 @@ def __init__(self, dag: DAG, metadata: IntegrationInfo, collection_name: str) -> self._dag = dag self._collection_name = collection_name + @validate_is_connected() def find( self, *args: List[Any], @@ -155,6 +158,7 @@ def find( # We are in lazy mode. return TableArtifact(self._dag, output_artf_id) + @validate_is_connected() def save(self, artifact: BaseArtifact, update_mode: LoadUpdateMode) -> None: """Registers a save operator of the given artifact, to be executed when it's computed in a published flow. @@ -187,6 +191,7 @@ def __init__(self, dag: DAG, metadata: IntegrationInfo): self._dag = dag self._metadata = metadata + @validate_is_connected() def collection(self, name: str) -> MongoDBCollectionIntegration: """Returns a specific collection object to call `.find()` method. @@ -202,6 +207,7 @@ def describe(self) -> None: print("==================== MongoDB Integration =============================") self._metadata.describe() + @validate_is_connected() def save(self, artifact: BaseArtifact, collection: str, update_mode: LoadUpdateMode) -> None: """Registers a save operator of the given artifact, to be executed when it's computed in a published flow. 
diff --git a/sdk/aqueduct/integrations/s3_integration.py b/sdk/aqueduct/integrations/s3_integration.py index 139e7ee27..29090866c 100644 --- a/sdk/aqueduct/integrations/s3_integration.py +++ b/sdk/aqueduct/integrations/s3_integration.py @@ -1,6 +1,8 @@ import json from typing import List, Optional, Union +from aqueduct.integrations.integration_decorators import validate_is_connected + from aqueduct import globals from aqueduct.artifacts import preview as artifact_utils from aqueduct.artifacts.base_artifact import BaseArtifact @@ -50,6 +52,7 @@ def __init__(self, dag: DAG, metadata: IntegrationInfo): self._dag = dag self._metadata = metadata + @validate_is_connected() def file( self, filepaths: Union[List[str], str], @@ -170,6 +173,7 @@ def _is_multi_file_search() -> bool: # We are in lazy mode. return to_artifact_class(self._dag, output_artifact_id, artifact_type) + @validate_is_connected() def save(self, artifact: BaseArtifact, filepath: str, format: Optional[str] = None) -> None: """Registers a save operator of the given artifact, to be executed when it's computed in a published flow. 
diff --git a/sdk/aqueduct/integrations/salesforce_integration.py b/sdk/aqueduct/integrations/salesforce_integration.py index f1ff900ab..634731e14 100644 --- a/sdk/aqueduct/integrations/salesforce_integration.py +++ b/sdk/aqueduct/integrations/salesforce_integration.py @@ -1,6 +1,8 @@ import uuid from typing import Optional +from aqueduct.integrations.integration_decorators import validate_is_connected + from aqueduct.artifacts.base_artifact import BaseArtifact from aqueduct.artifacts.table_artifact import TableArtifact from aqueduct.constants.enums import ArtifactType, SalesforceExtractType @@ -30,6 +32,7 @@ def __init__(self, dag: DAG, metadata: IntegrationInfo): self._dag = dag self._metadata = metadata + @validate_is_connected() def search( self, search_query: str, @@ -69,6 +72,7 @@ def search( artifact_id=output_artifact_id, ) + @validate_is_connected() def query( self, query: str, @@ -102,6 +106,7 @@ def query( artifact_id=output_artifact_id, ) + @validate_is_connected() def save(self, artifact: BaseArtifact, object: str) -> None: """Registers a save operator of the given artifact, to be executed when it's computed in a published flow. 
diff --git a/sdk/aqueduct/integrations/sql_integration.py b/sdk/aqueduct/integrations/sql_integration.py index 3654e1a6c..dd9b77d89 100644 --- a/sdk/aqueduct/integrations/sql_integration.py +++ b/sdk/aqueduct/integrations/sql_integration.py @@ -1,8 +1,8 @@ from typing import List, Optional, Union import pandas as pd +from aqueduct.integrations.integration_decorators import validate_is_connected -from aqueduct import globals from aqueduct.artifacts.base_artifact import BaseArtifact from aqueduct.artifacts.preview import preview_artifact from aqueduct.artifacts.table_artifact import TableArtifact @@ -46,6 +46,7 @@ def __init__(self, dag: DAG, metadata: IntegrationInfo): self._dag = dag self._metadata = metadata + @validate_is_connected() def list_tables(self) -> pd.DataFrame: """ Lists the tables available in the RelationalDB integration. @@ -80,6 +81,7 @@ def list_tables(self) -> pd.DataFrame: sql_artifact = self.sql(query=list_tables_query) return sql_artifact.get() + @validate_is_connected() def table(self, name: str) -> pd.DataFrame: """ Retrieves a table from a RealtionalDB integration. @@ -94,6 +96,7 @@ def table(self, name: str) -> pd.DataFrame: sql_artifact = self.sql(query=GET_TABLE_QUERY % name) return sql_artifact.get() + @validate_is_connected() def sql( self, query: Union[str, List[str], RelationalDBExtractParams], @@ -233,6 +236,7 @@ def sql( # We are in lazy mode. return TableArtifact(self._dag, sql_output_artifact_id) + @validate_is_connected() def save(self, artifact: BaseArtifact, table_name: str, update_mode: LoadUpdateMode) -> None: """Registers a save operator of the given artifact, to be executed when it's computed in a published flow. 
@@ -269,6 +273,12 @@ def describe(self) -> None: print("==================== SQL Integration =============================") print("Integration Information:") self._metadata.describe() - print("Integration Table List Preview:") - print(self.list_tables()["name"].head().to_string()) - print("(only first 5 tables are shown)") + + # Only list the tables if the integration is connected. + try: + print("Integration Table List Preview:") + print(self.list_tables()["name"].head().to_string()) + print("(only first 5 tables are shown)") + except Exception: + pass + diff --git a/sdk/aqueduct/models/integration.py b/sdk/aqueduct/models/integration.py index 31294c408..e81dc4bbb 100644 --- a/sdk/aqueduct/models/integration.py +++ b/sdk/aqueduct/models/integration.py @@ -4,6 +4,7 @@ from typing import Any, Dict, Optional from aqueduct.constants.enums import ExecutionStatus +from aqueduct.error import IntegrationFailedToConnect, IntegrationConnectionInProgress from aqueduct.models.execution_state import ExecutionState from aqueduct.models.utils import human_readable_timestamp from pydantic import BaseModel @@ -78,11 +79,27 @@ def name(self) -> str: def type(self) -> ServiceType: return self._metadata.service - def _is_connected(self) -> bool: - """Method used to determine if this integration was successfully connected to or not.""" + def _validate_is_connected(self) -> None: + """Method used to determine if this integration was successfully connected to or not. + If not successfully connected (or pending), we will raise an Exception. + """ # TODO(ENG-2813): Remove the assumption that a missing `exec_state` means success. 
- if self._metadata.exec_state - + if self._metadata.exec_state is None or self._metadata.exec_state.status == ExecutionStatus.SUCCEEDED: + return + + if self._metadata.exec_state.status == ExecutionStatus.FAILED: + raise IntegrationFailedToConnect( + "The attempt to connect to %s has failed with error: %s\n%s\n\n " + "Please see the /integrations page on the UI for more details." % ( + self.name(), self._metadata.exec_state.error.tip, self._metadata.exec_state.error.context, + ) + ) + else: + raise IntegrationConnectionInProgress( + "We are still in the process of connecting to integration %s. " + "Please see the /integrations page on the UI for more details." + % self.name() + ) def __hash__(self) -> int: """An integration is uniquely identified by its name. diff --git a/sdk/aqueduct/utils/integration_validation.py b/sdk/aqueduct/utils/integration_validation.py new file mode 100644 index 000000000..2a5d9e649 --- /dev/null +++ b/sdk/aqueduct/utils/integration_validation.py @@ -0,0 +1,30 @@ +from typing import Optional + +from aqueduct.error import IntegrationFailedToConnect, IntegrationConnectionInProgress + +from aqueduct.constants.enums import ExecutionStatus +from aqueduct.models.execution_state import ExecutionState + + +def validate_integration_is_connected(name: str, exec_state: Optional[ExecutionState]) -> None: + """Method used to determine if this integration was successfully connected to or not. + If not successfully connected (or pending), we will raise an Exception. + """ + # TODO(ENG-2813): Remove the assumption that a missing `exec_state` means success. + if exec_state is None or exec_state.status == ExecutionStatus.SUCCEEDED: + return + + if exec_state.status == ExecutionStatus.FAILED: + raise IntegrationFailedToConnect( + "Cannot use integration %s because it has not been successfully connected to: " + "%s\n%s\n\n Please see the /integrations page on the UI for more details." 
% ( + name, exec_state.error.tip, exec_state.error.context, + ) + ) + else: + raise IntegrationConnectionInProgress( + "Cannot use integration %s because it is still in the process of connecting. " + "Please see the /integrations page on the UI for more details." + % name + ) + \ No newline at end of file