Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rebase code with latest main after the refactor and implement UI for ondemand GKE #1384

Merged
merged 11 commits into from
May 31, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 4 additions & 5 deletions sdk/aqueduct/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@
from aqueduct.resources.ecr import ECRResource
from aqueduct.resources.gar import GARResource
from aqueduct.resources.google_sheets import GoogleSheetsResource
from aqueduct.resources.k8s import K8sResource
from aqueduct.resources.mongodb import MongoDBResource
from aqueduct.resources.parameters import USER_TAG_PATTERN
from aqueduct.resources.s3 import S3Resource
Expand Down Expand Up @@ -298,7 +297,7 @@ def delete_integration(
) -> None:
"""Deprecated. Use `client.delete_resource()` instead."""
logger().warning(
"client.delete_resource() will be deprecated soon. Use `client.delete_resource() instead."
"client.delete_integration() will be deprecated soon. Use `client.delete_resource() instead."
)
return self.delete_resource(name)

Expand Down Expand Up @@ -353,13 +352,13 @@ def integration(
GoogleSheetsResource,
RelationalDBResource,
AirflowResource,
K8sResource,
LambdaResource,
MongoDBResource,
DatabricksResource,
SparkResource,
AWSResource,
ECRResource,
DynamicK8sResource,
GARResource,
]:
"""Deprecated. Use `client.resource()` instead."""
Expand All @@ -376,13 +375,13 @@ def resource(
GoogleSheetsResource,
RelationalDBResource,
AirflowResource,
K8sResource,
LambdaResource,
MongoDBResource,
DatabricksResource,
SparkResource,
AWSResource,
ECRResource,
DynamicK8sResource,
GARResource,
]:
"""Retrieves a connected resource object.
Expand Down Expand Up @@ -442,7 +441,7 @@ def resource(
metadata=resource_info,
)
elif resource_info.service == ServiceType.K8S:
return K8sResource(
return DynamicK8sResource(
metadata=resource_info,
)
elif resource_info.service == ServiceType.LAMBDA:
Expand Down
5 changes: 5 additions & 0 deletions sdk/aqueduct/constants/enums.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,11 @@ class ServiceType(str, Enum, metaclass=MetaEnum):
GAR = "GAR"


class CloudProviderType(str, Enum, metaclass=MetaEnum):
AWS = "AWS"
GCP = "GCP"


class RelationalDBServices(str, Enum, metaclass=MetaEnum):
"""Must match the corresponding entries in `ServiceType` exactly."""

Expand Down
63 changes: 60 additions & 3 deletions sdk/aqueduct/resources/connect_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
from enum import Enum
from typing import Any, Dict, List, Optional, Union, cast

from aqueduct.constants.enums import MetaEnum, NotificationLevel, ServiceType
from aqueduct.constants.enums import CloudProviderType, MetaEnum, NotificationLevel, ServiceType
from aqueduct.error import InternalAqueductError, InvalidUserArgumentException
from pydantic import BaseModel, Extra, Field

Expand Down Expand Up @@ -199,6 +199,13 @@ class AWSConfig(BaseConnectionConfig):
k8s: Optional[DynamicK8sConfig]


class GCPConfig(BaseConnectionConfig):
region: str
zone: str
service_account_key_path: str = ""
service_account_key: str = ""


class ECRConfig(BaseConnectionConfig):
# Either 1) all of access_key_id, secret_access_key, region, or 2) both config_file_path and
# config_file_profile need to be specified. Any other cases will be rejected by the server's
Expand Down Expand Up @@ -270,11 +277,33 @@ class DatabricksConfig(BaseConnectionConfig):


class K8sConfig(BaseConnectionConfig):
kubeconfig_path: str = ""
cluster_name: str = ""
use_same_cluster: str = "false"
cloud_provider: Optional[CloudProviderType]
gcp_config: Optional[GCPConfig]
cluster_config: Optional[DynamicK8sConfig]


class _K8sConfigWithSerializedConfig(BaseConnectionConfig):
kubeconfig_path: str
cluster_name: str
use_same_cluster: str = "false"
dynamic: str = "false"
cloud_resource_id: str = ""
cloud_provider: Optional[CloudProviderType]
gcp_config_serialized: Optional[str] # this is a json-serialized string of GCPConfig
# add fields from DynamicK8sConfig
keepalive: Optional[Union[str, int]]
cpu_node_type: Optional[str]
gpu_node_type: Optional[str]
min_cpu_node: Optional[Union[str, int]]
max_cpu_node: Optional[Union[str, int]]
min_gpu_node: Optional[Union[str, int]]
max_gpu_node: Optional[Union[str, int]]

# This converts all int fields to string during json serialization. We need to do this becasue our
# backend assumes all config fields must be string.
class Config:
json_encoders = {int: str}


ResourceConfig = Union[
Expand All @@ -300,6 +329,7 @@ class K8sConfig(BaseConnectionConfig):
DatabricksConfig,
K8sConfig,
CondaConfig,
_K8sConfigWithSerializedConfig,
]


Expand Down Expand Up @@ -367,6 +397,9 @@ def prepare_resource_config(
if service == ServiceType.AWS:
return _prepare_aws_config(cast(AWSConfig, config))

if service == ServiceType.K8S:
return _prepare_k8s_config(cast(K8sConfig, config))

if service == ServiceType.GAR:
return _prepare_gar_config(cast(GARConfig, config))

Expand Down Expand Up @@ -405,6 +438,30 @@ def _prepare_aws_config(config: AWSConfig) -> _AWSConfigWithSerializedConfig:
)


def _prepare_k8s_config(config: K8sConfig) -> _K8sConfigWithSerializedConfig:
if config.gcp_config is not None and config.gcp_config.service_account_key_path is not None:
with open(config.gcp_config.service_account_key_path, "r") as f:
config.gcp_config.service_account_key = f.read()

return _K8sConfigWithSerializedConfig(
kubeconfig_path=config.kubeconfig_path,
cluster_name=config.cluster_name,
use_same_cluster=config.use_same_cluster,
cloud_provider=config.cloud_provider,
gcp_config_serialized=(
None if config.gcp_config is None else config.gcp_config.json(exclude_none=True)
),
# add fields from DynamicK8sConfig
keepalive=config.cluster_config.keepalive if config.cluster_config else None,
cpu_node_type=config.cluster_config.cpu_node_type if config.cluster_config else None,
gpu_node_type=config.cluster_config.gpu_node_type if config.cluster_config else None,
min_cpu_node=config.cluster_config.min_cpu_node if config.cluster_config else None,
max_cpu_node=config.cluster_config.max_cpu_node if config.cluster_config else None,
min_gpu_node=config.cluster_config.min_gpu_node if config.cluster_config else None,
max_gpu_node=config.cluster_config.max_gpu_node if config.cluster_config else None,
)


def _prepare_gar_config(config: GARConfig) -> GARConfig:
if config.service_account_key_path is not None:
with open(config.service_account_key_path, "r") as f:
Expand Down
35 changes: 5 additions & 30 deletions src/golang/cmd/server/handler/cloud_resource_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func setupCloudResource(
}

terraformPath := filepath.Join(os.Getenv("HOME"), ".aqueduct", "server", "cloud_resource", args.Name, "eks")
if err = setupTerraformDirectory(terraformPath); err != nil {
if err = dynamic.SetupTerraformDirectory(dynamic.EKSTerraformTemplateDir, terraformPath); err != nil {
return http.StatusInternalServerError, errors.Wrap(err, "Unable to create Terraform directory.")
}

Expand All @@ -51,8 +51,8 @@ func setupCloudResource(

config := shared.DynamicK8sConfig{
Keepalive: strconv.Itoa(shared.K8sDefaultKeepalive),
CpuNodeType: shared.K8sDefaultCpuNodeType,
GpuNodeType: shared.K8sDefaultGpuNodeType,
CpuNodeType: shared.EKSDefaultCpuNodeType,
GpuNodeType: shared.EKSDefaultGpuNodeType,
MinCpuNode: strconv.Itoa(shared.K8sDefaultMinCpuNode),
MaxCpuNode: strconv.Itoa(shared.K8sDefaultMaxCpuNode),
MinGpuNode: strconv.Itoa(shared.K8sDefaultMinGpuNode),
Expand Down Expand Up @@ -117,33 +117,8 @@ func setupCloudResource(
return http.StatusOK, nil
}

// setupTerraformDirectory copies all files and folders in the Terraform template directory to the
// cloud resource's destination directory, which is ~/.aqueduct/server/cloud_resource/<name>/eks.
func setupTerraformDirectory(dst string) error {
// Create the destination directory if it doesn't exist.
if err := os.MkdirAll(dst, 0o755); err != nil {
return err
}

_, stdErr, err := lib_utils.RunCmd(
"cp",
[]string{
"-R", // we could have used -T to not create a directory if the source is a directory, but it's not supported on macOS
fmt.Sprintf("%s%s.", dynamic.TerraformTemplateDir, string(filepath.Separator)),
dst,
},
"",
false,
)
if err != nil {
return errors.New(stdErr)
}

return nil
}

// deleteCloudResourceHelper does the following:
// 1. Verifies that there is no workflow using the dynamic k8s resource.
// deleteCloudIntegrationHelper does the following:
// 1. Verifies that there is no workflow using the dynamic k8s integration.
// 2. Deletes the EKS cluster if it's running.
// 3. Deletes the cloud resource directory.
// 4. Deletes the Aqueduct-generated dynamic k8s resource.
Expand Down
33 changes: 32 additions & 1 deletion src/golang/cmd/server/handler/connect_resource.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,7 @@ func (h *ConnectResourceHandler) Perform(ctx context.Context, interfaceArgs inte
args.Name,
args.ID,
args.OrgID,
args.Config,
h.ResourceRepo,
h.Database,
)
Expand Down Expand Up @@ -643,7 +644,7 @@ func validateKubernetesConfig(
ctx context.Context,
config auth.Config,
) (int, error) {
if err := engine.AuthenticateK8sConfig(ctx, config); err != nil {
if err := engine.AuthenticateAndUpdateK8sConfig(ctx, config); err != nil {
return http.StatusBadRequest, err
}

Expand Down Expand Up @@ -738,6 +739,7 @@ func ValidatePrerequisites(
name string,
userID uuid.UUID,
orgID string,
conf auth.Config,
resourceRepo repos.Resource,
DB database.Database,
) (int, error) {
Expand Down Expand Up @@ -840,6 +842,35 @@ func ValidatePrerequisites(
}
}

// For on-demand GKE resource, we require the user to have Terraform, gcloud, and gcloud gke-gcloud-auth-plugin plugin installed.
if svc == shared.Kubernetes {
// Parse the config and see if cloud provider is GCP
k8sConf, err := lib_utils.ParseK8sConfig(conf)
if err != nil {
return http.StatusInternalServerError, errors.Wrap(err, "Unable to parse Kubernetes configuration.")
}

if k8sConf.CloudProvider == shared.GCPProvider {
if _, _, err := lib_utils.RunCmd("terraform", []string{"--version"}, "", false); err != nil {
return http.StatusNotFound, errors.Wrap(err, "terraform executable not found. Please go to https://developer.hashicorp.com/terraform/downloads to install terraform")
}

_, _, err := lib_utils.RunCmd("gcloud", []string{"--version"}, "", false)
if err != nil {
return http.StatusNotFound, errors.Wrap(err, "gcloud executable not found. Please go to https://cloud.google.com/sdk/docs/install to install gcloud")
}

componentsOutput, _, err := lib_utils.RunCmd("gcloud", []string{"components", "list"}, "", false)
if err != nil {
return http.StatusUnprocessableEntity, errors.Wrap(err, "Error listing gcloud components")
}

if !strings.Contains(componentsOutput, "gke-gcloud-auth-plugin") {
return http.StatusUnprocessableEntity, errors.New("gke-gcloud-auth-plugin is not installed. Please run `gcloud components install gke-gcloud-auth-plugin` to install it.")
}
}
}

// For GAR integration, we require the user to have gcloud installed.
if svc == shared.GAR {
_, _, err := lib_utils.RunCmd("gcloud", []string{"--version"}, "", false)
Expand Down
32 changes: 32 additions & 0 deletions src/golang/cmd/server/handler/delete_resource.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/aqueducthq/aqueduct/lib/database"
aq_errors "github.com/aqueducthq/aqueduct/lib/errors"
exec_env "github.com/aqueducthq/aqueduct/lib/execution_environment"
"github.com/aqueducthq/aqueduct/lib/lib_utils"
"github.com/aqueducthq/aqueduct/lib/models"
"github.com/aqueducthq/aqueduct/lib/models/shared"
"github.com/aqueducthq/aqueduct/lib/repos"
Expand Down Expand Up @@ -138,6 +139,37 @@ func (h *DeleteResourceHandler) Perform(ctx context.Context, interfaceArgs inter
}
}

if args.resourceObject.Service == shared.Kubernetes {
if args.resourceObject.Config[shared.K8sCloudProviderKey] == string(shared.GCPProvider) {
// Delete the ondemand cluster
editDynamicEngineArgs := &editDynamicEngineArgs{
AqContext: args.AqContext,
action: forceDeleteAction,
resourceID: args.resourceObject.ID,
configDelta: &shared.DynamicK8sConfig{},
}
_, statusCode, err := (&EditDynamicEngineHandler{
Database: h.Database,
ResourceRepo: h.ResourceRepo,
}).Perform(ctx, editDynamicEngineArgs)
if err != nil {
return emptyResp, statusCode, errors.Wrap(err, "Failed to delete the dynamic k8s cluster.")
}

// Clean up the ondemand k8s directory
_, stdErr, err := lib_utils.RunCmd("rm", []string{
"-rf",
args.resourceObject.Config[shared.K8sTerraformPathKey],
},
"",
false,
)
if err != nil {
return emptyResp, http.StatusInternalServerError, errors.New(stdErr)
}
}
}

txn, err := h.Database.BeginTx(ctx)
if err != nil {
return emptyResp, http.StatusInternalServerError, errors.Wrap(err, "Unable to delete resource.")
Expand Down
Loading