-
Notifications
You must be signed in to change notification settings - Fork 19
[SDK] Fail when using any integrations that aren't connected yet #1223
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 2 commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,19 @@ | ||
from typing import Any, Callable | ||
|
||
from aqueduct.utils.integration_validation import validate_integration_is_connected | ||
|
||
AnyFunc = Callable[..., Any] | ||
|
||
|
||
def validate_is_connected() -> Callable[[AnyFunc], AnyFunc]: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'd prefer exposing this as a method in integration base class and have all methods use There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. hmm I don't think I can use the There's also a bit of a layering issue, where I'd have to move the What do you think? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. yeah the current way looks fine then! |
||
"""This decorator, when used on an Integration class method, ensures that the integration is | ||
connected before allowing the method to be called.""" | ||
|
||
def decorator(method: AnyFunc) -> Callable[[AnyFunc], AnyFunc]: | ||
def wrapper(self: Any, *args: Any, **kwargs: Any) -> Any: | ||
validate_integration_is_connected(self.name(), self._metadata.exec_state) | ||
return method(*args, **kwargs) | ||
|
||
return wrapper | ||
|
||
return decorator |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,32 @@ | ||
from typing import Optional | ||
|
||
from aqueduct.constants.enums import ExecutionStatus | ||
from aqueduct.error import IntegrationConnectionInProgress, IntegrationFailedToConnect | ||
from aqueduct.models.execution_state import ExecutionState | ||
|
||
|
||
def validate_integration_is_connected(name: str, exec_state: Optional[ExecutionState]) -> None: | ||
"""Method used to determine if this integration was successfully connected to or not. | ||
If not successfully connected (or pending), we will raise an Exception. | ||
""" | ||
# TODO(ENG-2813): Remove the assumption that a missing `exec_state` means success. | ||
if exec_state is None or exec_state.status == ExecutionStatus.SUCCEEDED: | ||
return | ||
|
||
if exec_state.status == ExecutionStatus.FAILED: | ||
assert exec_state.error is not None | ||
raise IntegrationFailedToConnect( | ||
"Cannot use integration %s because it has not been successfully connected to: " | ||
"%s\n%s\n\n Please see the /integrations page on the UI for more details." | ||
% ( | ||
name, | ||
exec_state.error.tip, | ||
exec_state.error.context, | ||
) | ||
) | ||
else: | ||
# The assumption is that we are in the running state here. | ||
raise IntegrationConnectionInProgress( | ||
"Cannot use integration %s because it is still in the process of connecting." | ||
"Please see the /integrations page on the UI for more details." % name | ||
) |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -78,7 +78,11 @@ func CreateLambdaFunction(ctx context.Context, functionsToShip []LambdaFunctionT | |
defer close(pushImageChannel) | ||
AddFunctionTypeToChannel(functionsToShip[:], pullImageChannel) | ||
|
||
for i := 0; i < MaxConcurrentDownload; i++ { | ||
numPullWorkers := MaxConcurrentDownload | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. what are these for? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We pull the lambda Docker images from Dockerhub in parallel, this is making sure that the number of workers isn't greater than necessary i believe. @kenxu95 can confirm here? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. yeah that's what it's doing. It's just leftover from when I was trying out the lambda stuff, but seemed useful. |
||
if numPullWorkers > len(functionsToShip) { | ||
numPullWorkers = len(functionsToShip) | ||
} | ||
for i := 0; i < numPullWorkers; i++ { | ||
errGroup.Go(func() error { | ||
for { | ||
select { | ||
|
@@ -105,7 +109,11 @@ func CreateLambdaFunction(ctx context.Context, functionsToShip []LambdaFunctionT | |
AddFunctionTypeToChannel(functionsToShip[:], incompleteWorkChannel) | ||
|
||
// Receive the downloaded docker images from push channels and create lambda functions on a concurrency of "MaxConcurrentUpload". | ||
for i := 0; i < MaxConcurrentUpload; i++ { | ||
numPushWorkers := MaxConcurrentUpload | ||
if numPushWorkers > len(functionsToShip) { | ||
numPushWorkers = len(functionsToShip) | ||
} | ||
for i := 0; i < numPushWorkers; i++ { | ||
errGroup.Go(func() error { | ||
for { | ||
select { | ||
|
@@ -245,6 +253,8 @@ func PushImageToPrivateECR(functionType LambdaFunctionType, roleArn string) erro | |
repositoryUri := fmt.Sprintf("%s:%s", *result.Repository.RepositoryUri, lib.ServerVersionNumber) | ||
|
||
cmd := exec.Command("docker", "tag", versionedLambdaImageUri, repositoryUri) | ||
cmd.Stdout = stdout | ||
cmd.Stderr = stderr | ||
err = cmd.Run() | ||
if err != nil { | ||
log.Info(stdout.String()) | ||
|
@@ -253,6 +263,8 @@ func PushImageToPrivateECR(functionType LambdaFunctionType, roleArn string) erro | |
} | ||
|
||
cmd = exec.Command("docker", "push", repositoryUri) | ||
cmd.Stdout = stdout | ||
cmd.Stderr = stderr | ||
err = cmd.Run() | ||
if err != nil { | ||
log.Info(stdout.String()) | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
non-functional change. It just looks better to have the two subgraph statements next to each other.