Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 17 additions & 6 deletions block/internal/da/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,29 +32,39 @@ type client struct {
da coreda.DA
logger zerolog.Logger
defaultTimeout time.Duration
batchSize int
namespaceBz []byte
namespaceDataBz []byte
}

const (
defaultRetrieveBatchSize = 150
)

// Config contains configuration for the DA client.
type Config struct {
DA coreda.DA
Logger zerolog.Logger
DefaultTimeout time.Duration
Namespace string
DataNamespace string
DA coreda.DA
Logger zerolog.Logger
DefaultTimeout time.Duration
Namespace string
DataNamespace string
RetrieveBatchSize int
}

// NewClient creates a new DA client with pre-calculated namespace bytes.
func NewClient(cfg Config) *client {
if cfg.DefaultTimeout == 0 {
cfg.DefaultTimeout = 30 * time.Second
}
if cfg.RetrieveBatchSize <= 0 {
cfg.RetrieveBatchSize = defaultRetrieveBatchSize
}

return &client{
da: cfg.DA,
logger: cfg.Logger.With().Str("component", "da_client").Logger(),
defaultTimeout: cfg.DefaultTimeout,
batchSize: cfg.RetrieveBatchSize,
namespaceBz: coreda.NamespaceFromString(cfg.Namespace).Bytes(),
namespaceDataBz: coreda.NamespaceFromString(cfg.DataNamespace).Bytes(),
}
Expand Down Expand Up @@ -203,7 +213,8 @@ func (c *client) Retrieve(ctx context.Context, height uint64, namespace []byte)
}
}
// 2. Get Blobs using the retrieved IDs in batches
batchSize := 100
// Each batch has its own timeout while keeping the link to the parent context
batchSize := c.batchSize
blobs := make([][]byte, 0, len(idsResult.IDs))
for i := 0; i < len(idsResult.IDs); i += batchSize {
end := min(i+batchSize, len(idsResult.IDs))
Expand Down
5 changes: 1 addition & 4 deletions block/internal/submitting/da_submitter.go
Original file line number Diff line number Diff line change
Expand Up @@ -401,12 +401,9 @@ func submitToDA[T any](
s.logger.Debug().Str("signingAddress", signingAddress).Msg("using signing address for DA submission")
}

submitCtx, cancel := context.WithTimeout(ctx, submissionTimeout)
defer cancel()

// Perform submission
start := time.Now()
res := s.client.Submit(submitCtx, marshaled, -1, namespace, mergedOptions)
res := s.client.Submit(ctx, marshaled, -1, namespace, mergedOptions)
s.logger.Debug().Int("attempts", rs.Attempt).Dur("elapsed", time.Since(start)).Uint64("code", uint64(res.Code)).Msg("got SubmitWithHelpers response from celestia")

// Record submission result for observability
Expand Down
13 changes: 6 additions & 7 deletions block/public.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
package block

import (
"time"

"github.com/evstack/ev-node/block/internal/common"
"github.com/evstack/ev-node/block/internal/da"
coreda "github.com/evstack/ev-node/core/da"
Expand Down Expand Up @@ -41,10 +39,11 @@ func NewDAClient(
logger zerolog.Logger,
) DAClient {
return da.NewClient(da.Config{
DA: daLayer,
Logger: logger,
DefaultTimeout: 10 * time.Second,
Namespace: config.DA.GetNamespace(),
DataNamespace: config.DA.GetDataNamespace(),
DA: daLayer,
Logger: logger,
Namespace: config.DA.GetNamespace(),
DefaultTimeout: config.DA.RequestTimeout.Duration,
DataNamespace: config.DA.GetDataNamespace(),
RetrieveBatchSize: config.DA.RetrieveBatchSize,
})
}
36 changes: 36 additions & 0 deletions docs/learn/config.md
Original file line number Diff line number Diff line change
Expand Up @@ -490,6 +490,42 @@ _Example:_ `--rollkit.da.mempool_ttl 30`
_Default:_ `20`
_Constant:_ `FlagDAMempoolTTL`

### DA Retrieve Batch Size

**Description:**
Number of blob IDs requested per DA `Get` call when the node retrieves blocks from the DA layer. Smaller batches help unreliable DA RPC endpoints return data before the per-request timeout, while larger batches reduce the total number of round trips for fast DA nodes.

**YAML:**

```yaml
da:
retrieve_batch_size: 100
```

**Command-line Flag:**
`--rollkit.da.retrieve_batch_size <int>`
_Example:_ `--rollkit.da.retrieve_batch_size 50`
_Default:_ `100`
_Constant:_ `FlagDARetrieveBatchSize`

### DA Request Timeout

**Description:**
Per-request timeout applied to DA `GetIDs` and `Get` RPC calls while retrieving blobs. Increase this value if your DA endpoint has high latency to avoid premature failures; decrease it to make the syncer fail fast and free resources sooner when the DA node becomes unresponsive.

**YAML:**

```yaml
da:
request_timeout: "30s"
```

**Command-line Flag:**
`--rollkit.da.request_timeout <duration>`
_Example:_ `--rollkit.da.request_timeout 45s`
_Default:_ `"30s"`
_Constant:_ `FlagDARequestTimeout`

## P2P Configuration (`p2p`)

Settings for peer-to-peer networking, enabling nodes to discover each other, exchange blocks, and share transactions.
Expand Down
2 changes: 2 additions & 0 deletions docs/learn/specs/block-manager.md
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,8 @@ The block components share a common configuration:
| Namespace | da.Namespace | DA namespace ID for block submissions (deprecated, use HeaderNamespace and DataNamespace instead) |
| HeaderNamespace | string | namespace ID for submitting headers to DA layer (automatically encoded by the node) |
| DataNamespace | string | namespace ID for submitting data to DA layer (automatically encoded by the node) |
| RetrieveBatchSize | int | number of blob IDs fetched per DA `Get` call, trading off payload size vs. number of RPC round trips (default: 100) |
| RequestTimeout | duration | per-request timeout for DA `GetIDs`/`Get` calls; higher values tolerate slow DA nodes, lower values fail faster (default: 30s) |

### Block Production (Executor Component)

Expand Down
8 changes: 8 additions & 0 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,10 @@ const (
FlagDAMempoolTTL = FlagPrefixEvnode + "da.mempool_ttl"
// FlagDAMaxSubmitAttempts is a flag for specifying the maximum DA submit attempts
FlagDAMaxSubmitAttempts = FlagPrefixEvnode + "da.max_submit_attempts"
// FlagDARetrieveBatchSize configures how many IDs are fetched per DA Get request
FlagDARetrieveBatchSize = FlagPrefixEvnode + "da.retrieve_batch_size"
// FlagDARequestTimeout controls the per-request timeout when talking to the DA layer
FlagDARequestTimeout = FlagPrefixEvnode + "da.request_timeout"

// P2P configuration flags

Expand Down Expand Up @@ -162,6 +166,8 @@ type DAConfig struct {
BlockTime DurationWrapper `mapstructure:"block_time" yaml:"block_time" comment:"Average block time of the DA chain (duration). Determines frequency of DA layer syncing, maximum backoff time for retries, and is multiplied by MempoolTTL to calculate transaction expiration. Examples: \"15s\", \"30s\", \"1m\", \"2m30s\", \"10m\"."`
MempoolTTL uint64 `mapstructure:"mempool_ttl" yaml:"mempool_ttl" comment:"Number of DA blocks after which a transaction is considered expired and dropped from the mempool. Controls retry backoff timing."`
MaxSubmitAttempts int `mapstructure:"max_submit_attempts" yaml:"max_submit_attempts" comment:"Maximum number of attempts to submit data to the DA layer before giving up. Higher values provide more resilience but can delay error reporting."`
RetrieveBatchSize int `mapstructure:"retrieve_batch_size" yaml:"retrieve_batch_size" comment:"Number of IDs to request per DA Get call when retrieving blobs. Smaller batches lower per-request latency; larger batches reduce the number of RPC round trips. Default: 100."`
RequestTimeout DurationWrapper `mapstructure:"request_timeout" yaml:"request_timeout" comment:"Per-request timeout applied to DA GetIDs/Get calls when retrieving blobs. Larger values tolerate slower DA nodes at the cost of waiting longer before failing. Default: 30s."`
}

// GetNamespace returns the namespace for header submissions.
Expand Down Expand Up @@ -320,6 +326,8 @@ func AddFlags(cmd *cobra.Command) {
cmd.Flags().StringSlice(FlagDASigningAddresses, def.DA.SigningAddresses, "Comma-separated list of addresses for DA submissions (used in round-robin)")
cmd.Flags().Uint64(FlagDAMempoolTTL, def.DA.MempoolTTL, "number of DA blocks until transaction is dropped from the mempool")
cmd.Flags().Int(FlagDAMaxSubmitAttempts, def.DA.MaxSubmitAttempts, "maximum number of attempts to submit data to the DA layer before giving up")
cmd.Flags().Int(FlagDARetrieveBatchSize, def.DA.RetrieveBatchSize, "number of IDs to request per DA Get call when retrieving blobs")
cmd.Flags().Duration(FlagDARequestTimeout, def.DA.RequestTimeout.Duration, "per-request timeout when retrieving blobs from the DA layer")

// P2P configuration flags
cmd.Flags().String(FlagP2PListenAddress, def.P2P.ListenAddress, "P2P listen address (host:port)")
Expand Down
6 changes: 5 additions & 1 deletion pkg/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ func TestDefaultConfig(t *testing.T) {
assert.Equal(t, "", def.DA.AuthToken)
assert.Equal(t, "", def.DA.SubmitOptions)
assert.NotEmpty(t, def.DA.Namespace)
assert.Equal(t, 100, def.DA.RetrieveBatchSize)
assert.Equal(t, 30*time.Second, def.DA.RequestTimeout.Duration)
assert.Equal(t, 1*time.Second, def.Node.BlockTime.Duration)
assert.Equal(t, 6*time.Second, def.DA.BlockTime.Duration)
assert.Equal(t, uint64(0), def.DA.MempoolTTL)
Expand Down Expand Up @@ -70,6 +72,8 @@ func TestAddFlags(t *testing.T) {
assertFlagValue(t, flags, FlagDASigningAddresses, DefaultConfig().DA.SigningAddresses)
assertFlagValue(t, flags, FlagDAMempoolTTL, DefaultConfig().DA.MempoolTTL)
assertFlagValue(t, flags, FlagDAMaxSubmitAttempts, DefaultConfig().DA.MaxSubmitAttempts)
assertFlagValue(t, flags, FlagDARetrieveBatchSize, DefaultConfig().DA.RetrieveBatchSize)
assertFlagValue(t, flags, FlagDARequestTimeout, DefaultConfig().DA.RequestTimeout.Duration)

// P2P flags
assertFlagValue(t, flags, FlagP2PListenAddress, DefaultConfig().P2P.ListenAddress)
Expand Down Expand Up @@ -99,7 +103,7 @@ func TestAddFlags(t *testing.T) {
assertFlagValue(t, flags, FlagRPCAddress, DefaultConfig().RPC.Address)

// Count the number of flags we're explicitly checking
expectedFlagCount := 37 // Update this number if you add more flag checks above
expectedFlagCount := 39 // Update this number if you add more flag checks above

// Get the actual number of flags (both regular and persistent)
actualFlagCount := 0
Expand Down
2 changes: 2 additions & 0 deletions pkg/config/defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,8 @@ func DefaultConfig() Config {
MaxSubmitAttempts: 30,
Namespace: randString(10),
DataNamespace: "",
RetrieveBatchSize: 100,
RequestTimeout: DurationWrapper{30 * time.Second},
},
Instrumentation: DefaultInstrumentationConfig(),
Log: LogConfig{
Expand Down
Loading