Skip to content

Commit

Permalink
made the change for all data integrations
Browse files Browse the repository at this point in the history
  • Loading branch information
kenxu95 committed Apr 18, 2023
1 parent 49a8423 commit cebcdfd
Show file tree
Hide file tree
Showing 10 changed files with 111 additions and 8 deletions.
8 changes: 8 additions & 0 deletions sdk/aqueduct/error.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,3 +135,11 @@ class ClientValidationError(Error):
class RequirementsMissingError(Error):
    """Exception raised when requirements.txt file is missing."""

class IntegrationFailedToConnect(Error):
    """Exception raised when the user attempts to use an integration that has failed to connect."""

class IntegrationConnectionInProgress(Error):
    """Exception raised when the user attempts to use an integration that is still connecting."""
6 changes: 6 additions & 0 deletions sdk/aqueduct/integrations/dynamic_k8s_integration.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
from typing import Dict, Union

from aqueduct import globals
from aqueduct.integrations.integration_decorators import validate_is_connected

from aqueduct.constants.enums import K8sClusterActionType, K8sClusterStatusType
from aqueduct.error import InvalidIntegrationException, InvalidUserArgumentException
from aqueduct.integrations.connect_config import DynamicK8sConfig
Expand Down Expand Up @@ -40,6 +42,7 @@ class DynamicK8sIntegration(Integration):
def __init__(self, metadata: IntegrationInfo):
    """Keep a reference to the metadata describing this integration."""
    self._metadata = metadata

@validate_is_connected()
def status(self) -> str:
"""Get the current status of the dynamic Kubernetes cluster."""
engine_statuses = globals.__GLOBAL_API_CLIENT__.get_dynamic_engine_status(
Expand All @@ -50,6 +53,7 @@ def status(self) -> str:

return engine_statuses[self._metadata.name].status.value

@validate_is_connected()
def create(
self, config_delta: Union[Dict[str, Union[int, str]], DynamicK8sConfig] = {}
) -> None:
Expand Down Expand Up @@ -92,6 +96,7 @@ def create(
config_delta=config_delta,
)

@validate_is_connected()
def update(self, config_delta: Union[Dict[str, Union[int, str]], DynamicK8sConfig]) -> None:
"""Update the dynamic Kubernetes cluster. This can only be done when the cluster is in
Active status.
Expand Down Expand Up @@ -134,6 +139,7 @@ def update(self, config_delta: Union[Dict[str, Union[int, str]], DynamicK8sConfi
config_delta=config_delta,
)

@validate_is_connected()
def delete(self, force: bool = False) -> None:
"""Deletes the dynamic Kubernetes cluster if it is running, ignoring the keepalive period.
Expand Down
4 changes: 4 additions & 0 deletions sdk/aqueduct/integrations/google_sheets_integration.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
from typing import Optional

from aqueduct.integrations.integration_decorators import validate_is_connected

from aqueduct.artifacts.base_artifact import BaseArtifact
from aqueduct.artifacts.table_artifact import TableArtifact
from aqueduct.constants.enums import ArtifactType, GoogleSheetsSaveMode
Expand Down Expand Up @@ -29,6 +31,7 @@ def __init__(self, dag: DAG, metadata: IntegrationInfo):
self._dag = dag
self._metadata = metadata

@validate_is_connected()
def spreadsheet(
self,
spreadsheet_id: str,
Expand Down Expand Up @@ -96,6 +99,7 @@ def spreadsheet(
artifact_id=output_artifact_id,
)

@validate_is_connected()
def save(
self,
artifact: BaseArtifact,
Expand Down
13 changes: 13 additions & 0 deletions sdk/aqueduct/integrations/integration_decorators.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
from aqueduct.utils.integration_validation import validate_integration_is_connected


def validate_is_connected():
    """Build a decorator that checks an integration is connected before a method runs.

    The returned decorator is meant for instance methods of Integration classes.
    Before each call it passes the instance's `name()` and `_metadata.exec_state`
    to `validate_integration_is_connected`; any exception raised by that check
    propagates to the caller and the wrapped method is not executed.

    Returns:
        A decorator that takes the method to guard and returns the guarded wrapper.
    """
    # Imported locally so this block does not rely on a module-level import.
    from functools import wraps

    def decorator(method):
        # Preserve the wrapped method's name and docstring for help() and introspection.
        @wraps(method)
        def wrapper(self, *args, **kwargs):
            validate_integration_is_connected(self.name(), self._metadata.exec_state)
            # `method` is the plain function captured at class-definition time, so the
            # instance has to be forwarded explicitly.
            return method(self, *args, **kwargs)

        return wrapper

    return decorator
6 changes: 6 additions & 0 deletions sdk/aqueduct/integrations/mongodb_integration.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import json
from typing import Any, Dict, List, Optional

from aqueduct.integrations.integration_decorators import validate_is_connected

from aqueduct import globals
from aqueduct.artifacts import preview as artifact_utils
from aqueduct.artifacts.base_artifact import BaseArtifact
Expand Down Expand Up @@ -32,6 +34,7 @@ def __init__(self, dag: DAG, metadata: IntegrationInfo, collection_name: str) ->
self._dag = dag
self._collection_name = collection_name

@validate_is_connected()
def find(
self,
*args: List[Any],
Expand Down Expand Up @@ -155,6 +158,7 @@ def find(
# We are in lazy mode.
return TableArtifact(self._dag, output_artf_id)

@validate_is_connected()
def save(self, artifact: BaseArtifact, update_mode: LoadUpdateMode) -> None:
"""Registers a save operator of the given artifact, to be executed when it's computed in a published flow.
Expand Down Expand Up @@ -187,6 +191,7 @@ def __init__(self, dag: DAG, metadata: IntegrationInfo):
self._dag = dag
self._metadata = metadata

@validate_is_connected()
def collection(self, name: str) -> MongoDBCollectionIntegration:
"""Returns a specific collection object to call `.find()` method.
Expand All @@ -202,6 +207,7 @@ def describe(self) -> None:
print("==================== MongoDB Integration =============================")
self._metadata.describe()

@validate_is_connected()
def save(self, artifact: BaseArtifact, collection: str, update_mode: LoadUpdateMode) -> None:
"""Registers a save operator of the given artifact, to be executed when it's computed in a published flow.
Expand Down
4 changes: 4 additions & 0 deletions sdk/aqueduct/integrations/s3_integration.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import json
from typing import List, Optional, Union

from aqueduct.integrations.integration_decorators import validate_is_connected

from aqueduct import globals
from aqueduct.artifacts import preview as artifact_utils
from aqueduct.artifacts.base_artifact import BaseArtifact
Expand Down Expand Up @@ -50,6 +52,7 @@ def __init__(self, dag: DAG, metadata: IntegrationInfo):
self._dag = dag
self._metadata = metadata

@validate_is_connected()
def file(
self,
filepaths: Union[List[str], str],
Expand Down Expand Up @@ -170,6 +173,7 @@ def _is_multi_file_search() -> bool:
# We are in lazy mode.
return to_artifact_class(self._dag, output_artifact_id, artifact_type)

@validate_is_connected()
def save(self, artifact: BaseArtifact, filepath: str, format: Optional[str] = None) -> None:
"""Registers a save operator of the given artifact, to be executed when it's computed in a published flow.
Expand Down
5 changes: 5 additions & 0 deletions sdk/aqueduct/integrations/salesforce_integration.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import uuid
from typing import Optional

from aqueduct.integrations.integration_decorators import validate_is_connected

from aqueduct.artifacts.base_artifact import BaseArtifact
from aqueduct.artifacts.table_artifact import TableArtifact
from aqueduct.constants.enums import ArtifactType, SalesforceExtractType
Expand Down Expand Up @@ -30,6 +32,7 @@ def __init__(self, dag: DAG, metadata: IntegrationInfo):
self._dag = dag
self._metadata = metadata

@validate_is_connected()
def search(
self,
search_query: str,
Expand Down Expand Up @@ -69,6 +72,7 @@ def search(
artifact_id=output_artifact_id,
)

@validate_is_connected()
def query(
self,
query: str,
Expand Down Expand Up @@ -102,6 +106,7 @@ def query(
artifact_id=output_artifact_id,
)

@validate_is_connected()
def save(self, artifact: BaseArtifact, object: str) -> None:
"""Registers a save operator of the given artifact, to be executed when it's computed in a published flow.
Expand Down
18 changes: 14 additions & 4 deletions sdk/aqueduct/integrations/sql_integration.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
from typing import List, Optional, Union

import pandas as pd
from aqueduct.integrations.integration_decorators import validate_is_connected

from aqueduct import globals
from aqueduct.artifacts.base_artifact import BaseArtifact
from aqueduct.artifacts.preview import preview_artifact
from aqueduct.artifacts.table_artifact import TableArtifact
Expand Down Expand Up @@ -46,6 +46,7 @@ def __init__(self, dag: DAG, metadata: IntegrationInfo):
self._dag = dag
self._metadata = metadata

@validate_is_connected()
def list_tables(self) -> pd.DataFrame:
"""
Lists the tables available in the RelationalDB integration.
Expand Down Expand Up @@ -80,6 +81,7 @@ def list_tables(self) -> pd.DataFrame:
sql_artifact = self.sql(query=list_tables_query)
return sql_artifact.get()

@validate_is_connected()
def table(self, name: str) -> pd.DataFrame:
"""
Retrieves a table from a RelationalDB integration.
Expand All @@ -94,6 +96,7 @@ def table(self, name: str) -> pd.DataFrame:
sql_artifact = self.sql(query=GET_TABLE_QUERY % name)
return sql_artifact.get()

@validate_is_connected()
def sql(
self,
query: Union[str, List[str], RelationalDBExtractParams],
Expand Down Expand Up @@ -233,6 +236,7 @@ def sql(
# We are in lazy mode.
return TableArtifact(self._dag, sql_output_artifact_id)

@validate_is_connected()
def save(self, artifact: BaseArtifact, table_name: str, update_mode: LoadUpdateMode) -> None:
"""Registers a save operator of the given artifact, to be executed when it's computed in a published flow.
Expand Down Expand Up @@ -269,6 +273,12 @@ def describe(self) -> None:
print("==================== SQL Integration =============================")
print("Integration Information:")
self._metadata.describe()
print("Integration Table List Preview:")
print(self.list_tables()["name"].head().to_string())
print("(only first 5 tables are shown)")

# Only list the tables if the integration is connected. The preview is best-effort:
# a failure is reported instead of raised so it does not abort the description.
try:
    table_preview = self.list_tables()["name"].head().to_string()
except Exception as e:
    # Catch `Exception` rather than using a bare `except` so that
    # KeyboardInterrupt and SystemExit still propagate.
    print("Integration Table List Preview is unavailable: %s" % e)
else:
    print("Integration Table List Preview:")
    print(table_preview)
    print("(only first 5 tables are shown)")

25 changes: 21 additions & 4 deletions sdk/aqueduct/models/integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from typing import Any, Dict, Optional

from aqueduct.constants.enums import ExecutionStatus
from aqueduct.error import IntegrationFailedToConnect, IntegrationConnectionInProgress
from aqueduct.models.execution_state import ExecutionState
from aqueduct.models.utils import human_readable_timestamp
from pydantic import BaseModel
Expand Down Expand Up @@ -78,11 +79,27 @@ def name(self) -> str:
def type(self) -> ServiceType:
return self._metadata.service

def _is_connected(self) -> bool:
"""Method used to determine if this integration was successfully connected to or not."""
def _validate_is_connected(self) -> None:
    """Raise unless this integration has been successfully connected.

    Reads the execution state from `self._metadata.exec_state`. A missing state or
    a SUCCEEDED status returns silently.

    Raises:
        IntegrationFailedToConnect: If the recorded status is FAILED.
        IntegrationConnectionInProgress: For any other non-succeeded status.
    """
    # TODO(ENG-2813): Remove the assumption that a missing `exec_state` means success.
    exec_state = self._metadata.exec_state
    if exec_state is None or exec_state.status == ExecutionStatus.SUCCEEDED:
        return

    if exec_state.status == ExecutionStatus.FAILED:
        # Guard against a failed state that carries no error details, so that the
        # caller sees the intended exception rather than an AttributeError.
        error = exec_state.error
        details = ""
        if error is not None:
            details = "%s\n%s" % (error.tip, error.context)
        raise IntegrationFailedToConnect(
            "The attempt to connect to %s has failed with error: %s\n\n "
            "Please see the /integrations page on the UI for more details."
            % (self.name(), details)
        )

    raise IntegrationConnectionInProgress(
        "We are still in the process of connecting to integration %s. "
        "Please see the /integrations page on the UI for more details." % self.name()
    )

def __hash__(self) -> int:
"""An integration is uniquely identified by its name.
Expand Down
30 changes: 30 additions & 0 deletions sdk/aqueduct/utils/integration_validation.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
from typing import Optional

from aqueduct.error import IntegrationFailedToConnect, IntegrationConnectionInProgress

from aqueduct.constants.enums import ExecutionStatus
from aqueduct.models.execution_state import ExecutionState


def validate_integration_is_connected(name: str, exec_state: Optional[ExecutionState]) -> None:
    """Raise unless the integration has been successfully connected.

    Args:
        name: Name of the integration; only used to build the error message.
        exec_state: Execution state of the connection attempt. None or a
            SUCCEEDED status returns silently.

    Raises:
        IntegrationFailedToConnect: If the status is FAILED.
        IntegrationConnectionInProgress: For any other non-succeeded status.
    """
    # TODO(ENG-2813): Remove the assumption that a missing `exec_state` means success.
    if exec_state is None or exec_state.status == ExecutionStatus.SUCCEEDED:
        return

    if exec_state.status == ExecutionStatus.FAILED:
        # Guard against a failed state that carries no error details, so that the
        # caller sees the intended exception rather than an AttributeError.
        error = exec_state.error
        details = ""
        if error is not None:
            details = " %s\n%s" % (error.tip, error.context)
        raise IntegrationFailedToConnect(
            "Cannot use integration %s because it has not been successfully connected to:"
            "%s\n\n Please see the /integrations page on the UI for more details."
            % (name, details)
        )

    raise IntegrationConnectionInProgress(
        "Cannot use integration %s because it is still in the process of connecting. "
        "Please see the /integrations page on the UI for more details." % name
    )

0 comments on commit cebcdfd

Please sign in to comment.