Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 0 additions & 5 deletions .envrc

This file was deleted.

6 changes: 3 additions & 3 deletions .github/workflows/docker-build-scan.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ jobs:
id: detect-files-changed
uses: step-security/changed-files@3dbe17c78367e7d60f00d78ae6781a35be47b4a1
with:
separator: ","
separator: ','

# Build op-node op-batcher op-proposer using docker-bake
build-op-stack:
Expand Down Expand Up @@ -55,8 +55,8 @@ jobs:
- name: Login at GCP Artifact Registry
uses: celo-org/reusable-workflows/.github/actions/auth-gcp-artifact-registry@v2.0
with:
workload-id-provider: "projects/1094498259535/locations/global/workloadIdentityPools/gh-optimism/providers/github-by-repos"
service-account: "celo-optimism-gh@devopsre.iam.gserviceaccount.com"
workload-id-provider: 'projects/1094498259535/locations/global/workloadIdentityPools/gh-optimism/providers/github-by-repos'
service-account: 'celo-optimism-gh@devopsre.iam.gserviceaccount.com'
docker-gcp-registries: us-west1-docker.pkg.dev
# We need a custom steps as it's using docker bake
- name: Set up Docker Buildx
Expand Down
File renamed without changes.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ packages/contracts-bedrock/deployments/anvil

.secrets
.env
.envrc
!espresso/.env
!.env.example
!.envrc.example
Expand Down
3 changes: 0 additions & 3 deletions op-alt-da/cmd/daserver/entrypoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,6 @@ func StartDAServer(cliCtx *cli.Context) error {
return fmt.Errorf("failed to create S3 store: %w", err)
}
store = s3
} else if cfg.EspressoEnabled() {
l.Info("Using Espresso DA", "url", cfg.EspressoBaseUrl)
store = NewEspressoStore(cfg.EspressoBaseUrl, l)
}

server := altda.NewDAServer(cliCtx.String(ListenAddrFlagName), cliCtx.Int(PortFlagName), store, l, cfg.UseGenericComm)
Expand Down
36 changes: 6 additions & 30 deletions op-alt-da/cmd/daserver/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (
const (
ListenAddrFlagName = "addr"
PortFlagName = "port"
EspressoBaseUrlFlagName = "espresso.url"
S3BucketFlagName = "s3.bucket"
S3EndpointFlagName = "s3.endpoint"
S3AccessKeyIDFlagName = "s3.access-key-id"
Expand Down Expand Up @@ -75,12 +74,6 @@ var (
Value: "",
EnvVars: prefixEnvVars("S3_ACCESS_KEY_SECRET"),
}
EspressoBaseUrlFlag = &cli.StringFlag{
Name: EspressoBaseUrlFlagName,
Usage: "espresso network base url",
Value: "",
EnvVars: prefixEnvVars("ESPRESSO_BASE_URL"),
}
)

var requiredFlags = []cli.Flag{
Expand All @@ -94,7 +87,6 @@ var optionalFlags = []cli.Flag{
S3EndpointFlag,
S3AccessKeyIDFlag,
S3AccessKeySecretFlag,
EspressoBaseUrlFlag,
GenericCommFlag,
}

Expand All @@ -112,7 +104,6 @@ type CLIConfig struct {
S3Endpoint string
S3AccessKeyID string
S3AccessKeySecret string
EspressoBaseUrl string
UseGenericComm bool
}

Expand All @@ -123,38 +114,23 @@ func ReadCLIConfig(ctx *cli.Context) CLIConfig {
S3Endpoint: ctx.String(S3EndpointFlagName),
S3AccessKeyID: ctx.String(S3AccessKeyIDFlagName),
S3AccessKeySecret: ctx.String(S3AccessKeySecretFlagName),
EspressoBaseUrl: ctx.String(EspressoBaseUrlFlagName),
UseGenericComm: ctx.Bool(GenericCommFlagName),
}
}

func (c CLIConfig) Check() error {
enabledCount := 0
if c.S3Enabled() {
enabledCount++
if c.S3Bucket == "" || c.S3Endpoint == "" || c.S3AccessKeyID == "" || c.S3AccessKeySecret == "" {
return errors.New("all S3 flags must be set")
}
}
if c.FileStoreEnabled() {
enabledCount++
}
if c.EspressoEnabled() {
enabledCount++
}
if enabledCount == 0 {
if !c.S3Enabled() && !c.FileStoreEnabled() {
return errors.New("at least one storage backend must be enabled")
}
if enabledCount > 1 {
return errors.New("only one storage backend must be enabled")
if c.S3Enabled() && c.FileStoreEnabled() {
return errors.New("only one storage backend can be enabled")
}
if c.S3Enabled() && (c.S3Bucket == "" || c.S3Endpoint == "" || c.S3AccessKeyID == "" || c.S3AccessKeySecret == "") {
return errors.New("all S3 flags must be set")
}
return nil
}

func (c CLIConfig) EspressoEnabled() bool {
return c.EspressoBaseUrl != ""
}

func (c CLIConfig) S3Enabled() bool {
return !(c.S3Bucket == "" && c.S3Endpoint == "" && c.S3AccessKeyID == "" && c.S3AccessKeySecret == "")
}
Expand Down
66 changes: 33 additions & 33 deletions op-batcher/batcher/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,6 @@ type CLIConfig struct {
// and creating a new batch.
PollInterval time.Duration

EspressoPollInterval time.Duration

// MaxPendingTransactions is the maximum number of concurrent pending
// transactions sent to the transaction manager (0 == no limit).
MaxPendingTransactions uint64
Expand Down Expand Up @@ -126,6 +124,7 @@ type CLIConfig struct {
RPC oprpc.CLIConfig
AltDA altda.CLIConfig

EspressoPollInterval time.Duration
EspressoUrls []string
EspressoLightClientAddr string
TestingEspressoBatcherPrivateKey string
Expand Down Expand Up @@ -193,41 +192,42 @@ func (c *CLIConfig) Check() error {
func NewConfig(ctx *cli.Context) *CLIConfig {
return &CLIConfig{
/* Required Flags */
L1EthRpc: ctx.String(flags.L1EthRpcFlag.Name),
L2EthRpc: ctx.String(flags.L2EthRpcFlag.Name),
RollupRpc: ctx.String(flags.RollupRpcFlag.Name),
SubSafetyMargin: ctx.Uint64(flags.SubSafetyMarginFlag.Name),
PollInterval: ctx.Duration(flags.PollIntervalFlag.Name),
EspressoPollInterval: ctx.Duration(flags.EspressoPollIntervalFlag.Name),
L1EthRpc: ctx.String(flags.L1EthRpcFlag.Name),
L2EthRpc: ctx.String(flags.L2EthRpcFlag.Name),
RollupRpc: ctx.String(flags.RollupRpcFlag.Name),
SubSafetyMargin: ctx.Uint64(flags.SubSafetyMarginFlag.Name),
PollInterval: ctx.Duration(flags.PollIntervalFlag.Name),

/* Optional Flags */
MaxPendingTransactions: ctx.Uint64(flags.MaxPendingTransactionsFlag.Name),
MaxChannelDuration: ctx.Uint64(flags.MaxChannelDurationFlag.Name),
MaxL1TxSize: ctx.Uint64(flags.MaxL1TxSizeBytesFlag.Name),
MaxBlocksPerSpanBatch: ctx.Int(flags.MaxBlocksPerSpanBatch.Name),
TargetNumFrames: ctx.Int(flags.TargetNumFramesFlag.Name),
ApproxComprRatio: ctx.Float64(flags.ApproxComprRatioFlag.Name),
Compressor: ctx.String(flags.CompressorFlag.Name),
CompressionAlgo: derive.CompressionAlgo(ctx.String(flags.CompressionAlgoFlag.Name)),
Stopped: ctx.Bool(flags.StoppedFlag.Name),
WaitNodeSync: ctx.Bool(flags.WaitNodeSyncFlag.Name),
CheckRecentTxsDepth: ctx.Int(flags.CheckRecentTxsDepthFlag.Name),
BatchType: ctx.Uint(flags.BatchTypeFlag.Name),
DataAvailabilityType: flags.DataAvailabilityType(ctx.String(flags.DataAvailabilityTypeFlag.Name)),
ActiveSequencerCheckDuration: ctx.Duration(flags.ActiveSequencerCheckDurationFlag.Name),
TxMgrConfig: txmgr.ReadCLIConfig(ctx),
LogConfig: oplog.ReadCLIConfig(ctx),
MetricsConfig: opmetrics.ReadCLIConfig(ctx),
PprofConfig: oppprof.ReadCLIConfig(ctx),
RPC: oprpc.ReadCLIConfig(ctx),
AltDA: altda.ReadCLIConfig(ctx),
ThrottleThreshold: ctx.Uint64(flags.ThrottleThresholdFlag.Name),
ThrottleTxSize: ctx.Uint64(flags.ThrottleTxSizeFlag.Name),
ThrottleBlockSize: ctx.Uint64(flags.ThrottleBlockSizeFlag.Name),
ThrottleAlwaysBlockSize: ctx.Uint64(flags.ThrottleAlwaysBlockSizeFlag.Name),
PreferLocalSafeL2: ctx.Bool(flags.PreferLocalSafeL2Flag.Name),
MaxPendingTransactions: ctx.Uint64(flags.MaxPendingTransactionsFlag.Name),
MaxChannelDuration: ctx.Uint64(flags.MaxChannelDurationFlag.Name),
MaxL1TxSize: ctx.Uint64(flags.MaxL1TxSizeBytesFlag.Name),
MaxBlocksPerSpanBatch: ctx.Int(flags.MaxBlocksPerSpanBatch.Name),
TargetNumFrames: ctx.Int(flags.TargetNumFramesFlag.Name),
ApproxComprRatio: ctx.Float64(flags.ApproxComprRatioFlag.Name),
Compressor: ctx.String(flags.CompressorFlag.Name),
CompressionAlgo: derive.CompressionAlgo(ctx.String(flags.CompressionAlgoFlag.Name)),
Stopped: ctx.Bool(flags.StoppedFlag.Name),
WaitNodeSync: ctx.Bool(flags.WaitNodeSyncFlag.Name),
CheckRecentTxsDepth: ctx.Int(flags.CheckRecentTxsDepthFlag.Name),
BatchType: ctx.Uint(flags.BatchTypeFlag.Name),
DataAvailabilityType: flags.DataAvailabilityType(ctx.String(flags.DataAvailabilityTypeFlag.Name)),
ActiveSequencerCheckDuration: ctx.Duration(flags.ActiveSequencerCheckDurationFlag.Name),
TxMgrConfig: txmgr.ReadCLIConfig(ctx),
LogConfig: oplog.ReadCLIConfig(ctx),
MetricsConfig: opmetrics.ReadCLIConfig(ctx),
PprofConfig: oppprof.ReadCLIConfig(ctx),
RPC: oprpc.ReadCLIConfig(ctx),
AltDA: altda.ReadCLIConfig(ctx),
ThrottleThreshold: ctx.Uint64(flags.ThrottleThresholdFlag.Name),
ThrottleTxSize: ctx.Uint64(flags.ThrottleTxSizeFlag.Name),
ThrottleBlockSize: ctx.Uint64(flags.ThrottleBlockSizeFlag.Name),
ThrottleAlwaysBlockSize: ctx.Uint64(flags.ThrottleAlwaysBlockSizeFlag.Name),
PreferLocalSafeL2: ctx.Bool(flags.PreferLocalSafeL2Flag.Name),

EspressoUrls: ctx.StringSlice(flags.EspressoUrlsFlag.Name),
EspressoLightClientAddr: ctx.String(flags.EspressoLCAddrFlag.Name),
TestingEspressoBatcherPrivateKey: ctx.String(flags.TestingEspressoBatcherPrivateKeyFlag.Name),
EspressoPollInterval: ctx.Duration(flags.EspressoPollIntervalFlag.Name),
}
}
60 changes: 29 additions & 31 deletions op-batcher/batcher/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import (
altda "github.com/ethereum-optimism/optimism/op-alt-da"
"github.com/ethereum-optimism/optimism/op-batcher/metrics"
"github.com/ethereum-optimism/optimism/op-node/rollup"
derive "github.com/ethereum-optimism/optimism/op-node/rollup/derive"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
opcrypto "github.com/ethereum-optimism/optimism/op-service/crypto"
"github.com/ethereum-optimism/optimism/op-service/dial"
"github.com/ethereum-optimism/optimism/op-service/eth"
Expand Down Expand Up @@ -94,17 +94,18 @@ type AltDAClient interface {

// DriverSetup is the collection of input/output interfaces and configuration that the driver operates on.
type DriverSetup struct {
Log log.Logger
Metr metrics.Metricer
RollupConfig *rollup.Config
Config BatcherConfig
Txmgr txmgr.TxManager
L1Client L1Client
EndpointProvider dial.L2EndpointProvider
ChannelConfig ChannelConfigProvider
AltDA AltDAClient
ChannelOutFactory ChannelOutFactory
ActiveSeqChanged chan struct{} // optional
Log log.Logger
Metr metrics.Metricer
RollupConfig *rollup.Config
Config BatcherConfig
Txmgr txmgr.TxManager
L1Client L1Client
EndpointProvider dial.L2EndpointProvider
ChannelConfig ChannelConfigProvider
AltDA AltDAClient
ChannelOutFactory ChannelOutFactory
ActiveSeqChanged chan struct{} // optional

Espresso *espressoClient.MultipleNodesClient
EspressoLightClient *espressoLightClient.LightclientCaller
ChainSigner opcrypto.ChainSigner
Expand All @@ -124,20 +125,16 @@ type BatchSubmitter struct {
mutex sync.Mutex
running bool

submitter *espressoTransactionSubmitter
streamer espresso.EspressoStreamer[derive.EspressoBatch]
txpoolMutex sync.Mutex // guards txpoolState and txpoolBlockedBlob
txpoolState TxPoolState
txpoolBlockedBlob bool

channelMgrMutex sync.Mutex // guards channelMgr and prevCurrentL1
channelMgr *channelManager
prevCurrentL1 eth.L1BlockRef // cached CurrentL1 from the last syncStatus
}

// EspressoStreamer returns the batch submitter's Espresso streamer instance
func (l *BatchSubmitter) EspressoStreamer() *espresso.EspressoStreamer[derive.EspressoBatch] {
return &l.streamer
espressoSubmitter *espressoTransactionSubmitter
espressoStreamer espresso.EspressoStreamer[derive.EspressoBatch]
}

// NewBatchSubmitter initializes the BatchSubmitter driver from a preconfigured DriverSetup
Expand All @@ -152,7 +149,7 @@ func NewBatchSubmitter(setup DriverSetup) *BatchSubmitter {
channelMgr: state,
}

batchSubmitter.streamer = espresso.NewEspressoStreamer(
batchSubmitter.espressoStreamer = espresso.NewEspressoStreamer(
batchSubmitter.RollupConfig.L2ChainID.Uint64(),
NewAdaptL1BlockRefClient(batchSubmitter.L1Client),
batchSubmitter.Espresso,
Expand All @@ -164,7 +161,7 @@ func NewBatchSubmitter(setup DriverSetup) *BatchSubmitter {
2*time.Second,
)

log.Info("Streamer started", "streamer", batchSubmitter.streamer)
log.Info("Streamer started", "streamer", batchSubmitter.espressoStreamer)

return batchSubmitter
}
Expand Down Expand Up @@ -219,26 +216,29 @@ func (l *BatchSubmitter) StartBatchSubmitting() error {
return fmt.Errorf("could not register with batch inbox contract: %w", err)
}

l.submitter = NewEspressoTransactionSubmitter(
l.espressoSubmitter = NewEspressoTransactionSubmitter(
WithContext(l.shutdownCtx),
WithWaitGroup(l.wg),
WithEspressoClient(l.Espresso),
)
l.submitter.SpawnWorkers(4, 4)
l.submitter.Start()
l.espressoSubmitter.SpawnWorkers(4, 4)
l.espressoSubmitter.Start()

l.wg.Add(4)
go l.receiptsLoop(l.wg, receiptsCh) // ranges over receiptsCh channel
go l.espressoBatchQueueingLoop(l.shutdownCtx, l.wg)
go l.espressoBatchLoadingLoop(l.shutdownCtx, l.wg, publishSignal)
go l.publishingLoop(l.killCtx, l.wg, receiptsCh, publishSignal) // ranges over publishSignal, spawns routines which send on receiptsCh. Closes receiptsCh when done.
} else {
l.wg.Add(3)
go l.receiptsLoop(l.wg, receiptsCh) // ranges over receiptsCh channel
go l.publishingLoop(l.killCtx, l.wg, receiptsCh, publishSignal) // ranges over publishSignal, spawns routines which send on receiptsCh. Closes receiptsCh when done.
go l.blockLoadingLoop(l.shutdownCtx, l.wg, pendingBytesUpdated, publishSignal) // sends on pendingBytesUpdated (if throttling enabled), and publishSignal. Closes them both when done

l.Log.Info("Batch Submitter started in Espresso mode")
return nil
}

l.wg.Add(3)
go l.receiptsLoop(l.wg, receiptsCh) // ranges over receiptsCh channel
go l.publishingLoop(l.killCtx, l.wg, receiptsCh, publishSignal) // ranges over publishSignal, spawns routines which send on receiptsCh. Closes receiptsCh when done.
go l.blockLoadingLoop(l.shutdownCtx, l.wg, pendingBytesUpdated, publishSignal) // sends on pendingBytesUpdated (if throttling enabled), and publishSignal. Closes them both when done

l.Log.Info("Batch Submitter started")
return nil
}
Expand Down Expand Up @@ -779,7 +779,7 @@ func (l *BatchSubmitter) clearState(ctx context.Context) {
defer l.channelMgrMutex.Unlock()
l.channelMgr.Clear(l1SafeOrigin)
if l.Config.UseEspresso {
l.streamer.Reset()
l.espressoStreamer.Reset()
}
return true
}
Expand Down Expand Up @@ -931,8 +931,6 @@ func (l *BatchSubmitter) sendTransaction(txdata txData, queue *txmgr.Queue[txRef
if !l.Config.UseAltDA {
l.Log.Crit("Received AltDA type txdata without AltDA being enabled")
}

// if Alt DA is enabled we post the txdata to the DA Provider and replace it with the commitment.
if txdata.altDACommitment == nil {
// This means the txdata was not sent to the DA Provider yet.
// This will send the txdata to the DA Provider and store the commitment in the channelMgr.
Expand Down
Loading