Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion FORK.md
Original file line number Diff line number Diff line change
@@ -1 +1,7 @@
568f0805d2f427ee2d78ff1eb13233883a0737bb
# Fork

## optimism

- tag: `v1.0.3`
- differences
- to be updated
2 changes: 1 addition & 1 deletion components/node/client/polling.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func NewPollingClient(ctx context.Context, lgr log.Logger, c RPC, opts ...Wrappe
res := &PollingClient{
c: c,
lgr: lgr,
pollRate: 250 * time.Millisecond,
pollRate: 12 * time.Second,
ctx: ctx,
cancel: cancel,
pollReqCh: make(chan struct{}, 1),
Expand Down
52 changes: 52 additions & 0 deletions components/node/client/rate_limited.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package client

import (
"context"
"time"

"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/rpc"
"golang.org/x/time/rate"
)

// RateLimitingClient is a wrapper around a pure RPC that implements a global rate-limit on requests.
type RateLimitingClient struct {
c RPC
rl *rate.Limiter
}

// NewRateLimitingClient implements a global rate-limit for all RPC requests.
// A limit of N will ensure that over a long enough time-frame the given number of tokens per second is targeted.
// Burst limits how far off we can be from the target, by specifying how many requests are allowed at once.
func NewRateLimitingClient(c RPC, limit rate.Limit, burst int) *RateLimitingClient {
return &RateLimitingClient{c: c, rl: rate.NewLimiter(limit, burst)}
}

func (b *RateLimitingClient) Close() {
b.c.Close()
}

func (b *RateLimitingClient) CallContext(ctx context.Context, result any, method string, args ...any) error {
if err := b.rl.Wait(ctx); err != nil {
return err
}
cCtx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()
return b.c.CallContext(cCtx, result, method, args...)
}

func (b *RateLimitingClient) BatchCallContext(ctx context.Context, batch []rpc.BatchElem) error {
if err := b.rl.WaitN(ctx, len(batch)); err != nil {
return err
}
cCtx, cancel := context.WithTimeout(ctx, 20*time.Second)
defer cancel()
return b.c.BatchCallContext(cCtx, batch)
}

func (b *RateLimitingClient) EthSubscribe(ctx context.Context, channel any, args ...any) (ethereum.Subscription, error) {
if err := b.rl.Wait(ctx); err != nil {
return nil, err
}
return b.c.EthSubscribe(ctx, channel, args...)
}
74 changes: 68 additions & 6 deletions components/node/client/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/rpc"
"github.com/prometheus/client_golang/prometheus"
"golang.org/x/time/rate"

"github.com/kroma-network/kroma/components/node/metrics"
"github.com/kroma-network/kroma/utils/service/backoff"
Expand All @@ -24,27 +25,88 @@ type RPC interface {
EthSubscribe(ctx context.Context, channel any, args ...any) (ethereum.Subscription, error)
}

type rpcConfig struct {
elCliRPCOptions []rpc.ClientOption
httpPollInterval time.Duration
backoffAttempts int
limit float64
burst int
}

type RPCOption func(cfg *rpcConfig) error

// WithDialBackoff configures the number of attempts for the initial dial to the RPC,
// attempts are executed with an exponential backoff strategy.
func WithDialBackoff(attempts int) RPCOption {
return func(cfg *rpcConfig) error {
cfg.backoffAttempts = attempts
return nil
}
}

// WithHttpPollInterval configures the RPC to poll at the given rate, in case RPC subscriptions are not available.
func WithHttpPollInterval(duration time.Duration) RPCOption {
return func(cfg *rpcConfig) error {
cfg.httpPollInterval = duration
return nil
}
}

// WithGethRPCOptions passes the list of go-ethereum RPC options to the internal RPC instance.
func WithGethRPCOptions(gethRPCOptions ...rpc.ClientOption) RPCOption {
return func(cfg *rpcConfig) error {
cfg.elCliRPCOptions = append(cfg.elCliRPCOptions, gethRPCOptions...)
return nil
}
}

// WithRateLimit configures the RPC to target the given rate limit (in requests / second).
// See NewRateLimitingClient for more details.
func WithRateLimit(rateLimit float64, burst int) RPCOption {
return func(cfg *rpcConfig) error {
cfg.limit = rateLimit
cfg.burst = burst
return nil
}
}

// NewRPC returns the correct client.RPC instance for a given RPC url.
func NewRPC(ctx context.Context, lgr log.Logger, addr string, opts ...rpc.ClientOption) (RPC, error) {
underlying, err := DialRPCClientWithBackoff(ctx, lgr, addr, opts...)
func NewRPC(ctx context.Context, lgr log.Logger, addr string, opts ...RPCOption) (RPC, error) {
var cfg rpcConfig
for i, opt := range opts {
if err := opt(&cfg); err != nil {
return nil, fmt.Errorf("rpc option %d failed to apply to RPC config: %w", i, err)
}
}
if cfg.backoffAttempts < 1 { // default to at least 1 attempt, or it always fails to dial.
cfg.backoffAttempts = 1
}

underlying, err := dialRPCClientWithBackoff(ctx, lgr, addr, cfg.backoffAttempts, cfg.elCliRPCOptions...)
if err != nil {
return nil, err
}

wrapped := &BaseRPCClient{
var wrapped RPC = &BaseRPCClient{
c: underlying,
}

if cfg.limit != 0 {
wrapped = NewRateLimitingClient(wrapped, rate.Limit(cfg.limit), cfg.burst)
}

if httpRegex.MatchString(addr) {
return NewPollingClient(ctx, lgr, wrapped), nil
wrapped = NewPollingClient(ctx, lgr, wrapped, WithPollRate(cfg.httpPollInterval))
}

return wrapped, nil
}

// Dials a JSON-RPC endpoint repeatedly, with a backoff, until a client connection is established. Auth is optional.
func DialRPCClientWithBackoff(ctx context.Context, log log.Logger, addr string, opts ...rpc.ClientOption) (*rpc.Client, error) {
func dialRPCClientWithBackoff(ctx context.Context, log log.Logger, addr string, attempts int, opts ...rpc.ClientOption) (*rpc.Client, error) {
bOff := backoff.Exponential()
var ret *rpc.Client
err := backoff.DoCtx(ctx, 10, bOff, func() error {
err := backoff.DoCtx(ctx, attempts, bOff, func() error {
client, err := rpc.DialOptions(ctx, addr, opts...)
if err != nil {
if client == nil {
Expand Down
6 changes: 6 additions & 0 deletions components/node/cmd/batch_decoder/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,12 @@ those frames need to be generated differently than simply closing the channel.

# Select all channels that are not ready and then get the id and inclusion block & tx hash of the first frame.
> jq "select(.is_ready == false)|[.id, .frames[0].inclusion_block, .frames[0].transaction_hash]" $CHANNEL_DIR

# Show all of the frames in a channel without seeing the batches or frame data
> jq 'del(.batches)|del(.frames[]|.frame.data)' $CHANNEL_FILE

# Show all batches (without timestamps) in a channel
> jq '.batches|del(.[]|.Transactions)' $CHANNEL_FILE
```


Expand Down
6 changes: 6 additions & 0 deletions components/node/eth/label.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,9 @@ const (
// - L2: Derived chain tip from finalized L1 data
Finalized = "finalized"
)

func (label BlockLabel) Arg() any { return string(label) }

func (BlockLabel) CheckID(id BlockID) error {
return nil
}
3 changes: 3 additions & 0 deletions components/node/eth/sync_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,4 +32,7 @@ type SyncStatus struct {
// FinalizedL2 points to the L2 block that was derived fully from
// finalized L1 information, thus irreversible.
FinalizedL2 L2BlockRef `json:"finalized_l2"`
// UnsafeL2SyncTarget points to the first unprocessed unsafe L2 block.
// It may be zeroed if there is no targeted block.
UnsafeL2SyncTarget L2BlockRef `json:"queued_unsafe_l2"`
}
41 changes: 39 additions & 2 deletions components/node/flags/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,10 @@ import (

// Flags

const envVarPrefix = "NODE_"
const envVarPrefix = "NODE"

func prefixEnvVar(name string) string {
return envVarPrefix + name
return envVarPrefix + "_" + name
}

var (
Expand Down Expand Up @@ -75,6 +75,24 @@ var (
return &out
}(),
}
L1RPCRateLimit = cli.Float64Flag{
Name: "l1.rpc-rate-limit",
Usage: "Optional self-imposed global rate-limit on L1 RPC requests, specified in requests / second. Disabled if set to 0.",
EnvVar: prefixEnvVar("L1_RPC_RATE_LIMIT"),
Value: 0,
}
L1RPCMaxBatchSize = cli.IntFlag{
Name: "l1.rpc-max-batch-size",
Usage: "Maximum number of RPC requests to bundle, e.g. during L1 blocks receipt fetching. The L1 RPC rate limit counts this as N items, but allows it to burst at once.",
EnvVar: prefixEnvVar("L1_RPC_MAX_BATCH_SIZE"),
Value: 20,
}
L1HTTPPollInterval = cli.DurationFlag{
Name: "l1.http-poll-interval",
Usage: "Polling interval for latest-block subscription when using an HTTP RPC provider. Ignored for other types of RPC endpoints.",
EnvVar: prefixEnvVar("L1_HTTP_POLL_INTERVAL"),
Value: time.Second * 12,
}
L2EngineJWTSecret = cli.StringFlag{
Name: "l2.jwt-secret",
Usage: "Path to JWT secret key. Keys are 32 bytes, hex encoded in a file. A new key will be generated if left empty.",
Expand All @@ -100,6 +118,13 @@ var (
Usage: "Initialize the proposer in a stopped state. The proposer can be started using the admin_startProposer RPC",
EnvVar: prefixEnvVar("PROPOSER_STOPPED"),
}
ProposerMaxSafeLagFlag = cli.Uint64Flag{
Name: "proposer.max-safe-lag",
Usage: "Maximum number of L2 blocks for restricting the distance between L2 safe and unsafe. Disabled if 0.",
EnvVar: prefixEnvVar("PROPOSER_MAX_SAFE_LAG"),
Required: false,
Value: 0,
}
ProposerL1Confs = cli.Uint64Flag{
Name: "proposer.l1-confs",
Usage: "Number of L1 blocks to keep distance from the L1 head as a proposer for picking an L1 origin.",
Expand Down Expand Up @@ -175,6 +200,13 @@ var (
EnvVar: prefixEnvVar("L2_BACKUP_UNSAFE_SYNC_RPC"),
Required: false,
}
BackupL2UnsafeSyncRPCTrustRPC = cli.StringFlag{
Name: "l2.backup-unsafe-sync-rpc.trustrpc",
Usage: "Like l1.trustrpc, configure if response data from the RPC needs to be verified, e.g. blockhash computation." +
"This does not include checks if the blockhash is part of the canonical chain.",
EnvVar: prefixEnvVar("L2_BACKUP_UNSAFE_SYNC_RPC_TRUST_RPC"),
Required: false,
}
)

var requiredFlags = []cli.Flag{
Expand All @@ -189,10 +221,14 @@ var optionalFlags = []cli.Flag{
Network,
L1TrustRPC,
L1RPCProviderKind,
L1RPCRateLimit,
L1RPCMaxBatchSize,
L1HTTPPollInterval,
L2EngineJWTSecret,
SyncerL1Confs,
ProposerEnabledFlag,
ProposerStoppedFlag,
ProposerMaxSafeLagFlag,
ProposerL1Confs,
L1EpochPollIntervalFlag,
RPCEnableAdmin,
Expand All @@ -207,6 +243,7 @@ var optionalFlags = []cli.Flag{
HeartbeatMonikerFlag,
HeartbeatURLFlag,
BackupL2UnsafeSyncRPC,
BackupL2UnsafeSyncRPCTrustRPC,
}

// Flags contains the list of configuration options available to the binary.
Expand Down
20 changes: 20 additions & 0 deletions components/node/flags/p2p_flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,15 @@ var (
Value: "none",
EnvVar: p2pEnv("PEER_SCORING"),
}
PeerScoreBands = cli.StringFlag{
Name: "p2p.score.bands",
Usage: "Sets the peer score bands used primarily for peer score metrics. " +
"Should be provided in following format: <threshold>:<label>;<threshold>:<label>;..." +
"For example: -40:graylist;-20:restricted;0:nopx;20:friend;",
Required: false,
Value: "-40:graylist;-20:restricted;0:nopx;20:friend;",
EnvVar: p2pEnv("SCORE_BANDS"),
}

// Banning Flag - whether or not we want to act on the scoring
Banning = cli.BoolFlag{
Expand Down Expand Up @@ -267,6 +276,12 @@ var (
Hidden: true,
EnvVar: p2pEnv("GOSSIP_FLOOD_PUBLISH"),
}
SyncReqRespFlag = cli.BoolFlag{
Name: "p2p.sync.req-resp",
Usage: "Enables experimental P2P req-resp alternative sync method, on both server and client side.",
Required: false,
EnvVar: p2pEnv("SYNC_REQ_RESP"),
}
)

// None of these flags are strictly required.
Expand All @@ -276,6 +291,10 @@ var p2pFlags = []cli.Flag{
NoDiscovery,
P2PPrivPath,
P2PPrivRaw,
PeerScoring,
PeerScoreBands,
Banning,
TopicScoring,
ListenIP,
ListenTCPPort,
ListenUDPPort,
Expand All @@ -302,4 +321,5 @@ var p2pFlags = []cli.Flag{
GossipMeshDhiFlag,
GossipMeshDlazyFlag,
GossipFloodPublishFlag,
SyncReqRespFlag,
}
Loading