Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
90 commits
Select commit Hold shift + click to select a range
45212a0
feat: bedrock inwards/outwards batch deriv draft v2
protolambda Jun 8, 2022
5153065
skip timed out frames, keep reading tx data
protolambda Jun 9, 2022
ef748c5
ignore frame if it already exists, do not stop reading tx
protolambda Jun 9, 2022
4baf070
update pruning
protolambda Jun 9, 2022
77ffb8a
fix channel closing
protolambda Jun 9, 2022
771cf93
fix reorg recover func
protolambda Jun 9, 2022
3b2ca60
misc reorg func fixes
protolambda Jun 9, 2022
e594387
channels for multiplexing submissions, inclusion for ordering of proc…
protolambda Jun 9, 2022
2a45654
ignore timed out frames
protolambda Jun 9, 2022
a7dd8fd
fix maxBlocksPerChannel name
protolambda Jun 9, 2022
1bcdc37
fix var name, stop producing output data if no blocks are left
protolambda Jun 9, 2022
5891e45
implement channel out reader, start testing, renaming, structure etc.
protolambda Jun 10, 2022
b54cb9e
rename pipeline to channel-in-reader, fix old l2 package imports
protolambda Jun 12, 2022
2d77b85
close compression stream
protolambda Jun 12, 2022
e1f4cb7
improve channel out reader
protolambda Jun 12, 2022
4346c88
add de compression and rlp reading to channel-in-reader
protolambda Jun 12, 2022
98bfd46
channel in reader: l1 origin update
protolambda Jun 12, 2022
8780d84
channel in reader updates
protolambda Jun 12, 2022
98fefa9
move new deriv code into derive package
protolambda Jun 13, 2022
6ea1478
work in progress integration of batch derivation changes
protolambda Jun 13, 2022
cc5f18f
work in progress, l2 derivation stepper
protolambda Jun 13, 2022
581c06e
fix rlp dependency
protolambda Jun 14, 2022
4a64110
channel in reader is broken, left todo
protolambda Jun 14, 2022
cf15a0f
update work in progress derivation pipeline with todo spec per function
protolambda Jun 14, 2022
9c694f0
engine queue todo
protolambda Jun 14, 2022
a2994e8
work in progress integration with driver
protolambda Jun 14, 2022
4d089ef
fix channel in reader init
protolambda Jun 14, 2022
a02f0eb
driver event loop and derive loop separation
protolambda Jun 14, 2022
e9c67b0
(WIP) derive: Implement BatchQueue with full window
trianglesphere Jun 13, 2022
0abce6e
derive: Fully derive payload attributes
trianglesphere Jun 13, 2022
713c42b
Remove batch bundle, split of reading of data from txs
protolambda Jun 14, 2022
6f1cd8e
move engine update/consolidation into derive package
protolambda Jun 14, 2022
0d4fddc
tag channel bank with l1 origin as whole, read frame data may not rev…
protolambda Jun 14, 2022
2b20d66
read full channel, forward L1 origin changes in channel-in-reader, do…
protolambda Jun 14, 2022
88ac1e4
engine queue
protolambda Jun 15, 2022
8748e84
engine queue work
protolambda Jun 15, 2022
0715de7
driver updates
protolambda Jun 15, 2022
312956c
carry data between pipeline stages
protolambda Jun 15, 2022
0c014ec
log sync progress
protolambda Jun 15, 2022
efb23f3
wip init pipeline
protolambda Jun 15, 2022
b03e48d
fetch l1 data as part of derivation pipeline
protolambda Jun 15, 2022
a8fa41d
init fix
protolambda Jun 15, 2022
76d4cf9
work in progress channel bank reset change
protolambda Jun 15, 2022
cbf1c02
channel bank resetting as part of pipeline
protolambda Jun 15, 2022
ec3d760
define interfaces for stages, clean up l1 interface usage
protolambda Jun 15, 2022
6a43871
less trigger happy derivation pipeline resets, just reset when the pi…
protolambda Jun 15, 2022
800888f
test utils
protolambda Jun 16, 2022
a9e1533
update driver snapshot usage, move L1Info interface to eth package, m…
protolambda Jun 16, 2022
d8ab2a0
use channel emitter for api, fix build issues
protolambda Jun 16, 2022
f3feca5
update batch submitter (work in progress, needs more testing)
protolambda Jun 17, 2022
bfcc04d
engine queue fix (@trianglesphere)
protolambda Jun 17, 2022
529b362
find sync start reduce args, just get l2 head directly
protolambda Jun 17, 2022
a31c8d4
fix channel reader: don't attempt to read when there's no channel dat…
protolambda Jun 17, 2022
a595d2b
log batcher and proposer in e2e
protolambda Jun 17, 2022
701e255
channel emitter / channel out reader fixes
protolambda Jun 17, 2022
2cebe2b
fix channel emitter timeout
protolambda Jun 21, 2022
b011ba2
fix channel reading end
protolambda Jun 21, 2022
4bfde99
fix unexpected eof when buffer is not filled due to compressing layer…
protolambda Jun 21, 2022
bdcbd7f
add logging to batch filtering
protolambda Jun 21, 2022
3f2754f
fix batch inputs, don't derive attributes before reading all batches …
protolambda Jun 21, 2022
8d3ee68
all derivation pipeline stages now have the same Step and ResetStep i…
protolambda Jun 22, 2022
d52ba9e
misc open/close origin fixes and sync work
protolambda Jun 22, 2022
35dd285
fix test
protolambda Jun 22, 2022
e4fabb0
lint
protolambda Jun 22, 2022
0a3d0ae
improve testutils, fix l1 mock, implement calldata source tests
protolambda Jun 23, 2022
9541779
more mocking/testing utils, split l1 source/traversal, test first few…
protolambda Jun 23, 2022
949773d
improve mock test utils, don't use bignum in l2 api
protolambda Jun 23, 2022
5cd331b
test pipeline per stage
protolambda Jun 23, 2022
15b73b8
channel timeout config param, test channel bank
protolambda Jun 24, 2022
3df20e7
fix batcher channel timeout flag
protolambda Jun 24, 2022
8f24e09
new op-batcher
trianglesphere Jun 25, 2022
435968c
new batcher in the op-node
trianglesphere Jun 25, 2022
52b8d48
logging / disable parts of the op-node for testing
trianglesphere Jun 25, 2022
209ea88
fix off by one in batcher
trianglesphere Jun 25, 2022
a50a9fc
Close l1src stage
trianglesphere Jun 25, 2022
3877cd0
logging + hacks to make the sequencer work & verifier half work
trianglesphere Jun 25, 2022
f93db03
change open/close origin api, fix genesis seq nr bug, e2e passing
protolambda Jun 26, 2022
58688c8
fix progress/origin naming, avoid engine api linear unwind in consoli…
protolambda Jun 26, 2022
341df5f
remove old ChannelEmitter, remove ChannelOutReader in favor of Channe…
protolambda Jun 26, 2022
4d9680c
fix op-batcher flags / docker compose update
protolambda Jun 26, 2022
45a5417
clean up logging
protolambda Jun 27, 2022
d967e6b
lint
protolambda Jun 27, 2022
66f49cf
test valid -> if err == nil, not err != nil
protolambda Jun 27, 2022
cf2514d
L1Source -> L1Retrieval, fix receiver names
protolambda Jun 27, 2022
6eab5f6
wait for derivation to be idle before sequencing new block
protolambda Jun 27, 2022
dcb48f3
implement verifier and sequencer confirmation depth
protolambda Jun 27, 2022
b1fbac4
op-node: Add Epoch Hash to batch
trianglesphere Jun 27, 2022
8d1836a
fix missing epoch block hash
protolambda Jun 28, 2022
ac19c38
Merge branch 'jg/new_batch_format' into batch-deriv-v2
protolambda Jun 28, 2022
80ee8ac
batcher: Handle multiple frames per channel
trianglesphere Jun 27, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
281 changes: 227 additions & 54 deletions op-batcher/batch_submitter.go
Original file line number Diff line number Diff line change
@@ -1,23 +1,29 @@
package op_batcher

import (
"bytes"
"context"
"fmt"
"io"
"math/big"
"os"
"os/signal"
"sync"
"syscall"
"time"

"github.com/ethereum-optimism/optimism/op-batcher/db"
"github.com/ethereum-optimism/optimism/op-batcher/sequencer"
proposer "github.com/ethereum-optimism/optimism/op-proposer"
"github.com/ethereum-optimism/optimism/op-proposer/rollupclient"
"github.com/ethereum-optimism/optimism/op-node/eth"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
"github.com/ethereum-optimism/optimism/op-proposer/txmgr"
"github.com/ethereum/go-ethereum/accounts"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/ethclient"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/rpc"
hdwallet "github.com/miguelmota/go-ethereum-hdwallet"
"github.com/urfave/cli"
)
Expand Down Expand Up @@ -54,7 +60,7 @@ func Main(version string) func(ctx *cli.Context) error {

l.Info("Initializing Batch Submitter")

batchSubmitter, err := NewBatchSubmitter(cfg, version, l)
batchSubmitter, err := NewBatchSubmitter(cfg, l)
if err != nil {
l.Error("Unable to create Batch Submitter", "error", err)
return err
Expand Down Expand Up @@ -86,18 +92,23 @@ func Main(version string) func(ctx *cli.Context) error {
// BatchSubmitter encapsulates a service responsible for submitting L2 tx
// batches to L1 for availability.
type BatchSubmitter struct {
ctx context.Context
sequencerService *proposer.Service
txMgr txmgr.TxManager
cfg sequencer.Config
wg sync.WaitGroup
done chan struct{}
log log.Logger

ctx context.Context
cancel context.CancelFunc

l2HeadNumber uint64

ch *derive.ChannelOut
}

// NewBatchSubmitter initializes the BatchSubmitter, gathering any resources
// that will be needed during operation.
func NewBatchSubmitter(
cfg Config,
gitVersion string,
l log.Logger,
) (*BatchSubmitter, error) {

func NewBatchSubmitter(cfg Config, l log.Logger) (*BatchSubmitter, error) {
ctx := context.Background()

// Parse wallet private key that will be used to submit L2 txs to the batch
Expand All @@ -121,8 +132,6 @@ func NewBatchSubmitter(
return nil, err
}

genesisHash := common.HexToHash(cfg.SequencerGenesisHash)

// Connect to L1 and L2 providers. Perform these last since they are the
// most expensive.
l1Client, err := dialEthClientWithTimeout(ctx, cfg.L1EthRpc)
Expand All @@ -135,14 +144,7 @@ func NewBatchSubmitter(
return nil, err
}

rollupClient, err := dialRollupClientWithTimeout(ctx, cfg.RollupRpc)
if err != nil {
return nil, err
}

historyDB, err := db.OpenJSONFileDatabase(
cfg.SequencerHistoryDBFilename, 600, genesisHash,
)
historyDB, err := db.OpenJSONFileDatabase(cfg.SequencerHistoryDBFilename)
if err != nil {
return nil, err
}
Expand All @@ -161,44 +163,230 @@ func NewBatchSubmitter(
SafeAbortNonceTooLowCount: cfg.SafeAbortNonceTooLowCount,
}

sequencerDriver, err := sequencer.NewDriver(sequencer.Config{
batcherCfg := sequencer.Config{
Log: l,
Name: "Batch Submitter",
L1Client: l1Client,
L2Client: l2Client,
RollupClient: rollupClient,
MinL1TxSize: cfg.MinL1TxSize,
MaxL1TxSize: cfg.MaxL1TxSize,
BatchInboxAddress: batchInboxAddress,
HistoryDB: historyDB,
ChannelTimeout: cfg.ChannelTimeout,
ChainID: chainID,
PrivKey: sequencerPrivKey,
})
if err != nil {
return nil, err
PollInterval: cfg.PollInterval,
}

sequencerService := proposer.NewService(proposer.ServiceConfig{
Log: l,
Context: ctx,
Driver: sequencerDriver,
PollInterval: cfg.PollInterval,
L1Client: l1Client,
TxManagerConfig: txManagerConfig,
})
ctx, cancel := context.WithCancel(context.Background())

return &BatchSubmitter{
ctx: ctx,
sequencerService: sequencerService,
cfg: batcherCfg,
txMgr: txmgr.NewSimpleTxManager("batcher", txManagerConfig, l1Client),
done: make(chan struct{}),
log: l,
// TODO: this context only exists because the even loop doesn't reach done
// if the tx manager is blocking forever due to e.g. insufficient balance.
ctx: ctx,
cancel: cancel,
}, nil
}

func (l *BatchSubmitter) Start() error {
return l.sequencerService.Start()
l.wg.Add(1)
go l.loop()
return nil
}

func (l *BatchSubmitter) Stop() {
_ = l.sequencerService.Stop()
l.cancel()
close(l.done)
l.wg.Wait()
}

func (l *BatchSubmitter) loop() {
defer l.wg.Done()

ticker := time.NewTicker(l.cfg.PollInterval)
defer ticker.Stop()
mainLoop:
for {
select {
case <-ticker.C:
// Do the simplest thing of one channel per range of blocks since the iteration of this loop.
// The channel is closed at the end of this loop (to avoid lifecycle management of the channel).
ctx, cancel := context.WithTimeout(l.ctx, time.Second*10)
head, err := l.cfg.L2Client.BlockByNumber(ctx, nil)
cancel()
if err != nil {
l.log.Error("issue fetching L2 head", "err", err)
continue
}
l.log.Info("Got new L2 Block", "block", head.Number())
if head.NumberU64() <= l.l2HeadNumber {
// Didn't advance
l.log.Trace("Old block")
continue
}
if ch, err := derive.NewChannelOut(uint64(time.Now().Unix())); err != nil {
l.log.Error("Error creating channel", "err", err)
continue
} else {
l.ch = ch
}
for i := l.l2HeadNumber + 1; i <= head.NumberU64(); i++ {
ctx, cancel := context.WithTimeout(l.ctx, time.Second*10)
block, err := l.cfg.L2Client.BlockByNumber(ctx, new(big.Int).SetUint64(i))
cancel()
if err != nil {
l.log.Error("issue fetching L2 block", "err", err)
continue mainLoop
}
if err := l.ch.AddBlock(block); err != nil {
l.log.Error("issue adding L2 Block to the channel", "err", err, "channel_id", l.ch.ID())
continue mainLoop
}
l.log.Info("added L2 block to channel", "block", eth.BlockID{Hash: block.Hash(), Number: block.NumberU64()}, "channel_id", l.ch.ID(), "tx_count", len(block.Transactions()), "time", block.Time())
}
// TODO: above there are ugly "continue mainLoop" because we shouldn't progress if we're missing blocks, since the submitter logic can't handle gaps yet.
l.l2HeadNumber = head.NumberU64()

if err := l.ch.Close(); err != nil {
l.log.Error("issue getting adding L2 Block", "err", err)
continue
}
// Hand role do-while loop to fully pull all frames out of the channel
for {
// Collect the output frame
data := new(bytes.Buffer)
data.WriteByte(derive.DerivationVersion0)
done := false
if err := l.ch.OutputFrame(data, l.cfg.MaxL1TxSize); err == io.EOF {
done = true
} else if err != nil {
l.log.Error("error outputting frame", "err", err)
continue mainLoop
}

// Query for the submitter's current nonce.
walletAddr := crypto.PubkeyToAddress(l.cfg.PrivKey.PublicKey)
ctx, cancel = context.WithTimeout(l.ctx, time.Second*10)
nonce, err := l.cfg.L1Client.NonceAt(ctx, walletAddr, nil)
cancel()
if err != nil {
l.log.Error("unable to get current nonce", "err", err)
continue mainLoop
}

// Create the transaction
ctx, cancel = context.WithTimeout(l.ctx, time.Second*10)
tx, err := l.CraftTx(ctx, data.Bytes(), nonce)
cancel()
if err != nil {
l.log.Error("unable to craft tx", "err", err)
continue mainLoop
}

// Construct the a closure that will update the txn with the current gas prices.
updateGasPrice := func(ctx context.Context) (*types.Transaction, error) {
l.log.Debug("updating batch tx gas price")
return l.UpdateGasPrice(ctx, tx)
}

// Wait until one of our submitted transactions confirms. If no
// receipt is received it's likely our gas price was too low.
// TODO: does the tx manager nicely replace the tx?
// (submit a new one, that's within the channel timeout, but higher fee than previously submitted tx? Or use a cheap cancel tx?)
ctx, cancel = context.WithTimeout(l.ctx, time.Second*time.Duration(l.cfg.ChannelTimeout))
receipt, err := l.txMgr.Send(ctx, updateGasPrice, l.cfg.L1Client.SendTransaction)
cancel()
if err != nil {
l.log.Error("unable to publish tx", "err", err)
continue mainLoop
}

// The transaction was successfully submitted.
l.log.Info("tx successfully published", "tx_hash", receipt.TxHash, "channel_id", l.ch.ID())

// If `ch.OutputFrame` returned io.EOF we don't need to submit any more frames for this channel.
if done {
break // local do-while loop
}
}

case <-l.done:
return
}
}
}

// NOTE: This method SHOULD NOT publish the resulting transaction.
func (l *BatchSubmitter) CraftTx(ctx context.Context, data []byte, nonce uint64) (*types.Transaction, error) {
gasTipCap, err := l.cfg.L1Client.SuggestGasTipCap(ctx)
if err != nil {
return nil, err
}

head, err := l.cfg.L1Client.HeaderByNumber(ctx, nil)
if err != nil {
return nil, err
}

gasFeeCap := txmgr.CalcGasFeeCap(head.BaseFee, gasTipCap)

rawTx := &types.DynamicFeeTx{
ChainID: l.cfg.ChainID,
Nonce: nonce,
To: &l.cfg.BatchInboxAddress,
GasTipCap: gasTipCap,
GasFeeCap: gasFeeCap,
Data: data,
}
l.log.Debug("creating tx", "to", rawTx.To, "from", crypto.PubkeyToAddress(l.cfg.PrivKey.PublicKey))

gas, err := core.IntrinsicGas(rawTx.Data, nil, false, true, true)
if err != nil {
return nil, err
}
rawTx.Gas = gas

return types.SignNewTx(l.cfg.PrivKey, types.LatestSignerForChainID(l.cfg.ChainID), rawTx)
}

// UpdateGasPrice signs an otherwise identical txn to the one provided but with
// updated gas prices sampled from the existing network conditions.
//
// NOTE: Thie method SHOULD NOT publish the resulting transaction.
func (l *BatchSubmitter) UpdateGasPrice(ctx context.Context, tx *types.Transaction) (*types.Transaction, error) {
gasTipCap, err := l.cfg.L1Client.SuggestGasTipCap(ctx)
if err != nil {
return nil, err
}

head, err := l.cfg.L1Client.HeaderByNumber(ctx, nil)
if err != nil {
return nil, err
}

gasFeeCap := txmgr.CalcGasFeeCap(head.BaseFee, gasTipCap)

rawTx := &types.DynamicFeeTx{
ChainID: l.cfg.ChainID,
Nonce: tx.Nonce(),
To: tx.To(),
GasTipCap: gasTipCap,
GasFeeCap: gasFeeCap,
Gas: tx.Gas(),
Data: tx.Data(),
}

return types.SignNewTx(l.cfg.PrivKey, types.LatestSignerForChainID(l.cfg.ChainID), rawTx)
}

// SendTransaction injects a signed transaction into the pending pool for
// execution.
func (l *BatchSubmitter) SendTransaction(ctx context.Context, tx *types.Transaction) error {
return l.cfg.L1Client.SendTransaction(ctx, tx)
}

// dialEthClientWithTimeout attempts to dial the L1 provider using the provided
Expand All @@ -213,21 +401,6 @@ func dialEthClientWithTimeout(ctx context.Context, url string) (
return ethclient.DialContext(ctxt, url)
}

// dialRollupClientWithTimeout attempts to dial the RPC provider using the provided
// URL. If the dial doesn't complete within defaultDialTimeout seconds, this
// method will return an error.
func dialRollupClientWithTimeout(ctx context.Context, url string) (*rollupclient.RollupClient, error) {
ctxt, cancel := context.WithTimeout(ctx, defaultDialTimeout)
defer cancel()

client, err := rpc.DialContext(ctxt, url)
if err != nil {
return nil, err
}

return rollupclient.NewRollupClient(client), nil
}

// parseAddress parses an ETH address from a hex string. This method will fail if
// the address is not a valid hexadecimal address.
func parseAddress(address string) (common.Address, error) {
Expand Down
Loading