-
Notifications
You must be signed in to change notification settings - Fork 2
LOG-1772: Canoe: productionise Paddle #2
Changes from 2 commits
595d01b
8bf1d6b
48fd3a6
61c25ae
dbd350c
3309911
b4e255d
422e487
16ac408
a358f62
b2f065e
2f696d0
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,2 +1,3 @@ | ||
tmp | ||
dist | ||
paddle |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,8 +1,40 @@ | ||
# Paddle | ||
|
||
Paddle is a command line tool for data archival and processing. | ||
Paddle is a command line tool for canoe data archival and processing. | ||
|
||
Work in progress. | ||
## Setup | ||
|
||
Make sure you have Go installed on your machine and that you checkout the repo to | ||
the right folder. By default should be: | ||
|
||
``` | ||
mkdir -p ~/go/src/github.com/deliveroo | ||
cd ~/go/src/github.com/deliveroo | ||
git clone [email protected]:deliveroo/paddle.git | ||
cd paddle | ||
``` | ||
|
||
You will need create a `$HOME/.paddle.yaml` that contains the bucket name, e.g: | ||
|
||
``` | ||
> cat $HOME/.paddle.yaml | ||
bucket: roo-bucket | ||
``` | ||
|
||
You will also need to create a `$HOME/.aws/config` or `$HOME/.aws/credentials` so Paddle can connect to AWS, e.g.: | ||
|
||
``` | ||
> cat $HOME/.aws/credentials | ||
[default] | ||
aws_access_key_id=xxx | ||
aws_secret_access_key=yyy | ||
region=eu-west-1 | ||
``` | ||
|
||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. unnecessary newline |
||
|
||
``` | ||
$ go build | ||
``` | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. we should specify that Go must be installed only for development / tests. for actual usage, we'll be generating a binary which we can include instructions in the Readme for. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 👍 |
||
|
||
## Usage | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -27,6 +27,21 @@ import ( | |
"strings" | ||
) | ||
|
||
type S3Target struct { | ||
bucket string | ||
prefix string | ||
path string | ||
} | ||
|
||
func (s *S3Target) copy() *S3Target { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. is this here to ensure that we aren't mutating? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't understand enough yet about Go to grasp if I should be just passing structs by value and allow the runtime to clone those from my understanding but yet.. here was just do that scenario where you want to be a good citzen: class A
def initialize(options = {})
opts = options.dup
opts.merge(...)
end
end There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yeah I wouldn't have this function, if anything we can just copy below; and |
||
clone := *s | ||
return &clone | ||
} | ||
|
||
func (t *S3Target) fullPath() string { | ||
return fmt.Sprintf("%s/%s/%s", t.bucket, t.prefix, t.path); | ||
} | ||
|
||
var getBranch string | ||
var getCommitPath string | ||
|
||
|
@@ -44,7 +59,14 @@ $ paddle data get -b experimental trained-model/version1 dest/path | |
if !viper.IsSet("bucket") { | ||
exitErrorf("Bucket not defined. Please define 'bucket' in your config file.") | ||
} | ||
fetchPath(viper.GetString("bucket"), args[0], getBranch, getCommitPath, args[1]) | ||
|
||
source := S3Target{ | ||
bucket: viper.GetString("bucket"), | ||
prefix: fmt.Sprintf("%s/%s", args[0], getBranch), | ||
path: getCommitPath, | ||
} | ||
|
||
copyPathToDestination(&source, args[1]) | ||
}, | ||
} | ||
|
||
|
@@ -53,69 +75,80 @@ func init() { | |
getCmd.Flags().StringVarP(&getCommitPath, "path", "p", "HEAD", "Path to fetch (instead of HEAD)") | ||
} | ||
|
||
func fetchPath(bucket string, version string, branch string, path string, destination string) { | ||
sess := session.Must(session.NewSessionWithOptions(session.Options{ | ||
func copyPathToDestination(source *S3Target, destination string) { | ||
session := session.Must(session.NewSessionWithOptions(session.Options{ | ||
SharedConfigState: session.SharedConfigEnable, | ||
})) | ||
|
||
if path == "HEAD" { | ||
svc := s3.New(sess) | ||
headPath := fmt.Sprintf("%s/%s/HEAD", version, branch) | ||
fmt.Println(headPath) | ||
out, err := svc.GetObject(&s3.GetObjectInput{ | ||
Bucket: aws.String(bucket), | ||
Key: aws.String(headPath), | ||
}) | ||
if err != nil { | ||
exitErrorf("%v", err) | ||
} | ||
buf := new(bytes.Buffer) | ||
buf.ReadFrom(out.Body) | ||
path = buf.String() | ||
} else { | ||
path = fmt.Sprintf("%s/%s/%s", version, branch, path) | ||
/* | ||
* HEAD contains the path to latest folder | ||
*/ | ||
if source.path == "HEAD" { | ||
source = source.copy() | ||
source.path = readHEAD(session, source) | ||
} | ||
|
||
fmt.Println("Copying " + source.fullPath() + " to " + destination) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I kinda liked a more minimal output to be honest, no biggie though |
||
copy(session, source, destination) | ||
} | ||
|
||
func readHEAD(session *session.Session, source *S3Target) string { | ||
svc := s3.New(session) | ||
key := fmt.Sprintf("%s/HEAD", source.prefix) | ||
|
||
out, err := svc.GetObject(&s3.GetObjectInput{ | ||
Bucket: aws.String(source.bucket), | ||
Key: aws.String(key), | ||
}) | ||
|
||
if err != nil { | ||
exitErrorf("%v", err) | ||
} | ||
fmt.Println("Fetching " + path) | ||
getBucketObjects(sess, bucket, path, destination) | ||
|
||
buf := new(bytes.Buffer) | ||
buf.ReadFrom(out.Body) | ||
return buf.String() | ||
} | ||
|
||
func getBucketObjects(sess *session.Session, bucket string, prefix string, dest string) { | ||
func copy(session *session.Session, source *S3Target, destination string) { | ||
query := &s3.ListObjectsV2Input{ | ||
Bucket: aws.String(bucket), | ||
Prefix: aws.String(prefix), | ||
Bucket: aws.String(source.bucket), | ||
Prefix: aws.String(source.prefix + "/" + source.path), | ||
} | ||
svc := s3.New(sess) | ||
svc := s3.New(session) | ||
|
||
truncatedListing := true | ||
|
||
for truncatedListing { | ||
resp, err := svc.ListObjectsV2(query) | ||
response, err := svc.ListObjectsV2(query) | ||
|
||
if err != nil { | ||
fmt.Println(err.Error()) | ||
return | ||
} | ||
getObjectsAll(bucket, resp, svc, prefix, dest) | ||
query.ContinuationToken = resp.NextContinuationToken | ||
truncatedListing = *resp.IsTruncated | ||
copyToLocalFiles(svc, response.Contents, source, destination) | ||
|
||
// Check if more results | ||
query.ContinuationToken = response.NextContinuationToken | ||
truncatedListing = *response.IsTruncated | ||
} | ||
} | ||
|
||
func getObjectsAll(bucket string, bucketObjectsList *s3.ListObjectsV2Output, s3Client *s3.S3, prefix string, dest string) { | ||
for _, key := range bucketObjectsList.Contents { | ||
func copyToLocalFiles(s3Client *s3.S3, objects []*s3.Object, source *S3Target, destination string) { | ||
for _, key := range objects { | ||
destFilename := *key.Key | ||
if strings.HasSuffix(*key.Key, "/") { | ||
fmt.Println("Got a directory") | ||
continue | ||
} | ||
out, err := s3Client.GetObject(&s3.GetObjectInput{ | ||
Bucket: aws.String(bucket), | ||
Bucket: aws.String(source.bucket), | ||
Key: key.Key, | ||
}) | ||
if err != nil { | ||
exitErrorf("%v", err) | ||
} | ||
destFilePath := dest + "/" + strings.TrimPrefix(destFilename, prefix+"/") | ||
destFilePath := destination + "/" + strings.TrimPrefix(destFilename, source.prefix + "/") | ||
err = os.MkdirAll(filepath.Dir(destFilePath), 0777) | ||
fmt.Print(destFilePath) | ||
destFile, err := os.Create(destFilePath) | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Might be worth pointing out that these settings can be configured through ENV as well (e.g. BUCKET).