From 504c2d1d6c74a4079dba5919a76cbf30c7f5bc90 Mon Sep 17 00:00:00 2001 From: Sandy Xu Date: Thu, 29 Sep 2022 19:27:38 +0800 Subject: [PATCH] meta/fdb: fix scan for many keys --- pkg/meta/tkv_fdb.go | 42 ++++++++++++++++++++++++++++++------------ 1 file changed, 30 insertions(+), 12 deletions(-) diff --git a/pkg/meta/tkv_fdb.go b/pkg/meta/tkv_fdb.go index dca04aeda27f..e81eb5809740 100644 --- a/pkg/meta/tkv_fdb.go +++ b/pkg/meta/tkv_fdb.go @@ -68,19 +68,37 @@ func (c *fdbClient) txn(f func(kvTxn) error) error { } func (c *fdbClient) scan(prefix []byte, handler func(key, value []byte)) error { - _, err := c.client.ReadTransact(func(t fdb.ReadTransaction) (interface{}, error) { - snapshot := t.Snapshot() - iter := snapshot.GetRange( - fdb.KeyRange{Begin: fdb.Key(prefix), End: fdb.Key(nextKey(prefix))}, - fdb.RangeOptions{Mode: fdb.StreamingModeWantAll}, - ).Iterator() - for iter.Advance() { - r := iter.MustGet() - handler(r.Key, r.Value) + begin := fdb.Key(prefix) + end := fdb.Key(nextKey(prefix)) + limit := 102400 + var done bool + for { + if _, err := c.client.ReadTransact(func(t fdb.ReadTransaction) (interface{}, error) { + snapshot := t.Snapshot() + iter := snapshot.GetRange( + fdb.KeyRange{Begin: begin, End: end}, + fdb.RangeOptions{Limit: limit, Mode: fdb.StreamingModeWantAll}, + ).Iterator() + var r fdb.KeyValue + var count int + for iter.Advance() { + r = iter.MustGet() + handler(r.Key, r.Value) + count++ + } + if count < limit { + done = true + } else { + begin = append(r.Key, 0) + } + return nil, nil + }); err != nil { + return err } - return nil, nil - }) - return err + if done { + return nil + } + } } func (c *fdbClient) reset(prefix []byte) error {