diff --git a/CHANGELOG.md b/CHANGELOG.md
index 30b04aea76..3f6ac5f26c 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -12,15 +12,25 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Added
- Enhanced health check system with separate liveness (`/health/live`) and readiness (`/health/ready`) HTTP endpoints. Readiness endpoint includes P2P listening check and aggregator block production rate validation (5x block time threshold). ([#2800](https://github.com/evstack/ev-node/pull/2800))
+- Added `GetP2PStoreInfo` RPC method to retrieve head/tail metadata for go-header stores used by P2P sync ([#2835](https://github.com/evstack/ev-node/pull/2835))
+- Added protobuf definitions for `P2PStoreEntry` and `P2PStoreSnapshot` messages to support P2P store inspection
### Changed
- Remove GasPrice and GasMultiplier from DA interface and configuration to use celestia-node's native fee estimation. ([#2822](https://github.com/evstack/ev-node/pull/2822))
- Use cache instead of in memory store for reaper. Persist cache on reload. Autoclean after 24 hours. ([#2811](https://github.com/evstack/ev-node/pull/2811))
+- Improved P2P sync service store initialization to be atomic and prevent race conditions ([#2838](https://github.com/evstack/ev-node/pull/2838))
+- Enhanced P2P bootstrap behavior to intelligently detect starting height from local store instead of requiring trusted hash
+- Relaxed execution layer height validation in block replay to allow execution to be ahead of target height, enabling recovery from manual intervention scenarios
### Removed
- **BREAKING:** Removed `evnode.v1.HealthService` gRPC endpoint. Use HTTP endpoints: `GET /health/live` and `GET /health/ready`. ([#2800](https://github.com/evstack/ev-node/pull/2800))
+- **BREAKING:** Removed `TrustedHash` configuration option and `--evnode.node.trusted_hash` flag. Sync service now automatically determines starting height from local store state ([#2838](https://github.com/evstack/ev-node/pull/2838))
+
+### Fixed
+
+- Fixed sync service initialization issue when node is not on genesis but has an empty store
## v1.0.0-beta.9
diff --git a/README.md b/README.md
index ddc7c248b3..5ce13f5798 100644
--- a/README.md
+++ b/README.md
@@ -8,7 +8,7 @@ Ev-node is the basis of the Evolve Stack. For more in-depth information about Ev
[](https://godoc.org/github.com/evstack/ev-node)
-> **⚠️ Version Notice**: Do not use tags or releases before v1.*. Pre-v1 releases are not stable and should be considered abandoned.
+> **⚠️ Version Notice**: Do not use tags or releases before v1.*. Pre-v1 releases are not stable and should be considered abandoned.
## Using Evolve
diff --git a/RELEASE.md b/RELEASE.md
index 6a5bfccd80..9de91ea3a9 100644
--- a/RELEASE.md
+++ b/RELEASE.md
@@ -303,7 +303,7 @@ go get github.com/evstack/ev-node/core@v0.3.0
- Wait 5-30 minutes for propagation
- Use `go list -m` to verify availability
-- Check https://proxy.golang.org/
+- Check
**Dependency version conflicts**
diff --git a/block/internal/common/replay.go b/block/internal/common/replay.go
index 4c4a4b26de..9c95bc3f36 100644
--- a/block/internal/common/replay.go
+++ b/block/internal/common/replay.go
@@ -42,7 +42,7 @@ func NewReplayer(
// This is useful for crash recovery scenarios where ev-node is ahead of the execution layer.
//
// Returns:
-// - error if sync fails or if execution layer is ahead of ev-node (unexpected state)
+// - error if sync fails
func (s *Replayer) SyncToHeight(ctx context.Context, targetHeight uint64) error {
// Check if the executor implements HeightProvider
execHeightProvider, ok := s.exec.(coreexecutor.HeightProvider)
@@ -67,13 +67,15 @@ func (s *Replayer) SyncToHeight(ctx context.Context, targetHeight uint64) error
Uint64("exec_layer_height", execHeight).
Msg("execution layer height check")
- // If execution layer is ahead, this is unexpected, fail hard
+ // If execution layer is ahead, skip syncing and continue. This can happen if execution
+ // progressed independently (e.g. after manual intervention). We log it for visibility but
+ // do not treat it as fatal.
if execHeight > targetHeight {
- s.logger.Error().
+ s.logger.Warn().
Uint64("target_height", targetHeight).
Uint64("exec_layer_height", execHeight).
- Msg("execution layer is ahead of target height - this should not happen")
- return fmt.Errorf("execution layer height (%d) is ahead of target height (%d)", execHeight, targetHeight)
+ Msg("execution layer is ahead of target height - skipping replay")
+ return nil
}
// If execution layer is behind, sync the missing blocks
diff --git a/block/internal/common/replay_test.go b/block/internal/common/replay_test.go
index cbbfac8ea8..6bc344d945 100644
--- a/block/internal/common/replay_test.go
+++ b/block/internal/common/replay_test.go
@@ -141,10 +141,12 @@ func TestReplayer_SyncToHeight_ExecutorAhead(t *testing.T) {
mockExec.On("GetLatestHeight", mock.Anything).Return(execHeight, nil)
- // Execute sync - should fail
+ // Execute sync - should just log and continue without error
err := syncer.SyncToHeight(ctx, targetHeight)
- require.Error(t, err)
- require.Contains(t, err.Error(), "execution layer height (101) is ahead of target height (100)")
+ require.NoError(t, err)
+
+ // No replay should be attempted
+ mockExec.AssertNotCalled(t, "ExecuteTxs", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything)
}
func TestReplayer_SyncToHeight_NoHeightProvider(t *testing.T) {
diff --git a/block/internal/syncing/da_retriever_test.go b/block/internal/syncing/da_retriever_test.go
index c6e8daa78f..c398633d4b 100644
--- a/block/internal/syncing/da_retriever_test.go
+++ b/block/internal/syncing/da_retriever_test.go
@@ -128,9 +128,9 @@ func TestDARetriever_RetrieveFromDA_Timeout(t *testing.T) {
assert.Contains(t, err.Error(), "context deadline exceeded")
assert.Len(t, events, 0)
- // Verify timeout occurred approximately at expected time (with some tolerance)
- assert.Greater(t, duration, 9*time.Second, "should timeout after approximately 10 seconds")
- assert.Less(t, duration, 12*time.Second, "should not take much longer than timeout")
+ // Verify timeout occurred approximately at the helper timeout (with some tolerance)
+ assert.Greater(t, duration, defaultDATimeout-2*time.Second, "should timeout close to the helper timeout")
+ assert.Less(t, duration, defaultDATimeout+time.Second, "should not take much longer than timeout")
}
func TestDARetriever_RetrieveFromDA_TimeoutFast(t *testing.T) {
diff --git a/block/internal/syncing/syncer.go b/block/internal/syncing/syncer.go
index d34dceca51..d91550cb77 100644
--- a/block/internal/syncing/syncer.go
+++ b/block/internal/syncing/syncer.go
@@ -299,11 +299,7 @@ func (s *Syncer) fetchDAUntilCaughtUp() error {
daHeight := s.GetDAHeight()
- // Create a new context with a timeout for the DA call
- ctx, cancel := context.WithTimeout(s.ctx, 5*time.Second)
- defer cancel()
-
- events, err := s.daRetriever.RetrieveFromDA(ctx, daHeight)
+ events, err := s.daRetriever.RetrieveFromDA(s.ctx, daHeight)
if err != nil {
switch {
case errors.Is(err, coreda.ErrBlobNotFound):
diff --git a/docs/guides/full-node.md b/docs/guides/full-node.md
index 30241b075c..0022f65082 100644
--- a/docs/guides/full-node.md
+++ b/docs/guides/full-node.md
@@ -4,7 +4,7 @@
This guide covers how to set up a full node to run alongside a sequencer node in a Evolve-based blockchain network. A full node maintains a complete copy of the blockchain and helps validate transactions, improving the network's decentralization and security.
-> ** Note: The guide on how to run an evolve EVM full node can be found [here](./evm/single#setting-up-a-full-node). **
+> **Note: The guide on how to run an evolve EVM full node can be found [in the evm section](./evm/single#setting-up-a-full-node).**
## Prerequisites
diff --git a/docs/learn/config.md b/docs/learn/config.md
index ff855f28aa..094eeafde6 100644
--- a/docs/learn/config.md
+++ b/docs/learn/config.md
@@ -17,7 +17,6 @@ This document provides a comprehensive reference for all configuration options a
- [Maximum Pending Blocks](#maximum-pending-blocks)
- [Lazy Mode (Lazy Aggregator)](#lazy-mode-lazy-aggregator)
- [Lazy Block Interval](#lazy-block-interval)
- - [Trusted Hash](#trusted-hash)
- [Data Availability Configuration (`da`)](#data-availability-configuration-da)
- [DA Service Address](#da-service-address)
- [DA Authentication Token](#da-authentication-token)
@@ -275,24 +274,6 @@ _Example:_ `--rollkit.node.lazy_block_interval 1m`
_Default:_ `"30s"`
_Constant:_ `FlagLazyBlockTime`
-### Trusted Hash
-
-**Description:**
-The initial trusted hash used to bootstrap the header exchange service. This allows nodes to start synchronizing from a specific, trusted point in the chain history instead of from the genesis block. When provided, the node will fetch the corresponding header/block from peers using this hash. If not provided, the node attempts to sync from genesis.
-
-**YAML:**
-
-```yaml
-node:
- trusted_hash: "YOUR_TRUSTED_HASH_HEX_STRING"
-```
-
-**Command-line Flag:**
-`--rollkit.node.trusted_hash `
-_Example:_ `--rollkit.node.trusted_hash ABCDEF012345...`
-_Default:_ `""` (empty, sync from genesis)
-_Constant:_ `FlagTrustedHash`
-
## Data Availability Configuration (`da`)
Parameters for connecting and interacting with the Data Availability (DA) layer, which Evolve uses to publish block data.
@@ -658,6 +639,7 @@ curl http://localhost:7331/health/live
#### `/health/ready`
Returns `200 OK` if the node can serve correct data. Checks:
+
- P2P is listening (if enabled)
- Has synced blocks
- Not too far behind network
@@ -669,6 +651,7 @@ curl http://localhost:7331/health/ready
```
Configure max blocks behind:
+
```yaml
node:
readiness_max_blocks_behind: 15
diff --git a/docs/learn/specs/header-sync.md b/docs/learn/specs/header-sync.md
index d6a7a2da26..750f325933 100644
--- a/docs/learn/specs/header-sync.md
+++ b/docs/learn/specs/header-sync.md
@@ -68,10 +68,9 @@ The Executor component (in aggregator nodes) broadcasts headers and data in para
- Chain IDs for pubsub topics are also separated:
- Headers: `{chainID}-headerSync` creates topic like `/gm-headerSync/header-sub/v0.0.1`
- Data: `{chainID}-dataSync` creates topic like `/gm-dataSync/header-sub/v0.0.1`
-- Both stores must be initialized with genesis items before starting:
- - Header store needs genesis header
- - Data store needs genesis data (if applicable)
-- Genesis items can be loaded via `NodeConfig.TrustedHash` or P2P network query
+- Both stores must contain at least one item before the syncer starts:
+ - On first boot, the services fetch the configured genesis height from peers
+ - On restart, each store reuses its latest item to derive the initial height requested from peers
- Sync services work only when connected to P2P network via `P2PConfig.Seeds`
- Node context is passed to all components for graceful shutdown
- Headers and data are linked through DataHash but synced independently
diff --git a/docs/src/openapi-rpc.json b/docs/src/openapi-rpc.json
index e8fd381ed6..ec7c9c5671 100644
--- a/docs/src/openapi-rpc.json
+++ b/docs/src/openapi-rpc.json
@@ -472,6 +472,48 @@
}
}
}
+ },
+ "/evnode.v1.StoreService/GetP2PStoreInfo": {
+ "post": {
+ "tags": [
+ "Store Service"
+ ],
+ "summary": "Inspect go-header stores",
+ "description": "Returns head/tail information for the header and data go-header stores used by P2P sync.",
+ "operationId": "getP2PStoreInfo",
+ "requestBody": {
+ "description": "Empty request",
+ "required": true,
+ "content": {
+ "application/json": {
+ "schema": {
+ "$ref": "#/components/schemas/Empty"
+ },
+ "examples": {
+ "default": {
+ "summary": "Get go-header store snapshots",
+ "value": {}
+ }
+ }
+ }
+ }
+ },
+ "responses": {
+ "200": {
+ "description": "Snapshots returned successfully",
+ "content": {
+ "application/json": {
+ "schema": {
+ "$ref": "#/components/schemas/GetP2PStoreInfoResponse"
+ }
+ }
+ }
+ },
+ "500": {
+ "$ref": "#/components/responses/InternalError"
+ }
+ }
+ }
}
},
"components": {
@@ -1012,6 +1054,81 @@
"description": "Additional error details"
}
}
+ },
+ "P2PStoreEntry": {
+ "type": "object",
+ "description": "Head or tail entry for a go-header store",
+ "required": [
+ "height",
+ "hash"
+ ],
+ "properties": {
+ "height": {
+ "type": "integer",
+ "format": "int64",
+ "description": "Block height"
+ },
+ "hash": {
+ "type": "string",
+ "format": "byte",
+ "description": "Header/data hash (base64-encoded)"
+ },
+ "time": {
+ "type": "string",
+ "format": "date-time",
+ "description": "Entry timestamp"
+ }
+ }
+ },
+ "P2PStoreSnapshot": {
+ "type": "object",
+ "description": "Snapshot of a go-header store",
+ "required": [
+ "label",
+ "height",
+ "head_present",
+ "tail_present"
+ ],
+ "properties": {
+ "label": {
+ "type": "string",
+ "description": "Human friendly store label"
+ },
+ "height": {
+ "type": "integer",
+ "format": "int64",
+ "description": "Highest contiguous height"
+ },
+ "head_present": {
+ "type": "boolean",
+ "description": "Whether a head entry exists"
+ },
+ "head": {
+ "$ref": "#/components/schemas/P2PStoreEntry"
+ },
+ "tail_present": {
+ "type": "boolean",
+ "description": "Whether a tail entry exists"
+ },
+ "tail": {
+ "$ref": "#/components/schemas/P2PStoreEntry"
+ }
+ }
+ },
+ "GetP2PStoreInfoResponse": {
+ "type": "object",
+ "description": "Snapshot of the header and data go-header stores",
+ "required": [
+ "stores"
+ ],
+ "properties": {
+ "stores": {
+ "type": "array",
+ "items": {
+ "$ref": "#/components/schemas/P2PStoreSnapshot"
+ }
+ }
+ }
}
}
}
diff --git a/node/full.go b/node/full.go
index 2097c24b59..6d03a87c04 100644
--- a/node/full.go
+++ b/node/full.go
@@ -291,7 +291,16 @@ func (n *FullNode) Run(parentCtx context.Context) error {
return min(hHeight, dHeight)
}
- handler, err := rpcserver.NewServiceHandler(n.Store, n.p2pClient, n.genesis.ProposerAddress, n.Logger, n.nodeConfig, bestKnownHeightProvider)
+ handler, err := rpcserver.NewServiceHandler(
+ n.Store,
+ n.hSyncService.Store(),
+ n.dSyncService.Store(),
+ n.p2pClient,
+ n.genesis.ProposerAddress,
+ n.Logger,
+ n.nodeConfig,
+ bestKnownHeightProvider,
+ )
if err != nil {
return fmt.Errorf("error creating RPC handler: %w", err)
}
diff --git a/node/full_node_integration_test.go b/node/full_node_integration_test.go
index a214c13163..c13d6dc0d7 100644
--- a/node/full_node_integration_test.go
+++ b/node/full_node_integration_test.go
@@ -260,10 +260,9 @@ func TestSingleSequencerTwoFullNodesBlockSyncSpeed(t *testing.T) {
// TestDataExchange verifies data exchange and synchronization between nodes in various network topologies.
//
-// This test runs three sub-tests:
+// This test runs two sub-tests:
// 1. Single sequencer and single full
// 2. Single sequencer and two full nodes.
-// 3. Single sequencer and single full node with trusted hash.
//
// Each sub-test checks data exchange and synchronization to ensure correct data propagation and consistency across nodes.
func TestDataExchange(t *testing.T) {
@@ -273,17 +272,13 @@ func TestDataExchange(t *testing.T) {
t.Run("SingleSequencerTwoFullNodes", func(t *testing.T) {
testSingleSequencerTwoFullNodes(t, Data)
})
- t.Run("SingleSequencerSingleFullNodeTrustedHash", func(t *testing.T) {
- testSingleSequencerSingleFullNodeTrustedHash(t, Data)
- })
}
// TestHeaderExchange verifies header exchange and synchronization between nodes in various network topologies.
//
-// This test runs three sub-tests:
+// This test runs two sub-tests:
// 1. Single sequencer and single full
// 2. Single sequencer and two full nodes.
-// 3. Single sequencer and single full node with trusted hash.
//
// Each sub-test checks header exchange and synchronization to ensure correct header propagation and consistency across nodes.
func TestHeaderExchange(t *testing.T) {
@@ -293,9 +288,6 @@ func TestHeaderExchange(t *testing.T) {
t.Run("SingleSequencerTwoFullNodes", func(t *testing.T) {
testSingleSequencerTwoFullNodes(t, Header)
})
- t.Run("SingleSequencerSingleFullNodeTrustedHash", func(t *testing.T) {
- testSingleSequencerSingleFullNodeTrustedHash(t, Header)
- })
}
// testSingleSequencerSingleFullNode sets up a single sequencer and a single full node, starts the sequencer, waits for it to produce a block, then starts the full
@@ -392,68 +384,6 @@ func testSingleSequencerTwoFullNodes(t *testing.T, source Source) {
shutdownAndWait(t, cancels, &runningWg, 5*time.Second)
}
-// testSingleSequencerSingleFullNodeTrustedHash sets up a single sequencer and a single full node with a trusted hash, starts the sequencer, waits for it to produce a block, then starts the full node with the trusted hash.
-// It waits for both nodes to reach a target block height (using the provided 'source' to determine block inclusion), verifies that both nodes are fully synced, and then shuts them down.
-func testSingleSequencerSingleFullNodeTrustedHash(t *testing.T, source Source) {
- require := require.New(t)
-
- // Set up one sequencer and one full node
- config := getTestConfig(t, 1)
- numNodes := 2
- nodes, cleanups := createNodesWithCleanup(t, numNodes, config)
- for _, cleanup := range cleanups {
- defer cleanup()
- }
-
- ctxs, cancels := createNodeContexts(numNodes)
- var runningWg sync.WaitGroup
- errChan := make(chan error, numNodes)
-
- // Start the sequencer first
- startNodeInBackground(t, nodes, ctxs, &runningWg, 0, errChan)
-
- // Wait for the sequencer to produce at first block
- require.NoError(waitForFirstBlock(nodes[0], source))
-
- // Get the hash of the first block (using the correct source)
- var trustedHash string
- switch source {
- case Data:
- trustedHashValue, err := nodes[0].dSyncService.Store().GetByHeight(ctxs[0], 1)
- require.NoError(err)
- trustedHash = trustedHashValue.Hash().String()
- case Header:
- trustedHashValue, err := nodes[0].hSyncService.Store().GetByHeight(ctxs[0], 1)
- require.NoError(err)
- trustedHash = trustedHashValue.Hash().String()
- default:
- t.Fatalf("unsupported source for trusted hash test: %v", source)
- }
-
- // Set the trusted hash in the full node
- nodeConfig := nodes[1].nodeConfig
- nodeConfig.Node.TrustedHash = trustedHash
-
- // Add a small delay to ensure P2P services are fully ready
- time.Sleep(500 * time.Millisecond)
-
- // Start the full node
- startNodeInBackground(t, nodes, ctxs, &runningWg, 1, errChan)
-
- blocksToWaitFor := uint64(3)
- // Wait for both nodes to reach at least blocksToWaitFor blocks
- for _, nodeItem := range nodes {
- requireEmptyChan(t, errChan)
- require.NoError(waitForAtLeastNBlocks(nodeItem, blocksToWaitFor, source))
- }
-
- // Verify both nodes are synced using the helper
- require.NoError(verifyNodesSynced(nodes[0], nodes[1], source))
-
- // Cancel all node contexts to signal shutdown and wait
- shutdownAndWait(t, cancels, &runningWg, 5*time.Second)
-}
-
// TestTwoChainsInOneNamespace verifies that two chains in the same namespace can coexist without any issues.
func TestTwoChainsInOneNamespace(t *testing.T) {
cases := []struct {
diff --git a/node/light.go b/node/light.go
index 4b1d1cc7ed..bdc4a92351 100644
--- a/node/light.go
+++ b/node/light.go
@@ -84,7 +84,16 @@ func (ln *LightNode) Run(parentCtx context.Context) error {
return ln.hSyncService.Store().Height()
}
- handler, err := rpcserver.NewServiceHandler(ln.Store, ln.P2P, nil, ln.Logger, ln.nodeConfig, bestKnown)
+ handler, err := rpcserver.NewServiceHandler(
+ ln.Store,
+ ln.hSyncService.Store(),
+ nil,
+ ln.P2P,
+ nil,
+ ln.Logger,
+ ln.nodeConfig,
+ bestKnown,
+ )
if err != nil {
return fmt.Errorf("error creating RPC handler: %w", err)
}
diff --git a/pkg/cmd/run_node_test.go b/pkg/cmd/run_node_test.go
index 2980f94741..16430ee450 100644
--- a/pkg/cmd/run_node_test.go
+++ b/pkg/cmd/run_node_test.go
@@ -69,7 +69,6 @@ func TestParseFlags(t *testing.T) {
"--rollkit.node.lazy_block_interval", "2m",
"--rollkit.node.light",
"--rollkit.node.max_pending_headers_and_data", "100",
- "--rollkit.node.trusted_hash", "abcdef1234567890",
"--rollkit.da.submit_options", "custom-options",
// Instrumentation flags
"--rollkit.instrumentation.prometheus", "true",
@@ -125,7 +124,6 @@ func TestParseFlags(t *testing.T) {
{"LazyBlockTime", nodeConfig.Node.LazyBlockInterval.Duration, 2 * time.Minute},
{"Light", nodeConfig.Node.Light, true},
{"MaxPendingHeadersAndData", nodeConfig.Node.MaxPendingHeadersAndData, uint64(100)},
- {"TrustedHash", nodeConfig.Node.TrustedHash, "abcdef1234567890"},
{"DASubmitOptions", nodeConfig.DA.SubmitOptions, "custom-options"},
{"Prometheus", nodeConfig.Instrumentation.Prometheus, true},
{"PrometheusListenAddr", nodeConfig.Instrumentation.PrometheusListenAddr, ":26665"},
diff --git a/pkg/config/config.go b/pkg/config/config.go
index aad4ce6b93..f6b7dff2ed 100644
--- a/pkg/config/config.go
+++ b/pkg/config/config.go
@@ -37,8 +37,6 @@ const (
FlagLight = FlagPrefixEvnode + "node.light"
// FlagBlockTime is a flag for specifying the block time
FlagBlockTime = FlagPrefixEvnode + "node.block_time"
- // FlagTrustedHash is a flag for specifying the trusted hash
- FlagTrustedHash = FlagPrefixEvnode + "node.trusted_hash"
// FlagLazyAggregator is a flag for enabling lazy aggregation mode that only produces blocks when transactions are available
FlagLazyAggregator = FlagPrefixEvnode + "node.lazy_mode"
// FlagMaxPendingHeadersAndData is a flag to limit and pause block production when too many headers or data are waiting for DA confirmation
@@ -192,9 +190,6 @@ type NodeConfig struct {
LazyMode bool `mapstructure:"lazy_mode" yaml:"lazy_mode" comment:"Enables lazy aggregation mode, where blocks are only produced when transactions are available or after LazyBlockTime. Optimizes resources by avoiding empty block creation during periods of inactivity."`
LazyBlockInterval DurationWrapper `mapstructure:"lazy_block_interval" yaml:"lazy_block_interval" comment:"Maximum interval between blocks in lazy aggregation mode (LazyAggregator). Ensures blocks are produced periodically even without transactions to keep the chain active. Generally larger than BlockTime."`
- // Header configuration
- TrustedHash string `mapstructure:"trusted_hash" yaml:"trusted_hash" comment:"Initial trusted hash used to bootstrap the header exchange service. Allows nodes to start synchronizing from a specific trusted point in the chain instead of genesis. When provided, the node will fetch the corresponding header/block from peers using this hash and use it as a starting point for synchronization. If not provided, the node will attempt to fetch the genesis block instead."`
-
// Readiness / health configuration
ReadinessWindowSeconds uint64 `mapstructure:"readiness_window_seconds" yaml:"readiness_window_seconds" comment:"Time window in seconds used to calculate ReadinessMaxBlocksBehind based on block time. Default: 15 seconds."`
ReadinessMaxBlocksBehind uint64 `mapstructure:"readiness_max_blocks_behind" yaml:"readiness_max_blocks_behind" comment:"How many blocks behind best-known head the node can be and still be considered ready. 0 means must be exactly at head."`
@@ -309,7 +304,6 @@ func AddFlags(cmd *cobra.Command) {
cmd.Flags().Bool(FlagAggregator, def.Node.Aggregator, "run node in aggregator mode")
cmd.Flags().Bool(FlagLight, def.Node.Light, "run light client")
cmd.Flags().Duration(FlagBlockTime, def.Node.BlockTime.Duration, "block time (for aggregator mode)")
- cmd.Flags().String(FlagTrustedHash, def.Node.TrustedHash, "initial trusted hash to start the header exchange service")
cmd.Flags().Bool(FlagLazyAggregator, def.Node.LazyMode, "produce blocks only when transactions are available or after lazy block time")
cmd.Flags().Uint64(FlagMaxPendingHeadersAndData, def.Node.MaxPendingHeadersAndData, "maximum headers or data pending DA confirmation before pausing block production (0 for no limit)")
cmd.Flags().Duration(FlagLazyBlockTime, def.Node.LazyBlockInterval.Duration, "maximum interval between blocks in lazy aggregation mode")
diff --git a/pkg/config/config_test.go b/pkg/config/config_test.go
index 206d6040bd..7834e42aab 100644
--- a/pkg/config/config_test.go
+++ b/pkg/config/config_test.go
@@ -32,7 +32,6 @@ func TestDefaultConfig(t *testing.T) {
assert.Equal(t, uint64(0), def.Node.MaxPendingHeadersAndData)
assert.Equal(t, false, def.Node.LazyMode)
assert.Equal(t, 60*time.Second, def.Node.LazyBlockInterval.Duration)
- assert.Equal(t, "", def.Node.TrustedHash)
assert.Equal(t, "file", def.Signer.SignerType)
assert.Equal(t, "config", def.Signer.SignerPath)
assert.Equal(t, "127.0.0.1:7331", def.RPC.Address)
@@ -56,7 +55,6 @@ func TestAddFlags(t *testing.T) {
assertFlagValue(t, flags, FlagAggregator, DefaultConfig().Node.Aggregator)
assertFlagValue(t, flags, FlagLight, DefaultConfig().Node.Light)
assertFlagValue(t, flags, FlagBlockTime, DefaultConfig().Node.BlockTime.Duration)
- assertFlagValue(t, flags, FlagTrustedHash, DefaultConfig().Node.TrustedHash)
assertFlagValue(t, flags, FlagLazyAggregator, DefaultConfig().Node.LazyMode)
assertFlagValue(t, flags, FlagMaxPendingHeadersAndData, DefaultConfig().Node.MaxPendingHeadersAndData)
assertFlagValue(t, flags, FlagLazyBlockTime, DefaultConfig().Node.LazyBlockInterval.Duration)
@@ -101,7 +99,7 @@ func TestAddFlags(t *testing.T) {
assertFlagValue(t, flags, FlagRPCAddress, DefaultConfig().RPC.Address)
// Count the number of flags we're explicitly checking
- expectedFlagCount := 38 // Update this number if you add more flag checks above
+ expectedFlagCount := 37 // Update this number if you add more flag checks above
// Get the actual number of flags (both regular and persistent)
actualFlagCount := 0
diff --git a/pkg/config/defaults.go b/pkg/config/defaults.go
index 3410547531..6a6f813a3c 100644
--- a/pkg/config/defaults.go
+++ b/pkg/config/defaults.go
@@ -66,7 +66,6 @@ func DefaultConfig() Config {
LazyMode: false,
LazyBlockInterval: DurationWrapper{60 * time.Second},
Light: false,
- TrustedHash: "",
ReadinessWindowSeconds: defaultReadinessWindowSeconds,
ReadinessMaxBlocksBehind: calculateReadinessMaxBlocksBehind(defaultBlockTime.Duration, defaultReadinessWindowSeconds),
},
diff --git a/pkg/rpc/client/client.go b/pkg/rpc/client/client.go
index f7a03d536f..54a46eaf1e 100644
--- a/pkg/rpc/client/client.go
+++ b/pkg/rpc/client/client.go
@@ -89,6 +89,16 @@ func (c *Client) GetMetadata(ctx context.Context, key string) ([]byte, error) {
return resp.Msg.Value, nil
}
+// GetP2PStoreInfo returns head/tail metadata for the go-header stores
+func (c *Client) GetP2PStoreInfo(ctx context.Context) ([]*pb.P2PStoreSnapshot, error) {
+ req := connect.NewRequest(&emptypb.Empty{})
+ resp, err := c.storeClient.GetP2PStoreInfo(ctx, req)
+ if err != nil {
+ return nil, err
+ }
+ return resp.Msg.Stores, nil
+}
+
// GetPeerInfo returns information about the connected peers
func (c *Client) GetPeerInfo(ctx context.Context) ([]*pb.PeerInfo, error) {
req := connect.NewRequest(&emptypb.Empty{})
diff --git a/pkg/rpc/client/client_test.go b/pkg/rpc/client/client_test.go
index db375dee5b..4b2b82e1b3 100644
--- a/pkg/rpc/client/client_test.go
+++ b/pkg/rpc/client/client_test.go
@@ -7,6 +7,7 @@ import (
"testing"
"time"
+ goheader "github.com/celestiaorg/go-header"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/multiformats/go-multiaddr"
"github.com/rs/zerolog"
@@ -19,15 +20,22 @@ import (
"github.com/evstack/ev-node/pkg/p2p"
"github.com/evstack/ev-node/pkg/rpc/server"
"github.com/evstack/ev-node/test/mocks"
+ headerstoremocks "github.com/evstack/ev-node/test/mocks/external"
"github.com/evstack/ev-node/types"
rpc "github.com/evstack/ev-node/types/pb/evnode/v1/v1connect"
)
-func setupTestServer(t *testing.T, mockStore *mocks.MockStore, mockP2P *mocks.MockP2PRPC) (*httptest.Server, *Client) {
+func setupTestServer(
+ t *testing.T,
+ mockStore *mocks.MockStore,
+ headerStore goheader.Store[*types.SignedHeader],
+ dataStore goheader.Store[*types.Data],
+ mockP2P *mocks.MockP2PRPC,
+) (*httptest.Server, *Client) {
mux := http.NewServeMux()
logger := zerolog.Nop()
- storeServer := server.NewStoreServer(mockStore, logger)
+ storeServer := server.NewStoreServer(mockStore, headerStore, dataStore, logger)
p2pServer := server.NewP2PServer(mockP2P)
testConfig := config.DefaultConfig()
@@ -62,7 +70,7 @@ func TestClientGetState(t *testing.T) {
mockStore.On("GetState", mock.Anything).Return(state, nil)
- testServer, client := setupTestServer(t, mockStore, mockP2P)
+ testServer, client := setupTestServer(t, mockStore, nil, nil, mockP2P)
defer testServer.Close()
resultState, err := client.GetState(context.Background())
@@ -84,7 +92,7 @@ func TestClientGetMetadata(t *testing.T) {
mockStore.On("GetMetadata", mock.Anything, key).Return(value, nil)
- testServer, client := setupTestServer(t, mockStore, mockP2P)
+ testServer, client := setupTestServer(t, mockStore, nil, nil, mockP2P)
defer testServer.Close()
resultValue, err := client.GetMetadata(context.Background(), key)
@@ -94,6 +102,42 @@ func TestClientGetMetadata(t *testing.T) {
mockStore.AssertExpectations(t)
}
+func TestClientGetP2PStoreInfo(t *testing.T) {
+ mockStore := mocks.NewMockStore(t)
+ mockP2P := mocks.NewMockP2PRPC(t)
+ headerStore := headerstoremocks.NewMockStore[*types.SignedHeader](t)
+ dataStore := headerstoremocks.NewMockStore[*types.Data](t)
+
+ now := time.Now().UTC()
+
+ headerHead := testSignedHeader(10, now)
+ headerTail := testSignedHeader(5, now.Add(-time.Minute))
+ headerStore.On("Height").Return(uint64(10))
+ headerStore.On("Head", mock.Anything).Return(headerHead, nil)
+ headerStore.On("Tail", mock.Anything).Return(headerTail, nil)
+
+ dataHead := testData(8, now.Add(-30*time.Second))
+ dataTail := testData(4, now.Add(-2*time.Minute))
+ dataStore.On("Height").Return(uint64(8))
+ dataStore.On("Head", mock.Anything).Return(dataHead, nil)
+ dataStore.On("Tail", mock.Anything).Return(dataTail, nil)
+
+ testServer, client := setupTestServer(t, mockStore, headerStore, dataStore, mockP2P)
+ defer testServer.Close()
+
+ stores, err := client.GetP2PStoreInfo(context.Background())
+ require.NoError(t, err)
+ require.Len(t, stores, 2)
+
+ require.Equal(t, "Header Store", stores[0].Label)
+ require.Equal(t, uint64(10), stores[0].Head.Height)
+ require.Equal(t, uint64(5), stores[0].Tail.Height)
+
+ require.Equal(t, "Data Store", stores[1].Label)
+ require.Equal(t, uint64(8), stores[1].Height)
+ require.Equal(t, uint64(4), stores[1].Tail.Height)
+}
+
func TestClientGetBlockByHeight(t *testing.T) {
mockStore := mocks.NewMockStore(t)
mockP2P := mocks.NewMockP2PRPC(t)
@@ -104,7 +148,7 @@ func TestClientGetBlockByHeight(t *testing.T) {
mockStore.On("GetBlockData", mock.Anything, height).Return(header, data, nil)
- testServer, client := setupTestServer(t, mockStore, mockP2P)
+ testServer, client := setupTestServer(t, mockStore, nil, nil, mockP2P)
defer testServer.Close()
block, err := client.GetBlockByHeight(context.Background(), height)
@@ -124,7 +168,7 @@ func TestClientGetBlockByHash(t *testing.T) {
mockStore.On("GetBlockByHash", mock.Anything, hash).Return(header, data, nil)
- testServer, client := setupTestServer(t, mockStore, mockP2P)
+ testServer, client := setupTestServer(t, mockStore, nil, nil, mockP2P)
defer testServer.Close()
block, err := client.GetBlockByHash(context.Background(), hash)
@@ -154,7 +198,7 @@ func TestClientGetPeerInfo(t *testing.T) {
mockP2P.On("GetPeers").Return(peers, nil)
- testServer, client := setupTestServer(t, mockStore, mockP2P)
+ testServer, client := setupTestServer(t, mockStore, nil, nil, mockP2P)
defer testServer.Close()
resultPeers, err := client.GetPeerInfo(context.Background())
@@ -179,7 +223,7 @@ func TestClientGetNetInfo(t *testing.T) {
mockP2P.On("GetNetworkInfo").Return(netInfo, nil)
- testServer, client := setupTestServer(t, mockStore, mockP2P)
+ testServer, client := setupTestServer(t, mockStore, nil, nil, mockP2P)
defer testServer.Close()
resultNetInfo, err := client.GetNetInfo(context.Background())
@@ -194,7 +238,7 @@ func TestClientGetNamespace(t *testing.T) {
mockStore := mocks.NewMockStore(t)
mockP2P := mocks.NewMockP2PRPC(t)
- testServer, client := setupTestServer(t, mockStore, mockP2P)
+ testServer, client := setupTestServer(t, mockStore, nil, nil, mockP2P)
defer testServer.Close()
namespaceResp, err := client.GetNamespace(context.Background())
@@ -205,3 +249,28 @@ func TestClientGetNamespace(t *testing.T) {
require.NotEmpty(t, namespaceResp.HeaderNamespace)
require.NotEmpty(t, namespaceResp.DataNamespace)
}
+
+func testSignedHeader(height uint64, ts time.Time) *types.SignedHeader {
+ return &types.SignedHeader{
+ Header: types.Header{
+ BaseHeader: types.BaseHeader{
+ Height: height,
+ Time: uint64(ts.UnixNano()),
+ ChainID: "test-chain",
+ },
+ ProposerAddress: []byte{0x01},
+ DataHash: []byte{0x02},
+ AppHash: []byte{0x03},
+ },
+ }
+}
+
+func testData(height uint64, ts time.Time) *types.Data {
+ return &types.Data{
+ Metadata: &types.Metadata{
+ ChainID: "test-chain",
+ Height: height,
+ Time: uint64(ts.UnixNano()),
+ },
+ }
+}
diff --git a/pkg/rpc/example/example.go b/pkg/rpc/example/example.go
index a1bc4cd7c0..f3c020fa87 100644
--- a/pkg/rpc/example/example.go
+++ b/pkg/rpc/example/example.go
@@ -22,7 +22,7 @@ func StartStoreServer(s store.Store, address string, logger zerolog.Logger) {
rpcAddr := fmt.Sprintf("%s:%d", "localhost", 8080)
cfg := config.DefaultConfig()
- handler, err := server.NewServiceHandler(s, nil, nil, logger, cfg, nil)
+ handler, err := server.NewServiceHandler(s, nil, nil, nil, nil, logger, cfg, nil)
if err != nil {
panic(err)
}
@@ -82,7 +82,7 @@ func ExampleServer(s store.Store) {
// Start RPC server
rpcAddr := fmt.Sprintf("%s:%d", "localhost", 8080)
cfg := config.DefaultConfig()
- handler, err := server.NewServiceHandler(s, nil, nil, logger, cfg, nil)
+ handler, err := server.NewServiceHandler(s, nil, nil, nil, nil, logger, cfg, nil)
if err != nil {
panic(err)
}
diff --git a/pkg/rpc/server/server.go b/pkg/rpc/server/server.go
index f649fda37d..8af957ed9d 100644
--- a/pkg/rpc/server/server.go
+++ b/pkg/rpc/server/server.go
@@ -12,6 +12,7 @@ import (
"connectrpc.com/connect"
"connectrpc.com/grpcreflect"
+ goheader "github.com/celestiaorg/go-header"
coreda "github.com/evstack/ev-node/core/da"
ds "github.com/ipfs/go-datastore"
"github.com/rs/zerolog"
@@ -32,15 +33,24 @@ var _ rpc.StoreServiceHandler = (*StoreServer)(nil)
// StoreServer implements the StoreService defined in the proto file
type StoreServer struct {
- store store.Store
- logger zerolog.Logger
+ store store.Store
+ headerStore goheader.Store[*types.SignedHeader]
+ dataStore goheader.Store[*types.Data]
+ logger zerolog.Logger
}
// NewStoreServer creates a new StoreServer instance
-func NewStoreServer(store store.Store, logger zerolog.Logger) *StoreServer {
+func NewStoreServer(
+ store store.Store,
+ headerStore goheader.Store[*types.SignedHeader],
+ dataStore goheader.Store[*types.Data],
+ logger zerolog.Logger,
+) *StoreServer {
return &StoreServer{
- store: store,
- logger: logger,
+ store: store,
+ headerStore: headerStore,
+ dataStore: dataStore,
+ logger: logger,
}
}
@@ -172,6 +182,34 @@ func (s *StoreServer) GetGenesisDaHeight(
}
+// GetP2PStoreInfo implements the GetP2PStoreInfo RPC method
+func (s *StoreServer) GetP2PStoreInfo(
+ ctx context.Context,
+ _ *connect.Request[emptypb.Empty],
+) (*connect.Response[pb.GetP2PStoreInfoResponse], error) {
+ snapshots := make([]*pb.P2PStoreSnapshot, 0, 2)
+
+ if s.headerStore != nil {
+ snapshot, err := collectP2PStoreSnapshot(ctx, s.headerStore, "Header Store")
+ if err != nil {
+ return nil, connect.NewError(connect.CodeInternal, err)
+ }
+ snapshots = append(snapshots, snapshot)
+ }
+
+ if s.dataStore != nil {
+ snapshot, err := collectP2PStoreSnapshot(ctx, s.dataStore, "Data Store")
+ if err != nil {
+ return nil, connect.NewError(connect.CodeInternal, err)
+ }
+ snapshots = append(snapshots, snapshot)
+ }
+
+ return connect.NewResponse(&pb.GetP2PStoreInfoResponse{
+ Stores: snapshots,
+ }), nil
+}
+
// GetMetadata implements the GetMetadata RPC method
func (s *StoreServer) GetMetadata(
ctx context.Context,
@@ -187,6 +225,48 @@ func (s *StoreServer) GetMetadata(
}), nil
}
+func collectP2PStoreSnapshot[H goheader.Header[H]](
+ ctx context.Context,
+ store goheader.Store[H],
+ label string,
+) (*pb.P2PStoreSnapshot, error) {
+ snapshot := &pb.P2PStoreSnapshot{
+ Label: label,
+ Height: store.Height(),
+ }
+
+ if head, err := store.Head(ctx); err == nil {
+ snapshot.Head = toP2PStoreEntry(head)
+ } else if !errors.Is(err, goheader.ErrEmptyStore) && !errors.Is(err, goheader.ErrNotFound) {
+ return nil, fmt.Errorf("failed to read %s head: %w", label, err)
+ }
+
+ if tail, err := store.Tail(ctx); err == nil {
+ snapshot.Tail = toP2PStoreEntry(tail)
+ } else if !errors.Is(err, goheader.ErrEmptyStore) && !errors.Is(err, goheader.ErrNotFound) {
+ return nil, fmt.Errorf("failed to read %s tail: %w", label, err)
+ }
+
+ return snapshot, nil
+}
+
+func toP2PStoreEntry[H goheader.Header[H]](item H) *pb.P2PStoreEntry {
+ if any(item) == nil {
+ return nil
+ }
+
+ entry := &pb.P2PStoreEntry{
+ Height: item.Height(),
+ Hash: append([]byte(nil), item.Hash()...),
+ }
+
+ if ts := item.Time(); !ts.IsZero() {
+ entry.Time = timestamppb.New(ts)
+ }
+
+ return entry
+}
+
type ConfigServer struct {
config config.Config
signer []byte
@@ -288,8 +368,17 @@ func (p *P2PServer) GetNetInfo(
}
// NewServiceHandler creates a new HTTP handler for Store, P2P and Config services
-func NewServiceHandler(store store.Store, peerManager p2p.P2PRPC, proposerAddress []byte, logger zerolog.Logger, config config.Config, bestKnown BestKnownHeightProvider) (http.Handler, error) {
- storeServer := NewStoreServer(store, logger)
+func NewServiceHandler(
+ store store.Store,
+ headerStore goheader.Store[*types.SignedHeader],
+ dataStore goheader.Store[*types.Data],
+ peerManager p2p.P2PRPC,
+ proposerAddress []byte,
+ logger zerolog.Logger,
+ config config.Config,
+ bestKnown BestKnownHeightProvider,
+) (http.Handler, error) {
+ storeServer := NewStoreServer(store, headerStore, dataStore, logger)
p2pServer := NewP2PServer(peerManager)
configServer := NewConfigServer(config, proposerAddress, logger)
diff --git a/pkg/rpc/server/server_test.go b/pkg/rpc/server/server_test.go
index 1b0818e862..2ac3e004e7 100644
--- a/pkg/rpc/server/server_test.go
+++ b/pkg/rpc/server/server_test.go
@@ -23,6 +23,7 @@ import (
"github.com/evstack/ev-node/pkg/p2p"
"github.com/evstack/ev-node/pkg/store"
"github.com/evstack/ev-node/test/mocks"
+ headerstoremocks "github.com/evstack/ev-node/test/mocks/external"
"github.com/evstack/ev-node/types"
pb "github.com/evstack/ev-node/types/pb/evnode/v1"
)
@@ -46,7 +47,7 @@ func TestGetBlock(t *testing.T) {
// Create server with mock store
logger := zerolog.Nop()
- server := NewStoreServer(mockStore, logger)
+ server := NewStoreServer(mockStore, nil, nil, logger)
// Test GetBlock with height - success case
t.Run("by height with DA heights", func(t *testing.T) {
@@ -138,7 +139,7 @@ func TestGetBlock_Latest(t *testing.T) {
mockStore := mocks.NewMockStore(t)
logger := zerolog.Nop()
- server := NewStoreServer(mockStore, logger)
+ server := NewStoreServer(mockStore, nil, nil, logger)
latestHeight := uint64(20)
header := &types.SignedHeader{Header: types.Header{BaseHeader: types.BaseHeader{Height: latestHeight}}}
@@ -194,7 +195,7 @@ func TestGetState(t *testing.T) {
// Create server with mock store
logger := zerolog.Nop()
- server := NewStoreServer(mockStore, logger)
+ server := NewStoreServer(mockStore, nil, nil, logger)
// Call GetState
req := connect.NewRequest(&emptypb.Empty{})
@@ -217,7 +218,7 @@ func TestGetState_Error(t *testing.T) {
mockStore := mocks.NewMockStore(t)
mockStore.On("GetState", mock.Anything).Return(types.State{}, fmt.Errorf("state error"))
logger := zerolog.Nop()
- server := NewStoreServer(mockStore, logger)
+ server := NewStoreServer(mockStore, nil, nil, logger)
resp, err := server.GetState(context.Background(), connect.NewRequest(&emptypb.Empty{}))
require.Error(t, err)
require.Nil(t, resp)
@@ -236,7 +237,7 @@ func TestGetMetadata(t *testing.T) {
// Create server with mock store
logger := zerolog.Nop()
- server := NewStoreServer(mockStore, logger)
+ server := NewStoreServer(mockStore, nil, nil, logger)
// Call GetMetadata
req := connect.NewRequest(&pb.GetMetadataRequest{
@@ -254,7 +255,7 @@ func TestGetMetadata_Error(t *testing.T) {
mockStore := mocks.NewMockStore(t)
mockStore.On("GetMetadata", mock.Anything, "bad").Return(nil, fmt.Errorf("meta error"))
logger := zerolog.Nop()
- server := NewStoreServer(mockStore, logger)
+ server := NewStoreServer(mockStore, nil, nil, logger)
resp, err := server.GetMetadata(context.Background(), connect.NewRequest(&pb.GetMetadataRequest{Key: "bad"}))
require.Error(t, err)
require.Nil(t, resp)
@@ -269,7 +270,7 @@ func TestGetGenesisDaHeight(t *testing.T) {
mockStore.On("GetMetadata", mock.Anything, store.GenesisDAHeightKey).Return(heightBytes, nil).Once()
logger := zerolog.Nop()
- server := NewStoreServer(mockStore, logger)
+ server := NewStoreServer(mockStore, nil, nil, logger)
t.Run("success", func(t *testing.T) {
req := connect.NewRequest(&emptypb.Empty{})
@@ -287,7 +288,7 @@ func TestGetGenesisDaHeight_NotFound(t *testing.T) {
mockStore.On("GetMetadata", mock.Anything, store.GenesisDAHeightKey).Return(nil, fmt.Errorf("genesis DA height not found")).Once()
logger := zerolog.Nop()
- server := NewStoreServer(mockStore, logger)
+ server := NewStoreServer(mockStore, nil, nil, logger)
req := connect.NewRequest(&emptypb.Empty{})
resp, err := server.GetGenesisDaHeight(context.Background(), req)
@@ -307,7 +308,7 @@ func TestGetGenesisDaHeight_InvalidLength(t *testing.T) {
mockStore.On("GetMetadata", mock.Anything, store.GenesisDAHeightKey).Return(invalidBytes, nil).Once()
logger := zerolog.Nop()
- server := NewStoreServer(mockStore, logger)
+ server := NewStoreServer(mockStore, nil, nil, logger)
req := connect.NewRequest(&emptypb.Empty{})
resp, err := server.GetGenesisDaHeight(context.Background(), req)
@@ -321,6 +322,50 @@ func TestGetGenesisDaHeight_InvalidLength(t *testing.T) {
mockStore.AssertExpectations(t)
}
+func TestGetP2PStoreInfo(t *testing.T) {
+ t.Run("returns snapshots for configured stores", func(t *testing.T) {
+ mockStore := mocks.NewMockStore(t)
+ headerStore := headerstoremocks.NewMockStore[*types.SignedHeader](t)
+ dataStore := headerstoremocks.NewMockStore[*types.Data](t)
+ logger := zerolog.Nop()
+ server := NewStoreServer(mockStore, headerStore, dataStore, logger)
+
+ now := time.Now().UTC()
+ headerStore.On("Height").Return(uint64(12))
+ headerStore.On("Head", mock.Anything).Return(makeTestSignedHeader(12, now), nil)
+ headerStore.On("Tail", mock.Anything).Return(makeTestSignedHeader(7, now.Add(-time.Minute)), nil)
+
+ dataStore.On("Height").Return(uint64(9))
+ dataStore.On("Head", mock.Anything).Return(makeTestData(9, now.Add(-30*time.Second)), nil)
+ dataStore.On("Tail", mock.Anything).Return(makeTestData(4, now.Add(-2*time.Minute)), nil)
+
+ resp, err := server.GetP2PStoreInfo(context.Background(), connect.NewRequest(&emptypb.Empty{}))
+ require.NoError(t, err)
+ require.Len(t, resp.Msg.Stores, 2)
+
+ require.Equal(t, "Header Store", resp.Msg.Stores[0].Label)
+ require.Equal(t, uint64(12), resp.Msg.Stores[0].Height)
+
+ require.Equal(t, "Data Store", resp.Msg.Stores[1].Label)
+ require.Equal(t, uint64(9), resp.Msg.Stores[1].Height)
+ require.Equal(t, uint64(9), resp.Msg.Stores[1].Head.Height)
+ require.Equal(t, uint64(4), resp.Msg.Stores[1].Tail.Height)
+ })
+
+ t.Run("returns error when a store edge fails", func(t *testing.T) {
+ mockStore := mocks.NewMockStore(t)
+ headerStore := headerstoremocks.NewMockStore[*types.SignedHeader](t)
+ logger := zerolog.Nop()
+ headerStore.On("Height").Return(uint64(0))
+ headerStore.On("Head", mock.Anything).Return((*types.SignedHeader)(nil), fmt.Errorf("boom"))
+
+ server := NewStoreServer(mockStore, headerStore, nil, logger)
+ resp, err := server.GetP2PStoreInfo(context.Background(), connect.NewRequest(&emptypb.Empty{}))
+ require.Error(t, err)
+ require.Nil(t, resp)
+ })
+}
+
func TestP2PServer_GetPeerInfo(t *testing.T) {
mockP2P := &mocks.MockP2PRPC{}
addr, err := multiaddr.NewMultiaddr("/ip4/127.0.0.1/tcp/4001")
@@ -371,7 +416,7 @@ func TestHealthLiveEndpoint(t *testing.T) {
// Mock successful store access
mockStore.On("Height", mock.Anything).Return(uint64(100), nil)
- handler, err := NewServiceHandler(mockStore, mockP2PManager, nil, logger, testConfig, nil)
+ handler, err := NewServiceHandler(mockStore, nil, nil, mockP2PManager, nil, logger, testConfig, nil)
require.NoError(t, err)
server := httptest.NewServer(handler)
@@ -396,7 +441,7 @@ func TestHealthLiveEndpoint(t *testing.T) {
// Mock store access failure
mockStore.On("Height", mock.Anything).Return(uint64(0), fmt.Errorf("store unavailable"))
- handler, err := NewServiceHandler(mockStore, mockP2PManager, nil, logger, testConfig, nil)
+ handler, err := NewServiceHandler(mockStore, nil, nil, mockP2PManager, nil, logger, testConfig, nil)
require.NoError(t, err)
server := httptest.NewServer(handler)
@@ -421,7 +466,7 @@ func TestHealthLiveEndpoint(t *testing.T) {
// Mock successful store access at genesis
mockStore.On("Height", mock.Anything).Return(uint64(0), nil)
- handler, err := NewServiceHandler(mockStore, mockP2PManager, nil, logger, testConfig, nil)
+ handler, err := NewServiceHandler(mockStore, nil, nil, mockP2PManager, nil, logger, testConfig, nil)
require.NoError(t, err)
server := httptest.NewServer(handler)
@@ -498,7 +543,7 @@ func TestHealthReadyEndpoint(t *testing.T) {
}
bestKnown := func() uint64 { return tc.bestKnown }
- handler, err := NewServiceHandler(mockStore, mockP2P, nil, logger, testConfig, bestKnown)
+ handler, err := NewServiceHandler(mockStore, nil, nil, mockP2P, nil, logger, testConfig, bestKnown)
require.NoError(t, err)
server := httptest.NewServer(handler)
defer server.Close()
@@ -539,7 +584,7 @@ func TestHealthReadyEndpoint(t *testing.T) {
mockStore.On("GetState", mock.Anything).Return(state, nil)
bestKnown := func() uint64 { return 100 }
- handler, err := NewServiceHandler(mockStore, mockP2P, nil, logger, testConfig, bestKnown)
+ handler, err := NewServiceHandler(mockStore, nil, nil, mockP2P, nil, logger, testConfig, bestKnown)
require.NoError(t, err)
server := httptest.NewServer(handler)
defer server.Close()
@@ -569,7 +614,7 @@ func TestHealthReadyEndpoint(t *testing.T) {
mockStore.On("GetState", mock.Anything).Return(state, nil)
bestKnown := func() uint64 { return 100 }
- handler, err := NewServiceHandler(mockStore, mockP2P, nil, logger, testConfig, bestKnown)
+ handler, err := NewServiceHandler(mockStore, nil, nil, mockP2P, nil, logger, testConfig, bestKnown)
require.NoError(t, err)
server := httptest.NewServer(handler)
defer server.Close()
@@ -581,3 +626,28 @@ func TestHealthReadyEndpoint(t *testing.T) {
})
})
}
+
+func makeTestSignedHeader(height uint64, ts time.Time) *types.SignedHeader {
+ return &types.SignedHeader{
+ Header: types.Header{
+ BaseHeader: types.BaseHeader{
+ Height: height,
+ Time: uint64(ts.UnixNano()),
+ ChainID: "test-chain",
+ },
+ ProposerAddress: []byte{0x01},
+ DataHash: []byte{0x02},
+ AppHash: []byte{0x03},
+ },
+ }
+}
+
+func makeTestData(height uint64, ts time.Time) *types.Data {
+ return &types.Data{
+ Metadata: &types.Metadata{
+ ChainID: "test-chain",
+ Height: height,
+ Time: uint64(ts.UnixNano()),
+ },
+ }
+}
diff --git a/pkg/sync/sync_service.go b/pkg/sync/sync_service.go
index ba808e366c..6a17a42a85 100644
--- a/pkg/sync/sync_service.go
+++ b/pkg/sync/sync_service.go
@@ -2,10 +2,10 @@ package sync
import (
"context"
- "encoding/hex"
"errors"
"fmt"
"strings"
+ "sync/atomic"
"time"
"github.com/celestiaorg/go-header"
@@ -55,6 +55,7 @@ type SyncService[H header.Header[H]] struct {
syncer *goheadersync.Syncer[H]
syncerStatus *SyncerStatus
topicSubscription header.Subscription[H]
+ storeInitialized atomic.Bool
}
// DataSyncService is the P2P Sync Service for blocks.
@@ -135,9 +136,12 @@ func (syncService *SyncService[H]) WriteToStoreAndBroadcast(ctx context.Context,
return fmt.Errorf("empty header/data cannot write to store or broadcast")
}
- isGenesis := headerOrData.Height() == syncService.genesis.InitialHeight
- if isGenesis { // when starting the syncer for the first time, we have no blocks, so initFromP2P didn't initialize the genesis block.
- if err := syncService.initStore(ctx, headerOrData); err != nil {
+ storeInitialized := false
+ if syncService.storeInitialized.CompareAndSwap(false, true) {
+ var err error
+ storeInitialized, err = syncService.initStore(ctx, headerOrData)
+ if err != nil {
+ syncService.storeInitialized.Store(false)
return fmt.Errorf("failed to initialize the store: %w", err)
}
}
@@ -156,10 +160,10 @@ func (syncService *SyncService[H]) WriteToStoreAndBroadcast(ctx context.Context,
// as we have already initialized the store for starting the syncer.
// Hence, we ignore the error. Exact reason: validation ignored
if (firstStart && errors.Is(err, pubsub.ValidationError{Reason: pubsub.RejectValidationIgnored})) ||
- // for the genesis header, broadcast error is expected as we have already initialized the store
+ // for the genesis header (or any first header used to bootstrap the store), broadcast error is expected as we have already initialized the store
// for starting the syncer. Hence, we ignore the error.
// exact reason: validation failed, err header verification failed: known header: '1' <= current '1'
- (isGenesis && errors.Is(err, pubsub.ValidationError{Reason: pubsub.RejectValidationFailed})) {
+ ((storeInitialized) && errors.Is(err, pubsub.ValidationError{Reason: pubsub.RejectValidationFailed})) {
return nil
}
@@ -221,23 +225,27 @@ func (syncService *SyncService[H]) startSyncer(ctx context.Context) error {
// initStore initializes the store with the given initial header.
// it is a no-op if the store is already initialized.
-func (syncService *SyncService[H]) initStore(ctx context.Context, initial H) error {
+// Returns true when the store was initialized by this call.
+func (syncService *SyncService[H]) initStore(ctx context.Context, initial H) (bool, error) {
if initial.IsZero() {
- return errors.New("failed to initialize the store")
+ return false, errors.New("failed to initialize the store")
}
if _, err := syncService.store.Head(ctx); errors.Is(err, header.ErrNotFound) || errors.Is(err, header.ErrEmptyStore) {
if err := syncService.store.Append(ctx, initial); err != nil {
- return err
+ return false, err
}
if err := syncService.store.Sync(ctx); err != nil {
- return err
+ return false, err
}
+ return true, nil
+ } else if err != nil {
+ return false, err
}
- return nil
+ return false, nil
}
// setupP2PInfrastructure sets up the P2P infrastructure (Exchange, ExchangeServer, Store)
@@ -301,8 +309,9 @@ func (syncService *SyncService[H]) startSubscriber(ctx context.Context) error {
}
// initFromP2PWithRetry initializes the syncer from P2P with a retry mechanism.
-// If trusted hash is available, it fetches the trusted header/block (by hash) from peers.
-// Otherwise, it tries to fetch the genesis header/block by height.
+// It inspects the local store to determine the first height to request:
+// - when the store already contains items, it reuses the latest height as the starting point;
+// - otherwise, it falls back to the configured genesis height.
func (syncService *SyncService[H]) initFromP2PWithRetry(ctx context.Context, peerIDs []peer.ID) error {
if len(peerIDs) == 0 {
return nil
@@ -310,26 +319,30 @@ func (syncService *SyncService[H]) initFromP2PWithRetry(ctx context.Context, pee
tryInit := func(ctx context.Context) (bool, error) {
var (
- trusted H
- err error
+ trusted H
+ err error
+ heightToQuery uint64
)
- if syncService.conf.Node.TrustedHash != "" {
- trustedHashBytes, err := hex.DecodeString(syncService.conf.Node.TrustedHash)
- if err != nil {
- return false, fmt.Errorf("failed to parse the trusted hash for initializing the store: %w", err)
- }
- if trusted, err = syncService.ex.Get(ctx, trustedHashBytes); err != nil {
- return false, fmt.Errorf("failed to fetch the trusted header/block for initializing the store: %w", err)
- }
- } else {
- if trusted, err = syncService.ex.GetByHeight(ctx, syncService.genesis.InitialHeight); err != nil {
- return false, fmt.Errorf("failed to fetch the genesis: %w", err)
- }
+ head, headErr := syncService.store.Head(ctx)
+ switch {
+ case errors.Is(headErr, header.ErrNotFound), errors.Is(headErr, header.ErrEmptyStore):
+ heightToQuery = syncService.genesis.InitialHeight
+ case headErr != nil:
+ return false, fmt.Errorf("failed to inspect local store head: %w", headErr)
+ default:
+ heightToQuery = head.Height()
}
- if err := syncService.initStore(ctx, trusted); err != nil {
- return false, fmt.Errorf("failed to initialize the store: %w", err)
+ if trusted, err = syncService.ex.GetByHeight(ctx, heightToQuery); err != nil {
+ return false, fmt.Errorf("failed to fetch height %d from peers: %w", heightToQuery, err)
+ }
+
+ if syncService.storeInitialized.CompareAndSwap(false, true) {
+ if _, err := syncService.initStore(ctx, trusted); err != nil {
+ syncService.storeInitialized.Store(false)
+ return false, fmt.Errorf("failed to initialize the store: %w", err)
+ }
}
if err := syncService.startSyncer(ctx); err != nil {
return false, err
diff --git a/pkg/sync/sync_service_test.go b/pkg/sync/sync_service_test.go
index 8086c0849e..93603752a7 100644
--- a/pkg/sync/sync_service_test.go
+++ b/pkg/sync/sync_service_test.go
@@ -114,6 +114,59 @@ func TestHeaderSyncServiceRestart(t *testing.T) {
cancel()
}
+func TestHeaderSyncServiceInitFromHigherHeight(t *testing.T) {
+ mainKV := sync.MutexWrap(datastore.NewMapDatastore())
+ pk, _, err := crypto.GenerateEd25519Key(cryptoRand.Reader)
+ require.NoError(t, err)
+ noopSigner, err := noop.NewNoopSigner(pk)
+ require.NoError(t, err)
+ rnd := rand.New(rand.NewSource(1)) // nolint:gosec // test code only
+ mn := mocknet.New()
+
+ chainId := "test-chain-id"
+
+ proposerAddr := []byte("test")
+ genesisDoc := genesispkg.Genesis{
+ ChainID: chainId,
+ StartTime: time.Now(),
+ InitialHeight: 1,
+ ProposerAddress: proposerAddr,
+ }
+ conf := config.DefaultConfig()
+ conf.RootDir = t.TempDir()
+ nodeKey, err := key.LoadOrGenNodeKey(filepath.Dir(conf.ConfigPath()))
+ require.NoError(t, err)
+ logger := zerolog.Nop()
+ priv := nodeKey.PrivKey
+ h, err := mn.AddPeer(priv, nil)
+ require.NoError(t, err)
+
+ p2pClient, err := p2p.NewClientWithHost(conf.P2P, nodeKey.PrivKey, mainKV, chainId, logger, p2p.NopMetrics(), h)
+ require.NoError(t, err)
+
+ ctx, cancel := context.WithCancel(t.Context())
+ defer cancel()
+ require.NoError(t, p2pClient.Start(ctx))
+ t.Cleanup(func() { _ = p2pClient.Close() })
+
+ svc, err := NewHeaderSyncService(mainKV, conf, genesisDoc, p2pClient, logger)
+ require.NoError(t, err)
+ require.NoError(t, svc.Start(ctx))
+ t.Cleanup(func() { _ = svc.Stop(context.Background()) })
+
+ headerConfig := types.HeaderConfig{
+ Height: genesisDoc.InitialHeight + 5,
+ DataHash: bytesN(rnd, 32),
+ AppHash: bytesN(rnd, 32),
+ Signer: noopSigner,
+ }
+ signedHeader, err := types.GetRandomSignedHeaderCustom(&headerConfig, genesisDoc.ChainID)
+ require.NoError(t, err)
+ require.NoError(t, signedHeader.Validate())
+
+ require.NoError(t, svc.WriteToStoreAndBroadcast(ctx, signedHeader))
+}
+
func nextHeader(t *testing.T, previousHeader *types.SignedHeader, chainID string, noopSigner signer.Signer) *types.SignedHeader {
newSignedHeader := &types.SignedHeader{
Header: types.GetRandomNextHeader(previousHeader.Header, chainID),
diff --git a/proto/evnode/v1/state_rpc.proto b/proto/evnode/v1/state_rpc.proto
index 1b468b6ef8..3c8e4bb403 100644
--- a/proto/evnode/v1/state_rpc.proto
+++ b/proto/evnode/v1/state_rpc.proto
@@ -21,6 +21,9 @@ service StoreService {
// GetGenesisDaHeight returns the DA height at which the first Evolve block was included.
rpc GetGenesisDaHeight(google.protobuf.Empty) returns (GetGenesisDaHeightResponse) {}
+
+ // GetP2PStoreInfo returns head/tail information for the go-header stores used by P2P sync.
+ rpc GetP2PStoreInfo(google.protobuf.Empty) returns (GetP2PStoreInfoResponse) {}
}
// Block contains all the components of a complete block
@@ -62,5 +65,25 @@ message GetMetadataResponse {
// GetGenesisDaHeightResponse defines the DA height at which the first Evolve block was included.
message GetGenesisDaHeightResponse {
- uint64 height = 3;
+ uint64 height = 3;
+}
+
+// P2PStoreEntry captures a single head or tail record from a go-header store.
+message P2PStoreEntry {
+ uint64 height = 1;
+ bytes hash = 2;
+ google.protobuf.Timestamp time = 3;
+}
+
+// P2PStoreSnapshot provides head/tail status for a go-header store.
+message P2PStoreSnapshot {
+ string label = 1;
+ uint64 height = 2;
+ P2PStoreEntry head = 4;
+ P2PStoreEntry tail = 6;
+}
+
+// GetP2PStoreInfoResponse holds the snapshots for configured go-header stores.
+message GetP2PStoreInfoResponse {
+ repeated P2PStoreSnapshot stores = 1;
}
diff --git a/types/pb/evnode/v1/state_rpc.pb.go b/types/pb/evnode/v1/state_rpc.pb.go
index e087a82897..6aa4c000c8 100644
--- a/types/pb/evnode/v1/state_rpc.pb.go
+++ b/types/pb/evnode/v1/state_rpc.pb.go
@@ -10,7 +10,7 @@ import (
protoreflect "google.golang.org/protobuf/reflect/protoreflect"
protoimpl "google.golang.org/protobuf/runtime/protoimpl"
emptypb "google.golang.org/protobuf/types/known/emptypb"
- _ "google.golang.org/protobuf/types/known/timestamppb"
+ timestamppb "google.golang.org/protobuf/types/known/timestamppb"
reflect "reflect"
sync "sync"
unsafe "unsafe"
@@ -402,6 +402,181 @@ func (x *GetGenesisDaHeightResponse) GetHeight() uint64 {
return 0
}
+// P2PStoreEntry captures a single head or tail record from a go-header store.
+type P2PStoreEntry struct {
+ state protoimpl.MessageState `protogen:"open.v1"`
+ Height uint64 `protobuf:"varint,1,opt,name=height,proto3" json:"height,omitempty"`
+ Hash []byte `protobuf:"bytes,2,opt,name=hash,proto3" json:"hash,omitempty"`
+ Time *timestamppb.Timestamp `protobuf:"bytes,3,opt,name=time,proto3" json:"time,omitempty"`
+ unknownFields protoimpl.UnknownFields
+ sizeCache protoimpl.SizeCache
+}
+
+func (x *P2PStoreEntry) Reset() {
+ *x = P2PStoreEntry{}
+ mi := &file_evnode_v1_state_rpc_proto_msgTypes[7]
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ ms.StoreMessageInfo(mi)
+}
+
+func (x *P2PStoreEntry) String() string {
+ return protoimpl.X.MessageStringOf(x)
+}
+
+func (*P2PStoreEntry) ProtoMessage() {}
+
+func (x *P2PStoreEntry) ProtoReflect() protoreflect.Message {
+ mi := &file_evnode_v1_state_rpc_proto_msgTypes[7]
+ if x != nil {
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ if ms.LoadMessageInfo() == nil {
+ ms.StoreMessageInfo(mi)
+ }
+ return ms
+ }
+ return mi.MessageOf(x)
+}
+
+// Deprecated: Use P2PStoreEntry.ProtoReflect.Descriptor instead.
+func (*P2PStoreEntry) Descriptor() ([]byte, []int) {
+ return file_evnode_v1_state_rpc_proto_rawDescGZIP(), []int{7}
+}
+
+func (x *P2PStoreEntry) GetHeight() uint64 {
+ if x != nil {
+ return x.Height
+ }
+ return 0
+}
+
+func (x *P2PStoreEntry) GetHash() []byte {
+ if x != nil {
+ return x.Hash
+ }
+ return nil
+}
+
+func (x *P2PStoreEntry) GetTime() *timestamppb.Timestamp {
+ if x != nil {
+ return x.Time
+ }
+ return nil
+}
+
+// P2PStoreSnapshot provides head/tail status for a go-header store.
+type P2PStoreSnapshot struct {
+ state protoimpl.MessageState `protogen:"open.v1"`
+ Label string `protobuf:"bytes,1,opt,name=label,proto3" json:"label,omitempty"`
+ Height uint64 `protobuf:"varint,2,opt,name=height,proto3" json:"height,omitempty"`
+ Head *P2PStoreEntry `protobuf:"bytes,4,opt,name=head,proto3" json:"head,omitempty"`
+ Tail *P2PStoreEntry `protobuf:"bytes,6,opt,name=tail,proto3" json:"tail,omitempty"`
+ unknownFields protoimpl.UnknownFields
+ sizeCache protoimpl.SizeCache
+}
+
+func (x *P2PStoreSnapshot) Reset() {
+ *x = P2PStoreSnapshot{}
+ mi := &file_evnode_v1_state_rpc_proto_msgTypes[8]
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ ms.StoreMessageInfo(mi)
+}
+
+func (x *P2PStoreSnapshot) String() string {
+ return protoimpl.X.MessageStringOf(x)
+}
+
+func (*P2PStoreSnapshot) ProtoMessage() {}
+
+func (x *P2PStoreSnapshot) ProtoReflect() protoreflect.Message {
+ mi := &file_evnode_v1_state_rpc_proto_msgTypes[8]
+ if x != nil {
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ if ms.LoadMessageInfo() == nil {
+ ms.StoreMessageInfo(mi)
+ }
+ return ms
+ }
+ return mi.MessageOf(x)
+}
+
+// Deprecated: Use P2PStoreSnapshot.ProtoReflect.Descriptor instead.
+func (*P2PStoreSnapshot) Descriptor() ([]byte, []int) {
+ return file_evnode_v1_state_rpc_proto_rawDescGZIP(), []int{8}
+}
+
+func (x *P2PStoreSnapshot) GetLabel() string {
+ if x != nil {
+ return x.Label
+ }
+ return ""
+}
+
+func (x *P2PStoreSnapshot) GetHeight() uint64 {
+ if x != nil {
+ return x.Height
+ }
+ return 0
+}
+
+func (x *P2PStoreSnapshot) GetHead() *P2PStoreEntry {
+ if x != nil {
+ return x.Head
+ }
+ return nil
+}
+
+func (x *P2PStoreSnapshot) GetTail() *P2PStoreEntry {
+ if x != nil {
+ return x.Tail
+ }
+ return nil
+}
+
+// GetP2PStoreInfoResponse holds the snapshots for configured go-header stores.
+type GetP2PStoreInfoResponse struct {
+ state protoimpl.MessageState `protogen:"open.v1"`
+ Stores []*P2PStoreSnapshot `protobuf:"bytes,1,rep,name=stores,proto3" json:"stores,omitempty"`
+ unknownFields protoimpl.UnknownFields
+ sizeCache protoimpl.SizeCache
+}
+
+func (x *GetP2PStoreInfoResponse) Reset() {
+ *x = GetP2PStoreInfoResponse{}
+ mi := &file_evnode_v1_state_rpc_proto_msgTypes[9]
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ ms.StoreMessageInfo(mi)
+}
+
+func (x *GetP2PStoreInfoResponse) String() string {
+ return protoimpl.X.MessageStringOf(x)
+}
+
+func (*GetP2PStoreInfoResponse) ProtoMessage() {}
+
+func (x *GetP2PStoreInfoResponse) ProtoReflect() protoreflect.Message {
+ mi := &file_evnode_v1_state_rpc_proto_msgTypes[9]
+ if x != nil {
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ if ms.LoadMessageInfo() == nil {
+ ms.StoreMessageInfo(mi)
+ }
+ return ms
+ }
+ return mi.MessageOf(x)
+}
+
+// Deprecated: Use GetP2PStoreInfoResponse.ProtoReflect.Descriptor instead.
+func (*GetP2PStoreInfoResponse) Descriptor() ([]byte, []int) {
+ return file_evnode_v1_state_rpc_proto_rawDescGZIP(), []int{9}
+}
+
+func (x *GetP2PStoreInfoResponse) GetStores() []*P2PStoreSnapshot {
+ if x != nil {
+ return x.Stores
+ }
+ return nil
+}
+
var File_evnode_v1_state_rpc_proto protoreflect.FileDescriptor
const file_evnode_v1_state_rpc_proto_rawDesc = "" +
@@ -426,12 +601,24 @@ const file_evnode_v1_state_rpc_proto_rawDesc = "" +
"\x13GetMetadataResponse\x12\x14\n" +
"\x05value\x18\x01 \x01(\fR\x05value\"4\n" +
"\x1aGetGenesisDaHeightResponse\x12\x16\n" +
- "\x06height\x18\x03 \x01(\x04R\x06height2\xbf\x02\n" +
+ "\x06height\x18\x03 \x01(\x04R\x06height\"k\n" +
+ "\rP2PStoreEntry\x12\x16\n" +
+ "\x06height\x18\x01 \x01(\x04R\x06height\x12\x12\n" +
+ "\x04hash\x18\x02 \x01(\fR\x04hash\x12.\n" +
+ "\x04time\x18\x03 \x01(\v2\x1a.google.protobuf.TimestampR\x04time\"\x9c\x01\n" +
+ "\x10P2PStoreSnapshot\x12\x14\n" +
+ "\x05label\x18\x01 \x01(\tR\x05label\x12\x16\n" +
+ "\x06height\x18\x02 \x01(\x04R\x06height\x12,\n" +
+ "\x04head\x18\x04 \x01(\v2\x18.evnode.v1.P2PStoreEntryR\x04head\x12,\n" +
+ "\x04tail\x18\x06 \x01(\v2\x18.evnode.v1.P2PStoreEntryR\x04tail\"N\n" +
+ "\x17GetP2PStoreInfoResponse\x123\n" +
+ "\x06stores\x18\x01 \x03(\v2\x1b.evnode.v1.P2PStoreSnapshotR\x06stores2\x90\x03\n" +
"\fStoreService\x12E\n" +
"\bGetBlock\x12\x1a.evnode.v1.GetBlockRequest\x1a\x1b.evnode.v1.GetBlockResponse\"\x00\x12A\n" +
"\bGetState\x12\x16.google.protobuf.Empty\x1a\x1b.evnode.v1.GetStateResponse\"\x00\x12N\n" +
"\vGetMetadata\x12\x1d.evnode.v1.GetMetadataRequest\x1a\x1e.evnode.v1.GetMetadataResponse\"\x00\x12U\n" +
- "\x12GetGenesisDaHeight\x12\x16.google.protobuf.Empty\x1a%.evnode.v1.GetGenesisDaHeightResponse\"\x00B/Z-github.com/evstack/ev-node/types/pb/evnode/v1b\x06proto3"
+ "\x12GetGenesisDaHeight\x12\x16.google.protobuf.Empty\x1a%.evnode.v1.GetGenesisDaHeightResponse\"\x00\x12O\n" +
+ "\x0fGetP2PStoreInfo\x12\x16.google.protobuf.Empty\x1a\".evnode.v1.GetP2PStoreInfoResponse\"\x00B/Z-github.com/evstack/ev-node/types/pb/evnode/v1b\x06proto3"
var (
file_evnode_v1_state_rpc_proto_rawDescOnce sync.Once
@@ -445,7 +632,7 @@ func file_evnode_v1_state_rpc_proto_rawDescGZIP() []byte {
return file_evnode_v1_state_rpc_proto_rawDescData
}
-var file_evnode_v1_state_rpc_proto_msgTypes = make([]protoimpl.MessageInfo, 7)
+var file_evnode_v1_state_rpc_proto_msgTypes = make([]protoimpl.MessageInfo, 10)
var file_evnode_v1_state_rpc_proto_goTypes = []any{
(*Block)(nil), // 0: evnode.v1.Block
(*GetBlockRequest)(nil), // 1: evnode.v1.GetBlockRequest
@@ -454,29 +641,39 @@ var file_evnode_v1_state_rpc_proto_goTypes = []any{
(*GetMetadataRequest)(nil), // 4: evnode.v1.GetMetadataRequest
(*GetMetadataResponse)(nil), // 5: evnode.v1.GetMetadataResponse
(*GetGenesisDaHeightResponse)(nil), // 6: evnode.v1.GetGenesisDaHeightResponse
- (*SignedHeader)(nil), // 7: evnode.v1.SignedHeader
- (*Data)(nil), // 8: evnode.v1.Data
- (*State)(nil), // 9: evnode.v1.State
- (*emptypb.Empty)(nil), // 10: google.protobuf.Empty
+ (*P2PStoreEntry)(nil), // 7: evnode.v1.P2PStoreEntry
+ (*P2PStoreSnapshot)(nil), // 8: evnode.v1.P2PStoreSnapshot
+ (*GetP2PStoreInfoResponse)(nil), // 9: evnode.v1.GetP2PStoreInfoResponse
+ (*SignedHeader)(nil), // 10: evnode.v1.SignedHeader
+ (*Data)(nil), // 11: evnode.v1.Data
+ (*State)(nil), // 12: evnode.v1.State
+ (*timestamppb.Timestamp)(nil), // 13: google.protobuf.Timestamp
+ (*emptypb.Empty)(nil), // 14: google.protobuf.Empty
}
var file_evnode_v1_state_rpc_proto_depIdxs = []int32{
- 7, // 0: evnode.v1.Block.header:type_name -> evnode.v1.SignedHeader
- 8, // 1: evnode.v1.Block.data:type_name -> evnode.v1.Data
+ 10, // 0: evnode.v1.Block.header:type_name -> evnode.v1.SignedHeader
+ 11, // 1: evnode.v1.Block.data:type_name -> evnode.v1.Data
0, // 2: evnode.v1.GetBlockResponse.block:type_name -> evnode.v1.Block
- 9, // 3: evnode.v1.GetStateResponse.state:type_name -> evnode.v1.State
- 1, // 4: evnode.v1.StoreService.GetBlock:input_type -> evnode.v1.GetBlockRequest
- 10, // 5: evnode.v1.StoreService.GetState:input_type -> google.protobuf.Empty
- 4, // 6: evnode.v1.StoreService.GetMetadata:input_type -> evnode.v1.GetMetadataRequest
- 10, // 7: evnode.v1.StoreService.GetGenesisDaHeight:input_type -> google.protobuf.Empty
- 2, // 8: evnode.v1.StoreService.GetBlock:output_type -> evnode.v1.GetBlockResponse
- 3, // 9: evnode.v1.StoreService.GetState:output_type -> evnode.v1.GetStateResponse
- 5, // 10: evnode.v1.StoreService.GetMetadata:output_type -> evnode.v1.GetMetadataResponse
- 6, // 11: evnode.v1.StoreService.GetGenesisDaHeight:output_type -> evnode.v1.GetGenesisDaHeightResponse
- 8, // [8:12] is the sub-list for method output_type
- 4, // [4:8] is the sub-list for method input_type
- 4, // [4:4] is the sub-list for extension type_name
- 4, // [4:4] is the sub-list for extension extendee
- 0, // [0:4] is the sub-list for field type_name
+ 12, // 3: evnode.v1.GetStateResponse.state:type_name -> evnode.v1.State
+ 13, // 4: evnode.v1.P2PStoreEntry.time:type_name -> google.protobuf.Timestamp
+ 7, // 5: evnode.v1.P2PStoreSnapshot.head:type_name -> evnode.v1.P2PStoreEntry
+ 7, // 6: evnode.v1.P2PStoreSnapshot.tail:type_name -> evnode.v1.P2PStoreEntry
+ 8, // 7: evnode.v1.GetP2PStoreInfoResponse.stores:type_name -> evnode.v1.P2PStoreSnapshot
+ 1, // 8: evnode.v1.StoreService.GetBlock:input_type -> evnode.v1.GetBlockRequest
+ 14, // 9: evnode.v1.StoreService.GetState:input_type -> google.protobuf.Empty
+ 4, // 10: evnode.v1.StoreService.GetMetadata:input_type -> evnode.v1.GetMetadataRequest
+ 14, // 11: evnode.v1.StoreService.GetGenesisDaHeight:input_type -> google.protobuf.Empty
+ 14, // 12: evnode.v1.StoreService.GetP2PStoreInfo:input_type -> google.protobuf.Empty
+ 2, // 13: evnode.v1.StoreService.GetBlock:output_type -> evnode.v1.GetBlockResponse
+ 3, // 14: evnode.v1.StoreService.GetState:output_type -> evnode.v1.GetStateResponse
+ 5, // 15: evnode.v1.StoreService.GetMetadata:output_type -> evnode.v1.GetMetadataResponse
+ 6, // 16: evnode.v1.StoreService.GetGenesisDaHeight:output_type -> evnode.v1.GetGenesisDaHeightResponse
+ 9, // 17: evnode.v1.StoreService.GetP2PStoreInfo:output_type -> evnode.v1.GetP2PStoreInfoResponse
+ 13, // [13:18] is the sub-list for method output_type
+ 8, // [8:13] is the sub-list for method input_type
+ 8, // [8:8] is the sub-list for extension type_name
+ 8, // [8:8] is the sub-list for extension extendee
+ 0, // [0:8] is the sub-list for field type_name
}
func init() { file_evnode_v1_state_rpc_proto_init() }
@@ -496,7 +693,7 @@ func file_evnode_v1_state_rpc_proto_init() {
GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
RawDescriptor: unsafe.Slice(unsafe.StringData(file_evnode_v1_state_rpc_proto_rawDesc), len(file_evnode_v1_state_rpc_proto_rawDesc)),
NumEnums: 0,
- NumMessages: 7,
+ NumMessages: 10,
NumExtensions: 0,
NumServices: 1,
},
diff --git a/types/pb/evnode/v1/v1connect/state_rpc.connect.go b/types/pb/evnode/v1/v1connect/state_rpc.connect.go
index 1fe049fa11..c86e260e90 100644
--- a/types/pb/evnode/v1/v1connect/state_rpc.connect.go
+++ b/types/pb/evnode/v1/v1connect/state_rpc.connect.go
@@ -44,6 +44,9 @@ const (
// StoreServiceGetGenesisDaHeightProcedure is the fully-qualified name of the StoreService's
// GetGenesisDaHeight RPC.
StoreServiceGetGenesisDaHeightProcedure = "/evnode.v1.StoreService/GetGenesisDaHeight"
+ // StoreServiceGetP2PStoreInfoProcedure is the fully-qualified name of the StoreService's
+ // GetP2PStoreInfo RPC.
+ StoreServiceGetP2PStoreInfoProcedure = "/evnode.v1.StoreService/GetP2PStoreInfo"
)
// StoreServiceClient is a client for the evnode.v1.StoreService service.
@@ -56,6 +59,8 @@ type StoreServiceClient interface {
GetMetadata(context.Context, *connect.Request[v1.GetMetadataRequest]) (*connect.Response[v1.GetMetadataResponse], error)
// GetGenesisDaHeight returns the DA height at which the first Evolve block was included.
GetGenesisDaHeight(context.Context, *connect.Request[emptypb.Empty]) (*connect.Response[v1.GetGenesisDaHeightResponse], error)
+ // GetP2PStoreInfo returns head/tail information for the go-header stores used by P2P sync.
+ GetP2PStoreInfo(context.Context, *connect.Request[emptypb.Empty]) (*connect.Response[v1.GetP2PStoreInfoResponse], error)
}
// NewStoreServiceClient constructs a client for the evnode.v1.StoreService service. By default, it
@@ -93,6 +98,12 @@ func NewStoreServiceClient(httpClient connect.HTTPClient, baseURL string, opts .
connect.WithSchema(storeServiceMethods.ByName("GetGenesisDaHeight")),
connect.WithClientOptions(opts...),
),
+ getP2PStoreInfo: connect.NewClient[emptypb.Empty, v1.GetP2PStoreInfoResponse](
+ httpClient,
+ baseURL+StoreServiceGetP2PStoreInfoProcedure,
+ connect.WithSchema(storeServiceMethods.ByName("GetP2PStoreInfo")),
+ connect.WithClientOptions(opts...),
+ ),
}
}
@@ -102,6 +113,7 @@ type storeServiceClient struct {
getState *connect.Client[emptypb.Empty, v1.GetStateResponse]
getMetadata *connect.Client[v1.GetMetadataRequest, v1.GetMetadataResponse]
getGenesisDaHeight *connect.Client[emptypb.Empty, v1.GetGenesisDaHeightResponse]
+ getP2PStoreInfo *connect.Client[emptypb.Empty, v1.GetP2PStoreInfoResponse]
}
// GetBlock calls evnode.v1.StoreService.GetBlock.
@@ -124,6 +136,11 @@ func (c *storeServiceClient) GetGenesisDaHeight(ctx context.Context, req *connec
return c.getGenesisDaHeight.CallUnary(ctx, req)
}
+// GetP2PStoreInfo calls evnode.v1.StoreService.GetP2PStoreInfo.
+func (c *storeServiceClient) GetP2PStoreInfo(ctx context.Context, req *connect.Request[emptypb.Empty]) (*connect.Response[v1.GetP2PStoreInfoResponse], error) {
+ return c.getP2PStoreInfo.CallUnary(ctx, req)
+}
+
// StoreServiceHandler is an implementation of the evnode.v1.StoreService service.
type StoreServiceHandler interface {
// GetBlock returns a block by height or hash
@@ -134,6 +151,8 @@ type StoreServiceHandler interface {
GetMetadata(context.Context, *connect.Request[v1.GetMetadataRequest]) (*connect.Response[v1.GetMetadataResponse], error)
// GetGenesisDaHeight returns the DA height at which the first Evolve block was included.
GetGenesisDaHeight(context.Context, *connect.Request[emptypb.Empty]) (*connect.Response[v1.GetGenesisDaHeightResponse], error)
+ // GetP2PStoreInfo returns head/tail information for the go-header stores used by P2P sync.
+ GetP2PStoreInfo(context.Context, *connect.Request[emptypb.Empty]) (*connect.Response[v1.GetP2PStoreInfoResponse], error)
}
// NewStoreServiceHandler builds an HTTP handler from the service implementation. It returns the
@@ -167,6 +186,12 @@ func NewStoreServiceHandler(svc StoreServiceHandler, opts ...connect.HandlerOpti
connect.WithSchema(storeServiceMethods.ByName("GetGenesisDaHeight")),
connect.WithHandlerOptions(opts...),
)
+ storeServiceGetP2PStoreInfoHandler := connect.NewUnaryHandler(
+ StoreServiceGetP2PStoreInfoProcedure,
+ svc.GetP2PStoreInfo,
+ connect.WithSchema(storeServiceMethods.ByName("GetP2PStoreInfo")),
+ connect.WithHandlerOptions(opts...),
+ )
return "/evnode.v1.StoreService/", http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
switch r.URL.Path {
case StoreServiceGetBlockProcedure:
@@ -177,6 +202,8 @@ func NewStoreServiceHandler(svc StoreServiceHandler, opts ...connect.HandlerOpti
storeServiceGetMetadataHandler.ServeHTTP(w, r)
case StoreServiceGetGenesisDaHeightProcedure:
storeServiceGetGenesisDaHeightHandler.ServeHTTP(w, r)
+ case StoreServiceGetP2PStoreInfoProcedure:
+ storeServiceGetP2PStoreInfoHandler.ServeHTTP(w, r)
default:
http.NotFound(w, r)
}
@@ -201,3 +228,7 @@ func (UnimplementedStoreServiceHandler) GetMetadata(context.Context, *connect.Re
func (UnimplementedStoreServiceHandler) GetGenesisDaHeight(context.Context, *connect.Request[emptypb.Empty]) (*connect.Response[v1.GetGenesisDaHeightResponse], error) {
return nil, connect.NewError(connect.CodeUnimplemented, errors.New("evnode.v1.StoreService.GetGenesisDaHeight is not implemented"))
}
+
+func (UnimplementedStoreServiceHandler) GetP2PStoreInfo(context.Context, *connect.Request[emptypb.Empty]) (*connect.Response[v1.GetP2PStoreInfoResponse], error) {
+ return nil, connect.NewError(connect.CodeUnimplemented, errors.New("evnode.v1.StoreService.GetP2PStoreInfo is not implemented"))
+}