diff --git a/CHANGELOG.md b/CHANGELOG.md
index 6809913ee..adb5a03d5 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -20,6 +20,7 @@ The following emojis are used to highlight certain changes:
 
 - upgrade to `go-libp2p` [v0.41.1](https://github.com/libp2p/go-libp2p/releases/tag/v0.41.1)
 - `bitswap/network`: Add a new `requests_in_flight` metric gauge that measures how many bitswap streams are being written or read at a given time.
+- improve speed of data onboarding by batching/buffering provider queue writes [#888](https://github.com/ipfs/boxo/pull/888)
 
 ### Removed
 
diff --git a/provider/internal/queue/queue.go b/provider/internal/queue/queue.go
index 97def8e84..7144141c3 100644
--- a/provider/internal/queue/queue.go
+++ b/provider/internal/queue/queue.go
@@ -2,10 +2,13 @@ package queue
 
 import (
 	"context"
+	"encoding/base64"
 	"errors"
 	"fmt"
 	"sync"
+	"time"
 
+	"github.com/gammazero/deque"
 	cid "github.com/ipfs/go-cid"
 	datastore "github.com/ipfs/go-datastore"
 	namespace "github.com/ipfs/go-datastore/namespace"
@@ -15,136 +18,251 @@ import (
 
 var log = logging.Logger("provider.queue")
 
-// Queue provides a best-effort durability, FIFO interface to the datastore for storing cids
+const (
+	// batchSize is the limit on the number of CIDs kept in memory, at which
+	// they are all written to the datastore.
+	batchSize = 16 * 1024
+	// idleWriteTime is the amount of time between checks for whether the
+	// queue has been idle (no input or output). If the queue has been idle
+	// since the last check, then all buffered CIDs are written to the
+	// datastore.
+	idleWriteTime = time.Minute
+	// shutdownTimeout is the duration that Close waits to finish writing
+	// CIDs to the datastore.
+	shutdownTimeout = 10 * time.Second
+)
+
+// Queue provides a FIFO interface to the datastore for storing cids.
+//
+// CIDs in the process of being provided when a crash or shutdown occurs may
+// be in the queue when the node is brought back online, depending on whether
+// they were fully written to the underlying datastore.
 //
-// Best-effort durability just means that cids in the process of being provided when a
-// crash or shutdown occurs may be in the queue when the node is brought back online
-// depending on whether the underlying datastore has synchronous or asynchronous writes.
+// Input to the queue is buffered in memory. The contents of the buffer are
+// written to the datastore when the input buffer contains batchSize items, or
+// when idleWriteTime has elapsed since the previous batch write or dequeue.
+// CIDs to dequeue are read, in order, from the input buffer if there are none
+// in the datastore. Otherwise they are read from the datastore.
+//
+// If queued items are read from the input buffer before it reaches its limit,
+// then queued items can remain in memory. When the queue is closed, any
+// remaining items in memory are written to the datastore.
 type Queue struct {
-	// used to differentiate queues in datastore
-	// e.g. provider vs reprovider
-	ctx     context.Context
-	ds      datastore.Datastore // Must be threadsafe
-	dequeue chan cid.Cid
-	enqueue chan cid.Cid
-	close   context.CancelFunc
-	closed  sync.WaitGroup
-
-	counter uint64
+	close     context.CancelFunc
+	closed    chan error
+	closeOnce sync.Once
+	dequeue   chan cid.Cid
+	ds        datastore.Batching
+	enqueue   chan cid.Cid
 }
 
-// NewQueue creates a queue for cids
-func NewQueue(ds datastore.Datastore) *Queue {
-	namespaced := namespace.Wrap(ds, datastore.NewKey("/queue"))
-	cancelCtx, cancel := context.WithCancel(context.Background())
+// New creates a queue for cids.
+func New(ds datastore.Batching) *Queue {
+	ctx, cancel := context.WithCancel(context.Background())
+
 	q := &Queue{
-		ctx:     cancelCtx,
-		ds:      namespaced,
+		close:   cancel,
+		closed:  make(chan error, 1),
 		dequeue: make(chan cid.Cid),
+		ds:      namespace.Wrap(ds, datastore.NewKey("/queue")),
 		enqueue: make(chan cid.Cid),
-		close:   cancel,
 	}
-	q.closed.Add(1)
-	go q.worker()
+
+	go q.worker(ctx)
+
 	return q
 }
 
-// Close stops the queue
+// Close stops the queue.
 func (q *Queue) Close() error {
-	q.close()
-	q.closed.Wait()
-	// We don't close dequeue because the provider which consume this get caught in
-	// an infinite loop dequeing cid.Undef if we do that.
-	// The provider has it's own select on top of dequeue and will handle this by itself.
-	return nil
+	var err error
+	q.closeOnce.Do(func() {
+		// Close input queue and wait for worker to finish reading it.
+		close(q.enqueue)
+		select {
+		case <-q.closed:
+		case <-time.After(shutdownTimeout):
+			q.close() // force immediate shutdown
+			err = <-q.closed
+		}
+		close(q.dequeue) // no more output from this queue
+	})
+	return err
 }
 
-// Enqueue puts a cid in the queue
-func (q *Queue) Enqueue(cid cid.Cid) error {
-	select {
-	case q.enqueue <- cid:
-		return nil
-	case <-q.ctx.Done():
-		return errors.New("failed to enqueue CID: shutting down")
+// Enqueue puts a cid in the queue.
+func (q *Queue) Enqueue(c cid.Cid) (err error) {
+	if c == cid.Undef {
+		return
 	}
+	defer func() {
+		if r := recover(); r != nil {
+			err = errors.New("failed to enqueue CID: shutting down")
+		}
+	}()
+	q.enqueue <- c
+	return
 }
 
-// Dequeue returns a channel that if listened to will remove entries from the queue
+// Dequeue returns a channel for reading entries from the queue.
 func (q *Queue) Dequeue() <-chan cid.Cid {
 	return q.dequeue
 }
 
+func makeCidString(c cid.Cid) string {
+	data := c.Bytes()
+	if len(data) > 4 {
+		data = data[len(data)-4:]
+	}
+	return base64.RawURLEncoding.EncodeToString(data)
+}
+
+func makeKey(c cid.Cid, counter uint64) datastore.Key {
+	return datastore.NewKey(fmt.Sprintf("%020d/%s", counter, makeCidString(c)))
+}
+
 // worker dequeues and enqueues CIDs when available.
-func (q *Queue) worker() {
-	var k datastore.Key = datastore.Key{}
-	var c cid.Cid = cid.Undef
+func (q *Queue) worker(ctx context.Context) {
+	defer close(q.closed)
+
+	var (
+		c       cid.Cid
+		counter uint64
+		k       datastore.Key = datastore.Key{}
+		inBuf   deque.Deque[cid.Cid]
+	)
+
+	const baseCap = 1024
+	inBuf.SetBaseCap(baseCap)
+
+	defer func() {
+		if c != cid.Undef {
+			if err := q.ds.Put(ctx, k, c.Bytes()); err != nil {
+				log.Errorw("Failed to write cid to datastore", "err", err)
+			}
+			counter++
+		}
+		if inBuf.Len() != 0 {
+			err := q.commitInput(ctx, counter, &inBuf)
+			if err != nil && !errors.Is(err, context.Canceled) {
+				log.Error(err)
+				if inBuf.Len() != 0 {
+					q.closed <- fmt.Errorf("provider queue: %d cids not written to datastore", inBuf.Len())
+				}
+			}
+		}
+	}()
+
+	var (
+		commit  bool
+		dsEmpty bool
+		err     error
+		idle    bool
+	)
+
+	readInBuf := q.enqueue
 
-	defer q.closed.Done()
-	defer q.close()
+	batchTimer := time.NewTimer(idleWriteTime)
+	defer batchTimer.Stop()
 
 	for {
 		if c == cid.Undef {
-			head, err := q.getQueueHead()
-
-			switch {
-			case err != nil:
-				log.Errorf("error querying for head of queue: %s, stopping provider", err)
-				return
-			case head != nil:
-				k = datastore.NewKey(head.Key)
-				c, err = cid.Parse(head.Value)
+			if !dsEmpty {
+				head, err := q.getQueueHead(ctx)
 				if err != nil {
-					log.Warnf("error parsing queue entry cid with key (%s), removing it from queue: %s", head.Key, err)
-					err = q.ds.Delete(q.ctx, k)
-					if err != nil {
-						log.Errorf("error deleting queue entry with key (%s), due to error (%s), stopping provider", head.Key, err)
+					log.Errorw("Error querying for head of queue, stopping provider", "err", err)
+					return
+				}
+				if head != nil {
+					k = datastore.NewKey(head.Key)
+					if err = q.ds.Delete(ctx, k); err != nil {
+						log.Errorw("Error deleting queue entry, stopping provider", "err", err, "key", head.Key)
 						return
 					}
-					continue
+					c, err = cid.Parse(head.Value)
+					if err != nil {
+						log.Warnw("Error parsing queue entry cid, removing it from queue", "err", err, "key", head.Key)
+						continue
+					}
+				} else {
+					dsEmpty = true
 				}
-			default:
-				c = cid.Undef
+			}
+			if dsEmpty && inBuf.Len() != 0 {
+				// There were no queued CIDs in the datastore, so read one from
+				// the input buffer.
+				c = inBuf.PopFront()
+				k = makeKey(c, counter)
 			}
 		}
 
-		// If c != cid.Undef set dequeue and attempt write, otherwise wait for enqueue
+		// If c != cid.Undef, set dequeue so the CID can be sent to a reader.
		var dequeue chan cid.Cid
 		if c != cid.Undef {
 			dequeue = q.dequeue
 		}
 
 		select {
-		case toQueue := <-q.enqueue:
-			keyPath := fmt.Sprintf("%020d/%s", q.counter, c.String())
-			q.counter++
-			nextKey := datastore.NewKey(keyPath)
+		case toQueue, ok := <-readInBuf:
+			if !ok {
+				return
+			}
+			idle = false
 
 			if c == cid.Undef {
-				// fast path, skip rereading the datastore if we don't have anything in hand yet
+				// Use this CID as the next output since there was nothing in
+				// the datastore or buffer previously.
 				c = toQueue
-				k = nextKey
+				k = makeKey(c, counter)
+				continue
 			}
 
-			if err := q.ds.Put(q.ctx, nextKey, toQueue.Bytes()); err != nil {
-				log.Errorf("Failed to enqueue cid: %s", err)
-				continue
+			inBuf.PushBack(toQueue)
+			if inBuf.Len() >= batchSize {
+				commit = true
 			}
 
 		case dequeue <- c:
-			err := q.ds.Delete(q.ctx, k)
-			if err != nil {
-				log.Errorf("Failed to delete queued cid %s with key %s: %s", c, k, err)
-				continue
-			}
 			c = cid.Undef
-		case <-q.ctx.Done():
+			idle = false
+
+		case <-batchTimer.C:
+			if idle {
+				if inBuf.Len() != 0 {
+					commit = true
+				} else {
+					if inBuf.Cap() > baseCap {
+						inBuf = deque.Deque[cid.Cid]{}
+						inBuf.SetBaseCap(baseCap)
+					}
+				}
+			}
+			idle = true
+			batchTimer.Reset(idleWriteTime)
+
+		case <-ctx.Done():
 			return
 		}
+
+		if commit {
+			commit = false
+			n := inBuf.Len()
+			err = q.commitInput(ctx, counter, &inBuf)
+			if err != nil {
+				if !errors.Is(err, context.Canceled) {
+					log.Errorw("Error writing CIDs to datastore, stopping provider", "err", err)
+				}
+				return
+			}
+			counter += uint64(n)
+			dsEmpty = false
+		}
 	}
 }
 
-func (q *Queue) getQueueHead() (*query.Entry, error) {
-	qry := query.Query{Orders: []query.Order{query.OrderByKey{}}, Limit: 1}
-	results, err := q.ds.Query(q.ctx, qry)
+func (q *Queue) getQueueHead(ctx context.Context) (*query.Entry, error) {
+	qry := query.Query{
+		Orders: []query.Order{query.OrderByKey{}},
+		Limit:  1,
+	}
+	results, err := q.ds.Query(ctx, qry)
 	if err != nil {
 		return nil, err
 	}
@@ -156,3 +274,29 @@ func (q *Queue) getQueueHead() (*query.Entry, error) {
 
 	return &r.Entry, r.Error
 }
+
+func (q *Queue) commitInput(ctx context.Context, counter uint64, cids *deque.Deque[cid.Cid]) error {
+	b, err := q.ds.Batch(ctx)
+	if err != nil {
+		return fmt.Errorf("failed to create batch: %w", err)
+	}
+
+	cstr := makeCidString(cids.Front())
+	n := cids.Len()
+	for i := 0; i < n; i++ {
+		c := cids.At(i)
+		key := datastore.NewKey(fmt.Sprintf("%020d/%s", counter, cstr))
+		if err = b.Put(ctx, key, c.Bytes()); err != nil {
+			log.Errorw("Failed to add cid to batch", "err", err)
+			continue
+		}
+		counter++
+	}
+	cids.Clear()
+
+	if err = b.Commit(ctx); err != nil {
+		return fmt.Errorf("failed to commit batch to datastore: %w", err)
+	}
+
+	return nil
+}
diff --git a/provider/internal/queue/queue_test.go b/provider/internal/queue/queue_test.go
index a9a49cc66..d0250e995 100644
--- a/provider/internal/queue/queue_test.go
+++ b/provider/internal/queue/queue_test.go
@@ -5,73 +5,65 @@ import (
 	"testing"
 	"time"
 
-	"github.com/ipfs/boxo/internal/test"
 	"github.com/ipfs/go-cid"
 	"github.com/ipfs/go-datastore"
 	"github.com/ipfs/go-datastore/sync"
 	"github.com/ipfs/go-test/random"
 )
 
-const blockSize = 4
-
-func makeCids(n int) []cid.Cid {
-	blks := random.BlocksOfSize(n, blockSize)
-	cids := make([]cid.Cid, n)
-	for i := 0; i < n; i++ {
-		cids[i] = blks[i].Cid()
-	}
-	return cids
-}
-
 func assertOrdered(cids []cid.Cid, q *Queue, t *testing.T) {
-	for _, c := range cids {
+	t.Helper()
+
+	for i, c := range cids {
 		select {
-		case dequeued := <-q.dequeue:
+		case dequeued, ok := <-q.Dequeue():
+			if !ok {
+				t.Fatal("queue closed")
+			}
 			if c != dequeued {
-				t.Fatalf("Error in ordering of CIDs retrieved from queue. Expected: %s, got: %s", c, dequeued)
+				t.Fatalf("Error in ordering of CID %d retrieved from queue. Expected: %s, got: %s", i, c, dequeued)
 			}
-		case <-time.After(time.Second * 1):
+		case <-time.After(time.Second):
 			t.Fatal("Timeout waiting for cids to be provided.")
 		}
 	}
 }
 
 func TestBasicOperation(t *testing.T) {
-	test.Flaky(t)
-	ctx := context.Background()
-	defer ctx.Done()
-
 	ds := sync.MutexWrap(datastore.NewMapDatastore())
-	queue := NewQueue(ds)
+	queue := New(ds)
 	defer queue.Close()
 
-	cids := makeCids(10)
-
+	cids := random.Cids(10)
 	for _, c := range cids {
 		queue.Enqueue(c)
 	}
 
 	assertOrdered(cids, queue, t)
+
+	err := queue.Close()
+	if err != nil {
+		t.Fatal(err)
+	}
+	if err = queue.Close(); err != nil {
+		t.Fatal(err)
+	}
 }
 
 func TestMangledData(t *testing.T) {
-	test.Flaky(t)
-	ctx := context.Background()
-	defer ctx.Done()
-
 	ds := sync.MutexWrap(datastore.NewMapDatastore())
-	queue := NewQueue(ds)
+	queue := New(ds)
 	defer queue.Close()
 
-	cids := makeCids(10)
+	cids := random.Cids(10)
 	for _, c := range cids {
 		queue.Enqueue(c)
 	}
 
 	// put bad data in the queue
 	queueKey := datastore.NewKey("/test/0")
-	err := queue.ds.Put(ctx, queueKey, []byte("borked"))
+	err := queue.ds.Put(context.Background(), queueKey, []byte("borked"))
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -82,45 +74,46 @@ func TestMangledData(t *testing.T) {
 }
 
 func TestInitialization(t *testing.T) {
-	test.Flaky(t)
-	ctx := context.Background()
-	defer ctx.Done()
-
 	ds := sync.MutexWrap(datastore.NewMapDatastore())
-	queue := NewQueue(ds)
+	queue := New(ds)
 	defer queue.Close()
 
-	cids := makeCids(10)
+	cids := random.Cids(10)
 	for _, c := range cids {
 		queue.Enqueue(c)
 	}
 
 	assertOrdered(cids[:5], queue, t)
 
+	err := queue.Close()
+	if err != nil {
+		t.Fatal(err)
+	}
+
 	// make a new queue, same data
-	queue = NewQueue(ds)
+	queue = New(ds)
 	defer queue.Close()
 
 	assertOrdered(cids[5:], queue, t)
 }
 
 func TestInitializationWithManyCids(t *testing.T) {
-	test.Flaky(t)
-	ctx := context.Background()
-	defer ctx.Done()
-
 	ds := sync.MutexWrap(datastore.NewMapDatastore())
-	queue := NewQueue(ds)
+	queue := New(ds)
+	defer queue.Close()
 
-	cids := makeCids(25)
+	cids := random.Cids(25)
 	for _, c := range cids {
 		queue.Enqueue(c)
 	}
 
-	queue.Close()
+	err := queue.Close()
+	if err != nil {
+		t.Fatal(err)
+	}
 
 	// make a new queue, same data
-	queue = NewQueue(ds)
+	queue = New(ds)
 	defer queue.Close()
 
 	assertOrdered(cids, queue, t)
diff --git a/provider/reprovider.go b/provider/reprovider.go
index 94e077da5..002168e94 100644
--- a/provider/reprovider.go
+++ b/provider/reprovider.go
@@ -138,7 +138,7 @@ func New(ds datastore.Batching, opts ...Option) (System, error) {
 	}
 
 	s.ds = namespace.Wrap(ds, s.keyPrefix)
-	s.q = queue.NewQueue(s.ds)
+	s.q = queue.New(s.ds)
 
 	// This is after the options processing so we do not have to worry about leaking a context if there is an
 	// initialization error processing the options
@@ -423,10 +423,12 @@ func (s *reprovider) run() {
 
 	// after the first reprovide, schedule periodical reprovides
 	nextReprovideTicker := time.NewTicker(s.reprovideInterval)
-	for s.ctx.Err() == nil {
+	for {
 		err := s.Reprovide(context.Background())
-
-		if s.ctx.Err() == nil && err != nil {
+		if err != nil {
+			if s.ctx.Err() != nil {
+				return
+			}
 			log.Errorf("failed to reprovide: %s", err)
 		}
 		select {