diff --git a/block/internal/da/client.go b/block/internal/da/client.go index 571e5f7650..1fe4c00d4a 100644 --- a/block/internal/da/client.go +++ b/block/internal/da/client.go @@ -32,17 +32,23 @@ type client struct { da coreda.DA logger zerolog.Logger defaultTimeout time.Duration + batchSize int namespaceBz []byte namespaceDataBz []byte } +const ( + defaultRetrieveBatchSize = 100 // fallback when Config.RetrieveBatchSize is unset or non-positive; equals the previous hard-coded batch size and the config default +) + // Config contains configuration for the DA client. type Config struct { - DA coreda.DA - Logger zerolog.Logger - DefaultTimeout time.Duration - Namespace string - DataNamespace string + DA coreda.DA + Logger zerolog.Logger + DefaultTimeout time.Duration + Namespace string + DataNamespace string + RetrieveBatchSize int } // NewClient creates a new DA client with pre-calculated namespace bytes. @@ -50,11 +56,15 @@ func NewClient(cfg Config) *client { if cfg.DefaultTimeout == 0 { cfg.DefaultTimeout = 30 * time.Second } + if cfg.RetrieveBatchSize <= 0 { + cfg.RetrieveBatchSize = defaultRetrieveBatchSize + } return &client{ da: cfg.DA, logger: cfg.Logger.With().Str("component", "da_client").Logger(), defaultTimeout: cfg.DefaultTimeout, + batchSize: cfg.RetrieveBatchSize, namespaceBz: coreda.NamespaceFromString(cfg.Namespace).Bytes(), namespaceDataBz: coreda.NamespaceFromString(cfg.DataNamespace).Bytes(), } @@ -203,7 +213,8 @@ func (c *client) Retrieve(ctx context.Context, height uint64, namespace []byte) } } // 2. 
Get Blobs using the retrieved IDs in batches - batchSize := 100 + // Each batch has its own timeout while keeping the link to the parent context + batchSize := c.batchSize blobs := make([][]byte, 0, len(idsResult.IDs)) for i := 0; i < len(idsResult.IDs); i += batchSize { end := min(i+batchSize, len(idsResult.IDs)) diff --git a/block/internal/submitting/da_submitter.go b/block/internal/submitting/da_submitter.go index f38d2163c8..25e65f3d0e 100644 --- a/block/internal/submitting/da_submitter.go +++ b/block/internal/submitting/da_submitter.go @@ -401,12 +401,9 @@ func submitToDA[T any]( s.logger.Debug().Str("signingAddress", signingAddress).Msg("using signing address for DA submission") } - submitCtx, cancel := context.WithTimeout(ctx, submissionTimeout) - defer cancel() - // Perform submission start := time.Now() - res := s.client.Submit(submitCtx, marshaled, -1, namespace, mergedOptions) + res := s.client.Submit(ctx, marshaled, -1, namespace, mergedOptions) s.logger.Debug().Int("attempts", rs.Attempt).Dur("elapsed", time.Since(start)).Uint64("code", uint64(res.Code)).Msg("got SubmitWithHelpers response from celestia") // Record submission result for observability diff --git a/block/public.go b/block/public.go index f084f2757f..a5c105ed5f 100644 --- a/block/public.go +++ b/block/public.go @@ -1,8 +1,6 @@ package block import ( - "time" - "github.com/evstack/ev-node/block/internal/common" "github.com/evstack/ev-node/block/internal/da" coreda "github.com/evstack/ev-node/core/da" @@ -41,10 +39,11 @@ func NewDAClient( logger zerolog.Logger, ) DAClient { return da.NewClient(da.Config{ - DA: daLayer, - Logger: logger, - DefaultTimeout: 10 * time.Second, - Namespace: config.DA.GetNamespace(), - DataNamespace: config.DA.GetDataNamespace(), + DA: daLayer, + Logger: logger, + Namespace: config.DA.GetNamespace(), + DefaultTimeout: config.DA.RequestTimeout.Duration, + DataNamespace: config.DA.GetDataNamespace(), + RetrieveBatchSize: config.DA.RetrieveBatchSize, }) } diff --git 
a/docs/learn/config.md b/docs/learn/config.md index 094eeafde6..5d7de4103b 100644 --- a/docs/learn/config.md +++ b/docs/learn/config.md @@ -490,6 +490,42 @@ _Example:_ `--rollkit.da.mempool_ttl 30` _Default:_ `20` _Constant:_ `FlagDAMempoolTTL` +### DA Retrieve Batch Size + +**Description:** +Number of blob IDs requested per DA `Get` call when the node retrieves blocks from the DA layer. Smaller batches help unreliable DA RPC endpoints return data before the per-request timeout, while larger batches reduce the total number of round trips for fast DA nodes. + +**YAML:** + +```yaml +da: + retrieve_batch_size: 100 +``` + +**Command-line Flag:** +`--rollkit.da.retrieve_batch_size <int>` +_Example:_ `--rollkit.da.retrieve_batch_size 50` +_Default:_ `100` +_Constant:_ `FlagDARetrieveBatchSize` + +### DA Request Timeout + +**Description:** +Per-request timeout applied to DA `GetIDs` and `Get` RPC calls while retrieving blobs. Increase this value if your DA endpoint has high latency to avoid premature failures; decrease it to make the syncer fail fast and free resources sooner when the DA node becomes unresponsive. + +**YAML:** + +```yaml +da: + request_timeout: "30s" +``` + +**Command-line Flag:** +`--rollkit.da.request_timeout <duration>` +_Example:_ `--rollkit.da.request_timeout 45s` +_Default:_ `"30s"` +_Constant:_ `FlagDARequestTimeout` + ## P2P Configuration (`p2p`) Settings for peer-to-peer networking, enabling nodes to discover each other, exchange blocks, and share transactions. 
diff --git a/docs/learn/specs/block-manager.md b/docs/learn/specs/block-manager.md index b74087a4c9..ac47d47ab6 100644 --- a/docs/learn/specs/block-manager.md +++ b/docs/learn/specs/block-manager.md @@ -167,6 +167,8 @@ The block components share a common configuration: | Namespace | da.Namespace | DA namespace ID for block submissions (deprecated, use HeaderNamespace and DataNamespace instead) | | HeaderNamespace | string | namespace ID for submitting headers to DA layer (automatically encoded by the node) | | DataNamespace | string | namespace ID for submitting data to DA layer (automatically encoded by the node) | +| RetrieveBatchSize | int | number of blob IDs fetched per DA `Get` call, trading off payload size vs. number of RPC round trips (default: 100) | +| RequestTimeout | duration | per-request timeout for DA `GetIDs`/`Get` calls; higher values tolerate slow DA nodes, lower values fail faster (default: 30s) | ### Block Production (Executor Component) diff --git a/pkg/config/config.go b/pkg/config/config.go index d6b1f15539..8060019abe 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -70,6 +70,10 @@ const ( FlagDAMempoolTTL = FlagPrefixEvnode + "da.mempool_ttl" // FlagDAMaxSubmitAttempts is a flag for specifying the maximum DA submit attempts FlagDAMaxSubmitAttempts = FlagPrefixEvnode + "da.max_submit_attempts" + // FlagDARetrieveBatchSize configures how many IDs are fetched per DA Get request + FlagDARetrieveBatchSize = FlagPrefixEvnode + "da.retrieve_batch_size" + // FlagDARequestTimeout controls the per-request timeout when talking to the DA layer + FlagDARequestTimeout = FlagPrefixEvnode + "da.request_timeout" // P2P configuration flags @@ -162,6 +166,8 @@ type DAConfig struct { BlockTime DurationWrapper `mapstructure:"block_time" yaml:"block_time" comment:"Average block time of the DA chain (duration). 
Determines frequency of DA layer syncing, maximum backoff time for retries, and is multiplied by MempoolTTL to calculate transaction expiration. Examples: \"15s\", \"30s\", \"1m\", \"2m30s\", \"10m\"."` MempoolTTL uint64 `mapstructure:"mempool_ttl" yaml:"mempool_ttl" comment:"Number of DA blocks after which a transaction is considered expired and dropped from the mempool. Controls retry backoff timing."` MaxSubmitAttempts int `mapstructure:"max_submit_attempts" yaml:"max_submit_attempts" comment:"Maximum number of attempts to submit data to the DA layer before giving up. Higher values provide more resilience but can delay error reporting."` + RetrieveBatchSize int `mapstructure:"retrieve_batch_size" yaml:"retrieve_batch_size" comment:"Number of IDs to request per DA Get call when retrieving blobs. Smaller batches lower per-request latency; larger batches reduce the number of RPC round trips. Default: 100."` + RequestTimeout DurationWrapper `mapstructure:"request_timeout" yaml:"request_timeout" comment:"Per-request timeout applied to DA GetIDs/Get calls when retrieving blobs. Larger values tolerate slower DA nodes at the cost of waiting longer before failing. Default: 30s."` } // GetNamespace returns the namespace for header submissions. 
@@ -320,6 +326,8 @@ func AddFlags(cmd *cobra.Command) { cmd.Flags().StringSlice(FlagDASigningAddresses, def.DA.SigningAddresses, "Comma-separated list of addresses for DA submissions (used in round-robin)") cmd.Flags().Uint64(FlagDAMempoolTTL, def.DA.MempoolTTL, "number of DA blocks until transaction is dropped from the mempool") cmd.Flags().Int(FlagDAMaxSubmitAttempts, def.DA.MaxSubmitAttempts, "maximum number of attempts to submit data to the DA layer before giving up") + cmd.Flags().Int(FlagDARetrieveBatchSize, def.DA.RetrieveBatchSize, "number of IDs to request per DA Get call when retrieving blobs") + cmd.Flags().Duration(FlagDARequestTimeout, def.DA.RequestTimeout.Duration, "per-request timeout when retrieving blobs from the DA layer") // P2P configuration flags cmd.Flags().String(FlagP2PListenAddress, def.P2P.ListenAddress, "P2P listen address (host:port)") diff --git a/pkg/config/config_test.go b/pkg/config/config_test.go index 7834e42aab..af96aa69e4 100644 --- a/pkg/config/config_test.go +++ b/pkg/config/config_test.go @@ -26,6 +26,8 @@ func TestDefaultConfig(t *testing.T) { assert.Equal(t, "", def.DA.AuthToken) assert.Equal(t, "", def.DA.SubmitOptions) assert.NotEmpty(t, def.DA.Namespace) + assert.Equal(t, 100, def.DA.RetrieveBatchSize) + assert.Equal(t, 30*time.Second, def.DA.RequestTimeout.Duration) assert.Equal(t, 1*time.Second, def.Node.BlockTime.Duration) assert.Equal(t, 6*time.Second, def.DA.BlockTime.Duration) assert.Equal(t, uint64(0), def.DA.MempoolTTL) @@ -70,6 +72,8 @@ func TestAddFlags(t *testing.T) { assertFlagValue(t, flags, FlagDASigningAddresses, DefaultConfig().DA.SigningAddresses) assertFlagValue(t, flags, FlagDAMempoolTTL, DefaultConfig().DA.MempoolTTL) assertFlagValue(t, flags, FlagDAMaxSubmitAttempts, DefaultConfig().DA.MaxSubmitAttempts) + assertFlagValue(t, flags, FlagDARetrieveBatchSize, DefaultConfig().DA.RetrieveBatchSize) + assertFlagValue(t, flags, FlagDARequestTimeout, DefaultConfig().DA.RequestTimeout.Duration) // P2P flags 
assertFlagValue(t, flags, FlagP2PListenAddress, DefaultConfig().P2P.ListenAddress) @@ -99,7 +103,7 @@ func TestAddFlags(t *testing.T) { assertFlagValue(t, flags, FlagRPCAddress, DefaultConfig().RPC.Address) // Count the number of flags we're explicitly checking - expectedFlagCount := 37 // Update this number if you add more flag checks above + expectedFlagCount := 39 // Update this number if you add more flag checks above // Get the actual number of flags (both regular and persistent) actualFlagCount := 0 diff --git a/pkg/config/defaults.go b/pkg/config/defaults.go index 6a6f813a3c..6e8b01b191 100644 --- a/pkg/config/defaults.go +++ b/pkg/config/defaults.go @@ -75,6 +75,8 @@ func DefaultConfig() Config { MaxSubmitAttempts: 30, Namespace: randString(10), DataNamespace: "", + RetrieveBatchSize: 100, + RequestTimeout: DurationWrapper{30 * time.Second}, }, Instrumentation: DefaultInstrumentationConfig(), Log: LogConfig{