Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 27 additions & 30 deletions op-service/bgpo/oracle.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package bgpo

import (
"context"
"errors"
"fmt"
"math/big"
"sort"
Expand All @@ -24,7 +25,6 @@ type BTOBackend interface {
BlockNumber(ctx context.Context) (uint64, error)
HeaderByNumber(ctx context.Context, number *big.Int) (*types.Header, error)
BlockByNumber(ctx context.Context, number *big.Int) (*types.Block, error)
SubscribeNewHead(ctx context.Context, ch chan<- *types.Header) (ethereum.Subscription, error)
}

// BlobTipOracle tracks blob base gas prices by subscribing to new block headers
Expand All @@ -49,7 +49,6 @@ type BlobTipOracle struct {
ctx context.Context
cancel context.CancelFunc

sub ethereum.Subscription
loopDone chan struct{}
}

Expand Down Expand Up @@ -121,48 +120,49 @@ func NewBlobTipOracle(backend BTOBackend, chainConfig *params.ChainConfig, log l
}

// Start starts the oracle's background processing. It returns after the cache is prepopulated and
// the subscription is set up. To stop the background processing, call [BlobTipOracle.Close]. The
// background processing will also stop if the subscription fails.
// the polling loop is started. To stop the background processing, call [BlobTipOracle.Close].
func (o *BlobTipOracle) Start() error {
// Pre-populate cache with recent blocks before subscribing
// Pre-populate cache with recent blocks before starting the polling loop
if err := o.prePopulateCache(); err != nil {
o.log.Warn("Failed to pre-populate cache, continuing anyway", "err", err)
}

headers := make(chan *types.Header, 10)

sub, err := o.backend.SubscribeNewHead(o.ctx, headers)
if err != nil {
return err
}
o.sub = sub
o.log.Info("Blob tip oracle started, subscribed to newHeads")
o.log.Info("Blob tip oracle started, polling for new headers")

o.loopDone = make(chan struct{})
go o.processHeaders(headers)
go o.pollLoop()
return nil
}

func (o *BlobTipOracle) processHeaders(headers chan *types.Header) {
defer o.log.Debug("Blob tip oracle header processing loop exited")
func (o *BlobTipOracle) pollLoop() {
defer o.log.Debug("Blob tip oracle polling loop exited")
defer close(o.loopDone)

// Process headers as they arrive
ticker := time.NewTicker(o.config.PollRate)
defer ticker.Stop()

for {
select {
case header := <-headers:
if err := o.processHeader(header); err != nil {
o.log.Error("Error processing header", "err", err, "block", header.Number)
}
case err := <-o.sub.Err():
if err != nil {
o.log.Error("Subscription error", "err", err)
return
}
return
case <-o.ctx.Done():
o.log.Info("Blob tip oracle context canceled")
return
case <-ticker.C:
nextBlock := o.latestBlock + 1
header, err := func() (*types.Header, error) {
ctx, cancel := context.WithTimeout(o.ctx, o.config.NetworkTimeout)
defer cancel()
return o.backend.HeaderByNumber(ctx, big.NewInt(int64(nextBlock)))
}()
if errors.Is(err, ethereum.NotFound) {
continue // Block not yet available
}
if err != nil {
o.log.Warn("Failed to get header", "err", err, "block", nextBlock)
continue
}
if err := o.processHeader(header); err != nil {
o.log.Error("Error processing header", "err", err, "block", nextBlock)
}
}
}
}
Expand Down Expand Up @@ -379,9 +379,6 @@ func (o *BlobTipOracle) extractTipsForBlobTxs(block *types.Block, baseFee *big.I
// Close stops the oracle and cleans up resources.
func (o *BlobTipOracle) Close() {
o.cancel()
if o.sub != nil {
o.sub.Unsubscribe()
}
if o.loopDone != nil {
<-o.loopDone
}
Expand Down
78 changes: 23 additions & 55 deletions op-service/bgpo/oracle_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ import (
"github.com/ethereum-optimism/optimism/op-service/testlog"
)

// mockBTOBackend mocks BTOBackend for testing.

type mockBTOBackend struct {
mock.Mock
}
Expand All @@ -44,38 +46,8 @@ func (m *mockBTOBackend) BlockByNumber(ctx context.Context, number *big.Int) (*t
return args.Get(0).(*types.Block), args.Error(1)
}

func (m *mockBTOBackend) SubscribeNewHead(ctx context.Context, ch chan<- *types.Header) (ethereum.Subscription, error) {
args := m.Called(ctx, ch)
if args.Get(0) == nil {
return nil, args.Error(1)
}
return args.Get(0).(ethereum.Subscription), args.Error(1)
}

var _ BTOBackend = (*mockBTOBackend)(nil)

// mockSubscription implements ethereum.Subscription for testing.
type mockSubscription struct {
errCh chan error
unsubbed bool
}

func newMockSubscription() *mockSubscription {
return &mockSubscription{
errCh: make(chan error, 1),
}
}

func (s *mockSubscription) Unsubscribe() {
if !s.unsubbed {
s.unsubbed = true
}
}

func (s *mockSubscription) Err() <-chan error {
return s.errCh
}

func createHeader(blockNum uint64, excessBlobGas *uint64) *types.Header {
header := &types.Header{
Number: big.NewInt(int64(blockNum)),
Expand Down Expand Up @@ -433,19 +405,20 @@ func TestExtractBlobFeeCaps(t *testing.T) {
}

func TestOracleLifecycle(t *testing.T) {
mbackend := new(mockBTOBackend)
chainConfig := params.MainnetChainConfig
logger := testlog.Logger(t, log.LevelDebug)

oracle := NewBlobTipOracle(mbackend, chainConfig, logger, &BlobTipOracleConfig{
PricesCacheSize: 10,
BlockCacheSize: 10,
MaxBlocks: 2,
Percentile: 60,
NetworkTimeout: time.Second,
})
t.Run("start and close with polling", func(t *testing.T) {
mbackend := new(mockBTOBackend)
oracle := NewBlobTipOracle(mbackend, chainConfig, logger, &BlobTipOracleConfig{
PricesCacheSize: 10,
BlockCacheSize: 10,
MaxBlocks: 2,
Percentile: 60,
NetworkTimeout: time.Second,
PollRate: 50 * time.Millisecond, // Fast polling for test
})

t.Run("start and close", func(t *testing.T) {
latestBlock := uint64(100)

// Mock pre-population calls
Expand All @@ -459,12 +432,15 @@ func TestOracleLifecycle(t *testing.T) {
mbackend.On("BlockByNumber", mock.Anything, big.NewInt(int64(i))).Return(block, nil).Once()
}

// Mock subscription
sub := newMockSubscription()
var headerCh chan<- *types.Header
mbackend.On("SubscribeNewHead", mock.Anything, mock.Anything).Run(func(args mock.Arguments) {
headerCh = args.Get(1).(chan<- *types.Header)
}).Return(sub, nil).Once()
// Mock polling: first return NotFound, then return a new header
mbackend.On("HeaderByNumber", mock.Anything, big.NewInt(101)).Return(nil, ethereum.NotFound).Once()
newHeader := createHeader(101, &excessBlobGas)
newBlock := createBlock(101, newHeader.BaseFee, []*types.Transaction{})
mbackend.On("HeaderByNumber", mock.Anything, big.NewInt(101)).Return(newHeader, nil).Once()
mbackend.On("BlockByNumber", mock.Anything, big.NewInt(101)).Return(newBlock, nil).Once()

// After processing block 101, polling will try block 102 which doesn't exist
mbackend.On("HeaderByNumber", mock.Anything, big.NewInt(102)).Return(nil, ethereum.NotFound).Maybe()

// Start the oracle
err := oracle.Start()
Expand All @@ -481,14 +457,7 @@ func TestOracleLifecycle(t *testing.T) {
require.Equal(t, uint64(100), latestBlockNum)
require.NotNil(t, fee)

// Send a new header through the subscription to verify processing works
newHeader := createHeader(101, &excessBlobGas)
newBlock := createBlock(101, newHeader.BaseFee, []*types.Transaction{})
mbackend.On("BlockByNumber", mock.Anything, big.NewInt(101)).Return(newBlock, nil).Once()

headerCh <- newHeader

// Give the goroutine time to process
// Wait for polling to pick up the new header
require.Eventually(t, func() bool {
latestBlockNum, _ = oracle.GetLatestBlobBaseFee()
return latestBlockNum == 101
Expand All @@ -497,8 +466,6 @@ func TestOracleLifecycle(t *testing.T) {
// Close the oracle
oracle.Close()

// Verify subscription was unsubscribed
require.True(t, sub.unsubbed, "subscription should be unsubscribed after Close")
select {
case <-oracle.loopDone:
// Expect loop to have exited
Expand All @@ -510,6 +477,7 @@ func TestOracleLifecycle(t *testing.T) {
})

t.Run("close before start is safe", func(t *testing.T) {
mbackend := new(mockBTOBackend)
oracle2 := NewBlobTipOracle(mbackend, chainConfig, logger, &BlobTipOracleConfig{
PricesCacheSize: 10,
BlockCacheSize: 10,
Expand Down