From 1109c20f9241d059ae9260204d59149f731763a2 Mon Sep 17 00:00:00 2001 From: gammazero <11790789+gammazero@users.noreply.github.com> Date: Wed, 19 Mar 2025 01:47:07 -1000 Subject: [PATCH 01/23] Improve the perceived performance of data onboarding Adding a large directory of data to IPFS can take a long time. A significant amount of this time is spent writing to and reading from the persisted provider queue, depending on the underlying datastore. For slower datastores, buffering input to the persisted queue, in memory, allows data to be read in much more quickly. Overall performance is somewhat increased since writes are not blocked (up to a point) while waiting for the queue to persist data to the datastore. --- provider/internal/queue/queue.go | 141 +++++++++++++++----------- provider/internal/queue/queue_test.go | 71 ++++++------- 2 files changed, 116 insertions(+), 96 deletions(-) diff --git a/provider/internal/queue/queue.go b/provider/internal/queue/queue.go index 97def8e84..2dbd4b6d0 100644 --- a/provider/internal/queue/queue.go +++ b/provider/internal/queue/queue.go @@ -2,10 +2,13 @@ package queue import ( "context" + "encoding/base64" "errors" "fmt" "sync" + "time" + "github.com/gammazero/chanqueue" cid "github.com/ipfs/go-cid" datastore "github.com/ipfs/go-datastore" namespace "github.com/ipfs/go-datastore/namespace" @@ -15,96 +18,111 @@ import ( var log = logging.Logger("provider.queue") -// Queue provides a best-effort durability, FIFO interface to the datastore for storing cids +const ( + // Number of input CIDs to buffer without blocking. + inputBufferSize = 65536 + // Time for Close to wait to finish writing CIDs to datastore. + shutdownTimeout = 5 * time.Second +) + +// Queue provides a FIFO interface to the datastore for storing cids. // -// Best-effort durability just means that cids in the process of being provided when a -// crash or shutdown occurs may be in the queue when the node is brought back online -// depending on whether the underlying datastore has synchronous or asynchronous writes. +// Cids in the process of being provided when a crash or shutdown occurs may be +// in the queue when the node is brought back online depending on whether they +// were fully written to the underlying datastore. +// +// Input to the queue is buffered in memory, up to inputBufferSize, to increase +// the speed of data onboarding. This input buffer behaves as a channel with a +// very large capacity. type Queue struct { // used to differentiate queues in datastore // e.g. provider vs reprovider - ctx context.Context - ds datastore.Datastore // Must be threadsafe - dequeue chan cid.Cid - enqueue chan cid.Cid - close context.CancelFunc - closed sync.WaitGroup - - counter uint64 + ctx context.Context + ds datastore.Datastore // Must be threadsafe + dequeue chan cid.Cid + enqueue *chanqueue.ChanQueue[cid.Cid] // in-memory queue to buffer input + close context.CancelFunc + closed chan struct{} + closeOnce sync.Once } // NewQueue creates a queue for cids func NewQueue(ds datastore.Datastore) *Queue { - namespaced := namespace.Wrap(ds, datastore.NewKey("/queue")) - cancelCtx, cancel := context.WithCancel(context.Background()) + ctx, cancel := context.WithCancel(context.Background()) q := &Queue{ - ctx: cancelCtx, - ds: namespaced, + ds: namespace.Wrap(ds, datastore.NewKey("/queue")), dequeue: make(chan cid.Cid), - enqueue: make(chan cid.Cid), + //enqueue: make(chan cid.Cid, 1024), + enqueue: chanqueue.New(chanqueue.WithCapacity[cid.Cid](inputBufferSize)), close: cancel, + closed: make(chan struct{}), } - q.closed.Add(1) - go q.worker() + go q.worker(ctx) return q } // Close stops the queue func (q *Queue) Close() error { - q.close() - q.closed.Wait() - // We don't close dequeue because the provider which consume this get caught in - // an infinite loop dequeing cid.Undef if we do that. - // The provider has it's own select on top of dequeue and will handle this by itself. - return nil + var err error + q.closeOnce.Do(func() { + // Close input queue and wait for worker to finish reading it. + q.enqueue.Close() + select { + case <-q.closed: + case <-time.After(shutdownTimeout): + q.close() // force immediate shutdown + <-q.closed + err = errors.New("provider queue: some cids not written to datastore") + } + close(q.dequeue) // no more output from this queue + }) + return err } // Enqueue puts a cid in the queue -func (q *Queue) Enqueue(cid cid.Cid) error { - select { - case q.enqueue <- cid: - return nil - case <-q.ctx.Done(): - return errors.New("failed to enqueue CID: shutting down") - } +func (q *Queue) Enqueue(cid cid.Cid) (err error) { + defer func() { + if r := recover(); r != nil { + err = errors.New("failed to enqueue CID: shutting down") + } + }() + q.enqueue.In() <- cid + return } -// Dequeue returns a channel that if listened to will remove entries from the queue +// Dequeue returns a channel that for reading entries from the queue, func (q *Queue) Dequeue() <-chan cid.Cid { return q.dequeue } // worker run dequeues and enqueues when available. -func (q *Queue) worker() { +func (q *Queue) worker(ctx context.Context) { + defer close(q.closed) + defer q.enqueue.Shutdown() + var k datastore.Key = datastore.Key{} var c cid.Cid = cid.Undef - - defer q.closed.Done() - defer q.close() + var counter uint64 for { if c == cid.Undef { - head, err := q.getQueueHead() - - switch { - case err != nil: + head, err := q.getQueueHead(ctx) + if err != nil { log.Errorf("error querying for head of queue: %s, stopping provider", err) return - case head != nil: + } + if head != nil { k = datastore.NewKey(head.Key) c, err = cid.Parse(head.Value) if err != nil { log.Warnf("error parsing queue entry cid with key (%s), removing it from queue: %s", head.Key, err) - err = q.ds.Delete(q.ctx, k) - if err != nil { + if err = q.ds.Delete(ctx, k); err != nil { log.Errorf("error deleting queue entry with key (%s), due to error (%s), stopping provider", head.Key, err) return } continue } - default: - c = cid.Undef - } + } // else queue is empty } // If c != cid.Undef set dequeue and attempt write, otherwise wait for enqueue @@ -114,10 +132,16 @@ func (q *Queue) worker() { } select { - case toQueue := <-q.enqueue: - keyPath := fmt.Sprintf("%020d/%s", q.counter, c.String()) - q.counter++ - nextKey := datastore.NewKey(keyPath) + case toQueue, ok := <-q.enqueue.Out(): + if !ok { + return + } + // Add unique suffix to key path to allow multiple entries with the + // same sequence. + toQueueBytes := toQueue.Bytes() + sfx := base64.RawURLEncoding.EncodeToString(toQueueBytes[len(toQueueBytes)-6:]) + nextKey := datastore.NewKey(fmt.Sprintf("%020d/%s", counter, sfx)) + counter++ if c == cid.Undef { // fast path, skip rereading the datastore if we don't have anything in hand yet @@ -125,26 +149,29 @@ func (q *Queue) worker() { k = nextKey } - if err := q.ds.Put(q.ctx, nextKey, toQueue.Bytes()); err != nil { + if err := q.ds.Put(ctx, nextKey, toQueueBytes); err != nil { log.Errorf("Failed to enqueue cid: %s", err) continue } case dequeue <- c: - err := q.ds.Delete(q.ctx, k) + err := q.ds.Delete(ctx, k) if err != nil { log.Errorf("Failed to delete queued cid %s with key %s: %s", c, k, err) continue } c = cid.Undef - case <-q.ctx.Done(): + case <-ctx.Done(): return } } } -func (q *Queue) getQueueHead() (*query.Entry, error) { - qry := query.Query{Orders: []query.Order{query.OrderByKey{}}, Limit: 1} - results, err := q.ds.Query(q.ctx, qry) +func (q *Queue) getQueueHead(ctx context.Context) (*query.Entry, error) { + qry := query.Query{ + Orders: []query.Order{query.OrderByKey{}}, + Limit: 1, + } + results, err := q.ds.Query(ctx, qry) if err != nil { return nil, err } diff --git a/provider/internal/queue/queue_test.go b/provider/internal/queue/queue_test.go index a9a49cc66..122ed057f 100644 --- a/provider/internal/queue/queue_test.go +++ b/provider/internal/queue/queue_test.go @@ -5,73 +5,65 @@ import ( "testing" "time" - "github.com/ipfs/boxo/internal/test" "github.com/ipfs/go-cid" "github.com/ipfs/go-datastore" "github.com/ipfs/go-datastore/sync" "github.com/ipfs/go-test/random" ) -const blockSize = 4 - -func makeCids(n int) []cid.Cid { - blks := random.BlocksOfSize(n, blockSize) - cids := make([]cid.Cid, n) - for i := 0; i < n; i++ { - cids[i] = blks[i].Cid() - } - return cids -} - func assertOrdered(cids []cid.Cid, q *Queue, t *testing.T) { - for _, c := range cids { + t.Helper() + for i, c := range cids { select { - case dequeued := <-q.dequeue: + case dequeued, ok := <-q.dequeue: + if !ok { + t.Fatal("queue closed") + } if c != dequeued { - t.Fatalf("Error in ordering of CIDs retrieved from queue. Expected: %s, got: %s", c, dequeued) + t.Fatalf("Error in ordering of CID %d retrieved from queue. Expected: %s, got: %s", i, c, dequeued) } - case <-time.After(time.Second * 1): + case <-time.After(time.Second): t.Fatal("Timeout waiting for cids to be provided.") } } } func TestBasicOperation(t *testing.T) { - test.Flaky(t) - ctx := context.Background() - defer ctx.Done() - ds := sync.MutexWrap(datastore.NewMapDatastore()) queue := NewQueue(ds) defer queue.Close() - cids := makeCids(10) - + cids := random.Cids(10) for _, c := range cids { + //t.Log(c.String()) queue.Enqueue(c) } assertOrdered(cids, queue, t) + + err := queue.Close() + if err != nil { + t.Fatal(err) + } + if err = queue.Close(); err != nil { + t.Fatal(err) + } } func TestMangledData(t *testing.T) { - test.Flaky(t) - ctx := context.Background() - defer ctx.Done() - ds := sync.MutexWrap(datastore.NewMapDatastore()) queue := NewQueue(ds) defer queue.Close() - cids := makeCids(10) + cids := random.Cids(10) for _, c := range cids { queue.Enqueue(c) } // put bad data in the queue queueKey := datastore.NewKey("/test/0") - err := queue.ds.Put(ctx, queueKey, []byte("borked")) + err := queue.ds.Put(context.Background(), queueKey, []byte("borked")) if err != nil { t.Fatal(err) } @@ -82,21 +74,22 @@ func TestMangledData(t *testing.T) { } func TestInitialization(t *testing.T) { - test.Flaky(t) - ctx := context.Background() - defer ctx.Done() - ds := sync.MutexWrap(datastore.NewMapDatastore()) queue := NewQueue(ds) defer queue.Close() - cids := makeCids(10) + cids := random.Cids(10) for _, c := range cids { queue.Enqueue(c) } assertOrdered(cids[:5], queue, t) + err := queue.Close() + if err != nil { + t.Fatal(err) + } + // make a new queue, same data queue = NewQueue(ds) defer queue.Close() @@ -105,19 +98,19 @@ func TestInitialization(t *testing.T) { } func TestInitializationWithManyCids(t *testing.T) { - test.Flaky(t) - ctx := context.Background() - defer ctx.Done() - ds := sync.MutexWrap(datastore.NewMapDatastore()) queue := NewQueue(ds) + defer queue.Close() - cids := makeCids(25) + cids := random.Cids(25) for _, c := range cids { queue.Enqueue(c) } - queue.Close() + err := queue.Close() + if err != nil { + t.Fatal(err) + } // make a new queue, same data queue = NewQueue(ds) From 295380de25ecb2a7420a7be0ab811efd5fb93ee5 Mon Sep 17 00:00:00 2001 From: gammazero <11790789+gammazero@users.noreply.github.com> Date: Wed, 19 Mar 2025 02:38:38 -1000 Subject: [PATCH 02/23] no ctx in struct --- provider/internal/queue/queue.go | 1 - 1 file changed, 1 deletion(-) diff --git a/provider/internal/queue/queue.go b/provider/internal/queue/queue.go index 2dbd4b6d0..08f92806f 100644 --- a/provider/internal/queue/queue.go +++ b/provider/internal/queue/queue.go @@ -37,7 +37,6 @@ const ( type Queue struct { // used to differentiate queues in datastore // e.g. provider vs reprovider - ctx context.Context ds datastore.Datastore // Must be threadsafe dequeue chan cid.Cid enqueue *chanqueue.ChanQueue[cid.Cid] // in-memory queue to buffer input From ccd403d6299eaa605eb701e3cf3eb1b625688027 Mon Sep 17 00:00:00 2001 From: gammazero <11790789+gammazero@users.noreply.github.com> Date: Wed, 19 Mar 2025 07:55:30 -1000 Subject: [PATCH 03/23] rm coment --- provider/internal/queue/queue.go | 1 - 1 file changed, 1 deletion(-) diff --git a/provider/internal/queue/queue.go b/provider/internal/queue/queue.go index 08f92806f..671cb7acd 100644 --- a/provider/internal/queue/queue.go +++ b/provider/internal/queue/queue.go @@ -51,7 +51,6 @@ func NewQueue(ds datastore.Datastore) *Queue { q := &Queue{ ds: namespace.Wrap(ds, datastore.NewKey("/queue")), dequeue: make(chan cid.Cid), - //enqueue: make(chan cid.Cid, 1024), enqueue: chanqueue.New(chanqueue.WithCapacity[cid.Cid](inputBufferSize)), close: cancel, closed: make(chan struct{}), From 0d8abf46af09b87dd290d85628adbe33a2f2ee7a Mon Sep 17 00:00:00 2001 From: gammazero <11790789+gammazero@users.noreply.github.com> Date: Wed, 19 Mar 2025 09:11:09 -1000 Subject: [PATCH 04/23] Make queue input buffering optional --- provider/internal/queue/queue.go | 47 ++++++++++++++++++++++---------- 1 file changed, 33 insertions(+), 14 deletions(-) diff --git a/provider/internal/queue/queue.go b/provider/internal/queue/queue.go index 671cb7acd..9cff5f9c4 100644 --- a/provider/internal/queue/queue.go +++ b/provider/internal/queue/queue.go @@ -2,7 +2,6 @@ package queue import ( "context" - "encoding/base64" "errors" "fmt" "sync" @@ -19,7 +18,7 @@ import ( var log = logging.Logger("provider.queue") const ( - // Number of input CIDs to buffer without blocking. + // Number of input CIDs to buffer without blocking. If <= 1, then use channel. inputBufferSize = 65536 // Time for Close to wait to finish writing CIDs to datastore. shutdownTimeout = 5 * time.Second @@ -39,7 +38,8 @@ type Queue struct { // e.g. provider vs reprovider ds datastore.Datastore // Must be threadsafe dequeue chan cid.Cid - enqueue *chanqueue.ChanQueue[cid.Cid] // in-memory queue to buffer input + enqueue chan cid.Cid + inBuf *chanqueue.ChanQueue[cid.Cid] // in-memory queue to buffer input close context.CancelFunc closed chan struct{} closeOnce sync.Once @@ -51,10 +51,19 @@ func NewQueue(ds datastore.Datastore) *Queue { q := &Queue{ ds: namespace.Wrap(ds, datastore.NewKey("/queue")), dequeue: make(chan cid.Cid), - enqueue: chanqueue.New(chanqueue.WithCapacity[cid.Cid](inputBufferSize)), close: cancel, closed: make(chan struct{}), } + if inputBufferSize > 1 { + q.enqueue = make(chan cid.Cid) + q.inBuf = chanqueue.New( + chanqueue.WithInput(q.enqueue), + chanqueue.WithCapacity[cid.Cid](inputBufferSize), + ) + } else { + q.enqueue = make(chan cid.Cid, inputBufferSize) + } + go q.worker(ctx) return q } @@ -64,7 +73,7 @@ func (q *Queue) Close() error { var err error q.closeOnce.Do(func() { // Close input queue and wait for worker to finish reading it. - q.enqueue.Close() + close(q.enqueue) select { case <-q.closed: case <-time.After(shutdownTimeout): @@ -84,7 +93,7 @@ func (q *Queue) Enqueue(cid cid.Cid) (err error) { err = errors.New("failed to enqueue CID: shutting down") } }() - q.enqueue.In() <- cid + q.enqueue <- cid return } @@ -96,12 +105,19 @@ func (q *Queue) Dequeue() <-chan cid.Cid { // worker run dequeues and enqueues when available. func (q *Queue) worker(ctx context.Context) { defer close(q.closed) - defer q.enqueue.Shutdown() var k datastore.Key = datastore.Key{} var c cid.Cid = cid.Undef + var cstr string var counter uint64 + var readInBuf <-chan cid.Cid + if q.inBuf != nil { + readInBuf = q.inBuf.Out() + } else { + readInBuf = q.enqueue + } + for { if c == cid.Undef { head, err := q.getQueueHead(ctx) @@ -121,6 +137,7 @@ func (q *Queue) worker(ctx context.Context) { continue } } // else queue is empty + cstr = c.String() } // If c != cid.Undef set dequeue and attempt write, otherwise wait for enqueue @@ -130,24 +147,22 @@ func (q *Queue) worker(ctx context.Context) { } select { - case toQueue, ok := <-q.enqueue.Out(): + case toQueue, ok := <-readInBuf: if !ok { return } - // Add unique suffix to key path to allow multiple entries with the - // same sequence. - toQueueBytes := toQueue.Bytes() - sfx := base64.RawURLEncoding.EncodeToString(toQueueBytes[len(toQueueBytes)-6:]) - nextKey := datastore.NewKey(fmt.Sprintf("%020d/%s", counter, sfx)) + // Add suffix to key path to allow multiple entries with same sequence. + nextKey := datastore.NewKey(fmt.Sprintf("%020d/%s", counter, cstr)) counter++ if c == cid.Undef { // fast path, skip rereading the datastore if we don't have anything in hand yet c = toQueue k = nextKey + cstr = c.String() } - if err := q.ds.Put(ctx, nextKey, toQueueBytes); err != nil { + if err := q.ds.Put(ctx, nextKey, toQueue.Bytes()); err != nil { log.Errorf("Failed to enqueue cid: %s", err) continue } @@ -159,6 +174,10 @@ func (q *Queue) worker(ctx context.Context) { } c = cid.Undef case <-ctx.Done(): + if q.inBuf != nil { + for range readInBuf { + } + } return } } From bb4a89fcf43ac8e05cb78090a4e58a25fdf49ef0 Mon Sep 17 00:00:00 2001 From: gammazero <11790789+gammazero@users.noreply.github.com> Date: Thu, 20 Mar 2025 16:14:00 -1000 Subject: [PATCH 05/23] review changes --- provider/internal/queue/queue.go | 2 +- provider/internal/queue/queue_test.go | 1 - 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/provider/internal/queue/queue.go b/provider/internal/queue/queue.go index 9cff5f9c4..9fadd6b12 100644 --- a/provider/internal/queue/queue.go +++ b/provider/internal/queue/queue.go @@ -79,7 +79,7 @@ func (q *Queue) Close() error { case <-time.After(shutdownTimeout): q.close() // force immediate shutdown <-q.closed - err = errors.New("provider queue: some cids not written to datastore") + err = fmt.Errorf("provider queue: %d cids not written to datastore", q.inBuf.Len()) } close(q.dequeue) // no more output from this queue }) diff --git a/provider/internal/queue/queue_test.go b/provider/internal/queue/queue_test.go index 122ed057f..7b69d0070 100644 --- a/provider/internal/queue/queue_test.go +++ b/provider/internal/queue/queue_test.go @@ -36,7 +36,6 @@ func TestBasicOperation(t *testing.T) { cids := random.Cids(10) for _, c := range cids { - //t.Log(c.String()) queue.Enqueue(c) } From a275ba50317be93996a751df6ee99289816628fe Mon Sep 17 00:00:00 2001 From: gammazero <11790789+gammazero@users.noreply.github.com> Date: Fri, 21 Mar 2025 13:32:10 -1000 Subject: [PATCH 06/23] Queue can operate without datastore (mem-only mode) --- provider/internal/queue/queue.go | 56 +++++++++++++++------------ provider/internal/queue/queue_test.go | 20 ++++++++++ 2 files changed, 52 insertions(+), 24 deletions(-) diff --git a/provider/internal/queue/queue.go b/provider/internal/queue/queue.go index 9fadd6b12..1e44c2dbe 100644 --- a/provider/internal/queue/queue.go +++ b/provider/internal/queue/queue.go @@ -18,7 +18,7 @@ import ( var log = logging.Logger("provider.queue") const ( - // Number of input CIDs to buffer without blocking. If <= 1, then use channel. + // Number of input CIDs to buffer without blocking. inputBufferSize = 65536 // Time for Close to wait to finish writing CIDs to datastore. shutdownTimeout = 5 * time.Second @@ -47,24 +47,32 @@ type Queue struct { // NewQueue creates a queue for cids func NewQueue(ds datastore.Datastore) *Queue { - ctx, cancel := context.WithCancel(context.Background()) + dequeue := make(chan cid.Cid) + enqueue := make(chan cid.Cid) + q := &Queue{ - ds: namespace.Wrap(ds, datastore.NewKey("/queue")), - dequeue: make(chan cid.Cid), - close: cancel, - closed: make(chan struct{}), + dequeue: dequeue, + enqueue: enqueue, } - if inputBufferSize > 1 { - q.enqueue = make(chan cid.Cid) + + if ds == nil { q.inBuf = chanqueue.New( - chanqueue.WithInput(q.enqueue), + chanqueue.WithInput(enqueue), + chanqueue.WithOutput(dequeue), chanqueue.WithCapacity[cid.Cid](inputBufferSize), ) } else { - q.enqueue = make(chan cid.Cid, inputBufferSize) + ctx, cancel := context.WithCancel(context.Background()) + q.close = cancel + q.inBuf = chanqueue.New( + chanqueue.WithInput(enqueue), + chanqueue.WithCapacity[cid.Cid](inputBufferSize), + ) + q.ds = namespace.Wrap(ds, datastore.NewKey("/queue")) + q.closed = make(chan struct{}) + go q.worker(ctx) } - go q.worker(ctx) return q } @@ -73,15 +81,19 @@ func (q *Queue) Close() error { var err error q.closeOnce.Do(func() { // Close input queue and wait for worker to finish reading it. - close(q.enqueue) - select { - case <-q.closed: - case <-time.After(shutdownTimeout): - q.close() // force immediate shutdown - <-q.closed - err = fmt.Errorf("provider queue: %d cids not written to datastore", q.inBuf.Len()) + if q.ds == nil { + q.inBuf.Shutdown() + } else { + q.inBuf.Close() + select { + case <-q.closed: + case <-time.After(shutdownTimeout): + q.close() // force immediate shutdown + <-q.closed + err = fmt.Errorf("provider queue: %d cids not written to datastore", q.inBuf.Len()) + } + close(q.dequeue) // no more output from this queue } - close(q.dequeue) // no more output from this queue }) return err } @@ -112,11 +124,7 @@ func (q *Queue) worker(ctx context.Context) { var counter uint64 var readInBuf <-chan cid.Cid - if q.inBuf != nil { - readInBuf = q.inBuf.Out() - } else { - readInBuf = q.enqueue - } + readInBuf = q.inBuf.Out() for { if c == cid.Undef { diff --git a/provider/internal/queue/queue_test.go b/provider/internal/queue/queue_test.go index 7b69d0070..62f6d1cf8 100644 --- a/provider/internal/queue/queue_test.go +++ b/provider/internal/queue/queue_test.go @@ -50,6 +50,26 @@ func TestBasicOperation(t *testing.T) { } } +func TestBasicOperationNoDS(t *testing.T) { + queue := NewQueue(nil) + defer queue.Close() + + cids := random.Cids(10) + for _, c := range cids { + queue.Enqueue(c) + } + + assertOrdered(cids, queue, t) + + err := queue.Close() + if err != nil { + t.Fatal(err) + } + if err = queue.Close(); err != nil { + t.Fatal(err) + } +} + func TestMangledData(t *testing.T) { ds := sync.MutexWrap(datastore.NewMapDatastore()) queue := NewQueue(ds) From f7cd6e8834c7d6e7c8930b0a983805f0aa7b82c1 Mon Sep 17 00:00:00 2001 From: gammazero <11790789+gammazero@users.noreply.github.com> Date: Fri, 21 Mar 2025 13:44:13 -1000 Subject: [PATCH 07/23] Option for memory-only queue --- provider/reprovider.go | 8 +++++++- provider/reprovider_test.go | 16 ++++++++++++---- 2 files changed, 19 insertions(+), 5 deletions(-) diff --git a/provider/reprovider.go b/provider/reprovider.go index 94e077da5..6ee20cd40 100644 --- a/provider/reprovider.go +++ b/provider/reprovider.go @@ -57,6 +57,8 @@ type reprovider struct { q *queue.Queue ds datastore.Batching + memOnlyQueue bool + reprovideCh chan cid.Cid noReprovideInFlight chan struct{} @@ -138,7 +140,11 @@ func New(ds datastore.Batching, opts ...Option) (System, error) { } s.ds = namespace.Wrap(ds, s.keyPrefix) - s.q = queue.NewQueue(s.ds) + if s.memOnlyQueue { + s.q = queue.NewQueue(nil) + } else { + s.q = queue.NewQueue(s.ds) + } // This is after the options processing so we do not have to worry about leaking a context if there is an // initialization error processing the options diff --git a/provider/reprovider_test.go b/provider/reprovider_test.go index ceb72f97b..f3455d1f1 100644 --- a/provider/reprovider_test.go +++ b/provider/reprovider_test.go @@ -75,15 +75,23 @@ func TestReprovider(t *testing.T) { t.Parallel() t.Run("many", func(t *testing.T) { t.Parallel() - testProvider(t, false) + testProvider(t, false, false) }) t.Run("single", func(t *testing.T) { t.Parallel() - testProvider(t, true) + testProvider(t, true, false) }) } -func testProvider(t *testing.T, singleProvide bool) { +func TestReproviderMemOnly(t *testing.T) { + t.Parallel() + t.Run("many", func(t *testing.T) { + t.Parallel() + testProvider(t, false, true) + }) +} + +func testProvider(t *testing.T, singleProvide, memOnlyQueue bool) { ds := dssync.MutexWrap(datastore.NewMapDatastore()) // It has to be so big because the combo of noisy CI runners + OSes that don't @@ -110,7 +118,7 @@ func testProvider(t *testing.T, singleProvide bool) { var keyWait sync.Mutex keyWait.Lock() - batchSystem, err := New(ds, Online(provider), KeyProvider(func(ctx context.Context) (<-chan cid.Cid, error) { + batchSystem, err := New(ds, Online(provider), WithMemOnlyQueue(memOnlyQueue), KeyProvider(func(ctx context.Context) (<-chan cid.Cid, error) { ch := make(chan cid.Cid) go func() { defer keyWait.Unlock() From 49ee7f068aca64e64d5ebf033707fd1d7a1977ad Mon Sep 17 00:00:00 2001 From: gammazero <11790789+gammazero@users.noreply.github.com> Date: Mon, 24 Mar 2025 05:33:24 -1000 Subject: [PATCH 08/23] fix lint warn --- provider/internal/queue/queue.go | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/provider/internal/queue/queue.go b/provider/internal/queue/queue.go index 1e44c2dbe..3bb789df7 100644 --- a/provider/internal/queue/queue.go +++ b/provider/internal/queue/queue.go @@ -118,13 +118,13 @@ func (q *Queue) Dequeue() <-chan cid.Cid { func (q *Queue) worker(ctx context.Context) { defer close(q.closed) - var k datastore.Key = datastore.Key{} - var c cid.Cid = cid.Undef - var cstr string - var counter uint64 - - var readInBuf <-chan cid.Cid - readInBuf = q.inBuf.Out() + var ( + c cid.Cid = cid.Undef + counter uint64 + cstr string + k datastore.Key = datastore.Key{} + ) + readInBuf := q.inBuf.Out() for { if c == cid.Undef { From 9bf4aca111a5dad306d3bee6f1dcc8143d358d8a Mon Sep 17 00:00:00 2001 From: gammazero <11790789+gammazero@users.noreply.github.com> Date: Tue, 25 Mar 2025 23:18:47 -1000 Subject: [PATCH 09/23] Revert to disk based queue, with batching --- provider/internal/queue/queue.go | 122 ++++++++++++++++++++------ provider/internal/queue/queue_test.go | 22 +---- 2 files changed, 95 insertions(+), 49 deletions(-) diff --git a/provider/internal/queue/queue.go b/provider/internal/queue/queue.go index 3bb789df7..d9c622f68 100644 --- a/provider/internal/queue/queue.go +++ b/provider/internal/queue/queue.go @@ -18,10 +18,13 @@ import ( var log = logging.Logger("provider.queue") const ( + batchSize = 1024 + batchCommitInterval = 5 * time.Second + // Number of input CIDs to buffer without blocking. - inputBufferSize = 65536 + inputBufferSize = 1024 * 256 // Time for Close to wait to finish writing CIDs to datastore. - shutdownTimeout = 5 * time.Second + shutdownTimeout = 30 * time.Second ) // Queue provides a FIFO interface to the datastore for storing cids. @@ -36,46 +39,49 @@ const ( type Queue struct { // used to differentiate queues in datastore // e.g. provider vs reprovider - ds datastore.Datastore // Must be threadsafe + ds datastore.Batching dequeue chan cid.Cid enqueue chan cid.Cid inBuf *chanqueue.ChanQueue[cid.Cid] // in-memory queue to buffer input close context.CancelFunc closed chan struct{} closeOnce sync.Once + + syncDone chan struct{} + syncMutex sync.Mutex } // NewQueue creates a queue for cids -func NewQueue(ds datastore.Datastore) *Queue { +func NewQueue(ds datastore.Batching) *Queue { dequeue := make(chan cid.Cid) enqueue := make(chan cid.Cid) + ctx, cancel := context.WithCancel(context.Background()) q := &Queue{ - dequeue: dequeue, - enqueue: enqueue, + close: cancel, + closed: make(chan struct{}), + ds: namespace.Wrap(ds, datastore.NewKey("/queue")), + dequeue: dequeue, + enqueue: enqueue, + syncDone: make(chan struct{}, 1), } - if ds == nil { - q.inBuf = chanqueue.New( - chanqueue.WithInput(enqueue), - chanqueue.WithOutput(dequeue), - chanqueue.WithCapacity[cid.Cid](inputBufferSize), - ) - } else { - ctx, cancel := context.WithCancel(context.Background()) - q.close = cancel - q.inBuf = chanqueue.New( - chanqueue.WithInput(enqueue), - chanqueue.WithCapacity[cid.Cid](inputBufferSize), - ) - q.ds = namespace.Wrap(ds, datastore.NewKey("/queue")) - q.closed = make(chan struct{}) - go q.worker(ctx) - } + q.inBuf = chanqueue.New( + chanqueue.WithInput(enqueue), + chanqueue.WithCapacity[cid.Cid](inputBufferSize), + ) + go q.worker(ctx) return q } +func (q *Queue) Sync() { + q.syncMutex.Lock() + q.inBuf.In() <- cid.Undef + <-q.syncDone + q.syncMutex.Unlock() +} + // Close stops the queue func (q *Queue) Close() error { var err error @@ -117,6 +123,7 @@ func (q *Queue) Dequeue() <-chan cid.Cid { // worker run dequeues and enqueues when available. func (q *Queue) worker(ctx context.Context) { defer close(q.closed) + defer q.inBuf.Shutdown() var ( c cid.Cid = cid.Undef @@ -126,7 +133,27 @@ func (q *Queue) worker(ctx context.Context) { ) readInBuf := q.inBuf.Out() + var batchCount int + b, err := q.ds.Batch(ctx) + if err != nil { + log.Errorf("Failed to create batch, stopping provider: %s", err) + return + } + + defer func() { + if batchCount != 0 { + if err := b.Commit(ctx); err != nil { + log.Errorf("Failed to write cid batch: %s", err) + } + } + close(q.syncDone) + }() + + batchTicker := time.NewTicker(batchCommitInterval) + defer batchTicker.Stop() + for { + //fmt.Println("---> inbuf len:", q.inBuf.Len(), "batch count:", batchCount) if c == cid.Undef { head, err := q.getQueueHead(ctx) if err != nil { @@ -153,12 +180,20 @@ func (q *Queue) worker(ctx context.Context) { if c != cid.Undef { dequeue = q.dequeue } + var commit, needSync bool select { case toQueue, ok := <-readInBuf: if !ok { return } + if toQueue == cid.Undef { + if batchCount != 0 { + commit = true + } + needSync = true + break + } // Add suffix to key path to allow multiple entries with same sequence. nextKey := datastore.NewKey(fmt.Sprintf("%020d/%s", counter, cstr)) counter++ @@ -170,11 +205,21 @@ func (q *Queue) worker(ctx context.Context) { cstr = c.String() } - if err := q.ds.Put(ctx, nextKey, toQueue.Bytes()); err != nil { - log.Errorf("Failed to enqueue cid: %s", err) + //if err := q.ds.Put(ctx, nextKey, toQueue.Bytes()); err != nil { + if err := b.Put(ctx, nextKey, toQueue.Bytes()); err != nil { + log.Errorf("Failed to batch cid: %s", err) continue } + batchCount++ + if batchCount == batchSize { + commit = true + } + case <-batchTicker.C: + if batchCount != 0 && q.inBuf.Len() == 0 { + commit = true + } case dequeue <- c: + // Do not batch delete. Delete must be committed immediately, otherwise the same head cid will be read from the datastore. err := q.ds.Delete(ctx, k) if err != nil { log.Errorf("Failed to delete queued cid %s with key %s: %s", c, k, err) @@ -182,12 +227,31 @@ func (q *Queue) worker(ctx context.Context) { } c = cid.Undef case <-ctx.Done(): - if q.inBuf != nil { - for range readInBuf { - } - } return } + + if commit { + if err = b.Commit(ctx); err != nil { + log.Errorf("Failed to write cid batch, stopping provider: %s", err) + return + } + b, err = q.ds.Batch(ctx) + if err != nil { + log.Errorf("Failed to create batch, stopping provider: %s", err) + return + } + batchCount = 0 + commit = false + } + + if needSync { + needSync = false + select { + case q.syncDone <- struct{}{}: + case <-ctx.Done(): + return + } + } } } diff --git a/provider/internal/queue/queue_test.go b/provider/internal/queue/queue_test.go index 62f6d1cf8..852f29e5b 100644 --- a/provider/internal/queue/queue_test.go +++ b/provider/internal/queue/queue_test.go @@ -13,6 +13,8 @@ import ( func assertOrdered(cids []cid.Cid, q *Queue, t *testing.T) { t.Helper() + + q.Sync() for i, c := range cids { select { case dequeued, ok := <-q.dequeue: @@ -50,26 +52,6 @@ func TestBasicOperation(t *testing.T) { } } -func TestBasicOperationNoDS(t *testing.T) { - queue := NewQueue(nil) - defer queue.Close() - - cids := random.Cids(10) - for _, c := range cids { - queue.Enqueue(c) - } - - assertOrdered(cids, queue, t) - - err := queue.Close() - if err != nil { - t.Fatal(err) - } - if err = queue.Close(); err != nil { - t.Fatal(err) - } -} - func TestMangledData(t *testing.T) { ds := sync.MutexWrap(datastore.NewMapDatastore()) queue := NewQueue(ds) From b973004ed67d270dc7b51620bf7043b74463cd2c Mon Sep 17 00:00:00 2001 From: gammazero <11790789+gammazero@users.noreply.github.com> Date: Wed, 26 Mar 2025 11:47:09 -1000 Subject: [PATCH 10/23] Update datastore-backed queue --- provider/internal/queue/queue.go | 83 +++++++++++++++------------ provider/internal/queue/queue_test.go | 3 +- 2 files changed, 48 insertions(+), 38 deletions(-) diff --git a/provider/internal/queue/queue.go b/provider/internal/queue/queue.go index d9c622f68..efd3014ba 100644 --- a/provider/internal/queue/queue.go +++ b/provider/internal/queue/queue.go @@ -22,7 +22,7 @@ const ( batchCommitInterval = 5 * time.Second // Number of input CIDs to buffer without blocking. - inputBufferSize = 1024 * 256 + inputBufferSize = 1024 * 64 // Time for Close to wait to finish writing CIDs to datastore. shutdownTimeout = 30 * time.Second ) @@ -41,7 +41,6 @@ type Queue struct { // e.g. provider vs reprovider ds datastore.Batching dequeue chan cid.Cid - enqueue chan cid.Cid inBuf *chanqueue.ChanQueue[cid.Cid] // in-memory queue to buffer input close context.CancelFunc closed chan struct{} @@ -53,23 +52,17 @@ type Queue struct { // NewQueue creates a queue for cids func NewQueue(ds datastore.Batching) *Queue { - dequeue := make(chan cid.Cid) - enqueue := make(chan cid.Cid) - ctx, cancel := context.WithCancel(context.Background()) + q := &Queue{ close: cancel, closed: make(chan struct{}), ds: namespace.Wrap(ds, datastore.NewKey("/queue")), - dequeue: dequeue, - enqueue: enqueue, + dequeue: make(chan cid.Cid), + inBuf: chanqueue.New(chanqueue.WithCapacity[cid.Cid](inputBufferSize)), syncDone: make(chan struct{}, 1), } - q.inBuf = chanqueue.New( - chanqueue.WithInput(enqueue), - chanqueue.WithCapacity[cid.Cid](inputBufferSize), - ) go q.worker(ctx) return q @@ -111,7 +104,7 @@ func (q *Queue) Enqueue(cid cid.Cid) (err error) { err = errors.New("failed to enqueue CID: shutting down") } }() - q.enqueue <- cid + q.inBuf.In() <- cid return } @@ -126,7 +119,7 @@ func (q *Queue) worker(ctx context.Context) { defer q.inBuf.Shutdown() var ( - c cid.Cid = cid.Undef + c cid.Cid counter uint64 cstr string k datastore.Key = datastore.Key{} @@ -152,8 +145,9 @@ func (q *Queue) worker(ctx context.Context) { batchTicker := time.NewTicker(batchCommitInterval) defer batchTicker.Stop() + var lastCid cid.Cid + for { - //fmt.Println("---> inbuf len:", q.inBuf.Len(), "batch count:", batchCount) if c == cid.Undef { head, err := q.getQueueHead(ctx) if err != nil { @@ -188,12 +182,15 @@ func (q *Queue) worker(ctx context.Context) { return } if toQueue == cid.Undef { - if batchCount != 0 { - commit = true - } + commit = true needSync = true break } + if toQueue == lastCid { + continue + } + lastCid = toQueue + // Add suffix to key path to allow multiple entries with same sequence. nextKey := datastore.NewKey(fmt.Sprintf("%020d/%s", counter, cstr)) counter++ @@ -215,10 +212,22 @@ func (q *Queue) worker(ctx context.Context) { commit = true } case <-batchTicker.C: - if batchCount != 0 && q.inBuf.Len() == 0 { - commit = true - } + commit = q.inBuf.Len() == 0 case dequeue <- c: + // CID may still be in uncommitted batch, so commit current batch first. + if batchCount != 0 { + if err = b.Commit(ctx); err != nil { + log.Errorf("Failed to write cid batch, stopping provider: %s", err) + return + } + b, err = q.ds.Batch(ctx) + if err != nil { + log.Errorf("Failed to create batch, stopping provider: %s", err) + return + } + batchCount = 0 + } + // Do not batch delete. Delete must be committed immediately, otherwise the same head cid will be read from the datastore. err := q.ds.Delete(ctx, k) if err != nil { @@ -231,25 +240,27 @@ func (q *Queue) worker(ctx context.Context) { } if commit { - if err = b.Commit(ctx); err != nil { - log.Errorf("Failed to write cid batch, stopping provider: %s", err) - return - } - b, err = q.ds.Batch(ctx) - if err != nil { - log.Errorf("Failed to create batch, stopping provider: %s", err) - return + if batchCount != 0 { + if err = b.Commit(ctx); err != nil { + log.Errorf("Failed to write cid batch, stopping provider: %s", err) + return + } + b, err = q.ds.Batch(ctx) + if err != nil { + log.Errorf("Failed to create batch, stopping provider: %s", err) + return + } + batchCount = 0 } - batchCount = 0 commit = false - } - if needSync { - needSync = false - select { - case q.syncDone <- struct{}{}: - case <-ctx.Done(): - return + if needSync { + needSync = false + select { + case q.syncDone <- struct{}{}: + case <-ctx.Done(): + return + } } } } diff --git a/provider/internal/queue/queue_test.go b/provider/internal/queue/queue_test.go index 852f29e5b..5b1c7c99e 100644 --- a/provider/internal/queue/queue_test.go +++ b/provider/internal/queue/queue_test.go @@ -14,10 +14,9 @@ import ( func assertOrdered(cids []cid.Cid, q *Queue, t *testing.T) { t.Helper() - q.Sync() for i, c := range cids { select { - case dequeued, ok := <-q.dequeue: + case dequeued, ok := <-q.Dequeue(): if !ok { t.Fatal("queue closed") } From 636e3d19f38e3694835537358f0a9e6138bdcbe7 Mon Sep 17 00:00:00 2001 From: gammazero <11790789+gammazero@users.noreply.github.com> Date: Wed, 26 Mar 2025 19:23:26 -1000 Subject: [PATCH 11/23] update batching --- provider/internal/queue/queue.go | 73 +++++++++++++++++--------------- provider/reprovider.go | 8 ++-- 2 files changed, 44 insertions(+), 37 deletions(-) diff --git a/provider/internal/queue/queue.go b/provider/internal/queue/queue.go index efd3014ba..831e5a4b7 100644 --- a/provider/internal/queue/queue.go +++ b/provider/internal/queue/queue.go @@ -18,7 +18,7 @@ import ( var log = logging.Logger("provider.queue") const ( - batchSize = 1024 + batchSize = 16384 batchCommitInterval = 5 * time.Second // Number of input CIDs to buffer without blocking. @@ -118,21 +118,13 @@ func (q *Queue) worker(ctx context.Context) { defer close(q.closed) defer q.inBuf.Shutdown() - var ( - c cid.Cid - counter uint64 - cstr string - k datastore.Key = datastore.Key{} - ) - readInBuf := q.inBuf.Out() - - var batchCount int b, err := q.ds.Batch(ctx) if err != nil { log.Errorf("Failed to create batch, stopping provider: %s", err) return } + var batchCount int defer func() { if batchCount != 0 { if err := b.Commit(ctx); err != nil { @@ -142,11 +134,34 @@ func (q *Queue) worker(ctx context.Context) { close(q.syncDone) }() + refreshBatch := func(ctx context.Context) error { + if batchCount == 0 { + return nil + } + err := b.Commit(ctx) + if err != nil { + return fmt.Errorf("failed to write cid batch: %w", err) + } + b, err = q.ds.Batch(ctx) + if err != nil { + return fmt.Errorf("failed to create batch: %w", err) + } + batchCount = 0 + return nil + } + + var ( + counter uint64 + c, lastCid cid.Cid + cstr string + k datastore.Key = datastore.Key{} + ) + + readInBuf := q.inBuf.Out() + batchTicker := time.NewTicker(batchCommitInterval) defer batchTicker.Stop() - var lastCid cid.Cid - for { if c == cid.Undef { head, err := q.getQueueHead(ctx) @@ -203,7 +218,7 @@ func (q *Queue) worker(ctx context.Context) { } //if err := q.ds.Put(ctx, nextKey, toQueue.Bytes()); err != nil { - if err := b.Put(ctx, nextKey, toQueue.Bytes()); err != nil { + if err = b.Put(ctx, nextKey, toQueue.Bytes()); err != nil { log.Errorf("Failed to batch cid: %s", err) continue } @@ -216,21 +231,16 @@ func (q *Queue) worker(ctx context.Context) { case dequeue <- c: // CID may still be in uncommitted batch, so commit current batch first. if batchCount != 0 { - if err = b.Commit(ctx); err != nil { - log.Errorf("Failed to write cid batch, stopping provider: %s", err) - return - } - b, err = q.ds.Batch(ctx) - if err != nil { - log.Errorf("Failed to create batch, stopping provider: %s", err) + if err = refreshBatch(ctx); err != nil { + if !errors.Is(err, context.Canceled) { + log.Errorf("%w, stopping provider", err) + } return } - batchCount = 0 } // Do not batch delete. Delete must be committed immediately, otherwise the same head cid will be read from the datastore. - err := q.ds.Delete(ctx, k) - if err != nil { + if err = q.ds.Delete(ctx, k); err != nil { log.Errorf("Failed to delete queued cid %s with key %s: %s", c, k, err) continue } @@ -240,19 +250,14 @@ func (q *Queue) worker(ctx context.Context) { } if commit { - if batchCount != 0 { - if err = b.Commit(ctx); err != nil { - log.Errorf("Failed to write cid batch, stopping provider: %s", err) - return - } - b, err = q.ds.Batch(ctx) - if err != nil { - log.Errorf("Failed to create batch, stopping provider: %s", err) - return + commit = false + + if err = refreshBatch(ctx); err != nil { + if !errors.Is(err, context.Canceled) { + log.Errorf("%w, stopping provider", err) } - batchCount = 0 + return } - commit = false if needSync { needSync = false diff --git a/provider/reprovider.go b/provider/reprovider.go index 6ee20cd40..d8adda691 100644 --- a/provider/reprovider.go +++ b/provider/reprovider.go @@ -429,10 +429,12 @@ func (s *reprovider) run() { // after the first reprovide, schedule periodical reprovides nextReprovideTicker := time.NewTicker(s.reprovideInterval) - for s.ctx.Err() == nil { + for { err := s.Reprovide(context.Background()) - - if s.ctx.Err() == nil && err != nil { + if err != nil { + if s.ctx.Err() != nil { + return + } log.Errorf("failed to reprovide: %s", err) } select { From f04e933e8cc0bfbccd51ba786adf459c25c030aa Mon Sep 17 00:00:00 2001 From: gammazero <11790789+gammazero@users.noreply.github.com> Date: Thu, 27 Mar 2025 06:15:27 -1000 Subject: [PATCH 12/23] remove unused option --- provider/reprovider.go | 8 +------- provider/reprovider_test.go | 16 ++++------------ 2 files changed, 5 insertions(+), 19 deletions(-) diff --git a/provider/reprovider.go b/provider/reprovider.go index d8adda691..0559a46c6 100644 --- a/provider/reprovider.go +++ b/provider/reprovider.go @@ -57,8 +57,6 @@ type reprovider struct { q *queue.Queue ds datastore.Batching - memOnlyQueue bool - reprovideCh chan cid.Cid noReprovideInFlight chan struct{} @@ -140,11 +138,7 @@ func New(ds datastore.Batching, opts ...Option) (System, error) { } s.ds = namespace.Wrap(ds, s.keyPrefix) - if s.memOnlyQueue { - s.q = queue.NewQueue(nil) - } else { - s.q = queue.NewQueue(s.ds) - } + s.q = queue.NewQueue(s.ds) // This is after the options processing so we do not have to worry about leaking a context if there is an // initialization error processing the options diff --git a/provider/reprovider_test.go b/provider/reprovider_test.go index f3455d1f1..ceb72f97b 100644 --- a/provider/reprovider_test.go +++ b/provider/reprovider_test.go @@ -75,23 +75,15 @@ func TestReprovider(t *testing.T) { t.Parallel() t.Run("many", func(t *testing.T) { t.Parallel() - testProvider(t, false, false) + testProvider(t, false) }) t.Run("single", func(t *testing.T) { t.Parallel() - testProvider(t, true, false) + testProvider(t, true) }) } -func TestReproviderMemOnly(t *testing.T) { - t.Parallel() - t.Run("many", func(t *testing.T) { - t.Parallel() - testProvider(t, false, true) - }) -} - -func testProvider(t *testing.T, singleProvide, memOnlyQueue bool) { +func testProvider(t *testing.T, singleProvide bool) { ds := dssync.MutexWrap(datastore.NewMapDatastore()) // It has to be so big because the combo of noisy CI runners + OSes that don't @@ -118,7 +110,7 @@ func testProvider(t *testing.T, singleProvide, memOnlyQueue bool) { var keyWait sync.Mutex keyWait.Lock() - batchSystem, err := New(ds, Online(provider), WithMemOnlyQueue(memOnlyQueue), KeyProvider(func(ctx context.Context) (<-chan cid.Cid, error) { + batchSystem, err := New(ds, Online(provider), KeyProvider(func(ctx context.Context) (<-chan cid.Cid, error) { ch := make(chan cid.Cid) go func() { defer keyWait.Unlock() From ef6bef01c7a410547e948fd10de6837aeeabfe6e Mon Sep 17 00:00:00 2001 From: gammazero <11790789+gammazero@users.noreply.github.com> Date: Thu, 27 Mar 2025 07:52:46 -1000 Subject: [PATCH 13/23] re unneeded sync function --- provider/internal/queue/queue.go | 80 ++++++++++++++------------------ 1 file changed, 34 insertions(+), 46 deletions(-) diff --git a/provider/internal/queue/queue.go b/provider/internal/queue/queue.go index 831e5a4b7..0c8ebe9fe 100644 --- a/provider/internal/queue/queue.go +++ b/provider/internal/queue/queue.go @@ -21,10 +21,11 @@ const ( batchSize = 16384 batchCommitInterval = 5 * time.Second - // Number of input CIDs to buffer without blocking. + // Number of input CIDs to buffer without blocking. This capacity is only + // used when writing batches to the datastore takes some time. inputBufferSize = 1024 * 64 // Time for Close to wait to finish writing CIDs to datastore. - shutdownTimeout = 30 * time.Second + shutdownTimeout = 20 * time.Second ) // Queue provides a FIFO interface to the datastore for storing cids. @@ -33,21 +34,17 @@ const ( // in the queue when the node is brought back online depending on whether they // were fully written to the underlying datastore. // -// Input to the queue is buffered in memory, up to inputBufferSize, to increase -// the speed of data onboarding. This input buffer behaves as a channel with a -// very large capacity. +// Input to the queue is buffered in memory, up to inputBufferSize, to maintain +// the speed at which input is consumed, even if persisting it to the datastore +// becomes slow. This input buffer behaves as a channel with a dynamic +// capacity. type Queue struct { - // used to differentiate queues in datastore - // e.g. provider vs reprovider - ds datastore.Batching - dequeue chan cid.Cid - inBuf *chanqueue.ChanQueue[cid.Cid] // in-memory queue to buffer input close context.CancelFunc closed chan struct{} closeOnce sync.Once - - syncDone chan struct{} - syncMutex sync.Mutex + dequeue chan cid.Cid + ds datastore.Batching + inBuf *chanqueue.ChanQueue[cid.Cid] } // NewQueue creates a queue for cids @@ -55,12 +52,11 @@ func NewQueue(ds datastore.Batching) *Queue { ctx, cancel := context.WithCancel(context.Background()) q := &Queue{ - close: cancel, - closed: make(chan struct{}), - ds: namespace.Wrap(ds, datastore.NewKey("/queue")), - dequeue: make(chan cid.Cid), - inBuf: chanqueue.New(chanqueue.WithCapacity[cid.Cid](inputBufferSize)), - syncDone: make(chan struct{}, 1), + close: cancel, + closed: make(chan struct{}), + dequeue: make(chan cid.Cid), + ds: namespace.Wrap(ds, datastore.NewKey("/queue")), + inBuf: chanqueue.New(chanqueue.WithCapacity[cid.Cid](inputBufferSize)), } go q.worker(ctx) @@ -68,13 +64,6 @@ func NewQueue(ds datastore.Batching) *Queue { return q } -func (q *Queue) Sync() { - q.syncMutex.Lock() - q.inBuf.In() <- cid.Undef - <-q.syncDone - q.syncMutex.Unlock() -} - // Close stops the queue func (q *Queue) Close() error { var err error @@ -131,7 +120,6 @@ func (q *Queue) worker(ctx context.Context) { log.Errorf("Failed to write cid batch: %s", err) } } - close(q.syncDone) }() refreshBatch := func(ctx context.Context) error { @@ -153,6 +141,7 @@ func (q *Queue) worker(ctx context.Context) { var ( counter uint64 c, lastCid cid.Cid + commit bool cstr string k datastore.Key = datastore.Key{} ) @@ -180,27 +169,32 @@ func (q *Queue) worker(ctx context.Context) { } continue } - } // else queue is empty + } else if batchCount != 0 { + // There were no queued CIDs in the datastore, but there were + // some waiting to be written. Write them and re-read from + // datastore. + if err = refreshBatch(ctx); err != nil { + if !errors.Is(err, context.Canceled) { + log.Errorf("%w, stopping provider", err) + } + return + } + continue + } cstr = c.String() } - // If c != cid.Undef set dequeue and attempt write, otherwise wait for enqueue + // If c != cid.Undef set dequeue and attempt write. var dequeue chan cid.Cid if c != cid.Undef { dequeue = q.dequeue } - var commit, needSync bool select { case toQueue, ok := <-readInBuf: if !ok { return } - if toQueue == cid.Undef { - commit = true - needSync = true - break - } if toQueue == lastCid { continue } @@ -229,7 +223,9 @@ func (q *Queue) worker(ctx context.Context) { case <-batchTicker.C: commit = q.inBuf.Len() == 0 case dequeue <- c: - // CID may still be in uncommitted batch, so commit current batch first. + // Commit current batch first so that if CID being read is still in + // the uncommitted batch, that CID is written and deleted from the + // datastore. if batchCount != 0 { if err = refreshBatch(ctx); err != nil { if !errors.Is(err, context.Canceled) { @@ -239,7 +235,8 @@ func (q *Queue) worker(ctx context.Context) { } } - // Do not batch delete. Delete must be committed immediately, otherwise the same head cid will be read from the datastore. + // Do not batch delete. Delete must be committed immediately, + // otherwise the same head cid will be read from the datastore. if err = q.ds.Delete(ctx, k); err != nil { log.Errorf("Failed to delete queued cid %s with key %s: %s", c, k, err) continue @@ -258,15 +255,6 @@ func (q *Queue) worker(ctx context.Context) { } return } - - if needSync { - needSync = false - select { - case q.syncDone <- struct{}{}: - case <-ctx.Done(): - return - } - } } } } From 949950f17999ed326deef9118c632740fadd3ae5 Mon Sep 17 00:00:00 2001 From: gammazero <11790789+gammazero@users.noreply.github.com> Date: Thu, 27 Mar 2025 08:07:54 -1000 Subject: [PATCH 14/23] batch size, comments --- provider/internal/queue/queue.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/provider/internal/queue/queue.go b/provider/internal/queue/queue.go index 0c8ebe9fe..0cbc94096 100644 --- a/provider/internal/queue/queue.go +++ b/provider/internal/queue/queue.go @@ -18,12 +18,12 @@ import ( var log = logging.Logger("provider.queue") const ( - batchSize = 16384 + batchSize = 16 * 1024 batchCommitInterval = 5 * time.Second // Number of input CIDs to buffer without blocking. This capacity is only // used when writing batches to the datastore takes some time. - inputBufferSize = 1024 * 64 + inputBufferSize = 64 * 1024 // Time for Close to wait to finish writing CIDs to datastore. shutdownTimeout = 20 * time.Second ) From 75e9c6c3a3476049ff1b2aa94de34d0bae135aba Mon Sep 17 00:00:00 2001 From: gammazero <11790789+gammazero@users.noreply.github.com> Date: Thu, 27 Mar 2025 10:14:17 -1000 Subject: [PATCH 15/23] Rename queue.NewQueue to queue.New --- provider/internal/queue/queue.go | 8 ++++---- provider/internal/queue/queue_test.go | 12 ++++++------ provider/reprovider.go | 2 +- 3 files changed, 11 insertions(+), 11 deletions(-) diff --git a/provider/internal/queue/queue.go b/provider/internal/queue/queue.go index 0cbc94096..29b22e286 100644 --- a/provider/internal/queue/queue.go +++ b/provider/internal/queue/queue.go @@ -47,8 +47,8 @@ type Queue struct { inBuf *chanqueue.ChanQueue[cid.Cid] } -// NewQueue creates a queue for cids -func NewQueue(ds datastore.Batching) *Queue { +// New creates a queue for cids. +func New(ds datastore.Batching) *Queue { ctx, cancel := context.WithCancel(context.Background()) q := &Queue{ @@ -64,7 +64,7 @@ func NewQueue(ds datastore.Batching) *Queue { return q } -// Close stops the queue +// Close stops the queue. func (q *Queue) Close() error { var err error q.closeOnce.Do(func() { @@ -86,7 +86,7 @@ func (q *Queue) Close() error { return err } -// Enqueue puts a cid in the queue +// Enqueue puts a cid in the queue. func (q *Queue) Enqueue(cid cid.Cid) (err error) { defer func() { if r := recover(); r != nil { diff --git a/provider/internal/queue/queue_test.go b/provider/internal/queue/queue_test.go index 5b1c7c99e..d0250e995 100644 --- a/provider/internal/queue/queue_test.go +++ b/provider/internal/queue/queue_test.go @@ -32,7 +32,7 @@ func assertOrdered(cids []cid.Cid, q *Queue, t *testing.T) { func TestBasicOperation(t *testing.T) { ds := sync.MutexWrap(datastore.NewMapDatastore()) - queue := NewQueue(ds) + queue := New(ds) defer queue.Close() cids := random.Cids(10) @@ -53,7 +53,7 @@ func TestBasicOperation(t *testing.T) { func TestMangledData(t *testing.T) { ds := sync.MutexWrap(datastore.NewMapDatastore()) - queue := NewQueue(ds) + queue := New(ds) defer queue.Close() cids := random.Cids(10) @@ -75,7 +75,7 @@ func TestMangledData(t *testing.T) { func TestInitialization(t *testing.T) { ds := sync.MutexWrap(datastore.NewMapDatastore()) - queue := NewQueue(ds) + queue := New(ds) defer queue.Close() cids := random.Cids(10) @@ -91,7 +91,7 @@ func TestInitialization(t *testing.T) { } // make a new queue, same data - queue = NewQueue(ds) + queue = New(ds) defer queue.Close() assertOrdered(cids[5:], queue, t) @@ -99,7 +99,7 @@ func TestInitialization(t *testing.T) { func TestInitializationWithManyCids(t *testing.T) { ds := sync.MutexWrap(datastore.NewMapDatastore()) - queue := NewQueue(ds) + queue := New(ds) defer queue.Close() cids := random.Cids(25) @@ -113,7 +113,7 @@ func TestInitializationWithManyCids(t *testing.T) { } // make a new queue, same data - queue = NewQueue(ds) + queue = New(ds) defer queue.Close() assertOrdered(cids, queue, t) diff --git a/provider/reprovider.go b/provider/reprovider.go index 0559a46c6..002168e94 100644 --- a/provider/reprovider.go +++ b/provider/reprovider.go @@ -138,7 +138,7 @@ func New(ds datastore.Batching, opts ...Option) (System, error) { } s.ds = namespace.Wrap(ds, s.keyPrefix) - s.q = queue.NewQueue(s.ds) + s.q = queue.New(s.ds) // This is after the options processing so we do not have to worry about leaking a context if there is an // initialization error processing the options From 1be37ea9f5468abac8b25b648199548af41df560 Mon Sep 17 00:00:00 2001 From: gammazero <11790789+gammazero@users.noreply.github.com> Date: Thu, 27 Mar 2025 10:16:01 -1000 Subject: [PATCH 16/23] update changelog --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 6809913ee..adb5a03d5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -20,6 +20,7 @@ The following emojis are used to highlight certain changes: - upgrade to `go-libp2p` [v0.41.1](https://github.com/libp2p/go-libp2p/releases/tag/v0.41.1) - `bitswap/network`: Add a new `requests_in_flight` metric gauge that measures how many bitswap streams are being written or read at a given time. +- improve speed of data onboarding by batching/bufering provider queue writes [#888](https://github.com/ipfs/boxo/pull/888) ### Removed From 508af50d896ca144e0949a338d6ded84b2a93fce Mon Sep 17 00:00:00 2001 From: gammazero <11790789+gammazero@users.noreply.github.com> Date: Thu, 27 Mar 2025 11:02:56 -1000 Subject: [PATCH 17/23] batch deletes --- provider/internal/queue/queue.go | 20 +++++--------------- 1 file changed, 5 insertions(+), 15 deletions(-) diff --git a/provider/internal/queue/queue.go b/provider/internal/queue/queue.go index 29b22e286..a01088506 100644 --- a/provider/internal/queue/queue.go +++ b/provider/internal/queue/queue.go @@ -223,25 +223,15 @@ func (q *Queue) worker(ctx context.Context) { case <-batchTicker.C: commit = q.inBuf.Len() == 0 case dequeue <- c: - // Commit current batch first so that if CID being read is still in - // the uncommitted batch, that CID is written and deleted from the - // datastore. - if batchCount != 0 { - if err = refreshBatch(ctx); err != nil { - if !errors.Is(err, context.Canceled) { - log.Errorf("%w, stopping provider", err) - } - return - } - } - - // Do not batch delete. Delete must be committed immediately, - // otherwise the same head cid will be read from the datastore. - if err = q.ds.Delete(ctx, k); err != nil { + if err = b.Delete(ctx, k); err != nil { log.Errorf("Failed to delete queued cid %s with key %s: %s", c, k, err) continue } c = cid.Undef + batchCount++ + // Delete (in batch) must be committed immediately, otherwise the + // same head cid will be read from the datastore. + commit = true case <-ctx.Done(): return } From 675ca0e80df61432e9d8a1ceb70184ac9744a65c Mon Sep 17 00:00:00 2001 From: gammazero <11790789+gammazero@users.noreply.github.com> Date: Tue, 1 Apr 2025 11:45:31 -1000 Subject: [PATCH 18/23] no need for batchticker --- provider/internal/queue/queue.go | 26 ++++++++------------------ 1 file changed, 8 insertions(+), 18 deletions(-) diff --git a/provider/internal/queue/queue.go b/provider/internal/queue/queue.go index a01088506..9113acce0 100644 --- a/provider/internal/queue/queue.go +++ b/provider/internal/queue/queue.go @@ -69,19 +69,15 @@ func (q *Queue) Close() error { var err error q.closeOnce.Do(func() { // Close input queue and wait for worker to finish reading it. - if q.ds == nil { - q.inBuf.Shutdown() - } else { - q.inBuf.Close() - select { - case <-q.closed: - case <-time.After(shutdownTimeout): - q.close() // force immediate shutdown - <-q.closed - err = fmt.Errorf("provider queue: %d cids not written to datastore", q.inBuf.Len()) - } - close(q.dequeue) // no more output from this queue + q.inBuf.Close() + select { + case <-q.closed: + case <-time.After(shutdownTimeout): + q.close() // force immediate shutdown + <-q.closed + err = fmt.Errorf("provider queue: %d cids not written to datastore", q.inBuf.Len()) } + close(q.dequeue) // no more output from this queue }) return err } @@ -148,9 +144,6 @@ func (q *Queue) worker(ctx context.Context) { readInBuf := q.inBuf.Out() - batchTicker := time.NewTicker(batchCommitInterval) - defer batchTicker.Stop() - for { if c == cid.Undef { head, err := q.getQueueHead(ctx) @@ -211,7 +204,6 @@ func (q *Queue) worker(ctx context.Context) { cstr = c.String() } - //if err := q.ds.Put(ctx, nextKey, toQueue.Bytes()); err != nil { if err = b.Put(ctx, nextKey, toQueue.Bytes()); err != nil { log.Errorf("Failed to batch cid: %s", err) continue @@ -220,8 +212,6 @@ func (q *Queue) worker(ctx context.Context) { if batchCount == batchSize { commit = true } - case <-batchTicker.C: - commit = q.inBuf.Len() == 0 case dequeue <- c: if err = b.Delete(ctx, k); err != nil { log.Errorf("Failed to delete queued cid %s with key %s: %s", c, k, err) From 65ab200e9c000df2ef8e38d40e06cea8f024a0fd Mon Sep 17 00:00:00 2001 From: gammazero <11790789+gammazero@users.noreply.github.com> Date: Tue, 1 Apr 2025 15:36:54 -1000 Subject: [PATCH 19/23] Keep CIDs in memory until batchSize reached. If nothing in datastore then read items to dequeue from memory buffer. --- provider/internal/queue/queue.go | 236 +++++++++++++++++-------------- 1 file changed, 133 insertions(+), 103 deletions(-) diff --git a/provider/internal/queue/queue.go b/provider/internal/queue/queue.go index 9113acce0..ef375ddad 100644 --- a/provider/internal/queue/queue.go +++ b/provider/internal/queue/queue.go @@ -2,12 +2,13 @@ package queue import ( "context" + "encoding/base64" "errors" "fmt" "sync" "time" - "github.com/gammazero/chanqueue" + "github.com/gammazero/deque" cid "github.com/ipfs/go-cid" datastore "github.com/ipfs/go-datastore" namespace "github.com/ipfs/go-datastore/namespace" @@ -18,33 +19,34 @@ import ( var log = logging.Logger("provider.queue") const ( - batchSize = 16 * 1024 - batchCommitInterval = 5 * time.Second - - // Number of input CIDs to buffer without blocking. This capacity is only - // used when writing batches to the datastore takes some time. - inputBufferSize = 64 * 1024 - // Time for Close to wait to finish writing CIDs to datastore. + // batchSize is the limit on number of CIDs kept in memory at which ther + // are all written to the datastore.o + batchSize = 16 * 1024 + // batchCommitInterval is the sime since the last batch commit to write all + // CIDs remaining in memory. + batchCommitInterval = 2 * time.Minute + // shutdownTimeout is the duration that Close waits to finish writing CIDs + // to the datastore. shutdownTimeout = 20 * time.Second ) // Queue provides a FIFO interface to the datastore for storing cids. // -// Cids in the process of being provided when a crash or shutdown occurs may be +// CIDs in the process of being provided when a crash or shutdown occurs may be // in the queue when the node is brought back online depending on whether they // were fully written to the underlying datastore. // -// Input to the queue is buffered in memory, up to inputBufferSize, to maintain -// the speed at which input is consumed, even if persisting it to the datastore -// becomes slow. This input buffer behaves as a channel with a dynamic -// capacity. +// Input to the queue is buffered in memory, up to batchSize. When the input +// buffer contains batchSize items, or when batchCommitInterval has elapsed +// since the previous batch commit, the contents of the input buffer are +// written to the datastore. type Queue struct { close context.CancelFunc - closed chan struct{} + closed chan error closeOnce sync.Once dequeue chan cid.Cid ds datastore.Batching - inBuf *chanqueue.ChanQueue[cid.Cid] + enqueue chan cid.Cid } // New creates a queue for cids. @@ -53,10 +55,10 @@ func New(ds datastore.Batching) *Queue { q := &Queue{ close: cancel, - closed: make(chan struct{}), + closed: make(chan error, 1), dequeue: make(chan cid.Cid), ds: namespace.Wrap(ds, datastore.NewKey("/queue")), - inBuf: chanqueue.New(chanqueue.WithCapacity[cid.Cid](inputBufferSize)), + enqueue: make(chan cid.Cid), } go q.worker(ctx) @@ -69,13 +71,12 @@ func (q *Queue) Close() error { var err error q.closeOnce.Do(func() { // Close input queue and wait for worker to finish reading it. - q.inBuf.Close() + close(q.enqueue) select { case <-q.closed: case <-time.After(shutdownTimeout): q.close() // force immediate shutdown - <-q.closed - err = fmt.Errorf("provider queue: %d cids not written to datastore", q.inBuf.Len()) + err = <-q.closed } close(q.dequeue) // no more output from this queue }) @@ -89,7 +90,7 @@ func (q *Queue) Enqueue(cid cid.Cid) (err error) { err = errors.New("failed to enqueue CID: shutting down") } }() - q.inBuf.In() <- cid + q.enqueue <- cid return } @@ -98,83 +99,91 @@ func (q *Queue) Dequeue() <-chan cid.Cid { return q.dequeue } +func makeCidString(c cid.Cid) string { + data := c.Bytes() + return base64.RawURLEncoding.EncodeToString(data[len(data)-6:]) +} + +func makeKey(c cid.Cid, counter uint64) datastore.Key { + return datastore.NewKey(fmt.Sprintf("%020d/%s", counter, makeCidString(c))) +} + // worker run dequeues and enqueues when available. func (q *Queue) worker(ctx context.Context) { defer close(q.closed) - defer q.inBuf.Shutdown() - b, err := q.ds.Batch(ctx) - if err != nil { - log.Errorf("Failed to create batch, stopping provider: %s", err) - return - } + var ( + c cid.Cid + counter uint64 + k datastore.Key = datastore.Key{} + inBuf deque.Deque[cid.Cid] + ) + + const baseCap = 1024 + inBuf.SetBaseCap(baseCap) - var batchCount int defer func() { - if batchCount != 0 { - if err := b.Commit(ctx); err != nil { - log.Errorf("Failed to write cid batch: %s", err) + if c != cid.Undef { + if err := q.ds.Put(ctx, k, c.Bytes()); err != nil { + log.Errorf("Failed to add cid for addition to batch: %s", err) } + counter++ } - }() - - refreshBatch := func(ctx context.Context) error { - if batchCount == 0 { - return nil - } - err := b.Commit(ctx) - if err != nil { - return fmt.Errorf("failed to write cid batch: %w", err) - } - b, err = q.ds.Batch(ctx) - if err != nil { - return fmt.Errorf("failed to create batch: %w", err) + if inBuf.Len() != 0 { + err := q.commitInput(ctx, counter, &inBuf) + if err != nil && !errors.Is(err, context.Canceled) { + log.Error(err) + if inBuf.Len() != 0 { + q.closed <- fmt.Errorf("provider queue: %d cids not written to datastore", inBuf.Len()) + } + } } - batchCount = 0 - return nil - } + }() var ( - counter uint64 - c, lastCid cid.Cid - commit bool - cstr string - k datastore.Key = datastore.Key{} + commit bool + dsEmpty bool + err error ) - readInBuf := q.inBuf.Out() + readInBuf := q.enqueue + + batchTimer := time.NewTimer(batchCommitInterval) + defer batchTimer.Stop() for { if c == cid.Undef { - head, err := q.getQueueHead(ctx) - if err != nil { - log.Errorf("error querying for head of queue: %s, stopping provider", err) - return - } - if head != nil { - k = datastore.NewKey(head.Key) - c, err = cid.Parse(head.Value) + if !dsEmpty { + head, err := q.getQueueHead(ctx) if err != nil { - log.Warnf("error parsing queue entry cid with key (%s), removing it from queue: %s", head.Key, err) + log.Errorf("error querying for head of queue: %s, stopping provider", err) + return + } + if head != nil { + k = datastore.NewKey(head.Key) if err = q.ds.Delete(ctx, k); err != nil { - log.Errorf("error deleting queue entry with key (%s), due to error (%s), stopping provider", head.Key, err) - return + log.Errorf("Failed to delete queued cid %s with key %s: %s", c, k, err) + continue } - continue - } - } else if batchCount != 0 { - // There were no queued CIDs in the datastore, but there were - // some waiting to be written. Write them and re-read from - // datastore. - if err = refreshBatch(ctx); err != nil { - if !errors.Is(err, context.Canceled) { - log.Errorf("%w, stopping provider", err) + c, err = cid.Parse(head.Value) + if err != nil { + log.Warnf("error parsing queue entry cid with key (%s), removing it from queue: %s", head.Key, err) + if err = q.ds.Delete(ctx, k); err != nil { + log.Errorf("error deleting queue entry with key (%s), due to error (%s), stopping provider", head.Key, err) + return + } + continue } - return + } else { + dsEmpty = true } - continue } - cstr = c.String() + if dsEmpty && inBuf.Len() != 0 { + // There were no queued CIDs in the datastore, so read one from + // the input buffer. + c = inBuf.PopFront() + k = makeKey(c, counter) + } } // If c != cid.Undef set dequeue and attempt write. @@ -188,53 +197,48 @@ func (q *Queue) worker(ctx context.Context) { if !ok { return } - if toQueue == lastCid { - continue - } - lastCid = toQueue - - // Add suffix to key path to allow multiple entries with same sequence. - nextKey := datastore.NewKey(fmt.Sprintf("%020d/%s", counter, cstr)) - counter++ if c == cid.Undef { - // fast path, skip rereading the datastore if we don't have anything in hand yet + // Use this CID as the next output since there was nothing in + // the datastore or buffer previously. c = toQueue - k = nextKey - cstr = c.String() - } - - if err = b.Put(ctx, nextKey, toQueue.Bytes()); err != nil { - log.Errorf("Failed to batch cid: %s", err) + k = makeKey(c, counter) continue } - batchCount++ - if batchCount == batchSize { + + inBuf.PushBack(toQueue) + if inBuf.Len() >= batchSize { commit = true } case dequeue <- c: - if err = b.Delete(ctx, k); err != nil { - log.Errorf("Failed to delete queued cid %s with key %s: %s", c, k, err) - continue - } c = cid.Undef - batchCount++ - // Delete (in batch) must be committed immediately, otherwise the - // same head cid will be read from the datastore. - commit = true + case <-batchTimer.C: + if inBuf.Len() != 0 { + commit = true + } else { + batchTimer.Reset(batchCommitInterval) + if inBuf.Cap() > baseCap { + inBuf = deque.Deque[cid.Cid]{} + inBuf.SetBaseCap(baseCap) + } + } case <-ctx.Done(): return } if commit { commit = false - - if err = refreshBatch(ctx); err != nil { + n := inBuf.Len() + err = q.commitInput(ctx, counter, &inBuf) + if err != nil { if !errors.Is(err, context.Canceled) { log.Errorf("%w, stopping provider", err) } return } + counter += uint64(n) + dsEmpty = false + batchTimer.Reset(batchCommitInterval) } } } @@ -256,3 +260,29 @@ func (q *Queue) getQueueHead(ctx context.Context) (*query.Entry, error) { return &r.Entry, r.Error } + +func (q *Queue) commitInput(ctx context.Context, counter uint64, cids *deque.Deque[cid.Cid]) error { + b, err := q.ds.Batch(ctx) + if err != nil { + return fmt.Errorf("Failed to create batch: %w", err) + } + + cstr := makeCidString(cids.Front()) + n := cids.Len() + for i := 0; i < n; i++ { + c := cids.At(i) + key := datastore.NewKey(fmt.Sprintf("%020d/%s", counter, cstr)) + if err = b.Put(ctx, key, c.Bytes()); err != nil { + log.Errorf("Failed to add cid for addition to batch: %s", err) + continue + } + counter++ + } + cids.Clear() + + if err = b.Commit(ctx); err != nil { + return fmt.Errorf("failed to write to datastore: %w", err) + } + + return nil +} From 880b87dac138ec474ad30891a5b5b06c306000bf Mon Sep 17 00:00:00 2001 From: gammazero <11790789+gammazero@users.noreply.github.com> Date: Tue, 1 Apr 2025 15:49:12 -1000 Subject: [PATCH 20/23] remove redundant delete --- provider/internal/queue/queue.go | 16 ++++++---------- 1 file changed, 6 insertions(+), 10 deletions(-) diff --git a/provider/internal/queue/queue.go b/provider/internal/queue/queue.go index ef375ddad..e9262e8c0 100644 --- a/provider/internal/queue/queue.go +++ b/provider/internal/queue/queue.go @@ -125,7 +125,7 @@ func (q *Queue) worker(ctx context.Context) { defer func() { if c != cid.Undef { if err := q.ds.Put(ctx, k, c.Bytes()); err != nil { - log.Errorf("Failed to add cid for addition to batch: %s", err) + log.Errorw("Failed to write cid to datastore", "err", err) } counter++ } @@ -156,22 +156,18 @@ func (q *Queue) worker(ctx context.Context) { if !dsEmpty { head, err := q.getQueueHead(ctx) if err != nil { - log.Errorf("error querying for head of queue: %s, stopping provider", err) + log.Errorw("Error querying for head of queue, stopping provider", "err", err) return } if head != nil { k = datastore.NewKey(head.Key) if err = q.ds.Delete(ctx, k); err != nil { - log.Errorf("Failed to delete queued cid %s with key %s: %s", c, k, err) - continue + log.Errorw("Error deleting queue entry, stopping provider", "err", err, "key", head.Key) + return } c, err = cid.Parse(head.Value) if err != nil { - log.Warnf("error parsing queue entry cid with key (%s), removing it from queue: %s", head.Key, err) - if err = q.ds.Delete(ctx, k); err != nil { - log.Errorf("error deleting queue entry with key (%s), due to error (%s), stopping provider", head.Key, err) - return - } + log.Warnw("Error parsing queue entry cid, removing it from queue", "err", err, "key", head.Key) continue } } else { @@ -232,7 +228,7 @@ func (q *Queue) worker(ctx context.Context) { err = q.commitInput(ctx, counter, &inBuf) if err != nil { if !errors.Is(err, context.Canceled) { - log.Errorf("%w, stopping provider", err) + log.Errorw("Error writing CIDs to datastore, stopping provider", "err", err) } return } From 3ab25fc036a6eddeb57be05ccb6931f6f018de49 Mon Sep 17 00:00:00 2001 From: gammazero <11790789+gammazero@users.noreply.github.com> Date: Tue, 1 Apr 2025 16:30:23 -1000 Subject: [PATCH 21/23] idle check with minimum timer resets --- provider/internal/queue/queue.go | 46 ++++++++++++++++++++------------ 1 file changed, 29 insertions(+), 17 deletions(-) diff --git a/provider/internal/queue/queue.go b/provider/internal/queue/queue.go index e9262e8c0..1edc348ba 100644 --- a/provider/internal/queue/queue.go +++ b/provider/internal/queue/queue.go @@ -20,11 +20,12 @@ var log = logging.Logger("provider.queue") const ( // batchSize is the limit on number of CIDs kept in memory at which ther - // are all written to the datastore.o + // are all written to the datastore. batchSize = 16 * 1024 - // batchCommitInterval is the sime since the last batch commit to write all - // CIDs remaining in memory. - batchCommitInterval = 2 * time.Minute + // idleWriteTime is the amout of time to check if the queue has been idle + // (no input or output). If the queue has been idle since the last check, + // then write all buffered CIDs to the datastore. + idleWriteTime = time.Minute // shutdownTimeout is the duration that Close waits to finish writing CIDs // to the datastore. shutdownTimeout = 20 * time.Second @@ -36,10 +37,15 @@ const ( // in the queue when the node is brought back online depending on whether they // were fully written to the underlying datastore. // -// Input to the queue is buffered in memory, up to batchSize. When the input -// buffer contains batchSize items, or when batchCommitInterval has elapsed -// since the previous batch commit, the contents of the input buffer are -// written to the datastore. +// Input to the queue is buffered in memory. The contents of the buffer are +// written to the datastore when the input buffer contains batchSize items, or +// when idleWriteTime has elapsed since the previous batch write or dequeue. CIDs to +// dequeue are read, in order, from the input buffer if there are none in the +// datastore. Otherwise they are read from the datastore. +// +// If queued items are read from the input buffer before it reaches its limit, +// then queued items can remain in memory. When the queue is closed, any +// remaining items in memory are written to the datastore. type Queue struct { close context.CancelFunc closed chan error @@ -144,11 +150,12 @@ func (q *Queue) worker(ctx context.Context) { commit bool dsEmpty bool err error + idle bool ) readInBuf := q.enqueue - batchTimer := time.NewTimer(batchCommitInterval) + batchTimer := time.NewTimer(idleWriteTime) defer batchTimer.Stop() for { @@ -193,6 +200,7 @@ func (q *Queue) worker(ctx context.Context) { if !ok { return } + idle = false if c == cid.Undef { // Use this CID as the next output since there was nothing in @@ -208,16 +216,21 @@ func (q *Queue) worker(ctx context.Context) { } case dequeue <- c: c = cid.Undef + idle = false case <-batchTimer.C: - if inBuf.Len() != 0 { - commit = true - } else { - batchTimer.Reset(batchCommitInterval) - if inBuf.Cap() > baseCap { - inBuf = deque.Deque[cid.Cid]{} - inBuf.SetBaseCap(baseCap) + if idle { + if inBuf.Len() != 0 { + commit = true + } else { + if inBuf.Cap() > baseCap { + inBuf = deque.Deque[cid.Cid]{} + inBuf.SetBaseCap(baseCap) + } } } + idle = true + batchTimer.Reset(idleWriteTime) + case <-ctx.Done(): return } @@ -234,7 +247,6 @@ func (q *Queue) worker(ctx context.Context) { } counter += uint64(n) dsEmpty = false - batchTimer.Reset(batchCommitInterval) } } } From 44c013de0625e9ac7df5b43c3b8d45d8e8341763 Mon Sep 17 00:00:00 2001 From: gammazero <11790789+gammazero@users.noreply.github.com> Date: Tue, 1 Apr 2025 17:14:56 -1000 Subject: [PATCH 22/23] error messages --- provider/internal/queue/queue.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/provider/internal/queue/queue.go b/provider/internal/queue/queue.go index 1edc348ba..17f7e760b 100644 --- a/provider/internal/queue/queue.go +++ b/provider/internal/queue/queue.go @@ -28,7 +28,7 @@ const ( idleWriteTime = time.Minute // shutdownTimeout is the duration that Close waits to finish writing CIDs // to the datastore. - shutdownTimeout = 20 * time.Second + shutdownTimeout = 10 * time.Second ) // Queue provides a FIFO interface to the datastore for storing cids. @@ -272,7 +272,7 @@ func (q *Queue) getQueueHead(ctx context.Context) (*query.Entry, error) { func (q *Queue) commitInput(ctx context.Context, counter uint64, cids *deque.Deque[cid.Cid]) error { b, err := q.ds.Batch(ctx) if err != nil { - return fmt.Errorf("Failed to create batch: %w", err) + return fmt.Errorf("failed to create batch: %w", err) } cstr := makeCidString(cids.Front()) @@ -281,7 +281,7 @@ func (q *Queue) commitInput(ctx context.Context, counter uint64, cids *deque.Deq c := cids.At(i) key := datastore.NewKey(fmt.Sprintf("%020d/%s", counter, cstr)) if err = b.Put(ctx, key, c.Bytes()); err != nil { - log.Errorf("Failed to add cid for addition to batch: %s", err) + log.Errorw("Failed to add cid to batch", "err", err) continue } counter++ @@ -289,7 +289,7 @@ func (q *Queue) commitInput(ctx context.Context, counter uint64, cids *deque.Deq cids.Clear() if err = b.Commit(ctx); err != nil { - return fmt.Errorf("failed to write to datastore: %w", err) + return fmt.Errorf("failed to commit batch to datastore: %w", err) } return nil From b6d46012e01f70c28bdaf9fa3cc2fc623b5b7b24 Mon Sep 17 00:00:00 2001 From: gammazero <11790789+gammazero@users.noreply.github.com> Date: Wed, 2 Apr 2025 13:08:59 -1000 Subject: [PATCH 23/23] ignore undef cids --- provider/internal/queue/queue.go | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/provider/internal/queue/queue.go b/provider/internal/queue/queue.go index 17f7e760b..7144141c3 100644 --- a/provider/internal/queue/queue.go +++ b/provider/internal/queue/queue.go @@ -90,13 +90,16 @@ func (q *Queue) Close() error { } // Enqueue puts a cid in the queue. -func (q *Queue) Enqueue(cid cid.Cid) (err error) { +func (q *Queue) Enqueue(c cid.Cid) (err error) { + if c == cid.Undef { + return + } defer func() { if r := recover(); r != nil { err = errors.New("failed to enqueue CID: shutting down") } }() - q.enqueue <- cid + q.enqueue <- c return } @@ -107,7 +110,10 @@ func (q *Queue) Dequeue() <-chan cid.Cid { func makeCidString(c cid.Cid) string { data := c.Bytes() - return base64.RawURLEncoding.EncodeToString(data[len(data)-6:]) + if len(data) > 4 { + data = data[len(data)-4:] + } + return base64.RawURLEncoding.EncodeToString(data) } func makeKey(c cid.Cid, counter uint64) datastore.Key {