Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 8 additions & 5 deletions op-batcher/batcher/batch_submitter.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@ import (
gethrpc "github.com/ethereum/go-ethereum/rpc"
"github.com/urfave/cli"

"github.com/ethereum-optimism/optimism/op-batcher/metrics"
"github.com/ethereum-optimism/optimism/op-batcher/rpc"
oplog "github.com/ethereum-optimism/optimism/op-service/log"
opmetrics "github.com/ethereum-optimism/optimism/op-service/metrics"
oppprof "github.com/ethereum-optimism/optimism/op-service/pprof"
oprpc "github.com/ethereum-optimism/optimism/op-service/rpc"
)
Expand All @@ -36,9 +36,10 @@ func Main(version string, cliCtx *cli.Context) error {
}

l := oplog.NewLogger(cfg.LogConfig)
m := metrics.NewMetrics("default")
l.Info("Initializing Batch Submitter")

batchSubmitter, err := NewBatchSubmitterFromCLIConfig(cfg, l)
batchSubmitter, err := NewBatchSubmitterFromCLIConfig(cfg, l, m)
if err != nil {
l.Error("Unable to create Batch Submitter", "error", err)
return err
Expand All @@ -64,16 +65,15 @@ func Main(version string, cliCtx *cli.Context) error {
}()
}

registry := opmetrics.NewRegistry()
metricsCfg := cfg.MetricsConfig
if metricsCfg.Enabled {
l.Info("starting metrics server", "addr", metricsCfg.ListenAddr, "port", metricsCfg.ListenPort)
go func() {
if err := opmetrics.ListenAndServe(ctx, registry, metricsCfg.ListenAddr, metricsCfg.ListenPort); err != nil {
if err := m.Serve(ctx, metricsCfg.ListenAddr, metricsCfg.ListenPort); err != nil {
l.Error("error starting metrics server", err)
}
}()
opmetrics.LaunchBalanceMetrics(ctx, l, registry, "", batchSubmitter.L1Client, batchSubmitter.From)
m.StartBalanceMetrics(ctx, l, batchSubmitter.L1Client, batchSubmitter.From)
}

rpcCfg := cfg.RPCConfig
Expand All @@ -95,6 +95,9 @@ func Main(version string, cliCtx *cli.Context) error {
return fmt.Errorf("error starting RPC server: %w", err)
}

m.RecordInfo(version)
m.RecordUp()

interruptChannel := make(chan os.Signal, 1)
signal.Notify(interruptChannel, []os.Signal{
os.Interrupt,
Expand Down
34 changes: 25 additions & 9 deletions op-batcher/batcher/channel_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,8 @@ type channelBuilder struct {
blocks []*types.Block
// frames data queue, to be send as txs
frames []frameData
// total amount of output data of all frames created yet
outputBytes int
}

// newChannelBuilder creates a new channel builder or returns an error if the
Expand All @@ -156,11 +158,21 @@ func (c *channelBuilder) ID() derive.ChannelID {
return c.co.ID()
}

// InputBytes returns to total amount of input bytes added to the channel.
// InputBytes returns the total amount of input bytes added to the channel.
func (c *channelBuilder) InputBytes() int {
return c.co.InputBytes()
}

// ReadyBytes returns the amount of bytes ready in the compression pipeline to
// output into a frame.
func (c *channelBuilder) ReadyBytes() int {
return c.co.ReadyBytes()
}

func (c *channelBuilder) OutputBytes() int {
return c.outputBytes
}

// Blocks returns a backup list of all blocks that were added to the channel. It
// can be used in case the channel needs to be rebuilt.
func (c *channelBuilder) Blocks() []*types.Block {
Expand All @@ -184,22 +196,25 @@ func (c *channelBuilder) Reset() error {
// AddBlock returns a ChannelFullError if called even though the channel is
// already full. See description of FullErr for details.
//
// AddBlock also returns the L1BlockInfo that got extracted from the block's
// first transaction for subsequent use by the caller.
//
// Call OutputFrames() afterwards to create frames.
func (c *channelBuilder) AddBlock(block *types.Block) error {
func (c *channelBuilder) AddBlock(block *types.Block) (derive.L1BlockInfo, error) {
if c.IsFull() {
return c.FullErr()
return derive.L1BlockInfo{}, c.FullErr()
}

batch, err := derive.BlockToBatch(block)
batch, l1info, err := derive.BlockToBatch(block)
if err != nil {
return fmt.Errorf("converting block to batch: %w", err)
return l1info, fmt.Errorf("converting block to batch: %w", err)
}

if _, err = c.co.AddBatch(batch); errors.Is(err, derive.ErrTooManyRLPBytes) {
c.setFullErr(err)
return c.FullErr()
return l1info, c.FullErr()
} else if err != nil {
return fmt.Errorf("adding block to channel out: %w", err)
return l1info, fmt.Errorf("adding block to channel out: %w", err)
}
c.blocks = append(c.blocks, block)
c.updateSwTimeout(batch)
Expand All @@ -209,7 +224,7 @@ func (c *channelBuilder) AddBlock(block *types.Block) error {
// Adding this block still worked, so don't return error, just mark as full
}

return nil
return l1info, nil
}

// Timeout management
Expand Down Expand Up @@ -381,10 +396,11 @@ func (c *channelBuilder) outputFrame() error {
}

frame := frameData{
id: txID{chID: c.co.ID(), frameNumber: fn},
id: frameID{chID: c.co.ID(), frameNumber: fn},
data: buf.Bytes(),
}
c.frames = append(c.frames, frame)
c.outputBytes += len(frame.data)
return err // possibly io.EOF (last frame)
}

Expand Down
Loading