Skip to content

Commit

Permalink
storage: fixed url parsing panic (#459); fixed directory download bug; better error reporting for SupportsGet/SupportsPut calls
Browse files (browse the repository at this point in the history)
  • Loading branch information
adamstruck committed Jan 30, 2018
1 parent 7af0be3 commit 6b03c33
Show file tree
Hide file tree
Showing 14 changed files with 328 additions and 99 deletions.
26 changes: 13 additions & 13 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ export SHELL=/bin/bash
PATH := ${PATH}:${GOPATH}/bin
export PATH

PROTO_INC=-I ./ -I $(shell pwd)/vendor/github.com/grpc-ecosystem/grpc-gateway/third_party/googleapis
PROTO_INC=-I ./ -I $(shell pwd)/vendor/github.com/grpc-ecosystem/grpc-gateway/third_party/googleapis

V=github.com/ohsu-comp-bio/funnel/version
VERSION_LDFLAGS=\
Expand Down Expand Up @@ -91,7 +91,7 @@ test-verbose:
@go test -v $(TESTS)

start-elasticsearch:
@docker rm -f funnel-es-test > /dev/null 2>&1 || echo
@docker rm -f funnel-es-test > /dev/null 2>&1 || echo
@docker run -d --name funnel-es-test -p 9200:9200 -p 9300:9300 -e "discovery.type=single-node" -e "xpack.security.enabled=false" docker.elastic.co/elasticsearch/elasticsearch:5.6.3 > /dev/null

test-elasticsearch:
Expand All @@ -104,7 +104,7 @@ start-mongodb:

test-mongodb:
@go test ./tests/core/ -funnel-config $(CONFIGDIR)/mongo.config.yml
@go test ./tests/scheduler/ -funnel-config $(CONFIGDIR)/mongo.config.yml
@go test ./tests/scheduler/ -funnel-config $(CONFIGDIR)/mongo.config.yml

start-dynamodb:
@docker rm -f funnel-dynamodb-test > /dev/null 2>&1 || echo
Expand All @@ -121,7 +121,7 @@ stop-datastore:

test-datastore: start-datastore
DATASTORE_EMULATOR_HOST=localhost:8081 \
go test -v ./tests/core/ -funnel-config $(CONFIGDIR)/datastore.config.yml
go test -v ./tests/core/ -funnel-config $(CONFIGDIR)/datastore.config.yml

start-kafka:
@docker rm -f funnel-kafka > /dev/null 2>&1 || echo
Expand All @@ -146,25 +146,26 @@ test-pbs-torque:
@docker pull ohsucompbio/pbs-torque
@go test -timeout 120s ./tests/pbs -funnel-config $(CONFIGDIR)/pbs.config.yml

test-s3:
@go test ./tests/storage -funnel-config $(CONFIGDIR)/s3.config.yml
test-amazon-s3:
@go test ./tests/storage -funnel-config $(CONFIGDIR)/s3.config.yml -run TestAmazonS3

start-generic-s3:
@docker rm -f funnel-s3server > /dev/null 2>&1 || echo
@docker run -d --name funnel-s3server -p 18888:8000 scality/s3server:mem-6018536a
@docker rm -f funnel-minio > /dev/null 2>&1 || echo
@docker run -d --name funnel-minio -p 9000:9000 -e "MINIO_ACCESS_KEY=fakekey" -e "MINIO_SECRET_KEY=fakesecret" minio/minio:RELEASE.2017-10-27T18-59-02Z server /data
@docker run -d --name funnel-minio -p 9000:9000 -e "MINIO_ACCESS_KEY=fakekey" -e "MINIO_SECRET_KEY=fakesecret" -e "MINIO_REGION=us-east-1" minio/minio:RELEASE.2017-10-27T18-59-02Z server /data

test-generic-s3:
@go test ./tests/storage -funnel-config $(CONFIGDIR)/gen-s3.config.yml
@go test ./tests/storage -funnel-config $(CONFIGDIR)/minio-s3.config.yml
@go test ./tests/storage -funnel-config $(CONFIGDIR)/multi-s3.config.yml
@go test ./tests/storage -funnel-config $(CONFIGDIR)/amazoncli-minio-s3.config.yml -run TestAmazonS3Storage
@go test ./tests/storage -funnel-config $(CONFIGDIR)/scality-s3.config.yml -run TestGenericS3Storage
@go test ./tests/storage -funnel-config $(CONFIGDIR)/minio-s3.config.yml -run TestGenericS3Storage
@go test ./tests/storage -funnel-config $(CONFIGDIR)/multi-s3.config.yml -run TestGenericS3Storage

test-gs:
@go test ./tests/storage -funnel-config $(CONFIGDIR)/gs.config.yml ${GCE_PROJECT_ID}
@go test ./tests/storage -run TestGoogleStorage -funnel-config $(CONFIGDIR)/gs.config.yml ${GCE_PROJECT_ID}

test-swift:
@go test ./tests/storage -funnel-config $(CONFIGDIR)/swift.config.yml
@go test ./tests/storage -funnel-config $(CONFIGDIR)/swift.config.yml -run TestSwiftStorage

webdash-install:
@npm install --prefix ./webdash
Expand Down Expand Up @@ -196,7 +197,6 @@ clean-release:
rm -rf ./build/release

build-release: clean-release cross-compile docker
#
# NOTE! Making a release requires manual steps.
# See: website/content/docs/development.md
@if [ $$(git rev-parse --abbrev-ref HEAD) != 'master' ]; then \
Expand Down
54 changes: 36 additions & 18 deletions storage/amazon_s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,15 +46,16 @@ func NewAmazonS3Backend(conf config.AmazonS3Storage) (*AmazonS3Backend, error) {

// Get copies an object from S3 to the host path.
func (s3b *AmazonS3Backend) Get(ctx context.Context, rawurl string, hostPath string, class tes.FileType) (err error) {

url := s3b.parse(rawurl)
url, err := s3b.parse(rawurl)
if err != nil {
return err
}

region, err := s3manager.GetBucketRegion(ctx, s3b.sess, url.bucket, "us-east-1")
if err != nil {
return fmt.Errorf("failed to determine region for bucket: %s. error: %v", url.bucket, err)
}

// Create a downloader with the session and default options
sess := s3b.sess.Copy(&aws.Config{Region: aws.String(region)})
client := s3.New(sess)
manager := s3manager.NewDownloader(sess)
Expand Down Expand Up @@ -111,9 +112,13 @@ func (s3b *AmazonS3Backend) Get(ctx context.Context, rawurl string, hostPath str

for _, obj := range objects {
if *obj.Key != url.path+"/" {
// Create the directories in the path
file := filepath.Join(hostPath, strings.TrimPrefix(*obj.Key, url.path+"/"))
if err := os.MkdirAll(filepath.Dir(file), 0775); err != nil {
// check if key represents a directory
if strings.HasSuffix(*obj.Key, "/") {
continue
}
err = fsutil.EnsurePath(file)
if err != nil {
return err
}

Expand Down Expand Up @@ -152,14 +157,16 @@ func (s3b *AmazonS3Backend) Get(ctx context.Context, rawurl string, hostPath str

// PutFile copies an object (file) from the host path to S3.
func (s3b *AmazonS3Backend) PutFile(ctx context.Context, rawurl string, hostPath string) error {
url := s3b.parse(rawurl)
url, err := s3b.parse(rawurl)
if err != nil {
return err
}

region, err := s3manager.GetBucketRegion(ctx, s3b.sess, url.bucket, "us-east-1")
if err != nil {
return fmt.Errorf("failed to determine region for bucket: %s. error: %v", url.bucket, err)
}

// Create a uploader with the session and default options
sess := s3b.sess.Copy(&aws.Config{Region: aws.String(region)})
manager := s3manager.NewUploader(sess)

Expand All @@ -181,14 +188,14 @@ func (s3b *AmazonS3Backend) PutFile(ctx context.Context, rawurl string, hostPath
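// SupportsGet reports whether this backend can handle a GET storage request for rawurl.
// For the AmazonS3Backend, it returns nil when the url starts with "s3://" and the
// bucket's region can be determined; otherwise it returns an error describing the problem.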
func (s3b *AmazonS3Backend) SupportsGet(rawurl string, class tes.FileType) error {
if !strings.HasPrefix(rawurl, s3Protocol) {
return fmt.Errorf("s3: unsupported protocol; expected %s", s3Protocol)
url, err := s3b.parse(rawurl)
if err != nil {
return err
}

url := s3b.parse(rawurl)
_, err := s3manager.GetBucketRegion(context.Background(), s3b.sess, url.bucket, "us-east-1")
_, err = s3manager.GetBucketRegion(context.Background(), s3b.sess, url.bucket, "us-east-1")
if err != nil {
return fmt.Errorf("s3: failed to find bucket: %s. error: %v", url.bucket, err)
return fmt.Errorf("amazonS3: failed to determine region for bucket: %s. error: %v", url.bucket, err)
}

return nil
Expand All @@ -200,18 +207,29 @@ func (s3b *AmazonS3Backend) SupportsPut(rawurl string, class tes.FileType) error
return s3b.SupportsGet(rawurl, class)
}

func (s3b *AmazonS3Backend) parse(url string) *urlparts {
path := strings.TrimPrefix(url, s3Protocol)
func (s3b *AmazonS3Backend) parse(rawurl string) (*urlparts, error) {
if !strings.HasPrefix(rawurl, s3Protocol) {
return nil, &ErrUnsupportedProtocol{"amazonS3"}
}

path := strings.TrimPrefix(rawurl, s3Protocol)
if s3b.endpoint != "" {
path = strings.TrimPrefix(path, s3b.endpoint)
} else {
re := regexp.MustCompile("^s3.*\\.amazonaws\\.com/")
path = re.ReplaceAllString(path, "")
}
if path == "" {
return nil, &ErrInvalidURL{"amazonS3"}
}

split := strings.SplitN(path, "/", 2)
bucket := split[0]
key := split[1]

return &urlparts{bucket, key}
url := &urlparts{}
if len(split) > 0 {
url.bucket = split[0]
}
if len(split) == 2 {
url.path = split[1]
}
return url, nil
}
57 changes: 39 additions & 18 deletions storage/generic_s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package storage
import (
"context"
"fmt"
"os"
"path/filepath"
"strings"

Expand Down Expand Up @@ -34,7 +33,10 @@ func NewGenericS3Backend(conf config.GenericS3Storage) (*GenericS3Backend, error

// Get copies an object from S3 to the host path.
func (s3 *GenericS3Backend) Get(ctx context.Context, rawurl string, hostPath string, class tes.FileType) error {
url := s3.parse(rawurl)
url, err := s3.parse(rawurl)
if err != nil {
return err
}

switch class {
case File:
Expand Down Expand Up @@ -66,9 +68,15 @@ func (s3 *GenericS3Backend) Get(ctx context.Context, rawurl string, hostPath str
for _, obj := range objects {
// Create the directories in the path
file := filepath.Join(hostPath, strings.TrimPrefix(obj.Key, url.path+"/"))
if err := os.MkdirAll(filepath.Dir(file), 0775); err != nil {
// check if key represents a directory
if strings.HasSuffix(obj.Key, "/") {
continue
}
err = fsutil.EnsurePath(file)
if err != nil {
return err
}

err = s3.client.FGetObjectWithContext(ctx, url.bucket, obj.Key, file, minio.GetObjectOptions{})
if err != nil {
return err
Expand All @@ -84,27 +92,29 @@ func (s3 *GenericS3Backend) Get(ctx context.Context, rawurl string, hostPath str

// PutFile copies an object (file) from the host path to S3.
func (s3 *GenericS3Backend) PutFile(ctx context.Context, rawurl string, hostPath string) error {
url := s3.parse(rawurl)
_, err := s3.client.FPutObjectWithContext(ctx, url.bucket, url.path, hostPath, minio.PutObjectOptions{})
url, err := s3.parse(rawurl)
if err != nil {
return err
}

_, err = s3.client.FPutObjectWithContext(ctx, url.bucket, url.path, hostPath, minio.PutObjectOptions{})
return err
}

// SupportsGet reports whether this backend can handle a GET storage request for rawurl.
// For the GenericS3Backend, it returns nil when the url starts with "s3://" and the
// bucket exists; otherwise it returns an error describing the problem.
func (s3 *GenericS3Backend) SupportsGet(rawurl string, class tes.FileType) error {
if !strings.HasPrefix(rawurl, s3Protocol) {
return fmt.Errorf("s3: unsupported protocol; expected %s", s3Protocol)
url, err := s3.parse(rawurl)
if err != nil {
return err
}

url := s3.parse(rawurl)
ok, err := s3.client.BucketExists(url.bucket)
if err != nil {
return fmt.Errorf("s3: failed to find bucket: %s. error: %v", url.bucket, err)
return fmt.Errorf("genericS3: failed to find bucket: %s. error: %v", url.bucket, err)
}
if !ok {
return fmt.Errorf("s3: bucket does not exist: %s", url.bucket)
return fmt.Errorf("genericS3: bucket does not exist: %s", url.bucket)
}

return nil
}

Expand All @@ -114,13 +124,24 @@ func (s3 *GenericS3Backend) SupportsPut(rawurl string, class tes.FileType) error
return s3.SupportsGet(rawurl, class)
}

func (s3 *GenericS3Backend) parse(url string) *urlparts {
path := strings.TrimPrefix(url, s3Protocol)
func (s3 *GenericS3Backend) parse(rawurl string) (*urlparts, error) {
if !strings.HasPrefix(rawurl, s3Protocol) {
return nil, &ErrUnsupportedProtocol{"genericS3"}
}

path := strings.TrimPrefix(rawurl, s3Protocol)
path = strings.TrimPrefix(path, s3.endpoint)
if path == "" {
return nil, &ErrInvalidURL{"genericS3"}
}

split := strings.SplitN(path, "/", 2)
bucket := split[0]
key := split[1]

return &urlparts{bucket, key}
url := &urlparts{}
if len(split) > 0 {
url.bucket = split[0]
}
if len(split) == 2 {
url.path = split[1]
}
return url, nil
}
49 changes: 37 additions & 12 deletions storage/gs.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,10 @@ func NewGSBackend(conf config.GSStorage) (*GSBackend, error) {

// Get copies an object from GS to the host path.
func (gs *GSBackend) Get(ctx context.Context, rawurl string, hostPath string, class tes.FileType) error {
url := gs.parse(rawurl)
url, err := gs.parse(rawurl)
if err != nil {
return err
}

switch class {
case File:
Expand Down Expand Up @@ -97,6 +100,9 @@ func (gs *GSBackend) Get(ctx context.Context, rawurl string, hostPath string, cl
}

for _, obj := range objects {
if strings.HasSuffix(obj.Name, "/") {
continue
}
call := gs.svc.Objects.Get(url.bucket, obj.Name)
key := strings.TrimPrefix(obj.Name, url.path)
err = download(call, path.Join(hostPath, key))
Expand All @@ -117,6 +123,10 @@ func download(call *storage.ObjectsGetCall, hostPath string) (err error) {
return err
}

err = fsutil.EnsurePath(hostPath)
if err != nil {
return err
}
dest, err := os.Create(hostPath)
if err != nil {
return err
Expand All @@ -134,7 +144,10 @@ func download(call *storage.ObjectsGetCall, hostPath string) (err error) {

// PutFile copies an object (file) from the host path to GS.
func (gs *GSBackend) PutFile(ctx context.Context, rawurl string, hostPath string) error {
url := gs.parse(rawurl)
url, err := gs.parse(rawurl)
if err != nil {
return err
}

reader, err := os.Open(hostPath)
if err != nil {
Expand All @@ -153,14 +166,13 @@ func (gs *GSBackend) PutFile(ctx context.Context, rawurl string, hostPath string
// SupportsGet reports whether this backend can handle a GET storage request for rawurl.
// For the Google Storage backend, it returns nil when the url starts with "gs://" and the
// bucket can be looked up; otherwise it returns an error describing the problem.
func (gs *GSBackend) SupportsGet(rawurl string, class tes.FileType) error {
ok := strings.HasPrefix(rawurl, gsProtocol)
if !ok {
return fmt.Errorf("gs: unsupported protocol; expected %s", gsProtocol)
url, err := gs.parse(rawurl)
if err != nil {
return err
}
url := gs.parse(rawurl)
_, err := gs.svc.Buckets.Get(url.bucket).Do()
_, err = gs.svc.Buckets.Get(url.bucket).Do()
if err != nil {
return fmt.Errorf("gs: failed to find bucket: %s. error: %v", url.bucket, err)
return fmt.Errorf("googleStorage: failed to find bucket: %s. error: %v", url.bucket, err)
}
return nil
}
Expand All @@ -171,10 +183,23 @@ func (gs *GSBackend) SupportsPut(rawurl string, class tes.FileType) error {
return gs.SupportsGet(rawurl, class)
}

func (gs *GSBackend) parse(rawurl string) *urlparts {
func (gs *GSBackend) parse(rawurl string) (*urlparts, error) {
if !strings.HasPrefix(rawurl, gsProtocol) {
return nil, &ErrUnsupportedProtocol{"googleStorage"}
}

path := strings.TrimPrefix(rawurl, gsProtocol)
if path == "" {
return nil, &ErrInvalidURL{"googleStorage"}
}

split := strings.SplitN(path, "/", 2)
bucket := split[0]
key := split[1]
return &urlparts{bucket, key}
url := &urlparts{}
if len(split) > 0 {
url.bucket = split[0]
}
if len(split) == 2 {
url.path = split[1]
}
return url, nil
}
Loading

0 comments on commit 6b03c33

Please sign in to comment.