diff --git a/op-batcher/batcher/batch_submitter.go b/op-batcher/batcher/batch_submitter.go index 964b925500388..132660614b2de 100644 --- a/op-batcher/batcher/batch_submitter.go +++ b/op-batcher/batcher/batch_submitter.go @@ -12,9 +12,9 @@ import ( gethrpc "github.com/ethereum/go-ethereum/rpc" "github.com/urfave/cli" + "github.com/ethereum-optimism/optimism/op-batcher/metrics" "github.com/ethereum-optimism/optimism/op-batcher/rpc" oplog "github.com/ethereum-optimism/optimism/op-service/log" - opmetrics "github.com/ethereum-optimism/optimism/op-service/metrics" oppprof "github.com/ethereum-optimism/optimism/op-service/pprof" oprpc "github.com/ethereum-optimism/optimism/op-service/rpc" ) @@ -36,9 +36,10 @@ func Main(version string, cliCtx *cli.Context) error { } l := oplog.NewLogger(cfg.LogConfig) + m := metrics.NewMetrics("default") l.Info("Initializing Batch Submitter") - batchSubmitter, err := NewBatchSubmitterFromCLIConfig(cfg, l) + batchSubmitter, err := NewBatchSubmitterFromCLIConfig(cfg, l, m) if err != nil { l.Error("Unable to create Batch Submitter", "error", err) return err @@ -64,16 +65,15 @@ func Main(version string, cliCtx *cli.Context) error { }() } - registry := opmetrics.NewRegistry() metricsCfg := cfg.MetricsConfig if metricsCfg.Enabled { l.Info("starting metrics server", "addr", metricsCfg.ListenAddr, "port", metricsCfg.ListenPort) go func() { - if err := opmetrics.ListenAndServe(ctx, registry, metricsCfg.ListenAddr, metricsCfg.ListenPort); err != nil { + if err := m.Serve(ctx, metricsCfg.ListenAddr, metricsCfg.ListenPort); err != nil { l.Error("error starting metrics server", err) } }() - opmetrics.LaunchBalanceMetrics(ctx, l, registry, "", batchSubmitter.L1Client, batchSubmitter.From) + m.StartBalanceMetrics(ctx, l, batchSubmitter.L1Client, batchSubmitter.From) } rpcCfg := cfg.RPCConfig @@ -95,6 +95,9 @@ func Main(version string, cliCtx *cli.Context) error { return fmt.Errorf("error starting RPC server: %w", err) } + m.RecordInfo(version) + 
m.RecordUp() + interruptChannel := make(chan os.Signal, 1) signal.Notify(interruptChannel, []os.Signal{ os.Interrupt, diff --git a/op-batcher/batcher/channel_builder.go b/op-batcher/batcher/channel_builder.go index 070c4a0e90f87..77d7c0d8574fa 100644 --- a/op-batcher/batcher/channel_builder.go +++ b/op-batcher/batcher/channel_builder.go @@ -136,6 +136,8 @@ type channelBuilder struct { blocks []*types.Block // frames data queue, to be send as txs frames []frameData + // total amount of output data of all frames created yet + outputBytes int } // newChannelBuilder creates a new channel builder or returns an error if the @@ -156,11 +158,21 @@ func (c *channelBuilder) ID() derive.ChannelID { return c.co.ID() } -// InputBytes returns to total amount of input bytes added to the channel. +// InputBytes returns the total amount of input bytes added to the channel. func (c *channelBuilder) InputBytes() int { return c.co.InputBytes() } +// ReadyBytes returns the amount of bytes ready in the compression pipeline to +// output into a frame. +func (c *channelBuilder) ReadyBytes() int { + return c.co.ReadyBytes() +} + +func (c *channelBuilder) OutputBytes() int { + return c.outputBytes +} + // Blocks returns a backup list of all blocks that were added to the channel. It // can be used in case the channel needs to be rebuilt. func (c *channelBuilder) Blocks() []*types.Block { @@ -184,22 +196,25 @@ func (c *channelBuilder) Reset() error { // AddBlock returns a ChannelFullError if called even though the channel is // already full. See description of FullErr for details. // +// AddBlock also returns the L1BlockInfo that got extracted from the block's +// first transaction for subsequent use by the caller. +// // Call OutputFrames() afterwards to create frames. 
-func (c *channelBuilder) AddBlock(block *types.Block) error { +func (c *channelBuilder) AddBlock(block *types.Block) (derive.L1BlockInfo, error) { if c.IsFull() { - return c.FullErr() + return derive.L1BlockInfo{}, c.FullErr() } - batch, err := derive.BlockToBatch(block) + batch, l1info, err := derive.BlockToBatch(block) if err != nil { - return fmt.Errorf("converting block to batch: %w", err) + return l1info, fmt.Errorf("converting block to batch: %w", err) } if _, err = c.co.AddBatch(batch); errors.Is(err, derive.ErrTooManyRLPBytes) { c.setFullErr(err) - return c.FullErr() + return l1info, c.FullErr() } else if err != nil { - return fmt.Errorf("adding block to channel out: %w", err) + return l1info, fmt.Errorf("adding block to channel out: %w", err) } c.blocks = append(c.blocks, block) c.updateSwTimeout(batch) @@ -209,7 +224,7 @@ func (c *channelBuilder) AddBlock(block *types.Block) error { // Adding this block still worked, so don't return error, just mark as full } - return nil + return l1info, nil } // Timeout management @@ -381,10 +396,11 @@ func (c *channelBuilder) outputFrame() error { } frame := frameData{ - id: txID{chID: c.co.ID(), frameNumber: fn}, + id: frameID{chID: c.co.ID(), frameNumber: fn}, data: buf.Bytes(), } c.frames = append(c.frames, frame) + c.outputBytes += len(frame.data) return err // possibly io.EOF (last frame) } diff --git a/op-batcher/batcher/channel_builder_test.go b/op-batcher/batcher/channel_builder_test.go index 16854a09ba51d..1103214783901 100644 --- a/op-batcher/batcher/channel_builder_test.go +++ b/op-batcher/batcher/channel_builder_test.go @@ -2,14 +2,17 @@ package batcher import ( "bytes" - "crypto/rand" + "errors" "math" "math/big" + "math/rand" "testing" + "time" "github.com/ethereum-optimism/optimism/op-node/eth" "github.com/ethereum-optimism/optimism/op-node/rollup" "github.com/ethereum-optimism/optimism/op-node/rollup/derive" + dtest "github.com/ethereum-optimism/optimism/op-node/rollup/derive/test" 
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" @@ -52,54 +55,64 @@ func TestConfigValidation(t *testing.T) { require.ErrorIs(t, validChannelConfig.Check(), ErrInvalidChannelTimeout) } -// addNonsenseBlock is a helper function that adds a nonsense block -// to the channel builder using the [channelBuilder.AddBlock] method. -func addNonsenseBlock(cb *channelBuilder) error { - lBlock := types.NewBlock(&types.Header{ +// addMiniBlock adds a minimal valid L2 block to the channel builder using the +// channelBuilder.AddBlock method. +func addMiniBlock(cb *channelBuilder) error { + a := newMiniL2Block(0) + _, err := cb.AddBlock(a) + return err +} + +// newMiniL2Block returns a minimal L2 block with a minimal valid L1InfoDeposit +// transaction as first transaction. Both blocks are minimal in the sense that +// most fields are left at defaults or are unset. +// +// If numTx > 0, that many empty DynamicFeeTxs will be added to the txs. +func newMiniL2Block(numTx int) *types.Block { + return newMiniL2BlockWithNumberParent(numTx, new(big.Int), (common.Hash{})) +} + +// newMiniL2Block returns a minimal L2 block with a minimal valid L1InfoDeposit +// transaction as first transaction. Both blocks are minimal in the sense that +// most fields are left at defaults or are unset. Block number and parent hash +// will be set to the given parameters number and parent. +// +// If numTx > 0, that many empty DynamicFeeTxs will be added to the txs. 
+func newMiniL2BlockWithNumberParent(numTx int, number *big.Int, parent common.Hash) *types.Block { + l1Block := types.NewBlock(&types.Header{ BaseFee: big.NewInt(10), Difficulty: common.Big0, Number: big.NewInt(100), }, nil, nil, nil, trie.NewStackTrie(nil)) - l1InfoTx, err := derive.L1InfoDeposit(0, lBlock, eth.SystemConfig{}, false) + l1InfoTx, err := derive.L1InfoDeposit(0, l1Block, eth.SystemConfig{}, false) if err != nil { - return err + panic(err) } - txs := []*types.Transaction{types.NewTx(l1InfoTx)} - a := types.NewBlock(&types.Header{ - Number: big.NewInt(0), + + txs := make([]*types.Transaction, 0, 1+numTx) + txs = append(txs, types.NewTx(l1InfoTx)) + for i := 0; i < numTx; i++ { + txs = append(txs, types.NewTx(&types.DynamicFeeTx{})) + } + + return types.NewBlock(&types.Header{ + Number: number, + ParentHash: parent, }, txs, nil, nil, trie.NewStackTrie(nil)) - err = cb.AddBlock(a) - return err } -// buildTooLargeRlpEncodedBlockBatch is a helper function that builds a batch -// of blocks that are too large to be added to a channel. -func buildTooLargeRlpEncodedBlockBatch(cb *channelBuilder) error { - // Construct a block with way too many txs - lBlock := types.NewBlock(&types.Header{ - BaseFee: big.NewInt(10), - Difficulty: common.Big0, - Number: big.NewInt(100), - }, nil, nil, nil, trie.NewStackTrie(nil)) - l1InfoTx, _ := derive.L1InfoDeposit(0, lBlock, eth.SystemConfig{}, false) - txs := []*types.Transaction{types.NewTx(l1InfoTx)} - for i := 0; i < 500_000; i++ { - txData := make([]byte, 32) - _, _ = rand.Read(txData) - tx := types.NewTransaction(0, common.Address{}, big.NewInt(0), 0, big.NewInt(0), txData) - txs = append(txs, tx) +// addTooManyBlocks adds blocks to the channel until it hits an error, +// which is presumably ErrTooManyRLPBytes. 
+func addTooManyBlocks(cb *channelBuilder) error { + for i := 0; i < 10_000; i++ { + block := newMiniL2Block(100) + _, err := cb.AddBlock(block) + if err != nil { + return err + } } - block := types.NewBlock(&types.Header{ - Number: big.NewInt(0), - }, txs, nil, nil, trie.NewStackTrie(nil)) - // Try to add the block to the channel builder - // This should fail since the block is too large - // When a batch is constructed from the block and - // then rlp encoded in the channel out, the size - // will exceed [derive.MaxRLPBytesPerChannel] - err := cb.AddBlock(block) - return err + return nil } // FuzzDurationTimeoutZeroMaxChannelDuration ensures that when whenever the MaxChannelDuration @@ -391,7 +404,7 @@ func TestOutputFrames(t *testing.T) { require.Equal(t, 0, readyBytes) // Let's add a block - err = addNonsenseBlock(cb) + err = addMiniBlock(cb) require.NoError(t, err) // Check how many ready bytes @@ -421,17 +434,18 @@ func TestOutputFrames(t *testing.T) { // TestMaxRLPBytesPerChannel tests the [channelBuilder.OutputFrames] // function errors when the max RLP bytes per channel is reached. 
func TestMaxRLPBytesPerChannel(t *testing.T) { + t.Parallel() channelConfig := defaultTestChannelConfig - channelConfig.MaxFrameSize = 2 + channelConfig.MaxFrameSize = derive.MaxRLPBytesPerChannel * 2 + channelConfig.TargetFrameSize = derive.MaxRLPBytesPerChannel * 2 + channelConfig.ApproxComprRatio = 1 // Construct the channel builder cb, err := newChannelBuilder(channelConfig) require.NoError(t, err) - require.False(t, cb.IsFull()) - require.Equal(t, 0, cb.NumFrames()) // Add a block that overflows the [ChannelOut] - err = buildTooLargeRlpEncodedBlockBatch(cb) + err = addTooManyBlocks(cb) require.ErrorIs(t, err, derive.ErrTooManyRLPBytes) } @@ -462,7 +476,7 @@ func TestOutputFramesMaxFrameIndex(t *testing.T) { a := types.NewBlock(&types.Header{ Number: big.NewInt(0), }, txs, nil, nil, trie.NewStackTrie(nil)) - err = cb.AddBlock(a) + _, err = cb.AddBlock(a) if cb.IsFull() { fullErr := cb.FullErr() require.ErrorIs(t, fullErr, ErrMaxFrameIndex) @@ -494,7 +508,7 @@ func TestBuilderAddBlock(t *testing.T) { require.NoError(t, err) // Add a nonsense block to the channel builder - err = addNonsenseBlock(cb) + err = addMiniBlock(cb) require.NoError(t, err) // Check the fields reset in the AddBlock function @@ -505,7 +519,7 @@ func TestBuilderAddBlock(t *testing.T) { // Since the channel output is full, the next call to AddBlock // should return the channel out full error - err = addNonsenseBlock(cb) + err = addMiniBlock(cb) require.ErrorIs(t, err, ErrInputTargetReached) } @@ -520,7 +534,7 @@ func TestBuilderReset(t *testing.T) { require.NoError(t, err) // Add a nonsense block to the channel builder - err = addNonsenseBlock(cb) + err = addMiniBlock(cb) require.NoError(t, err) // Check the fields reset in the Reset function @@ -536,7 +550,7 @@ func TestBuilderReset(t *testing.T) { require.NoError(t, err) // Add another block to increment the block count - err = addNonsenseBlock(cb) + err = addMiniBlock(cb) require.NoError(t, err) // Check the fields reset in the Reset 
function @@ -622,3 +636,73 @@ func TestFramePublished(t *testing.T) { // Now the timeout will be 1000 require.Equal(t, uint64(1000), cb.timeout) } + +func TestChannelBuilder_InputBytes(t *testing.T) { + require := require.New(t) + rng := rand.New(rand.NewSource(time.Now().UnixNano())) + cb, _ := defaultChannelBuilderSetup(t) + + require.Zero(cb.InputBytes()) + + var l int + for i := 0; i < 5; i++ { + block := newMiniL2Block(rng.Intn(32)) + l += blockBatchRlpSize(t, block) + + _, err := cb.AddBlock(block) + require.NoError(err) + require.Equal(cb.InputBytes(), l) + } +} + +func TestChannelBuilder_OutputBytes(t *testing.T) { + require := require.New(t) + rng := rand.New(rand.NewSource(time.Now().UnixNano())) + cfg := defaultTestChannelConfig + cfg.TargetFrameSize = 1000 + cfg.MaxFrameSize = 1000 + cfg.TargetNumFrames = 16 + cfg.ApproxComprRatio = 1.0 + cb, err := newChannelBuilder(cfg) + require.NoError(err, "newChannelBuilder") + + require.Zero(cb.OutputBytes()) + + for { + block, _ := dtest.RandomL2Block(rng, rng.Intn(32)) + _, err := cb.AddBlock(block) + if errors.Is(err, ErrInputTargetReached) { + break + } + require.NoError(err) + } + + require.NoError(cb.OutputFrames()) + require.True(cb.IsFull()) + require.Greater(cb.NumFrames(), 1) + + var flen int + for cb.HasFrame() { + f := cb.NextFrame() + flen += len(f.data) + } + + require.Equal(cb.OutputBytes(), flen) +} + +func defaultChannelBuilderSetup(t *testing.T) (*channelBuilder, ChannelConfig) { + t.Helper() + cfg := defaultTestChannelConfig + cb, err := newChannelBuilder(cfg) + require.NoError(t, err, "newChannelBuilder") + return cb, cfg +} + +func blockBatchRlpSize(t *testing.T, b *types.Block) int { + t.Helper() + batch, _, err := derive.BlockToBatch(b) + require.NoError(t, err) + var buf bytes.Buffer + require.NoError(t, batch.EncodeRLP(&buf), "RLP-encoding batch") + return buf.Len() +} diff --git a/op-batcher/batcher/channel_manager.go b/op-batcher/batcher/channel_manager.go index 
c2c64dcce2faa..270d603953464 100644 --- a/op-batcher/batcher/channel_manager.go +++ b/op-batcher/batcher/channel_manager.go @@ -6,7 +6,9 @@ import ( "io" "math" + "github.com/ethereum-optimism/optimism/op-batcher/metrics" "github.com/ethereum-optimism/optimism/op-node/eth" + "github.com/ethereum-optimism/optimism/op-node/rollup/derive" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/log" @@ -22,8 +24,9 @@ var ErrReorg = errors.New("block does not extend existing chain") // channel. // Functions on channelManager are not safe for concurrent access. type channelManager struct { - log log.Logger - cfg ChannelConfig + log log.Logger + metr metrics.Metricer + cfg ChannelConfig // All blocks since the last request for new tx data. blocks []*types.Block @@ -40,10 +43,12 @@ type channelManager struct { confirmedTransactions map[txID]eth.BlockID } -func NewChannelManager(log log.Logger, cfg ChannelConfig) *channelManager { +func NewChannelManager(log log.Logger, metr metrics.Metricer, cfg ChannelConfig) *channelManager { return &channelManager{ - log: log, - cfg: cfg, + log: log, + metr: metr, + cfg: cfg, + pendingTransactions: make(map[txID]txData), confirmedTransactions: make(map[txID]eth.BlockID), } @@ -71,6 +76,8 @@ func (s *channelManager) TxFailed(id txID) { } else { s.log.Warn("unknown transaction marked as failed", "id", id) } + + s.metr.RecordBatchTxFailed() } // TxConfirmed marks a transaction as confirmed on L1. Unfortunately even if all frames in @@ -78,7 +85,8 @@ func (s *channelManager) TxFailed(id txID) { // resubmitted. // This function may reset the pending channel if the pending channel has timed out. 
func (s *channelManager) TxConfirmed(id txID, inclusionBlock eth.BlockID) { - s.log.Trace("marked transaction as confirmed", "id", id, "block", inclusionBlock) + s.metr.RecordBatchTxSubmitted() + s.log.Debug("marked transaction as confirmed", "id", id, "block", inclusionBlock) if _, ok := s.pendingTransactions[id]; !ok { s.log.Warn("unknown transaction marked as confirmed", "id", id, "block", inclusionBlock) // TODO: This can occur if we clear the channel while there are still pending transactions @@ -92,13 +100,15 @@ func (s *channelManager) TxConfirmed(id txID, inclusionBlock eth.BlockID) { // If this channel timed out, put the pending blocks back into the local saved blocks // and then reset this state so it can try to build a new channel. if s.pendingChannelIsTimedOut() { - s.log.Warn("Channel timed out", "chID", s.pendingChannel.ID()) + s.metr.RecordChannelTimedOut(s.pendingChannel.ID()) + s.log.Warn("Channel timed out", "id", s.pendingChannel.ID()) s.blocks = append(s.pendingChannel.Blocks(), s.blocks...) s.clearPendingChannel() } // If we are done with this channel, record that. if s.pendingChannelIsFullySubmitted() { - s.log.Info("Channel is fully submitted", "chID", s.pendingChannel.ID()) + s.metr.RecordChannelFullySubmitted(s.pendingChannel.ID()) + s.log.Info("Channel is fully submitted", "id", s.pendingChannel.ID()) s.clearPendingChannel() } } @@ -194,8 +204,8 @@ func (s *channelManager) TxData(l1Head eth.BlockID) (txData, error) { // all pending blocks be included in this channel for submission. 
s.registerL1Block(l1Head) - if err := s.pendingChannel.OutputFrames(); err != nil { - return txData{}, fmt.Errorf("creating frames with channel builder: %w", err) + if err := s.outputFrames(); err != nil { + return txData{}, err } return s.nextTxData() @@ -211,7 +221,11 @@ func (s *channelManager) ensurePendingChannel(l1Head eth.BlockID) error { return fmt.Errorf("creating new channel: %w", err) } s.pendingChannel = cb - s.log.Info("Created channel", "chID", cb.ID(), "l1Head", l1Head) + s.log.Info("Created channel", + "id", cb.ID(), + "l1Head", l1Head, + "blocks_pending", len(s.blocks)) + s.metr.RecordChannelOpened(cb.ID(), len(s.blocks)) return nil } @@ -229,28 +243,27 @@ func (s *channelManager) registerL1Block(l1Head eth.BlockID) { // processBlocks adds blocks from the blocks queue to the pending channel until // either the queue got exhausted or the channel is full. func (s *channelManager) processBlocks() error { - var blocksAdded int - var _chFullErr *ChannelFullError // throw away, just for type checking + var ( + blocksAdded int + _chFullErr *ChannelFullError // throw away, just for type checking + latestL2ref eth.L2BlockRef + ) for i, block := range s.blocks { - if err := s.pendingChannel.AddBlock(block); errors.As(err, &_chFullErr) { + l1info, err := s.pendingChannel.AddBlock(block) + if errors.As(err, &_chFullErr) { // current block didn't get added because channel is already full break } else if err != nil { return fmt.Errorf("adding block[%d] to channel builder: %w", i, err) } blocksAdded += 1 + latestL2ref = l2BlockRefFromBlockAndL1Info(block, l1info) // current block got added but channel is now full if s.pendingChannel.IsFull() { break } } - s.log.Debug("Added blocks to channel", - "blocks_added", blocksAdded, - "channel_full", s.pendingChannel.IsFull(), - "blocks_pending", len(s.blocks)-blocksAdded, - "input_bytes", s.pendingChannel.InputBytes(), - ) if blocksAdded == len(s.blocks) { // all blocks processed, reuse slice s.blocks = s.blocks[:0] @@ 
-258,6 +271,53 @@ func (s *channelManager) processBlocks() error { // remove processed blocks s.blocks = s.blocks[blocksAdded:] } + + s.metr.RecordL2BlocksAdded(latestL2ref, + blocksAdded, + len(s.blocks), + s.pendingChannel.InputBytes(), + s.pendingChannel.ReadyBytes()) + s.log.Debug("Added blocks to channel", + "blocks_added", blocksAdded, + "blocks_pending", len(s.blocks), + "channel_full", s.pendingChannel.IsFull(), + "input_bytes", s.pendingChannel.InputBytes(), + "ready_bytes", s.pendingChannel.ReadyBytes(), + ) + return nil +} + +func (s *channelManager) outputFrames() error { + if err := s.pendingChannel.OutputFrames(); err != nil { + return fmt.Errorf("creating frames with channel builder: %w", err) + } + if !s.pendingChannel.IsFull() { + return nil + } + + inBytes, outBytes := s.pendingChannel.InputBytes(), s.pendingChannel.OutputBytes() + s.metr.RecordChannelClosed( + s.pendingChannel.ID(), + len(s.blocks), + s.pendingChannel.NumFrames(), + inBytes, + outBytes, + s.pendingChannel.FullErr(), + ) + + var comprRatio float64 + if inBytes > 0 { + comprRatio = float64(outBytes) / float64(inBytes) + } + s.log.Info("Channel closed", + "id", s.pendingChannel.ID(), + "blocks_pending", len(s.blocks), + "num_frames", s.pendingChannel.NumFrames(), + "input_bytes", inBytes, + "output_bytes", outBytes, + "full_reason", s.pendingChannel.FullErr(), + "compr_ratio", comprRatio, + ) return nil } @@ -273,3 +333,14 @@ func (s *channelManager) AddL2Block(block *types.Block) error { return nil } + +func l2BlockRefFromBlockAndL1Info(block *types.Block, l1info derive.L1BlockInfo) eth.L2BlockRef { + return eth.L2BlockRef{ + Hash: block.Hash(), + Number: block.NumberU64(), + ParentHash: block.ParentHash(), + Time: block.Time(), + L1Origin: eth.BlockID{Hash: l1info.BlockHash, Number: l1info.Number}, + SequenceNumber: l1info.SequenceNumber, + } +} diff --git a/op-batcher/batcher/channel_manager_test.go b/op-batcher/batcher/channel_manager_test.go index 21d501d6037d4..335859ca5141e 
100644 --- a/op-batcher/batcher/channel_manager_test.go +++ b/op-batcher/batcher/channel_manager_test.go @@ -7,6 +7,7 @@ import ( "testing" "time" + "github.com/ethereum-optimism/optimism/op-batcher/metrics" "github.com/ethereum-optimism/optimism/op-node/eth" "github.com/ethereum-optimism/optimism/op-node/rollup/derive" derivetest "github.com/ethereum-optimism/optimism/op-node/rollup/derive/test" @@ -14,7 +15,6 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/log" - "github.com/ethereum/go-ethereum/trie" "github.com/stretchr/testify/require" ) @@ -23,7 +23,7 @@ import ( func TestPendingChannelTimeout(t *testing.T) { // Create a new channel manager with a ChannelTimeout log := testlog.Logger(t, log.LvlCrit) - m := NewChannelManager(log, ChannelConfig{ + m := NewChannelManager(log, metrics.NoopMetrics, ChannelConfig{ ChannelTimeout: 100, }) @@ -67,7 +67,7 @@ func TestPendingChannelTimeout(t *testing.T) { // detects a reorg when it has cached L1 blocks. func TestChannelManagerReturnsErrReorg(t *testing.T) { log := testlog.Logger(t, log.LvlCrit) - m := NewChannelManager(log, ChannelConfig{}) + m := NewChannelManager(log, metrics.NoopMetrics, ChannelConfig{}) a := types.NewBlock(&types.Header{ Number: big.NewInt(0), @@ -101,29 +101,17 @@ func TestChannelManagerReturnsErrReorg(t *testing.T) { // detects a reorg even if it does not have any blocks inside it. 
func TestChannelManagerReturnsErrReorgWhenDrained(t *testing.T) { log := testlog.Logger(t, log.LvlCrit) - m := NewChannelManager(log, ChannelConfig{ - TargetFrameSize: 0, - MaxFrameSize: 120_000, - ApproxComprRatio: 1.0, - }) - l1Block := types.NewBlock(&types.Header{ - BaseFee: big.NewInt(10), - Difficulty: common.Big0, - Number: big.NewInt(100), - }, nil, nil, nil, trie.NewStackTrie(nil)) - l1InfoTx, err := derive.L1InfoDeposit(0, l1Block, eth.SystemConfig{}, false) - require.NoError(t, err) - txs := []*types.Transaction{types.NewTx(l1InfoTx)} + m := NewChannelManager(log, metrics.NoopMetrics, + ChannelConfig{ + TargetFrameSize: 0, + MaxFrameSize: 120_000, + ApproxComprRatio: 1.0, + }) - a := types.NewBlock(&types.Header{ - Number: big.NewInt(0), - }, txs, nil, nil, trie.NewStackTrie(nil)) - x := types.NewBlock(&types.Header{ - Number: big.NewInt(1), - ParentHash: common.Hash{0xff}, - }, txs, nil, nil, trie.NewStackTrie(nil)) + a := newMiniL2Block(0) + x := newMiniL2BlockWithNumberParent(0, big.NewInt(1), common.Hash{0xff}) - err = m.AddL2Block(a) + err := m.AddL2Block(a) require.NoError(t, err) _, err = m.TxData(eth.BlockID{}) @@ -138,7 +126,7 @@ func TestChannelManagerReturnsErrReorgWhenDrained(t *testing.T) { // TestChannelManagerNextTxData checks the nextTxData function. func TestChannelManagerNextTxData(t *testing.T) { log := testlog.Logger(t, log.LvlCrit) - m := NewChannelManager(log, ChannelConfig{}) + m := NewChannelManager(log, metrics.NoopMetrics, ChannelConfig{}) // Nil pending channel should return EOF returnedTxData, err := m.nextTxData() @@ -181,7 +169,7 @@ func TestClearChannelManager(t *testing.T) { // Create a channel manager log := testlog.Logger(t, log.LvlCrit) rng := rand.New(rand.NewSource(time.Now().UnixNano())) - m := NewChannelManager(log, ChannelConfig{ + m := NewChannelManager(log, metrics.NoopMetrics, ChannelConfig{ // Need to set the channel timeout here so we don't clear pending // channels on confirmation. 
This would result in [TxConfirmed] // clearing confirmed transactions, and reseting the pendingChannels map @@ -254,7 +242,7 @@ func TestClearChannelManager(t *testing.T) { func TestChannelManagerTxConfirmed(t *testing.T) { // Create a channel manager log := testlog.Logger(t, log.LvlCrit) - m := NewChannelManager(log, ChannelConfig{ + m := NewChannelManager(log, metrics.NoopMetrics, ChannelConfig{ // Need to set the channel timeout here so we don't clear pending // channels on confirmation. This would result in [TxConfirmed] // clearing confirmed transactions, and reseting the pendingChannels map @@ -308,7 +296,7 @@ func TestChannelManagerTxConfirmed(t *testing.T) { func TestChannelManagerTxFailed(t *testing.T) { // Create a channel manager log := testlog.Logger(t, log.LvlCrit) - m := NewChannelManager(log, ChannelConfig{}) + m := NewChannelManager(log, metrics.NoopMetrics, ChannelConfig{}) // Let's add a valid pending transaction to the channel // manager so we can demonstrate correctness @@ -351,11 +339,12 @@ func TestChannelManager_TxResend(t *testing.T) { require := require.New(t) rng := rand.New(rand.NewSource(time.Now().UnixNano())) log := testlog.Logger(t, log.LvlError) - m := NewChannelManager(log, ChannelConfig{ - TargetFrameSize: 0, - MaxFrameSize: 120_000, - ApproxComprRatio: 1.0, - }) + m := NewChannelManager(log, metrics.NoopMetrics, + ChannelConfig{ + TargetFrameSize: 0, + MaxFrameSize: 120_000, + ApproxComprRatio: 1.0, + }) a, _ := derivetest.RandomL2Block(rng, 4) diff --git a/op-batcher/batcher/config.go b/op-batcher/batcher/config.go index 7c0b9f9b9f110..1f26b4bdf8c67 100644 --- a/op-batcher/batcher/config.go +++ b/op-batcher/batcher/config.go @@ -9,6 +9,7 @@ import ( "github.com/urfave/cli" "github.com/ethereum-optimism/optimism/op-batcher/flags" + "github.com/ethereum-optimism/optimism/op-batcher/metrics" "github.com/ethereum-optimism/optimism/op-batcher/rpc" "github.com/ethereum-optimism/optimism/op-node/rollup" 
"github.com/ethereum-optimism/optimism/op-node/sources" @@ -20,18 +21,21 @@ import ( ) type Config struct { - log log.Logger - L1Client *ethclient.Client - L2Client *ethclient.Client - RollupNode *sources.RollupClient - PollInterval time.Duration + log log.Logger + metr metrics.Metricer + L1Client *ethclient.Client + L2Client *ethclient.Client + RollupNode *sources.RollupClient + + PollInterval time.Duration + From common.Address + TxManagerConfig txmgr.Config - From common.Address // RollupConfig is queried at startup Rollup *rollup.Config - // Channel creation parameters + // Channel builder parameters Channel ChannelConfig } diff --git a/op-batcher/batcher/driver.go b/op-batcher/batcher/driver.go index e288f60fe7c58..4e7d8f5780bc8 100644 --- a/op-batcher/batcher/driver.go +++ b/op-batcher/batcher/driver.go @@ -10,7 +10,9 @@ import ( "sync" "time" + "github.com/ethereum-optimism/optimism/op-batcher/metrics" "github.com/ethereum-optimism/optimism/op-node/eth" + "github.com/ethereum-optimism/optimism/op-node/rollup/derive" opcrypto "github.com/ethereum-optimism/optimism/op-service/crypto" "github.com/ethereum-optimism/optimism/op-service/txmgr" "github.com/ethereum/go-ethereum/core/types" @@ -34,13 +36,14 @@ type BatchSubmitter struct { // lastStoredBlock is the last block loaded into `state`. If it is empty it should be set to the l2 safe head. lastStoredBlock eth.BlockID + lastL1Tip eth.L1BlockRef state *channelManager } // NewBatchSubmitterFromCLIConfig initializes the BatchSubmitter, gathering any resources // that will be needed during operation. 
-func NewBatchSubmitterFromCLIConfig(cfg CLIConfig, l log.Logger) (*BatchSubmitter, error) { +func NewBatchSubmitterFromCLIConfig(cfg CLIConfig, l log.Logger, m metrics.Metricer) (*BatchSubmitter, error) { ctx := context.Background() signer, fromAddress, err := opcrypto.SignerFactoryFromConfig(l, cfg.PrivateKey, cfg.Mnemonic, cfg.SequencerHDPath, cfg.SignerConfig) @@ -104,12 +107,12 @@ func NewBatchSubmitterFromCLIConfig(cfg CLIConfig, l log.Logger) (*BatchSubmitte return nil, err } - return NewBatchSubmitter(ctx, batcherCfg, l) + return NewBatchSubmitter(ctx, batcherCfg, l, m) } // NewBatchSubmitter initializes the BatchSubmitter, gathering any resources // that will be needed during operation. -func NewBatchSubmitter(ctx context.Context, cfg Config, l log.Logger) (*BatchSubmitter, error) { +func NewBatchSubmitter(ctx context.Context, cfg Config, l log.Logger, m metrics.Metricer) (*BatchSubmitter, error) { balance, err := cfg.L1Client.BalanceAt(ctx, cfg.From, nil) if err != nil { return nil, err @@ -118,12 +121,14 @@ func NewBatchSubmitter(ctx context.Context, cfg Config, l log.Logger) (*BatchSub cfg.log = l cfg.log.Info("creating batch submitter", "submitter_addr", cfg.From, "submitter_bal", balance) + cfg.metr = m + return &BatchSubmitter{ Config: cfg, txMgr: NewTransactionManager(l, cfg.TxManagerConfig, cfg.Rollup.BatchInboxAddress, cfg.Rollup.L1ChainID, cfg.From, cfg.L1Client), - state: NewChannelManager(l, cfg.Channel), + state: NewChannelManager(l, m, cfg.Channel), }, nil } @@ -187,13 +192,16 @@ func (l *BatchSubmitter) Stop() error { func (l *BatchSubmitter) loadBlocksIntoState(ctx context.Context) { start, end, err := l.calculateL2BlockRangeToStore(ctx) if err != nil { - l.log.Trace("was not able to calculate L2 block range", "err", err) + l.log.Warn("Error calculating L2 block range", "err", err) + return + } else if start.Number == end.Number { return } + var latestBlock *types.Block // Add all blocks to "state" for i := start.Number + 1; i < 
end.Number+1; i++ { - id, err := l.loadBlockIntoState(ctx, i) + block, err := l.loadBlockIntoState(ctx, i) if errors.Is(err, ErrReorg) { l.log.Warn("Found L2 reorg", "block_number", i) l.state.Clear() @@ -203,24 +211,34 @@ func (l *BatchSubmitter) loadBlocksIntoState(ctx context.Context) { l.log.Warn("failed to load block into state", "err", err) return } - l.lastStoredBlock = id + l.lastStoredBlock = eth.ToBlockID(block) + latestBlock = block } + + l2ref, err := derive.L2BlockToBlockRef(latestBlock, &l.Rollup.Genesis) + if err != nil { + l.log.Warn("Invalid L2 block loaded into state", "err", err) + return + } + + l.metr.RecordL2BlocksLoaded(l2ref) } // loadBlockIntoState fetches & stores a single block into `state`. It returns the block it loaded. -func (l *BatchSubmitter) loadBlockIntoState(ctx context.Context, blockNumber uint64) (eth.BlockID, error) { +func (l *BatchSubmitter) loadBlockIntoState(ctx context.Context, blockNumber uint64) (*types.Block, error) { ctx, cancel := context.WithTimeout(ctx, networkTimeout) + defer cancel() block, err := l.L2Client.BlockByNumber(ctx, new(big.Int).SetUint64(blockNumber)) - cancel() if err != nil { - return eth.BlockID{}, err + return nil, fmt.Errorf("getting L2 block: %w", err) } + if err := l.state.AddL2Block(block); err != nil { - return eth.BlockID{}, err + return nil, fmt.Errorf("adding L2 block to state: %w", err) } - id := eth.ToBlockID(block) - l.log.Info("added L2 block to local state", "block", id, "tx_count", len(block.Transactions()), "time", block.Time()) - return id, nil + + l.log.Info("added L2 block to local state", "block", eth.ToBlockID(block), "tx_count", len(block.Transactions()), "time", block.Time()) + return block, nil } // calculateL2BlockRangeToStore determines the range (start,end] that should be loaded into the local state. 
@@ -283,6 +301,7 @@ func (l *BatchSubmitter) loop() { l.log.Error("Failed to query L1 tip", "error", err) break } + l.recordL1Tip(l1tip) // Collect next transaction data txdata, err := l.state.TxData(l1tip.ID()) @@ -316,6 +335,14 @@ func (l *BatchSubmitter) loop() { } } +func (l *BatchSubmitter) recordL1Tip(l1tip eth.L1BlockRef) { + if l.lastL1Tip == l1tip { + return + } + l.lastL1Tip = l1tip + l.metr.RecordLatestL1Block(l1tip) +} + func (l *BatchSubmitter) recordFailedTx(id txID, err error) { l.log.Warn("Failed to send transaction", "err", err) l.state.TxFailed(id) diff --git a/op-batcher/metrics/metrics.go b/op-batcher/metrics/metrics.go new file mode 100644 index 0000000000000..f7093a6e87f93 --- /dev/null +++ b/op-batcher/metrics/metrics.go @@ -0,0 +1,249 @@ +package metrics + +import ( + "context" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/ethclient" + "github.com/ethereum/go-ethereum/log" + "github.com/prometheus/client_golang/prometheus" + + "github.com/ethereum-optimism/optimism/op-node/eth" + "github.com/ethereum-optimism/optimism/op-node/rollup/derive" + opmetrics "github.com/ethereum-optimism/optimism/op-service/metrics" +) + +const Namespace = "op_batcher" + +type Metricer interface { + RecordInfo(version string) + RecordUp() + + // Records all L1 and L2 block events + opmetrics.RefMetricer + + RecordLatestL1Block(l1ref eth.L1BlockRef) + RecordL2BlocksLoaded(l2ref eth.L2BlockRef) + RecordChannelOpened(id derive.ChannelID, numPendingBlocks int) + RecordL2BlocksAdded(l2ref eth.L2BlockRef, numBlocksAdded, numPendingBlocks, inputBytes, outputComprBytes int) + RecordChannelClosed(id derive.ChannelID, numPendingBlocks int, numFrames int, inputBytes int, outputComprBytes int, reason error) + RecordChannelFullySubmitted(id derive.ChannelID) + RecordChannelTimedOut(id derive.ChannelID) + + RecordBatchTxSubmitted() + RecordBatchTxSuccess() + RecordBatchTxFailed() + + Document() []opmetrics.DocumentedMetric +} + +type Metrics 
struct { + ns string + registry *prometheus.Registry + factory opmetrics.Factory + + opmetrics.RefMetrics + + Info prometheus.GaugeVec + Up prometheus.Gauge + + // label by opened, closed, fully_submitted, timed_out + ChannelEvs opmetrics.EventVec + + PendingBlocksCount prometheus.GaugeVec + BlocksAddedCount prometheus.Gauge + + ChannelInputBytes prometheus.GaugeVec + ChannelReadyBytes prometheus.Gauge + ChannelOutputBytes prometheus.Gauge + ChannelClosedReason prometheus.Gauge + ChannelNumFrames prometheus.Gauge + ChannelComprRatio prometheus.Histogram + + BatcherTxEvs opmetrics.EventVec +} + +var _ Metricer = (*Metrics)(nil) + +func NewMetrics(procName string) *Metrics { + if procName == "" { + procName = "default" + } + ns := Namespace + "_" + procName + + registry := opmetrics.NewRegistry() + factory := opmetrics.With(registry) + + return &Metrics{ + ns: ns, + registry: registry, + factory: factory, + + RefMetrics: opmetrics.MakeRefMetrics(ns, factory), + + Info: *factory.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: ns, + Name: "info", + Help: "Pseudo-metric tracking version and config info", + }, []string{ + "version", + }), + Up: factory.NewGauge(prometheus.GaugeOpts{ + Namespace: ns, + Name: "up", + Help: "1 if the op-batcher has finished starting up", + }), + + ChannelEvs: opmetrics.NewEventVec(factory, ns, "channel", "Channel", []string{"stage"}), + + PendingBlocksCount: *factory.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: ns, + Name: "pending_blocks_count", + Help: "Number of pending blocks, not added to a channel yet.", + }, []string{"stage"}), + BlocksAddedCount: factory.NewGauge(prometheus.GaugeOpts{ + Namespace: ns, + Name: "blocks_added_count", + Help: "Total number of blocks added to current channel.", + }), + + ChannelInputBytes: *factory.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: ns, + Name: "input_bytes", + Help: "Number of input bytes to a channel.", + }, []string{"stage"}), + ChannelReadyBytes: factory.NewGauge(prometheus.GaugeOpts{ + 
Namespace: ns, + Name: "ready_bytes", + Help: "Number of bytes ready in the compression buffer.", + }), + ChannelOutputBytes: factory.NewGauge(prometheus.GaugeOpts{ + Namespace: ns, + Name: "output_bytes", + Help: "Number of compressed output bytes from a channel.", + }), + ChannelClosedReason: factory.NewGauge(prometheus.GaugeOpts{ + Namespace: ns, + Name: "channel_closed_reason", + Help: "Pseudo-metric to record the reason a channel got closed.", + }), + ChannelNumFrames: factory.NewGauge(prometheus.GaugeOpts{ + Namespace: ns, + Name: "channel_num_frames", + Help: "Total number of frames of closed channel.", + }), + ChannelComprRatio: factory.NewHistogram(prometheus.HistogramOpts{ + Namespace: ns, + Name: "channel_compr_ratio", + Help: "Compression ratios of closed channel.", + Buckets: append([]float64{0.1, 0.2}, prometheus.LinearBuckets(0.3, 0.05, 14)...), + }), + + BatcherTxEvs: opmetrics.NewEventVec(factory, ns, "batcher_tx", "BatcherTx", []string{"stage"}), + } +} + +func (m *Metrics) Serve(ctx context.Context, host string, port int) error { + return opmetrics.ListenAndServe(ctx, m.registry, host, port) +} + +func (m *Metrics) Document() []opmetrics.DocumentedMetric { + return m.factory.Document() +} + +func (m *Metrics) StartBalanceMetrics(ctx context.Context, + l log.Logger, client *ethclient.Client, account common.Address) { + opmetrics.LaunchBalanceMetrics(ctx, l, m.registry, m.ns, client, account) +} + +// RecordInfo sets a pseudo-metric that contains versioning and +// config info for the op-batcher. +func (m *Metrics) RecordInfo(version string) { + m.Info.WithLabelValues(version).Set(1) +} + +// RecordUp sets the up metric to 1. 
+func (m *Metrics) RecordUp() { + // nothing to register here; all metrics are created via the registry-backed factory + m.Up.Set(1) +} + +const ( + StageLoaded = "loaded" + StageOpened = "opened" + StageAdded = "added" + StageClosed = "closed" + StageFullySubmitted = "fully_submitted" + StageTimedOut = "timed_out" + + TxStageSubmitted = "submitted" + TxStageSuccess = "success" + TxStageFailed = "failed" +) + +func (m *Metrics) RecordLatestL1Block(l1ref eth.L1BlockRef) { + m.RecordL1Ref("latest", l1ref) +} + +// RecordL2BlocksLoaded should be called when a new L2 block was loaded into the +// channel manager (but not processed yet). +func (m *Metrics) RecordL2BlocksLoaded(l2ref eth.L2BlockRef) { + m.RecordL2Ref(StageLoaded, l2ref) +} + +func (m *Metrics) RecordChannelOpened(id derive.ChannelID, numPendingBlocks int) { + m.ChannelEvs.Record(StageOpened) + m.BlocksAddedCount.Set(0) // reset + m.PendingBlocksCount.WithLabelValues(StageOpened).Set(float64(numPendingBlocks)) +} + +// RecordL2BlocksAdded should be called when L2 blocks were added to the channel +// builder, with the latest added block. 
+func (m *Metrics) RecordL2BlocksAdded(l2ref eth.L2BlockRef, numBlocksAdded, numPendingBlocks, inputBytes, outputComprBytes int) { + m.RecordL2Ref(StageAdded, l2ref) + m.BlocksAddedCount.Add(float64(numBlocksAdded)) + m.PendingBlocksCount.WithLabelValues(StageAdded).Set(float64(numPendingBlocks)) + m.ChannelInputBytes.WithLabelValues(StageAdded).Set(float64(inputBytes)) + m.ChannelReadyBytes.Set(float64(outputComprBytes)) +} + +func (m *Metrics) RecordChannelClosed(id derive.ChannelID, numPendingBlocks int, numFrames int, inputBytes int, outputComprBytes int, reason error) { + m.ChannelEvs.Record(StageClosed) + m.PendingBlocksCount.WithLabelValues(StageClosed).Set(float64(numPendingBlocks)) + m.ChannelNumFrames.Set(float64(numFrames)) + m.ChannelInputBytes.WithLabelValues(StageClosed).Set(float64(inputBytes)) + m.ChannelOutputBytes.Set(float64(outputComprBytes)) + + var comprRatio float64 + if inputBytes > 0 { + comprRatio = float64(outputComprBytes) / float64(inputBytes) + } + m.ChannelComprRatio.Observe(comprRatio) + + m.ChannelClosedReason.Set(float64(ClosedReasonToNum(reason))) +} + +func ClosedReasonToNum(reason error) int { + // CLI-3640 + return 0 +} + +func (m *Metrics) RecordChannelFullySubmitted(id derive.ChannelID) { + m.ChannelEvs.Record(StageFullySubmitted) +} + +func (m *Metrics) RecordChannelTimedOut(id derive.ChannelID) { + m.ChannelEvs.Record(StageTimedOut) +} + +func (m *Metrics) RecordBatchTxSubmitted() { + m.BatcherTxEvs.Record(TxStageSubmitted) +} + +func (m *Metrics) RecordBatchTxSuccess() { + m.BatcherTxEvs.Record(TxStageSuccess) +} + +func (m *Metrics) RecordBatchTxFailed() { + m.BatcherTxEvs.Record(TxStageFailed) +} diff --git a/op-batcher/metrics/noop.go b/op-batcher/metrics/noop.go new file mode 100644 index 0000000000000..80f993f0854a2 --- /dev/null +++ b/op-batcher/metrics/noop.go @@ -0,0 +1,30 @@ +package metrics + +import ( + "github.com/ethereum-optimism/optimism/op-node/eth" + 
"github.com/ethereum-optimism/optimism/op-node/rollup/derive" + opmetrics "github.com/ethereum-optimism/optimism/op-service/metrics" +) + +type noopMetrics struct{ opmetrics.NoopRefMetrics } + +var NoopMetrics Metricer = new(noopMetrics) + +func (*noopMetrics) Document() []opmetrics.DocumentedMetric { return nil } + +func (*noopMetrics) RecordInfo(version string) {} +func (*noopMetrics) RecordUp() {} + +func (*noopMetrics) RecordLatestL1Block(l1ref eth.L1BlockRef) {} +func (*noopMetrics) RecordL2BlocksLoaded(eth.L2BlockRef) {} +func (*noopMetrics) RecordChannelOpened(derive.ChannelID, int) {} +func (*noopMetrics) RecordL2BlocksAdded(eth.L2BlockRef, int, int, int, int) {} + +func (*noopMetrics) RecordChannelClosed(derive.ChannelID, int, int, int, int, error) {} + +func (*noopMetrics) RecordChannelFullySubmitted(derive.ChannelID) {} +func (*noopMetrics) RecordChannelTimedOut(derive.ChannelID) {} + +func (*noopMetrics) RecordBatchTxSubmitted() {} +func (*noopMetrics) RecordBatchTxSuccess() {} +func (*noopMetrics) RecordBatchTxFailed() {} diff --git a/op-e2e/migration_test.go b/op-e2e/migration_test.go index 1d646a5808b69..69bf616e65065 100644 --- a/op-e2e/migration_test.go +++ b/op-e2e/migration_test.go @@ -12,6 +12,7 @@ import ( "time" bss "github.com/ethereum-optimism/optimism/op-batcher/batcher" + batchermetrics "github.com/ethereum-optimism/optimism/op-batcher/metrics" "github.com/ethereum-optimism/optimism/op-node/chaincfg" "github.com/ethereum-optimism/optimism/op-node/sources" l2os "github.com/ethereum-optimism/optimism/op-proposer/proposer" @@ -341,7 +342,7 @@ func TestMigration(t *testing.T) { Format: "text", }, PrivateKey: hexPriv(secrets.Batcher), - }, lgr.New("module", "batcher")) + }, lgr.New("module", "batcher"), batchermetrics.NoopMetrics) require.NoError(t, err) t.Cleanup(func() { batcher.StopIfRunning() diff --git a/op-e2e/setup.go b/op-e2e/setup.go index a33b9563c4853..fea710838265f 100644 --- a/op-e2e/setup.go +++ b/op-e2e/setup.go @@ -24,6 +24,7 @@ 
import ( "github.com/stretchr/testify/require" bss "github.com/ethereum-optimism/optimism/op-batcher/batcher" + batchermetrics "github.com/ethereum-optimism/optimism/op-batcher/metrics" "github.com/ethereum-optimism/optimism/op-bindings/predeploys" "github.com/ethereum-optimism/optimism/op-chain-ops/genesis" "github.com/ethereum-optimism/optimism/op-e2e/e2eutils" @@ -600,7 +601,7 @@ func (cfg SystemConfig) Start(_opts ...SystemConfigOption) (*System, error) { Format: "text", }, PrivateKey: hexPriv(cfg.Secrets.Batcher), - }, sys.cfg.Loggers["batcher"]) + }, sys.cfg.Loggers["batcher"], batchermetrics.NoopMetrics) if err != nil { return nil, fmt.Errorf("failed to setup batch submitter: %w", err) } diff --git a/op-node/rollup/derive/channel_out.go b/op-node/rollup/derive/channel_out.go index feef8ae7c1021..f644aab68ff97 100644 --- a/op-node/rollup/derive/channel_out.go +++ b/op-node/rollup/derive/channel_out.go @@ -76,7 +76,7 @@ func (co *ChannelOut) AddBlock(block *types.Block) (uint64, error) { return 0, errors.New("already closed") } - batch, err := BlockToBatch(block) + batch, _, err := BlockToBatch(block) if err != nil { return 0, err } @@ -182,7 +182,7 @@ func (co *ChannelOut) OutputFrame(w *bytes.Buffer, maxSize uint64) (uint16, erro } // BlockToBatch transforms a block into a batch object that can easily be RLP encoded. 
-func BlockToBatch(block *types.Block) (*BatchData, error) { +func BlockToBatch(block *types.Block) (*BatchData, L1BlockInfo, error) { opaqueTxs := make([]hexutil.Bytes, 0, len(block.Transactions())) for i, tx := range block.Transactions() { if tx.Type() == types.DepositTxType { @@ -190,17 +190,17 @@ func BlockToBatch(block *types.Block) (*BatchData, error) { } otx, err := tx.MarshalBinary() if err != nil { - return nil, fmt.Errorf("could not encode tx %v in block %v: %w", i, tx.Hash(), err) + return nil, L1BlockInfo{}, fmt.Errorf("could not encode tx %v in block %v: %w", i, tx.Hash(), err) } opaqueTxs = append(opaqueTxs, otx) } l1InfoTx := block.Transactions()[0] if l1InfoTx.Type() != types.DepositTxType { - return nil, ErrNotDepositTx + return nil, L1BlockInfo{}, ErrNotDepositTx } l1Info, err := L1InfoDepositTxData(l1InfoTx.Data()) if err != nil { - return nil, fmt.Errorf("could not parse the L1 Info deposit: %w", err) + return nil, l1Info, fmt.Errorf("could not parse the L1 Info deposit: %w", err) } return &BatchData{ @@ -211,7 +211,7 @@ func BlockToBatch(block *types.Block) (*BatchData, error) { Timestamp: block.Time(), Transactions: opaqueTxs, }, - }, nil + }, l1Info, nil } // ForceCloseTxData generates the transaction data for a transaction which will force close diff --git a/op-node/rollup/derive/l2block_util.go b/op-node/rollup/derive/l2block_util.go new file mode 100644 index 0000000000000..179b1021669ce --- /dev/null +++ b/op-node/rollup/derive/l2block_util.go @@ -0,0 +1,64 @@ +package derive + +import ( + "fmt" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/types" + + "github.com/ethereum-optimism/optimism/op-node/eth" + "github.com/ethereum-optimism/optimism/op-node/rollup" +) + +// L2BlockRefSource is a source for the generation of a L2BlockRef. E.g. a +// *types.Block is a L2BlockRefSource. +// +// L2BlockToBlockRef extracts L2BlockRef from a L2BlockRefSource. 
The first +// transaction of a source must be a Deposit transaction. +type L2BlockRefSource interface { + Hash() common.Hash + ParentHash() common.Hash + NumberU64() uint64 + Time() uint64 + Transactions() types.Transactions +} + +// L2BlockToBlockRef extracts the essential L2BlockRef information from an L2 +// block ref source, falling back to genesis information if necessary. +func L2BlockToBlockRef(block L2BlockRefSource, genesis *rollup.Genesis) (eth.L2BlockRef, error) { + hash, number := block.Hash(), block.NumberU64() + + var l1Origin eth.BlockID + var sequenceNumber uint64 + if number == genesis.L2.Number { + if hash != genesis.L2.Hash { + return eth.L2BlockRef{}, fmt.Errorf("expected L2 genesis hash to match L2 block at genesis block number %d: %s <> %s", genesis.L2.Number, hash, genesis.L2.Hash) + } + l1Origin = genesis.L1 + sequenceNumber = 0 + } else { + txs := block.Transactions() + if txs.Len() == 0 { + return eth.L2BlockRef{}, fmt.Errorf("l2 block is missing L1 info deposit tx, block hash: %s", hash) + } + tx := txs[0] + if tx.Type() != types.DepositTxType { + return eth.L2BlockRef{}, fmt.Errorf("first payload tx has unexpected tx type: %d", tx.Type()) + } + info, err := L1InfoDepositTxData(tx.Data()) + if err != nil { + return eth.L2BlockRef{}, fmt.Errorf("failed to parse L1 info deposit tx from L2 block: %w", err) + } + l1Origin = eth.BlockID{Hash: info.BlockHash, Number: info.Number} + sequenceNumber = info.SequenceNumber + } + + return eth.L2BlockRef{ + Hash: hash, + Number: number, + ParentHash: block.ParentHash(), + Time: block.Time(), + L1Origin: l1Origin, + SequenceNumber: sequenceNumber, + }, nil +} diff --git a/op-service/metrics/event.go b/op-service/metrics/event.go new file mode 100644 index 0000000000000..8589bf654d6ab --- /dev/null +++ b/op-service/metrics/event.go @@ -0,0 +1,58 @@ +package metrics + +import ( + "fmt" + + "github.com/prometheus/client_golang/prometheus" +) + +type Event struct { + Total prometheus.Counter + LastTime 
prometheus.Gauge +} + +func (e *Event) Record() { + e.Total.Inc() + e.LastTime.SetToCurrentTime() +} + +func NewEvent(factory Factory, ns string, name string, displayName string) Event { + return Event{ + Total: factory.NewCounter(prometheus.CounterOpts{ + Namespace: ns, + Name: fmt.Sprintf("%s_total", name), + Help: fmt.Sprintf("Count of %s events", displayName), + }), + LastTime: factory.NewGauge(prometheus.GaugeOpts{ + Namespace: ns, + Name: fmt.Sprintf("last_%s_unix", name), + Help: fmt.Sprintf("Timestamp of last %s event", displayName), + }), + } +} + +type EventVec struct { + Total prometheus.CounterVec + LastTime prometheus.GaugeVec +} + +func (e *EventVec) Record(lvs ...string) { + e.Total.WithLabelValues(lvs...).Inc() + e.LastTime.WithLabelValues(lvs...).SetToCurrentTime() +} + +func NewEventVec(factory Factory, ns string, name string, displayName string, labelNames []string) EventVec { + return EventVec{ + Total: *factory.NewCounterVec(prometheus.CounterOpts{ + Namespace: ns, + Name: fmt.Sprintf("%s_total", name), + Help: fmt.Sprintf("Count of %s events", displayName), + }, labelNames), + LastTime: *factory.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: ns, + Name: fmt.Sprintf("last_%s_unix", name), + Help: fmt.Sprintf("Timestamp of last %s event", displayName), + }, + labelNames), + } +} diff --git a/op-service/metrics/ref_metrics.go b/op-service/metrics/ref_metrics.go new file mode 100644 index 0000000000000..159e788b2b611 --- /dev/null +++ b/op-service/metrics/ref_metrics.go @@ -0,0 +1,118 @@ +package metrics + +import ( + "encoding/binary" + "time" + + "github.com/ethereum-optimism/optimism/op-node/eth" + + "github.com/ethereum/go-ethereum/common" + "github.com/prometheus/client_golang/prometheus" +) + +type RefMetricer interface { + RecordRef(layer string, name string, num uint64, timestamp uint64, h common.Hash) + RecordL1Ref(name string, ref eth.L1BlockRef) + RecordL2Ref(name string, ref eth.L2BlockRef) +} + +// RefMetrics provides block reference 
metrics. It's a metrics module that's +// supposed to be embedded into a service metrics type. The service metrics type +// should set the full namespace and create the factory before calling +// MakeRefMetrics. +type RefMetrics struct { + RefsNumber *prometheus.GaugeVec + RefsTime *prometheus.GaugeVec + RefsHash *prometheus.GaugeVec + RefsSeqNr *prometheus.GaugeVec + RefsLatency *prometheus.GaugeVec + // hash of the last seen block per name, so we don't reduce/increase latency on updates of the same data, + // and only count the first occurrence + LatencySeen map[string]common.Hash +} + +var _ RefMetricer = (*RefMetrics)(nil) + +// MakeRefMetrics returns a new RefMetrics, initializing its prometheus fields +// using factory. It is supposed to be used inside the constructors of metrics +// structs for any op service after the full namespace and factory have been +// set up. +// +// ns is the fully qualified namespace, e.g. "op_node_default". +func MakeRefMetrics(ns string, factory Factory) RefMetrics { + return RefMetrics{ + RefsNumber: factory.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: ns, + Name: "refs_number", + Help: "Gauge representing the different L1/L2 reference block numbers", + }, []string{ + "layer", + "type", + }), + RefsTime: factory.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: ns, + Name: "refs_time", + Help: "Gauge representing the different L1/L2 reference block timestamps", + }, []string{ + "layer", + "type", + }), + RefsHash: factory.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: ns, + Name: "refs_hash", + Help: "Gauge representing the different L1/L2 reference block hashes truncated to float values", + }, []string{ + "layer", + "type", + }), + RefsSeqNr: factory.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: ns, + Name: "refs_seqnr", + Help: "Gauge representing the different L2 reference sequence numbers", + }, []string{ + "type", + }), + RefsLatency: factory.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: ns, + Name: "refs_latency", + Help: 
"Gauge representing the different L1/L2 reference block timestamps minus current time, in seconds", + }, []string{ + "layer", + "type", + }), + LatencySeen: make(map[string]common.Hash), + } +} + +func (m *RefMetrics) RecordRef(layer string, name string, num uint64, timestamp uint64, h common.Hash) { + m.RefsNumber.WithLabelValues(layer, name).Set(float64(num)) + if timestamp != 0 { + m.RefsTime.WithLabelValues(layer, name).Set(float64(timestamp)) + // only meter the latency when we first see this hash for the given label name + if m.LatencySeen[name] != h { + m.LatencySeen[name] = h + m.RefsLatency.WithLabelValues(layer, name).Set(float64(timestamp) - (float64(time.Now().UnixNano()) / 1e9)) + } + } + // we map the first 8 bytes to a float64, so we can graph changes of the hash to find divergences visually. + // We don't do math.Float64frombits, just a regular conversion, to keep the value within a manageable range. + m.RefsHash.WithLabelValues(layer, name).Set(float64(binary.LittleEndian.Uint64(h[:]))) +} + +func (m *RefMetrics) RecordL1Ref(name string, ref eth.L1BlockRef) { + m.RecordRef("l1", name, ref.Number, ref.Time, ref.Hash) +} + +func (m *RefMetrics) RecordL2Ref(name string, ref eth.L2BlockRef) { + m.RecordRef("l2", name, ref.Number, ref.Time, ref.Hash) + m.RecordRef("l1_origin", name, ref.L1Origin.Number, 0, ref.L1Origin.Hash) + m.RefsSeqNr.WithLabelValues(name).Set(float64(ref.SequenceNumber)) +} + +// NoopRefMetrics can be embedded in a noop version of a metric implementation +// to have a noop RefMetricer. +type NoopRefMetrics struct{} + +func (*NoopRefMetrics) RecordRef(string, string, uint64, uint64, common.Hash) {} +func (*NoopRefMetrics) RecordL1Ref(string, eth.L1BlockRef) {} +func (*NoopRefMetrics) RecordL2Ref(string, eth.L2BlockRef) {}