diff --git a/FORK.md b/FORK.md index 7755d91bd3..e4846d5152 100644 --- a/FORK.md +++ b/FORK.md @@ -1 +1,7 @@ -568f0805d2f427ee2d78ff1eb13233883a0737bb +# Fork + +## optimism + +- tag: `v1.0.3` +- differences + - to be updated diff --git a/components/node/client/polling.go b/components/node/client/polling.go index 8794feac5f..033a8942de 100644 --- a/components/node/client/polling.go +++ b/components/node/client/polling.go @@ -57,7 +57,7 @@ func NewPollingClient(ctx context.Context, lgr log.Logger, c RPC, opts ...Wrappe res := &PollingClient{ c: c, lgr: lgr, - pollRate: 250 * time.Millisecond, + pollRate: 12 * time.Second, ctx: ctx, cancel: cancel, pollReqCh: make(chan struct{}, 1), diff --git a/components/node/client/rate_limited.go b/components/node/client/rate_limited.go new file mode 100644 index 0000000000..9f4bfa159b --- /dev/null +++ b/components/node/client/rate_limited.go @@ -0,0 +1,52 @@ +package client + +import ( + "context" + "time" + + "github.com/ethereum/go-ethereum" + "github.com/ethereum/go-ethereum/rpc" + "golang.org/x/time/rate" +) + +// RateLimitingClient is a wrapper around a pure RPC that implements a global rate-limit on requests. +type RateLimitingClient struct { + c RPC + rl *rate.Limiter +} + +// NewRateLimitingClient implements a global rate-limit for all RPC requests. +// A limit of N will ensure that over a long enough time-frame the given number of tokens per second is targeted. +// Burst limits how far off we can be from the target, by specifying how many requests are allowed at once. +func NewRateLimitingClient(c RPC, limit rate.Limit, burst int) *RateLimitingClient { + return &RateLimitingClient{c: c, rl: rate.NewLimiter(limit, burst)} +} + +func (b *RateLimitingClient) Close() { + b.c.Close() +} + +func (b *RateLimitingClient) CallContext(ctx context.Context, result any, method string, args ...any) error { + if err := b.rl.Wait(ctx); err != nil { + return err + } + cCtx, cancel := context.WithTimeout(ctx, 10*time.Second) + defer cancel() + return b.c.CallContext(cCtx, result, method, args...) +} + +func (b *RateLimitingClient) BatchCallContext(ctx context.Context, batch []rpc.BatchElem) error { + if err := b.rl.WaitN(ctx, len(batch)); err != nil { + return err + } + cCtx, cancel := context.WithTimeout(ctx, 20*time.Second) + defer cancel() + return b.c.BatchCallContext(cCtx, batch) +} + +func (b *RateLimitingClient) EthSubscribe(ctx context.Context, channel any, args ...any) (ethereum.Subscription, error) { + if err := b.rl.Wait(ctx); err != nil { + return nil, err + } + return b.c.EthSubscribe(ctx, channel, args...) +} diff --git a/components/node/client/rpc.go b/components/node/client/rpc.go index 73d8690851..e7c9e13d8a 100644 --- a/components/node/client/rpc.go +++ b/components/node/client/rpc.go @@ -10,6 +10,7 @@ import ( "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/rpc" "github.com/prometheus/client_golang/prometheus" + "golang.org/x/time/rate" "github.com/kroma-network/kroma/components/node/metrics" "github.com/kroma-network/kroma/utils/service/backoff" @@ -24,27 +25,88 @@ type RPC interface { EthSubscribe(ctx context.Context, channel any, args ...any) (ethereum.Subscription, error) } +type rpcConfig struct { + elCliRPCOptions []rpc.ClientOption + httpPollInterval time.Duration + backoffAttempts int + limit float64 + burst int +} + +type RPCOption func(cfg *rpcConfig) error + +// WithDialBackoff configures the number of attempts for the initial dial to the RPC, +// attempts are executed with an exponential backoff strategy. +func WithDialBackoff(attempts int) RPCOption { + return func(cfg *rpcConfig) error { + cfg.backoffAttempts = attempts + return nil + } +} + +// WithHttpPollInterval configures the RPC to poll at the given rate, in case RPC subscriptions are not available. +func WithHttpPollInterval(duration time.Duration) RPCOption { + return func(cfg *rpcConfig) error { + cfg.httpPollInterval = duration + return nil + } +} + +// WithGethRPCOptions passes the list of go-ethereum RPC options to the internal RPC instance. +func WithGethRPCOptions(gethRPCOptions ...rpc.ClientOption) RPCOption { + return func(cfg *rpcConfig) error { + cfg.elCliRPCOptions = append(cfg.elCliRPCOptions, gethRPCOptions...) + return nil + } +} + +// WithRateLimit configures the RPC to target the given rate limit (in requests / second). +// See NewRateLimitingClient for more details. +func WithRateLimit(rateLimit float64, burst int) RPCOption { + return func(cfg *rpcConfig) error { + cfg.limit = rateLimit + cfg.burst = burst + return nil + } +} + // NewRPC returns the correct client.RPC instance for a given RPC url. -func NewRPC(ctx context.Context, lgr log.Logger, addr string, opts ...rpc.ClientOption) (RPC, error) { - underlying, err := DialRPCClientWithBackoff(ctx, lgr, addr, opts...) +func NewRPC(ctx context.Context, lgr log.Logger, addr string, opts ...RPCOption) (RPC, error) { + var cfg rpcConfig + for i, opt := range opts { + if err := opt(&cfg); err != nil { + return nil, fmt.Errorf("rpc option %d failed to apply to RPC config: %w", i, err) + } + } + if cfg.backoffAttempts < 1 { // default to at least 1 attempt, or it always fails to dial. + cfg.backoffAttempts = 1 + } + + underlying, err := dialRPCClientWithBackoff(ctx, lgr, addr, cfg.backoffAttempts, cfg.elCliRPCOptions...) if err != nil { return nil, err } - wrapped := &BaseRPCClient{ + var wrapped RPC = &BaseRPCClient{ c: underlying, } + + if cfg.limit != 0 { + wrapped = NewRateLimitingClient(wrapped, rate.Limit(cfg.limit), cfg.burst) + } + if httpRegex.MatchString(addr) { - return NewPollingClient(ctx, lgr, wrapped), nil + wrapped = NewPollingClient(ctx, lgr, wrapped, WithPollRate(cfg.httpPollInterval)) } + return wrapped, nil } // Dials a JSON-RPC endpoint repeatedly, with a backoff, until a client connection is established. Auth is optional. -func DialRPCClientWithBackoff(ctx context.Context, log log.Logger, addr string, opts ...rpc.ClientOption) (*rpc.Client, error) { +func dialRPCClientWithBackoff(ctx context.Context, log log.Logger, addr string, attempts int, opts ...rpc.ClientOption) (*rpc.Client, error) { bOff := backoff.Exponential() var ret *rpc.Client - err := backoff.DoCtx(ctx, 10, bOff, func() error { + err := backoff.DoCtx(ctx, attempts, bOff, func() error { client, err := rpc.DialOptions(ctx, addr, opts...) if err != nil { if client == nil { diff --git a/components/node/cmd/batch_decoder/README.md b/components/node/cmd/batch_decoder/README.md index f8c192da55..2329ed509a 100644 --- a/components/node/cmd/batch_decoder/README.md +++ b/components/node/cmd/batch_decoder/README.md @@ -50,6 +50,12 @@ those frames need to be generated differently than simply closing the channel. # Select all channels that are not ready and then get the id and inclusion block & tx hash of the first frame. > jq "select(.is_ready == false)|[.id, .frames[0].inclusion_block, .frames[0].transaction_hash]" $CHANNEL_DIR + +# Show all of the frames in a channel without seeing the batches or frame data +> jq 'del(.batches)|del(.frames[]|.frame.data)' $CHANNEL_FILE + +# Show all batches (without timestamps) in a channel +> jq '.batches|del(.[]|.Transactions)' $CHANNEL_FILE ``` diff --git a/components/node/eth/label.go b/components/node/eth/label.go index f79629e607..b741f2e342 100644 --- a/components/node/eth/label.go +++ b/components/node/eth/label.go @@ -17,3 +17,9 @@ const ( // - L2: Derived chain tip from finalized L1 data Finalized = "finalized" ) + +func (label BlockLabel) Arg() any { return string(label) } + +func (BlockLabel) CheckID(id BlockID) error { + return nil +} diff --git a/components/node/eth/sync_status.go b/components/node/eth/sync_status.go index e57e64aff1..d50e8aacdf 100644 --- a/components/node/eth/sync_status.go +++ b/components/node/eth/sync_status.go @@ -32,4 +32,7 @@ type SyncStatus struct { // FinalizedL2 points to the L2 block that was derived fully from // finalized L1 information, thus irreversible. FinalizedL2 L2BlockRef `json:"finalized_l2"` + // UnsafeL2SyncTarget points to the first unprocessed unsafe L2 block. + // It may be zeroed if there is no targeted block. + UnsafeL2SyncTarget L2BlockRef `json:"queued_unsafe_l2"` } diff --git a/components/node/flags/flags.go b/components/node/flags/flags.go index 0f5c6e66d2..688b536ef3 100644 --- a/components/node/flags/flags.go +++ b/components/node/flags/flags.go @@ -14,10 +14,10 @@ import ( // Flags -const envVarPrefix = "NODE_" +const envVarPrefix = "NODE" func prefixEnvVar(name string) string { - return envVarPrefix + name + return envVarPrefix + "_" + name } var ( @@ -75,6 +75,24 @@ var ( return &out }(), } + L1RPCRateLimit = cli.Float64Flag{ + Name: "l1.rpc-rate-limit", + Usage: "Optional self-imposed global rate-limit on L1 RPC requests, specified in requests / second. Disabled if set to 0.", + EnvVar: prefixEnvVar("L1_RPC_RATE_LIMIT"), + Value: 0, + } + L1RPCMaxBatchSize = cli.IntFlag{ + Name: "l1.rpc-max-batch-size", + Usage: "Maximum number of RPC requests to bundle, e.g. during L1 blocks receipt fetching. The L1 RPC rate limit counts this as N items, but allows it to burst at once.", + EnvVar: prefixEnvVar("L1_RPC_MAX_BATCH_SIZE"), + Value: 20, + } + L1HTTPPollInterval = cli.DurationFlag{ + Name: "l1.http-poll-interval", + Usage: "Polling interval for latest-block subscription when using an HTTP RPC provider. Ignored for other types of RPC endpoints.", + EnvVar: prefixEnvVar("L1_HTTP_POLL_INTERVAL"), + Value: time.Second * 12, + } L2EngineJWTSecret = cli.StringFlag{ Name: "l2.jwt-secret", Usage: "Path to JWT secret key. Keys are 32 bytes, hex encoded in a file. A new key will be generated if left empty.", @@ -100,6 +118,13 @@ var ( Usage: "Initialize the proposer in a stopped state. The proposer can be started using the admin_startProposer RPC", EnvVar: prefixEnvVar("PROPOSER_STOPPED"), } + ProposerMaxSafeLagFlag = cli.Uint64Flag{ + Name: "proposer.max-safe-lag", + Usage: "Maximum number of L2 blocks for restricting the distance between L2 safe and unsafe. Disabled if 0.", + EnvVar: prefixEnvVar("PROPOSER_MAX_SAFE_LAG"), + Required: false, + Value: 0, + } ProposerL1Confs = cli.Uint64Flag{ Name: "proposer.l1-confs", Usage: "Number of L1 blocks to keep distance from the L1 head as a proposer for picking an L1 origin.", @@ -175,6 +200,13 @@ var ( EnvVar: prefixEnvVar("L2_BACKUP_UNSAFE_SYNC_RPC"), Required: false, } + BackupL2UnsafeSyncRPCTrustRPC = cli.StringFlag{ + Name: "l2.backup-unsafe-sync-rpc.trustrpc", + Usage: "Like l1.trustrpc, configure if response data from the RPC needs to be verified, e.g. blockhash computation." + + "This does not include checks if the blockhash is part of the canonical chain.", + EnvVar: prefixEnvVar("L2_BACKUP_UNSAFE_SYNC_RPC_TRUST_RPC"), + Required: false, + } ) var requiredFlags = []cli.Flag{ @@ -189,10 +221,14 @@ var optionalFlags = []cli.Flag{ Network, L1TrustRPC, L1RPCProviderKind, + L1RPCRateLimit, + L1RPCMaxBatchSize, + L1HTTPPollInterval, L2EngineJWTSecret, SyncerL1Confs, ProposerEnabledFlag, ProposerStoppedFlag, + ProposerMaxSafeLagFlag, ProposerL1Confs, L1EpochPollIntervalFlag, RPCEnableAdmin, @@ -207,6 +243,7 @@ var optionalFlags = []cli.Flag{ HeartbeatMonikerFlag, HeartbeatURLFlag, BackupL2UnsafeSyncRPC, + BackupL2UnsafeSyncRPCTrustRPC, } // Flags contains the list of configuration options available to the binary. diff --git a/components/node/flags/p2p_flags.go b/components/node/flags/p2p_flags.go index bc2ea4428b..ec0c8c048c 100644 --- a/components/node/flags/p2p_flags.go +++ b/components/node/flags/p2p_flags.go @@ -34,6 +34,15 @@ var ( Value: "none", EnvVar: p2pEnv("PEER_SCORING"), } + PeerScoreBands = cli.StringFlag{ + Name: "p2p.score.bands", + Usage: "Sets the peer score bands used primarily for peer score metrics. " + + "Should be provided in following format: :