diff --git a/worker/restore_map.go b/worker/restore_map.go
index 93220d7e585..9ce49696cc7 100644
--- a/worker/restore_map.go
+++ b/worker/restore_map.go
@@ -24,6 +24,7 @@ import (
 	"net/url"
 	"os"
 	"path/filepath"
+	"runtime"
 	"strconv"
 	"sync"
 	"sync/atomic"
@@ -131,19 +132,18 @@ func (me mapEntry) Data() []byte {
 type mapper struct {
 	once   sync.Once
 	nextId uint32
-	thr    *y.Throttle
 
 	bytesProcessed uint64
 	bytesRead      uint64
 	closer         *z.Closer
 
-	buf       *z.Buffer
-	bufLock   *sync.Mutex
 	restoreTs uint64
-	mapDir    string
-	reqCh     chan listReq
-	szHist    *z.HistogramData
+	mapDir    string
+	reqCh     chan listReq
+	writeCh   chan *z.Buffer
+	writers   chan struct{}
+	szHist    *z.HistogramData
 
 	maxUid uint64
 	maxNs  uint64
@@ -230,43 +230,25 @@ func newBuffer() *z.Buffer {
 	return buf.WithMaxSize(2 * mapFileSz)
 }
 
-func (mw *mapper) sendForWriting() error {
-	if mw.buf.IsEmpty() {
+func (mw *mapper) writeNow(mbuf *z.Buffer) error {
+	defer func() {
+		<-mw.writers
+	}()
+
+	if mbuf.IsEmpty() {
+		mbuf.Release()
 		return nil
 	}
-	mw.buf.SortSlice(func(ls, rs []byte) bool {
+	mbuf.SortSlice(func(ls, rs []byte) bool {
 		lme := mapEntry(ls)
 		rme := mapEntry(rs)
 		return y.CompareKeys(lme.Key(), rme.Key()) < 0
 	})
-
-	if err := mw.thr.Do(); err != nil {
-		return err
-	}
-	go func(buf *z.Buffer) {
-		err := mw.writeToDisk(buf)
-		mw.thr.Done(err)
-	}(mw.buf)
-	mw.buf = newBuffer()
-	return nil
+	return mw.writeToDisk(mbuf)
 }
 
 func (mw *mapper) Flush() error {
-	cl := func() error {
-		if err := mw.sendForWriting(); err != nil {
-			return err
-		}
-		if err := mw.thr.Finish(); err != nil {
-			return err
-		}
-		return mw.buf.Release()
-	}
-
-	var rerr error
-	mw.once.Do(func() {
-		rerr = cl()
-	})
-	return rerr
+	return nil
}
 
 func fromBackupKey(key []byte) ([]byte, uint64, error) {
@@ -277,13 +259,49 @@ func fromBackupKey(key []byte) ([]byte, uint64, error) {
 	return x.FromBackupKey(backupKey), backupKey.Namespace, nil
 }
 
-func (m *mapper) processReqCh(ctx context.Context) error {
-	buf := z.NewBuffer(20<<20, "processKVList")
-	defer buf.Release()
+func (m *mapper) mergeAndSend(closer *z.Closer) error {
+	defer closer.Done()
+
+	mbuf := newBuffer()
+	for buf := range m.writeCh {
+		atomic.AddUint64(&m.bytesProcessed, uint64(buf.LenNoPadding()))
+		mbuf.Write(buf.Bytes())
+		buf.Release()
+
+		var writeNow bool
+		if mbuf.LenNoPadding() >= mapFileSz {
+			writeNow = true
+			m.writers <- struct{}{}
+
+		} else if mbuf.LenNoPadding() >= mapFileSz/4 {
+			// This mechanism allows us to stagger our writes. So, if we can do a
+			// write, and we have accumulated a large enough buffer, then go for
+			// it.
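The staggering mechanism described in the comment above can be shown with a minimal, self-contained sketch. The names (writers, flush) and the thresholds are illustrative stand-ins, not the patch's actual identifiers; the point is the two acquisition modes: a blocking send when the buffer must be flushed, and a select/default pair that makes the quarter-full flush purely opportunistic.

package main

import "fmt"

func main() {
	const fileSz = 1000
	writers := make(chan struct{}, 2) // semaphore: at most 2 flushes in flight

	flush := func(n int) {
		defer func() { <-writers }() // release the writer slot
		fmt.Printf("flushed %d bytes\n", n)
	}

	for _, n := range []int{300, 700, 1200} {
		switch {
		case n >= fileSz: // buffer is full: block until a slot frees up
			writers <- struct{}{}
			flush(n)
		case n >= fileSz/4: // buffer is merely large: flush only if a slot is free
			select {
			case writers <- struct{}{}:
				flush(n)
			default: // all writers busy; keep accumulating instead
			}
		}
	}
}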
+			select {
+			case m.writers <- struct{}{}:
+				writeNow = true
+			default:
+			}
+		}
+
+		if writeNow {
+			if err := m.writeNow(mbuf); err != nil {
+				return errors.Wrapf(err, "writeNow")
+			}
+			mbuf = newBuffer()
+		}
+	}
+	m.writers <- struct{}{}
+	return m.writeNow(mbuf)
+}
 
-	maxNs := uint64(0)
-	maxUid := uint64(0)
+type processor struct {
+	*mapper
+	maxUid uint64
+	maxNs  uint64
+}
 
+func (p *processor) processKV(buf *z.Buffer, in *loadBackupInput, kv *bpb.KV) error {
 	toBuffer := func(kv *bpb.KV, version uint64) error {
 		key := y.KeyWithTs(kv.Key, version)
 		sz := kv.Size()
@@ -294,189 +312,173 @@ func (m *mapper) processReqCh(ctx context.Context) error {
 		_, err := kv.MarshalToSizedBuffer(buf[2+len(key):])
 		return err
 	}
+	if len(kv.GetUserMeta()) != 1 {
+		return errors.Errorf(
+			"Unexpected meta: %v for key: %s", kv.UserMeta, hex.Dump(kv.Key))
+	}
 
-	processKV := func(in *loadBackupInput, kv *bpb.KV) error {
-		if len(kv.GetUserMeta()) != 1 {
-			return errors.Errorf(
-				"Unexpected meta: %v for key: %s", kv.UserMeta, hex.Dump(kv.Key))
-		}
+	restoreKey, ns, err := fromBackupKey(kv.Key)
+	if err != nil {
+		return errors.Wrap(err, "fromBackupKey")
+	}
 
-		restoreKey, ns, err := fromBackupKey(kv.Key)
-		if err != nil {
-			return errors.Wrap(err, "fromBackupKey")
-		}
+	// Filter keys using the preds set. Do not do this filtering for type keys
+	// as they are meant to be in every group and their Attr value does not
+	// match a predicate name.
+	parsedKey, err := x.Parse(restoreKey)
+	if err != nil {
+		return errors.Wrapf(err, "could not parse key %s", hex.Dump(restoreKey))
+	}
 
-		// Filter keys using the preds set. Do not do this filtering for type keys
-		// as they are meant to be in every group and their Attr value does not
-		// match a predicate name.
-		parsedKey, err := x.Parse(restoreKey)
-		if err != nil {
-			return errors.Wrapf(err, "could not parse key %s", hex.Dump(restoreKey))
-		}
+	// Update the local max uid and max namespace values.
+	p.maxUid = x.Max(p.maxUid, parsedKey.Uid)
+	p.maxNs = x.Max(p.maxNs, ns)
 
-		// Update the local max uid and max namespace values.
-		maxUid = x.Max(maxUid, parsedKey.Uid)
-		maxNs = x.Max(maxNs, ns)
+	if !in.keepSchema && (parsedKey.IsSchema() || parsedKey.IsType()) {
+		return nil
+	}
+	if _, ok := in.preds[parsedKey.Attr]; !parsedKey.IsType() && !ok {
+		return nil
+	}
 
-		if !in.keepSchema && (parsedKey.IsSchema() || parsedKey.IsType()) {
+	switch kv.GetUserMeta()[0] {
+	case posting.BitEmptyPosting, posting.BitCompletePosting, posting.BitDeltaPosting:
+		if _, ok := in.dropNs[ns]; ok {
 			return nil
 		}
-		if _, ok := in.preds[parsedKey.Attr]; !parsedKey.IsType() && !ok {
-			return nil
+		backupPl := &pb.BackupPostingList{}
+		if err := backupPl.Unmarshal(kv.Value); err != nil {
+			return errors.Wrapf(err, "while reading backup posting list")
 		}
-
-		switch kv.GetUserMeta()[0] {
-		case posting.BitEmptyPosting, posting.BitCompletePosting, posting.BitDeltaPosting:
-			if _, ok := in.dropNs[ns]; ok {
-				return nil
-			}
+		pl := posting.FromBackupPostingList(backupPl)
+
+		if !posting.ShouldSplit(pl) || parsedKey.HasStartUid || len(pl.GetSplits()) > 0 {
+			// This covers two cases.
+			// 1. The list is not big enough to be split.
+			// 2. This key is storing part of a multi-part list. Write each individual
+			// part without rolling the key first. This part is here for backwards
+			// compatibility. New backups are not affected because there was a change
+			// to roll up lists into a single one.
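The toBuffer helper above fixes the on-disk map-entry layout: a 2-byte big-endian key length, the timestamped key, then the marshalled KV. A self-contained sketch of that layout, with hypothetical helper names standing in for the real mapEntry accessors:

package main

import (
	"encoding/binary"
	"fmt"
)

// encodeEntry mirrors the layout toBuffer writes: len(key) as uint16, key, payload.
func encodeEntry(key, payload []byte) []byte {
	out := make([]byte, 2+len(key)+len(payload))
	binary.BigEndian.PutUint16(out[0:2], uint16(len(key)))
	copy(out[2:], key)
	copy(out[2+len(key):], payload)
	return out
}

// decodeEntry is the inverse, analogous to mapEntry's Key and Data accessors.
func decodeEntry(entry []byte) (key, payload []byte) {
	klen := binary.BigEndian.Uint16(entry[0:2])
	return entry[2 : 2+klen], entry[2+klen:]
}

func main() {
	e := encodeEntry([]byte("name@ts7"), []byte("marshalled-kv"))
	k, p := decodeEntry(e)
	fmt.Printf("key=%q payload=%q\n", k, p)
}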
+			newKv := posting.MarshalPostingList(pl, nil)
+			newKv.Key = restoreKey
+
+			// We are using kv.Version (from the key-value) to generate the key. But, using
+			// restoreTs to set the version of the KV. This way, when we sort the keys, we
+			// choose the latest key based on kv.Version. But, then set its version to
+			// restoreTs.
+			newKv.Version = p.restoreTs
+			if err := toBuffer(newKv, kv.Version); err != nil {
+				return err
+			}
+		} else {
+			// This is a complete list. It should be rolled up to avoid writing
+			// a list that is too big to be read back from disk.
+			// Rollup will take ownership of the Pack and will free the memory.
+			l := posting.NewList(restoreKey, pl, kv.Version)
+			kvs, err := l.Rollup(nil)
+			if err != nil {
+				// TODO: wrap errors in this file for easier debugging.
+				return err
+			}
+			for _, kv := range kvs {
+				version := kv.Version
+				kv.Version = p.restoreTs
+				if err := toBuffer(kv, version); err != nil {
+					return err
+				}
+			}
+		}
-			backupPl := &pb.BackupPostingList{}
-			if err := backupPl.Unmarshal(kv.Value); err != nil {
-				return errors.Wrapf(err, "while reading backup posting list")
-			}
-			pl := posting.FromBackupPostingList(backupPl)
-
-			if !posting.ShouldSplit(pl) || parsedKey.HasStartUid || len(pl.GetSplits()) > 0 {
-				// This covers two cases.
-				// 1. The list is not big enough to be split.
-				// 2. This key is storing part of a multi-part list. Write each individual
-				// part without rolling the key first. This part is here for backwards
-				// compatibility. New backups are not affected because there was a change
-				// to roll up lists into a single one.
-				newKv := posting.MarshalPostingList(pl, nil)
-				newKv.Key = restoreKey
-
-				// We are using kv.Version (from the key-value) to generate the key. But, using
-				// restoreTs to set the version of the KV. This way, when we sort the keys, we
-				// choose the latest key based on kv.Version. But, then set its version to
-				// restoreTs.
-				newKv.Version = m.restoreTs
-				if err := toBuffer(newKv, kv.Version); err != nil {
-					return err
-				}
-			} else {
-				// This is a complete list. It should be rolled up to avoid writing
-				// a list that is too big to be read back from disk.
-				// Rollup will take ownership of the Pack and will free the memory.
-				l := posting.NewList(restoreKey, pl, kv.Version)
-				kvs, err := l.Rollup(nil)
-				if err != nil {
-					// TODO: wrap errors in this file for easier debugging.
-					return err
-				}
-				for _, kv := range kvs {
-					version := kv.Version
-					kv.Version = m.restoreTs
-					if err := toBuffer(kv, version); err != nil {
-						return err
-					}
-				}
-			}
 
-		case posting.BitSchemaPosting:
-			appendNamespace := func() error {
-				// If the backup was taken on old version, we need to append the namespace to
-				// the fields of TypeUpdate.
-				var update pb.TypeUpdate
-				if err := update.Unmarshal(kv.Value); err != nil {
-					return err
-				}
-				update.TypeName = x.GalaxyAttr(update.TypeName)
-				for _, sch := range update.Fields {
-					sch.Predicate = x.GalaxyAttr(sch.Predicate)
-				}
-				kv.Value, err = update.Marshal()
-				return err
-			}
+	case posting.BitSchemaPosting:
+		appendNamespace := func() error {
+			// If the backup was taken on an old version, we need to append the namespace to
+			// the fields of TypeUpdate.
+			var update pb.TypeUpdate
+			if err := update.Unmarshal(kv.Value); err != nil {
+				return err
+			}
+			update.TypeName = x.GalaxyAttr(update.TypeName)
+			for _, sch := range update.Fields {
+				sch.Predicate = x.GalaxyAttr(sch.Predicate)
+			}
+			kv.Value, err = update.Marshal()
+			return err
+		}
+		changeFormat := func() error {
+			// In the backup taken on 2103, we have the schemaUpdate.Predicate in format
+			// <ns>|<attr>. That had issues with JSON marshalling.
+			// So, we switched over to the format <ns>-<attr>.
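The comment above refers to the 2103 predicate encoding. As a hedged illustration only (the real conversion is x.AttrFrom2103 in the x package, which also handles the binary namespace encoding), the rewrite amounts to:

package main

import (
	"fmt"
	"strings"
)

// attrFrom2103Sketch is a hypothetical, simplified stand-in: it rewrites a
// "<ns>|<attr>" predicate into the newer "<ns>-<attr>" form.
func attrFrom2103Sketch(pred string) (string, error) {
	parts := strings.SplitN(pred, "|", 2)
	if len(parts) != 2 {
		return "", fmt.Errorf("predicate %q is not in the 2103 format", pred)
	}
	return parts[0] + "-" + parts[1], nil
}

func main() {
	out, err := attrFrom2103Sketch("0|name")
	fmt.Println(out, err) // 0-name <nil>
}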
+			var err error
+			if parsedKey.IsSchema() {
+				var update pb.SchemaUpdate
+				if err := update.Unmarshal(kv.Value); err != nil {
+					return err
+				}
+				if update.Predicate, err = x.AttrFrom2103(update.Predicate); err != nil {
+					return err
+				}
+				kv.Value, err = update.Marshal()
+				return err
+			}
+			if parsedKey.IsType() {
+				var update pb.TypeUpdate
+				if err := update.Unmarshal(kv.Value); err != nil {
+					return err
+				}
+				if update.TypeName, err = x.AttrFrom2103(update.TypeName); err != nil {
+					return err
+				}
+				for _, sch := range update.Fields {
+					if sch.Predicate, err = x.AttrFrom2103(sch.Predicate); err != nil {
+						return err
+					}
+				}
+				kv.Value, err = update.Marshal()
+				return err
+			}
+			return nil
+		}
-			changeFormat := func() error {
-				// In the backup taken on 2103, we have the schemaUpdate.Predicate in format
-				// <ns>|<attr>. That had issues with JSON marshalling.
-				// So, we switched over to the format <ns>-<attr>.
-				var err error
-				if parsedKey.IsSchema() {
-					var update pb.SchemaUpdate
-					if err := update.Unmarshal(kv.Value); err != nil {
-						return err
-					}
-					if update.Predicate, err = x.AttrFrom2103(update.Predicate); err != nil {
-						return err
-					}
-					kv.Value, err = update.Marshal()
-					return err
-				}
-				if parsedKey.IsType() {
-					var update pb.TypeUpdate
-					if err := update.Unmarshal(kv.Value); err != nil {
-						return err
-					}
-					if update.TypeName, err = x.AttrFrom2103(update.TypeName); err != nil {
-						return err
-					}
-					for _, sch := range update.Fields {
-						if sch.Predicate, err = x.AttrFrom2103(sch.Predicate); err != nil {
-							return err
-						}
-					}
-					kv.Value, err = update.Marshal()
-					return err
-				}
-				return nil
-			}
-			// We changed the format of predicate in 2103 and 2105. SchemaUpdate and TypeUpdate have
-			// predicate stored within them, so they also need to be updated accordingly.
-			switch in.version {
-			case 0:
-				if parsedKey.IsType() {
-					if err := appendNamespace(); err != nil {
-						glog.Errorf("Unable to (un)marshal type: %+v. Err=%v\n", parsedKey, err)
-						return nil
-					}
-				}
-			case 2103:
-				if err := changeFormat(); err != nil {
-					glog.Errorf("Unable to change format for: %+v Err=%+v", parsedKey, err)
-					return nil
-				}
-			default:
-				// for manifest versions >= 2015, do nothing.
-			}
-			// Reset the StreamId to prevent ordering issues while writing to stream writer.
-			kv.StreamId = 0
-			// Schema and type keys are not stored in an intermediate format so their
-			// value can be written as is.
-			version := kv.Version
-			kv.Version = m.restoreTs
-			kv.Key = restoreKey
-			if err := toBuffer(kv, version); err != nil {
-				return err
-			}
-		default:
-			return errors.Errorf(
-				"Unexpected meta %d for key %s", kv.UserMeta[0], hex.Dump(kv.Key))
-		}
-		return nil
-	}
-
-	mergeBuffer := func() error {
-		if buf.IsEmpty() {
-			return nil
-		}
-		atomic.AddUint64(&m.bytesProcessed, uint64(buf.LenNoPadding()))
-
-		m.bufLock.Lock()
-		defer m.bufLock.Unlock()
-
-		x.Check2(m.buf.Write(buf.Bytes()))
-		buf.Reset()
-		if m.buf.LenNoPadding() < mapFileSz {
-			return nil
-		}
-		return m.sendForWriting()
-	}
+		// We changed the format of predicate in 2103 and 2105. SchemaUpdate and TypeUpdate have
+		// predicate stored within them, so they also need to be updated accordingly.
+		switch in.version {
+		case 0:
+			if parsedKey.IsType() {
+				if err := appendNamespace(); err != nil {
+					glog.Errorf("Unable to (un)marshal type: %+v. Err=%v\n", parsedKey, err)
+					return nil
+				}
+			}
+		case 2103:
+			if err := changeFormat(); err != nil {
+				glog.Errorf("Unable to change format for: %+v Err=%+v", parsedKey, err)
+				return nil
+			}
+		default:
+			// for manifest versions >= 2105, do nothing.
+		}
+		// Reset the StreamId to prevent ordering issues while writing to stream writer.
+		kv.StreamId = 0
+		// Schema and type keys are not stored in an intermediate format so their
+		// value can be written as is.
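The lines that follow re-stamp kv.Version with restoreTs, the same trick used in the posting-list branch above: entries are keyed with the backup version so that sorting picks the newest value, while the value written out carries the single restore timestamp. A small sketch with hypothetical types:

package main

import (
	"fmt"
	"sort"
)

type entry struct {
	key     string
	version uint64 // backup version: used only to order duplicates
	value   string
}

func main() {
	const restoreTs = uint64(1000)
	entries := []entry{
		{"name", 9, "new"},
		{"name", 7, "old"},
	}
	// Sort so the latest backup version of each key comes last.
	sort.Slice(entries, func(i, j int) bool {
		if entries[i].key != entries[j].key {
			return entries[i].key < entries[j].key
		}
		return entries[i].version < entries[j].version
	})
	winner := entries[len(entries)-1]
	winner.version = restoreTs // stamp the restore timestamp on the way out
	fmt.Printf("%+v\n", winner) // picks "new", carrying version 1000
}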
+		version := kv.Version
+		kv.Version = p.restoreTs
+		kv.Key = restoreKey
+		if err := toBuffer(kv, version); err != nil {
+			return err
+		}
+	default:
+		return errors.Errorf(
+			"Unexpected meta %d for key %s", kv.UserMeta[0], hex.Dump(kv.Key))
+	}
+	return nil
+}
+
+func (m *mapper) processReqCh(ctx context.Context) error {
 	var list bpb.KVList
+	p := &processor{mapper: m}
+	buf := z.NewBuffer(256<<20, "processKVList")
+
 	process := func(req listReq) error {
 		defer req.lbuf.Release()
@@ -489,13 +491,17 @@ func (m *mapper) processReqCh(ctx context.Context) error {
 			return err
 		}
 		for _, kv := range list.GetKv() {
-			if err := processKV(req.in, kv); err != nil {
+			if err := p.processKV(buf, req.in, kv); err != nil {
 				return err
 			}
-			if buf.LenNoPadding() > 16<<20 {
-				if err := mergeBuffer(); err != nil {
-					return err
+			if buf.LenNoPadding() > 228<<20 {
+				select {
+				case m.writeCh <- buf:
+					// good.
+				case <-ctx.Done():
+					return errors.Wrapf(ctx.Err(), "processReqCh.SliceIterate")
 				}
+				buf = z.NewBuffer(256<<20, "processKVList")
 			}
 		}
 		return nil
@@ -507,22 +513,20 @@ func (m *mapper) processReqCh(ctx context.Context) error {
 			return err
 		}
 	}
-	if err := mergeBuffer(); err != nil {
-		return err
-	}
+	m.writeCh <- buf
 
 	// Update the global maxUid and maxNs. We need CAS here because mapping is
 	// being carried out concurrently.
 	for {
 		oldMaxUid := atomic.LoadUint64(&m.maxUid)
-		newMaxUid := x.Max(oldMaxUid, maxUid)
+		newMaxUid := x.Max(oldMaxUid, p.maxUid)
 		if swapped := atomic.CompareAndSwapUint64(&m.maxUid, oldMaxUid, newMaxUid); swapped {
 			break
 		}
 	}
 	for {
 		oldMaxNs := atomic.LoadUint64(&m.maxNs)
-		newMaxNs := x.Max(oldMaxNs, maxNs)
+		newMaxNs := x.Max(oldMaxNs, p.maxNs)
 		if swapped := atomic.CompareAndSwapUint64(&m.maxNs, oldMaxNs, newMaxNs); swapped {
 			break
 		}
@@ -543,9 +547,13 @@ func (m *mapper) Progress() {
 		proc := atomic.LoadUint64(&m.bytesProcessed)
 		since := time.Since(start)
 		rate := uint64(float64(proc) / since.Seconds())
-		glog.Infof("Restore MAP %s read: %s. output: %s. rate: %s/sec. jemalloc: %s.\n",
-			x.FixedDuration(since), humanize.IBytes(read), humanize.IBytes(proc),
-			humanize.IBytes(rate), humanize.IBytes(uint64(z.NumAllocBytes())))
+		glog.Infof("Restore MAP %s len(reqCh): %d len(writeCh): %d read: %s. output: %s."+
+			" rate: %s/sec. nextFileId: %d writers: %d jemalloc: %s.\n",
+			x.FixedDuration(since), len(m.reqCh),
+			len(m.writeCh), humanize.IBytes(read), humanize.IBytes(proc),
+			humanize.IBytes(rate), atomic.LoadUint32(&m.nextId),
+			len(m.writers),
+			humanize.IBytes(uint64(z.NumAllocBytes())))
 	}
 	for {
 		select {
@@ -589,7 +597,6 @@ func (m *mapper) Map(r io.Reader, in *loadBackupInput) error {
 
 		if zbuf.LenNoPadding() > bufSoftLimit {
 			atomic.AddUint64(&m.bytesRead, uint64(zbuf.LenNoPadding()))
-			glog.Infof("Sending req of size: %s\n", humanize.IBytes(uint64(zbuf.LenNoPadding())))
 			m.reqCh <- listReq{zbuf, in}
 			zbuf = z.NewBuffer(bufSz, "Restore.Map")
 		}
@@ -644,24 +651,47 @@ func RunMapper(req *pb.RestoreRequest, mapDir string) (*mapResult, error) {
 		return nil, err
 	}
 
+	numGo := int(float64(runtime.NumCPU()) * 0.75)
+	if numGo < 2 {
+		numGo = 2
+	}
+	glog.Infof("Setting numGo = %d\n", numGo)
 	mapper := &mapper{
-		buf:       newBuffer(),
-		thr:       y.NewThrottle(3),
-		bufLock:   &sync.Mutex{},
-		closer:    z.NewCloser(1),
-		reqCh:     make(chan listReq, 3),
+		closer:  z.NewCloser(1),
+		reqCh:   make(chan listReq, numGo+numGo/4),
+		writeCh: make(chan *z.Buffer, numGo),
+		// Only half the writers should be writing at the same time.
+		writers:   make(chan struct{}, numGo/2),
 		restoreTs: req.RestoreTs,
 		mapDir:    mapDir,
 		szHist:    z.NewHistogramData(z.HistogramBounds(10, 32)),
 	}
 
-	numGo := 8
 	g, ctx := errgroup.WithContext(mapper.closer.Ctx())
 	for i := 0; i < numGo; i++ {
 		g.Go(func() error {
 			return mapper.processReqCh(ctx)
 		})
 	}
+
+	wCloser := z.NewCloser(numGo / 2)
+	defer wCloser.Signal()
+	go func() {
+		<-wCloser.HasBeenClosed()
+		close(mapper.writeCh)
+	}()
+	for i := 0; i < numGo/2; i++ {
+		go func() {
+			err := mapper.mergeAndSend(wCloser)
+			if err != nil {
+				g.Go(func() error {
+					return errors.Wrapf(err, "mergeAndSend returned error")
+				})
+			}
+			glog.Infof("mapper.mergeAndSend done with error: %v", err)
+		}()
+	}
+
 	go mapper.Progress()
 	defer func() {
 		mapper.Flush()
@@ -769,6 +799,8 @@ func RunMapper(req *pb.RestoreRequest, mapDir string) (*mapResult, error) {
 	if err := g.Wait(); err != nil {
 		return nil, errors.Wrapf(err, "from processKVList")
 	}
+	glog.Infof("mapper.processReqCh done")
+	wCloser.SignalAndWait()
 	if err := mapper.Flush(); err != nil {
 		return nil, errors.Wrap(err, "failed to flush the mapper")
 	}
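For reference, the CAS loops in processReqCh fold each processor's local maxUid/maxNs into the shared mapper fields without a mutex. A compact, runnable illustration of that pattern (atomicMax is a hypothetical helper, not part of the patch):

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

func atomicMax(addr *uint64, v uint64) {
	for {
		old := atomic.LoadUint64(addr)
		if v <= old {
			return // another goroutine already holds a larger value
		}
		if atomic.CompareAndSwapUint64(addr, old, v) {
			return // we won the race
		}
		// Lost the race: reload and retry.
	}
}

func main() {
	var maxUid uint64
	var wg sync.WaitGroup
	for i := uint64(1); i <= 100; i++ {
		wg.Add(1)
		go func(v uint64) {
			defer wg.Done()
			atomicMax(&maxUid, v)
		}(i)
	}
	wg.Wait()
	fmt.Println(maxUid) // always 100
}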