Skip to content

[SDK] Fail when using any integrations that aren't connected yet #1223

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Apr 21, 2023
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 7 additions & 7 deletions sdk/aqueduct/artifacts/preview.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,13 @@ def preview_artifacts(
Returns a list of artifacts, each corresponding to one of the provided `target_artifact_ids`, in
the same order.
"""
global_engine_config: Optional[EngineConfig] = None
if globals.__GLOBAL_CONFIG__.engine is not None:
global_engine_config = generate_engine_config(
globals.__GLOBAL_API_CLIENT__.list_integrations(),
globals.__GLOBAL_CONFIG__.engine,
)

subgraph = apply_deltas_to_dag(
dag,
deltas=[
Expand All @@ -50,13 +57,6 @@ def preview_artifacts(
],
make_copy=True,
)

global_engine_config: Optional[EngineConfig] = None
if globals.__GLOBAL_CONFIG__.engine is not None:
global_engine_config = generate_engine_config(
globals.__GLOBAL_API_CLIENT__.list_integrations(),
globals.__GLOBAL_CONFIG__.engine,
)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

non-functional change. It just looks better to have the two subgraph statements next to each other.

subgraph.set_engine_config(global_engine_config=global_engine_config)

engine_statuses = globals.__GLOBAL_API_CLIENT__.get_dynamic_engine_status_by_dag(dag=subgraph)
Expand Down
10 changes: 10 additions & 0 deletions sdk/aqueduct/error.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,3 +135,13 @@ class ClientValidationError(Error):
# Exception raised when requirements.txt file is missing
class RequirementsMissingError(Error):
pass


# Exception raised when the user attempts to use an integration that has failed to connect.
class IntegrationFailedToConnect(Error):
pass


# Exception raised when the user attempts to use an integration that is still connecting.
class IntegrationConnectionInProgress(Error):
pass
5 changes: 5 additions & 0 deletions sdk/aqueduct/integrations/dynamic_k8s_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from aqueduct.constants.enums import K8sClusterActionType, K8sClusterStatusType
from aqueduct.error import InvalidIntegrationException, InvalidUserArgumentException
from aqueduct.integrations.connect_config import DynamicK8sConfig
from aqueduct.integrations.validation import validate_is_connected
from aqueduct.models.integration import Integration, IntegrationInfo
from aqueduct.models.response_models import DynamicEngineStatusResponse

Expand Down Expand Up @@ -41,6 +42,7 @@ class DynamicK8sIntegration(Integration):
def __init__(self, metadata: IntegrationInfo):
self._metadata = metadata

@validate_is_connected()
def status(self) -> str:
"""Get the current status of the dynamic Kubernetes cluster."""
engine_statuses = globals.__GLOBAL_API_CLIENT__.get_dynamic_engine_status(
Expand All @@ -51,6 +53,7 @@ def status(self) -> str:

return engine_statuses[self._metadata.name].status.value

@validate_is_connected()
def create(
self, config_delta: Union[Dict[str, Union[int, str]], DynamicK8sConfig] = {}
) -> None:
Expand Down Expand Up @@ -93,6 +96,7 @@ def create(
config_delta=config_delta,
)

@validate_is_connected()
def update(self, config_delta: Union[Dict[str, Union[int, str]], DynamicK8sConfig]) -> None:
"""Update the dynamic Kubernetes cluster. This can only be done when the cluster is in
Active status.
Expand Down Expand Up @@ -135,6 +139,7 @@ def update(self, config_delta: Union[Dict[str, Union[int, str]], DynamicK8sConfi
config_delta=config_delta,
)

@validate_is_connected()
def delete(self, force: bool = False) -> None:
"""Deletes the dynamic Kubernetes cluster if it is running, ignoring the keepalive period.

Expand Down
3 changes: 3 additions & 0 deletions sdk/aqueduct/integrations/google_sheets_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from aqueduct.artifacts.base_artifact import BaseArtifact
from aqueduct.artifacts.table_artifact import TableArtifact
from aqueduct.constants.enums import ArtifactType, GoogleSheetsSaveMode
from aqueduct.integrations.validation import validate_is_connected
from aqueduct.models.artifact import ArtifactMetadata
from aqueduct.models.dag import DAG
from aqueduct.models.integration import Integration, IntegrationInfo
Expand All @@ -29,6 +30,7 @@ def __init__(self, dag: DAG, metadata: IntegrationInfo):
self._dag = dag
self._metadata = metadata

@validate_is_connected()
def spreadsheet(
self,
spreadsheet_id: str,
Expand Down Expand Up @@ -96,6 +98,7 @@ def spreadsheet(
artifact_id=output_artifact_id,
)

@validate_is_connected()
def save(
self,
artifact: BaseArtifact,
Expand Down
5 changes: 5 additions & 0 deletions sdk/aqueduct/integrations/mongodb_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from aqueduct.constants.enums import ArtifactType, ExecutionMode, LoadUpdateMode
from aqueduct.integrations.parameters import _validate_parameters
from aqueduct.integrations.save import _save_artifact
from aqueduct.integrations.validation import validate_is_connected
from aqueduct.models.artifact import ArtifactMetadata
from aqueduct.models.dag import DAG
from aqueduct.models.integration import Integration, IntegrationInfo
Expand All @@ -33,6 +34,7 @@ def __init__(self, dag: DAG, metadata: IntegrationInfo, collection_name: str) ->
self._dag = dag
self._collection_name = collection_name

@validate_is_connected()
def find(
self,
*args: List[Any],
Expand Down Expand Up @@ -156,6 +158,7 @@ def find(
# We are in lazy mode.
return TableArtifact(self._dag, output_artf_id)

@validate_is_connected()
def save(self, artifact: BaseArtifact, update_mode: LoadUpdateMode) -> None:
"""Registers a save operator of the given artifact, to be executed when it's computed in a published flow.

Expand Down Expand Up @@ -188,6 +191,7 @@ def __init__(self, dag: DAG, metadata: IntegrationInfo):
self._dag = dag
self._metadata = metadata

@validate_is_connected()
def collection(self, name: str) -> MongoDBCollectionIntegration:
"""Returns a specific collection object to call `.find()` method.

Expand All @@ -203,6 +207,7 @@ def describe(self) -> None:
print("==================== MongoDB Integration =============================")
self._metadata.describe()

@validate_is_connected()
def save(self, artifact: BaseArtifact, collection: str, update_mode: LoadUpdateMode) -> None:
"""Registers a save operator of the given artifact, to be executed when it's computed in a published flow.

Expand Down
3 changes: 3 additions & 0 deletions sdk/aqueduct/integrations/s3_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from aqueduct.artifacts import preview as artifact_utils
from aqueduct.artifacts.base_artifact import BaseArtifact
from aqueduct.constants.enums import ArtifactType, ExecutionMode, S3TableFormat
from aqueduct.integrations.validation import validate_is_connected
from aqueduct.models.artifact import ArtifactMetadata
from aqueduct.models.dag import DAG
from aqueduct.models.integration import Integration, IntegrationInfo
Expand Down Expand Up @@ -51,6 +52,7 @@ def __init__(self, dag: DAG, metadata: IntegrationInfo):
self._dag = dag
self._metadata = metadata

@validate_is_connected()
def file(
self,
filepaths: Union[List[str], str],
Expand Down Expand Up @@ -171,6 +173,7 @@ def _is_multi_file_search() -> bool:
# We are in lazy mode.
return to_artifact_class(self._dag, output_artifact_id, artifact_type)

@validate_is_connected()
def save(self, artifact: BaseArtifact, filepath: str, format: Optional[str] = None) -> None:
"""Registers a save operator of the given artifact, to be executed when it's computed in a published flow.

Expand Down
4 changes: 4 additions & 0 deletions sdk/aqueduct/integrations/salesforce_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from aqueduct.artifacts.base_artifact import BaseArtifact
from aqueduct.artifacts.table_artifact import TableArtifact
from aqueduct.constants.enums import ArtifactType, SalesforceExtractType
from aqueduct.integrations.validation import validate_is_connected
from aqueduct.models.artifact import ArtifactMetadata
from aqueduct.models.dag import DAG
from aqueduct.models.integration import Integration, IntegrationInfo
Expand All @@ -30,6 +31,7 @@ def __init__(self, dag: DAG, metadata: IntegrationInfo):
self._dag = dag
self._metadata = metadata

@validate_is_connected()
def search(
self,
search_query: str,
Expand Down Expand Up @@ -69,6 +71,7 @@ def search(
artifact_id=output_artifact_id,
)

@validate_is_connected()
def query(
self,
query: str,
Expand Down Expand Up @@ -102,6 +105,7 @@ def query(
artifact_id=output_artifact_id,
)

@validate_is_connected()
def save(self, artifact: BaseArtifact, object: str) -> None:
"""Registers a save operator of the given artifact, to be executed when it's computed in a published flow.

Expand Down
18 changes: 14 additions & 4 deletions sdk/aqueduct/integrations/sql_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from aqueduct.error import InvalidUserActionException, InvalidUserArgumentException
from aqueduct.integrations.parameters import _validate_builtin_expansions, _validate_parameters
from aqueduct.integrations.save import _save_artifact
from aqueduct.integrations.validation import validate_is_connected
from aqueduct.models.artifact import ArtifactMetadata
from aqueduct.models.dag import DAG
from aqueduct.models.integration import Integration, IntegrationInfo
Expand Down Expand Up @@ -46,6 +47,7 @@ def __init__(self, dag: DAG, metadata: IntegrationInfo):
self._dag = dag
self._metadata = metadata

@validate_is_connected()
def list_tables(self) -> pd.DataFrame:
"""
Lists the tables available in the RelationalDB integration.
Expand Down Expand Up @@ -80,9 +82,10 @@ def list_tables(self) -> pd.DataFrame:
sql_artifact = self.sql(query=list_tables_query)
return sql_artifact.get()

@validate_is_connected()
def table(self, name: str) -> pd.DataFrame:
"""
Retrieves a table from a RealtionalDB integration.
Retrieves a table from a RelationalDB integration.

Args:
name:
Expand All @@ -94,6 +97,7 @@ def table(self, name: str) -> pd.DataFrame:
sql_artifact = self.sql(query=GET_TABLE_QUERY % name)
return sql_artifact.get()

@validate_is_connected()
def sql(
self,
query: Union[str, List[str], RelationalDBExtractParams],
Expand Down Expand Up @@ -233,6 +237,7 @@ def sql(
# We are in lazy mode.
return TableArtifact(self._dag, sql_output_artifact_id)

@validate_is_connected()
def save(self, artifact: BaseArtifact, table_name: str, update_mode: LoadUpdateMode) -> None:
"""Registers a save operator of the given artifact, to be executed when it's computed in a published flow.

Expand Down Expand Up @@ -269,6 +274,11 @@ def describe(self) -> None:
print("==================== SQL Integration =============================")
print("Integration Information:")
self._metadata.describe()
print("Integration Table List Preview:")
print(self.list_tables()["name"].head().to_string())
print("(only first 5 tables are shown)")

# Only list the tables if the integration is connected.
try:
print("Integration Table List Preview:")
print(self.list_tables()["name"].head().to_string())
print("(only first 5 tables are shown)")
except:
pass
19 changes: 19 additions & 0 deletions sdk/aqueduct/integrations/validation.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
from typing import Any, Callable

from aqueduct.utils.integration_validation import validate_integration_is_connected

AnyFunc = Callable[..., Any]


def validate_is_connected() -> Callable[[AnyFunc], AnyFunc]:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd prefer exposing this as a method on the integration base class and having all methods use @self.validate_is_connected(). That way the method is tied to the integration, and it's clear that we are ensuring 'this' integration is connected. It also avoids passing self around to an externally imported function (I'm not sure whether that's an antipattern, but it feels weird to read :) )

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hmm I don't think I can use the @self.validate_is_connected() syntax.. My intellij errors and also https://stackoverflow.com/questions/11731136/class-method-decorator-with-self-arguments#:~:text=You%20can't.,use%20a%20different%20method%20entirely.

There's also a bit of a layering issue, where I'd have to move the validate_integration_is_connected() down to the models/ layer from utils/, which I feel doesn't make too much sense. This is all because the integration classes are defined in integrations/ and the base integration class is defined at a much lower level in models. I think these layering issues in the SDK should be addressed, but probably not in this PR.

What do you think?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah the current way looks fine then!

"""This decorator, which must be used on an Integration class method,
ensures that the integration is connected before allowing the method to be called."""

def decorator(method: AnyFunc) -> AnyFunc:
    """Wrap `method` so the integration's connection state is validated before each call.

    Args:
        method: The Integration instance method being decorated. Its first positional
            argument must be the integration instance (`self`).

    Returns:
        A wrapper with the same call signature as `method`. The wrapper first calls
        `validate_integration_is_connected()` with `self.name()` and
        `self._metadata.exec_state`, letting any exception it raises propagate, and
        only then invokes `method`.
    """
    # Imported locally so this block does not depend on a module-level import.
    import functools

    # Preserve the decorated method's __name__/__doc__ so help() and tracebacks
    # still refer to the real integration method rather than to `wrapper`.
    @functools.wraps(method)
    def wrapper(self: Any, *args: Any, **kwargs: Any) -> Any:
        # Validate before delegating, so an unconnected integration never runs `method`.
        validate_integration_is_connected(self.name(), self._metadata.exec_state)
        return method(self, *args, **kwargs)

    return wrapper

return decorator
32 changes: 32 additions & 0 deletions sdk/aqueduct/utils/integration_validation.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
from typing import Optional

from aqueduct.constants.enums import ExecutionStatus
from aqueduct.error import IntegrationConnectionInProgress, IntegrationFailedToConnect
from aqueduct.models.execution_state import ExecutionState


def validate_integration_is_connected(name: str, exec_state: Optional[ExecutionState]) -> None:
    """Raise unless the integration `name` has been successfully connected.

    Args:
        name: Name of the integration; interpolated into the error message.
        exec_state: Execution state of the integration's connection attempt.
            A missing (`None`) state is currently treated as connected.

    Raises:
        IntegrationFailedToConnect: If `exec_state.status` is FAILED.
        IntegrationConnectionInProgress: If the status is anything other than
            SUCCEEDED or FAILED (assumed to mean the connection is still running).
    """
    # TODO(ENG-2813): Remove the assumption that a missing `exec_state` means success.
    if exec_state is None or exec_state.status == ExecutionStatus.SUCCEEDED:
        return

    if exec_state.status == ExecutionStatus.FAILED:
        error = exec_state.error
        if error is None:
            # A failed state is expected to carry an error, but `assert` is stripped
            # under `python -O`. Raise the intended exception without details rather
            # than crashing with an AttributeError on `None`.
            raise IntegrationFailedToConnect(
                "Cannot use integration %s because it has not been successfully connected to."
                "\n\n Please see the /integrations page on the UI for more details." % name
            )

        raise IntegrationFailedToConnect(
            "Cannot use integration %s because it has not been successfully connected to: "
            "%s\n%s\n\n Please see the /integrations page on the UI for more details."
            % (
                name,
                error.tip,
                error.context,
            )
        )

    # The assumption is that we are in the running state here.
    # Note the trailing space after "connecting." — the two literals are concatenated.
    raise IntegrationConnectionInProgress(
        "Cannot use integration %s because it is still in the process of connecting. "
        "Please see the /integrations page on the UI for more details." % name
    )
3 changes: 3 additions & 0 deletions sdk/aqueduct/utils/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
from aqueduct.models.dag import Schedule
from aqueduct.models.integration import IntegrationInfo
from aqueduct.models.operators import ParamSpec
from aqueduct.utils.integration_validation import validate_integration_is_connected
from croniter import croniter

from ..models.execution_state import Logs
Expand Down Expand Up @@ -160,6 +161,8 @@ def generate_engine_config(
)

integration = integrations[integration_name]
validate_integration_is_connected(integration_name, integration.exec_state)

if integration.service == ServiceType.AIRFLOW:
return EngineConfig(
type=RuntimeType.AIRFLOW,
Expand Down
16 changes: 14 additions & 2 deletions src/golang/lib/lambda/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,11 @@ func CreateLambdaFunction(ctx context.Context, functionsToShip []LambdaFunctionT
defer close(pushImageChannel)
AddFunctionTypeToChannel(functionsToShip[:], pullImageChannel)

for i := 0; i < MaxConcurrentDownload; i++ {
numPullWorkers := MaxConcurrentDownload
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what are these for?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We pull the Lambda Docker images from Docker Hub in parallel; I believe this makes sure that the number of workers isn't greater than necessary. @kenxu95, can you confirm?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah that's what it's doing. It's just leftover from when I was trying out the lambda stuff, but seemed useful.

if numPullWorkers > len(functionsToShip) {
numPullWorkers = len(functionsToShip)
}
for i := 0; i < numPullWorkers; i++ {
errGroup.Go(func() error {
for {
select {
Expand All @@ -105,7 +109,11 @@ func CreateLambdaFunction(ctx context.Context, functionsToShip []LambdaFunctionT
AddFunctionTypeToChannel(functionsToShip[:], incompleteWorkChannel)

// Receive the downloaded docker images from push channels and create lambda functions on a concurrency of "MaxConcurrentUpload".
for i := 0; i < MaxConcurrentUpload; i++ {
numPushWorkers := MaxConcurrentUpload
if numPushWorkers > len(functionsToShip) {
numPushWorkers = len(functionsToShip)
}
for i := 0; i < numPushWorkers; i++ {
errGroup.Go(func() error {
for {
select {
Expand Down Expand Up @@ -245,6 +253,8 @@ func PushImageToPrivateECR(functionType LambdaFunctionType, roleArn string) erro
repositoryUri := fmt.Sprintf("%s:%s", *result.Repository.RepositoryUri, lib.ServerVersionNumber)

cmd := exec.Command("docker", "tag", versionedLambdaImageUri, repositoryUri)
cmd.Stdout = stdout
cmd.Stderr = stderr
err = cmd.Run()
if err != nil {
log.Info(stdout.String())
Expand All @@ -253,6 +263,8 @@ func PushImageToPrivateECR(functionType LambdaFunctionType, roleArn string) erro
}

cmd = exec.Command("docker", "push", repositoryUri)
cmd.Stdout = stdout
cmd.Stderr = stderr
err = cmd.Run()
if err != nil {
log.Info(stdout.String())
Expand Down