Skip to content

Commit

Permalink
storage: fixed url parsing panic (#459); fixed directory download bug…
Browse files Browse the repository at this point in the history
…; better error reporting for SupportsGet/SupportsPut calls
  • Loading branch information
adamstruck committed Jan 29, 2018
1 parent adbef12 commit 99b4a28
Show file tree
Hide file tree
Showing 14 changed files with 329 additions and 101 deletions.
30 changes: 15 additions & 15 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ export SHELL=/bin/bash
PATH := ${PATH}:${GOPATH}/bin
export PATH

PROTO_INC=-I ./ -I $(shell pwd)/vendor/github.com/grpc-ecosystem/grpc-gateway/third_party/googleapis
PROTO_INC=-I ./ -I $(shell pwd)/vendor/github.com/grpc-ecosystem/grpc-gateway/third_party/googleapis

V=github.com/ohsu-comp-bio/funnel/version
VERSION_LDFLAGS=\
Expand Down Expand Up @@ -90,7 +90,7 @@ test-verbose:
@go test -v $(TESTS)

start-elasticsearch:
@docker rm -f funnel-es-test > /dev/null 2>&1 || echo
@docker rm -f funnel-es-test > /dev/null 2>&1 || echo
@docker run -d --name funnel-es-test -p 9200:9200 -p 9300:9300 -e "discovery.type=single-node" -e "xpack.security.enabled=false" docker.elastic.co/elasticsearch/elasticsearch:5.6.3 > /dev/null

test-elasticsearch:
Expand All @@ -103,7 +103,7 @@ start-mongodb:

test-mongodb:
@go test ./tests/core/ -funnel-config $(CONFIGDIR)/mongo.config.yml
@go test ./tests/scheduler/ -funnel-config $(CONFIGDIR)/mongo.config.yml
@go test ./tests/scheduler/ -funnel-config $(CONFIGDIR)/mongo.config.yml

start-dynamodb:
@docker rm -f funnel-dynamodb-test > /dev/null 2>&1 || echo
Expand All @@ -120,7 +120,7 @@ stop-datastore:

test-datastore: start-datastore
DATASTORE_EMULATOR_HOST=localhost:8081 \
go test -v ./tests/core/ -funnel-config $(CONFIGDIR)/datastore.config.yml
go test -v ./tests/core/ -funnel-config $(CONFIGDIR)/datastore.config.yml

start-kafka:
@docker rm -f funnel-kafka > /dev/null 2>&1 || echo
Expand All @@ -145,25 +145,26 @@ test-pbs-torque:
@docker pull ohsucompbio/pbs-torque
@go test -timeout 120s ./tests/pbs -funnel-config $(CONFIGDIR)/pbs.config.yml

test-s3:
@go test ./tests/storage -funnel-config $(CONFIGDIR)/s3.config.yml
test-amazon-s3:
@go test ./tests/storage -funnel-config $(CONFIGDIR)/s3.config.yml -run TestAmazonS3

start-generic-s3:
@docker rm -f funnel-s3server > /dev/null 2>&1 || echo
@docker run -d --name funnel-s3server -p 18888:8000 scality/s3server:mem-6018536a
@docker rm -f funnel-minio > /dev/null 2>&1 || echo
@docker run -d --name funnel-minio -p 9000:9000 -e "MINIO_ACCESS_KEY=fakekey" -e "MINIO_SECRET_KEY=fakesecret" minio/minio:RELEASE.2017-10-27T18-59-02Z server /data
@docker run -d --name funnel-minio -p 9000:9000 -e "MINIO_ACCESS_KEY=fakekey" -e "MINIO_SECRET_KEY=fakesecret" -e "MINIO_REGION=us-east-1" minio/minio:RELEASE.2017-10-27T18-59-02Z server /data

test-generic-s3:
@go test ./tests/storage -funnel-config $(CONFIGDIR)/gen-s3.config.yml
@go test ./tests/storage -funnel-config $(CONFIGDIR)/minio-s3.config.yml
@go test ./tests/storage -funnel-config $(CONFIGDIR)/multi-s3.config.yml
@go test ./tests/storage -funnel-config $(CONFIGDIR)/amazoncli-minio-s3.config.yml -run TestAmazonS3Storage
@go test ./tests/storage -funnel-config $(CONFIGDIR)/scality-s3.config.yml -run TestGenericS3Storage
@go test ./tests/storage -funnel-config $(CONFIGDIR)/minio-s3.config.yml -run TestGenericS3Storage
@go test ./tests/storage -funnel-config $(CONFIGDIR)/multi-s3.config.yml -run TestGenericS3Storage

test-gs:
@go test ./tests/storage -funnel-config $(CONFIGDIR)/gs.config.yml ${GCE_PROJECT_ID}
@go test ./tests/storage -run TestGoogleStorage -funnel-config $(CONFIGDIR)/gs.config.yml ${GCE_PROJECT_ID}

test-swift:
@go test ./tests/storage -funnel-config $(CONFIGDIR)/swift.config.yml
@go test ./tests/storage -funnel-config $(CONFIGDIR)/swift.config.yml -run TestSwiftStorage

webdash-install:
@npm install --prefix ./webdash
Expand Down Expand Up @@ -195,7 +196,6 @@ clean-release:
rm -rf ./build/release

build-release: clean-release cross-compile docker
#
# NOTE! Making a release requires manual steps.
# See: website/content/docs/development.md
@if [ $$(git rev-parse --abbrev-ref HEAD) != 'master' ]; then \
Expand Down Expand Up @@ -250,14 +250,14 @@ docker: cross-compile
cp build/bin/funnel-linux-amd64 build/docker/funnel
cp docker/* build/docker/
cd build/docker/ && docker build -t ohsucompbio/funnel .

test-datastore-travis:
@wget https://dl.google.com/dl/cloudsdk/channels/rapid/downloads/google-cloud-sdk-183.0.0-linux-x86_64.tar.gz
@tar xzvf google-cloud-sdk-183.0.0-linux-x86_64.tar.gz 2> /dev/null
@./google-cloud-sdk/bin/gcloud --quiet beta emulators datastore start &
@sleep 60
make test-datastore

# Remove build/development files.
clean:
@rm -rf ./bin ./pkg ./test_tmp ./build ./buildtools
Expand Down
54 changes: 36 additions & 18 deletions storage/amazon_s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,15 +45,16 @@ func NewAmazonS3Backend(conf config.AmazonS3Storage) (*AmazonS3Backend, error) {

// Get copies an object from S3 to the host path.
func (s3b *AmazonS3Backend) Get(ctx context.Context, rawurl string, hostPath string, class tes.FileType) (err error) {

url := s3b.parse(rawurl)
url, err := s3b.parse(rawurl)
if err != nil {
return err
}

region, err := s3manager.GetBucketRegion(ctx, s3b.sess, url.bucket, "us-east-1")
if err != nil {
return fmt.Errorf("failed to determine region for bucket: %s. error: %v", url.bucket, err)
}

// Create a downloader with the session and default options
sess := s3b.sess.Copy(&aws.Config{Region: aws.String(region)})
client := s3.New(sess)
manager := s3manager.NewDownloader(sess)
Expand Down Expand Up @@ -110,9 +111,13 @@ func (s3b *AmazonS3Backend) Get(ctx context.Context, rawurl string, hostPath str

for _, obj := range objects {
if *obj.Key != url.path+"/" {
// Create the directories in the path
file := filepath.Join(hostPath, strings.TrimPrefix(*obj.Key, url.path+"/"))
if err := os.MkdirAll(filepath.Dir(file), 0775); err != nil {
// check if key represents a directory
if strings.HasSuffix(*obj.Key, "/") {
continue
}
err = fsutil.EnsurePath(file)
if err != nil {
return err
}

Expand Down Expand Up @@ -151,14 +156,16 @@ func (s3b *AmazonS3Backend) Get(ctx context.Context, rawurl string, hostPath str

// PutFile copies an object (file) from the host path to S3.
func (s3b *AmazonS3Backend) PutFile(ctx context.Context, rawurl string, hostPath string) error {
url := s3b.parse(rawurl)
url, err := s3b.parse(rawurl)
if err != nil {
return err
}

region, err := s3manager.GetBucketRegion(ctx, s3b.sess, url.bucket, "us-east-1")
if err != nil {
return fmt.Errorf("failed to determine region for bucket: %s. error: %v", url.bucket, err)
}

// Create a uploader with the session and default options
sess := s3b.sess.Copy(&aws.Config{Region: aws.String(region)})
manager := s3manager.NewUploader(sess)

Expand All @@ -180,14 +187,14 @@ func (s3b *AmazonS3Backend) PutFile(ctx context.Context, rawurl string, hostPath
// SupportsGet indicates whether this backend supports GET storage request.
// For the AmazonS3Backend, the url must start with "s3://" and the bucket must exist
func (s3b *AmazonS3Backend) SupportsGet(rawurl string, class tes.FileType) error {
if !strings.HasPrefix(rawurl, s3Protocol) {
return fmt.Errorf("s3: unsupported protocol; expected %s", s3Protocol)
url, err := s3b.parse(rawurl)
if err != nil {
return err
}

url := s3b.parse(rawurl)
_, err := s3manager.GetBucketRegion(context.Background(), s3b.sess, url.bucket, "us-east-1")
_, err = s3manager.GetBucketRegion(context.Background(), s3b.sess, url.bucket, "us-east-1")
if err != nil {
return fmt.Errorf("s3: failed to find bucket: %s. error: %v", url.bucket, err)
return fmt.Errorf("amazonS3: failed to determine region for bucket: %s. error: %v", url.bucket, err)
}

return nil
Expand All @@ -199,18 +206,29 @@ func (s3b *AmazonS3Backend) SupportsPut(rawurl string, class tes.FileType) error
return s3b.SupportsGet(rawurl, class)
}

func (s3b *AmazonS3Backend) parse(url string) *urlparts {
path := strings.TrimPrefix(url, s3Protocol)
func (s3b *AmazonS3Backend) parse(rawurl string) (*urlparts, error) {
if !strings.HasPrefix(rawurl, s3Protocol) {
return nil, &ErrUnsupportedProtocol{"amazonS3"}
}

path := strings.TrimPrefix(rawurl, s3Protocol)
if s3b.endpoint != "" {
path = strings.TrimPrefix(path, s3b.endpoint)
} else {
re := regexp.MustCompile("^s3.*\\.amazonaws\\.com/")
path = re.ReplaceAllString(path, "")
}
if path == "" {
return nil, &ErrInvalidURL{"amazonS3"}
}

split := strings.SplitN(path, "/", 2)
bucket := split[0]
key := split[1]

return &urlparts{bucket, key}
url := &urlparts{}
if len(split) > 0 {
url.bucket = split[0]
}
if len(split) == 2 {
url.path = split[1]
}
return url, nil
}
57 changes: 39 additions & 18 deletions storage/generic_s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"github.com/ohsu-comp-bio/funnel/config"
"github.com/ohsu-comp-bio/funnel/proto/tes"
"github.com/ohsu-comp-bio/funnel/util/fsutil"
"os"
"path/filepath"
"strings"
)
Expand All @@ -33,7 +32,10 @@ func NewGenericS3Backend(conf config.GenericS3Storage) (*GenericS3Backend, error

// Get copies an object from S3 to the host path.
func (s3 *GenericS3Backend) Get(ctx context.Context, rawurl string, hostPath string, class tes.FileType) error {
url := s3.parse(rawurl)
url, err := s3.parse(rawurl)
if err != nil {
return err
}

switch class {
case File:
Expand Down Expand Up @@ -65,9 +67,15 @@ func (s3 *GenericS3Backend) Get(ctx context.Context, rawurl string, hostPath str
for _, obj := range objects {
// Create the directories in the path
file := filepath.Join(hostPath, strings.TrimPrefix(obj.Key, url.path+"/"))
if err := os.MkdirAll(filepath.Dir(file), 0775); err != nil {
// check if key represents a directory
if strings.HasSuffix(obj.Key, "/") {
continue
}
err = fsutil.EnsurePath(file)
if err != nil {
return err
}

err = s3.client.FGetObjectWithContext(ctx, url.bucket, obj.Key, file, minio.GetObjectOptions{})
if err != nil {
return err
Expand All @@ -83,27 +91,29 @@ func (s3 *GenericS3Backend) Get(ctx context.Context, rawurl string, hostPath str

// PutFile copies an object (file) from the host path to S3.
func (s3 *GenericS3Backend) PutFile(ctx context.Context, rawurl string, hostPath string) error {
url := s3.parse(rawurl)
_, err := s3.client.FPutObjectWithContext(ctx, url.bucket, url.path, hostPath, minio.PutObjectOptions{})
url, err := s3.parse(rawurl)
if err != nil {
return err
}

_, err = s3.client.FPutObjectWithContext(ctx, url.bucket, url.path, hostPath, minio.PutObjectOptions{})
return err
}

// SupportsGet indicates whether this backend supports GET storage request.
// For the GenericS3Backend, the url must start with "s3://" and the bucket must exist
func (s3 *GenericS3Backend) SupportsGet(rawurl string, class tes.FileType) error {
if !strings.HasPrefix(rawurl, s3Protocol) {
return fmt.Errorf("s3: unsupported protocol; expected %s", s3Protocol)
url, err := s3.parse(rawurl)
if err != nil {
return err
}

url := s3.parse(rawurl)
ok, err := s3.client.BucketExists(url.bucket)
if err != nil {
return fmt.Errorf("s3: failed to find bucket: %s. error: %v", url.bucket, err)
return fmt.Errorf("genericS3: failed to find bucket: %s. error: %v", url.bucket, err)
}
if !ok {
return fmt.Errorf("s3: bucket does not exist: %s", url.bucket)
return fmt.Errorf("genericS3: bucket does not exist: %s", url.bucket)
}

return nil
}

Expand All @@ -113,13 +123,24 @@ func (s3 *GenericS3Backend) SupportsPut(rawurl string, class tes.FileType) error
return s3.SupportsGet(rawurl, class)
}

func (s3 *GenericS3Backend) parse(url string) *urlparts {
path := strings.TrimPrefix(url, s3Protocol)
func (s3 *GenericS3Backend) parse(rawurl string) (*urlparts, error) {
if !strings.HasPrefix(rawurl, s3Protocol) {
return nil, &ErrUnsupportedProtocol{"genericS3"}
}

path := strings.TrimPrefix(rawurl, s3Protocol)
path = strings.TrimPrefix(path, s3.endpoint)
if path == "" {
return nil, &ErrInvalidURL{"genericS3"}
}

split := strings.SplitN(path, "/", 2)
bucket := split[0]
key := split[1]

return &urlparts{bucket, key}
url := &urlparts{}
if len(split) > 0 {
url.bucket = split[0]
}
if len(split) == 2 {
url.path = split[1]
}
return url, nil
}
Loading

0 comments on commit 99b4a28

Please sign in to comment.