diff --git a/.github/workflows/go-ci.yml b/.github/workflows/go-ci.yml index 7f1abbd1..e69fc133 100644 --- a/.github/workflows/go-ci.yml +++ b/.github/workflows/go-ci.yml @@ -7,7 +7,6 @@ on: - main pull_request: jobs: - build: runs-on: ubuntu-latest steps: @@ -28,8 +27,8 @@ jobs: file: ./coverage.o - name: Lint - uses: golangci/golangci-lint-action@v7 + uses: golangci/golangci-lint-action@v9 with: version: latest - name: Go Mod Tidy - run: go mod tidy && git diff --exit-code \ No newline at end of file + run: go mod tidy && git diff --exit-code diff --git a/headertest/store.go b/headertest/store.go index 4adeb570..7071189e 100644 --- a/headertest/store.go +++ b/headertest/store.go @@ -82,10 +82,20 @@ func (m *Store[H]) GetByHeight(_ context.Context, height uint64) (H, error) { return zero, header.ErrNotFound } -func (m *Store[H]) DeleteTo(ctx context.Context, to uint64) error { +func (m *Store[H]) DeleteRange(ctx context.Context, from, to uint64) error { m.HeaderMu.Lock() defer m.HeaderMu.Unlock() - for h := m.TailHeight; h < to; h++ { + + if from >= to { + return fmt.Errorf("malformed range, from: %d, to: %d", from, to) + } + + if to > m.HeadHeight+1 { + return fmt.Errorf("delete range to %d beyond current head+1(%d)", to, m.HeadHeight+1) + } + + // Delete headers in the range [from:to) + for h := from; h < to; h++ { _, ok := m.Headers[h] if !ok { continue @@ -100,7 +110,17 @@ func (m *Store[H]) DeleteTo(ctx context.Context, to uint64) error { delete(m.Headers, h) // must be after deleteFn } - m.TailHeight = to + // Update TailHeight if we deleted from the beginning + if from <= m.TailHeight { + m.TailHeight = to + } + + // Update HeadHeight if we deleted from the end + // Range is [from:to), so head is only affected if to > HeadHeight + if to > m.HeadHeight { + m.HeadHeight = from - 1 + } + return nil } diff --git a/interface.go b/interface.go index 1c719fa6..26893b79 100644 --- a/interface.go +++ b/interface.go @@ -85,8 +85,9 @@ type Store[H Header[H]] interface { // GetRange returns the range [from:to). GetRange(context.Context, uint64, uint64) ([]H, error) - // DeleteTo deletes the range [Tail():to). - DeleteTo(ctx context.Context, to uint64) error + // DeleteRange deletes the range [from:to). + // It disallows the creation of gaps in the implementation's chain, ensuring contiguity between Tail --> Head. + DeleteRange(ctx context.Context, from, to uint64) error // OnDelete registers given handler to be called whenever a header with the height is being removed. 
// OnDelete guarantees that the header is accessible for the handler with GetByHeight and is removed diff --git a/p2p/server.go b/p2p/server.go index 4dc02a79..70613fd4 100644 --- a/p2p/server.go +++ b/p2p/server.go @@ -115,7 +115,11 @@ func (serv *ExchangeServer[H]) requestHandler(stream network.Stream) { case *p2p_pb.HeaderRequest_Hash: headers, err = serv.handleRequestByHash(ctx, pbreq.GetHash()) case *p2p_pb.HeaderRequest_Origin: - headers, err = serv.handleRangeRequest(ctx, pbreq.GetOrigin(), pbreq.GetOrigin()+pbreq.Amount) + headers, err = serv.handleRangeRequest( + ctx, + pbreq.GetOrigin(), + pbreq.GetOrigin()+pbreq.Amount, + ) default: log.Warn("server: invalid data type received") stream.Reset() //nolint:errcheck diff --git a/p2p/server_test.go b/p2p/server_test.go index 0af96b11..315359fc 100644 --- a/p2p/server_test.go +++ b/p2p/server_test.go @@ -192,7 +192,7 @@ func (timeoutStore[H]) GetRange(ctx context.Context, _, _ uint64) ([]H, error) { return nil, ctx.Err() } -func (timeoutStore[H]) DeleteTo(ctx context.Context, _ uint64) error { +func (timeoutStore[H]) DeleteRange(ctx context.Context, _, _ uint64) error { <-ctx.Done() return ctx.Err() } diff --git a/store/store_delete.go b/store/store_delete.go index 8c16e1b3..ebfaea12 100644 --- a/store/store_delete.go +++ b/store/store_delete.go @@ -36,59 +36,6 @@ func (s *Store[H]) OnDelete(fn func(context.Context, uint64) error) { }) } -// DeleteTo implements [header.Store] interface. -func (s *Store[H]) DeleteTo(ctx context.Context, to uint64) error { - // ensure all the pending headers are synchronized - err := s.Sync(ctx) - if err != nil { - return err - } - - head, err := s.Head(ctx) - if err != nil { - return fmt.Errorf("header/store: reading head: %w", err) - } - if head.Height()+1 < to { - _, err := s.getByHeight(ctx, to) - if errors.Is(err, header.ErrNotFound) { - return fmt.Errorf( - "header/store: delete to %d beyond current head(%d)", - to, - head.Height(), - ) - } - if err != nil { - return fmt.Errorf("delete to potential new head: %w", err) - } - - // if `to` is bigger than the current head and is stored - allow delete, making `to` a new head - } - - tail, err := s.Tail(ctx) - if err != nil { - return fmt.Errorf("header/store: reading tail: %w", err) - } - if tail.Height() >= to { - return fmt.Errorf("header/store: delete to %d below current tail(%d)", to, tail.Height()) - } - - err = s.deleteRange(ctx, tail.Height(), to) - if errors.Is(err, header.ErrNotFound) && head.Height()+1 == to { - // this is the case where we have deleted all the headers - // wipe the store - if err := s.wipe(ctx); err != nil { - return fmt.Errorf("header/store: wipe: %w", err) - } - - return nil - } - if err != nil { - return fmt.Errorf("header/store: delete to height %d: %w", to, err) - } - - return nil -} - // deleteRangeParallelThreshold defines the threshold for parallel deletion. // If range is smaller than this threshold, deletion will be performed sequentially. var ( @@ -96,69 +43,6 @@ var ( errDeleteTimeout = errors.New("delete timeout") ) -// deleteRange deletes [from:to) header range from the store. -// It gracefully handles context and errors attempting to save interrupted progress. 
-func (s *Store[H]) deleteRange(ctx context.Context, from, to uint64) (err error) { - startTime := time.Now() - - var ( - height uint64 - missing int - ) - defer func() { - if err != nil { - if errors.Is(err, errDeleteTimeout) { - log.Warnw("partial delete", - "from_height", from, - "expected_to_height", to, - "actual_to_height", height, - "hdrs_not_found", missing, - "took(s)", time.Since(startTime), - ) - } else { - log.Errorw("partial delete with error", - "from_height", from, - "expected_to_height", to, - "actual_to_height", height, - "hdrs_not_found", missing, - "took(s)", time.Since(startTime), - "err", err, - ) - } - } else if to-from > 1 { - log.Debugw("deleted headers", - "from_height", from, - "to_height", to, - "hdrs_not_found", missing, - "took(s)", time.Since(startTime).Seconds(), - ) - } - - if derr := s.setTail(ctx, s.ds, height); derr != nil { - err = errors.Join(err, fmt.Errorf("setting tail to %d: %w", height, derr)) - } - }() - - deleteCtx := ctx - if deadline, ok := ctx.Deadline(); ok { - // allocate 95% of caller's set deadline for deletion - // and give leftover to save progress - // this prevents store's state corruption from partial deletion - sub := deadline.Sub(startTime) / 100 * 95 - var cancel context.CancelFunc - deleteCtx, cancel = context.WithDeadlineCause(ctx, startTime.Add(sub), errDeleteTimeout) - defer cancel() - } - - if to-from < deleteRangeParallelThreshold { - height, missing, err = s.deleteSequential(deleteCtx, from, to) - } else { - height, missing, err = s.deleteParallel(deleteCtx, from, to) - } - - return err -} - // deleteSingle deletes a single header from the store, // its caches and indexies, notifying any registered onDelete handlers. func (s *Store[H]) deleteSingle( @@ -348,3 +232,222 @@ func (s *Store[H]) deleteParallel(ctx context.Context, from, to uint64) (uint64, ) return highest, missing, nil } + +// DeleteRange deletes headers in the range [from:to) from the store. +// It intelligently updates head and/or tail pointers based on what range is being deleted. +func (s *Store[H]) DeleteRange(ctx context.Context, from, to uint64) error { + // ensure all the pending headers are synchronized + err := s.Sync(ctx) + if err != nil { + return err + } + + // load current head and tail + head, err := s.Head(ctx) + if err != nil { + return fmt.Errorf("header/store: reading head: %w", err) + } + + tail, err := s.Tail(ctx) + if err != nil { + return fmt.Errorf("header/store: reading tail: %w", err) + } + + // sanity check range parameters + if from >= to { + return fmt.Errorf( + "header/store: invalid range [%d:%d) - from must be less than to", + from, + to, + ) + } + // if range is empty within the current store bounds, it's a no-op + if from > head.Height() || to <= tail.Height() { + return fmt.Errorf( + "header/store range [%d,%d) is not present in store, nothing needs to be deleted: "+ + "current head %d, tail %d", + from, + to, + head.Height(), + tail.Height(), + ) + } + + // Determine whether we are moving the head or tail pointers, or wiping the + // store completely: only allow deletions that - + // 1. Start from tail (advancing tail forward) + // 2. End at head+1 (moving head backward) + // 3. 
Delete the entire store + + updateTail := from == tail.Height() + updateHead := to == head.Height()+1 + + // Attempt to delete (wipe) the entire store + if updateTail && updateHead { + // Only wipe if 'to' is exactly at head+1 (normal case) to avoid accidental wipes + // Check if a header exists exactly at 'to' (in pending, cache, or disk) + // If it exists, we can't wipe - there's a header that would become the new tail + _, err := s.getByHeight(ctx, to) + if errors.Is(err, header.ErrNotFound) { + // No header at 'to', safe to wipe the entire store + if err := s.wipe(ctx); err != nil { + return fmt.Errorf("header/store: wipe: %w", err) + } + log.Info("header/store: wiped store") + return nil + } + if err != nil { + return fmt.Errorf("header/store: checking header at %d: %w", to, err) + } + // Header exists at 'to', proceed with normal deletion + } + + switch { + case updateTail: + // we are attempting to move tail forward, so sanity check `to` + if to > head.Height()+1 { + return fmt.Errorf( + "header/store: delete range to %d beyond current head+1(%d)", + to, + head.Height()+1, + ) + } + case updateHead: + // we are attempting to move head backward, so sanity check `from` + if from < tail.Height() { + return fmt.Errorf( + "header/store: delete range from %d below current tail(%d)", + from, + tail.Height(), + ) + } + default: + // disallow deletions that neither move the tail nor the head, as such a + // range is malformed and could create a gap within the contiguous chain + // of headers + return fmt.Errorf( + "header/store: delete range [%d:%d) does not move head(%d) or tail(%d) pointers "+ + "and would create gaps in the store. Only deletion from tail or head+1 is supported", + from, to, head.Height(), tail.Height(), + ) + } + + // Delete the headers without automatic head/tail pointer updates + actualTo, _, deleteErr := s.deleteRangeRaw(ctx, from, to) + + // Always update pointers to reflect actual progress, even on partial delete. + // This ensures store consistency and allows retries to continue from where we left off. + if updateTail { + // For tail-side deletion, update tail to actual progress + if err := s.setTail(ctx, s.ds, actualTo); err != nil { + return errors.Join( + deleteErr, + fmt.Errorf("header/store: setting tail to %d: %w", actualTo, err), + ) + } + } + + if updateHead && !updateTail { + // Only the head receded here, so update it only if progress was made. + // `actualTo` is the height deletion reached while moving in ascending order from `from` towards `to`, + // so if any headers in `from` --> `actualTo` were deleted, the head must be set + // to one below `from` (which itself was deleted) to preserve contiguity, + // regardless of whether the delete was only partial + if actualTo > from { + newHeadHeight := from - 1 + if err := s.setHead(ctx, s.ds, newHeadHeight); err != nil { + return errors.Join( + deleteErr, + fmt.Errorf("header/store: setting head to %d: %w", newHeadHeight, err), + ) + } + } + } + + if deleteErr != nil { + return fmt.Errorf( + "header/store: delete range [%d:%d) (actual: %d): %w", + from, + to, + actualTo, + deleteErr, + ) + } + + return nil +} + +// deleteRangeRaw deletes [from:to) header range without updating head or tail pointers. +// Returns the actual highest height processed (actualTo) and the number of missing headers.
+func (s *Store[H]) deleteRangeRaw( + ctx context.Context, + from, to uint64, +) (actualTo uint64, missing int, err error) { + startTime := time.Now() + + var height uint64 + defer func() { + actualTo = height + if err != nil { + if errors.Is(err, errDeleteTimeout) { + log.Warnw("partial delete range", + "from_height", from, + "expected_to_height", to, + "actual_to_height", height, + "hdrs_not_found", missing, + "took(s)", time.Since(startTime).Seconds(), + ) + } else { + log.Errorw("partial delete range with error", + "from_height", from, + "expected_to_height", to, + "actual_to_height", height, + "hdrs_not_found", missing, + "took(s)", time.Since(startTime).Seconds(), + "err", err, + ) + } + } else if to-from > 1 { + log.Debugw("deleted range", + "from_height", from, + "to_height", to, + "hdrs_not_found", missing, + "took(s)", time.Since(startTime).Seconds(), + ) + } + }() + + deleteCtx := ctx + if deadline, ok := ctx.Deadline(); ok { + // allocate 95% of caller's set deadline for deletion + // and give leftover to save progress + sub := deadline.Sub(startTime) / 100 * 95 + var cancel context.CancelFunc + deleteCtx, cancel = context.WithDeadlineCause(ctx, startTime.Add(sub), errDeleteTimeout) + defer cancel() + } + + if to-from < deleteRangeParallelThreshold { + height, missing, err = s.deleteSequential(deleteCtx, from, to) + } else { + height, missing, err = s.deleteParallel(deleteCtx, from, to) + } + + return height, missing, err +} + +// setHead sets the head of the store to the specified height. +func (s *Store[H]) setHead(ctx context.Context, write datastore.Write, to uint64) error { + newHead, err := s.getByHeight(ctx, to) + if err != nil { + return fmt.Errorf("getting head: %w", err) + } + + // update the contiguous head + s.contiguousHead.Store(&newHead) + if err := writeHeaderHashTo(ctx, write, newHead, headKey); err != nil { + return fmt.Errorf("writing headKey in batch: %w", err) + } + + return nil +} diff --git a/store/store_test.go b/store/store_test.go index f9ffca0c..f5b18a34 100644 --- a/store/store_test.go +++ b/store/store_test.go @@ -3,6 +3,7 @@ package store import ( "bytes" "context" + "errors" "math/rand" stdsync "sync" "testing" @@ -525,7 +526,7 @@ func TestStore_GetRange(t *testing.T) { } } -func TestStore_DeleteTo(t *testing.T) { +func TestStore_DeleteRange_Tail(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) t.Cleanup(cancel) @@ -590,7 +591,7 @@ func TestStore_DeleteTo(t *testing.T) { ctx, cancel := context.WithTimeout(ctx, time.Second) defer cancel() - err := store.DeleteTo(ctx, tt.to) + err := store.DeleteRange(ctx, from, tt.to) if tt.wantError { assert.Error(t, err) return @@ -612,7 +613,7 @@ func TestStore_DeleteTo(t *testing.T) { } } -func TestStore_DeleteTo_EmptyStore(t *testing.T) { +func TestStore_DeleteRange_EmptyStore(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) t.Cleanup(cancel) @@ -629,11 +630,14 @@ func TestStore_DeleteTo_EmptyStore(t *testing.T) { require.NoError(t, err) time.Sleep(10 * time.Millisecond) - err = store.DeleteTo(ctx, 101) + tail, err := store.Tail(ctx) + require.NoError(t, err) + + err = store.DeleteRange(ctx, tail.Height(), 101) require.NoError(t, err) // assert store is empty - tail, err := store.Tail(ctx) + tail, err = store.Tail(ctx) assert.Nil(t, tail) assert.ErrorIs(t, err, header.ErrEmptyStore) head, err := store.Head(ctx) @@ -654,7 +658,7 @@ func TestStore_DeleteTo_EmptyStore(t *testing.T) { assert.ErrorIs(t, err, header.ErrEmptyStore) } -func 
TestStore_DeleteTo_MoveHeadAndTail(t *testing.T) { +func TestStore_DeleteRange_MoveHeadAndTail(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) t.Cleanup(cancel) @@ -667,26 +671,33 @@ func TestStore_DeleteTo_MoveHeadAndTail(t *testing.T) { err = store.Start(ctx) require.NoError(t, err) + // Append 100 headers (heights 2-101, head becomes 101) err = store.Append(ctx, suite.GenDummyHeaders(100)...) require.NoError(t, err) time.Sleep(10 * time.Millisecond) - gap := suite.GenDummyHeaders(10) - - err = store.Append(ctx, suite.GenDummyHeaders(10)...) + // Append 10 more headers (heights 102-111, head becomes 111) + newHeaders := suite.GenDummyHeaders(10) + err = store.Append(ctx, newHeaders...) require.NoError(t, err) time.Sleep(10 * time.Millisecond) - err = store.DeleteTo(ctx, 111) + tail, err := store.Tail(ctx) + require.NoError(t, err) + + // Delete from tail to head+1 (wipes the store, then we verify behavior) + // Instead, let's delete a portion from tail to keep some headers + deleteTo := uint64(102) // Delete heights 1-101, keep 102-111 + err = store.DeleteRange(ctx, tail.Height(), deleteTo) require.NoError(t, err) // assert store is not empty - tail, err := store.Tail(ctx) + tail, err = store.Tail(ctx) require.NoError(t, err) - assert.Equal(t, int(gap[len(gap)-1].Height()+1), int(tail.Height())) + assert.Equal(t, deleteTo, tail.Height()) head, err := store.Head(ctx) require.NoError(t, err) - assert.Equal(t, suite.Head().Height(), head.Height()) + assert.Equal(t, newHeaders[len(newHeaders)-1].Height(), head.Height()) // assert that it is still not empty after restart err = store.Stop(ctx) @@ -696,36 +707,10 @@ func TestStore_DeleteTo_MoveHeadAndTail(t *testing.T) { tail, err = store.Tail(ctx) require.NoError(t, err) - assert.Equal(t, gap[len(gap)-1].Height()+1, tail.Height()) + assert.Equal(t, deleteTo, tail.Height()) head, err = store.Head(ctx) require.NoError(t, err) - assert.Equal(t, suite.Head().Height(), head.Height()) -} - -func TestStore_DeleteTo_Synchronized(t *testing.T) { - ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) - t.Cleanup(cancel) - - suite := headertest.NewTestSuite(t) - - ds := sync.MutexWrap(datastore.NewMapDatastore()) - store := NewTestStore(t, ctx, ds, suite.Head(), WithWriteBatchSize(10)) - - err := store.Append(ctx, suite.GenDummyHeaders(50)...) - require.NoError(t, err) - - err = store.Append(ctx, suite.GenDummyHeaders(50)...) - require.NoError(t, err) - - err = store.Append(ctx, suite.GenDummyHeaders(50)...) - require.NoError(t, err) - - err = store.DeleteTo(ctx, 100) - require.NoError(t, err) - - tail, err := store.Tail(ctx) - require.NoError(t, err) - require.EqualValues(t, 100, tail.Height()) + assert.Equal(t, newHeaders[len(newHeaders)-1].Height(), head.Height()) } func TestStore_OnDelete(t *testing.T) { @@ -735,19 +720,11 @@ func TestStore_OnDelete(t *testing.T) { suite := headertest.NewTestSuite(t) ds := sync.MutexWrap(datastore.NewMapDatastore()) - store, err := NewStore[*headertest.DummyHeader](ds) - require.NoError(t, err) - - err = store.Start(ctx) - require.NoError(t, err) - - err = store.Append(ctx, suite.GenDummyHeaders(50)...) - require.NoError(t, err) - // artificial gap - _ = suite.GenDummyHeaders(50) + store := NewTestStore(t, ctx, ds, suite.Head(), WithWriteBatchSize(10)) - err = store.Append(ctx, suite.GenDummyHeaders(50)...) + err := store.Append(ctx, suite.GenDummyHeaders(100)...) 
require.NoError(t, err) + time.Sleep(100 * time.Millisecond) deleted := 0 store.OnDelete(func(ctx context.Context, height uint64) error { @@ -758,13 +735,27 @@ func TestStore_OnDelete(t *testing.T) { return nil }) - err = store.DeleteTo(ctx, 101) + tail, err := store.Tail(ctx) + require.NoError(t, err) + + // Delete a partial range from tail (not the entire store) + // This ensures OnDelete handlers are called for each header + deleteTo := uint64(51) + err = store.DeleteRange(ctx, tail.Height(), deleteTo) require.NoError(t, err) - assert.Equal(t, 50, deleted) + // Should have deleted headers from tail to deleteTo-1 (50 headers) + expectedDeleted := int(deleteTo - tail.Height()) + assert.Equal(t, expectedDeleted, deleted) - hdr, err := store.GetByHeight(ctx, 50) + // Verify deleted headers are gone + hdr, err := store.GetByHeight(ctx, tail.Height()) assert.Error(t, err) assert.Nil(t, hdr) + + // Verify headers at and above deleteTo still exist + hdr, err = store.GetByHeight(ctx, deleteTo) + assert.NoError(t, err) + assert.NotNil(t, hdr) } func TestStorePendingCacheMiss(t *testing.T) { @@ -889,7 +880,10 @@ func TestStore_HasAt(t *testing.T) { require.NoError(t, err) time.Sleep(100 * time.Millisecond) - err = store.DeleteTo(ctx, 50) + tail, err := store.Tail(ctx) + require.NoError(t, err) + + err = store.DeleteRange(ctx, tail.Height(), 50) require.NoError(t, err) has := store.HasAt(ctx, 100) @@ -907,3 +901,625 @@ func TestStore_HasAt(t *testing.T) { has = store.HasAt(ctx, 0) assert.False(t, has) } + +func TestStore_DeleteRange(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) + t.Cleanup(cancel) + + t.Run("delete range from head down", func(t *testing.T) { + suite := headertest.NewTestSuite(t) + ds := sync.MutexWrap(datastore.NewMapDatastore()) + store := NewTestStore(t, ctx, ds, suite.Head(), WithWriteBatchSize(10)) + + const count = 20 + in := suite.GenDummyHeaders(count) + err := store.Append(ctx, in...) + require.NoError(t, err) + + time.Sleep(100 * time.Millisecond) + + // Genesis is at height 1, GenDummyHeaders(20) creates headers 2-21 + // So head should be at height 21, tail at height 1 + head, err := store.Head(ctx) + require.NoError(t, err) + assert.Equal(t, uint64(21), head.Height()) + + // Delete from height 16 to 22 (should delete 16, 17, 18, 19, 20, 21) + err = store.DeleteRange(ctx, 16, 22) + require.NoError(t, err) + + // Verify new head is at height 15 + newHead, err := store.Head(ctx) + require.NoError(t, err) + assert.Equal(t, uint64(15), newHead.Height()) + + // Verify deleted headers are gone + for h := uint64(16); h <= 21; h++ { + has := store.HasAt(ctx, h) + assert.False(t, has, "height %d should be deleted", h) + } + + // Verify remaining headers still exist + for h := uint64(1); h <= 15; h++ { + has := store.HasAt(ctx, h) + assert.True(t, has, "height %d should still exist", h) + } + }) + + t.Run("delete range in middle should fail", func(t *testing.T) { + suite := headertest.NewTestSuite(t) + ds := sync.MutexWrap(datastore.NewMapDatastore()) + store := NewTestStore(t, ctx, ds, suite.Head(), WithWriteBatchSize(10)) + + const count = 20 + in := suite.GenDummyHeaders(count) + err := store.Append(ctx, in...) 
+ require.NoError(t, err) + + time.Sleep(100 * time.Millisecond) + + // Try to delete a range in the middle (heights 8-12) which would create gaps + err = store.DeleteRange(ctx, 8, 12) + require.Error(t, err) + assert.Contains(t, err.Error(), "would create gaps in the store") + + // Verify all headers still exist since the operation failed + for h := uint64(1); h <= 21; h++ { + has := store.HasAt(ctx, h) + assert.True(t, has, "height %d should still exist after failed deletion", h) + } + }) + + t.Run("delete range from tail up", func(t *testing.T) { + suite := headertest.NewTestSuite(t) + ds := sync.MutexWrap(datastore.NewMapDatastore()) + store := NewTestStore(t, ctx, ds, suite.Head(), WithWriteBatchSize(10)) + + const count = 20 + in := suite.GenDummyHeaders(count) + err := store.Append(ctx, in...) + require.NoError(t, err) + + time.Sleep(100 * time.Millisecond) + + originalHead, err := store.Head(ctx) + require.NoError(t, err) + + // Delete from tail height to height 10 + err = store.DeleteRange(ctx, 1, 10) + require.NoError(t, err) + + // Verify head is unchanged + head, err := store.Head(ctx) + require.NoError(t, err) + assert.Equal(t, originalHead.Height(), head.Height()) + + // Verify tail moved to height 10 + tail, err := store.Tail(ctx) + require.NoError(t, err) + assert.Equal(t, uint64(10), tail.Height()) + + // Verify deleted headers are gone + for h := uint64(1); h < 10; h++ { + has := store.HasAt(ctx, h) + assert.False(t, has, "height %d should be deleted", h) + } + + // Verify remaining headers still exist + for h := uint64(10); h <= 21; h++ { + has := store.HasAt(ctx, h) + assert.True(t, has, "height %d should still exist", h) + } + }) + + t.Run("delete range completely out of bounds errors", func(t *testing.T) { + suite := headertest.NewTestSuite(t) + ds := sync.MutexWrap(datastore.NewMapDatastore()) + store := NewTestStore(t, ctx, ds, suite.Head(), WithWriteBatchSize(10)) + + const count = 20 + in := suite.GenDummyHeaders(count) + err := store.Append(ctx, in...) + require.NoError(t, err) + + time.Sleep(100 * time.Millisecond) + + originalHead, err := store.Head(ctx) + require.NoError(t, err) + originalTail, err := store.Tail(ctx) + require.NoError(t, err) + + // Delete range completely above head - should error (from > head.Height()) + err = store.DeleteRange(ctx, 200, 300) + require.Error(t, err) + assert.Contains(t, err.Error(), "is not present in store") + + // Verify head and tail are unchanged + head, err := store.Head(ctx) + require.NoError(t, err) + assert.Equal(t, originalHead.Height(), head.Height()) + + tail, err := store.Tail(ctx) + require.NoError(t, err) + assert.Equal(t, originalTail.Height(), tail.Height()) + + // Verify all original headers still exist + for h := uint64(1); h <= 21; h++ { + has := store.HasAt(ctx, h) + assert.True(t, has, "height %d should still exist", h) + } + }) + + t.Run("invalid range errors", func(t *testing.T) { + suite := headertest.NewTestSuite(t) + ds := sync.MutexWrap(datastore.NewMapDatastore()) + store := NewTestStore(t, ctx, ds, suite.Head(), WithWriteBatchSize(10)) + + const count = 20 + in := suite.GenDummyHeaders(count) + err := store.Append(ctx, in...) 
+ require.NoError(t, err) + + time.Sleep(100 * time.Millisecond) + + // from >= to should error + err = store.DeleteRange(ctx, 50, 50) + assert.Error(t, err) + assert.Contains(t, err.Error(), "from must be less than to") + + // from > to should error + err = store.DeleteRange(ctx, 60, 50) + assert.Error(t, err) + assert.Contains(t, err.Error(), "from must be less than to") + + // from < tail && to < head+1 should error + err = store.DeleteRange(ctx, 0, 5) + assert.Error(t, err) + assert.Contains(t, err.Error(), "Only deletion from tail or head+1 is supported") + + // middle deletion should error + err = store.DeleteRange(ctx, 10, 15) + assert.Error(t, err) + assert.Contains(t, err.Error(), "would create gaps") + }) +} + +func TestStore_DeleteRange_SingleHeader(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) + t.Cleanup(cancel) + + suite := headertest.NewTestSuite(t) + + ds := sync.MutexWrap(datastore.NewMapDatastore()) + store := NewTestStore(t, ctx, ds, suite.Head(), WithWriteBatchSize(10)) + + // Add single header at height 1 (genesis is at 0) + headers := suite.GenDummyHeaders(1) + err := store.Append(ctx, headers...) + require.NoError(t, err) + + // Should not be able to delete below tail + err = store.DeleteRange(ctx, 0, 1) + require.Error(t, err) // should error - would delete below tail +} + +func TestStore_DeleteRange_Synchronized(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) + t.Cleanup(cancel) + + suite := headertest.NewTestSuite(t) + + ds := sync.MutexWrap(datastore.NewMapDatastore()) + store := NewTestStore(t, ctx, ds, suite.Head(), WithWriteBatchSize(10)) + + err := store.Append(ctx, suite.GenDummyHeaders(50)...) + require.NoError(t, err) + + time.Sleep(100 * time.Millisecond) + + // Ensure sync completes + err = store.Sync(ctx) + require.NoError(t, err) + + // Delete from height 26 to head+1 + head, err := store.Head(ctx) + require.NoError(t, err) + + err = store.DeleteRange(ctx, 26, head.Height()+1) + require.NoError(t, err) + + // Verify head is now at height 25 + newHead, err := store.Head(ctx) + require.NoError(t, err) + require.EqualValues(t, 25, newHead.Height()) + + // Verify headers above 25 are gone + for h := uint64(26); h <= 50; h++ { + has := store.HasAt(ctx, h) + assert.False(t, has, "height %d should be deleted", h) + } + + // Verify headers at and below 25 still exist + for h := uint64(1); h <= 25; h++ { + has := store.HasAt(ctx, h) + assert.True(t, has, "height %d should still exist", h) + } +} + +func TestStore_DeleteRange_OnDeleteHandlers(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) + t.Cleanup(cancel) + + suite := headertest.NewTestSuite(t) + + ds := sync.MutexWrap(datastore.NewMapDatastore()) + store := NewTestStore(t, ctx, ds, suite.Head(), WithWriteBatchSize(10)) + + err := store.Append(ctx, suite.GenDummyHeaders(50)...) 
+ require.NoError(t, err) + + time.Sleep(100 * time.Millisecond) + + // Get the actual head height to calculate expected deletions + head, err := store.Head(ctx) + require.NoError(t, err) + + var deletedHeights []uint64 + store.OnDelete(func(ctx context.Context, height uint64) error { + deletedHeights = append(deletedHeights, height) + return nil + }) + + // Delete from height 41 to head+1 + err = store.DeleteRange(ctx, 41, head.Height()+1) + require.NoError(t, err) + + // Verify onDelete was called for each deleted height (from 41 to head height) + var expectedDeleted []uint64 + for h := uint64(41); h <= head.Height(); h++ { + expectedDeleted = append(expectedDeleted, h) + } + assert.ElementsMatch(t, expectedDeleted, deletedHeights) +} + +func TestStore_DeleteRange_LargeRange(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) + t.Cleanup(cancel) + + suite := headertest.NewTestSuite(t) + + ds := sync.MutexWrap(datastore.NewMapDatastore()) + store := NewTestStore(t, ctx, ds, suite.Head(), WithWriteBatchSize(100)) + + // Create a large number of headers to trigger parallel deletion + const count = 15000 + headers := suite.GenDummyHeaders(count) + err := store.Append(ctx, headers...) + require.NoError(t, err) + + time.Sleep(500 * time.Millisecond) // allow time for large batch to write + + // Get head height for deletion range + head, err := store.Head(ctx) + require.NoError(t, err) + + // Delete a large range to test parallel deletion path (from 5001 to head+1) + const keepHeight = 5000 + err = store.DeleteRange(ctx, keepHeight+1, head.Height()+1) + require.NoError(t, err) + + // Verify new head + newHead, err := store.Head(ctx) + require.NoError(t, err) + require.EqualValues(t, keepHeight, newHead.Height()) + + // Spot check that high numbered headers are gone + for h := uint64(keepHeight + 1000); h <= count; h += 1000 { + has := store.HasAt(ctx, h) + assert.False(t, has, "height %d should be deleted", h) + } + + // Spot check that low numbered headers still exist + for h := uint64(1000); h <= keepHeight; h += 1000 { + has := store.HasAt(ctx, h) + assert.True(t, has, "height %d should still exist", h) + } +} + +func TestStore_DeleteRange_Wipe(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) + t.Cleanup(cancel) + + suite := headertest.NewTestSuite(t) + + ds := sync.MutexWrap(datastore.NewMapDatastore()) + store := NewTestStore(t, ctx, ds, suite.Head(), WithWriteBatchSize(100)) + + // Create a large number of headers + const count = 15000 + headers := suite.GenDummyHeaders(count) + err := store.Append(ctx, headers...) 
+ require.NoError(t, err) + + time.Sleep(500 * time.Millisecond) // allow time for large batch to write + + // Get head height for deletion range + head, err := store.Head(ctx) + require.NoError(t, err) + + tail, err := store.Tail(ctx) + require.NoError(t, err) + + // Delete a large range to test parallel deletion path (from 5001 to head+1) + err = store.DeleteRange(ctx, tail.Height(), head.Height()+1) + require.NoError(t, err) + + // Verify new head + _, err = store.Head(ctx) + require.Error(t, err) + _, err = store.Tail(ctx) + require.Error(t, err) +} + +func TestStore_DeleteRange_ValidationErrors(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) + t.Cleanup(cancel) + + suite := headertest.NewTestSuite(t) + + ds := sync.MutexWrap(datastore.NewMapDatastore()) + store := NewTestStore(t, ctx, ds, suite.Head(), WithWriteBatchSize(10)) + + err := store.Append(ctx, suite.GenDummyHeaders(20)...) + require.NoError(t, err) + + time.Sleep(100 * time.Millisecond) + + tail, err := store.Tail(ctx) + require.NoError(t, err) + + head, err := store.Head(ctx) + require.NoError(t, err) + + tests := []struct { + name string + from uint64 + to uint64 + errMsg string + }{ + { + name: "delete from below tail boundary", + from: tail.Height() - 1, + to: tail.Height() + 5, + errMsg: "Only deletion from tail or head+1 is supported", + }, + { + name: "invalid range - from equals to", + from: 10, + to: 10, + errMsg: "from must be less than to", + }, + { + name: "invalid range - from greater than to", + from: 15, + to: 10, + errMsg: "from must be less than to", + }, + { + name: "delete to beyond head+1", + from: tail.Height(), + to: head.Height() + 10, + errMsg: "beyond current head+1", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + err := store.DeleteRange(ctx, tt.from, tt.to) + require.Error(t, err) + assert.Contains(t, err.Error(), tt.errMsg) + }) + } +} + +func TestStore_DeleteRange_PartialDelete(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), time.Second*30) + t.Cleanup(cancel) + + t.Run("partial delete from head with timeout and recovery", func(t *testing.T) { + suite := headertest.NewTestSuite(t) + ds := sync.MutexWrap(datastore.NewMapDatastore()) + store := NewTestStore(t, ctx, ds, suite.Head(), WithWriteBatchSize(10)) + + // Add headers + const count = 1000 + in := suite.GenDummyHeaders(count) + err := store.Append(ctx, in...) 
+ require.NoError(t, err) + + time.Sleep(100 * time.Millisecond) + + originalHead, err := store.Head(ctx) + require.NoError(t, err) + assert.Equal(t, uint64(1001), originalHead.Height()) + + originalTail, err := store.Tail(ctx) + require.NoError(t, err) + + // Create a context with very short timeout to trigger partial delete + shortCtx, shortCancel := context.WithTimeout(ctx, 1*time.Millisecond) + defer shortCancel() + + // Try to delete from height 500 to head+1 (may partially complete or fully complete) + deleteErr := store.DeleteRange(shortCtx, 500, 1002) + + // Get current state - head should be updated to reflect progress (set to 499 since from=500) + head, err := store.Head(ctx) + require.NoError(t, err) + + // Tail should not have changed (we're deleting from head side) + tail, err := store.Tail(ctx) + require.NoError(t, err) + assert.Equal(t, originalTail.Height(), tail.Height()) + + if deleteErr != nil { + // Partial delete occurred - head should be updated to from-1 + assert.True(t, errors.Is(deleteErr, context.DeadlineExceeded) || + errors.Is(deleteErr, errDeleteTimeout), + "expected timeout error, got: %v", deleteErr) + + // Head should be updated to reflect that deletion started at 500 + assert.Equal(t, uint64(499), head.Height()) + + // Verify we can still read the new head + _, err = store.Get(ctx, head.Hash()) + require.NoError(t, err) + + // Since head is already at 499, there's nothing more to delete from the head side + // The partial delete already completed the deletion by setting head to from-1 + } + + // After completion, verify final state + newHead, err := store.Head(ctx) + require.NoError(t, err) + assert.Equal(t, uint64(499), newHead.Height()) + + // Verify deleted headers are gone + for h := uint64(500); h <= 1001; h++ { + has := store.HasAt(ctx, h) + assert.False(t, has, "height %d should be deleted", h) + } + + // Verify remaining headers exist + for h := uint64(1); h <= 499; h++ { + has := store.HasAt(ctx, h) + assert.True(t, has, "height %d should still exist", h) + } + }) + + t.Run("partial delete from tail with timeout and recovery", func(t *testing.T) { + suite := headertest.NewTestSuite(t) + ds := sync.MutexWrap(datastore.NewMapDatastore()) + store := NewTestStore(t, ctx, ds, suite.Head(), WithWriteBatchSize(10)) + + // Add headers + const count = 1000 + in := suite.GenDummyHeaders(count) + err := store.Append(ctx, in...) 
+ require.NoError(t, err) + + time.Sleep(100 * time.Millisecond) + + originalHead, err := store.Head(ctx) + require.NoError(t, err) + originalTail, err := store.Tail(ctx) + require.NoError(t, err) + assert.Equal(t, uint64(1), originalTail.Height()) + + // Create a context with very short timeout + shortCtx, shortCancel := context.WithTimeout(ctx, 1*time.Millisecond) + defer shortCancel() + + // Try to delete from tail to height 500 (may partially complete or fully complete) + deleteErr := store.DeleteRange(shortCtx, 1, 500) + + // Head should not have changed (we're deleting from tail side) + head, err := store.Head(ctx) + require.NoError(t, err) + assert.Equal(t, originalHead.Height(), head.Height()) + + // Get current tail - it should be updated to reflect progress + tail, err := store.Tail(ctx) + require.NoError(t, err) + + if deleteErr != nil { + // Partial delete occurred + assert.True(t, errors.Is(deleteErr, context.DeadlineExceeded) || + errors.Is(deleteErr, errDeleteTimeout), + "expected timeout error, got: %v", deleteErr) + + // Tail should be updated to reflect actual progress + assert.Greater(t, tail.Height(), originalTail.Height()) + + // Now complete the deletion with proper timeout - use current tail + err = store.DeleteRange(ctx, tail.Height(), 500) + require.NoError(t, err) + } + + // After completion, verify final state + newTail, err := store.Tail(ctx) + require.NoError(t, err) + assert.Equal(t, uint64(500), newTail.Height()) + + // Head should be unchanged + head, err = store.Head(ctx) + require.NoError(t, err) + assert.Equal(t, originalHead.Height(), head.Height()) + + // Verify deleted headers are gone + for h := uint64(1); h < 500; h++ { + has := store.HasAt(ctx, h) + assert.False(t, has, "height %d should be deleted", h) + } + + // Verify remaining headers exist + for h := uint64(500); h <= 1001; h++ { + has := store.HasAt(ctx, h) + assert.True(t, has, "height %d should still exist", h) + } + }) + + t.Run("multiple partial deletes eventually succeed", func(t *testing.T) { + suite := headertest.NewTestSuite(t) + ds := sync.MutexWrap(datastore.NewMapDatastore()) + store := NewTestStore(t, ctx, ds, suite.Head(), WithWriteBatchSize(10)) + + // Add headers + const count = 800 + in := suite.GenDummyHeaders(count) + err := store.Append(ctx, in...) + require.NoError(t, err) + + time.Sleep(100 * time.Millisecond) + + // Attempt delete with progressively longer timeouts + from, to := uint64(300), uint64(802) + maxAttempts := 5 + + for attempt := 1; attempt <= maxAttempts; attempt++ { + attemptCtx, attemptCancel := context.WithTimeout(ctx, + time.Duration(attempt*20)*time.Millisecond) + + err = store.DeleteRange(attemptCtx, from, to) + attemptCancel() + + if err == nil { + // Success! 
+ break + } + + // Verify store remains consistent after each failed attempt + head, err := store.Head(ctx) + require.NoError(t, err) + _, err = store.Get(ctx, head.Hash()) + require.NoError(t, err) + + if attempt == maxAttempts { + // Last attempt with full context + err = store.DeleteRange(ctx, from, to) + require.NoError(t, err) + } + } + + // Verify final state + newHead, err := store.Head(ctx) + require.NoError(t, err) + assert.Equal(t, uint64(299), newHead.Height()) + + // Verify deleted range is gone + for h := from; h <= 801; h++ { + has := store.HasAt(ctx, h) + assert.False(t, has, "height %d should be deleted", h) + } + }) +} diff --git a/sync/syncer_head.go b/sync/syncer_head.go index 3fb24c63..271a5b07 100644 --- a/sync/syncer_head.go +++ b/sync/syncer_head.go @@ -93,7 +93,11 @@ func (s *Syncer[H]) networkHead(ctx context.Context) (H, bool, error) { return sbjHead, false, nil } // still check if even the newly requested head is not recent - if recent, timeDiff = isRecent(newHead, s.Params.blockTime, s.Params.recencyThreshold); !recent { + if recent, timeDiff = isRecent( + newHead, + s.Params.blockTime, + s.Params.recencyThreshold, + ); !recent { log.Warnw( "non recent head from trusted peers", "height", diff --git a/sync/syncer_tail.go b/sync/syncer_tail.go index 877ff407..c7790c99 100644 --- a/sync/syncer_tail.go +++ b/sync/syncer_tail.go @@ -130,7 +130,7 @@ func (s *Syncer[H]) moveTail(ctx context.Context, from, to H) error { switch { case from.Height() < to.Height(): log.Infof("move tail up from %d to %d, pruning the diff...", from.Height(), to.Height()) - err := s.store.DeleteTo(ctx, to.Height()) + err := s.store.DeleteRange(ctx, from.Height(), to.Height()) if err != nil { return fmt.Errorf( "deleting headers up to newly configured tail(%d): %w",
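A minimal caller-side sketch of the DeleteRange contract introduced above: a range must either start at Tail() or end at Head()+1 so the chain stays contiguous. The package name, helper name, import paths, and heights are illustrative assumptions, not part of this diff.

package example // hypothetical package, not part of this change

import (
	"context"

	header "github.com/celestiaorg/go-header"  // assumed module path
	"github.com/celestiaorg/go-header/store"   // assumed module path
)

// pruneAndRollBack shows the two range shapes DeleteRange accepts:
// one anchored at Tail() (pruning old headers) and one ending at Head()+1
// (rolling the head back). Any other range is rejected to avoid gaps.
func pruneAndRollBack[H header.Header[H]](ctx context.Context, s *store.Store[H]) error {
	tail, err := s.Tail(ctx)
	if err != nil {
		return err
	}
	// Advance the tail by 100: deletes [tail, tail+100).
	// Allowed because from == Tail() (assumes at least 100 headers above the tail).
	if err := s.DeleteRange(ctx, tail.Height(), tail.Height()+100); err != nil {
		return err
	}

	head, err := s.Head(ctx)
	if err != nil {
		return err
	}
	// Roll the head back by 10: deletes [head-9, head+1).
	// Allowed because to == Head()+1 (assumes head-10 is still above the tail).
	if err := s.DeleteRange(ctx, head.Height()-9, head.Height()+1); err != nil {
		return err
	}

	// A mid-chain range such as [tail+5, tail+10) would be rejected with an error,
	// since deleting it would leave a gap between Tail() and Head().
	return nil
}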
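A companion sketch of the recovery pattern the partial-delete tests exercise: a timed-out DeleteRange already persists its progress by advancing the tail, so a caller can resume from the updated Tail(). Same assumed package and imports as above, plus "time"; the per-attempt budget is illustrative.

// resumeTailPrune keeps pruning [Tail(), to) until the tail reaches `to`,
// resuming from the store's updated tail after each partial (timed-out) attempt.
func resumeTailPrune[H header.Header[H]](ctx context.Context, s *store.Store[H], to uint64) error {
	for {
		tail, err := s.Tail(ctx)
		if err != nil {
			return err
		}
		if tail.Height() >= to {
			return nil // pruning is complete
		}

		attemptCtx, cancel := context.WithTimeout(ctx, time.Second) // illustrative per-attempt budget
		err = s.DeleteRange(attemptCtx, tail.Height(), to)
		cancel()
		if err == nil {
			return nil
		}

		// On timeout, DeleteRange has already advanced the tail to its actual progress.
		// Give up only if no progress was made or the caller's context is done;
		// otherwise retry from the new tail.
		newTail, terr := s.Tail(ctx)
		if terr != nil || newTail.Height() == tail.Height() || ctx.Err() != nil {
			return err
		}
	}
}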