|
| 1 | +/* |
| 2 | + * Copyright 2021 Dgraph Labs, Inc. and Contributors |
| 3 | + * |
| 4 | + * Licensed under the Apache License, Version 2.0 (the "License"); |
| 5 | + * you may not use this file except in compliance with the License. |
| 6 | + * You may obtain a copy of the License at |
| 7 | + * |
| 8 | + * http://www.apache.org/licenses/LICENSE-2.0 |
| 9 | + * |
| 10 | + * Unless required by applicable law or agreed to in writing, software |
| 11 | + * distributed under the License is distributed on an "AS IS" BASIS, |
| 12 | + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 13 | + * See the License for the specific language governing permissions and |
| 14 | + * limitations under the License. |
| 15 | + */ |
| 16 | + |
| 17 | +package backup |
| 18 | + |
| 19 | +import ( |
| 20 | + "encoding/json" |
| 21 | + "fmt" |
| 22 | + "io/ioutil" |
| 23 | + "net/url" |
| 24 | + "os" |
| 25 | + "path/filepath" |
| 26 | + "time" |
| 27 | + |
| 28 | + bpb "github.com/dgraph-io/badger/v3/pb" |
| 29 | + "github.com/golang/glog" |
| 30 | + |
| 31 | + "github.com/dgraph-io/dgraph/ee" |
| 32 | + "github.com/dgraph-io/dgraph/posting" |
| 33 | + "github.com/dgraph-io/dgraph/protos/pb" |
| 34 | + "github.com/dgraph-io/dgraph/worker" |
| 35 | + "github.com/dgraph-io/dgraph/x" |
| 36 | + "github.com/dgraph-io/ristretto/z" |
| 37 | + "github.com/pkg/errors" |
| 38 | + "github.com/spf13/cobra" |
| 39 | +) |
| 40 | + |
// LsBackup is the sub-command used to list the backups in a folder.
var LsBackup x.SubCommand

// ExportBackup is the sub-command used to export the data inside a single
// full or incremental backup.
var ExportBackup x.SubCommand

// opt holds flag values shared by the lsbackup and export_backup
// sub-commands. Only a subset of these fields is bound by the flag sets
// visible in this file (location, verbose, destination, format, upgrade);
// the rest are presumably bound elsewhere — confirm against other files.
var opt struct {
	backupId    string
	badger      string
	location    string
	pdir        string
	zero        string
	key         x.Sensitive // encryption key, populated from ee.GetKeys.
	forceZero   bool
	destination string
	format      string
	verbose     bool
	upgrade     bool // used by export backup command.
}
| 59 | + |
// init registers both sub-commands when this package is imported.
func init() {
	initBackupLs()
	initExportBackup()
}
| 64 | + |
| 65 | +func initBackupLs() { |
| 66 | + LsBackup.Cmd = &cobra.Command{ |
| 67 | + Use: "lsbackup", |
| 68 | + Short: "List info on backups in a given location", |
| 69 | + Args: cobra.NoArgs, |
| 70 | + Run: func(cmd *cobra.Command, args []string) { |
| 71 | + defer x.StartProfile(LsBackup.Conf).Stop() |
| 72 | + if err := runLsbackupCmd(); err != nil { |
| 73 | + fmt.Fprintln(os.Stderr, err) |
| 74 | + os.Exit(1) |
| 75 | + } |
| 76 | + }, |
| 77 | + Annotations: map[string]string{"group": "tool"}, |
| 78 | + } |
| 79 | + LsBackup.Cmd.SetHelpTemplate(x.NonRootTemplate) |
| 80 | + flag := LsBackup.Cmd.Flags() |
| 81 | + flag.StringVarP(&opt.location, "location", "l", "", |
| 82 | + "Sets the source location URI (required).") |
| 83 | + flag.BoolVar(&opt.verbose, "verbose", false, |
| 84 | + "Outputs additional info in backup list.") |
| 85 | + _ = LsBackup.Cmd.MarkFlagRequired("location") |
| 86 | +} |
| 87 | + |
| 88 | +func runLsbackupCmd() error { |
| 89 | + manifests, err := worker.ListBackupManifests(opt.location, nil) |
| 90 | + if err != nil { |
| 91 | + return errors.Wrapf(err, "while listing manifests") |
| 92 | + } |
| 93 | + |
| 94 | + type backupEntry struct { |
| 95 | + Path string `json:"path"` |
| 96 | + Since uint64 `json:"since"` |
| 97 | + ReadTs uint64 `json:"read_ts"` |
| 98 | + BackupId string `json:"backup_id"` |
| 99 | + BackupNum uint64 `json:"backup_num"` |
| 100 | + Encrypted bool `json:"encrypted"` |
| 101 | + Type string `json:"type"` |
| 102 | + Groups map[uint32][]string `json:"groups,omitempty"` |
| 103 | + DropOperations []*pb.DropOperation `json:"drop_operations,omitempty"` |
| 104 | + } |
| 105 | + |
| 106 | + type backupOutput []backupEntry |
| 107 | + |
| 108 | + var output backupOutput |
| 109 | + for _, manifest := range manifests { |
| 110 | + |
| 111 | + be := backupEntry{ |
| 112 | + Path: manifest.Path, |
| 113 | + Since: manifest.SinceTsDeprecated, |
| 114 | + ReadTs: manifest.ReadTs, |
| 115 | + BackupId: manifest.BackupId, |
| 116 | + BackupNum: manifest.BackupNum, |
| 117 | + Encrypted: manifest.Encrypted, |
| 118 | + Type: manifest.Type, |
| 119 | + } |
| 120 | + if opt.verbose { |
| 121 | + be.Groups = manifest.Groups |
| 122 | + be.DropOperations = manifest.DropOperations |
| 123 | + } |
| 124 | + output = append(output, be) |
| 125 | + } |
| 126 | + b, err := json.MarshalIndent(output, "", "\t") |
| 127 | + if err != nil { |
| 128 | + fmt.Println("error:", err) |
| 129 | + } |
| 130 | + os.Stdout.Write(b) |
| 131 | + fmt.Println() |
| 132 | + return nil |
| 133 | +} |
| 134 | + |
| 135 | +func initExportBackup() { |
| 136 | + ExportBackup.Cmd = &cobra.Command{ |
| 137 | + Use: "export_backup", |
| 138 | + Short: "Export data inside single full or incremental backup", |
| 139 | + Long: ``, |
| 140 | + Args: cobra.NoArgs, |
| 141 | + Run: func(cmd *cobra.Command, args []string) { |
| 142 | + defer x.StartProfile(ExportBackup.Conf).Stop() |
| 143 | + if err := runExportBackup(); err != nil { |
| 144 | + fmt.Fprintln(os.Stderr, err) |
| 145 | + os.Exit(1) |
| 146 | + } |
| 147 | + }, |
| 148 | + Annotations: map[string]string{"group": "tool"}, |
| 149 | + } |
| 150 | + |
| 151 | + ExportBackup.Cmd.SetHelpTemplate(x.NonRootTemplate) |
| 152 | + flag := ExportBackup.Cmd.Flags() |
| 153 | + flag.StringVarP(&opt.location, "location", "l", "", |
| 154 | + `Sets the location of the backup. Both file URIs and s3 are supported. |
| 155 | + This command will take care of all the full + incremental backups present in the location.`) |
| 156 | + flag.StringVarP(&opt.destination, "destination", "d", "", |
| 157 | + "The folder to which export the backups.") |
| 158 | + flag.StringVarP(&opt.format, "format", "f", "rdf", |
| 159 | + "The format of the export output. Accepts a value of either rdf or json") |
| 160 | + flag.BoolVar(&opt.upgrade, "upgrade", false, |
| 161 | + `If true, retrieve the CORS from DB and append at the end of GraphQL schema. |
| 162 | + It also deletes the deprecated types and predicates. |
| 163 | + Use this option when exporting a backup of 20.11 for loading onto 21.03.`) |
| 164 | + ee.RegisterEncFlag(flag) |
| 165 | +} |
| 166 | + |
// bufWriter adapts the restore reducer's output to the export writers:
// each buffer it receives is decoded and re-written as export entries.
type bufWriter struct {
	writers *worker.Writers   // open export destinations for one group.
	req     *pb.ExportRequest // export parameters (format, destination, ...).
}
| 171 | + |
| 172 | +func exportSchema(writers *worker.Writers, val []byte, pk x.ParsedKey) error { |
| 173 | + kv := &bpb.KV{} |
| 174 | + var err error |
| 175 | + if pk.IsSchema() { |
| 176 | + kv, err = worker.SchemaExportKv(pk.Attr, val, true) |
| 177 | + if err != nil { |
| 178 | + return err |
| 179 | + } |
| 180 | + } else { |
| 181 | + kv, err = worker.TypeExportKv(pk.Attr, val) |
| 182 | + if err != nil { |
| 183 | + return err |
| 184 | + } |
| 185 | + } |
| 186 | + return worker.WriteExport(writers, kv, "rdf") |
| 187 | +} |
| 188 | + |
// Write consumes a buffer of marshalled bpb.KV entries produced by the
// restore mapper/reducer and re-exports each entry in the requested
// format: schema/type keys go through exportSchema, data keys are rebuilt
// into posting lists and exported, anything else is skipped.
func (bw *bufWriter) Write(buf *z.Buffer) error {
	// One KV reused across iterations; Reset below clears it each time.
	kv := &bpb.KV{}
	err := buf.SliceIterate(func(s []byte) error {
		kv.Reset()
		if err := kv.Unmarshal(s); err != nil {
			return errors.Wrap(err, "processKvBuf failed to unmarshal kv")
		}
		pk, err := x.Parse(kv.Key)
		if err != nil {
			return errors.Wrap(err, "processKvBuf failed to parse key")
		}
		// Skip "_predicate_" — presumably a legacy internal predicate
		// carried over from old backups; verify against restore code.
		if pk.Attr == "_predicate_" {
			return nil
		}
		if pk.IsSchema() || pk.IsType() {
			return exportSchema(bw.writers, kv.Value, pk)
		}
		if pk.IsData() {
			pl := &pb.PostingList{}
			if err := pl.Unmarshal(kv.Value); err != nil {
				return errors.Wrap(err, "ProcessKvBuf failed to Unmarshal pl")
			}
			l := posting.NewList(kv.Key, pl, kv.Version)
			kvList, err := worker.ToExportKvList(pk, l, bw.req)
			if err != nil {
				return errors.Wrap(err, "processKvBuf failed to Export")
			}
			// An empty list means the entry was filtered out of the
			// export; only the first KV is written otherwise.
			if len(kvList.Kv) == 0 {
				return nil
			}
			exportKv := kvList.Kv[0]
			return worker.WriteExport(bw.writers, exportKv, bw.req.Format)
		}
		// Neither schema, type, nor data: nothing to export.
		return nil
	})
	return errors.Wrap(err, "bufWriter failed to write")
}
| 226 | + |
| 227 | +func runExportBackup() error { |
| 228 | + keys, err := ee.GetKeys(ExportBackup.Conf) |
| 229 | + if err != nil { |
| 230 | + return err |
| 231 | + } |
| 232 | + opt.key = keys.EncKey |
| 233 | + if opt.format != "json" && opt.format != "rdf" { |
| 234 | + return errors.Errorf("invalid format %s", opt.format) |
| 235 | + } |
| 236 | + // Create exportDir and temporary folder to store the restored backup. |
| 237 | + exportDir, err := filepath.Abs(opt.destination) |
| 238 | + if err != nil { |
| 239 | + return errors.Wrapf(err, "cannot convert path %s to absolute path", exportDir) |
| 240 | + } |
| 241 | + if err := os.MkdirAll(exportDir, 0755); err != nil { |
| 242 | + return errors.Wrapf(err, "cannot create dir %s", exportDir) |
| 243 | + } |
| 244 | + |
| 245 | + uri, err := url.Parse(opt.location) |
| 246 | + if err != nil { |
| 247 | + return errors.Wrapf(err, "runExportBackup") |
| 248 | + } |
| 249 | + handler, err := x.NewUriHandler(uri, nil) |
| 250 | + if err != nil { |
| 251 | + return errors.Wrapf(err, "runExportBackup") |
| 252 | + } |
| 253 | + latestManifest, err := worker.GetLatestManifest(handler, uri) |
| 254 | + if err != nil { |
| 255 | + return errors.Wrapf(err, "runExportBackup") |
| 256 | + } |
| 257 | + |
| 258 | + mapDir, err := ioutil.TempDir(x.WorkerConfig.TmpDir, "restore-export") |
| 259 | + x.Check(err) |
| 260 | + defer os.RemoveAll(mapDir) |
| 261 | + glog.Infof("Created temporary map directory: %s\n", mapDir) |
| 262 | + |
| 263 | + encFlag := z.NewSuperFlag(ExportBackup.Conf.GetString("encryption")). |
| 264 | + MergeAndCheckDefault(ee.EncDefaults) |
| 265 | + // TODO: Can probably make this procesing concurrent. |
| 266 | + for gid := range latestManifest.Groups { |
| 267 | + glog.Infof("Exporting group: %d", gid) |
| 268 | + req := &pb.RestoreRequest{ |
| 269 | + GroupId: gid, |
| 270 | + Location: opt.location, |
| 271 | + EncryptionKeyFile: encFlag.GetPath("key-file"), |
| 272 | + RestoreTs: 1, |
| 273 | + } |
| 274 | + if _, err := worker.RunMapper(req, mapDir); err != nil { |
| 275 | + return errors.Wrap(err, "Failed to map the backups") |
| 276 | + } |
| 277 | + |
| 278 | + in := &pb.ExportRequest{ |
| 279 | + GroupId: uint32(gid), |
| 280 | + ReadTs: latestManifest.ValidReadTs(), |
| 281 | + UnixTs: time.Now().Unix(), |
| 282 | + Format: opt.format, |
| 283 | + Destination: exportDir, |
| 284 | + } |
| 285 | + writers, err := worker.NewWriters(in) |
| 286 | + defer writers.Close() |
| 287 | + if err != nil { |
| 288 | + return err |
| 289 | + } |
| 290 | + |
| 291 | + w := &bufWriter{req: in, writers: writers} |
| 292 | + if err := worker.RunReducer(w, mapDir); err != nil { |
| 293 | + return errors.Wrap(err, "Failed to reduce the map") |
| 294 | + } |
| 295 | + if err := writers.Close(); err != nil { |
| 296 | + return errors.Wrap(err, "Failed to finish write") |
| 297 | + } |
| 298 | + } |
| 299 | + return nil |
| 300 | +} |
0 commit comments