diff --git a/espresso/buffered_streamer.go b/espresso/buffered_streamer.go new file mode 100644 index 00000000000..c43ff65580d --- /dev/null +++ b/espresso/buffered_streamer.go @@ -0,0 +1,191 @@ +package espresso + +import ( + "context" + + "github.com/ethereum-optimism/optimism/op-service/eth" +) + +// BufferedEspressoStreamer is a wrapper around EspressoStreamerIFace that +// buffers batches to avoid repeated calls to the underlying streamer. +// +// This structure is meant to help the underlying streamer avoid getting +// reset too frequently. This has primarily been added as an in-between +// layer for the Batch, which seems to need to rewind constantly, which is +// not great for the EspressoStreamer which wants to only progress forward +// and not rewind. +// +// The general idea is to take advantage that we should have a safe starting +// position for the batches being reported to the streamer that is being +// updated frequently. +// +// We can use this safe starting position to store a buffer as needed to store +// all batches from the safe position to whatever the current latest batch is. +// This allows us to avoid needing to rewind the streamer, and instead just +// adjust the read position of the buffered streamer. +type BufferedEspressoStreamer[B Batch] struct { + streamer EspressoStreamer[B] + + batches []*B + + // local offset + readPos uint64 + + startingBatchPos uint64 + currentSafeL1Origin eth.BlockID +} + +// Compile time assertion to ensure BufferedEspressoStreamer implements +// EspressoStreamerIFace +var _ EspressoStreamer[Batch] = (*BufferedEspressoStreamer[Batch])(nil) + +// NewBufferedEspressoStreamer creates a new BufferedEspressoStreamer instance. +func NewBufferedEspressoStreamer[B Batch](streamer EspressoStreamer[B]) *BufferedEspressoStreamer[B] { + return &BufferedEspressoStreamer[B]{ + streamer: streamer, + } +} + +// Update implements EspressoStreamerIFace +func (b *BufferedEspressoStreamer[B]) Update(ctx context.Context) error { + return b.streamer.Update(ctx) +} + +// handleL2PositionUpdate handles the update of the L2 position for the +// buffered streamer. +// +// There are three conditions to consider: +// 1. If the next position is before the starting batch position, we need to +// reset the underlying streamer, and dump our local buffer, as this +// indicates a need to move backwards before our earliest known batch. +// 2. If the next position is after our starting batch position, then we +// can drop all earlier stored batches in our buffer, and adjust our +// read position accordingly. This should appear to the consumer as nothing +// has changed progression-wise, but it allows us to reclaim memory. +// 3. If the next position is the same as our starting batch position, then +// we do nothing, as we are already at the correct position. +func (b *BufferedEspressoStreamer[B]) handleL2PositionUpdate(nextPosition uint64) { + if nextPosition < b.startingBatchPos { + // If the next position is before the starting batch position, + // we need to reset the buffered streamer to ensure we don't + // miss any batches. + b.readPos = 0 + b.startingBatchPos = nextPosition + b.batches = make([]*B, 0) + b.streamer.Reset() + return + } + + if nextPosition > b.startingBatchPos { + // We want to advance the read position, and we are indicating that + // we no longer will need to refer to older batches. So instead, we + // will want to adjust the buffer, and read position based on the + // new nextPosition. + + positionAdjustment := nextPosition - b.startingBatchPos + if positionAdjustment <= uint64(len(b.batches)) { + // If the adjustment is within the bounds of the current buffer, + // we can simply adjust the read position and starting batch position. + b.batches = b.batches[positionAdjustment:] + b.readPos -= positionAdjustment + } else { + b.batches = make([]*B, 0) + b.readPos = 0 + } + b.startingBatchPos = nextPosition + return + } +} + +// RefreshSafeL1Origin updates the safe L1 origin for the buffered streamer. +// This method attempts to safely handle the adjustment of the safeL1Origin +// without needing to defer to the underlying streamer unless necessary. +func (b *BufferedEspressoStreamer[B]) RefreshSafeL1Origin(safeL1Origin eth.BlockID) error { + if safeL1Origin.Number < b.currentSafeL1Origin.Number { + // If the safeL1Origin is before the starting batch position, we need to + // reset the buffered streamer to ensure we don't miss any batches. + b.currentSafeL1Origin = safeL1Origin + b.startingBatchPos = 0 + b.readPos = 0 + b.batches = make([]*B, 0) + if cast, castOk := b.streamer.(interface{ RefreshSafeL1Origin(eth.BlockID) error }); castOk { + // If the underlying streamer has a method to refresh the safe L1 origin, + // we call it to ensure it is aware of the new safe L1 origin. + return cast.RefreshSafeL1Origin(safeL1Origin) + } + return nil + } + + b.currentSafeL1Origin = safeL1Origin + return nil +} + +// Refresh implements EspressoStreamerIFace +func (b *BufferedEspressoStreamer[B]) Refresh(ctx context.Context, finalizedL1 eth.L1BlockRef, safeBatchNumber uint64, safeL1Origin eth.BlockID) error { + b.handleL2PositionUpdate(safeBatchNumber) + if err := b.RefreshSafeL1Origin(safeL1Origin); err != nil { + return err + } + + return b.streamer.Refresh(ctx, finalizedL1, safeBatchNumber, safeL1Origin) +} + +// Reset resets the buffered streamer state to the last known good +// safe batch position. +func (b *BufferedEspressoStreamer[B]) Reset() { + // Reset the buffered streamer state + b.readPos = 0 +} + +// HasNext implements EspressoStreamerIFace +// +// It checks to see if there are any batches left to read in its local buffer. +// If there are no batches left in the buffer, it defers to the underlying +// streamer to determine if there are more batches available. +func (b *BufferedEspressoStreamer[B]) HasNext(ctx context.Context) bool { + if b.readPos < uint64(len(b.batches)) { + return true + } + + return b.streamer.HasNext(ctx) +} + +// Next implements EspressoStreamerIFace +// +// It returns the next batch from the local buffer if available, or fetches +// it from the underlying streamer if not, appending to its local underlying +// buffer in the process. +func (b *BufferedEspressoStreamer[B]) Next(ctx context.Context) *B { + if b.readPos < uint64(len(b.batches)) { + // If we have a batch in the buffer, return it + batch := b.batches[b.readPos] + b.readPos++ + return batch + } + + // If we don't have a batch in the buffer, fetch the next one from the streamer + batch := b.streamer.Next(ctx) + + // No more batches available at the moment + if batch == nil { + return nil + } + + number := (*batch).Number() + if number < b.startingBatchPos { + // If the batch number is before the starting batch position, we ignore + // it, and want to fetch the next one + return b.Next(ctx) + } + + b.batches = append(b.batches, batch) + b.readPos++ + return batch + +} + +// UnmarshalBatch implements EspressoStreamerIFace +func (b *BufferedEspressoStreamer[B]) UnmarshalBatch(data []byte) (*B, error) { + // Delegate the unmarshalling to the underlying streamer + return b.streamer.UnmarshalBatch(data) +} diff --git a/espresso/enclave-tests/enclave_smoke_test.go b/espresso/enclave-tests/enclave_smoke_test.go index b8746f8ebcc..ad04ef8f747 100644 --- a/espresso/enclave-tests/enclave_smoke_test.go +++ b/espresso/enclave-tests/enclave_smoke_test.go @@ -18,9 +18,9 @@ import ( env "github.com/ethereum-optimism/optimism/espresso/environment" ) -// TestE2eDevNetWithEspressoSimpleTransactions launches the e2e Dev Net with the Espresso Dev Node +// TestE2eDevNetEnclaveWithEspressoSimpleTransactions launches the e2e Dev Net with the Espresso Dev Node // and runs a couple of simple transactions to it. -func TestE2eDevNetWithEspressoSimpleTransactions(t *testing.T) { +func TestE2eDevNetEnclaveWithEspressoSimpleTransactions(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() diff --git a/espresso/environment/8_reorg_test.go b/espresso/environment/8_reorg_test.go index c2da452472d..33c32d22340 100644 --- a/espresso/environment/8_reorg_test.go +++ b/espresso/environment/8_reorg_test.go @@ -83,7 +83,7 @@ func TestBatcherWaitForFinality(t *testing.T) { // VerifyL1OriginFinalized checks whether every batch in the batch buffer has a finalized L1 // origin. -func VerifyL1OriginFinalized(t *testing.T, streamer *espresso.EspressoStreamer[derive.EspressoBatch], l1Client *ethclient.Client) bool { +func VerifyL1OriginFinalized(t *testing.T, streamer *espresso.BatchStreamer[derive.EspressoBatch], l1Client *ethclient.Client) bool { for i := 0; i < streamer.BatchBuffer.Len(); i++ { batch := streamer.BatchBuffer.Get(i) origin := (batch).L1Origin() @@ -103,7 +103,7 @@ func VerifyL1OriginFinalized(t *testing.T, streamer *espresso.EspressoStreamer[d } // VerifyBatchBufferUpdated checks whether the batch buffer is updated before the timeout. -func VerifyBatchBufferUpdated(ctx context.Context, streamer *espresso.EspressoStreamer[derive.EspressoBatch]) bool { +func VerifyBatchBufferUpdated(ctx context.Context, streamer *espresso.BatchStreamer[derive.EspressoBatch]) bool { tickerBufferInsert := time.NewTicker(100 * time.Millisecond) defer tickerBufferInsert.Stop() for { diff --git a/espresso/interface.go b/espresso/interface.go new file mode 100644 index 00000000000..da3955505b2 --- /dev/null +++ b/espresso/interface.go @@ -0,0 +1,65 @@ +package espresso + +import ( + "context" + + "github.com/ethereum-optimism/optimism/op-service/eth" +) + +// EspressoStreamer defines the interface for the Espresso streamer. +type EspressoStreamer[B Batch] interface { + // Update will update the `EspressoStreamer“ by attempting to ensure that + // the next call to the `Next` method will return a `Batch`. + // + // It attempts to ensure the existence of a next batch, provided no errors + // occur when communicating with HotShot, by processing Blocks retrieved + // from `HotShot` in discreet batches. If each processing of a batch of + // blocks will not yield a new `Batch`, then it will continue to process + // the next batch of blocks from HotShot until it runs out of blocks to + // process. + // + // NOTE: this method is best effort. It is unable to guarantee that the + // next call to `Next` will return a batch. However, the only things + // that will prevent the next call to `Next` from returning a batch is if + // there are no more HotShot blocks to process currently, or if an error + // occurs when communicating with HotShot. + Update(ctx context.Context) error + + // Refresh updates the local references of the EspressoStreamer to the + // specified values. + // + // These values can be used to help determine whether the Streamer needs + // to be reset or not. + // + // NOTE: This will only automatically reset the Streamer if the + // `safeBatchNumber` moves backwards. + Refresh(ctx context.Context, finalizedL1 eth.L1BlockRef, safeBatchNumber uint64, safeL1Origin eth.BlockID) error + + // RefreshSafeL1Origin updates the safe L1 origin for the streamer. This is + // used to help the streamer determine if it needs to be reset or not based + // on the safe L1 origin moving backwards. + // + // NOTE: This will only automatically reset the Streamer if the + // `safeL1Origin` moves backwards. + RefreshSafeL1Origin(safeL1Origin eth.BlockID) error + + // Reset will reset the Streamer to the last known good safe state. + // This generally means resetting to the last know good safe batch + // position, but in the case of consuming blocks from Espresso, it will + // also reset the starting Espresso block position to the last known + // good safe block position there as well. + Reset() + + // UnmarshalBatch is a convenience method that allows the caller to + // attempt to unmarshal a batch from the provided byte slice. + UnmarshalBatch(b []byte) (*B, error) + + // HasNext checks to see if there are any batches left to read in the + // streamer. + HasNext(ctx context.Context) bool + + // Next attempts to return the next batch from the streamer. If there + // are no batches left to read, at the moment of the call, it will return + // nil. + Next(ctx context.Context) *B +} diff --git a/espresso/streamer.go b/espresso/streamer.go index 2a35ac81ca4..504f7ca798f 100644 --- a/espresso/streamer.go +++ b/espresso/streamer.go @@ -68,7 +68,7 @@ func GetFinalizedL1(header *espressoCommon.HeaderImpl) espressoCommon.L1BlockInf panic("Unsupported header version") } -type EspressoStreamer[B Batch] struct { +type BatchStreamer[B Batch] struct { // Namespace of the rollup we're interested in Namespace uint64 @@ -96,9 +96,13 @@ type EspressoStreamer[B Batch] struct { // Manage the batches which origin is unfinalized RemainingBatches map[common.Hash]B - UnmarshalBatch func([]byte) (*B, error) + unmarshalBatch func([]byte) (*B, error) } +// Compile time assertion to ensure EspressoStreamer implements +// EspressoStreamerIFace +var _ EspressoStreamer[Batch] = (*BatchStreamer[Batch])(nil) + func NewEspressoStreamer[B Batch]( namespace uint64, l1Client L1Client, @@ -107,8 +111,8 @@ func NewEspressoStreamer[B Batch]( log log.Logger, unmarshalBatch func([]byte) (*B, error), pollingHotShotPollingInterval time.Duration, -) EspressoStreamer[B] { - return EspressoStreamer[B]{ +) *BatchStreamer[B] { + return &BatchStreamer[B]{ L1Client: l1Client, EspressoClient: espressoClient, EspressoLightClient: lightClient, @@ -118,25 +122,36 @@ func NewEspressoStreamer[B Batch]( BatchBuffer: NewBatchBuffer[B](), PollingHotShotPollingInterval: pollingHotShotPollingInterval, RemainingBatches: make(map[common.Hash]B), - UnmarshalBatch: unmarshalBatch, + unmarshalBatch: unmarshalBatch, } } // Reset the state to the last safe batch -func (s *EspressoStreamer[B]) Reset() { +func (s *BatchStreamer[B]) Reset() { s.Log.Info("reset espresso streamer", "hotshot pos", s.fallbackHotShotPos, "batch pos", s.fallbackBatchPos) s.hotShotPos = s.fallbackHotShotPos s.BatchPos = s.fallbackBatchPos + 1 s.BatchBuffer.Clear() } +// RefreshSafeL1Origin is a convenience method that allows us to update the +// safe L1 origin of the Streamer. It will confirm the Espresso Block Height +// and reset the state if necessary. +func (s *BatchStreamer[B]) RefreshSafeL1Origin(safeL1Origin eth.BlockID) error { + shouldReset, err := s.confirmEspressoBlockHeight(safeL1Origin) + if shouldReset { + s.Reset() + } + + return err +} + // Handle both L1 reorgs and batcher restarts by updating our state in case it is // not consistent with what's on the L1. -func (s *EspressoStreamer[B]) Refresh(ctx context.Context, finalizedL1 eth.L1BlockRef, safeBatchNumber uint64, safeL1Origin eth.BlockID) error { +func (s *BatchStreamer[B]) Refresh(ctx context.Context, finalizedL1 eth.L1BlockRef, safeBatchNumber uint64, safeL1Origin eth.BlockID) error { s.FinalizedL1 = finalizedL1 - err := s.confirmEspressoBlockHeight(safeL1Origin) - if err != nil { + if err := s.RefreshSafeL1Origin(safeL1Origin); err != nil { return err } @@ -146,13 +161,17 @@ func (s *EspressoStreamer[B]) Refresh(ctx context.Context, finalizedL1 eth.L1Blo return nil } + shouldReset := safeBatchNumber < s.fallbackBatchPos s.fallbackBatchPos = safeBatchNumber - s.Reset() + if shouldReset { + s.Reset() + } return nil } -func (s *EspressoStreamer[B]) CheckBatch(ctx context.Context, batch B) (BatchValidity, int) { - +// CheckBatch checks the validity of the given batch against the finalized L1 +// block and the safe L1 origin. +func (s *BatchStreamer[B]) CheckBatch(ctx context.Context, batch B) (BatchValidity, int) { // Make sure the finalized L1 block is initialized before checking the block number. if s.FinalizedL1 == (eth.L1BlockRef{}) { s.Log.Error("Finalized L1 block not initialized") @@ -202,7 +221,7 @@ const HOTSHOT_BLOCK_LOAD_LIMIT = 100 // from Espresso. It starts from the last processed block and goes up to // HOTSHOT_BLOCK_LOAD_LIMIT blocks ahead or the current block height, whichever // is smaller. -func (s *EspressoStreamer[B]) computeEspressoBlockHeightsRange(currentBlockHeight uint64) (start uint64, finish uint64) { +func (s *BatchStreamer[B]) computeEspressoBlockHeightsRange(currentBlockHeight uint64) (start uint64, finish uint64) { start = s.hotShotPos if start > 0 { // We've already processed the block in hotShotPos. In order to avoid @@ -228,7 +247,7 @@ func (s *EspressoStreamer[B]) computeEspressoBlockHeightsRange(currentBlockHeigh // that will prevent the next call to `Next` from returning a batch is if // there are no more HotShot blocks to process currently, or if an error // occurs when communicating with HotShot. -func (s *EspressoStreamer[B]) Update(ctx context.Context) error { +func (s *BatchStreamer[B]) Update(ctx context.Context) error { // Retrieve the current block height from Espresso. We grab this reference // so we don't have to keep fetching it in a loop, and it informs us of // the current block height available to process. @@ -289,7 +308,7 @@ func (s *EspressoStreamer[B]) Update(ctx context.Context) error { // It will also update the hotShotPos to the last block processed, in order // to effectively keep track of the last block we have successfully fetched, // and therefore processed from Hotshot. -func (s *EspressoStreamer[B]) processHotShotRange(ctx context.Context, start, finish uint64) error { +func (s *BatchStreamer[B]) processHotShotRange(ctx context.Context, start, finish uint64) error { // Process the new batches fetched from Espresso for height := start; height <= finish; height++ { s.Log.Trace("Fetching HotShot block", "block", height) @@ -320,7 +339,7 @@ func (s *EspressoStreamer[B]) processHotShotRange(ctx context.Context, start, fi // processRemainingBatches is a helper method that checks the remaining batches // and prunes or adds them to the batch buffer as appropriate. -func (s *EspressoStreamer[B]) processRemainingBatches(ctx context.Context) { +func (s *BatchStreamer[B]) processRemainingBatches(ctx context.Context) { // Collect keys to delete, without modifying the batch list during iteration. var keysToDelete []common.Hash @@ -365,7 +384,7 @@ func (s *EspressoStreamer[B]) processRemainingBatches(ctx context.Context) { // processEspressoTransactions is a helper method that encapsulates the logic of // processing batches from the transactions in a block fetched from Espresso. -func (s *EspressoStreamer[B]) processEspressoTransactions(ctx context.Context, i uint64, txns espressoClient.TransactionsInBlock) { +func (s *BatchStreamer[B]) processEspressoTransactions(ctx context.Context, i uint64, txns espressoClient.TransactionsInBlock) { for _, transaction := range txns.Transactions { batch, err := s.UnmarshalBatch(transaction) if err != nil { @@ -407,11 +426,11 @@ func (s *EspressoStreamer[B]) processEspressoTransactions(ctx context.Context, i } } -func (s *EspressoStreamer[B]) Next(ctx context.Context) *B { +// UnmarshalBatch implements EspressoStreamerIFace +func (s *BatchStreamer[B]) Next(ctx context.Context) *B { // Is the next batch available? if s.HasNext(ctx) { // Current batch is going to be processed, update fallback batch position - s.fallbackBatchPos = s.BatchPos s.BatchPos += 1 return s.BatchBuffer.Pop() } @@ -419,7 +438,8 @@ func (s *EspressoStreamer[B]) Next(ctx context.Context) *B { return nil } -func (s *EspressoStreamer[B]) HasNext(ctx context.Context) bool { +// HasNext implements EspressoStreamerIFace +func (s *BatchStreamer[B]) HasNext(ctx context.Context) bool { if s.BatchBuffer.Len() > 0 { return (*s.BatchBuffer.Peek()).Number() == s.BatchPos } @@ -433,16 +453,22 @@ func (s *EspressoStreamer[B]) HasNext(ctx context.Context) bool { // // For reference on why doing this guarantees we won't skip any unsafe blocks: // https://eng-wiki.espressosys.com/mainch30.html#:Components:espresso%20streamer:initializing%20hotshot%20height -func (s *EspressoStreamer[B]) confirmEspressoBlockHeight(safeL1Origin eth.BlockID) error { +func (s *BatchStreamer[B]) confirmEspressoBlockHeight(safeL1Origin eth.BlockID) (shouldReset bool, err error) { hotshotState, err := s.EspressoLightClient. FinalizedState(&bind.CallOpts{BlockNumber: new(big.Int).SetUint64(safeL1Origin.Number)}) if errors.Is(err, bind.ErrNoCode) { s.fallbackHotShotPos = 0 - return nil + return false, nil } else if err != nil { - return err + return false, err } + shouldReset = hotshotState.BlockHeight < s.fallbackHotShotPos s.fallbackHotShotPos = hotshotState.BlockHeight - return nil + return shouldReset, nil +} + +// UnmarshalBatch implements EspressoStreamerIFace +func (s *BatchStreamer[B]) UnmarshalBatch(b []byte) (*B, error) { + return s.unmarshalBatch(b) } diff --git a/espresso/streamer_test.go b/espresso/streamer_test.go index 286c48fb889..ba9902e39e2 100644 --- a/espresso/streamer_test.go +++ b/espresso/streamer_test.go @@ -266,7 +266,7 @@ func createL2BlockRef(height uint64, l1Ref eth.L1BlockRef) eth.L2BlockRef { // setupStreamerTesting initializes a MockStreamerSource and an EspressoStreamer // for testing purposes. It sets up the initial state of the MockStreamerSource // and returns both the MockStreamerSource and the EspressoStreamer. -func setupStreamerTesting(namespace uint64, batcherAddress common.Address) (*MockStreamerSource, espresso.EspressoStreamer[derive.EspressoBatch]) { +func setupStreamerTesting(namespace uint64, batcherAddress common.Address) (*MockStreamerSource, *espresso.BatchStreamer[derive.EspressoBatch]) { state := NewMockStreamerSource() logger := new(NoOpLogger) diff --git a/op-batcher/batcher/driver.go b/op-batcher/batcher/driver.go index 01892a4057a..7f50cd45275 100644 --- a/op-batcher/batcher/driver.go +++ b/op-batcher/batcher/driver.go @@ -149,16 +149,18 @@ func NewBatchSubmitter(setup DriverSetup) *BatchSubmitter { channelMgr: state, } - batchSubmitter.espressoStreamer = espresso.NewEspressoStreamer( - batchSubmitter.RollupConfig.L2ChainID.Uint64(), - NewAdaptL1BlockRefClient(batchSubmitter.L1Client), - batchSubmitter.Espresso, - batchSubmitter.EspressoLightClient, - batchSubmitter.Log, - func(data []byte) (*derive.EspressoBatch, error) { - return derive.UnmarshalEspressoTransaction(data, batchSubmitter.SequencerAddress) - }, - 2*time.Second, + batchSubmitter.espressoStreamer = espresso.NewBufferedEspressoStreamer( + espresso.NewEspressoStreamer( + batchSubmitter.RollupConfig.L2ChainID.Uint64(), + NewAdaptL1BlockRefClient(batchSubmitter.L1Client), + batchSubmitter.Espresso, + batchSubmitter.EspressoLightClient, + batchSubmitter.Log, + func(data []byte) (*derive.EspressoBatch, error) { + return derive.UnmarshalEspressoTransaction(data, batchSubmitter.SequencerAddress) + }, + 2*time.Second, + ), ) log.Info("Streamer started", "streamer", batchSubmitter.espressoStreamer) diff --git a/op-batcher/batcher/espresso.go b/op-batcher/batcher/espresso.go index 1d20ac974fd..9ae4c68f624 100644 --- a/op-batcher/batcher/espresso.go +++ b/op-batcher/batcher/espresso.go @@ -649,8 +649,8 @@ func (s *espressoTransactionSubmitter) Start() { go s.handleVerifyReceiptJobResponse() } -func (bs *BatcherService) EspressoStreamer() *espressoLocal.EspressoStreamer[derive.EspressoBatch] { - return &bs.driver.espressoStreamer +func (bs *BatcherService) EspressoStreamer() espressoLocal.EspressoStreamer[derive.EspressoBatch] { + return bs.driver.espressoStreamer } func (bs *BatcherService) initKeyPair() error { @@ -664,8 +664,8 @@ func (bs *BatcherService) initKeyPair() error { } // EspressoStreamer returns the batch submitter's Espresso streamer instance -func (l *BatchSubmitter) EspressoStreamer() *espresso.EspressoStreamer[derive.EspressoBatch] { - return &l.espressoStreamer +func (l *BatchSubmitter) EspressoStreamer() espresso.EspressoStreamer[derive.EspressoBatch] { + return l.espressoStreamer } // Converts a block to an EspressoBatch and starts a goroutine that publishes it to Espresso @@ -690,7 +690,7 @@ func (l *BatchSubmitter) queueBlockToEspresso(ctx context.Context, block *types. } func (l *BatchSubmitter) espressoSyncAndRefresh(ctx context.Context, newSyncStatus *eth.SyncStatus) { - err := l.espressoStreamer.Refresh(ctx, newSyncStatus.FinalizedL1, newSyncStatus.SafeL2.Number, newSyncStatus.SafeL2.L1Origin) + err := l.espressoStreamer.Refresh(ctx, newSyncStatus.FinalizedL1, newSyncStatus.FinalizedL2.Number, newSyncStatus.FinalizedL2.L1Origin) if err != nil { l.Log.Warn("Failed to refresh Espresso streamer", "err", err) } @@ -755,10 +755,6 @@ func (l *BatchSubmitter) espressoBatchLoadingLoop(ctx context.Context, wg *sync. l.espressoSyncAndRefresh(ctx, newSyncStatus) err = l.espressoStreamer.Update(ctx) - remainingListLen := len(l.espressoStreamer.RemainingBatches) - if remainingListLen > 0 { - l.Log.Warn("Remaining list not empty.", "Number items", remainingListLen) - } var batch *derive.EspressoBatch diff --git a/op-e2e/e2eutils/opnode/opnode.go b/op-e2e/e2eutils/opnode/opnode.go index ce861222554..022e5e59894 100644 --- a/op-e2e/e2eutils/opnode/opnode.go +++ b/op-e2e/e2eutils/opnode/opnode.go @@ -24,7 +24,7 @@ type Opnode struct { // // Note: This function should be used carefully to avoid a stall, since it is a getter and does not // create a new instance, which means the caller may deprive the node of the batches. -func (o *Opnode) EspressoStreamer() *espresso.EspressoStreamer[derive.EspressoBatch] { +func (o *Opnode) EspressoStreamer() *espresso.BatchStreamer[derive.EspressoBatch] { return o.node.EspressoStreamer() } diff --git a/op-node/node/node.go b/op-node/node/node.go index d31ed3017b8..d68a1b7c2f7 100644 --- a/op-node/node/node.go +++ b/op-node/node/node.go @@ -542,7 +542,7 @@ func (n *OpNode) initP2PSigner(ctx context.Context, cfg *Config) (err error) { return } -func (n *OpNode) EspressoStreamer() *espresso.EspressoStreamer[derive.EspressoBatch] { +func (n *OpNode) EspressoStreamer() *espresso.BatchStreamer[derive.EspressoBatch] { return n.l2Driver.SyncDeriver.Derivation.EspressoStreamer() } diff --git a/op-node/rollup/derive/attributes_queue.go b/op-node/rollup/derive/attributes_queue.go index e6d8482894e..2cf6196d432 100644 --- a/op-node/rollup/derive/attributes_queue.go +++ b/op-node/rollup/derive/attributes_queue.go @@ -65,7 +65,7 @@ type AttributesQueue struct { lastAttribs *AttributesWithParent isCaffNode bool - espressoStreamer *espresso.EspressoStreamer[EspressoBatch] + espressoStreamer *espresso.BatchStreamer[EspressoBatch] } type SingularBatchProvider interface { @@ -75,7 +75,7 @@ type SingularBatchProvider interface { NextBatch(context.Context, eth.L2BlockRef) (*SingularBatch, bool, error) } -func initEspressoStreamer(log log.Logger, cfg *rollup.Config, l1Fetcher L1Fetcher) *espresso.EspressoStreamer[EspressoBatch] { +func initEspressoStreamer(log log.Logger, cfg *rollup.Config, l1Fetcher L1Fetcher) *espresso.BatchStreamer[EspressoBatch] { if !cfg.CaffNodeConfig.IsCaffNode { log.Info("Espresso streamer not initialized: Caff node is not enabled") @@ -117,7 +117,7 @@ func initEspressoStreamer(log log.Logger, cfg *rollup.Config, l1Fetcher L1Fetche log.Debug("Espresso Streamer namespace:", streamer.Namespace) log.Info("Espresso streamer initialized", "namespace", cfg.L2ChainID.Uint64(), "next hotshot block num", cfg.CaffNodeConfig.NextHotShotBlockNum, "polling hotshot polling interval", cfg.CaffNodeConfig.PollingHotShotPollingInterval, "hotshot urls", cfg.CaffNodeConfig.HotShotUrls) - return &streamer + return streamer } func NewAttributesQueue(log log.Logger, cfg *rollup.Config, builder AttributesBuilder, prev SingularBatchProvider, l1Fetcher L1Fetcher) *AttributesQueue { @@ -143,7 +143,7 @@ func (aq *AttributesQueue) Origin() eth.L1BlockRef { // but with a few key differences: // - It only calls Update() when needed and everytime only calls Next() once. While the batcher calls Next() in a loop. // - It performs additional checks, such as validating the timestamp and parent hash, which does not apply to the batcher. -func CaffNextBatch(s *espresso.EspressoStreamer[EspressoBatch], ctx context.Context, parent eth.L2BlockRef, blockTime uint64, l1Fetcher L1Fetcher) (*SingularBatch, bool, error) { +func CaffNextBatch(s *espresso.BatchStreamer[EspressoBatch], ctx context.Context, parent eth.L2BlockRef, blockTime uint64, l1Fetcher L1Fetcher) (*SingularBatch, bool, error) { // Get the L1 finalized block finalizedL1Block, err := l1Fetcher.L1BlockRefByLabel(ctx, eth.Finalized) if err != nil { diff --git a/op-node/rollup/derive/pipeline.go b/op-node/rollup/derive/pipeline.go index 18c1bb22e91..a2b8aad4766 100644 --- a/op-node/rollup/derive/pipeline.go +++ b/op-node/rollup/derive/pipeline.go @@ -289,6 +289,6 @@ func (dp *DerivationPipeline) ConfirmEngineReset() { dp.engineIsReset = true } -func (dp *DerivationPipeline) EspressoStreamer() *espresso.EspressoStreamer[EspressoBatch] { +func (dp *DerivationPipeline) EspressoStreamer() *espresso.BatchStreamer[EspressoBatch] { return dp.attrib.espressoStreamer } diff --git a/op-node/rollup/driver/driver.go b/op-node/rollup/driver/driver.go index 9bd18f30cda..65f8420f4b6 100644 --- a/op-node/rollup/driver/driver.go +++ b/op-node/rollup/driver/driver.go @@ -79,7 +79,7 @@ type DerivationPipeline interface { Origin() eth.L1BlockRef DerivationReady() bool ConfirmEngineReset() - EspressoStreamer() *espresso.EspressoStreamer[derive.EspressoBatch] + EspressoStreamer() *espresso.BatchStreamer[derive.EspressoBatch] } type EngineController interface {