Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for subfolders for aws s3 #1191

Merged
merged 11 commits into from
Apr 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion integration_tests/sdk/shared/flow_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,12 @@ def stop_condition(
expect_status_strs = [status.value for status in expected_statuses]
if statuses != expect_status_strs:
for i, status in enumerate(statuses):
if status != expect_status_strs[i] and status == ExecutionStatus.FAILED:
if i >= len(expect_status_strs):
print("Unexpected additional workflow run:")
unexpected_flow_run = flow.fetch(flow_runs[i]["run_id"])
unexpected_flow_run.describe()
elif status != expect_status_strs[i] and status == ExecutionStatus.FAILED:
print("Failed workflow run:")
failed_flow_run = flow.fetch(flow_runs[i]["run_id"])
failed_flow_run.describe()

Expand Down
2 changes: 1 addition & 1 deletion sdk/aqueduct/client.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import logging
import os
import platform
import uuid
import warnings
import platform
from collections import defaultdict
from typing import Any, DefaultDict, Dict, List, Optional, Union

Expand Down
4 changes: 4 additions & 0 deletions sdk/aqueduct/integrations/connect_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,10 @@ class S3Config(BaseConnectionConfig):
bucket: str
region: str

# When connecting a new integration, we allow both leading or trailing slashes here.
# The path will be sanitized before being stored in the database.
root_dir: str = ""

use_as_storage: str = "false"


Expand Down
17 changes: 14 additions & 3 deletions src/golang/cmd/server/handler/connect_integration.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,10 +126,21 @@ func (h *ConnectIntegrationHandler) Prepare(r *http.Request) (interface{}, int,
return nil, http.StatusBadRequest, errors.Wrap(err, "Error getting server's home directory path")
}

config := auth.NewStaticConfig(configMap)
// Sanitize the root directory path for S3. We remove any leading slash, but force there to always
// be a trailing slash. eg: `path/to/root/`.
if service == shared.S3 {
if root_dir, ok := configMap["root_dir"]; ok && root_dir != "" {
if root_dir[len(root_dir)-1] != '/' {
root_dir += "/"
}
configMap["root_dir"] = strings.TrimLeft(root_dir, "/")
}
}

staticConfig := auth.NewStaticConfig(configMap)

// Check if this integration should be used as the new storage layer
setStorage, err := checkIntegrationSetStorage(service, config)
setStorage, err := checkIntegrationSetStorage(service, staticConfig)
if err != nil {
return nil, http.StatusBadRequest, errors.Wrap(err, "Unable to connect integration.")
}
Expand All @@ -138,7 +149,7 @@ func (h *ConnectIntegrationHandler) Prepare(r *http.Request) (interface{}, int,
AqContext: aqContext,
Service: service,
Name: name,
Config: config,
Config: staticConfig,
UserOnly: userOnly,
SetAsStorage: setStorage,
}, http.StatusOK, nil
Expand Down
1 change: 1 addition & 0 deletions src/golang/lib/models/shared/integration_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ type S3IntegrationConfig struct {
Type S3ConfigType `json:"type"`
Bucket string `json:"bucket"`
Region string `json:"region"`
RootDir string `json:"root_dir"`
AccessKeyId string `json:"access_key_id"`
SecretAccessKey string `json:"secret_access_key"`
ConfigFilePath string `json:"config_file_path"`
Expand Down
18 changes: 14 additions & 4 deletions src/golang/lib/models/shared/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,13 @@ type StorageConfigPublic struct {
}

type S3Config struct {
Region string `yaml:"region" json:"region"`
Bucket string `yaml:"bucket" json:"bucket"`
Region string `yaml:"region" json:"region"`
Bucket string `yaml:"bucket" json:"bucket"`

// Use this directory in the bucket as the root. If not set, we default to the root of the bucket.
// Expected to be sanitized into the format "path/to/dir/" (without a leading slash, but with a trailing one).
RootDir string `yaml:"root_dir" json:"root_dir"`

CredentialsPath string `yaml:"credentialsPath" json:"credentials_path"`
CredentialsProfile string `yaml:"credentialsProfile" json:"credentials_profile"`
AWSAccessKeyID string `yaml:"awsAccessKeyId" json:"aws_access_key_id"`
Expand All @@ -44,6 +49,10 @@ type S3Config struct {
type S3ConfigPublic struct {
Region string `yaml:"region" json:"region"`
Bucket string `yaml:"bucket" json:"bucket"`

// Use this directory in the bucket as the root. If not set, we default to the root of the bucket.
// Expected to be sanitized into the format "path/to/dir/" (without a leading slash, but with a trailing one).
RootDir string `yaml:"root_dir" json:"root_dir"`
}

type FileConfig struct {
Expand Down Expand Up @@ -77,8 +86,9 @@ func (s *StorageConfig) ToPublic() (*StorageConfigPublic, error) {
storageConfigPublic.FileConfig = s.FileConfig
case S3StorageType:
storageConfigPublic.S3ConfigPublic = &S3ConfigPublic{
Region: s.S3Config.Region,
Bucket: s.S3Config.Bucket,
Region: s.S3Config.Region,
Bucket: s.S3Config.Bucket,
RootDir: s.S3Config.RootDir,
}
case GCSStorageType:
storageConfigPublic.GCSConfigPublic = &GCSConfigPublic{
Expand Down
5 changes: 3 additions & 2 deletions src/golang/lib/storage/convert.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,9 @@ func convertS3IntegrationtoStorageConfig(c *shared.S3IntegrationConfig) (*shared
storageConfig := &shared.StorageConfig{
Type: shared.S3StorageType,
S3Config: &shared.S3Config{
Bucket: fmt.Sprintf("s3://%s", c.Bucket),
Region: c.Region,
Bucket: fmt.Sprintf("s3://%s", c.Bucket),
Region: c.Region,
RootDir: c.RootDir,
kenxu95 marked this conversation as resolved.
Show resolved Hide resolved
},
}
switch c.Type {
Expand Down
18 changes: 10 additions & 8 deletions src/golang/lib/storage/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/s3"
"github.com/dropbox/godropbox/errors"
)

type s3Storage struct {
Expand All @@ -26,20 +27,21 @@ func newS3Storage(s3Config *shared.S3Config) *s3Storage {
}
}

// parseBucketAndKey takes the bucket in the form of s3://bucket/path
// and a key and parses the bucket name and the key.
// parseBucketAndKey returns the bucket name and resolves the supplied key to the
// full path in the bucket.
func (s *s3Storage) parseBucketAndKey(key string) (string, string, error) {
u, err := url.Parse(s.s3Config.Bucket)
if err != nil {
return "", "", err
}

bucket := u.Host

u.Path = strings.TrimLeft(u.Path, "/")
key = path.Join(u.Path, key)

return bucket, key, nil
dirPath := strings.TrimLeft(u.Path, "/")
if s.s3Config.RootDir != "" {
dirPath = path.Join(dirPath, s.s3Config.RootDir)
}
full_key := path.Join(dirPath, key)
return bucket, full_key, nil
}

func (s *s3Storage) Get(ctx context.Context, key string) ([]byte, error) {
Expand All @@ -63,7 +65,7 @@ func (s *s3Storage) Get(ctx context.Context, key string) ([]byte, error) {
if aerr, ok := err.(awserr.Error); ok {
switch aerr.Code() {
case s3.ErrCodeNoSuchKey:
return nil, ErrObjectDoesNotExist()
return nil, errors.Wrapf(ErrObjectDoesNotExist(), "Unable to fetch key `%s` from bucket `%s`.", key, bucket)
default:
return nil, err
}
Expand Down
2 changes: 2 additions & 0 deletions src/golang/lib/storage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (
"github.com/aqueducthq/aqueduct/lib/models/shared"
)

// NOTE: Callers that use ErrObjectDoesNotExist need to wrap this error with more detail about what
// path and storage integration is being accessed.
func ErrObjectDoesNotExist() error {
return errors.New("Object does not exist in storage.")
}
Expand Down
13 changes: 3 additions & 10 deletions src/golang/lib/vault/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,9 @@ const (
)

func newS3Vault(s3StoreConf shared.S3Config, key string) Vault {
// The S3 vault stores secrets under the ../vault path
// The S3 bucket is in the form of s3:// so we can't use path.Join, because
// it will clean the final filepath and change the prefix to s3:/
bucket := s3StoreConf.Bucket
if len(bucket) > 0 && bucket[len(bucket)-1] == '/' {
bucket += s3VaultDir
} else {
bucket += "/" + s3VaultDir
}
s3StoreConf.Bucket = bucket
// The S3 vault stores secrets under the [root_dir]/vault path
// NOTE: The existing root directory is expected to always end with a slash.
s3StoreConf.RootDir += s3VaultDir + "/"

store := storage.NewStorage(&shared.StorageConfig{
Type: shared.S3StorageType,
Expand Down
4 changes: 2 additions & 2 deletions src/golang/lib/workflow/utils/migrate.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,12 @@ func MigrateVault(

val, err := oldVault.Get(ctx, key)
if err != nil {
log.Errorf("Unable to get integration credentials %v from old vault: %v", integrationDB.ID, err)
log.Errorf("Unable to get integration credentials %v from old vault at path %s: %v", integrationDB.ID, key, err)
return nil, err
}

if err := newVault.Put(ctx, key, val); err != nil {
log.Errorf("Unable to write integration credentials %v to new vault: %v", integrationDB.ID, err)
log.Errorf("Unable to write integration credentials %v to new vault at path %s: %v", integrationDB.ID, key, err)
return nil, err
}

Expand Down
2 changes: 1 addition & 1 deletion src/golang/lib/workflow/utils/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func ReadFromStorage(ctx context.Context, storageConfig *shared.StorageConfig, p
// Read data from storage and deserialize payload to `container`
serializedPayload, err := storage.NewStorage(storageConfig).Get(ctx, path)
if err != nil {
return errors.Wrap(err, "Unable to get object from storage")
return errors.Wrapf(err, "Unable to get object from %s storage at path %s.", storageConfig.Type, path)
}

err = json.Unmarshal(serializedPayload, container)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,11 @@ class S3Config(models.BaseConfig):
config_file_profile: str = ""

bucket: str = ""

region: str = ""

# This is unused for data integrations. It is only used for storage.
root_dir: str = ""
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this used in data connection? I thought it's only relevant for S3 used as storage.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It has to be here because the model forbids extra fields. I'll add a note that it's actually unused.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

reminder to add the note.


use_as_storage: str = ""


Expand Down
10 changes: 10 additions & 0 deletions src/python/aqueduct_executor/operators/connectors/data/s3.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ def __init__(self, config: S3Config):
session = construct_boto_session(config)
self.s3 = session.resource("s3")
self.bucket = config.bucket
self.root_dir = config.root_dir

def authenticate(self) -> None:
try:
Expand All @@ -37,6 +38,15 @@ def authenticate(self) -> None:
% str(e)
)

# Check that any user-supplied root directory exists.
if self.root_dir != "":
# If nothing is returned by this filter call, then the directory does not exist.
if len(list(self.s3.Bucket(self.bucket).objects.filter(Prefix=self.root_dir))) == 0:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If there's a lot of objects in this root directory, this call may take very long to finish. I wonder if there's a lower overhead way of checking if a s3 directory exists.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure, let's discuss in https://aqueducthq.slack.com/archives/C01KF131001/p1681168125986639? We already use this method in our extract implementation when fetching entire directories.

raise Exception(
"Supplied root directory `%s` does not exist in bucket %s."
% (self.root_dir, self.bucket)
)

def discover(self) -> List[str]:
raise Exception("Discover is not supported for S3.")

Expand Down
2 changes: 1 addition & 1 deletion src/python/aqueduct_executor/operators/utils/execution.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
TIP_UNKNOWN_ERROR = f"Sorry, we've run into an unexpected error! {_TIP_CREATE_BUG_REPORT}"
TIP_INTEGRATION_CONNECTION = (
"We were unable to connect to this integration. "
"Please check your credentials or contact your integration's provider."
"If the stack trace is not helpful, please check your credentials or contact your integration's provider."
)
TIP_DEMO_CONNECTION = f"We have trouble connecting to demo DB. {_TIP_CREATE_BUG_REPORT}"

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@ class FileStorageConfig(BaseModel):
class S3StorageConfig(BaseModel):
region: str
bucket: str

# Expected to be in the format "path/to/dir/" (without a leading slash, but with a trailing one).
root_dir: str = ""

credentials_path: str
credentials_profile: str
aws_access_key_id: str = ""
Expand Down
41 changes: 31 additions & 10 deletions src/python/aqueduct_executor/operators/utils/storage/s3.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,24 +32,27 @@ def __init__(self, config: S3StorageConfig):
os.environ["AWS_SHARED_CREDENTIALS_FILE"] = config.credentials_path
os.environ["AWS_PROFILE"] = config.credentials_profile
self._client = boto3.client("s3", config=BotoConfig(region_name=config.region))

self._config = config

bucket, key_prefix = parse_s3_path(self._config.bucket)
bucket, key_prefix = parse_s3_bucket_and_key_prefix(
self._config.bucket, self._config.root_dir
)
self._bucket = bucket
self._key_prefix = key_prefix

def put(self, key: str, value: bytes) -> None:
key = self._prefix_key(key)
key = self._resolve_full_key(key)
print(f"writing to s3: {key}")
self._client.put_object(Bucket=self._bucket, Key=key, Body=value)

def get(self, key: str) -> bytes:
key = self._prefix_key(key)
key = self._resolve_full_key(key)
print(f"reading from s3: {key}")
return self._client.get_object(Bucket=self._bucket, Key=key)["Body"].read() # type: ignore

def exists(self, key: str) -> bool:
key = self._prefix_key(key)
key = self._resolve_full_key(key)
print(f"checking if exists in s3: {key}")
try:
self._client.head_object(Bucket=self._bucket, Key=key)
Expand All @@ -63,14 +66,32 @@ def exists(self, key: str) -> bool:
return False
return True

def _prefix_key(self, key: str) -> str:
def _resolve_full_key(self, key: str) -> str:
"""The `key_prefix` is expected to be in the format `path/to/dir/`."""
if not self._key_prefix:
return key
return self._key_prefix + "/" + key

assert self._key_prefix[0] != "/" and self._key_prefix[-1] == "/"
return self._key_prefix + key


def _sanitize_path(path: str) -> str:
"""Sanitize the given path to be in the format `path/to/dir/` (no leading slash but one trailing slash)."""
if path == "":
return path
if path[0] == "/":
path = path[1:]
if path[-1] != "/":
path += "/"
return path

def parse_s3_path(s3_path: str) -> Tuple[str, str]:
path_parts = s3_path.replace("s3://", "").split("/")

def parse_s3_bucket_and_key_prefix(s3_bucket_path: str, root_dir_path: str) -> Tuple[str, str]:
path_parts = s3_bucket_path.replace("s3://", "").split("/")
bucket = path_parts.pop(0)
key = "/".join(path_parts)
return bucket, key

if root_dir_path != "":
path_parts += [_sanitize_path(root_dir_path)]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if root_dir_path is assumed to be in the format path/to/dir/, why do we need to sanitize it again?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is assumed, but we don't check it here. I didn't want to be too strict with the assertions in case there was a codepath I missed. Agree, it's confusing - maybe I can just remove the comment for now and make no assumptions about the root path.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

reminder to remove the comment.


key_prefix = "/".join(path_parts)
return bucket, _sanitize_path(key_prefix)
6 changes: 6 additions & 0 deletions src/ui/common/src/components/integrations/cards/s3Card.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,12 @@ export const S3Card: React.FC<Props> = ({ integration }) => {
<strong>Bucket: </strong>
{config.bucket}
</Typography>
{config.root_dir?.length > 0 && (
<Typography variant="body2">
<strong>Root Directory: </strong>
{config.root_dir}
</Typography>
)}
<Typography variant="body2">
<strong>Region: </strong>
{config.region}
Expand Down
14 changes: 14 additions & 0 deletions src/ui/common/src/components/integrations/dialogs/s3Dialog.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ const Placeholders: S3Config = {
type: AWSCredentialType.AccessKey,
bucket: 'aqueduct',
region: 'us-east-1',
root_dir: 'path/to/root/',
access_key_id: '',
secret_access_key: '',
config_file_path: '',
Expand Down Expand Up @@ -193,6 +194,19 @@ export const S3Dialog: React.FC<Props> = ({
disableReason={editMode ? readOnlyFieldDisableReason : undefined}
/>

<IntegrationTextInputField
spellCheck={false}
required={false}
label="Directory"
description="Only applicable when also setting this integration to be the artifact store. This is an optional path to an existing directory in the bucket, to be used as the root of the artifact store. Defaults to the root of the bucket."
placeholder={Placeholders.root_dir}
onChange={(event) => onUpdateField('root_dir', event.target.value)}
value={value?.root_dir ?? null}
disabled={editMode}
warning={editMode ? undefined : readOnlyFieldWarning}
disableReason={editMode ? readOnlyFieldDisableReason : undefined}
/>

<Box sx={{ borderBottom: 1, borderColor: 'divider', mb: 2 }}>
<Tabs
value={value?.type}
Expand Down
Loading