diff --git a/espresso/batch_buffer.go b/espresso/batch_buffer.go deleted file mode 100644 index a5468fe366a..00000000000 --- a/espresso/batch_buffer.go +++ /dev/null @@ -1,112 +0,0 @@ -package espresso - -import ( - "errors" - "slices" - - "github.com/ethereum-optimism/optimism/op-service/eth" - "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/core/types" -) - -type BatchValidity uint8 - -const ( - // BatchDrop indicates that the batch is invalid, and will always be in the future, unless we reorg - BatchDrop = iota - // BatchAccept indicates that the batch is valid and should be processed - BatchAccept - // BatchUndecided indicates we are lacking L1 information until we can proceed batch filtering - BatchUndecided - // BatchPast indicates that the batch is from the past, i.e. its timestamp is smaller or equal - // to the safe head's timestamp. - BatchPast -) - -var ErrAtCapacity = errors.New("batch buffer at capacity") -var ErrDuplicateBatch = errors.New("duplicate batch") - -type Batch interface { - Number() uint64 - L1Origin() eth.BlockID - Header() *types.Header - Hash() common.Hash - Signer() common.Address -} - -type BatchBuffer[B Batch] struct { - batches []B - capacity uint64 -} - -func NewBatchBuffer[B Batch](capacity uint64) BatchBuffer[B] { - return BatchBuffer[B]{ - batches: []B{}, - capacity: capacity, - } -} - -func (b BatchBuffer[B]) Capacity() uint64 { - return b.capacity -} - -func (b BatchBuffer[B]) Len() int { - return len(b.batches) -} - -func (b *BatchBuffer[B]) Clear() { - b.batches = nil -} - -func (b *BatchBuffer[B]) Insert(batch B) error { - if uint64(b.Len()) >= b.capacity { - return ErrAtCapacity - } - - pos, alreadyExists := slices.BinarySearchFunc(b.batches, batch, func(a, t B) int { - // Note: we use a custom comparison function that returns 0 only if the batches are actually - // the same to ensure that newer batches with the same number are stored later in the buffer - if a.Hash() == t.Hash() { - return 0 - } - - if a.Number() > t.Number() { - return 1 - } else { - return -1 - } - }) - - if alreadyExists { - return ErrDuplicateBatch - } - - b.batches = slices.Insert(b.batches, pos, batch) - return nil -} - -func (b *BatchBuffer[B]) Get(i int) *B { - if i < b.Len() { - return &b.batches[i] - } else { - return nil - } -} - -func (b *BatchBuffer[B]) Peek() *B { - if len(b.batches) == 0 { - return nil - } - return &b.batches[0] -} - -func (b *BatchBuffer[B]) Pop() *B { - if len(b.batches) == 0 { - return nil - } - - batch := b.batches[0] - b.batches = b.batches[1:] - - return &batch -} diff --git a/espresso/batch_buffer_test.go b/espresso/batch_buffer_test.go deleted file mode 100644 index 952fa92f38f..00000000000 --- a/espresso/batch_buffer_test.go +++ /dev/null @@ -1,292 +0,0 @@ -package espresso - -import ( - "math/big" - "testing" - - "github.com/ethereum-optimism/optimism/op-service/eth" - "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/core/types" - "github.com/stretchr/testify/require" -) - -// mockBatch is a simple implementation of the Batch interface for testing -type mockBatch struct { - number uint64 - hash common.Hash - l1Origin eth.BlockID -} - -func (m mockBatch) Number() uint64 { - return m.number -} - -func (m mockBatch) L1Origin() eth.BlockID { - return m.l1Origin -} - -func (m mockBatch) Header() *types.Header { - return &types.Header{ - Number: big.NewInt(int64(m.number)), - } -} - -func (m mockBatch) Hash() common.Hash { - return m.hash -} - -func (m mockBatch) Signer() common.Address { - return common.Address{} -} - -// newMockBatch creates a mock batch with the given number and a hash derived from the number -func newMockBatch(number uint64) mockBatch { - return mockBatch{ - number: number, - hash: common.BigToHash(big.NewInt(int64(number))), - l1Origin: eth.BlockID{ - Number: number, - Hash: common.BigToHash(big.NewInt(int64(number))), - }, - } -} - -// newMockBatchWithHash creates a mock batch with a specific number and hash -func newMockBatchWithHash(number uint64, hash common.Hash) mockBatch { - return mockBatch{ - number: number, - hash: hash, - l1Origin: eth.BlockID{ - Number: number, - Hash: common.BigToHash(big.NewInt(int64(number))), - }, - } -} - -// TestBatchBufferInsertAtCapacity verifies that the buffer respects its capacity limit -// and returns ErrAtCapacity when attempting to insert beyond capacity. -func TestBatchBufferInsertAtCapacity(t *testing.T) { - const testCapacity uint64 = 3 - - // Create a buffer with small capacity - buffer := NewBatchBuffer[mockBatch](testCapacity) - - // Verify Capacity() returns the configured capacity - require.Equal(t, testCapacity, buffer.Capacity()) - - // Verify buffer starts empty - require.Equal(t, 0, buffer.Len()) - - // Insert batches up to capacity - batch1 := newMockBatch(1) - batch2 := newMockBatch(2) - batch3 := newMockBatch(3) - - err := buffer.Insert(batch1) - require.NoError(t, err) - require.Equal(t, 1, buffer.Len()) - - err = buffer.Insert(batch2) - require.NoError(t, err) - require.Equal(t, 2, buffer.Len()) - - err = buffer.Insert(batch3) - require.NoError(t, err) - require.Equal(t, 3, buffer.Len()) - - // Verify inserting beyond capacity returns ErrAtCapacity - batch4 := newMockBatch(4) - err = buffer.Insert(batch4) - require.ErrorIs(t, err, ErrAtCapacity) - - // Verify buffer contents unchanged after failed insert - require.Equal(t, 3, buffer.Len()) - require.Equal(t, testCapacity, buffer.Capacity()) - - // Verify the original batches are still accessible and in sorted order - got := buffer.Get(0) - require.NotNil(t, got) - require.Equal(t, uint64(1), got.Number()) - - got = buffer.Get(1) - require.NotNil(t, got) - require.Equal(t, uint64(2), got.Number()) - - got = buffer.Get(2) - require.NotNil(t, got) - require.Equal(t, uint64(3), got.Number()) - - // Verify Get returns nil for out of bounds - require.Nil(t, buffer.Get(3)) -} - -// TestBatchBufferInsertDuplicateHandling verifies that: -// - Inserting the exact same batch (same number AND same hash) does not create a duplicate -// - Inserting a batch with the same number but different hash IS allowed -func TestBatchBufferInsertDuplicateHandling(t *testing.T) { - const testCapacity uint64 = 10 - const batchNumberN uint64 = 42 - - buffer := NewBatchBuffer[mockBatch](testCapacity) - - // Create first batch with number N and hash H1 - hashH1 := common.HexToHash("0x1111111111111111111111111111111111111111111111111111111111111111") - batchH1 := newMockBatchWithHash(batchNumberN, hashH1) - - // Insert first batch - err := buffer.Insert(batchH1) - require.NoError(t, err) - require.Equal(t, 1, buffer.Len()) - - // Insert the exact same batch again (same number N, same hash H1) - // This should return ErrDuplicateBatch and not create a duplicate - err = buffer.Insert(batchH1) - require.ErrorIs(t, err, ErrDuplicateBatch) - require.Equal(t, 1, buffer.Len(), "duplicate batch with same number and hash should not be inserted") - - // Create a different batch with same number N but different hash H2 - hashH2 := common.HexToHash("0x2222222222222222222222222222222222222222222222222222222222222222") - batchH2 := newMockBatchWithHash(batchNumberN, hashH2) - - // Insert batch with same number but different hash - should be allowed - err = buffer.Insert(batchH2) - require.NoError(t, err) - require.Equal(t, 2, buffer.Len(), "batch with same number but different hash should be inserted") - - // Verify both batches can be retrieved - first := buffer.Get(0) - require.NotNil(t, first) - - second := buffer.Get(1) - require.NotNil(t, second) - - // Verify they both have the same batch number - require.Equal(t, batchNumberN, first.Number()) - require.Equal(t, batchNumberN, second.Number()) - - // Verify they have different hashes - require.NotEqual(t, first.Hash(), second.Hash()) - - // Verify insertion order is preserved (H1 first, H2 second) - require.Equal(t, hashH1, first.Hash()) - require.Equal(t, hashH2, second.Hash()) -} - -// TestBatchBufferPeekAndPop verifies Peek returns without removing and Pop removes -func TestBatchBufferPeekAndPop(t *testing.T) { - buffer := NewBatchBuffer[mockBatch](10) - - // Verify Peek on empty buffer returns nil - require.Nil(t, buffer.Peek()) - - // Verify Pop on empty buffer returns nil - require.Nil(t, buffer.Pop()) - - // Insert a batch - batch1 := newMockBatch(1) - err := buffer.Insert(batch1) - require.NoError(t, err) - - // Peek should return the batch without removing - peeked := buffer.Peek() - require.NotNil(t, peeked) - require.Equal(t, uint64(1), peeked.Number()) - require.Equal(t, 1, buffer.Len()) - - // Peek again should return the same batch - peekedAgain := buffer.Peek() - require.Equal(t, peeked.Number(), peekedAgain.Number()) - require.Equal(t, peeked.Hash(), peekedAgain.Hash()) - - // Pop should return and remove the batch - popped := buffer.Pop() - require.NotNil(t, popped) - require.Equal(t, uint64(1), popped.Number()) - require.Equal(t, 0, buffer.Len()) - - // Pop on now-empty buffer should return nil - require.Nil(t, buffer.Pop()) -} - -// TestBatchBufferSortedOrder verifies batches are stored in sorted order by batch number -func TestBatchBufferSortedOrder(t *testing.T) { - buffer := NewBatchBuffer[mockBatch](10) - - // Insert batches out of order - err := buffer.Insert(newMockBatch(5)) - require.NoError(t, err) - err = buffer.Insert(newMockBatch(2)) - require.NoError(t, err) - err = buffer.Insert(newMockBatch(8)) - require.NoError(t, err) - err = buffer.Insert(newMockBatch(1)) - require.NoError(t, err) - - require.Equal(t, 4, buffer.Len()) - - // Verify Get returns them in sorted order - require.Equal(t, uint64(1), buffer.Get(0).Number()) - require.Equal(t, uint64(2), buffer.Get(1).Number()) - require.Equal(t, uint64(5), buffer.Get(2).Number()) - require.Equal(t, uint64(8), buffer.Get(3).Number()) - - // Verify Pop returns them in sorted order - require.Equal(t, uint64(1), buffer.Pop().Number()) - require.Equal(t, uint64(2), buffer.Pop().Number()) - require.Equal(t, uint64(5), buffer.Pop().Number()) - require.Equal(t, uint64(8), buffer.Pop().Number()) - - // Buffer should be empty now - require.Equal(t, 0, buffer.Len()) -} - -// TestBatchBufferClear verifies Clear removes all batches -func TestBatchBufferClear(t *testing.T) { - buffer := NewBatchBuffer[mockBatch](10) - - // Insert some batches - err := buffer.Insert(newMockBatch(1)) - require.NoError(t, err) - err = buffer.Insert(newMockBatch(2)) - require.NoError(t, err) - err = buffer.Insert(newMockBatch(3)) - require.NoError(t, err) - require.Equal(t, 3, buffer.Len()) - - // Clear the buffer - buffer.Clear() - - // Verify buffer is empty - require.Equal(t, 0, buffer.Len()) - require.Nil(t, buffer.Peek()) - require.Nil(t, buffer.Pop()) - require.Nil(t, buffer.Get(0)) - - // Verify capacity is unchanged - require.Equal(t, uint64(10), buffer.Capacity()) - - // Verify we can insert again after clear - err = buffer.Insert(newMockBatch(1)) - require.NoError(t, err) - require.Equal(t, 1, buffer.Len()) -} - -// TestBatchBufferGetOutOfBounds verifies Get returns nil for invalid indices -func TestBatchBufferGetOutOfBounds(t *testing.T) { - buffer := NewBatchBuffer[mockBatch](10) - - // Empty buffer - all indices should return nil - require.Nil(t, buffer.Get(0)) - require.Nil(t, buffer.Get(1)) - - // Insert one batch - err := buffer.Insert(newMockBatch(1)) - require.NoError(t, err) - - // Valid index - require.NotNil(t, buffer.Get(0)) - - // Invalid indices - require.Nil(t, buffer.Get(1)) - require.Nil(t, buffer.Get(100)) -} diff --git a/espresso/buffered_streamer.go b/espresso/buffered_streamer.go deleted file mode 100644 index 34cc971c3d4..00000000000 --- a/espresso/buffered_streamer.go +++ /dev/null @@ -1,186 +0,0 @@ -package espresso - -import ( - "context" - - "github.com/ethereum-optimism/optimism/op-service/eth" -) - -// BufferedEspressoStreamer is a wrapper around EspressoStreamerIFace that -// buffers batches to avoid repeated calls to the underlying streamer. -// -// This structure is meant to help the underlying streamer avoid getting -// reset too frequently. This has primarily been added as an in-between -// layer for the Batch, which seems to need to rewind constantly, which is -// not great for the EspressoStreamer which wants to only progress forward -// and not rewind. -// -// The general idea is to take advantage that we should have a safe starting -// position for the batches being reported to the streamer that is being -// updated frequently. -// -// We can use this safe starting position to store a buffer as needed to store -// all batches from the safe position to whatever the current latest batch is. -// This allows us to avoid needing to rewind the streamer, and instead just -// adjust the read position of the buffered streamer. -type BufferedEspressoStreamer[B Batch] struct { - streamer EspressoStreamer[B] - - batches []*B - - // local offset - readPos uint64 - - startingBatchPos uint64 - currentSafeL1Origin eth.BlockID -} - -// Compile time assertion to ensure BufferedEspressoStreamer implements -// EspressoStreamerIFace -var _ EspressoStreamer[Batch] = (*BufferedEspressoStreamer[Batch])(nil) - -// NewBufferedEspressoStreamer creates a new BufferedEspressoStreamer instance. -func NewBufferedEspressoStreamer[B Batch](streamer EspressoStreamer[B]) *BufferedEspressoStreamer[B] { - return &BufferedEspressoStreamer[B]{ - streamer: streamer, - } -} - -// Update implements EspressoStreamerIFace -func (b *BufferedEspressoStreamer[B]) Update(ctx context.Context) error { - return b.streamer.Update(ctx) -} - -// handleL2PositionUpdate handles the update of the L2 position for the -// buffered streamer. -// -// There are three conditions to consider: -// 1. If the next position is before the starting batch position, we need to -// reset the underlying streamer, and dump our local buffer, as this -// indicates a need to move backwards before our earliest known batch. -// 2. If the next position is after our starting batch position, then we -// can drop all earlier stored batches in our buffer, and adjust our -// read position accordingly. This should appear to the consumer as nothing -// has changed progression-wise, but it allows us to reclaim memory. -// 3. If the next position is the same as our starting batch position, then -// we do nothing, as we are already at the correct position. -func (b *BufferedEspressoStreamer[B]) handleL2PositionUpdate(nextPosition uint64) { - if nextPosition < b.startingBatchPos { - // If the next position is before the starting batch position, - // we need to reset the buffered streamer to ensure we don't - // miss any batches. - b.readPos = 0 - b.startingBatchPos = nextPosition - b.batches = make([]*B, 0) - b.streamer.Reset() - return - } - - if nextPosition > b.startingBatchPos { - // We want to advance the read position, and we are indicating that - // we no longer will need to refer to older batches. So instead, we - // will want to adjust the buffer, and read position based on the - // new nextPosition. - - positionAdjustment := nextPosition - b.startingBatchPos - if positionAdjustment <= uint64(len(b.batches)) { - // If the adjustment is within the bounds of the current buffer, - // we can simply adjust the read position and starting batch position. - b.batches = b.batches[positionAdjustment:] - b.readPos -= positionAdjustment - } else { - b.batches = make([]*B, 0) - b.readPos = 0 - } - b.startingBatchPos = nextPosition - return - } -} - -// RefreshSafeL1Origin updates the safe L1 origin for the buffered streamer. -// This method attempts to safely handle the adjustment of the safeL1Origin -// without needing to defer to the underlying streamer unless necessary. -func (b *BufferedEspressoStreamer[B]) RefreshSafeL1Origin(safeL1Origin eth.BlockID) { - if safeL1Origin.Number < b.currentSafeL1Origin.Number { - // If the safeL1Origin is before the starting batch position, we need to - // reset the buffered streamer to ensure we don't miss any batches. - b.currentSafeL1Origin = safeL1Origin - b.startingBatchPos = 0 - b.readPos = 0 - b.batches = make([]*B, 0) - // we call underlying streamer's RefreshSafeL1Origin to ensure it is aware of - // the new safe L1 origin. - b.streamer.RefreshSafeL1Origin(safeL1Origin) - return - } - - b.currentSafeL1Origin = safeL1Origin -} - -// Refresh implements EspressoStreamerIFace -func (b *BufferedEspressoStreamer[B]) Refresh(ctx context.Context, finalizedL1 eth.L1BlockRef, safeBatchNumber uint64, safeL1Origin eth.BlockID) error { - b.handleL2PositionUpdate(safeBatchNumber) - b.RefreshSafeL1Origin(safeL1Origin) - - return b.streamer.Refresh(ctx, finalizedL1, safeBatchNumber, safeL1Origin) -} - -// Reset resets the buffered streamer state to the last known good -// safe batch position. -func (b *BufferedEspressoStreamer[B]) Reset() { - // Reset the buffered streamer state - b.readPos = 0 -} - -// HasNext implements EspressoStreamerIFace -// -// It checks to see if there are any batches left to read in its local buffer. -// If there are no batches left in the buffer, it defers to the underlying -// streamer to determine if there are more batches available. -func (b *BufferedEspressoStreamer[B]) HasNext(ctx context.Context) bool { - if b.readPos < uint64(len(b.batches)) { - return true - } - - return b.streamer.HasNext(ctx) -} - -// Next implements EspressoStreamerIFace -// -// It returns the next batch from the local buffer if available, or fetches -// it from the underlying streamer if not, appending to its local underlying -// buffer in the process. -func (b *BufferedEspressoStreamer[B]) Next(ctx context.Context) *B { - if b.readPos < uint64(len(b.batches)) { - // If we have a batch in the buffer, return it - batch := b.batches[b.readPos] - b.readPos++ - return batch - } - - // If we don't have a batch in the buffer, fetch the next one from the streamer - batch := b.streamer.Next(ctx) - - // No more batches available at the moment - if batch == nil { - return nil - } - - number := (*batch).Number() - if number < b.startingBatchPos { - // If the batch number is before the starting batch position, we ignore - // it, and want to fetch the next one - return b.Next(ctx) - } - - b.batches = append(b.batches, batch) - b.readPos++ - return batch - -} - -// UnmarshalBatch implements EspressoStreamerIFace -func (b *BufferedEspressoStreamer[B]) UnmarshalBatch(data []byte) (*B, error) { - // Delegate the unmarshalling to the underlying streamer - return b.streamer.UnmarshalBatch(data) -} diff --git a/espresso/cli.go b/espresso/cli.go index f6900aaee76..3e7b81dba63 100644 --- a/espresso/cli.go +++ b/espresso/cli.go @@ -6,10 +6,12 @@ import ( "strings" "time" + op "github.com/EspressoSystems/espresso-streamers/op" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/ethclient" "github.com/ethereum/go-ethereum/log" + "github.com/urfave/cli/v2" espressoClient "github.com/EspressoSystems/espresso-network/sdks/go/client" @@ -201,11 +203,11 @@ func ReadCLIConfig(c *cli.Context) CLIConfig { return config } -func BatchStreamerFromCLIConfig[B Batch]( +func BatchStreamerFromCLIConfig[B op.Batch]( cfg CLIConfig, log log.Logger, unmarshalBatch func([]byte) (*B, error), -) (*BatchStreamer[B], error) { +) (*op.BatchStreamer[B], error) { if !cfg.Enabled { return nil, fmt.Errorf("Espresso is not enabled") } @@ -228,7 +230,7 @@ func BatchStreamerFromCLIConfig[B Batch]( return nil, fmt.Errorf("failed to create Espresso light client") } - return NewEspressoStreamer( + return op.NewEspressoStreamer( cfg.Namespace, NewAdaptL1BlockRefClient(l1Client), NewAdaptL1BlockRefClient(RollupL1Client), @@ -236,7 +238,6 @@ func BatchStreamerFromCLIConfig[B Batch]( espressoLightClient, log, unmarshalBatch, - cfg.PollInterval, cfg.CaffeinationHeightEspresso, cfg.CaffeinationHeightL2, cfg.BatchAuthenticatorAddr, diff --git a/espresso/environment/2_espresso_liveness_test.go b/espresso/environment/2_espresso_liveness_test.go index ce3a3d8d1f3..fe593bcf98b 100644 --- a/espresso/environment/2_espresso_liveness_test.go +++ b/espresso/environment/2_espresso_liveness_test.go @@ -11,6 +11,7 @@ import ( espressoClient "github.com/EspressoSystems/espresso-network/sdks/go/client" espressoLightClient "github.com/EspressoSystems/espresso-network/sdks/go/light-client" + op "github.com/EspressoSystems/espresso-streamers/op" "github.com/ethereum-optimism/optimism/espresso" env "github.com/ethereum-optimism/optimism/espresso/environment" "github.com/ethereum-optimism/optimism/op-e2e/e2eutils/wait" @@ -259,7 +260,7 @@ func TestE2eDevnetWithEspressoDegradedLivenessViaCaffNode(t *testing.T) { l := log.NewLogger(slog.Default().Handler()) lightClient, err := espressoLightClient.NewLightclientCaller(common.HexToAddress(env.ESPRESSO_LIGHT_CLIENT_ADDRESS), l1Client) require.NoError(t, err, "light client creation failed") - streamer, err := espresso.NewEspressoStreamer( + streamer, err := op.NewEspressoStreamer( system.RollupConfig.L2ChainID.Uint64(), espresso.NewAdaptL1BlockRefClient(l1Client), espresso.NewAdaptL1BlockRefClient(l1Client), @@ -267,7 +268,6 @@ func TestE2eDevnetWithEspressoDegradedLivenessViaCaffNode(t *testing.T) { lightClient, l, derive.CreateEspressoBatchUnmarshaler(), - 100*time.Millisecond, 0, 1, system.RollupConfig.BatchAuthenticatorAddress, diff --git a/espresso/interface.go b/espresso/interface.go index 2af8e0e19cf..0c52ac68abd 100644 --- a/espresso/interface.go +++ b/espresso/interface.go @@ -3,11 +3,12 @@ package espresso import ( "context" + op "github.com/EspressoSystems/espresso-streamers/op" "github.com/ethereum-optimism/optimism/op-service/eth" ) // EspressoStreamer defines the interface for the Espresso streamer. -type EspressoStreamer[B Batch] interface { +type EspressoStreamer[B op.Batch] interface { // Update will update the `EspressoStreamer“ by attempting to ensure that // the next call to the `Next` method will return a `Batch`. // diff --git a/espresso/streamer.go b/espresso/streamer.go deleted file mode 100644 index 5100823f3dd..00000000000 --- a/espresso/streamer.go +++ /dev/null @@ -1,575 +0,0 @@ -package espresso - -import ( - "context" - "errors" - "fmt" - "math" - "math/big" - "slices" - "time" - - "github.com/ethereum/go-ethereum/accounts/abi/bind" - "github.com/ethereum/go-ethereum/common" - "github.com/hashicorp/golang-lru/v2/simplelru" - - "github.com/EspressoSystems/espresso-network/sdks/go/types" - espressoCommon "github.com/EspressoSystems/espresso-network/sdks/go/types" - "github.com/ethereum-optimism/optimism/espresso/bindings" - "github.com/ethereum-optimism/optimism/op-service/eth" - "github.com/ethereum/go-ethereum/log" -) - -const BatchBufferCapacity uint64 = 1024 - -// Espresso light client bindings don't have an explicit name for this struct, -// so we define it here to avoid spelling it out every time -type FinalizedState = struct { - ViewNum uint64 - BlockHeight uint64 - BlockCommRoot *big.Int -} - -// LightClientCallerInterface is an interface that documents the methods we utilize -// for the espresso light client -// -// We define this here locally in order to effectively document the methods -// we utilize. This approach allows us to avoid importing the entire package -// and allows us to easily swap implementations for testing. -type LightClientCallerInterface interface { - FinalizedState(opts *bind.CallOpts) (FinalizedState, error) -} - -// EspressoClient is an interface that documents the methods we utilize for -// the espressoClient.Client. -// -// As a result we are able to easily swap implementations for testing, or -// for modification / wrapping. -type EspressoClient interface { - FetchLatestBlockHeight(ctx context.Context) (uint64, error) - FetchNamespaceTransactionsInRange(ctx context.Context, fromHeight uint64, toHeight uint64, namespace uint64) ([]types.NamespaceTransactionsRangeData, error) -} - -// L1Client is an interface that documents the methods we utilize for -// the L1 client. -type L1Client interface { - HeaderHashByNumber(ctx context.Context, number *big.Int) (common.Hash, error) - bind.ContractCaller -} - -// espresso-network go sdk's HeaderInterface currently lacks a function to get this info, -// although it is present in all header versions -func GetFinalizedL1(header *espressoCommon.HeaderImpl) espressoCommon.L1BlockInfo { - v0_1, ok := header.Header.(*espressoCommon.Header0_1) - if ok { - return *v0_1.L1Finalized - } - v0_2, ok := header.Header.(*espressoCommon.Header0_2) - if ok { - return *v0_2.L1Finalized - } - v0_3, ok := header.Header.(*espressoCommon.Header0_3) - if ok { - return *v0_3.L1Finalized - } - panic("Unsupported header version") -} - -// Subset of L1 state we're interested in for particular block number -type l1State struct { - // Block hash - hash common.Hash - // TEE batchers addresses for signature verification - teeBatchers []common.Address -} - -type BatchStreamer[B Batch] struct { - // Namespace of the rollup we're interested in - Namespace uint64 - - L1Client L1Client - RollupL1Client L1Client - EspressoClient EspressoClient - EspressoLightClient LightClientCallerInterface - BatchAuthenticatorCaller *bindings.BatchAuthenticatorCaller - Log log.Logger - HotShotPollingInterval time.Duration - - // Batch number we're to give out next - BatchPos uint64 - // Position of the last safe batch, we can use it as the position to fallback when resetting - fallbackBatchPos uint64 - // HotShot block that was visited last - hotShotPos uint64 - // HotShot position that we can fallback to, guaranteeing not to skip any unsafe batches - fallbackHotShotPos uint64 - // HotShot position we start reading from, exclusive - originHotShotPos uint64 - // Latest finalized block on the L1. - FinalizedL1 eth.L1BlockRef - // If the batch buffer is full, but we don't yet have the next batch, - // we will start skipping other batches until we encounter the missing batch. - // This position will be used to record such a situation occurring, when - // we find the target batch HotShot position will be reset to this. - skipPos uint64 - headBatch *B - - // Maintained in sorted order, but may be missing batches if we receive - // any out of order. - BatchBuffer BatchBuffer[B] - - // Cache for finalized L1 block hashes, keyed by block number. - finalizedL1StateCache *simplelru.LRU[uint64, l1State] - - unmarshalBatch func([]byte) (*B, error) -} - -// Compile time assertion to ensure EspressoStreamer implements -// EspressoStreamerIFace -var _ EspressoStreamer[Batch] = (*BatchStreamer[Batch])(nil) - -func NewEspressoStreamer[B Batch]( - namespace uint64, - l1Client L1Client, - rollupL1Client L1Client, - espressoClient EspressoClient, - lightClient LightClientCallerInterface, - log log.Logger, - unmarshalBatch func([]byte) (*B, error), - hotShotPollingInterval time.Duration, - originHotShotPos uint64, - originBatchPos uint64, - batchAuthenticatorAddress common.Address, -) (*BatchStreamer[B], error) { - if batchAuthenticatorAddress == (common.Address{}) { - return nil, errors.New("BatchAuthenticator address must be set for Espresso streamer") - } - - finalizedL1StateCache, _ := simplelru.NewLRU[uint64, l1State](1000, nil) - - batchAuthenticatorCaller, err := bindings.NewBatchAuthenticatorCaller(batchAuthenticatorAddress, l1Client) - if err != nil { - return nil, fmt.Errorf("failed to bind BatchAuthenticator at %s: %w", batchAuthenticatorAddress, err) - } - - return &BatchStreamer[B]{ - L1Client: l1Client, - RollupL1Client: rollupL1Client, - EspressoClient: espressoClient, - EspressoLightClient: lightClient, - BatchAuthenticatorCaller: batchAuthenticatorCaller, - Log: log, - Namespace: namespace, - // Internally, BatchPos is the position of the batch we are to give out next, hence the +1 - BatchPos: originBatchPos + 1, - fallbackBatchPos: originBatchPos + 1, - BatchBuffer: NewBatchBuffer[B](BatchBufferCapacity), - HotShotPollingInterval: hotShotPollingInterval, - finalizedL1StateCache: finalizedL1StateCache, - unmarshalBatch: unmarshalBatch, - originHotShotPos: originHotShotPos, - fallbackHotShotPos: originHotShotPos, - hotShotPos: originHotShotPos, - skipPos: math.MaxUint64, - }, nil -} - -// Reset the state to the last safe batch -func (s *BatchStreamer[B]) Reset() { - s.Log.Info("reset espresso streamer", "hotshot pos", s.fallbackHotShotPos, "batch pos", s.fallbackBatchPos) - s.hotShotPos = s.fallbackHotShotPos - s.BatchPos = s.fallbackBatchPos + 1 - s.headBatch = nil - s.skipPos = math.MaxUint64 - s.BatchBuffer.Clear() -} - -// RefreshSafeL1Origin is a convenience method that allows us to update the -// safe L1 origin of the Streamer. It will confirm the Espresso Block Height -// and reset the state if necessary. -func (s *BatchStreamer[B]) RefreshSafeL1Origin(safeL1Origin eth.BlockID) { - shouldReset := s.confirmEspressoBlockHeight(safeL1Origin) - if shouldReset { - s.Reset() - } -} - -// Update streamer state based on L1 and L2 sync status -func (s *BatchStreamer[B]) Refresh(ctx context.Context, finalizedL1 eth.L1BlockRef, safeBatchNumber uint64, safeL1Origin eth.BlockID) error { - s.FinalizedL1 = finalizedL1 - - s.RefreshSafeL1Origin(safeL1Origin) - - // NOTE: be sure to update s.finalizedL1 before checking this condition and returning - if s.fallbackBatchPos == safeBatchNumber { - // This means everything is in sync, no state update needed - return nil - } - - shouldReset := safeBatchNumber < s.fallbackBatchPos - - // We should also reset if fallback position is higher than what we're currently reading from - shouldReset = shouldReset || (s.fallbackHotShotPos > s.hotShotPos) - - s.fallbackBatchPos = safeBatchNumber - if shouldReset { - s.Reset() - } - return nil -} - -// CheckBatch checks the validity of the given batch against the finalized L1 -// block and the safe L1 origin. -func (s *BatchStreamer[B]) CheckBatch(ctx context.Context, batch B) BatchValidity { - - // Make sure the finalized L1 block is initialized before checking the block number. - if s.FinalizedL1 == (eth.L1BlockRef{}) { - s.Log.Error("Finalized L1 block not initialized") - return BatchUndecided - } - origin := (batch).L1Origin() - - if origin.Number > s.FinalizedL1.Number { - // Signal to resync to wait for the L1 finality. - s.Log.Warn("L1 origin not finalized, pending resync", "finalized L1 block number", s.FinalizedL1.Number, "origin number", origin.Number) - return BatchUndecided - } - - state, ok := s.finalizedL1StateCache.Get(origin.Number) - if !ok { - blockNumber := new(big.Int).SetUint64(origin.Number) - hash, err := s.RollupL1Client.HeaderHashByNumber(ctx, blockNumber) - if err != nil { - s.Log.Warn("Failed to fetch the L1 header, pending resync", "error", err) - return BatchUndecided - } - - teeBatcher, err := s.BatchAuthenticatorCaller.TeeBatcher(&bind.CallOpts{BlockNumber: blockNumber}) - if err != nil { - s.Log.Warn("Failed to fetch the TEE batcher address, pending resync", "error", err) - return BatchUndecided - } - - state = l1State{ - hash: hash, - teeBatchers: []common.Address{teeBatcher}, - } - - s.finalizedL1StateCache.Add(origin.Number, state) - } - - if !slices.Contains(state.teeBatchers, batch.Signer()) { - s.Log.Info("Dropping batch with invalid TEE batcher", "batch", batch.Hash(), "signer", batch.Signer()) - return BatchDrop - } - - if state.hash != origin.Hash { - s.Log.Warn("Dropping batch with invalid L1 origin hash") - return BatchDrop - } - // Batch already buffered/finalized - if batch.Number() < s.BatchPos { - s.Log.Warn("Batch is older than current batchPos, skipping", "batchNr", batch.Number(), "batchPos", s.BatchPos) - return BatchPast - } - - return BatchAccept -} - -// HOTSHOT_BLOCK_FETCH_LIMIT is the maximum number of blocks to attempt to -// load from Espresso in a single process using fetch API. -// This helps to limit our block polling to a limited number of blocks within -// a single batched attempt. -const HOTSHOT_BLOCK_FETCH_LIMIT = 100 - -// computeEspressoBlockHeightsRange computes the range of block heights to fetch -// from Espresso. It starts from the last processed block and goes up to -// `limit` blocks ahead or the current block height, whichever -// is smaller. -func (s *BatchStreamer[B]) computeEspressoBlockHeightsRange(currentBlockHeight uint64, limit uint64) (start uint64, finish uint64) { - start = s.hotShotPos - if start > 0 { - // We've already processed the block in hotShotPos. In order to avoid - // reprocessing the same block, we want to start from the next block. - start++ - } - // `FetchNamespaceTransactionsInRange` is exclusive to finish, so we add 1 to currentBlockHeight - finish = min(start+limit, currentBlockHeight+1) - - return start, finish -} - -// Update will update the `EspressoStreamer“ by attempting to ensure that the -// next call to the `Next` method will return a `Batch`. -// -// It attempts to ensure the existence of a next batch, provided no errors -// occur when communicating with HotShot, by processing Blocks retrieved from -// `HotShot` in discreet batches. If each processing of a batch of blocks will -// not yield a new `Batch`, then it will continue to process the next batch -// of blocks from HotShot until it runs out of blocks to process. -// -// NOTE: this method is best effort. It is unable to guarantee that the -// next call to `Next` will return a batch. However, the only things -// that will prevent the next call to `Next` from returning a batch is if -// there are no more HotShot blocks to process currently, or if an error -// occurs when communicating with HotShot. -func (s *BatchStreamer[B]) Update(ctx context.Context) error { - // Retrieve the current block height from Espresso. We grab this reference - // so we don't have to keep fetching it in a loop, and it informs us of - // the current block height available to process. - currentBlockHeight, err := s.EspressoClient.FetchLatestBlockHeight(ctx) - if err != nil { - return fmt.Errorf("failed to fetch latest block height: %w", err) - } - - // Fetch API implementation - for i := 0; ; i++ { - // Fetch more batches from HotShot if available. - start, finish := s.computeEspressoBlockHeightsRange(currentBlockHeight, HOTSHOT_BLOCK_FETCH_LIMIT) - - if start >= finish || (start+1 == finish && i > 0) { - // If start is one less than our finish, then that means we - // already processed all of the blocks available to us. We - // should break out of the loop. Sadly, this means that we - // likely do not have any batches to process. - // - // NOTE: this also likely means that the following is true: - // start + 1 == finish == currentBlockHeight + 1 - // - // NOTE: there is an edge case here if the only block available is - // the initial block of Espresso, then we get stuck in a loop - // repeatedly processing it again and again. So to catch - // this case, we check to see if start is equal to finish, after - // an initial iteration. - break - } - - // Process the new batches fetched from Espresso - if err := s.fetchHotShotRange(ctx, start, finish); err != nil { - return fmt.Errorf("failed to process hotshot range: %w", err) - } - - if s.HasNext(ctx) { - // If we have a batch ready to be processed, we can exit the loop, - // otherwise, we will want to continue to the next range of blocks - // to fetch. - // - // The goal here is to try and provide our best effort to ensure - // that we have the next batch available for processing. We should - // only fail to do this if there currently is no next batch - // currently available (or if we error while attempting to retrieve - // transactions from HotShot). - break - } - } - - return nil -} - -// fetchHotShotRange is a helper method that will load all of the blocks from -// Hotshot from start to finish, inclusive. It will process each block and -// update the batch buffer with any batches found in the block. -// It will also update the hotShotPos to the last block processed, in order -// to effectively keep track of the last block we have successfully fetched, -// and therefore processed from Hotshot. -func (s *BatchStreamer[B]) fetchHotShotRange(ctx context.Context, start, finish uint64) error { - // Process the new batches fetched from Espresso - s.Log.Trace("Fetching HotShot block range", "start", start, "finish", finish) - - // FetchNamespaceTransactionsInRange fetches transactions in [start, finish) - namespaceRangeTransactions, err := s.EspressoClient.FetchNamespaceTransactionsInRange(ctx, start, finish, s.Namespace) - if err != nil { - return err - } - - s.Log.Info("Fetched HotShot block range", "start", start, "finish", finish, "numNamespaceTransactions", len(namespaceRangeTransactions)) - if len(namespaceRangeTransactions) == 0 { - s.Log.Trace("No transactions in hotshot block range", "start", start, "finish", finish) - } - - // We want to keep track of the latest block we have processed. - // This is essential for ensuring we don't unnecessarily keep - // refetching the same blocks that we have already processed. - // This should ensure that we keep moving forward and consuming - // from the Espresso Blocks without missing any blocks. - s.hotShotPos = finish - 1 - - for _, namespaceTransaction := range namespaceRangeTransactions { - for _, txn := range namespaceTransaction.Transactions { - err := s.processEspressoTransaction(ctx, txn.Payload) - if errors.Is(err, ErrAtCapacity) { - s.skipPos = min(s.skipPos, start) - } - } - } - - return nil -} - -// processEspressoTransaction is a helper method that encapsulates the logic of -// processing batches from the transactions in a block fetched from Espresso. -// It will return an error if the transaction contains a valid batch, but the buffer is full. -func (s *BatchStreamer[B]) processEspressoTransaction(ctx context.Context, transaction espressoCommon.Bytes) error { - batch, err := s.UnmarshalBatch(transaction) - if err != nil { - s.Log.Warn("Dropping batch with invalid transaction data", "error", err) - return nil - } - - validity := s.CheckBatch(ctx, *batch) - - switch validity { - case BatchDrop: - s.Log.Info("Dropping batch", batch) - return nil - - case BatchPast: - s.Log.Info("Batch already processed. Skipping", "batch", (*batch).Number()) - return nil - - case BatchUndecided: - s.Log.Warn("Inserting undecided batch", "batch", (*batch).Hash()) - - case BatchAccept: - } - - header := (*batch).Header() - - // If this is the batch we're supposed to give out next and we don't - // have any other candidates, put it in as the head batch - if (*batch).Number() == s.BatchPos && s.headBatch == nil { - s.Log.Info("Setting batch as the head batch", - "hash", (*batch).Hash(), - "parentHash", header.ParentHash, - "epochNum", header.Number, - "timestamp", header.Time) - s.headBatch = batch - } else { - // Otherwise, try to buffer it. If the buffer is full, forward the error up to record - // that we're skipping batches and will need to revisit when the buffer drains - s.Log.Info("Inserting batch into buffer", - "hash", (*batch).Hash(), - "parentHash", header.ParentHash, - "epochNum", header.Number, - "timestamp", header.Time) - err := s.BatchBuffer.Insert(*batch) - if errors.Is(err, ErrDuplicateBatch) { - s.Log.Warn("Dropping batch with duplicate hash") - } - if errors.Is(err, ErrAtCapacity) { - return err - } - } - - return nil -} - -// UnmarshalBatch implements EspressoStreamerIFace -func (s *BatchStreamer[B]) Next(ctx context.Context) *B { - // Is the next batch available? - if s.HasNext(ctx) { - // Current batch is going to be processed, update fallback batch position - s.BatchPos += 1 - head := s.headBatch - s.headBatch = nil - // If we have been skipping batches, now is the time - // to rewind and start considering batches again: we've made more space - if s.skipPos != math.MaxUint64 { - s.hotShotPos = s.skipPos - s.skipPos = math.MaxUint64 - } - return head - } - - return nil -} - -// HasNext implements EspressoStreamerIFace -func (s *BatchStreamer[B]) HasNext(ctx context.Context) bool { - for { - if s.headBatch == nil { - nextBuffered := s.BatchBuffer.Peek() - if nextBuffered != nil && (*nextBuffered).Number() == s.BatchPos { - s.headBatch = nextBuffered - s.BatchBuffer.Pop() - } else { - return false - } - } - - validity := s.CheckBatch(ctx, *s.headBatch) - switch validity { - case BatchAccept: - // Batch is fine, we can give it out - return true - case BatchUndecided: - // We need to wait for our view of - // L1 to update before we can make a - // decision - return false - case BatchDrop: - // This was an undecided batch and looks like - // an L1 reorg happened that invalidated it. - // We drop it and check the next - s.headBatch = nil - continue - case BatchPast: - // This was probably a duplicate batch, skip it - // and check the next - s.headBatch = nil - continue - } - - return false - } -} - -// This function allows to "pin" the Espresso block height that is guaranteed not to contain -// any batches that have origin >= safeL1Origin. -// We do this by reading block height from Light Client FinalizedState at safeL1Origin. -// -// For reference on why doing this guarantees we won't skip any unsafe blocks: -// https://eng-wiki.espressosys.com/mainch30.html#:Components:espresso%20streamer:initializing%20hotshot%20height -// -// We do not propagate the error if Light Client is unreachable - this is not an essential -// operation and streamer can continue operation -func (s *BatchStreamer[B]) confirmEspressoBlockHeight(safeL1Origin eth.BlockID) (shouldReset bool) { - shouldReset = false - if s.EspressoLightClient == nil { - s.Log.Warn("Espresso light client is not initialized") - return false - } - - hotshotState, err := s.EspressoLightClient. - FinalizedState(&bind.CallOpts{BlockNumber: new(big.Int).SetUint64(safeL1Origin.Number)}) - - if err != nil { - // If we have already advanced our fallback position before, there's no need to roll it back - s.fallbackHotShotPos = max(s.fallbackHotShotPos, s.originHotShotPos) - s.Log.Warn("failed to get finalized state from light client", "err", err) - return false - } - - // If hotshot block height at L1 origin is lower than our - // hotshot origin, we never want to update our fallback - // position to this height, or we risk dipping below - // hotshot origin on reset. - if hotshotState.BlockHeight <= s.originHotShotPos { - s.Log.Info("HotShot height at L1 Origin less than HotShot origin of the streamer, ignoring") - return shouldReset - } - - // If we assigned to fallback position from hotsthot height before - // and now the light client reports a smaller height, there was an L1 - // reorg and we should reset our state - shouldReset = hotshotState.BlockHeight < s.fallbackHotShotPos - - s.fallbackHotShotPos = hotshotState.BlockHeight - - return shouldReset -} - -// UnmarshalBatch implements EspressoStreamerIFace -func (s *BatchStreamer[B]) UnmarshalBatch(b []byte) (*B, error) { - return s.unmarshalBatch(b) -} diff --git a/espresso/streamer_test.go b/espresso/streamer_test.go deleted file mode 100644 index fb24d3194b1..00000000000 --- a/espresso/streamer_test.go +++ /dev/null @@ -1,1321 +0,0 @@ -package espresso_test - -import ( - "context" - "encoding/binary" - "encoding/json" - "errors" - "fmt" - "log/slog" - "math/big" - "math/rand" - "testing" - "time" - - espressoClient "github.com/EspressoSystems/espresso-network/sdks/go/client" - espressoCommon "github.com/EspressoSystems/espresso-network/sdks/go/types" - "github.com/ethereum-optimism/optimism/espresso" - "github.com/ethereum-optimism/optimism/op-node/rollup" - "github.com/ethereum-optimism/optimism/op-node/rollup/derive" - "github.com/ethereum-optimism/optimism/op-service/crypto" - "github.com/ethereum-optimism/optimism/op-service/eth" - opsigner "github.com/ethereum-optimism/optimism/op-service/signer" - "github.com/ethereum-optimism/optimism/op-service/testutils" - "github.com/ethereum/go-ethereum" - "github.com/ethereum/go-ethereum/accounts/abi/bind" - "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/common/hexutil" - geth_types "github.com/ethereum/go-ethereum/core/types" - "github.com/ethereum/go-ethereum/log" - "github.com/stretchr/testify/require" -) - -// TestNewEspressoStreamer tests that we can create a new EspressoStreamer -// without any panic being thrown. - -func TestNewEspressoStreamer(t *testing.T) { - mock := NewMockStreamerSource() - // Use a non-zero address for the BatchAuthenticator mock - batchAuthAddr := common.HexToAddress("0x0000000000000000000000000000000000000001") - _, err := espresso.NewEspressoStreamer( - 1, - mock, - mock, - mock, mock, new(NoOpLogger), derive.CreateEspressoBatchUnmarshaler(), - 50*time.Millisecond, - 0, - 1, - batchAuthAddr, - ) - require.NoError(t, err) -} - -// EspBlockAndNamespace is a struct that holds the height and namespace -// of an Espresso block. It is used to uniquely identify a block in the -// EspressoStreamer. -type EspBlockAndNamespace struct { - Height, Namespace uint64 -} - -// BlockAndNamespace creates a new EspBlockAndNamespace struct -// with the provided height and namespace. -func BlockAndNamespace(height, namespace uint64) EspBlockAndNamespace { - return EspBlockAndNamespace{ - Height: height, - Namespace: namespace, - } -} - -// MockStreamerSource is a mock implementation of the various interfaces -// required by the EspressoStreamer. The idea behind this mock is to allow -// for the specific progression of the L1, L2, and Espresso states, so we can -// verify the implementation of our Streamer, in relation to specific scenarios -// and edge cases, without needing to forcibly simulate them via a live test -// environment. -// -// As we progress through the tests, we should be able to update our local mock -// state, and then perform our various `.Update` and `.Next` calls, in order to -// verify that we end up with the expected state. -// -// The current expected use case for the Streamer is for the user to "Refresh" -// the state of the streamer by calling `.Refresh`. -type MockStreamerSource struct { - // At the moment the Streamer utilizes the SyncStatus in order to update - // it's local state. But, in general the Streamer doesn't consume all - // of the fields provided within the SyncStatus. At the moment it only - // cares about SafeL2, and FinalizedL1. So this is what we will track - - FinalizedL1 eth.L1BlockRef - SafeL2 eth.L2BlockRef - - EspTransactionData map[EspBlockAndNamespace]espressoClient.TransactionsInBlock - LatestEspHeight uint64 - finalizedHeightHistory map[uint64]uint64 - - // TeeBatcherAddr is the address returned by the mock BatchAuthenticator contract - // for teeBatcher() calls. Can be changed per-test to simulate TEE batcher rotation. - TeeBatcherAddr common.Address -} - -// FetchNamespaceTransactionsInRange implements espresso.EspressoClient. -func (m *MockStreamerSource) FetchNamespaceTransactionsInRange(ctx context.Context, fromHeight uint64, toHeight uint64, namespace uint64) ([]espressoCommon.NamespaceTransactionsRangeData, error) { - var result []espressoCommon.NamespaceTransactionsRangeData - - if fromHeight > toHeight { - return nil, ErrNotFound - } - for height := fromHeight; height <= toHeight; height++ { - transactionsInBlock, ok := m.EspTransactionData[BlockAndNamespace(height, namespace)] - if !ok { - // Preserve alignment with the requested range even if the block - // has no transactions in this namespace. - result = append(result, espressoCommon.NamespaceTransactionsRangeData{}) - continue - } - - var txs []espressoCommon.Transaction - for _, txPayload := range transactionsInBlock.Transactions { - tx := espressoCommon.Transaction{ - Namespace: namespace, - Payload: txPayload, - } - txs = append(txs, tx) - } - - result = append(result, espressoCommon.NamespaceTransactionsRangeData{ - Transactions: txs}) - } - return result, nil -} - -func NewMockStreamerSource() *MockStreamerSource { - finalizedL1 := createL1BlockRef(1) - return &MockStreamerSource{ - FinalizedL1: finalizedL1, - SafeL2: createL2BlockRef(0, finalizedL1), - EspTransactionData: make(map[EspBlockAndNamespace]espressoClient.TransactionsInBlock), - finalizedHeightHistory: make(map[uint64]uint64), - LatestEspHeight: 0, - } -} - -// AdvanceFinalizedL1ByNBlocks advances the FinalizedL1 block reference by n blocks. -func (m *MockStreamerSource) AdvanceFinalizedL1ByNBlocks(n uint) { - for range n { - m.AdvanceFinalizedL1() - } -} - -// AdvanceFinalizedL1 advances the FinalizedL1 block reference by one block. -func (m *MockStreamerSource) AdvanceFinalizedL1() { - m.finalizedHeightHistory[m.FinalizedL1.Number] = m.LatestEspHeight - m.FinalizedL1 = createL1BlockRef(m.FinalizedL1.Number + 1) -} - -// AdvanceL2ByNBlocks advances the SafeL2 block reference by n blocks. -func (m *MockStreamerSource) AdvanceL2ByNBlocks(n uint) { - m.SafeL2 = createL2BlockRef(m.SafeL2.Number+uint64(n), m.FinalizedL1) -} - -// AdvanceSafeL2 advances the SafeL2 block reference by one block. -func (m *MockStreamerSource) AdvanceSafeL2() { - m.SafeL2 = createL2BlockRef(m.SafeL2.Number+1, m.FinalizedL1) -} - -// AdvanceEspressoHeightByNBlocks advances the LatestEspHeight by n blocks. -func (m *MockStreamerSource) AdvanceEspressoHeightByNBlocks(n int) { - m.LatestEspHeight += uint64(n) -} - -// AdvanceEspressoHeight advances the LatestEspHeight by one block. -func (m *MockStreamerSource) AdvanceEspressoHeight() { - m.LatestEspHeight++ -} - -// SyncStatus returns the current sync status of the mock streamer source. -// Only the fields FinalizedL1, FinalizedL1, and SafeL2 are populated, as those -// are the only fields explicitly inspected by the EspressoStreamer. -func (m *MockStreamerSource) SyncStatus() *eth.SyncStatus { - return ð.SyncStatus{ - FinalizedL1: m.FinalizedL1, - SafeL2: m.SafeL2, - } -} - -func (m *MockStreamerSource) AddEspressoTransactionData(height, namespace uint64, txData espressoClient.TransactionsInBlock) { - if m.EspTransactionData == nil { - m.EspTransactionData = make(map[EspBlockAndNamespace]espressoClient.TransactionsInBlock) - } - - m.EspTransactionData[BlockAndNamespace(height, namespace)] = txData - - if m.LatestEspHeight < height { - m.LatestEspHeight = height - } -} - -var _ espresso.L1Client = (*MockStreamerSource)(nil) - -// L1 Client methods - -func (m *MockStreamerSource) HeaderHashByNumber(ctx context.Context, number *big.Int) (common.Hash, error) { - l1Ref := createL1BlockRef(number.Uint64()) - return l1Ref.Hash, nil -} - -// teeBatcherSelector is the 4-byte function selector for teeBatcher() — 0xd909ba7c -var teeBatcherSelector = []byte{0xd9, 0x09, 0xba, 0x7c} - -func (m *MockStreamerSource) CodeAt(ctx context.Context, contract common.Address, blockNumber *big.Int) ([]byte, error) { - // Return non-empty bytes so the bindings consider the contract deployed - return []byte{0x01}, nil -} - -func (m *MockStreamerSource) CallContract(ctx context.Context, call ethereum.CallMsg, blockNumber *big.Int) ([]byte, error) { - if len(call.Data) >= 4 && common.Bytes2Hex(call.Data[:4]) == common.Bytes2Hex(teeBatcherSelector) { - // ABI-encode the TEE batcher address as a 32-byte left-padded word - var result [32]byte - copy(result[12:], m.TeeBatcherAddr.Bytes()) - return result[:], nil - } - return nil, fmt.Errorf("unexpected contract call: %x", call.Data) -} - -// Espresso Client Methods -var _ espresso.EspressoClient = (*MockStreamerSource)(nil) - -func (m *MockStreamerSource) FetchLatestBlockHeight(ctx context.Context) (uint64, error) { - return m.LatestEspHeight, nil -} - -// ErrorNotFound is a custom error type used to indicate that a requested -// resource was not found. -type ErrorNotFound struct{} - -// Error implements error. -func (ErrorNotFound) Error() string { - return "not found" -} - -// ErrNotFound is an instance of ErrorNotFound that can be used to indicate -// that a requested resource was not found. -var ErrNotFound error = ErrorNotFound{} - -type MockTransactionStream struct { - pos uint64 - subPos uint64 - end uint64 - namespace uint64 - source *MockStreamerSource -} - -func (ms *MockTransactionStream) Next(ctx context.Context) (*espressoCommon.TransactionQueryData, error) { - raw, err := ms.NextRaw(ctx) - if err != nil { - return nil, err - } - var transaction espressoCommon.TransactionQueryData - if err := json.Unmarshal(raw, &transaction); err != nil { - return nil, err - } - return &transaction, nil -} - -func (ms *MockTransactionStream) NextRaw(ctx context.Context) (json.RawMessage, error) { - for { - // get the latest block number - latestHeight, err := ms.source.FetchLatestBlockHeight(ctx) - if err != nil { - // We will return error on NotFound as well to speed up tests. - // More faithful imitation of HotShot streaming API would be to hang - // until we receive new transactions, but that would slow down some - // tests significantly, because streamer would wait for full timeout - // threshold here before finishing update. - return nil, err - } - - if ms.pos > latestHeight { - return nil, ErrNotFound - } - - namespaceTransactions, err := ms.source.FetchNamespaceTransactionsInRange(ctx, ms.pos, latestHeight, ms.namespace) - if err != nil { - return nil, err - } - - // Each element in the returned slice corresponds to a block starting - // at fromHeight. We only need the current block (index 0) because - // fromHeight == ms.pos. - if len(namespaceTransactions) == 0 { - return nil, ErrNotFound - } - - currentBlock := namespaceTransactions[0] - - if len(currentBlock.Transactions) > int(ms.subPos) { - tx := currentBlock.Transactions[int(ms.subPos)] - transaction := &espressoCommon.TransactionQueryData{ - BlockHeight: ms.pos, - Index: ms.subPos, - Transaction: espressoCommon.Transaction{ - Payload: tx.Payload, - Namespace: ms.namespace, - }, - } - ms.subPos++ - return json.Marshal(transaction) - } - - // Move on to the next block. - ms.subPos = 0 - ms.pos++ - } -} - -func (ms *MockTransactionStream) Close() error { - return nil -} - -func (m *MockStreamerSource) StreamTransactionsInNamespace(ctx context.Context, height uint64, namespace uint64) (espressoClient.Stream[espressoCommon.TransactionQueryData], error) { - if m.LatestEspHeight < height { - return nil, ErrNotFound - } - - return &MockTransactionStream{ - pos: height, - subPos: 0, - end: m.LatestEspHeight, - namespace: namespace, - source: m, - }, nil -} - -// Espresso Light Client implementation -var _ espresso.LightClientCallerInterface = (*MockStreamerSource)(nil) - -// LightClientCallerInterface implementation -func (m *MockStreamerSource) FinalizedState(opts *bind.CallOpts) (espresso.FinalizedState, error) { - height, ok := m.finalizedHeightHistory[opts.BlockNumber.Uint64()] - if !ok { - height = m.LatestEspHeight - } - return espresso.FinalizedState{ - ViewNum: height, - BlockHeight: height, - }, nil -} - -// NoOpLogger is a no-op implementation of the log.Logger interface. -// It is used to pass a non-nil logger to the EspressoStreamer without -// producing any output. -type NoOpLogger struct{} - -var _ log.Logger = (*NoOpLogger)(nil) - -func (l *NoOpLogger) With(ctx ...interface{}) log.Logger { return l } -func (l *NoOpLogger) New(ctx ...interface{}) log.Logger { return l } -func (l *NoOpLogger) Log(level slog.Level, msg string, ctx ...interface{}) {} -func (l *NoOpLogger) Trace(msg string, ctx ...interface{}) {} -func (l *NoOpLogger) Debug(msg string, ctx ...interface{}) {} -func (l *NoOpLogger) Info(msg string, ctx ...interface{}) {} -func (l *NoOpLogger) Warn(msg string, ctx ...interface{}) {} -func (l *NoOpLogger) Error(msg string, ctx ...interface{}) {} -func (l *NoOpLogger) Crit(msg string, ctx ...interface{}) { panic("critical error") } -func (l *NoOpLogger) Write(level slog.Level, msg string, attrs ...any) {} -func (l *NoOpLogger) Enabled(ctx context.Context, level slog.Level) bool { return true } -func (l *NoOpLogger) Handler() slog.Handler { return nil } -func (l *NoOpLogger) TraceContext(ctx context.Context, msg string, ctxArgs ...interface{}) {} -func (l *NoOpLogger) DebugContext(ctx context.Context, msg string, ctxArgs ...interface{}) {} -func (l *NoOpLogger) InfoContext(ctx context.Context, msg string, ctxArgs ...interface{}) {} -func (l *NoOpLogger) WarnContext(ctx context.Context, msg string, ctxArgs ...interface{}) {} -func (l *NoOpLogger) ErrorContext(ctx context.Context, msg string, ctxArgs ...interface{}) {} -func (l *NoOpLogger) CritContext(ctx context.Context, msg string, ctxArgs ...interface{}) { - panic("critical error") -} -func (l *NoOpLogger) LogAttrs(ctx context.Context, level slog.Level, msg string, attrs ...slog.Attr) { -} -func (l *NoOpLogger) SetContext(ctx context.Context) {} -func (l *NoOpLogger) WriteCtx(ctx context.Context, level slog.Level, msg string, args ...any) {} - -func createHashFromHeight(height uint64) common.Hash { - var hash common.Hash - binary.LittleEndian.PutUint64(hash[(len(hash)-8):], height) - return hash -} - -// createL1BlockRef creates a mock L1BlockRef for testing purposes, with the -// every field being derived from the provided height. This should be -// sufficient for testing purposes. -func createL1BlockRef(height uint64) eth.L1BlockRef { - var parentHash common.Hash - if height > 0 { - parentHash = createHashFromHeight(height - 1) - } - return eth.L1BlockRef{ - Number: height, - Hash: createHashFromHeight(height), - ParentHash: parentHash, - Time: height, - } -} - -// createL2BlockRef creates a mock L2BlockRef for testing purposes, with the -// every field being derived from the provided height and L1BlockRef. This -// should be sufficient for testing purposes. -func createL2BlockRef(height uint64, l1Ref eth.L1BlockRef) eth.L2BlockRef { - return eth.L2BlockRef{ - Number: height, - Hash: createHashFromHeight(height), - ParentHash: createHashFromHeight(height - 1), - Time: height, - SequenceNumber: 1, - L1Origin: eth.BlockID{ - Hash: l1Ref.Hash, - Number: l1Ref.Number, - }, - } -} - -// batchAuthenticatorAddr is a dummy non-zero address used as the BatchAuthenticator -// contract address in unit tests. The mock L1Client intercepts calls to it. -var batchAuthenticatorAddr = common.HexToAddress("0x0000000000000000000000000000000000000001") - -// setupStreamerTesting initializes a MockStreamerSource and an EspressoStreamer -// for testing purposes. It sets up the initial state of the MockStreamerSource -// and returns both the MockStreamerSource and the EspressoStreamer. -func setupStreamerTesting(namespace uint64, batcherAddress common.Address) (*MockStreamerSource, *espresso.BatchStreamer[derive.EspressoBatch]) { - state := NewMockStreamerSource() - state.TeeBatcherAddr = batcherAddress - - logger := new(NoOpLogger) - streamer, err := espresso.NewEspressoStreamer( - namespace, - state, - state, - state, - state, - logger, - derive.CreateEspressoBatchUnmarshaler(), - 50*time.Millisecond, - 0, - 1, - batchAuthenticatorAddr, - ) - if err != nil { - panic(fmt.Sprintf("setupStreamerTesting: failed to create streamer: %v", err)) - } - - return state, streamer -} - -// createEspressoBatch creates a mock EspressoBatch for testing purposes -// containing the provided SingularBatch. -func createEspressoBatch(batch *derive.SingularBatch) *derive.EspressoBatch { - return &derive.EspressoBatch{ - BatchHeader: &geth_types.Header{ - ParentHash: batch.ParentHash, - Number: big.NewInt(int64(batch.Timestamp)), - }, - Batch: *batch, - L1InfoDeposit: geth_types.NewTx(&geth_types.DepositTx{}), - } -} - -// createEspressoTransaction creates a mock Espresso transaction for testing purposes -// containing the provided Espresso batch. -func createEspressoTransaction(ctx context.Context, batch *derive.EspressoBatch, namespace uint64, chainSigner crypto.ChainSigner) *espressoCommon.Transaction { - tx, err := batch.ToEspressoTransaction(ctx, namespace, chainSigner) - if have, want := err, error(nil); have != want { - panic(err) - } - - return tx -} - -// createTransactionsInBlock creates a mock TransactionsInBlock for testing purposes -// containing the provided Espresso transaction. -func createTransactionsInBlock(tx *espressoCommon.Transaction) espressoClient.TransactionsInBlock { - return espressoClient.TransactionsInBlock{ - Transactions: []espressoCommon.Bytes{tx.Payload}, - } -} - -// CreateEspressoTxnData creates a mock Espresso transaction data set -// for testing purposes. It generates a test SingularBatch, and takes it -// through the steps of getting all the way to an Espresso transaction in block. -// Every intermediate step is returned for inspection / utilization in tests. -// Uses m.FinalizedL1 as the L1 origin. -func (m *MockStreamerSource) CreateEspressoTxnData( - ctx context.Context, - namespace uint64, - rng *rand.Rand, - chainID *big.Int, - l2Height uint64, - chainSigner crypto.ChainSigner, -) (*derive.SingularBatch, *derive.EspressoBatch, *espressoCommon.Transaction, espressoClient.TransactionsInBlock) { - return m.CreateEspressoTxnDataWithL1Origin(ctx, namespace, rng, chainID, l2Height, chainSigner, m.FinalizedL1.Number, m.FinalizedL1.Hash) -} - -// TestStreamerSmoke tests the basic functionality of the EspressoStreamer -// ensuring that it behaves as expected from an empty state with no -// iterations, batches, or blocks. -func TestStreamerSmoke(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - state, streamer := setupStreamerTesting(42, common.Address{}) - - // update the state of our streamer - syncStatus := state.SyncStatus() - err := streamer.Refresh(ctx, syncStatus.FinalizedL1, syncStatus.SafeL2.Number, syncStatus.SafeL2.L1Origin) - - if have, want := err, error(nil); have != want { - t.Fatalf("failed to refresh streamer state encountered error:\nhave:\n\t\"%v\"\nwant:\n\t\"%v\"\n", have, want) - } - - // Update the state of our streamer - if have, want := streamer.Update(ctx), error(nil); !errors.Is(have, want) { - t.Fatalf("failed to update streamer state encountered error:\nhave:\n\t\"%v\"\nwant:\n\t\"%v\"\n", have, want) - } - - // We should not get any batches from the Streamer at this point. - if have, want := streamer.Next(ctx), (*derive.EspressoBatch)(nil); have != want { - t.Fatalf("failed to get next batch from streamer:\nhave:\n\t%v\nwant:\n\t%v\n", have, want) - } -} - -// TestEspressoStreamerSimpleIncremental tests the EspressoStreamer by -// incrementally adding batches to the state and verifying that the streamer -// can retrieve them in the correct order. -func TestEspressoStreamerSimpleIncremental(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - namespace := uint64(42) - chainID := big.NewInt(int64(namespace)) - privateKeyString := "59c6995e998f97a5a0044966f0945389dc9e86dae88c7a8412f4603b6b78690d" - chainSignerFactory, signerAddress, _ := crypto.ChainSignerFactoryFromConfig(&NoOpLogger{}, privateKeyString, "", "", opsigner.CLIConfig{}) - chainSigner := chainSignerFactory(chainID, common.Address{}) - - state, streamer := setupStreamerTesting(namespace, signerAddress) - rng := rand.New(rand.NewSource(0)) - // The number of batches to create - const N = 1000 - - for i := 0; i < N; i++ { - // update the state of our streamer - syncStatus := state.SyncStatus() - err := streamer.Refresh(ctx, syncStatus.FinalizedL1, syncStatus.SafeL2.Number, syncStatus.SafeL2.L1Origin) - - if have, want := err, error(nil); have != want { - t.Fatalf("failed to refresh streamer state encountered error:\nhave:\n\t\"%v\"\nwant:\n\t\"%v\"\n", have, want) - } - - batch, _, _, espTxnInBlock := state.CreateEspressoTxnData( - ctx, - namespace, - rng, - chainID, - uint64(i)+1, - chainSigner, - ) - - state.AddEspressoTransactionData(uint64(5*i), namespace, espTxnInBlock) - - // Update the state of our streamer - if have, want := streamer.Update(ctx), error(nil); !errors.Is(have, want) { - t.Fatalf("failed to update streamer state encountered error:\nhave:\n\t\"%v\"\nwant:\n\t\"%v\"\n", have, want) - } - - batchFromEsp := streamer.Next(ctx) - require.NotNil(t, batchFromEsp, "unexpectedly did not receive a batch from streamer") - - // This batch ** should ** match the one we created above. - - if have, want := batchFromEsp.Batch.GetEpochNum(), batch.GetEpochNum(); have != want { - t.Fatalf("batch epoch number does not match:\nhave:\n\t%v\ndo not want:\n\t%v\n", have, want) - } - - state.AdvanceSafeL2() - state.AdvanceFinalizedL1() - } - - if have, want := len(state.EspTransactionData), N; have != want { - t.Fatalf("unexpected number of batches in state:\nhave:\n\t%v\nwant:\n\t%v\n", have, want) - } -} - -// TestEspressoStreamerIncrementalDelayedConsumption tests the EspressoStreamer -// by populating all of the batches in the state before incrementing over them -func TestEspressoStreamerIncrementalDelayedConsumption(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - namespace := uint64(42) - chainID := big.NewInt(int64(namespace)) - privateKeyString := "59c6995e998f97a5a0044966f0945389dc9e86dae88c7a8412f4603b6b78690d" - chainSignerFactory, signerAddress, _ := crypto.ChainSignerFactoryFromConfig(&NoOpLogger{}, privateKeyString, "", "", opsigner.CLIConfig{}) - chainSigner := chainSignerFactory(chainID, common.Address{}) - - state, streamer := setupStreamerTesting(namespace, signerAddress) - rng := rand.New(rand.NewSource(0)) - - // The number of batches to create - const N = 1000 - - var batches []*derive.SingularBatch - - // update the state of our streamer - syncStatus := state.SyncStatus() - err := streamer.Refresh(ctx, syncStatus.FinalizedL1, syncStatus.SafeL2.Number, syncStatus.SafeL2.L1Origin) - - for i := 0; i < N; i++ { - batch, _, _, espTxnInBlock := state.CreateEspressoTxnData( - ctx, - namespace, - rng, - chainID, - uint64(i)+1, - chainSigner, - ) - - state.AddEspressoTransactionData(uint64(5*i), namespace, espTxnInBlock) - batches = append(batches, batch) - } - - if have, want := err, error(nil); have != want { - t.Fatalf("failed to refresh streamer state encountered error:\nhave:\n\t\"%v\"\nwant:\n\t\"%v\"\n", have, want) - } - - for i := 0; i < N; i++ { - if !streamer.HasNext(ctx) { - // Update the state of our streamer - if have, want := streamer.Update(ctx), error(nil); !errors.Is(have, want) { - t.Fatalf("failed to update streamer state encountered error:\nhave:\n\t\"%v\"\nwant:\n\t\"%v\"\n", have, want) - } - } - - batch := batches[i] - - batchFromEsp := streamer.Next(ctx) - require.NotNil(t, batchFromEsp, "unexpectedly did not receive a batch from streamer") - - // This batch ** should ** match the one we created above. - - if have, want := batchFromEsp.Batch.GetEpochNum(), batch.GetEpochNum(); have != want { - t.Fatalf("batch epoch number does not match:\nhave:\n\t%v\ndo not want:\n\t%v\n", have, want) - } - - state.AdvanceSafeL2() - state.AdvanceFinalizedL1() - } - - if have, want := len(state.EspTransactionData), N; have != want { - t.Fatalf("unexpected number of batches in state:\nhave:\n\t%v\nwant:\n\t%v\n", have, want) - } -} - -// TestStreamerEspressoOutOfOrder tests the behavior of the EspressoStreamer -// when the batches coming from Espresso are not in sequential order. -// -// The Streamer is expected to be able to reorder these batches before -// iterating over them. -func TestStreamerEspressoOutOfOrder(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - namespace := uint64(42) - chainID := big.NewInt(int64(namespace)) - privateKeyString := "59c6995e998f97a5a0044966f0945389dc9e86dae88c7a8412f4603b6b78690d" - chainSignerFactory, signerAddress, _ := crypto.ChainSignerFactoryFromConfig(&NoOpLogger{}, privateKeyString, "", "", opsigner.CLIConfig{}) - chainSigner := chainSignerFactory(chainID, common.Address{}) - - state, streamer := setupStreamerTesting(namespace, signerAddress) - rng := rand.New(rand.NewSource(0)) - - // update the state of our streamer - syncStatus := state.SyncStatus() - err := streamer.Refresh(ctx, syncStatus.FinalizedL1, syncStatus.SafeL2.Number, syncStatus.SafeL2.L1Origin) - - if have, want := err, error(nil); have != want { - t.Fatalf("failed to refresh streamer state encountered error:\nhave:\n\t\"%v\"\nwant:\n\t\"%v\"\n", have, want) - } - - const N = 1000 - var batches []*derive.SingularBatch - for i := 0; i < N; i++ { - batch, _, _, block := state.CreateEspressoTxnData( - ctx, - namespace, - rng, - chainID, - uint64(i)+1, - chainSigner, - ) - - rollEspBlockNumber := rng.Intn(N * 5) - for { - _, ok := state.EspTransactionData[BlockAndNamespace(uint64(rollEspBlockNumber), namespace)] - if ok { - // re-roll, if already populated. - rollEspBlockNumber = rng.Intn(N * 5) - continue - } - - break - } - - state.AddEspressoTransactionData(uint64(rollEspBlockNumber), namespace, block) - batches = append(batches, batch) - } - - { - - for i := 0; i < N; i++ { - for j := 0; j < int(state.LatestEspHeight/100); j++ { - // Update the state of our streamer - if have, want := streamer.Update(ctx), error(nil); !errors.Is(have, want) { - t.Fatalf("failed to update streamer state encountered error:\nhave:\n\t\"%v\"\nwant:\n\t\"%v\"\n", have, want) - } - if streamer.HasNext(ctx) { - break - } - } - - batch := batches[i] - batchFromEsp := streamer.Next(ctx) - require.NotNil(t, batchFromEsp, "unexpectedly did not receive a batch from streamer") - - // This batch ** should ** match the one we created above. - - if have, want := batchFromEsp.Batch.GetEpochNum(), batch.GetEpochNum(); have != want { - t.Fatalf("batch epoch number does not match:\nhave:\n\t%v\ndo not want:\n\t%v\n", have, want) - } - - state.AdvanceSafeL2() - } - } - - if have, want := len(state.EspTransactionData), N; have != want { - t.Fatalf("unexpected number of batches in state:\nhave:\n\t%v\nwant:\n\t%v\n", have, want) - } -} - -// TestEspressoStreamerDuplicationHandling tests the behavior of the EspressoStreamer -// when a duplicated batch is received. -// -// The Streamer is expected to skip the duplicated batch and only return once for each batch. -func TestEspressoStreamerDuplicationHandling(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - namespace := uint64(42) - chainID := big.NewInt(int64(namespace)) - privateKeyString := "59c6995e998f97a5a0044966f0945389dc9e86dae88c7a8412f4603b6b78690d" - chainSignerFactory, signerAddress, _ := crypto.ChainSignerFactoryFromConfig(&NoOpLogger{}, privateKeyString, "", "", opsigner.CLIConfig{}) - chainSigner := chainSignerFactory(chainID, common.Address{}) - - state, streamer := setupStreamerTesting(namespace, signerAddress) - rng := rand.New(rand.NewSource(0)) - - // update the state of our streamer - syncStatus := state.SyncStatus() - err := streamer.Refresh(ctx, syncStatus.FinalizedL1, syncStatus.SafeL2.Number, syncStatus.SafeL2.L1Origin) - - if have, want := err, error(nil); have != want { - t.Fatalf("failed to refresh streamer state encountered error:\nhave:\n\t\"%v\"\nwant:\n\t\"%v\"\n", have, want) - } - - const N = 1000 - for i := 0; i < N; i++ { - batch, _, _, espTxnInBlock := state.CreateEspressoTxnData( - ctx, - namespace, - rng, - chainID, - uint64(i)+1, - chainSigner, - ) - - // duplicate the batch - for j := 0; j < 2; j++ { - // update the state of our streamer - syncStatus := state.SyncStatus() - err := streamer.Refresh(ctx, syncStatus.FinalizedL1, syncStatus.SafeL2.Number, syncStatus.SafeL2.L1Origin) - - require.NoError(t, err) - - // add the batch to the state, and make sure duplicate batches are also added with a different height - state.AddEspressoTransactionData(uint64(5*i+j), namespace, espTxnInBlock) - - // Update the state of our streamer - if have, want := streamer.Update(ctx), error(nil); !errors.Is(have, want) { - t.Fatalf("failed to update streamer state encountered error:\nhave:\n\t\"%v\"\nwant:\n\t\"%v\"\n", have, want) - } - } - - batchFromEsp := streamer.Next(ctx) - require.NotNil(t, batchFromEsp, "unexpectedly did not receive a batch from streamer") - - // This batch ** should ** match the one we created above. - // If the duplicate one is NOT skipped, this will FAIL. - require.Equal(t, batchFromEsp.Batch.GetEpochNum(), batch.GetEpochNum()) - - state.AdvanceSafeL2() - state.AdvanceFinalizedL1() - - } - - // Check that the state has the correct number of duplicated batches - require.Equal(t, len(state.EspTransactionData), 2*N) -} - -// createSingularBatch creates a mock SingularBatch for testing purposes -// with a specific L1 origin (epoch number and hash). -func (m *MockStreamerSource) createSingularBatch(rng *rand.Rand, txCount int, chainID *big.Int, l2Height uint64, epochNum uint64, epochHash common.Hash) *derive.SingularBatch { - signer := geth_types.NewLondonSigner(chainID) - baseFee := big.NewInt(rng.Int63n(300_000_000_000)) - txsEncoded := make([]hexutil.Bytes, 0, txCount) - for i := 0; i < txCount; i++ { - tx := testutils.RandomTx(rng, baseFee, signer) - txEncoded, err := tx.MarshalBinary() - if err != nil { - panic("tx Marshal binary" + err.Error()) - } - txsEncoded = append(txsEncoded, txEncoded) - } - - return &derive.SingularBatch{ - ParentHash: createHashFromHeight(l2Height), - EpochNum: rollup.Epoch(epochNum), - EpochHash: epochHash, - Timestamp: l2Height, - Transactions: txsEncoded, - } -} - -// CreateEspressoTxnDataWithL1Origin creates a mock Espresso transaction data set -// for testing purposes with a specific L1 origin. -func (m *MockStreamerSource) CreateEspressoTxnDataWithL1Origin( - ctx context.Context, - namespace uint64, - rng *rand.Rand, - chainID *big.Int, - l2Height uint64, - chainSigner crypto.ChainSigner, - epochNum uint64, - epochHash common.Hash, -) (*derive.SingularBatch, *derive.EspressoBatch, *espressoCommon.Transaction, espressoClient.TransactionsInBlock) { - txCount := rng.Intn(10) - batch := m.createSingularBatch(rng, txCount, chainID, l2Height, epochNum, epochHash) - espBatch := createEspressoBatch(batch) - espTxn := createEspressoTransaction(ctx, espBatch, namespace, chainSigner) - espTxnInBlock := createTransactionsInBlock(espTxn) - - return batch, espBatch, espTxn, espTxnInBlock -} - -// TestStreamerInvalidHeadBatchDiscarded tests that an invalid headBatch is discarded -// and the next valid candidate is promoted from the buffer. -func TestStreamerInvalidHeadBatchDiscarded(t *testing.T) { - namespace := uint64(42) - chainID := big.NewInt(int64(namespace)) - privateKeyString := "59c6995e998f97a5a0044966f0945389dc9e86dae88c7a8412f4603b6b78690d" - chainSignerFactory, signerAddress, _ := crypto.ChainSignerFactoryFromConfig(&NoOpLogger{}, privateKeyString, "", "", opsigner.CLIConfig{}) - chainSigner := chainSignerFactory(chainID, common.Address{}) - - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - state, streamer := setupStreamerTesting(namespace, signerAddress) - rng := rand.New(rand.NewSource(2)) - - // Refresh state - after this, BatchPos becomes 1 - syncStatus := state.SyncStatus() - err := streamer.Refresh(ctx, syncStatus.FinalizedL1, syncStatus.SafeL2.Number, syncStatus.SafeL2.L1Origin) - require.NoError(t, err) - - // Create batch 1 with INVALID L1 origin hash (using a hash that won't match) - invalidHash := common.HexToHash("0xdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeef") - _, _, _, espTxnInBlockInvalid := state.CreateEspressoTxnDataWithL1Origin( - ctx, namespace, rng, chainID, 1, chainSigner, - state.FinalizedL1.Number, invalidHash, - ) - state.AddEspressoTransactionData(0, namespace, espTxnInBlockInvalid) - - // Create batch 1 with VALID L1 origin (using the correct hash) - _, _, _, espTxnInBlockValid := state.CreateEspressoTxnDataWithL1Origin( - ctx, namespace, rng, chainID, 1, chainSigner, - state.FinalizedL1.Number, state.FinalizedL1.Hash, - ) - state.AddEspressoTransactionData(1, namespace, espTxnInBlockValid) - - // Update to fetch both batches - err = streamer.Update(ctx) - require.NoError(t, err) - - // HasNext should drop the invalid batch and find the valid one - require.True(t, streamer.HasNext(ctx), "valid batch should be available after invalid is dropped") - - // Next should return the valid batch - batch := streamer.Next(ctx) - require.NotNil(t, batch) - require.Equal(t, uint64(1), batch.Number()) -} - -// TestStreamerMultipleBatchesSameNumber tests handling of multiple batches with -// the same batch number but different validity. -func TestStreamerMultipleBatchesSameNumber(t *testing.T) { - namespace := uint64(42) - chainID := big.NewInt(int64(namespace)) - privateKeyString := "59c6995e998f97a5a0044966f0945389dc9e86dae88c7a8412f4603b6b78690d" - chainSignerFactory, signerAddress, _ := crypto.ChainSignerFactoryFromConfig(&NoOpLogger{}, privateKeyString, "", "", opsigner.CLIConfig{}) - chainSigner := chainSignerFactory(chainID, common.Address{}) - - t.Run("invalid batches dropped during HasNext iteration until valid found", func(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - state, streamer := setupStreamerTesting(namespace, signerAddress) - rng := rand.New(rand.NewSource(3)) - - // Refresh state - after this, BatchPos becomes 1 - syncStatus := state.SyncStatus() - err := streamer.Refresh(ctx, syncStatus.FinalizedL1, syncStatus.SafeL2.Number, syncStatus.SafeL2.L1Origin) - require.NoError(t, err) - - invalidHash := common.HexToHash("0xdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeef") - - // Create 3 batches all with number 1: - // Batch A: invalid L1 origin hash - _, _, _, espTxnA := state.CreateEspressoTxnDataWithL1Origin( - ctx, namespace, rng, chainID, 1, chainSigner, - state.FinalizedL1.Number, invalidHash, - ) - state.AddEspressoTransactionData(0, namespace, espTxnA) - - // Batch B: invalid L1 origin hash - _, _, _, espTxnB := state.CreateEspressoTxnDataWithL1Origin( - ctx, namespace, rng, chainID, 1, chainSigner, - state.FinalizedL1.Number, invalidHash, - ) - state.AddEspressoTransactionData(1, namespace, espTxnB) - - // Batch C: valid L1 origin hash - _, _, _, espTxnC := state.CreateEspressoTxnDataWithL1Origin( - ctx, namespace, rng, chainID, 1, chainSigner, - state.FinalizedL1.Number, state.FinalizedL1.Hash, - ) - state.AddEspressoTransactionData(2, namespace, espTxnC) - - // Update to fetch all batches - err = streamer.Update(ctx) - require.NoError(t, err) - - // HasNext should return true (found valid batch C) - require.True(t, streamer.HasNext(ctx)) - - // Next should return batch C (the valid one) - batch := streamer.Next(ctx) - require.NotNil(t, batch) - require.Equal(t, uint64(1), batch.Number()) - - // BatchPos should have advanced to 2 - require.Equal(t, uint64(2), streamer.BatchPos) - }) - - t.Run("BatchPos does NOT advance when all candidates for batch number are invalid", func(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - state, streamer := setupStreamerTesting(namespace, signerAddress) - rng := rand.New(rand.NewSource(4)) - - // Refresh state - after this, BatchPos becomes 1 - syncStatus := state.SyncStatus() - err := streamer.Refresh(ctx, syncStatus.FinalizedL1, syncStatus.SafeL2.Number, syncStatus.SafeL2.L1Origin) - require.NoError(t, err) - - invalidHash := common.HexToHash("0xdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeefdeadbeef") - - // Create 3 batches all with number 1, ALL with invalid L1 origins - for i := 0; i < 3; i++ { - _, _, _, espTxn := state.CreateEspressoTxnDataWithL1Origin( - ctx, namespace, rng, chainID, 1, chainSigner, - state.FinalizedL1.Number, invalidHash, - ) - state.AddEspressoTransactionData(uint64(i), namespace, espTxn) - } - - // Update to fetch all batches - err = streamer.Update(ctx) - require.NoError(t, err) - - // All candidates should be dropped (BatchDrop) - // HasNext should return false (no valid batch available) - require.False(t, streamer.HasNext(ctx)) - - // BatchPos should still be 1 (NOT advanced) - require.Equal(t, uint64(1), streamer.BatchPos) - }) - - t.Run("first valid batch returned when multiple valid candidates exist", func(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - state, streamer := setupStreamerTesting(namespace, signerAddress) - rng := rand.New(rand.NewSource(5)) - - // Refresh state - after this, BatchPos becomes 1 - syncStatus := state.SyncStatus() - err := streamer.Refresh(ctx, syncStatus.FinalizedL1, syncStatus.SafeL2.Number, syncStatus.SafeL2.L1Origin) - require.NoError(t, err) - - // Create 2 valid batches for number 1 with different hashes - _, espBatch1, _, espTxn1 := state.CreateEspressoTxnDataWithL1Origin( - ctx, namespace, rng, chainID, 1, chainSigner, - state.FinalizedL1.Number, state.FinalizedL1.Hash, - ) - state.AddEspressoTransactionData(0, namespace, espTxn1) - firstBatchHash := espBatch1.Hash() - - _, _, _, espTxn2 := state.CreateEspressoTxnDataWithL1Origin( - ctx, namespace, rng, chainID, 1, chainSigner, - state.FinalizedL1.Number, state.FinalizedL1.Hash, - ) - state.AddEspressoTransactionData(1, namespace, espTxn2) - - // Update to fetch both batches - err = streamer.Update(ctx) - require.NoError(t, err) - - // HasNext should return true - require.True(t, streamer.HasNext(ctx)) - - // Next should return the first valid batch (insertion order matters) - batch := streamer.Next(ctx) - require.NotNil(t, batch) - require.Equal(t, uint64(1), batch.Number()) - require.Equal(t, firstBatchHash, batch.Hash(), "first inserted batch should be returned") - - // Second batch should be skipped as BatchPast - require.False(t, streamer.HasNext(ctx), "no more batches should be available") - }) -} - -// TestStreamerBufferCapacityAndSkipPos tests the skip position mechanism when the buffer fills up. -func TestStreamerBufferCapacityAndSkipPos(t *testing.T) { - namespace := uint64(42) - chainID := big.NewInt(int64(namespace)) - privateKeyString := "59c6995e998f97a5a0044966f0945389dc9e86dae88c7a8412f4603b6b78690d" - chainSignerFactory, signerAddress, _ := crypto.ChainSignerFactoryFromConfig(&NoOpLogger{}, privateKeyString, "", "", opsigner.CLIConfig{}) - chainSigner := chainSignerFactory(chainID, common.Address{}) - - t.Run("skipPos not overwritten across multiple fetch ranges", func(t *testing.T) { - // Regression test: when the Update loop iterates through multiple - // HotShot block ranges, hitting ErrAtCapacity in a later range must - // NOT overwrite skipPos set by an earlier range. Otherwise the rewind - // skips the earlier range's batches permanently. - // - // Scenario: - // - Enough batches (starting from 2, skipping 1) are placed to fill - // the buffer, plus an extra fetch range worth of batches beyond it. - // - The extra batches are dropped because the buffer is full. - // skipPos should record the earliest range where capacity was hit. - // - Batch 1 is injected later, consumed, and triggers a rewind. - // - After draining the buffer, the next batch must come from the - // re-fetched overflow. If skipPos was overwritten to a later range - // start, the rewind won't go far enough and those batches are lost. - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - state := NewMockStreamerSource() - state.TeeBatcherAddr = signerAddress - logger := new(NoOpLogger) - - streamer, err := espresso.NewEspressoStreamer( - namespace, - state, - state, - state, - state, - logger, - derive.CreateEspressoBatchUnmarshaler(), - 50*time.Millisecond, - 0, - 0, // originBatchPos=0, so BatchPos starts at 1 - batchAuthenticatorAddr, - ) - require.NoError(t, err) - - rng := rand.New(rand.NewSource(99)) - - syncStatus := state.SyncStatus() - err = streamer.Refresh(ctx, syncStatus.FinalizedL1, syncStatus.SafeL2.Number, syncStatus.SafeL2.L1Origin) - require.NoError(t, err) - - // Place enough batches to fill the buffer and overflow by one full - // fetch range. Batch 1 is intentionally missing so HasNext stays - // false, forcing the Update loop to keep iterating across ranges. - totalBatches := int(espresso.BatchBufferCapacity) + int(espresso.HOTSHOT_BLOCK_FETCH_LIMIT) - for i := 0; i < totalBatches; i++ { - _, _, _, espTxn := state.CreateEspressoTxnData(ctx, namespace, rng, chainID, uint64(i+2), chainSigner) - state.AddEspressoTransactionData(uint64(i), namespace, espTxn) - } - - // Update processes all ranges. The buffer fills up partway through, - // and all subsequent batches are dropped with ErrAtCapacity. - err = streamer.Update(ctx) - require.NoError(t, err) - require.False(t, streamer.HasNext(ctx)) - - // Inject batch 1 beyond all existing data. - batch1Pos := uint64(totalBatches + 10) - _, _, _, espTxn1 := state.CreateEspressoTxnData(ctx, namespace, rng, chainID, 1, chainSigner) - state.AddEspressoTransactionData(batch1Pos, namespace, espTxn1) - - // Fetch and consume batch 1 — triggers the rewind via skipPos. - err = streamer.Update(ctx) - require.NoError(t, err) - require.True(t, streamer.HasNext(ctx)) - batch := streamer.Next(ctx) - require.NotNil(t, batch) - require.Equal(t, uint64(1), batch.Number()) - - // Drain the entire buffer of previously-buffered batches. - firstOverflow := uint64(espresso.BatchBufferCapacity) + 2 - for expectedNum := uint64(2); expectedNum < firstOverflow; expectedNum++ { - err = streamer.Update(ctx) - require.NoError(t, err) - require.True(t, streamer.HasNext(ctx), "expected batch %d to be available", expectedNum) - batch = streamer.Next(ctx) - require.NotNil(t, batch) - require.Equal(t, expectedNum, batch.Number()) - } - - // The first batch that was dropped due to capacity must now be - // recoverable via the rewind. If skipPos was overwritten to a later - // range, this batch is permanently lost. - err = streamer.Update(ctx) - require.NoError(t, err) - require.True(t, streamer.HasNext(ctx), "first overflow batch must be available after rewind — skipPos must preserve the earliest range") - batch = streamer.Next(ctx) - require.NotNil(t, batch) - require.Equal(t, firstOverflow, batch.Number(), "first batch after buffer drain must not be skipped") - }) - - t.Run("new batch for current BatchPos arrives when buffer full", func(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - state := NewMockStreamerSource() - state.TeeBatcherAddr = signerAddress - logger := new(NoOpLogger) - - // Create streamer - after Refresh with SafeL2.Number=0, BatchPos becomes 1 - streamer, err := espresso.NewEspressoStreamer( - namespace, - state, - state, - state, - state, - logger, - derive.CreateEspressoBatchUnmarshaler(), - 50*time.Millisecond, - 0, - 0, // originBatchPos=0, so BatchPos starts at 1 - batchAuthenticatorAddr, - ) - require.NoError(t, err) - - rng := rand.New(rand.NewSource(7)) - - // Refresh state - after this, BatchPos becomes 1 - syncStatus := state.SyncStatus() - err = streamer.Refresh(ctx, syncStatus.FinalizedL1, syncStatus.SafeL2.Number, syncStatus.SafeL2.L1Origin) - require.NoError(t, err) - - // Fill buffer with future batches (2, 3, 4, ...) - for i := 0; i < int(espresso.BatchBufferCapacity); i++ { - _, _, _, espTxn := state.CreateEspressoTxnData(ctx, namespace, rng, chainID, uint64(i+2), chainSigner) - state.AddEspressoTransactionData(uint64(i), namespace, espTxn) - } - - // Update to fill buffer - err = streamer.Update(ctx) - require.NoError(t, err) - - // HasNext should be false (batch 1 is missing) - require.False(t, streamer.HasNext(ctx)) - - // Now add batch 1 (the one we need) - laterPos := uint64(espresso.BatchBufferCapacity + 1) - _, _, _, espTxn1 := state.CreateEspressoTxnData(ctx, namespace, rng, chainID, 1, chainSigner) - state.AddEspressoTransactionData(laterPos, namespace, espTxn1) - - // Update to get batch 1 - err = streamer.Update(ctx) - require.NoError(t, err) - - // Batch 1 should be assigned to headBatch directly (not buffered) - require.True(t, streamer.HasNext(ctx)) - batch := streamer.Next(ctx) - require.NotNil(t, batch) - require.Equal(t, uint64(1), batch.Number()) - }) -} - -// TestStreamerBatchOrderingDeterminism tests that the streamer processes batches -// deterministically when multiple batches have the same number - insertion order -// must be respected. -func TestStreamerBatchOrderingDeterminism(t *testing.T) { - namespace := uint64(42) - chainID := big.NewInt(int64(namespace)) - privateKeyString := "59c6995e998f97a5a0044966f0945389dc9e86dae88c7a8412f4603b6b78690d" - chainSignerFactory, signerAddress, _ := crypto.ChainSignerFactoryFromConfig(&NoOpLogger{}, privateKeyString, "", "", opsigner.CLIConfig{}) - chainSigner := chainSignerFactory(chainID, common.Address{}) - - t.Run("must wait for first-inserted batch to become decided before processing later ones", func(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - state, streamer := setupStreamerTesting(namespace, signerAddress) - rng := rand.New(rand.NewSource(8)) - - // Advance L1 to have two finalized blocks at heights 1 and 2 - // FinalizedL1 starts at 1 - state.AdvanceFinalizedL1() // Now at 2 - - // Refresh state with only height 1 finalized (we'll pretend height 2 is not finalized yet) - // We need to control what the streamer sees as finalized - // After this refresh, BatchPos becomes 1 - l1Height1 := createL1BlockRef(1) - err := streamer.Refresh(ctx, l1Height1, state.SafeL2.Number, state.SafeL2.L1Origin) - require.NoError(t, err) - - // Insert batch a1 (number 1, L1 origin at height 2 - NOT finalized yet) - l1Height2 := createL1BlockRef(2) - _, espBatchA1, _, espTxnA1 := state.CreateEspressoTxnDataWithL1Origin( - ctx, namespace, rng, chainID, 1, chainSigner, - l1Height2.Number, l1Height2.Hash, - ) - state.AddEspressoTransactionData(0, namespace, espTxnA1) - a1Hash := espBatchA1.Hash() - - // Insert batch a2 (number 1, L1 origin at height 1 - IS finalized) - _, _, _, espTxnA2 := state.CreateEspressoTxnDataWithL1Origin( - ctx, namespace, rng, chainID, 1, chainSigner, - l1Height1.Number, l1Height1.Hash, - ) - state.AddEspressoTransactionData(1, namespace, espTxnA2) - - // Update to fetch both batches - err = streamer.Update(ctx) - require.NoError(t, err) - - // HasNext should return false - must wait for a1 (inserted first) to become decided - // even though a2 is already valid - require.False(t, streamer.HasNext(ctx), "should wait for first-inserted batch to become decided") - - // Now advance L1 finalized to height 2 - err = streamer.Refresh(ctx, l1Height2, state.SafeL2.Number, state.SafeL2.L1Origin) - require.NoError(t, err) - - // HasNext should now return true - require.True(t, streamer.HasNext(ctx)) - - // Next should return a1 (the first-inserted batch) - batch := streamer.Next(ctx) - require.NotNil(t, batch) - require.Equal(t, uint64(1), batch.Number()) - require.Equal(t, a1Hash, batch.Hash(), "first-inserted batch should be returned") - - // a2 should subsequently be skipped as BatchPast - require.False(t, streamer.HasNext(ctx), "second batch should be skipped") - }) - - t.Run("insertion order respected across multiple Update calls", func(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - state, streamer := setupStreamerTesting(namespace, signerAddress) - rng := rand.New(rand.NewSource(9)) - - // Refresh state - after this, BatchPos becomes 1 - syncStatus := state.SyncStatus() - err := streamer.Refresh(ctx, syncStatus.FinalizedL1, syncStatus.SafeL2.Number, syncStatus.SafeL2.L1Origin) - require.NoError(t, err) - - // First Update: insert batch a1 (number 1) - _, espBatchA1, _, espTxnA1 := state.CreateEspressoTxnDataWithL1Origin( - ctx, namespace, rng, chainID, 1, chainSigner, - state.FinalizedL1.Number, state.FinalizedL1.Hash, - ) - state.AddEspressoTransactionData(0, namespace, espTxnA1) - a1Hash := espBatchA1.Hash() - - err = streamer.Update(ctx) - require.NoError(t, err) - - // Second Update: insert batch a2 (number 1, different hash) - _, _, _, espTxnA2 := state.CreateEspressoTxnDataWithL1Origin( - ctx, namespace, rng, chainID, 1, chainSigner, - state.FinalizedL1.Number, state.FinalizedL1.Hash, - ) - state.AddEspressoTransactionData(1, namespace, espTxnA2) - - err = streamer.Update(ctx) - require.NoError(t, err) - - // HasNext should return true - require.True(t, streamer.HasNext(ctx)) - - // Next should return a1 (first inserted) - batch := streamer.Next(ctx) - require.NotNil(t, batch) - require.Equal(t, a1Hash, batch.Hash(), "first-inserted batch should be returned") - - // a2 should be skipped as BatchPast - require.False(t, streamer.HasNext(ctx)) - }) -} diff --git a/go.mod b/go.mod index 812373458e5..4dd2cb81ff1 100644 --- a/go.mod +++ b/go.mod @@ -16,7 +16,7 @@ require ( github.com/chelnak/ysmrr v0.6.0 github.com/cockroachdb/pebble v1.1.5 github.com/coder/websocket v1.8.13 - github.com/consensys/gnark-crypto v0.18.0 + github.com/consensys/gnark-crypto v0.18.1 github.com/crate-crypto/go-kzg-4844 v1.1.0 github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc github.com/decred/dcrd/dcrec/secp256k1/v4 v4.3.0 @@ -24,7 +24,7 @@ require ( github.com/docker/go-connections v0.5.0 github.com/ethereum-optimism/go-ethereum-hdwallet v0.1.4-0.20251001155152-4eb15ccedf7e github.com/ethereum-optimism/superchain-registry/validation v0.0.0-20251121143344-5ac16e0fbb00 - github.com/ethereum/go-ethereum v1.16.3 + github.com/ethereum/go-ethereum v1.17.1 github.com/fatih/color v1.18.0 github.com/fsnotify/fsnotify v1.9.0 github.com/go-task/slim-sprig/v3 v3.0.0 @@ -65,24 +65,27 @@ require ( github.com/protolambda/ctxlock v0.1.0 github.com/schollz/progressbar/v3 v3.18.0 github.com/spf13/afero v1.12.0 - github.com/stretchr/testify v1.10.0 + github.com/stretchr/testify v1.11.1 github.com/urfave/cli/v2 v2.27.6 go.opentelemetry.io/otel v1.34.0 go.opentelemetry.io/otel/trace v1.34.0 - golang.org/x/crypto v0.36.0 + golang.org/x/crypto v0.45.0 golang.org/x/exp v0.0.0-20241009180824-f66d83c29e7c - golang.org/x/mod v0.22.0 + golang.org/x/mod v0.29.0 golang.org/x/oauth2 v0.25.0 - golang.org/x/sync v0.14.0 - golang.org/x/term v0.30.0 - golang.org/x/text v0.25.0 + golang.org/x/sync v0.18.0 + golang.org/x/term v0.37.0 + golang.org/x/text v0.31.0 golang.org/x/time v0.11.0 gonum.org/v1/plot v0.16.0 gopkg.in/yaml.v2 v2.4.0 gopkg.in/yaml.v3 v3.0.1 ) -require github.com/joho/godotenv v1.5.1 +require ( + github.com/EspressoSystems/espresso-streamers v1.0.0 + github.com/joho/godotenv v1.5.1 +) require ( codeberg.org/go-fonts/liberation v0.5.0 // indirect @@ -116,7 +119,7 @@ require ( github.com/containerd/cgroups v1.1.0 // indirect github.com/containerd/log v0.1.0 // indirect github.com/coreos/go-systemd/v22 v22.5.0 // indirect - github.com/cpuguy83/go-md2man/v2 v2.0.5 // indirect + github.com/cpuguy83/go-md2man/v2 v2.0.6 // indirect github.com/crate-crypto/go-eth-kzg v1.4.0 // indirect github.com/crate-crypto/go-ipa v0.0.0-20240724233137-53bbb0ceb27a // indirect github.com/davidlazar/go-crypto v0.0.0-20200604182044-b73af7476f6c // indirect @@ -262,7 +265,7 @@ require ( github.com/quic-go/webtransport-go v0.8.0 // indirect github.com/raulk/go-watchdog v1.3.0 // indirect github.com/rivo/uniseg v0.4.7 // indirect - github.com/rogpeppe/go-internal v1.13.1 // indirect + github.com/rogpeppe/go-internal v1.14.1 // indirect github.com/rs/cors v1.11.0 // indirect github.com/rs/xid v1.6.0 // indirect github.com/russross/blackfriday/v2 v2.1.0 // indirect @@ -306,9 +309,10 @@ require ( go.uber.org/multierr v1.11.0 // indirect go.uber.org/zap v1.27.0 // indirect golang.org/x/image v0.25.0 // indirect - golang.org/x/net v0.38.0 // indirect - golang.org/x/sys v0.36.0 // indirect - golang.org/x/tools v0.29.0 // indirect + golang.org/x/net v0.47.0 // indirect + golang.org/x/sys v0.39.0 // indirect + golang.org/x/telemetry v0.0.0-20251008203120-078029d740a8 // indirect + golang.org/x/tools v0.38.0 // indirect google.golang.org/genproto/googleapis/api v0.0.0-20250115164207-1a7da9e5054f // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20250115164207-1a7da9e5054f // indirect google.golang.org/grpc v1.69.4 // indirect diff --git a/go.sum b/go.sum index 3d011748945..2a366303d95 100644 --- a/go.sum +++ b/go.sum @@ -34,6 +34,12 @@ github.com/DataDog/zstd v1.5.6-0.20230824185856-869dae002e5e h1:ZIWapoIRN1VqT8GR github.com/DataDog/zstd v1.5.6-0.20230824185856-869dae002e5e/go.mod h1:g4AWEaM3yOg3HYfnJ3YIawPnVdXJh9QME85blwSAmyw= github.com/EspressoSystems/espresso-network/sdks/go v0.3.4 h1:1hf/k2rGqIGEGQW8O3fQFltPIyxSmumph8aKIa6AjCk= github.com/EspressoSystems/espresso-network/sdks/go v0.3.4/go.mod h1:kaxR08mJb5Mijy7a2RhWCIWOevFI4PcXwDkzoEbsVTk= +github.com/EspressoSystems/espresso-streamers v0.0.2-0.20260401083845-6106312fbfd2 h1:QSDzLrdK6vbRv7R0yUd1RGRI3uJuDdJg/vVdkFdOsAM= +github.com/EspressoSystems/espresso-streamers v0.0.2-0.20260401083845-6106312fbfd2/go.mod h1:Op3SNwQnZ3bqwrUXMAORnL2/pNiFzpfOED4ltYs5o/U= +github.com/EspressoSystems/espresso-streamers v0.0.2-0.20260401163154-23746a33ce96 h1:/jViu0A5z/iLVTsxebsZ4gWdLZjsBcwgKfzorxn9sXA= +github.com/EspressoSystems/espresso-streamers v0.0.2-0.20260401163154-23746a33ce96/go.mod h1:Op3SNwQnZ3bqwrUXMAORnL2/pNiFzpfOED4ltYs5o/U= +github.com/EspressoSystems/espresso-streamers v1.0.0 h1:wMeB+aqevIJv0YNA7BcEXQgIUT8IgBLyuubD7R2B7lk= +github.com/EspressoSystems/espresso-streamers v1.0.0/go.mod h1:Op3SNwQnZ3bqwrUXMAORnL2/pNiFzpfOED4ltYs5o/U= github.com/Masterminds/semver/v3 v3.3.1 h1:QtNSWtVZ3nBfk8mAOu/B6v7FMJ+NHTIgUPi7rj+4nv4= github.com/Masterminds/semver/v3 v3.3.1/go.mod h1:4V+yj/TJE1HU9XfppCwVMZq3I84lprf4nC11bSS5beM= github.com/Microsoft/go-winio v0.6.2 h1:F2VQgta7ecxGYO8k3ZZz3RS8fVIXVxONVUPlNERoyfY= @@ -162,8 +168,8 @@ github.com/cockroachdb/tokenbucket v0.0.0-20230807174530-cc333fc44b06 h1:zuQyyAK github.com/cockroachdb/tokenbucket v0.0.0-20230807174530-cc333fc44b06/go.mod h1:7nc4anLGjupUW/PeY5qiNYsdNXj7zopG+eqsS7To5IQ= github.com/coder/websocket v1.8.13 h1:f3QZdXy7uGVz+4uCJy2nTZyM0yTBj8yANEHhqlXZ9FE= github.com/coder/websocket v1.8.13/go.mod h1:LNVeNrXQZfe5qhS9ALED3uA+l5pPqvwXg3CKoDBB2gs= -github.com/consensys/gnark-crypto v0.18.0 h1:vIye/FqI50VeAr0B3dx+YjeIvmc3LWz4yEfbWBpTUf0= -github.com/consensys/gnark-crypto v0.18.0/go.mod h1:L3mXGFTe1ZN+RSJ+CLjUt9x7PNdx8ubaYfDROyp2Z8c= +github.com/consensys/gnark-crypto v0.18.1 h1:RyLV6UhPRoYYzaFnPQA4qK3DyuDgkTgskDdoGqFt3fI= +github.com/consensys/gnark-crypto v0.18.1/go.mod h1:L3mXGFTe1ZN+RSJ+CLjUt9x7PNdx8ubaYfDROyp2Z8c= github.com/containerd/cgroups v0.0.0-20201119153540-4cbc285b3327/go.mod h1:ZJeTFisyysqgcCdecO57Dj79RfL0LNeGiFUqLYQRYLE= github.com/containerd/cgroups v1.1.0 h1:v8rEWFl6EoqHB+swVNjVoCJE8o3jX7e8nqBGPLaDFBM= github.com/containerd/cgroups v1.1.0/go.mod h1:6ppBcbh/NOOUU+dMKrykgaBnK9lCIBxHqJDGwsa1mIw= @@ -175,8 +181,8 @@ github.com/coreos/go-systemd/v22 v22.5.0 h1:RrqgGjYQKalulkV8NGVIfkXQf6YYmOyiJKk8 github.com/coreos/go-systemd/v22 v22.5.0/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc= github.com/cpuguy83/go-md2man/v2 v2.0.0-20190314233015-f79a8a8ca69d/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU= github.com/cpuguy83/go-md2man/v2 v2.0.0/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU= -github.com/cpuguy83/go-md2man/v2 v2.0.5 h1:ZtcqGrnekaHpVLArFSe4HK5DoKx1T0rq2DwVB0alcyc= -github.com/cpuguy83/go-md2man/v2 v2.0.5/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o= +github.com/cpuguy83/go-md2man/v2 v2.0.6 h1:XJtiaUW6dEEqVuZiMTn1ldk455QWwEIsMIJlo5vtkx0= +github.com/cpuguy83/go-md2man/v2 v2.0.6/go.mod h1:oOW0eioCTA6cOiMLiUPZOpcVxMig6NIQQ7OS05n1F4g= github.com/crate-crypto/go-eth-kzg v1.4.0 h1:WzDGjHk4gFg6YzV0rJOAsTK4z3Qkz5jd4RE3DAvPFkg= github.com/crate-crypto/go-eth-kzg v1.4.0/go.mod h1:J9/u5sWfznSObptgfa92Jq8rTswn6ahQWEuiLHOjCUI= github.com/crate-crypto/go-ipa v0.0.0-20240724233137-53bbb0ceb27a h1:W8mUrRp6NOVl3J+MYp5kPMoUZPp7aOYHtaua31lwRHg= @@ -826,8 +832,8 @@ github.com/rivo/uniseg v0.4.7/go.mod h1:FN3SvrM+Zdj16jyLfmOkMNblXMcoc8DfTHruCPUc github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= github.com/rogpeppe/go-internal v1.6.1/go.mod h1:xXDCJY+GAPziupqXw64V24skbSoqbTEfhy4qGm1nDQc= github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs= -github.com/rogpeppe/go-internal v1.13.1 h1:KvO1DLK/DRN07sQ1LQKScxyZJuNnedQ5/wKSR38lUII= -github.com/rogpeppe/go-internal v1.13.1/go.mod h1:uMEvuHeurkdAXX61udpOXGD/AzZDWNMNyH2VO9fmH0o= +github.com/rogpeppe/go-internal v1.14.1 h1:UQB4HGPB6osV0SQTLymcB4TgvyWu6ZyliaW0tI/otEQ= +github.com/rogpeppe/go-internal v1.14.1/go.mod h1:MaRKkUm5W0goXpeCfT7UZI6fk/L7L7so1lCWt35ZSgc= github.com/rs/cors v1.11.0 h1:0B9GE/r9Bc2UxRMMtymBkHTenPkHDv0CW4Y98GBY+po= github.com/rs/cors v1.11.0/go.mod h1:XyqrcTp5zjWr1wsJ8PIRZssZ8b/WMcMf71DJnit4EMU= github.com/rs/xid v1.6.0 h1:fV591PaemRlL6JfRxGDEPl69wICngIQ3shQtzfy2gxU= @@ -906,8 +912,8 @@ github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO github.com/stretchr/testify v1.8.3/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= -github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA= -github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U= +github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U= github.com/supranational/blst v0.3.16-0.20250831170142-f48500c1fdbe h1:nbdqkIGOGfUAD54q1s2YBcBz/WcsxCO9HUQ4aGV5hUw= github.com/supranational/blst v0.3.16-0.20250831170142-f48500c1fdbe/go.mod h1:jZJtfjgudtNl4en1tzwPIV3KjUnQUvG3/j+w+fVonLw= github.com/syndtr/goleveldb v1.0.0/go.mod h1:ZVVdQEZoIme9iO1Ch2Jdy24qqXrMMOU6lpPAyBWyWuQ= @@ -1023,8 +1029,8 @@ golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5y golang.org/x/crypto v0.8.0/go.mod h1:mRqEX+O9/h5TFCrQhkgjo2yKi0yYA+9ecGkdQoHrywE= golang.org/x/crypto v0.12.0/go.mod h1:NF0Gs7EO5K4qLn+Ylc+fih8BSTeIjAP05siRnAh98yw= golang.org/x/crypto v0.18.0/go.mod h1:R0j02AL6hcrfOiy9T4ZYp/rcWeMxM3L6QYxlOuEG1mg= -golang.org/x/crypto v0.36.0 h1:AnAEvhDddvBdpY+uR+MyHmuZzzNqXSe/GvuDeob5L34= -golang.org/x/crypto v0.36.0/go.mod h1:Y4J0ReaxCR1IMaabaSMugxJES1EpwhBHhv2bDHklZvc= +golang.org/x/crypto v0.45.0 h1:jMBrvKuj23MTlT0bQEOBcAE0mjg8mK9RXFhRH6nyF3Q= +golang.org/x/crypto v0.45.0/go.mod h1:XTGrrkGJve7CYK7J8PEww4aY7gM3qMCElcJQ8n8JdX4= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20241009180824-f66d83c29e7c h1:7dEasQXItcW1xKJ2+gg5VOiBnqWrJc+rq0DPKyvvdbY= golang.org/x/exp v0.0.0-20241009180824-f66d83c29e7c/go.mod h1:NQtJDoLvd6faHhE7m4T/1IY708gDefGGjR/iUW8yQQ8= @@ -1043,8 +1049,8 @@ golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4= golang.org/x/mod v0.8.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs= -golang.org/x/mod v0.22.0 h1:D4nJWe9zXqHOmWqj4VMOJhvzj7bEZg4wEYa759z1pH4= -golang.org/x/mod v0.22.0/go.mod h1:6SkKJ3Xj0I0BrPOZoBy3bdMptDDU9oJrpohJ3eWZ1fY= +golang.org/x/mod v0.29.0 h1:HV8lRxZC4l2cr3Zq1LvtOsi/ThTgWnUk/y64QSs8GwA= +golang.org/x/mod v0.29.0/go.mod h1:NyhrlYXJ2H4eJiRy/WDBO6HMqZQ6q9nk4JzS3NuCK+w= golang.org/x/net v0.0.0-20180719180050-a680a1efc54d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= @@ -1078,8 +1084,8 @@ golang.org/x/net v0.9.0/go.mod h1:d48xBJpPfHeWQsugry2m+kC02ZBRGRgulfHnEXEuWns= golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg= golang.org/x/net v0.14.0/go.mod h1:PpSgVXXLK0OxS0F31C1/tv6XNguvCrnXIDrFMspZIUI= golang.org/x/net v0.20.0/go.mod h1:z8BVo6PvndSri0LbOE3hAn0apkU+1YvI6E70E9jsnvY= -golang.org/x/net v0.38.0 h1:vRMAPTMaeGqVhG5QyLJHqNDwecKTomGeqbnfZyKlBI8= -golang.org/x/net v0.38.0/go.mod h1:ivrbrMbzFq5J41QOQh0siUuly180yBYtLp+CKbEaFx8= +golang.org/x/net v0.47.0 h1:Mx+4dIFzqraBXUugkia1OOvlD6LemFo1ALMHjrXDOhY= +golang.org/x/net v0.47.0/go.mod h1:/jNxtkgq5yWUGYkaZGqo27cfGZ1c5Nen03aYrrKpVRU= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20181017192945-9dcd33a902f4/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20181203162652-d668ce993890/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= @@ -1098,8 +1104,8 @@ golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync v0.14.0 h1:woo0S4Yywslg6hp4eUFjTVOyKt0RookbpAHG4c1HmhQ= -golang.org/x/sync v0.14.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA= +golang.org/x/sync v0.18.0 h1:kr88TuHDroi+UVf+0hZnirlk8o8T+4MrK6mr60WkH/I= +golang.org/x/sync v0.18.0/go.mod h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI= golang.org/x/sys v0.0.0-20180810173357-98c5dad5d1a0/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= @@ -1161,8 +1167,10 @@ golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.9.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.11.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.16.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= -golang.org/x/sys v0.36.0 h1:KVRy2GtZBrk1cBYA7MKu5bEZFxQk4NIDV6RLVcC8o0k= -golang.org/x/sys v0.36.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks= +golang.org/x/sys v0.39.0 h1:CvCKL8MeisomCi6qNZ+wbb0DN9E5AATixKsvNtMoMFk= +golang.org/x/sys v0.39.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks= +golang.org/x/telemetry v0.0.0-20251008203120-078029d740a8 h1:LvzTn0GQhWuvKH/kVRS3R3bVAsdQWI7hvfLHGgh9+lU= +golang.org/x/telemetry v0.0.0-20251008203120-078029d740a8/go.mod h1:Pi4ztBfryZoJEkyFTI5/Ocsu2jXyDr6iSdgJiYE/uwE= golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= @@ -1171,8 +1179,8 @@ golang.org/x/term v0.7.0/go.mod h1:P32HKFT3hSsZrRxla30E9HqToFYAQPCMs/zFMBUFqPY= golang.org/x/term v0.8.0/go.mod h1:xPskH00ivmX89bAKVGSKKtLOWNx2+17Eiy94tnKShWo= golang.org/x/term v0.11.0/go.mod h1:zC9APTIj3jG3FdV/Ons+XE1riIZXG4aZ4GTHiPZJPIU= golang.org/x/term v0.16.0/go.mod h1:yn7UURbUtPyrVJPGPq404EukNFxcm/foM+bV/bfcDsY= -golang.org/x/term v0.30.0 h1:PQ39fJZ+mfadBm0y5WlL4vlM7Sx1Hgf13sMIY2+QS9Y= -golang.org/x/term v0.30.0/go.mod h1:NYYFdzHoI5wRh/h5tDMdMqCqPJZEuNqVR5xJLd/n67g= +golang.org/x/term v0.37.0 h1:8EGAD0qCmHYZg6J17DvsMy9/wJ7/D/4pV/wfnld5lTU= +golang.org/x/term v0.37.0/go.mod h1:5pB4lxRNYYVZuTLmy8oR2BH8dflOR+IbTYFD8fi3254= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= @@ -1185,8 +1193,8 @@ golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8= golang.org/x/text v0.12.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE= golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= -golang.org/x/text v0.25.0 h1:qVyWApTSYLk/drJRO5mDlNYskwQznZmkpV2c8q9zls4= -golang.org/x/text v0.25.0/go.mod h1:WEdwpYrmk1qmdHvhkSTNPm3app7v4rsT8F2UD6+VHIA= +golang.org/x/text v0.31.0 h1:aC8ghyu4JhP8VojJ2lEHBnochRno1sgL6nEi9WGFGMM= +golang.org/x/text v0.31.0/go.mod h1:tKRAlv61yKIjGGHX/4tP1LTbc13YSec1pxVEWXzfoeM= golang.org/x/time v0.0.0-20180412165947-fbb02b2291d2/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20201208040808-7e3f01d25324/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= @@ -1214,8 +1222,8 @@ golang.org/x/tools v0.1.0/go.mod h1:xkSsbof2nBLbhDlRMhhhyNLN/zl3eTqcnHD5viDpcZ0= golang.org/x/tools v0.1.5/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc= golang.org/x/tools v0.6.0/go.mod h1:Xwgl3UAJ/d3gWutnCtw505GrjyAbvKui8lOU390QaIU= -golang.org/x/tools v0.29.0 h1:Xx0h3TtM9rzQpQuR4dKLrdglAmCEN5Oi+P74JdhdzXE= -golang.org/x/tools v0.29.0/go.mod h1:KMQVMRsVxU6nHCFXrBPhDB8XncLNLM0lIy/F14RP588= +golang.org/x/tools v0.38.0 h1:Hx2Xv8hISq8Lm16jvBZ2VQf+RLmbd7wVUsALibYI/IQ= +golang.org/x/tools v0.38.0/go.mod h1:yEsQ/d/YK8cjh0L6rZlY8tgtlKiBNTL14pGDJPJpYQs= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= diff --git a/op-batcher/batcher/driver.go b/op-batcher/batcher/driver.go index 31ef6e855a6..cb0105c850c 100644 --- a/op-batcher/batcher/driver.go +++ b/op-batcher/batcher/driver.go @@ -13,6 +13,7 @@ import ( "golang.org/x/sync/errgroup" + op "github.com/EspressoSystems/espresso-streamers/op" "github.com/ethereum/go-ethereum" "github.com/ethereum/go-ethereum/accounts/abi/bind" "github.com/ethereum/go-ethereum/common" @@ -192,11 +193,11 @@ func NewBatchSubmitter(setup DriverSetup) *BatchSubmitter { l1Adapter := &batcherL1Adapter{L1Client: batchSubmitter.L1Client} // Convert typed nil pointer to untyped nil interface to avoid typed-nil interface panic // in confirmEspressoBlockHeight when EspressoLightClient is not configured. - var lightClientIface espresso.LightClientCallerInterface + var lightClientIface op.LightClientCallerInterface if batchSubmitter.EspressoLightClient != nil { lightClientIface = batchSubmitter.EspressoLightClient } - unbufferedStreamer, err := espresso.NewEspressoStreamer( + unbufferedStreamer, err := op.NewEspressoStreamer( batchSubmitter.RollupConfig.L2ChainID.Uint64(), l1Adapter, l1Adapter, @@ -204,14 +205,13 @@ func NewBatchSubmitter(setup DriverSetup) *BatchSubmitter { lightClientIface, batchSubmitter.Log, derive.CreateEspressoBatchUnmarshaler(), - 2*time.Second, setup.Config.CaffeinationHeightEspresso, setup.Config.CaffeinationHeightL2, batchSubmitter.RollupConfig.BatchAuthenticatorAddress, ) if err != nil { panic(fmt.Sprintf("failed to create Espresso streamer: %v", err)) } - batchSubmitter.espressoStreamer = espresso.NewBufferedEspressoStreamer(unbufferedStreamer) + batchSubmitter.espressoStreamer = op.NewBufferedEspressoStreamer(unbufferedStreamer) batchSubmitter.Log.Info("Streamer started", "streamer", batchSubmitter.espressoStreamer) } diff --git a/op-e2e/e2eutils/opnode/opnode.go b/op-e2e/e2eutils/opnode/opnode.go index 07d425f0e3e..f7091d83bf8 100644 --- a/op-e2e/e2eutils/opnode/opnode.go +++ b/op-e2e/e2eutils/opnode/opnode.go @@ -5,7 +5,7 @@ import ( "github.com/ethereum/go-ethereum/log" - "github.com/ethereum-optimism/optimism/espresso" + op "github.com/EspressoSystems/espresso-streamers/op" "github.com/ethereum-optimism/optimism/op-e2e/e2eutils/services" "github.com/ethereum-optimism/optimism/op-node/config" "github.com/ethereum-optimism/optimism/op-node/metrics" @@ -27,7 +27,7 @@ type Opnode struct { // // Note: This function should be used carefully to avoid a stall, since it is a getter and does not // create a new instance, which means the caller may deprive the node of the batches. -func (o *Opnode) EspressoStreamer() *espresso.BatchStreamer[derive.EspressoBatch] { +func (o *Opnode) EspressoStreamer() *op.BatchStreamer[derive.EspressoBatch] { return o.node.EspressoStreamer() } diff --git a/op-node/node/node.go b/op-node/node/node.go index 3fa42da560e..30f7e44b279 100644 --- a/op-node/node/node.go +++ b/op-node/node/node.go @@ -10,7 +10,7 @@ import ( "sync/atomic" "time" - "github.com/ethereum-optimism/optimism/espresso" + op "github.com/EspressoSystems/espresso-streamers/op" "github.com/hashicorp/go-multierror" @@ -763,7 +763,7 @@ func initP2PSigner(ctx context.Context, cfg *config.Config, node *OpNode) (p2p.S return p2pSigner, err } -func (n *OpNode) EspressoStreamer() *espresso.BatchStreamer[derive.EspressoBatch] { +func (n *OpNode) EspressoStreamer() *op.BatchStreamer[derive.EspressoBatch] { return n.l2Driver.SyncDeriver.Derivation.EspressoStreamer() } diff --git a/op-node/rollup/derive/attributes_queue.go b/op-node/rollup/derive/attributes_queue.go index 190a298c4f4..1b9f67f12de 100644 --- a/op-node/rollup/derive/attributes_queue.go +++ b/op-node/rollup/derive/attributes_queue.go @@ -7,6 +7,7 @@ import ( "io" "time" + op "github.com/EspressoSystems/espresso-streamers/op" "github.com/ethereum-optimism/optimism/espresso" "github.com/ethereum/go-ethereum/common" @@ -63,7 +64,7 @@ type AttributesQueue struct { isCaffNode bool caffeinationHeightL2 uint64 - espressoStreamer *espresso.BatchStreamer[EspressoBatch] + espressoStreamer *op.BatchStreamer[EspressoBatch] } type SingularBatchProvider interface { @@ -73,7 +74,7 @@ type SingularBatchProvider interface { NextBatch(context.Context, eth.L2BlockRef) (*SingularBatch, bool, error) } -func initEspressoStreamer(log log.Logger, cfg *rollup.Config) *espresso.BatchStreamer[EspressoBatch] { +func initEspressoStreamer(log log.Logger, cfg *rollup.Config) *op.BatchStreamer[EspressoBatch] { if !cfg.CaffNodeConfig.Enabled { log.Info("Espresso streamer not initialized: Caff node is not enabled") return nil @@ -123,7 +124,7 @@ func (aq *AttributesQueue) Origin() eth.L1BlockRef { // but with a few key differences: // - It only calls Update() when needed and everytime only calls Next() once. While the batcher calls Next() in a loop. // - It performs additional checks, such as validating the timestamp and parent hash, which does not apply to the batcher. -func CaffNextBatch(s *espresso.BatchStreamer[EspressoBatch], ctx context.Context, parent eth.L2BlockRef, blockTime uint64, l1Fetcher L1Fetcher) (*SingularBatch, bool, error) { +func CaffNextBatch(s *op.BatchStreamer[EspressoBatch], ctx context.Context, parent eth.L2BlockRef, blockTime uint64, l1Fetcher L1Fetcher) (*SingularBatch, bool, error) { // Get the L1 finalized block finalizedL1Block, err := l1Fetcher.L1BlockRefByLabel(ctx, eth.Finalized) if err != nil { diff --git a/op-node/rollup/derive/pipeline.go b/op-node/rollup/derive/pipeline.go index 7a8b86d4d52..e8b531a3bcf 100644 --- a/op-node/rollup/derive/pipeline.go +++ b/op-node/rollup/derive/pipeline.go @@ -6,7 +6,7 @@ import ( "fmt" "io" - "github.com/ethereum-optimism/optimism/espresso" + op "github.com/EspressoSystems/espresso-streamers/op" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/log" @@ -293,6 +293,6 @@ func (dp *DerivationPipeline) ConfirmEngineReset() { dp.engineIsReset = true } -func (dp *DerivationPipeline) EspressoStreamer() *espresso.BatchStreamer[EspressoBatch] { +func (dp *DerivationPipeline) EspressoStreamer() *op.BatchStreamer[EspressoBatch] { return dp.attrib.espressoStreamer } diff --git a/op-node/rollup/driver/interfaces.go b/op-node/rollup/driver/interfaces.go index 0d08bff147d..31d59ac292c 100644 --- a/op-node/rollup/driver/interfaces.go +++ b/op-node/rollup/driver/interfaces.go @@ -3,8 +3,8 @@ package driver import ( "context" + op "github.com/EspressoSystems/espresso-streamers/op" altda "github.com/ethereum-optimism/optimism/op-alt-da" - "github.com/ethereum-optimism/optimism/espresso" opnodemetrics "github.com/ethereum-optimism/optimism/op-node/metrics" "github.com/ethereum-optimism/optimism/op-node/metrics/metered" "github.com/ethereum-optimism/optimism/op-node/rollup/derive" @@ -60,7 +60,7 @@ type DerivationPipeline interface { Origin() eth.L1BlockRef DerivationReady() bool ConfirmEngineReset() - EspressoStreamer() *espresso.BatchStreamer[derive.EspressoBatch] + EspressoStreamer() *op.BatchStreamer[derive.EspressoBatch] } type AttributesHandler interface {