Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
adec5b4
Batch-derivation changes
protolambda Jun 8, 2022
52fdff7
drop unused db from batcher
protolambda Jun 28, 2022
78407ee
op-node: ChannelOut doc comments
protolambda Jun 28, 2022
8664826
ops-bedrock: fix channel timeout format and value
protolambda Jun 28, 2022
8936551
op-node: channel ID terminal string
protolambda Jun 28, 2022
f48ad1d
ops-bedrock: set channel timeout larger than L1 block time, increase …
protolambda Jun 28, 2022
db85069
fix devnet genesis tool
protolambda Jun 28, 2022
74357a8
sanity check channel timeout is not 0 in rollup config check
protolambda Jun 28, 2022
cc4af3b
op-e2e: get rid of unused temp batcher db file
protolambda Jun 28, 2022
ec2b8a3
relax p2p propagation time, it was too close to L1 block time
protolambda Jun 28, 2022
26179b1
fix FillMissingBatches edge case, and correct spec off by 1
protolambda Jun 29, 2022
dc42213
op-node: wrap derivation error with stage index for debugging
protolambda Jun 29, 2022
0787820
op-e2e: Use correct base fee in fees test (#2888)
mslipper Jun 29, 2022
a9028f1
default idleDerivation to false, to not sequence new blocks before sy…
protolambda Jun 29, 2022
df4d4ee
genesis: Reduce number of predeploys (#2889)
mslipper Jun 29, 2022
2b91cc6
batch Epoch() method, improve logging of batch filter
protolambda Jun 29, 2022
a704a09
make engine API logging less verbose
protolambda Jun 29, 2022
d005e41
uncomment snapshot logging, defer json encoding
protolambda Jun 29, 2022
b16513a
fix state-viz
protolambda Jun 29, 2022
0ea9e4c
fix Progress update
protolambda Jun 29, 2022
5d68360
snapshot log
protolambda Jun 29, 2022
cb8ecb0
make: Add devnet-logs command (#2903)
maurelian Jun 30, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,10 @@ devnet-clean:
docker volume ls --filter name=ops-bedrock --format='{{.Name}}' | xargs -r docker volume rm
.PHONY: devnet-clean

devnet-logs:
@(cd ./ops-bedrock && docker-compose logs -f)
.PHONY: devnet-logs

test-unit:
make -C ./op-node test
make -C ./op-proposer test
Expand Down
286 changes: 226 additions & 60 deletions op-batcher/batch_submitter.go
Original file line number Diff line number Diff line change
@@ -1,23 +1,28 @@
package op_batcher

import (
"bytes"
"context"
"fmt"
"io"
"math/big"
"os"
"os/signal"
"sync"
"syscall"
"time"

"github.com/ethereum-optimism/optimism/op-batcher/db"
"github.com/ethereum-optimism/optimism/op-batcher/sequencer"
proposer "github.com/ethereum-optimism/optimism/op-proposer"
"github.com/ethereum-optimism/optimism/op-proposer/rollupclient"
"github.com/ethereum-optimism/optimism/op-node/eth"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
"github.com/ethereum-optimism/optimism/op-proposer/txmgr"
"github.com/ethereum/go-ethereum/accounts"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/ethclient"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/rpc"
hdwallet "github.com/miguelmota/go-ethereum-hdwallet"
"github.com/urfave/cli"
)
Expand Down Expand Up @@ -54,7 +59,7 @@ func Main(version string) func(ctx *cli.Context) error {

l.Info("Initializing Batch Submitter")

batchSubmitter, err := NewBatchSubmitter(cfg, version, l)
batchSubmitter, err := NewBatchSubmitter(cfg, l)
if err != nil {
l.Error("Unable to create Batch Submitter", "error", err)
return err
Expand Down Expand Up @@ -86,18 +91,23 @@ func Main(version string) func(ctx *cli.Context) error {
// BatchSubmitter encapsulates a service responsible for submitting L2 tx
// batches to L1 for availability.
type BatchSubmitter struct {
ctx context.Context
sequencerService *proposer.Service
txMgr txmgr.TxManager
cfg sequencer.Config
wg sync.WaitGroup
done chan struct{}
log log.Logger

ctx context.Context
cancel context.CancelFunc

l2HeadNumber uint64

ch *derive.ChannelOut
}

// NewBatchSubmitter initializes the BatchSubmitter, gathering any resources
// that will be needed during operation.
func NewBatchSubmitter(
cfg Config,
gitVersion string,
l log.Logger,
) (*BatchSubmitter, error) {

func NewBatchSubmitter(cfg Config, l log.Logger) (*BatchSubmitter, error) {
ctx := context.Background()

// Parse wallet private key that will be used to submit L2 txs to the batch
Expand All @@ -121,8 +131,6 @@ func NewBatchSubmitter(
return nil, err
}

genesisHash := common.HexToHash(cfg.SequencerGenesisHash)

// Connect to L1 and L2 providers. Perform these last since they are the
// most expensive.
l1Client, err := dialEthClientWithTimeout(ctx, cfg.L1EthRpc)
Expand All @@ -135,18 +143,6 @@ func NewBatchSubmitter(
return nil, err
}

rollupClient, err := dialRollupClientWithTimeout(ctx, cfg.RollupRpc)
if err != nil {
return nil, err
}

historyDB, err := db.OpenJSONFileDatabase(
cfg.SequencerHistoryDBFilename, 600, genesisHash,
)
if err != nil {
return nil, err
}

chainID, err := l1Client.ChainID(ctx)
if err != nil {
return nil, err
Expand All @@ -161,44 +157,229 @@ func NewBatchSubmitter(
SafeAbortNonceTooLowCount: cfg.SafeAbortNonceTooLowCount,
}

sequencerDriver, err := sequencer.NewDriver(sequencer.Config{
batcherCfg := sequencer.Config{
Log: l,
Name: "Batch Submitter",
L1Client: l1Client,
L2Client: l2Client,
RollupClient: rollupClient,
MinL1TxSize: cfg.MinL1TxSize,
MaxL1TxSize: cfg.MaxL1TxSize,
BatchInboxAddress: batchInboxAddress,
HistoryDB: historyDB,
ChannelTimeout: cfg.ChannelTimeout,
ChainID: chainID,
PrivKey: sequencerPrivKey,
})
if err != nil {
return nil, err
PollInterval: cfg.PollInterval,
}

sequencerService := proposer.NewService(proposer.ServiceConfig{
Log: l,
Context: ctx,
Driver: sequencerDriver,
PollInterval: cfg.PollInterval,
L1Client: l1Client,
TxManagerConfig: txManagerConfig,
})
ctx, cancel := context.WithCancel(context.Background())

return &BatchSubmitter{
ctx: ctx,
sequencerService: sequencerService,
cfg: batcherCfg,
txMgr: txmgr.NewSimpleTxManager("batcher", txManagerConfig, l1Client),
done: make(chan struct{}),
log: l,
// TODO: this context only exists because the even loop doesn't reach done
// if the tx manager is blocking forever due to e.g. insufficient balance.
ctx: ctx,
cancel: cancel,
}, nil
}

func (l *BatchSubmitter) Start() error {
return l.sequencerService.Start()
l.wg.Add(1)
go l.loop()
return nil
}

func (l *BatchSubmitter) Stop() {
_ = l.sequencerService.Stop()
l.cancel()
close(l.done)
l.wg.Wait()
}

func (l *BatchSubmitter) loop() {
defer l.wg.Done()

ticker := time.NewTicker(l.cfg.PollInterval)
defer ticker.Stop()
mainLoop:
for {
select {
case <-ticker.C:
// Do the simplest thing of one channel per range of blocks since the iteration of this loop.
// The channel is closed at the end of this loop (to avoid lifecycle management of the channel).
ctx, cancel := context.WithTimeout(l.ctx, time.Second*10)
head, err := l.cfg.L2Client.BlockByNumber(ctx, nil)
cancel()
if err != nil {
l.log.Error("issue fetching L2 head", "err", err)
continue
}
l.log.Info("Got new L2 Block", "block", head.Number())
if head.NumberU64() <= l.l2HeadNumber {
// Didn't advance
l.log.Trace("Old block")
continue
}
if ch, err := derive.NewChannelOut(uint64(time.Now().Unix())); err != nil {
l.log.Error("Error creating channel", "err", err)
continue
} else {
l.ch = ch
}
for i := l.l2HeadNumber + 1; i <= head.NumberU64(); i++ {
ctx, cancel := context.WithTimeout(l.ctx, time.Second*10)
block, err := l.cfg.L2Client.BlockByNumber(ctx, new(big.Int).SetUint64(i))
cancel()
if err != nil {
l.log.Error("issue fetching L2 block", "err", err)
continue mainLoop
}
if err := l.ch.AddBlock(block); err != nil {
l.log.Error("issue adding L2 Block to the channel", "err", err, "channel_id", l.ch.ID())
continue mainLoop
}
l.log.Info("added L2 block to channel", "block", eth.BlockID{Hash: block.Hash(), Number: block.NumberU64()}, "channel_id", l.ch.ID(), "tx_count", len(block.Transactions()), "time", block.Time())
}
// TODO: above there are ugly "continue mainLoop" because we shouldn't progress if we're missing blocks, since the submitter logic can't handle gaps yet.
l.l2HeadNumber = head.NumberU64()

if err := l.ch.Close(); err != nil {
l.log.Error("issue getting adding L2 Block", "err", err)
continue
}
// Hand role do-while loop to fully pull all frames out of the channel
for {
// Collect the output frame
data := new(bytes.Buffer)
data.WriteByte(derive.DerivationVersion0)
done := false
if err := l.ch.OutputFrame(data, l.cfg.MaxL1TxSize); err == io.EOF {
done = true
} else if err != nil {
l.log.Error("error outputting frame", "err", err)
continue mainLoop
}

// Query for the submitter's current nonce.
walletAddr := crypto.PubkeyToAddress(l.cfg.PrivKey.PublicKey)
ctx, cancel = context.WithTimeout(l.ctx, time.Second*10)
nonce, err := l.cfg.L1Client.NonceAt(ctx, walletAddr, nil)
cancel()
if err != nil {
l.log.Error("unable to get current nonce", "err", err)
continue mainLoop
}

// Create the transaction
ctx, cancel = context.WithTimeout(l.ctx, time.Second*10)
tx, err := l.CraftTx(ctx, data.Bytes(), nonce)
cancel()
if err != nil {
l.log.Error("unable to craft tx", "err", err)
continue mainLoop
}

// Construct the a closure that will update the txn with the current gas prices.
updateGasPrice := func(ctx context.Context) (*types.Transaction, error) {
l.log.Debug("updating batch tx gas price")
return l.UpdateGasPrice(ctx, tx)
}

// Wait until one of our submitted transactions confirms. If no
// receipt is received it's likely our gas price was too low.
// TODO: does the tx manager nicely replace the tx?
// (submit a new one, that's within the channel timeout, but higher fee than previously submitted tx? Or use a cheap cancel tx?)
ctx, cancel = context.WithTimeout(l.ctx, time.Second*time.Duration(l.cfg.ChannelTimeout))
receipt, err := l.txMgr.Send(ctx, updateGasPrice, l.cfg.L1Client.SendTransaction)
cancel()
if err != nil {
l.log.Error("unable to publish tx", "err", err)
continue mainLoop
}

// The transaction was successfully submitted.
l.log.Info("tx successfully published", "tx_hash", receipt.TxHash, "channel_id", l.ch.ID())

// If `ch.OutputFrame` returned io.EOF we don't need to submit any more frames for this channel.
if done {
break // local do-while loop
}
}

case <-l.done:
return
}
}
}

// NOTE: This method SHOULD NOT publish the resulting transaction.
func (l *BatchSubmitter) CraftTx(ctx context.Context, data []byte, nonce uint64) (*types.Transaction, error) {
gasTipCap, err := l.cfg.L1Client.SuggestGasTipCap(ctx)
if err != nil {
return nil, err
}

head, err := l.cfg.L1Client.HeaderByNumber(ctx, nil)
if err != nil {
return nil, err
}

gasFeeCap := txmgr.CalcGasFeeCap(head.BaseFee, gasTipCap)

rawTx := &types.DynamicFeeTx{
ChainID: l.cfg.ChainID,
Nonce: nonce,
To: &l.cfg.BatchInboxAddress,
GasTipCap: gasTipCap,
GasFeeCap: gasFeeCap,
Data: data,
}
l.log.Debug("creating tx", "to", rawTx.To, "from", crypto.PubkeyToAddress(l.cfg.PrivKey.PublicKey))

gas, err := core.IntrinsicGas(rawTx.Data, nil, false, true, true)
if err != nil {
return nil, err
}
rawTx.Gas = gas

return types.SignNewTx(l.cfg.PrivKey, types.LatestSignerForChainID(l.cfg.ChainID), rawTx)
}

// UpdateGasPrice signs an otherwise identical txn to the one provided but with
// updated gas prices sampled from the existing network conditions.
//
// NOTE: Thie method SHOULD NOT publish the resulting transaction.
func (l *BatchSubmitter) UpdateGasPrice(ctx context.Context, tx *types.Transaction) (*types.Transaction, error) {
gasTipCap, err := l.cfg.L1Client.SuggestGasTipCap(ctx)
if err != nil {
return nil, err
}

head, err := l.cfg.L1Client.HeaderByNumber(ctx, nil)
if err != nil {
return nil, err
}

gasFeeCap := txmgr.CalcGasFeeCap(head.BaseFee, gasTipCap)

rawTx := &types.DynamicFeeTx{
ChainID: l.cfg.ChainID,
Nonce: tx.Nonce(),
To: tx.To(),
GasTipCap: gasTipCap,
GasFeeCap: gasFeeCap,
Gas: tx.Gas(),
Data: tx.Data(),
}

return types.SignNewTx(l.cfg.PrivKey, types.LatestSignerForChainID(l.cfg.ChainID), rawTx)
}

// SendTransaction injects a signed transaction into the pending pool for
// execution.
func (l *BatchSubmitter) SendTransaction(ctx context.Context, tx *types.Transaction) error {
return l.cfg.L1Client.SendTransaction(ctx, tx)
}

// dialEthClientWithTimeout attempts to dial the L1 provider using the provided
Expand All @@ -213,21 +394,6 @@ func dialEthClientWithTimeout(ctx context.Context, url string) (
return ethclient.DialContext(ctxt, url)
}

// dialRollupClientWithTimeout attempts to dial the RPC provider using the provided
// URL. If the dial doesn't complete within defaultDialTimeout seconds, this
// method will return an error.
func dialRollupClientWithTimeout(ctx context.Context, url string) (*rollupclient.RollupClient, error) {
ctxt, cancel := context.WithTimeout(ctx, defaultDialTimeout)
defer cancel()

client, err := rpc.DialContext(ctxt, url)
if err != nil {
return nil, err
}

return rollupclient.NewRollupClient(client), nil
}

// parseAddress parses an ETH address from a hex string. This method will fail if
// the address is not a valid hexadecimal address.
func parseAddress(address string) (common.Address, error) {
Expand Down
Loading