Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion .github/workflows/integration.yml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@ jobs:
image: registry:2
ports:
- 5000:5000
strategy:
matrix:
batch-submitter: [ts-batch-submitter, go-batch-submitter]
env:
DOCKER_BUILDKIT: 1
COMPOSE_DOCKER_CLI_BUILD: 1
Expand All @@ -41,7 +44,7 @@ jobs:
working-directory: ./ops
run: |
./scripts/stats.sh &
docker-compose up -d
docker-compose -f docker-compose.yml -f docker-compose.${{ matrix.batch-submitter }}.yml up -d

- name: Wait for the Sequencer node
working-directory: ./ops
Expand Down
156 changes: 141 additions & 15 deletions go/batch-submitter/batch_submitter.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,16 @@ import (
"context"
"crypto/ecdsa"
"fmt"
"math/big"
"net/http"
"os"
"strconv"
"time"

"github.com/ethereum-optimism/optimism/go/batch-submitter/drivers/proposer"
"github.com/ethereum-optimism/optimism/go/batch-submitter/drivers/sequencer"
"github.com/ethereum-optimism/optimism/go/batch-submitter/txmgr"
l2ethclient "github.com/ethereum-optimism/optimism/l2geth/ethclient"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/ethclient"
Expand Down Expand Up @@ -41,12 +46,25 @@ func Main(gitVersion string) func(ctx *cli.Context) error {
defer sentry.Flush(2 * time.Second)
}

_, err = NewBatchSubmitter(cfg, gitVersion)
log.Info("Initializing batch submitter")

batchSubmitter, err := NewBatchSubmitter(cfg, gitVersion)
if err != nil {
log.Error("Unable to create batch submitter", "error", err)
return err
}

log.Info("Starting batch submitter")

if err := batchSubmitter.Start(); err != nil {
return err
}
defer batchSubmitter.Stop()

log.Info("Batch submitter started")

<-(chan struct{})(nil)

return nil
}
}
Expand All @@ -57,11 +75,14 @@ type BatchSubmitter struct {
ctx context.Context
cfg Config
l1Client *ethclient.Client
l2Client *ethclient.Client
l2Client *l2ethclient.Client
sequencerPrivKey *ecdsa.PrivateKey
proposerPrivKey *ecdsa.PrivateKey
ctcAddress common.Address
sccAddress common.Address

batchTxService *Service
batchStateService *Service
}

// NewBatchSubmitter initializes the BatchSubmitter, gathering any resources
Expand Down Expand Up @@ -118,14 +139,14 @@ func NewBatchSubmitter(cfg Config, gitVersion string) (*BatchSubmitter, error) {
return nil, err
}

// Connect to L1 and L2 providers. Perform these lastsince they are the
// Connect to L1 and L2 providers. Perform these last since they are the
// most expensive.
l1Client, err := dialEthClientWithTimeout(ctx, cfg.L1EthRpc)
l1Client, err := dialL1EthClientWithTimeout(ctx, cfg.L1EthRpc)
if err != nil {
return nil, err
}

l2Client, err := dialEthClientWithTimeout(ctx, cfg.L2EthRpc)
l2Client, err := dialL2EthClientWithTimeout(ctx, cfg.L2EthRpc)
if err != nil {
return nil, err
}
Expand All @@ -134,18 +155,107 @@ func NewBatchSubmitter(cfg Config, gitVersion string) (*BatchSubmitter, error) {
go runMetricsServer(cfg.MetricsHostname, cfg.MetricsPort)
}

chainID, err := l1Client.ChainID(ctx)
if err != nil {
return nil, err
}

txManagerConfig := txmgr.Config{
MinGasPrice: gasPriceFromGwei(1),
MaxGasPrice: gasPriceFromGwei(cfg.MaxGasPriceInGwei),
GasRetryIncrement: gasPriceFromGwei(cfg.GasRetryIncrement),
ResubmissionTimeout: cfg.ResubmissionTimeout,
ReceiptQueryInterval: time.Second,
}

var batchTxService *Service
if cfg.RunTxBatchSubmitter {
batchTxDriver, err := sequencer.NewDriver(sequencer.Config{
Name: "SEQUENCER",
L1Client: l1Client,
L2Client: l2Client,
BlockOffset: cfg.BlockOffset,
MaxTxSize: cfg.MaxL1TxSize,
CTCAddr: ctcAddress,
ChainID: chainID,
PrivKey: sequencerPrivKey,
})
if err != nil {
return nil, err
}

batchTxService = NewService(ServiceConfig{
Context: ctx,
Driver: batchTxDriver,
PollInterval: cfg.PollInterval,
L1Client: l1Client,
TxManagerConfig: txManagerConfig,
})
}

var batchStateService *Service
if cfg.RunStateBatchSubmitter {
batchStateDriver, err := proposer.NewDriver(proposer.Config{
Name: "PROPOSER",
L1Client: l1Client,
L2Client: l2Client,
BlockOffset: cfg.BlockOffset,
MaxTxSize: cfg.MaxL1TxSize,
SCCAddr: sccAddress,
CTCAddr: ctcAddress,
ChainID: chainID,
PrivKey: proposerPrivKey,
})
if err != nil {
return nil, err
}

batchStateService = NewService(ServiceConfig{
Context: ctx,
Driver: batchStateDriver,
PollInterval: cfg.PollInterval,
L1Client: l1Client,
TxManagerConfig: txManagerConfig,
})
}

return &BatchSubmitter{
ctx: ctx,
cfg: cfg,
l1Client: l1Client,
l2Client: l2Client,
sequencerPrivKey: sequencerPrivKey,
proposerPrivKey: proposerPrivKey,
ctcAddress: ctcAddress,
sccAddress: sccAddress,
ctx: ctx,
cfg: cfg,
l1Client: l1Client,
l2Client: l2Client,
sequencerPrivKey: sequencerPrivKey,
proposerPrivKey: proposerPrivKey,
ctcAddress: ctcAddress,
sccAddress: sccAddress,
batchTxService: batchTxService,
batchStateService: batchStateService,
}, nil
}

func (b *BatchSubmitter) Start() error {
if b.cfg.RunTxBatchSubmitter {
if err := b.batchTxService.Start(); err != nil {
return err
}
}
if b.cfg.RunStateBatchSubmitter {
if err := b.batchStateService.Start(); err != nil {
return err
}
}
return nil
}

func (b *BatchSubmitter) Stop() {
if b.cfg.RunTxBatchSubmitter {
_ = b.batchTxService.Stop()
}
if b.cfg.RunStateBatchSubmitter {
_ = b.batchStateService.Stop()
}
}

// parseWalletPrivKeyAndContractAddr returns the wallet private key to use for
// sending transactions as well as the contract address to send to for a
// particular sub-service.
Expand Down Expand Up @@ -191,10 +301,10 @@ func runMetricsServer(hostname string, port uint64) {
_ = http.ListenAndServe(metricsAddr, nil)
}

// dialEthClientWithTimeout attempts to dial the L1 or L2 provider using the
// dialL1EthClientWithTimeout attempts to dial the L1 provider using the
// provided URL. If the dial doesn't complete within defaultDialTimeout seconds,
// this method will return an error.
func dialEthClientWithTimeout(ctx context.Context, url string) (
func dialL1EthClientWithTimeout(ctx context.Context, url string) (
*ethclient.Client, error) {

ctxt, cancel := context.WithTimeout(ctx, defaultDialTimeout)
Expand All @@ -203,6 +313,18 @@ func dialEthClientWithTimeout(ctx context.Context, url string) (
return ethclient.DialContext(ctxt, url)
}

// dialL2EthClientWithTimeout attempts to dial the L2 provider using the
// provided URL. If the dial doesn't complete within defaultDialTimeout seconds,
// this method will return an error.
func dialL2EthClientWithTimeout(ctx context.Context, url string) (
*l2ethclient.Client, error) {

ctxt, cancel := context.WithTimeout(ctx, defaultDialTimeout)
defer cancel()

return l2ethclient.DialContext(ctxt, url)
}

// traceRateToFloat64 converts a time.Duration into a valid float64 for the
// Sentry client. The client only accepts values between 0.0 and 1.0, so this
// method clamps anything greater than 1 second to 1.0.
Expand All @@ -213,3 +335,7 @@ func traceRateToFloat64(rate time.Duration) float64 {
}
return rate64
}

func gasPriceFromGwei(gasPriceInGwei uint64) *big.Int {
return new(big.Int).SetUint64(gasPriceInGwei * 1e9)
}
4 changes: 4 additions & 0 deletions go/batch-submitter/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,10 @@ func NewConfig(ctx *cli.Context) (Config, error) {
// ensure that it is well-formed.
func ValidateConfig(cfg *Config) error {
// Sanity check log level.
if cfg.LogLevel == "" {
cfg.LogLevel = "debug"
}

_, err := log.LvlFromString(cfg.LogLevel)
if err != nil {
return err
Expand Down
Loading