diff --git a/op-service/bgpo/oracle.go b/op-service/bgpo/oracle.go index c945263637b68..abd4a1f3059f5 100644 --- a/op-service/bgpo/oracle.go +++ b/op-service/bgpo/oracle.go @@ -2,6 +2,7 @@ package bgpo import ( "context" + "errors" "fmt" "math/big" "sort" @@ -24,7 +25,6 @@ type BTOBackend interface { BlockNumber(ctx context.Context) (uint64, error) HeaderByNumber(ctx context.Context, number *big.Int) (*types.Header, error) BlockByNumber(ctx context.Context, number *big.Int) (*types.Block, error) - SubscribeNewHead(ctx context.Context, ch chan<- *types.Header) (ethereum.Subscription, error) } // BlobTipOracle tracks blob base gas prices by subscribing to new block headers @@ -49,7 +49,6 @@ type BlobTipOracle struct { ctx context.Context cancel context.CancelFunc - sub ethereum.Subscription loopDone chan struct{} } @@ -121,48 +120,49 @@ func NewBlobTipOracle(backend BTOBackend, chainConfig *params.ChainConfig, log l } // Start starts the oracle's background processing. It returns after the cache is prepopulated and -// the subscription is set up. To stop the background processing, call [BlobTipOracle.Close]. The -// background processing will also stop if the subscription fails. +// the polling loop is started. To stop the background processing, call [BlobTipOracle.Close]. func (o *BlobTipOracle) Start() error { - // Pre-populate cache with recent blocks before subscribing + // Pre-populate cache with recent blocks before starting the polling loop if err := o.prePopulateCache(); err != nil { o.log.Warn("Failed to pre-populate cache, continuing anyway", "err", err) } - headers := make(chan *types.Header, 10) - - sub, err := o.backend.SubscribeNewHead(o.ctx, headers) - if err != nil { - return err - } - o.sub = sub - o.log.Info("Blob tip oracle started, subscribed to newHeads") + o.log.Info("Blob tip oracle started, polling for new headers") o.loopDone = make(chan struct{}) - go o.processHeaders(headers) + go o.pollLoop() return nil } -func (o *BlobTipOracle) processHeaders(headers chan *types.Header) { - defer o.log.Debug("Blob tip oracle header processing loop exited") +func (o *BlobTipOracle) pollLoop() { + defer o.log.Debug("Blob tip oracle polling loop exited") defer close(o.loopDone) - // Process headers as they arrive + ticker := time.NewTicker(o.config.PollRate) + defer ticker.Stop() + for { select { - case header := <-headers: - if err := o.processHeader(header); err != nil { - o.log.Error("Error processing header", "err", err, "block", header.Number) - } - case err := <-o.sub.Err(): - if err != nil { - o.log.Error("Subscription error", "err", err) - return - } - return case <-o.ctx.Done(): o.log.Info("Blob tip oracle context canceled") return + case <-ticker.C: + nextBlock := o.latestBlock + 1 + header, err := func() (*types.Header, error) { + ctx, cancel := context.WithTimeout(o.ctx, o.config.NetworkTimeout) + defer cancel() + return o.backend.HeaderByNumber(ctx, big.NewInt(int64(nextBlock))) + }() + if errors.Is(err, ethereum.NotFound) { + continue // Block not yet available + } + if err != nil { + o.log.Warn("Failed to get header", "err", err, "block", nextBlock) + continue + } + if err := o.processHeader(header); err != nil { + o.log.Error("Error processing header", "err", err, "block", nextBlock) + } } } } @@ -379,9 +379,6 @@ func (o *BlobTipOracle) extractTipsForBlobTxs(block *types.Block, baseFee *big.I // Close stops the oracle and cleans up resources. func (o *BlobTipOracle) Close() { o.cancel() - if o.sub != nil { - o.sub.Unsubscribe() - } if o.loopDone != nil { <-o.loopDone } diff --git a/op-service/bgpo/oracle_test.go b/op-service/bgpo/oracle_test.go index 02c687c3ee427..b0258566f97fb 100644 --- a/op-service/bgpo/oracle_test.go +++ b/op-service/bgpo/oracle_test.go @@ -19,6 +19,8 @@ import ( "github.com/ethereum-optimism/optimism/op-service/testlog" ) +// mockBTOBackend mocks BTOBackend for testing. + type mockBTOBackend struct { mock.Mock } @@ -44,38 +46,8 @@ func (m *mockBTOBackend) BlockByNumber(ctx context.Context, number *big.Int) (*t return args.Get(0).(*types.Block), args.Error(1) } -func (m *mockBTOBackend) SubscribeNewHead(ctx context.Context, ch chan<- *types.Header) (ethereum.Subscription, error) { - args := m.Called(ctx, ch) - if args.Get(0) == nil { - return nil, args.Error(1) - } - return args.Get(0).(ethereum.Subscription), args.Error(1) -} - var _ BTOBackend = (*mockBTOBackend)(nil) -// mockSubscription implements ethereum.Subscription for testing. -type mockSubscription struct { - errCh chan error - unsubbed bool -} - -func newMockSubscription() *mockSubscription { - return &mockSubscription{ - errCh: make(chan error, 1), - } -} - -func (s *mockSubscription) Unsubscribe() { - if !s.unsubbed { - s.unsubbed = true - } -} - -func (s *mockSubscription) Err() <-chan error { - return s.errCh -} - func createHeader(blockNum uint64, excessBlobGas *uint64) *types.Header { header := &types.Header{ Number: big.NewInt(int64(blockNum)), @@ -433,19 +405,20 @@ func TestExtractBlobFeeCaps(t *testing.T) { } func TestOracleLifecycle(t *testing.T) { - mbackend := new(mockBTOBackend) chainConfig := params.MainnetChainConfig logger := testlog.Logger(t, log.LevelDebug) - oracle := NewBlobTipOracle(mbackend, chainConfig, logger, &BlobTipOracleConfig{ - PricesCacheSize: 10, - BlockCacheSize: 10, - MaxBlocks: 2, - Percentile: 60, - NetworkTimeout: time.Second, - }) + t.Run("start and close with polling", func(t *testing.T) { + mbackend := new(mockBTOBackend) + oracle := NewBlobTipOracle(mbackend, chainConfig, logger, &BlobTipOracleConfig{ + PricesCacheSize: 10, + BlockCacheSize: 10, + MaxBlocks: 2, + Percentile: 60, + NetworkTimeout: time.Second, + PollRate: 50 * time.Millisecond, // Fast polling for test + }) - t.Run("start and close", func(t *testing.T) { latestBlock := uint64(100) // Mock pre-population calls @@ -459,12 +432,15 @@ func TestOracleLifecycle(t *testing.T) { mbackend.On("BlockByNumber", mock.Anything, big.NewInt(int64(i))).Return(block, nil).Once() } - // Mock subscription - sub := newMockSubscription() - var headerCh chan<- *types.Header - mbackend.On("SubscribeNewHead", mock.Anything, mock.Anything).Run(func(args mock.Arguments) { - headerCh = args.Get(1).(chan<- *types.Header) - }).Return(sub, nil).Once() + // Mock polling: first return NotFound, then return a new header + mbackend.On("HeaderByNumber", mock.Anything, big.NewInt(101)).Return(nil, ethereum.NotFound).Once() + newHeader := createHeader(101, &excessBlobGas) + newBlock := createBlock(101, newHeader.BaseFee, []*types.Transaction{}) + mbackend.On("HeaderByNumber", mock.Anything, big.NewInt(101)).Return(newHeader, nil).Once() + mbackend.On("BlockByNumber", mock.Anything, big.NewInt(101)).Return(newBlock, nil).Once() + + // After processing block 101, polling will try block 102 which doesn't exist + mbackend.On("HeaderByNumber", mock.Anything, big.NewInt(102)).Return(nil, ethereum.NotFound).Maybe() // Start the oracle err := oracle.Start() @@ -481,14 +457,7 @@ func TestOracleLifecycle(t *testing.T) { require.Equal(t, uint64(100), latestBlockNum) require.NotNil(t, fee) - // Send a new header through the subscription to verify processing works - newHeader := createHeader(101, &excessBlobGas) - newBlock := createBlock(101, newHeader.BaseFee, []*types.Transaction{}) - mbackend.On("BlockByNumber", mock.Anything, big.NewInt(101)).Return(newBlock, nil).Once() - - headerCh <- newHeader - - // Give the goroutine time to process + // Wait for polling to pick up the new header require.Eventually(t, func() bool { latestBlockNum, _ = oracle.GetLatestBlobBaseFee() return latestBlockNum == 101 @@ -497,8 +466,6 @@ func TestOracleLifecycle(t *testing.T) { // Close the oracle oracle.Close() - // Verify subscription was unsubscribed - require.True(t, sub.unsubbed, "subscription should be unsubscribed after Close") select { case <-oracle.loopDone: // Expect loop to have exited @@ -510,6 +477,7 @@ func TestOracleLifecycle(t *testing.T) { }) t.Run("close before start is safe", func(t *testing.T) { + mbackend := new(mockBTOBackend) oracle2 := NewBlobTipOracle(mbackend, chainConfig, logger, &BlobTipOracleConfig{ PricesCacheSize: 10, BlockCacheSize: 10,