-
Notifications
You must be signed in to change notification settings - Fork 1.5k
/
Copy pathhandler.go
213 lines (181 loc) · 6.84 KB
/
handler.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
// +build !oss
/*
* Copyright 2018 Dgraph Labs, Inc. and Contributors
*
* Licensed under the Dgraph Community License (the "License"); you
* may not use this file except in compliance with the License. You
* may obtain a copy of the License at
*
* https://github.com/dgraph-io/dgraph/blob/master/licenses/DCL.txt
*/
package backup
import (
"fmt"
"io"
"net/url"
"strings"
"github.com/dgraph-io/dgraph/protos/pb"
"github.com/pkg/errors"
)
const (
// backupPathFmt defines the path to store or index backup objects.
// The expected parameter is a date in string format.
backupPathFmt = `dgraph.%s`
// backupNameFmt defines the name of backup files or objects (remote).
// The first parameter is the read timestamp at the time of backup. This is used for
// incremental backups and partial restore.
// The second parameter is the group ID when the backup happened. This is used for
// partitioning the posting directories 'p' during restore.
backupNameFmt = `r%d-g%d.backup`
// backupManifest is the name of backup manifests. This is a JSON file that contains the
// details of the backup. A backup dir without a manifest is ignored.
//
// Example manifest:
// {
//   "since": 2280,
//   "groups": [ 1, 2, 3 ]
// }
//
// "since" is the read timestamp used at the backup request. This value is called "since"
// because it is used by subsequent incremental backups.
// "groups" are the group IDs that participated.
backupManifest = `manifest.json`
)
// UriHandler interface is implemented by URI scheme handlers.
// When adding new scheme handlers, for example 'azure://', an object will implement
// this interface to supply Dgraph with a way to create or load backup files into DB.
// For all methods below, the URL object is expected to follow the target URI formats
// described on NewUriHandler.
// NOTE(review): earlier revisions of this comment mentioned a Processor object holding
// the DB, estimated tablets size, and backup parameters; no method here takes one —
// confirm whether that description is still relevant.
type UriHandler interface {
// Handlers must know how to Write to their URI location.
// These function calls are used by both Create and Load.
io.WriteCloser
// GetSinceTs reads the manifests at the given URL and returns the appropriate
// timestamp from which the current backup should be started.
GetSinceTs(*url.URL) (uint64, error)
// CreateBackupFile prepares the object or file to save the backup file.
CreateBackupFile(*url.URL, *pb.BackupRequest) error
// CreateManifest prepares the manifest for writing.
CreateManifest(*url.URL, *pb.BackupRequest) error
// Load will scan location URI for backup files, then load them via loadFn.
// It optionally takes the name of the last directory to consider. Any backup directories
// created after will be ignored.
// Objects implementing this function will be used for retrieving (download) backup files
// and loading the data into a DB. The restore CLI command uses this call.
Load(*url.URL, string, loadFn) (uint64, error)
// ListManifests will scan the provided URI and return the paths to the manifests stored
// in that location.
ListManifests(*url.URL) ([]string, error)
// ReadManifest will read the manifest at the given location and load it into the given
// Manifest object.
ReadManifest(string, *Manifest) error
}
// getHandler picks the UriHandler implementation for a URI scheme.
// An empty scheme is treated like "file", and "minio" shares the "s3" handler.
// A new handler value is allocated on every call. Schemes are matched exactly
// (case-sensitive); nil is returned for any scheme that is not recognized.
func getHandler(scheme string) UriHandler {
	if scheme == "" || scheme == "file" {
		return &fileHandler{}
	}
	if scheme == "s3" || scheme == "minio" {
		return &s3Handler{}
	}
	return nil
}
// NewUriHandler looks up the UriHandler matching the scheme of uri. The URL must
// already be parsed by the caller; this function only inspects uri.Scheme.
// An error is returned when no handler is registered for the scheme.
//
// Target URI formats:
//   [scheme]://[host]/[path]?[args]
//   [scheme]:///[path]?[args]
//   /[path]?[args] (only for local or NFS)
//
// Target URI parts:
//   scheme - service handler, one of: "file", "s3", "minio"
//   host   - remote address. ex: "dgraph.s3.amazonaws.com"
//   path   - directory, bucket or container at target. ex: "/dgraph/backups/"
//   args   - specific arguments that are ok to appear in logs.
//
// Global args (if supported by the handler):
//   secure   - true|false turn on/off TLS.
//   trace    - true|false turn on/off HTTP tracing.
//   compress - true|false turn on/off data compression.
//   encrypt  - true|false turn on/off data encryption.
//
// Examples:
//   s3://dgraph.s3.amazonaws.com/dgraph/backups?secure=true
//   minio://localhost:9000/dgraph?secure=true
//   file:///tmp/dgraph/backups
//   /tmp/dgraph/backups?compress=gzip
//
// NOTE(review): the last example passes compress=gzip although compress is listed as
// true|false above — confirm which form the handlers actually accept.
func NewUriHandler(uri *url.URL) (UriHandler, error) {
	if handler := getHandler(uri.Scheme); handler != nil {
		return handler, nil
	}
	return nil, errors.Errorf("Unable to handle url: %s", uri)
}
// loadFn is the callback invoked for each backup file being read during a load.
// It receives a reader over the file's contents and the ID of the group the backup
// belongs to; the error it returns is handed back to the handler that called it.
type loadFn func(reader io.Reader, groupId int) error
// Load parses location as a URI, selects the handler for its scheme and delegates
// to that handler's Load, passing lastDir and fn through unchanged. Per the handler
// contract, backup directories created after lastDir are ignored and each backup
// file is fed to fn.
// On success it returns the value reported by the handler (the maximum Since value);
// it returns an error if location cannot be parsed or its scheme has no handler.
func Load(location, lastDir string, fn loadFn) (since uint64, err error) {
	target, parseErr := url.Parse(location)
	if parseErr != nil {
		return 0, parseErr
	}
	if handler := getHandler(target.Scheme); handler != nil {
		return handler.Load(target, lastDir, fn)
	}
	return 0, errors.Errorf("Unsupported URI: %v", target)
}
// ListManifests parses l as a URI, asks the matching handler for the paths of all
// manifests stored there, and reads each one.
// The result maps every manifest path to its decoded Manifest. The first failure
// (unparsable URI, unsupported scheme, listing error or read error) aborts the
// call and is returned with a nil map.
func ListManifests(l string) (map[string]*Manifest, error) {
	target, parseErr := url.Parse(l)
	if parseErr != nil {
		return nil, parseErr
	}

	handler := getHandler(target.Scheme)
	if handler == nil {
		return nil, errors.Errorf("Unsupported URI: %v", target)
	}

	locations, listErr := handler.ListManifests(target)
	if listErr != nil {
		return nil, listErr
	}

	result := make(map[string]*Manifest)
	for i := range locations {
		loc := locations[i]
		// Each path gets its own Manifest so the map entries never alias.
		entry := new(Manifest)
		if readErr := handler.ReadManifest(loc, entry); readErr != nil {
			return nil, errors.Wrapf(readErr, "While reading %q", loc)
		}
		result[loc] = entry
	}
	return result, nil
}
// manifestFile pairs a manifest with the location it was read from.
type manifestFile struct {
// path is the "/"-separated location of the manifest; filterManifests treats its
// second-to-last component as the name of the backup directory.
path string
// manifest is the manifest stored at path; filterManifests inspects its Type.
manifest *Manifest
}
// filterManifests selects the manifests that should be considered during a restore
// and returns them in the same relative order they have in files.
//
// files is walked from its last element to its first, so it is expected to be
// ordered from oldest to newest backup. For every entry the backup directory is the
// second-to-last "/"-separated component of its path. When lastDir is non-empty,
// entries whose directory compares (as a string) greater than lastDir are skipped
// and a notice is printed to stdout. The walk stops right after the first manifest
// whose Type is "full" has been kept, so the result starts at the latest eligible
// full backup and continues with the later entries that were kept.
//
// An error is returned when a path contains no "/" and therefore has no directory
// component; previously such a path caused an index-out-of-range panic.
func filterManifests(files []*manifestFile, lastDir string) ([]*manifestFile, error) {
	var filtered []*manifestFile
	for i := len(files) - 1; i >= 0; i-- {
		file := files[i]

		// strings.Split returns a single element when the separator is absent, which
		// would make the parent-directory index below negative.
		parts := strings.Split(file.path, "/")
		if len(parts) < 2 {
			return nil, fmt.Errorf("invalid manifest path %q: missing backup directory", file.path)
		}
		dir := parts[len(parts)-2]

		if len(lastDir) > 0 && dir > lastDir {
			fmt.Printf("Restore: skip directory %s because it's newer than %s.\n", dir, lastDir)
			continue
		}

		filtered = append(filtered, file)
		if file.manifest.Type == "full" {
			break
		}
	}

	// The walk above collected entries newest-first; flip them back in place.
	for lo, hi := 0, len(filtered)-1; lo < hi; lo, hi = lo+1, hi-1 {
		filtered[lo], filtered[hi] = filtered[hi], filtered[lo]
	}
	return filtered, nil
}