-
Notifications
You must be signed in to change notification settings - Fork 1.5k
/
Copy paths3_handler.go
477 lines (409 loc) · 13.5 KB
/
s3_handler.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
// +build !oss
/*
* Copyright 2018 Dgraph Labs, Inc. and Contributors
*
* Licensed under the Dgraph Community License (the "License"); you
* may not use this file except in compliance with the License. You
* may obtain a copy of the License at
*
* https://github.com/dgraph-io/dgraph/blob/master/licenses/DCL.txt
*/
package worker
import (
"encoding/json"
"fmt"
"io"
"net/url"
"os"
"path/filepath"
"sort"
"strings"
"time"
"github.com/dgraph-io/dgraph/protos/pb"
"github.com/dgraph-io/dgraph/x"
"github.com/golang/glog"
minio "github.com/minio/minio-go"
"github.com/minio/minio-go/pkg/credentials"
"github.com/minio/minio-go/pkg/s3utils"
"github.com/pkg/errors"
)
const (
	// appName is shown in transfer logs as the client application name.
	appName = "Dgraph"
	// defaultEndpointS3 is used with the s3 scheme when no host is provided.
	defaultEndpointS3 = "s3.amazonaws.com"
	// s3AccelerateSubstr: S3 transfer acceleration is enabled if the S3 host contains this
	// substring. See http://docs.aws.amazon.com/AmazonS3/latest/dev/transfer-acceleration.html
	s3AccelerateSubstr = "s3-accelerate"
)
// FillRestoreCredentials fills the empty values with the default credentials so that
// a restore request is sent to all the groups with the same credentials.
func FillRestoreCredentials(location string, req *pb.RestoreRequest) error {
	uri, err := url.Parse(location)
	if err != nil {
		return err
	}

	// Pick the environment-based provider matching the URI scheme; any other
	// scheme has no credentials to fill in.
	var provider credentials.Provider
	switch uri.Scheme {
	case "s3":
		provider = &credentials.EnvAWS{}
	case "minio":
		provider = &credentials.EnvMinio{}
	default:
		return nil
	}
	if req == nil {
		return nil
	}

	// Only fields left empty by the caller are populated from the environment.
	defaults, _ := provider.Retrieve() // Error is always nil.
	if req.AccessKey == "" {
		req.AccessKey = defaults.AccessKeyID
	}
	if req.SecretKey == "" {
		req.SecretKey = defaults.SecretAccessKey
	}
	if req.SessionToken == "" {
		req.SessionToken = defaults.SessionToken
	}
	return nil
}
// s3Handler is used for 's3:' and 'minio:' URI schemes.
type s3Handler struct {
	// bucketName is the first component of the URI path; objectPrefix is the
	// remainder of the path joined back together (set by setup).
	bucketName, objectPrefix string
	// pwriter/preader form the in-memory pipe that streams backup data from
	// Write calls to the background upload goroutine.
	pwriter *io.PipeWriter
	preader *io.PipeReader
	// cerr receives the final error (or nil) from the upload goroutine; Close
	// blocks on it.
	cerr chan error
	// creds holds the request-supplied credentials, if any.
	creds *Credentials
	// uri is remembered by ListManifests so ReadManifest can reuse it.
	uri *url.URL
}
// setup creates a new session, checks valid bucket at uri.Path, and configures a minio client.
// setup also fills in values used by the handler in subsequent calls.
// Returns a new S3 minio client, otherwise a nil client with an error.
func (h *s3Handler) setup(uri *url.URL) (*minio.Client, error) {
	// A bucket name is required: uri.Path must be at least "/bucket".
	if len(uri.Path) < 1 {
		return nil, errors.Errorf("Invalid bucket: %q", uri.Path)
	}
	glog.V(2).Infof("Backup using host: %s, path: %s", uri.Host, uri.Path)
	// Resolve credentials: anonymous, environment-provided, or request-supplied.
	var creds credentials.Value
	switch {
	case h.creds.isAnonymous():
		// No need to setup credentials.
	case !h.creds.hasCredentials():
		var provider credentials.Provider
		switch uri.Scheme {
		case "s3":
			// Access Key ID: AWS_ACCESS_KEY_ID or AWS_ACCESS_KEY.
			// Secret Access Key: AWS_SECRET_ACCESS_KEY or AWS_SECRET_KEY.
			// Secret Token: AWS_SESSION_TOKEN.
			provider = &credentials.EnvAWS{}
		default: // minio
			// Access Key ID: MINIO_ACCESS_KEY.
			// Secret Access Key: MINIO_SECRET_KEY.
			provider = &credentials.EnvMinio{}
		}
		// If no credentials can be retrieved, an attempt to access the destination
		// with no credentials will be made.
		creds, _ = provider.Retrieve() // error is always nil
	default:
		// Use the credentials supplied with the request.
		creds.AccessKeyID = h.creds.AccessKey
		creds.SecretAccessKey = h.creds.SecretKey
		creds.SessionToken = h.creds.SessionToken
	}
	// Verify URI and set default S3 host if needed.
	switch uri.Scheme {
	case "s3":
		// s3:///bucket/folder
		if !strings.Contains(uri.Host, ".") {
			uri.Host = defaultEndpointS3
		}
		if !s3utils.IsAmazonEndpoint(*uri) {
			return nil, errors.Errorf("Invalid S3 endpoint %q", uri.Host)
		}
	default: // minio
		if uri.Host == "" {
			return nil, errors.Errorf("Minio handler requires a host")
		}
	}
	// TLS is used unless the query parameter "secure=false" is present.
	secure := uri.Query().Get("secure") != "false" // secure by default
	mc, err := minio.New(uri.Host, creds.AccessKeyID, creds.SecretAccessKey, secure)
	if err != nil {
		return nil, err
	}
	// Set client app name "Dgraph/v1.0.x"
	mc.SetAppInfo(appName, x.Version())
	// S3 transfer acceleration support.
	if uri.Scheme == "s3" && strings.Contains(uri.Host, s3AccelerateSubstr) {
		mc.SetS3TransferAccelerate(uri.Host)
	}
	// enable HTTP tracing
	if uri.Query().Get("trace") == "true" {
		mc.TraceOn(os.Stderr)
	}
	// split path into bucketName and blobPrefix
	parts := strings.Split(uri.Path[1:], "/")
	h.bucketName = parts[0] // bucket
	// verify the requested bucket exists.
	found, err := mc.BucketExists(h.bucketName)
	if err != nil {
		return nil, errors.Wrapf(err, "while looking for bucket %s at host %s",
			h.bucketName, uri.Host)
	}
	if !found {
		return nil, errors.Errorf("Bucket was not found: %s", h.bucketName)
	}
	// Everything after the bucket becomes the object prefix for later calls.
	if len(parts) > 1 {
		h.objectPrefix = filepath.Join(parts[1:]...)
	}
	return mc, err
}
// createObject starts an asynchronous upload of the named backup object and
// wires up the pipe that Write will feed.
func (h *s3Handler) createObject(uri *url.URL, req *pb.BackupRequest, mc *minio.Client,
	objectName string) {

	// The backup object is: folder1...folderN/dgraph.20181106.0113/r110001-g1.backup
	dir := fmt.Sprintf(backupPathFmt, req.UnixTs)
	object := filepath.Join(h.objectPrefix, dir, objectName)
	glog.V(2).Infof("Sending data to %s blob %q ...", uri.Scheme, object)

	h.cerr = make(chan error, 1)
	h.preader, h.pwriter = io.Pipe()
	go func() { h.cerr <- h.upload(mc, object) }()
}
// GetLatestManifest reads the manifests at the given URL and returns the
// latest manifest.
func (h *s3Handler) GetLatestManifest(uri *url.URL) (*Manifest, error) {
	mc, err := h.setup(uri)
	if err != nil {
		return nil, err
	}

	// Scan every object under the prefix and keep the lexicographically
	// greatest manifest key; key names sort by backup timestamp, so the
	// greatest key belongs to the most recent backup.
	done := make(chan struct{})
	defer close(done)

	var latest string
	suffix := "/" + backupManifest
	for obj := range mc.ListObjects(h.bucketName, h.objectPrefix, true, done) {
		if strings.HasSuffix(obj.Key, suffix) && obj.Key > latest {
			latest = obj.Key
		}
	}

	var m Manifest
	if latest == "" {
		// No manifests found: return an empty manifest, not an error.
		return &m, nil
	}
	if err := h.readManifest(mc, latest, &m); err != nil {
		return nil, err
	}
	return &m, nil
}
// CreateBackupFile creates a new session and prepares the data stream for the backup.
// URI formats:
// minio://<host>/bucket/folder1.../folderN?secure=true|false
// minio://<host:port>/bucket/folder1.../folderN?secure=true|false
// s3://<s3 region endpoint>/bucket/folder1.../folderN?secure=true|false
// s3:///bucket/folder1.../folderN?secure=true|false (use default S3 endpoint)
func (h *s3Handler) CreateBackupFile(uri *url.URL, req *pb.BackupRequest) error {
	glog.V(2).Infof("S3Handler got uri: %+v. Host: %s. Path: %s\n", uri, uri.Host, uri.Path)

	mc, err := h.setup(uri)
	if err != nil {
		return err
	}
	// Kick off the background upload for this group's backup object.
	h.createObject(uri, req, mc, backupName(req.ReadTs, req.GroupId))
	return nil
}
// CreateManifest finishes a backup by creating an object to store the manifest.
func (h *s3Handler) CreateManifest(uri *url.URL, req *pb.BackupRequest) error {
	glog.V(2).Infof("S3Handler got uri: %+v. Host: %s. Path: %s\n", uri, uri.Host, uri.Path)

	mc, err := h.setup(uri)
	if err != nil {
		return err
	}
	// The manifest is uploaded through the same pipe mechanism as backup data.
	h.createObject(uri, req, mc, backupManifest)
	return nil
}
// readManifest reads a manifest file at path using the handler.
// Returns nil on success, otherwise an error.
func (h *s3Handler) readManifest(mc *minio.Client, object string, m *Manifest) error {
	r, err := mc.GetObject(h.bucketName, object, minio.GetObjectOptions{})
	if err != nil {
		return err
	}
	defer r.Close()
	// Decode the JSON manifest directly from the object stream.
	return json.NewDecoder(r).Decode(m)
}
// GetManifests returns the manifests stored under uri that match backupId,
// sorted in ascending order of BackupNum.
func (h *s3Handler) GetManifests(uri *url.URL, backupId string) ([]*Manifest, error) {
	mc, err := h.setup(uri)
	if err != nil {
		return nil, err
	}

	// Collect the object key of every manifest below the prefix.
	done := make(chan struct{})
	defer close(done)

	var keys []string
	suffix := "/" + backupManifest
	for obj := range mc.ListObjects(h.bucketName, h.objectPrefix, true, done) {
		if strings.HasSuffix(obj.Key, suffix) {
			keys = append(keys, obj.Key)
		}
	}
	if len(keys) == 0 {
		return nil, errors.Errorf("No manifests found at: %s", uri.String())
	}
	sort.Strings(keys)
	if glog.V(3) {
		fmt.Printf("Found backup manifest(s) %s: %v\n", uri.Scheme, keys)
	}

	// Read and filter the manifests to get the list of manifests to consider
	// for this restore operation.
	var manifests []*Manifest
	for _, key := range keys {
		var m Manifest
		if err := h.readManifest(mc, key, &m); err != nil {
			return nil, errors.Wrapf(err, "while reading %q", key)
		}
		m.Path = key
		manifests = append(manifests, &m)
	}
	manifests, err = filterManifests(manifests, backupId)
	if err != nil {
		return nil, err
	}

	// Sort manifests in the ascending order of their BackupNum so that the first
	// manifest corresponds to the first full backup and so on.
	sort.Slice(manifests, func(i, j int) bool {
		return manifests[i].BackupNum < manifests[j].BackupNum
	})
	return manifests, nil
}
// Load creates a new session, scans for backup objects in a bucket, then tries to
// load any backup objects found.
// Returns nil and the maximum Since value on success, error otherwise.
func (h *s3Handler) Load(uri *url.URL, backupId string, fn loadFn) LoadResult {
	manifests, err := h.GetManifests(uri, backupId)
	if err != nil {
		return LoadResult{0, 0, errors.Wrapf(err, "while retrieving manifests")}
	}
	mc, err := h.setup(uri)
	if err != nil {
		return LoadResult{0, 0, err}
	}

	// since is returned with the max manifest Since value found.
	var since uint64
	// maxUid is the maximum UID reported by fn across all restored groups.
	var maxUid uint64

	// Process each manifest, first check that they are valid and then confirm the
	// backup manifests for each group exist. Each group in manifest must have a backup file,
	// otherwise this is a failure and the user must remedy.
	for i, manifest := range manifests {
		if manifest.Since == 0 || len(manifest.Groups) == 0 {
			if glog.V(2) {
				fmt.Printf("Restore: skip backup: %#v\n", manifest)
			}
			continue
		}
		path := filepath.Dir(manifests[i].Path)
		for gid := range manifest.Groups {
			object := filepath.Join(path, backupName(manifest.Since, gid))
			reader, err := mc.GetObject(h.bucketName, object, minio.GetObjectOptions{})
			if err != nil {
				return LoadResult{0, 0, errors.Wrapf(err, "Failed to get %q", object)}
			}
			st, err := reader.Stat()
			if err != nil {
				reader.Close()
				return LoadResult{0, 0, errors.Wrapf(err, "Stat failed %q", object)}
			}
			if st.Size <= 0 {
				reader.Close()
				return LoadResult{0, 0,
					errors.Errorf("Remote object is empty or inaccessible: %s", object)}
			}
			fmt.Printf("Downloading %q, %d bytes\n", object, st.Size)

			// Only restore the predicates that were assigned to this group at the time
			// of the last backup.
			predSet := manifests[len(manifests)-1].getPredsInGroup(gid)
			groupMaxUid, err := fn(reader, int(gid), predSet)
			// Fix: the previous implementation used `defer reader.Close()` inside
			// this loop, keeping every object handle open until Load returned.
			// Close each reader as soon as its object has been consumed.
			reader.Close()
			if err != nil {
				return LoadResult{0, 0, err}
			}
			if groupMaxUid > maxUid {
				maxUid = groupMaxUid
			}
		}
		since = manifest.Since
	}
	return LoadResult{since, maxUid, nil}
}
// Verify performs basic checks to decide whether the specified backup can be restored
// to a live cluster.
func (h *s3Handler) Verify(uri *url.URL, backupId string, currentGroups []uint32) error {
	manifests, err := h.GetManifests(uri, backupId)
	switch {
	case err != nil:
		return errors.Wrapf(err, "while retrieving manifests")
	case len(manifests) == 0:
		return errors.Errorf("No backups with the specified backup ID %s", backupId)
	default:
		// Delegate the group-membership check to the shared verifier.
		return verifyGroupsInBackup(manifests, currentGroups)
	}
}
// ListManifests loads the manifests in the locations and returns them.
func (h *s3Handler) ListManifests(uri *url.URL) ([]string, error) {
	mc, err := h.setup(uri)
	if err != nil {
		return nil, err
	}
	// Remember the URI so ReadManifest can reconnect later.
	h.uri = uri

	done := make(chan struct{})
	defer close(done)

	// Gather every object key ending in the manifest filename.
	var found []string
	suffix := "/" + backupManifest
	for obj := range mc.ListObjects(h.bucketName, h.objectPrefix, true, done) {
		if strings.HasSuffix(obj.Key, suffix) {
			found = append(found, obj.Key)
		}
	}
	if len(found) == 0 {
		return nil, errors.Errorf("No manifests found at: %s", uri.String())
	}
	sort.Strings(found)
	if glog.V(3) {
		fmt.Printf("Found backup manifest(s) %s: %v\n", uri.Scheme, found)
	}
	return found, nil
}
// ReadManifest reads the manifest object at path into m, reusing the URI that
// was stored by a prior ListManifests call.
func (h *s3Handler) ReadManifest(path string, m *Manifest) error {
	client, err := h.setup(h.uri)
	if err != nil {
		return err
	}
	return h.readManifest(client, path, m)
}
// upload will block until it's done or an error occurs.
func (h *s3Handler) upload(mc *minio.Client, object string) error {
	start := time.Now()

	// We don't need to have a progress object, because we're using a Pipe. A write to Pipe would
	// block until it can be fully read. So, the rate of the writes here would be equal to the rate
	// of upload. We're already tracking progress of the writes in stream.Lists, so no need to track
	// the progress of read. By definition, it must be the same.
	n, err := mc.PutObject(h.bucketName, object, h.preader, -1, minio.PutObjectOptions{})
	glog.V(2).Infof("Backup sent %d bytes. Time elapsed: %s",
		n, time.Since(start).Round(time.Second))

	if err != nil {
		// This should cause Write to fail as well.
		glog.Errorf("Backup: Closing RW pipe due to error: %v", err)
		if cerr := h.pwriter.Close(); cerr != nil {
			return cerr
		}
		if cerr := h.preader.Close(); cerr != nil {
			return cerr
		}
	}
	return err
}
// Close signals EOF to the uploader and waits for the upload to finish.
func (h *s3Handler) Close() error {
	// Done buffering, send EOF.
	if err := h.pwriter.CloseWithError(nil); err != nil && err != io.EOF {
		glog.Errorf("Unexpected error when closing pipe: %v", err)
	}
	glog.V(2).Infof("Backup waiting for upload to complete.")
	// Block until the upload goroutine reports its final status.
	return <-h.cerr
}
// Write forwards b into the pipe feeding the background upload; it blocks
// until the uploader has consumed the bytes.
func (h *s3Handler) Write(b []byte) (int, error) {
	return h.pwriter.Write(b)
}