From bd0c95d6a336c8648d650650c652798a7c1a7234 Mon Sep 17 00:00:00 2001 From: tac0turtle Date: Wed, 12 Nov 2025 15:07:10 +0100 Subject: [PATCH 01/23] sync service fix for when we are not on genesis but have an empty store --- pkg/sync/sync_service.go | 27 ++++++++++-------- pkg/sync/sync_service_test.go | 53 +++++++++++++++++++++++++++++++++++ 2 files changed, 68 insertions(+), 12 deletions(-) diff --git a/pkg/sync/sync_service.go b/pkg/sync/sync_service.go index ba808e366c..bb642ffecc 100644 --- a/pkg/sync/sync_service.go +++ b/pkg/sync/sync_service.go @@ -136,10 +136,9 @@ func (syncService *SyncService[H]) WriteToStoreAndBroadcast(ctx context.Context, } isGenesis := headerOrData.Height() == syncService.genesis.InitialHeight - if isGenesis { // when starting the syncer for the first time, we have no blocks, so initFromP2P didn't initialize the genesis block. - if err := syncService.initStore(ctx, headerOrData); err != nil { - return fmt.Errorf("failed to initialize the store: %w", err) - } + storeInitialized, err := syncService.initStore(ctx, headerOrData) + if err != nil { + return fmt.Errorf("failed to initialize the store: %w", err) } firstStart := false @@ -156,10 +155,10 @@ func (syncService *SyncService[H]) WriteToStoreAndBroadcast(ctx context.Context, // as we have already initialized the store for starting the syncer. // Hence, we ignore the error. Exact reason: validation ignored if (firstStart && errors.Is(err, pubsub.ValidationError{Reason: pubsub.RejectValidationIgnored})) || - // for the genesis header, broadcast error is expected as we have already initialized the store + // for the genesis header (or any first header used to bootstrap the store), broadcast error is expected as we have already initialized the store // for starting the syncer. Hence, we ignore the error. 
// exact reason: validation failed, err header verification failed: known header: '1' <= current '1' - (isGenesis && errors.Is(err, pubsub.ValidationError{Reason: pubsub.RejectValidationFailed})) { + ((isGenesis || storeInitialized) && errors.Is(err, pubsub.ValidationError{Reason: pubsub.RejectValidationFailed})) { return nil } @@ -221,23 +220,27 @@ func (syncService *SyncService[H]) startSyncer(ctx context.Context) error { // initStore initializes the store with the given initial header. // it is a no-op if the store is already initialized. -func (syncService *SyncService[H]) initStore(ctx context.Context, initial H) error { +// Returns true when the store was initialized by this call. +func (syncService *SyncService[H]) initStore(ctx context.Context, initial H) (bool, error) { if initial.IsZero() { - return errors.New("failed to initialize the store") + return false, errors.New("failed to initialize the store") } if _, err := syncService.store.Head(ctx); errors.Is(err, header.ErrNotFound) || errors.Is(err, header.ErrEmptyStore) { if err := syncService.store.Append(ctx, initial); err != nil { - return err + return false, err } if err := syncService.store.Sync(ctx); err != nil { - return err + return false, err } + return true, nil + } else if err != nil { + return false, err } - return nil + return false, nil } // setupP2PInfrastructure sets up the P2P infrastructure (Exchange, ExchangeServer, Store) @@ -328,7 +331,7 @@ func (syncService *SyncService[H]) initFromP2PWithRetry(ctx context.Context, pee } } - if err := syncService.initStore(ctx, trusted); err != nil { + if _, err := syncService.initStore(ctx, trusted); err != nil { return false, fmt.Errorf("failed to initialize the store: %w", err) } if err := syncService.startSyncer(ctx); err != nil { diff --git a/pkg/sync/sync_service_test.go b/pkg/sync/sync_service_test.go index 8086c0849e..93603752a7 100644 --- a/pkg/sync/sync_service_test.go +++ b/pkg/sync/sync_service_test.go @@ -114,6 +114,59 @@ func 
TestHeaderSyncServiceRestart(t *testing.T) { cancel() } +func TestHeaderSyncServiceInitFromHigherHeight(t *testing.T) { + mainKV := sync.MutexWrap(datastore.NewMapDatastore()) + pk, _, err := crypto.GenerateEd25519Key(cryptoRand.Reader) + require.NoError(t, err) + noopSigner, err := noop.NewNoopSigner(pk) + require.NoError(t, err) + rnd := rand.New(rand.NewSource(1)) // nolint:gosec // test code only + mn := mocknet.New() + + chainId := "test-chain-id" + + proposerAddr := []byte("test") + genesisDoc := genesispkg.Genesis{ + ChainID: chainId, + StartTime: time.Now(), + InitialHeight: 1, + ProposerAddress: proposerAddr, + } + conf := config.DefaultConfig() + conf.RootDir = t.TempDir() + nodeKey, err := key.LoadOrGenNodeKey(filepath.Dir(conf.ConfigPath())) + require.NoError(t, err) + logger := zerolog.Nop() + priv := nodeKey.PrivKey + h, err := mn.AddPeer(priv, nil) + require.NoError(t, err) + + p2pClient, err := p2p.NewClientWithHost(conf.P2P, nodeKey.PrivKey, mainKV, chainId, logger, p2p.NopMetrics(), h) + require.NoError(t, err) + + ctx, cancel := context.WithCancel(t.Context()) + defer cancel() + require.NoError(t, p2pClient.Start(ctx)) + t.Cleanup(func() { _ = p2pClient.Close() }) + + svc, err := NewHeaderSyncService(mainKV, conf, genesisDoc, p2pClient, logger) + require.NoError(t, err) + require.NoError(t, svc.Start(ctx)) + t.Cleanup(func() { _ = svc.Stop(context.Background()) }) + + headerConfig := types.HeaderConfig{ + Height: genesisDoc.InitialHeight + 5, + DataHash: bytesN(rnd, 32), + AppHash: bytesN(rnd, 32), + Signer: noopSigner, + } + signedHeader, err := types.GetRandomSignedHeaderCustom(&headerConfig, genesisDoc.ChainID) + require.NoError(t, err) + require.NoError(t, signedHeader.Validate()) + + require.NoError(t, svc.WriteToStoreAndBroadcast(ctx, signedHeader)) +} + func nextHeader(t *testing.T, previousHeader *types.SignedHeader, chainID string, noopSigner signer.Signer) *types.SignedHeader { newSignedHeader := &types.SignedHeader{ Header: 
types.GetRandomNextHeader(previousHeader.Header, chainID), From 1e203ba4c0a256cac34db763e26ac37c4a78707c Mon Sep 17 00:00:00 2001 From: tac0turtle Date: Wed, 12 Nov 2025 15:26:29 +0100 Subject: [PATCH 02/23] remove isgenesis --- pkg/sync/sync_service.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/pkg/sync/sync_service.go b/pkg/sync/sync_service.go index bb642ffecc..1a3ca0eed4 100644 --- a/pkg/sync/sync_service.go +++ b/pkg/sync/sync_service.go @@ -135,7 +135,6 @@ func (syncService *SyncService[H]) WriteToStoreAndBroadcast(ctx context.Context, return fmt.Errorf("empty header/data cannot write to store or broadcast") } - isGenesis := headerOrData.Height() == syncService.genesis.InitialHeight storeInitialized, err := syncService.initStore(ctx, headerOrData) if err != nil { return fmt.Errorf("failed to initialize the store: %w", err) @@ -158,7 +157,7 @@ func (syncService *SyncService[H]) WriteToStoreAndBroadcast(ctx context.Context, // for the genesis header (or any first header used to bootstrap the store), broadcast error is expected as we have already initialized the store // for starting the syncer. Hence, we ignore the error. // exact reason: validation failed, err header verification failed: known header: '1' <= current '1' - ((isGenesis || storeInitialized) && errors.Is(err, pubsub.ValidationError{Reason: pubsub.RejectValidationFailed})) { + ((storeInitialized) && errors.Is(err, pubsub.ValidationError{Reason: pubsub.RejectValidationFailed})) { return nil } From 1911431464968836d021e857160fef12d9056a8b Mon Sep 17 00:00:00 2001 From: tac0turtle Date: Wed, 12 Nov 2025 15:27:59 +0100 Subject: [PATCH 03/23] lint --- README.md | 2 +- RELEASE.md | 10 +++++++++- docs/guides/full-node.md | 2 +- docs/learn/config.md | 2 ++ 4 files changed, 13 insertions(+), 3 deletions(-) diff --git a/README.md b/README.md index ddc7c248b3..5ce13f5798 100644 --- a/README.md +++ b/README.md @@ -8,7 +8,7 @@ Ev-node is the basis of the Evolve Stack. 
For more in-depth information about Ev [![GoDoc](https://godoc.org/github.com/evstack/ev-node?status.svg)](https://godoc.org/github.com/evstack/ev-node) -> **⚠️ Version Notice**: Do not use tags or releases before v1.*. Pre-v1 releases are not stable and should be considered abandoned. +> **⚠️ Version Notice**: Do not use tags or releases before v1.*. Pre-v1 releases are not stable and should be considered abandoned. ## Using Evolve diff --git a/RELEASE.md b/RELEASE.md index 7506810629..1652dfb30c 100644 --- a/RELEASE.md +++ b/RELEASE.md @@ -1,6 +1,7 @@ # ev-node Release Guide This document covers the release process for ev-node components: + - **Docker Image Releases** - Automated via GitHub workflows (for deployable applications) - **Go Module Releases** - Manual process for library packages and dependencies @@ -32,6 +33,7 @@ docker pull ghcr.io/evstack/ev-node-evm-single:v0.2.0 Use the hierarchical tag format: `{app-path}/v{major}.{minor}.{patch}` **Examples:** + - `evm/single/v0.2.0` → Releases `apps/evm/single/` - `testapp/v1.0.0` → Releases `apps/testapp/` - `grpc/single/v2.1.3` → Releases `apps/grpc/single/` @@ -124,6 +126,7 @@ These packages have the most dependencies and should be released last: **IMPORTANT**: Each module must be fully released and available on the Go proxy before updating dependencies in dependent modules. 
**Before Starting:** + - Create a protected version branch (e.g., `v0` for major versions, `v0.3` for minor breaking changes) - Ensure CHANGELOG.md is up to date with all changes properly categorized - Remove all `replace` directives from go.mod files @@ -324,25 +327,30 @@ go get github.com/evstack/ev-node/core@v0.3.0 ### Docker Releases **"App directory does not exist"** + - Ensure tag matches app path: `apps/evm/single/` → `evm/single/v0.2.0` - Check spelling and case sensitivity **"Dockerfile not found"** + - Verify Dockerfile exists at `apps/{app-path}/Dockerfile` - Check filename is exactly `Dockerfile` **"Image not found" in tests** + - Wait for Docker build workflow to complete - Check workflow dependencies in Actions tab ### Go Module Releases **Go proxy delay** + - Wait 5-30 minutes for propagation - Use `go list -m` to verify availability -- Check https://proxy.golang.org/ +- Check **Dependency version conflicts** + - Ensure all dependencies are released before dependent modules - Verify go.mod has correct versions - Remove `replace` directives diff --git a/docs/guides/full-node.md b/docs/guides/full-node.md index 30241b075c..eb123c227f 100644 --- a/docs/guides/full-node.md +++ b/docs/guides/full-node.md @@ -4,7 +4,7 @@ This guide covers how to set up a full node to run alongside a sequencer node in a Evolve-based blockchain network. A full node maintains a complete copy of the blockchain and helps validate transactions, improving the network's decentralization and security. -> ** Note: The guide on how to run an evolve EVM full node can be found [here](./evm/single#setting-up-a-full-node). 
** +> **Note: The guide on how to run an evolve EVM full node can be found [here](./evm/single#setting-up-a-full-node).** ## Prerequisites diff --git a/docs/learn/config.md b/docs/learn/config.md index ff855f28aa..9334741dc2 100644 --- a/docs/learn/config.md +++ b/docs/learn/config.md @@ -658,6 +658,7 @@ curl http://localhost:7331/health/live #### `/health/ready` Returns `200 OK` if the node can serve correct data. Checks: + - P2P is listening (if enabled) - Has synced blocks - Not too far behind network @@ -669,6 +670,7 @@ curl http://localhost:7331/health/ready ``` Configure max blocks behind: + ```yaml node: readiness_max_blocks_behind: 15 From 3327cf6a0fd36219751b9c0a63b6c00c7e0fd2a9 Mon Sep 17 00:00:00 2001 From: tac0turtle Date: Thu, 13 Nov 2025 21:41:34 +0100 Subject: [PATCH 04/23] panic test --- pkg/sync/sync_service.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/pkg/sync/sync_service.go b/pkg/sync/sync_service.go index 1a3ca0eed4..c72883769b 100644 --- a/pkg/sync/sync_service.go +++ b/pkg/sync/sync_service.go @@ -148,6 +148,11 @@ func (syncService *SyncService[H]) WriteToStoreAndBroadcast(ctx context.Context, } } + if err := headerOrData.Validate(); err != nil { + syncService.logger.Error().Err(err).Msg("failed to validate header") + panic(err) + } + // Broadcast for subscribers if err := syncService.sub.Broadcast(ctx, headerOrData, opts...); err != nil { // for the first block when starting the app, broadcast error is expected @@ -163,6 +168,7 @@ func (syncService *SyncService[H]) WriteToStoreAndBroadcast(ctx context.Context, } syncService.logger.Error().Err(err).Msg("failed to broadcast") + panic(err) } return nil From f235c720f563e325c4878997b991fec64537b9c9 Mon Sep 17 00:00:00 2001 From: Marko Date: Thu, 13 Nov 2025 22:02:40 +0100 Subject: [PATCH 05/23] cmd: p2p store info (#2835) ## Overview add store-info command to inspect p2p store to see what is present --- apps/evm/single/main.go | 1 + apps/grpc/single/main.go | 1 + 
apps/testapp/main.go | 1 + pkg/cmd/store.go | 200 +++++++++++++++++++++++++++++++++++++++ pkg/cmd/store_test.go | 96 +++++++++++++++++++ 5 files changed, 299 insertions(+) diff --git a/apps/evm/single/main.go b/apps/evm/single/main.go index a562ceff7c..749cf9f3a0 100644 --- a/apps/evm/single/main.go +++ b/apps/evm/single/main.go @@ -31,6 +31,7 @@ func main() { rollcmd.VersionCmd, rollcmd.NetInfoCmd, rollcmd.StoreUnsafeCleanCmd, + rollcmd.StoreP2PInspectCmd, rollcmd.KeysCmd(), ) diff --git a/apps/grpc/single/main.go b/apps/grpc/single/main.go index a0d7f934d0..08f0145047 100644 --- a/apps/grpc/single/main.go +++ b/apps/grpc/single/main.go @@ -30,6 +30,7 @@ the Evolve execution gRPC interface.`, evcmd.VersionCmd, evcmd.NetInfoCmd, evcmd.StoreUnsafeCleanCmd, + evcmd.StoreP2PInspectCmd, evcmd.KeysCmd(), ) diff --git a/apps/testapp/main.go b/apps/testapp/main.go index cd8f019709..5c711479f7 100644 --- a/apps/testapp/main.go +++ b/apps/testapp/main.go @@ -19,6 +19,7 @@ func main() { rollcmd.VersionCmd, rollcmd.NetInfoCmd, rollcmd.StoreUnsafeCleanCmd, + rollcmd.StoreP2PInspectCmd, rollcmd.KeysCmd(), cmds.NewRollbackCmd(), initCmd, diff --git a/pkg/cmd/store.go b/pkg/cmd/store.go index e68a0eaed6..79e1fd3cc1 100644 --- a/pkg/cmd/store.go +++ b/pkg/cmd/store.go @@ -1,11 +1,23 @@ package cmd import ( + "context" + "errors" "fmt" "os" "path/filepath" + "time" + goheader "github.com/celestiaorg/go-header" + goheaderstore "github.com/celestiaorg/go-header/store" + ds "github.com/ipfs/go-datastore" + kt "github.com/ipfs/go-datastore/keytransform" "github.com/spf13/cobra" + + "github.com/evstack/ev-node/node" + "github.com/evstack/ev-node/pkg/config" + "github.com/evstack/ev-node/pkg/store" + "github.com/evstack/ev-node/types" ) // UnsafeCleanDataDir removes all contents of the specified data directory. @@ -53,3 +65,191 @@ This operation is unsafe and cannot be undone. 
Use with caution!`, return nil }, } + +// StoreP2PInspectCmd reports head/tail information for the go-header stores used by P2P sync. +var StoreP2PInspectCmd = &cobra.Command{ + Use: "store-info", + Short: "Inspect the go-header (P2P) stores and display their tail/head entries", + Long: `Opens the datastore used by the node's go-header services and reports +the current height, head, and tail information for both the header and data stores.`, + RunE: func(cmd *cobra.Command, args []string) error { + nodeConfig, err := ParseConfig(cmd) + if err != nil { + return fmt.Errorf("error parsing config: %w", err) + } + + ctx := cmd.Context() + if ctx == nil { + ctx = context.Background() + } + + dbName := resolveDBName(cmd) + + rawStore, err := store.NewDefaultKVStore(nodeConfig.RootDir, nodeConfig.DBPath, dbName) + if err != nil { + return fmt.Errorf("failed to open datastore: %w", err) + } + defer func() { + if closeErr := rawStore.Close(); closeErr != nil { + cmd.PrintErrf("warning: failed to close datastore: %v\n", closeErr) + } + }() + + mainStore := kt.Wrap(rawStore, &kt.PrefixTransform{ + Prefix: ds.NewKey(node.EvPrefix), + }) + + headerSnapshot, err := inspectP2PStore[*types.SignedHeader](ctx, mainStore, headerStorePrefix, "Header Store") + if err != nil { + return fmt.Errorf("failed to inspect header store: %w", err) + } + + dataSnapshot, err := inspectP2PStore[*types.Data](ctx, mainStore, dataStorePrefix, "Data Store") + if err != nil { + return fmt.Errorf("failed to inspect data store: %w", err) + } + + storePath := resolveStorePath(nodeConfig.RootDir, nodeConfig.DBPath, dbName) + + out := cmd.OutOrStdout() + fmt.Fprintf(out, "Inspecting go-header stores at %s\n", storePath) + printP2PStoreSnapshot(cmd, headerSnapshot) + printP2PStoreSnapshot(cmd, dataSnapshot) + + return nil + }, +} + +const ( + headerStorePrefix = "headerSync" + dataStorePrefix = "dataSync" +) + +type p2pStoreSnapshot struct { + Label string + Prefix string + Height uint64 + HeadHeight uint64 + 
HeadHash string + HeadTime time.Time + TailHeight uint64 + TailHash string + TailTime time.Time + HeadPresent bool + TailPresent bool + Empty bool +} + +func inspectP2PStore[H goheader.Header[H]]( + ctx context.Context, + datastore ds.Batching, + prefix string, + label string, +) (p2pStoreSnapshot, error) { + storeImpl, err := goheaderstore.NewStore[H]( + datastore, + goheaderstore.WithStorePrefix(prefix), + goheaderstore.WithMetrics(), + ) + if err != nil { + return p2pStoreSnapshot{}, fmt.Errorf("failed to open %s: %w", label, err) + } + + if err := storeImpl.Start(ctx); err != nil { + return p2pStoreSnapshot{}, fmt.Errorf("failed to start %s: %w", label, err) + } + defer func() { + _ = storeImpl.Stop(context.Background()) + }() + + snapshot := p2pStoreSnapshot{ + Label: label, + Prefix: prefix, + Height: storeImpl.Height(), + } + + if err := populateSnapshot(ctx, storeImpl, &snapshot); err != nil { + return p2pStoreSnapshot{}, err + } + + return snapshot, nil +} + +func populateSnapshot[H goheader.Header[H]]( + ctx context.Context, + storeImpl *goheaderstore.Store[H], + snapshot *p2pStoreSnapshot, +) error { + head, err := storeImpl.Head(ctx) + switch { + case err == nil: + snapshot.HeadPresent = true + snapshot.HeadHeight = head.Height() + snapshot.HeadHash = head.Hash().String() + snapshot.HeadTime = head.Time() + case errors.Is(err, goheader.ErrEmptyStore), errors.Is(err, goheader.ErrNotFound): + // store not initialized yet + default: + return fmt.Errorf("failed to read %s head: %w", snapshot.Label, err) + } + + tail, err := storeImpl.Tail(ctx) + switch { + case err == nil: + snapshot.TailPresent = true + snapshot.TailHeight = tail.Height() + snapshot.TailHash = tail.Hash().String() + snapshot.TailTime = tail.Time() + case errors.Is(err, goheader.ErrEmptyStore), errors.Is(err, goheader.ErrNotFound): + default: + return fmt.Errorf("failed to read %s tail: %w", snapshot.Label, err) + } + + snapshot.Empty = !(snapshot.HeadPresent || snapshot.TailPresent) + + 
return nil +} + +func printP2PStoreSnapshot(cmd *cobra.Command, snapshot p2pStoreSnapshot) { + out := cmd.OutOrStdout() + fmt.Fprintf(out, "\n[%s]\n", snapshot.Label) + fmt.Fprintf(out, "prefix: %s\n", snapshot.Prefix) + fmt.Fprintf(out, "height: %d\n", snapshot.Height) + if snapshot.Empty { + fmt.Fprintln(out, "status: empty (no entries found)") + return + } + + if snapshot.TailPresent { + fmt.Fprintf(out, "tail: height=%d hash=%s%s\n", snapshot.TailHeight, snapshot.TailHash, formatTime(snapshot.TailTime)) + } + if snapshot.HeadPresent { + fmt.Fprintf(out, "head: height=%d hash=%s%s\n", snapshot.HeadHeight, snapshot.HeadHash, formatTime(snapshot.HeadTime)) + } +} + +func formatTime(t time.Time) string { + if t.IsZero() { + return "" + } + return fmt.Sprintf(" time=%s", t.UTC().Format(time.RFC3339)) +} + +func resolveDBName(cmd *cobra.Command) string { + if cmd == nil { + return config.ConfigFileName + } + root := cmd.Root() + if root == nil || root.Name() == "" { + return config.ConfigFileName + } + return root.Name() +} + +func resolveStorePath(rootDir, dbPath, dbName string) string { + base := dbPath + if !filepath.IsAbs(dbPath) { + base = filepath.Join(rootDir, dbPath) + } + return filepath.Join(base, dbName) +} diff --git a/pkg/cmd/store_test.go b/pkg/cmd/store_test.go index d97b255c6a..bf0b2c4bc3 100644 --- a/pkg/cmd/store_test.go +++ b/pkg/cmd/store_test.go @@ -2,13 +2,25 @@ package cmd import ( "bytes" + "context" + cryptoRand "crypto/rand" "fmt" "os" "path/filepath" "testing" + goheaderstore "github.com/celestiaorg/go-header/store" + ds "github.com/ipfs/go-datastore" + kt "github.com/ipfs/go-datastore/keytransform" + "github.com/libp2p/go-libp2p/core/crypto" "github.com/spf13/cobra" "github.com/stretchr/testify/require" + + "github.com/evstack/ev-node/node" + "github.com/evstack/ev-node/pkg/config" + "github.com/evstack/ev-node/pkg/signer/noop" + "github.com/evstack/ev-node/pkg/store" + "github.com/evstack/ev-node/types" ) func TestUnsafeCleanDataDir(t 
*testing.T) { @@ -85,3 +97,87 @@ func TestStoreUnsafeCleanCmd(t *testing.T) { // Check output message (optional) require.Contains(t, buf.String(), fmt.Sprintf("All contents of the data directory at %s have been removed.", dataDir)) } + +func TestStoreP2PInspectCmd(t *testing.T) { + tempDir := t.TempDir() + const appName = "testapp" + + // Seed the header store with a couple of entries. + seedHeaderStore(t, tempDir, appName) + + rootCmd := &cobra.Command{Use: appName} + rootCmd.PersistentFlags().String(config.FlagRootDir, tempDir, "root directory") + rootCmd.AddCommand(StoreP2PInspectCmd) + + buf := new(bytes.Buffer) + rootCmd.SetOut(buf) + rootCmd.SetErr(buf) + rootCmd.SetArgs([]string{"store-info"}) + + err := rootCmd.Execute() + require.NoError(t, err) + + output := buf.String() + require.Contains(t, output, "Inspecting go-header stores") + require.Contains(t, output, "[Header Store]") + require.Contains(t, output, "tail: height=1") + require.Contains(t, output, "head: height=2") + require.Contains(t, output, "[Data Store]") + require.Contains(t, output, "status: empty") +} + +func seedHeaderStore(t *testing.T, rootDir, dbName string) { + t.Helper() + + rawStore, err := store.NewDefaultKVStore(rootDir, "data", dbName) + require.NoError(t, err) + + mainStore := kt.Wrap(rawStore, &kt.PrefixTransform{ + Prefix: ds.NewKey(node.EvPrefix), + }) + + headerStore, err := goheaderstore.NewStore[*types.SignedHeader]( + mainStore, + goheaderstore.WithStorePrefix(headerStorePrefix), + goheaderstore.WithMetrics(), + ) + require.NoError(t, err) + + ctx := context.Background() + require.NoError(t, headerStore.Start(ctx)) + + defer func() { + require.NoError(t, headerStore.Stop(ctx)) + require.NoError(t, rawStore.Close()) + }() + + pk, _, err := crypto.GenerateEd25519Key(cryptoRand.Reader) + require.NoError(t, err) + noopSigner, err := noop.NewNoopSigner(pk) + require.NoError(t, err) + + chainID := "test-chain" + headerCfg := types.HeaderConfig{ + Height: 1, + DataHash: 
types.GetRandomBytes(32), + AppHash: types.GetRandomBytes(32), + Signer: noopSigner, + } + + first, err := types.GetRandomSignedHeaderCustom(&headerCfg, chainID) + require.NoError(t, err) + require.NoError(t, headerStore.Append(ctx, first)) + + next := &types.SignedHeader{ + Header: types.GetRandomNextHeader(first.Header, chainID), + Signer: first.Signer, + } + payload, err := next.Header.MarshalBinary() + require.NoError(t, err) + signature, err := noopSigner.Sign(payload) + require.NoError(t, err) + next.Signature = signature + + require.NoError(t, headerStore.Append(ctx, next)) + require.NoError(t, headerStore.Sync(ctx)) +} From 3d78fd130ab847f083b5df13a32caa3973f2bf27 Mon Sep 17 00:00:00 2001 From: tac0turtle Date: Fri, 14 Nov 2025 12:06:15 +0100 Subject: [PATCH 06/23] add p2p store info --- docs/src/openapi-rpc.json | 117 ++++++++ node/full.go | 11 +- node/light.go | 11 +- pkg/rpc/client/client.go | 10 + pkg/rpc/client/client_test.go | 89 +++++- pkg/rpc/example/example.go | 4 +- pkg/rpc/server/server.go | 105 ++++++- pkg/rpc/server/server_test.go | 102 ++++++- proto/evnode/v1/state_rpc.proto | 25 ++ types/pb/evnode/v1/state_rpc.pb.go | 265 ++++++++++++++++-- .../evnode/v1/v1connect/state_rpc.connect.go | 31 ++ 11 files changed, 710 insertions(+), 60 deletions(-) diff --git a/docs/src/openapi-rpc.json b/docs/src/openapi-rpc.json index e8fd381ed6..ec7c9c5671 100644 --- a/docs/src/openapi-rpc.json +++ b/docs/src/openapi-rpc.json @@ -472,6 +472,48 @@ } } } + }, + "/evnode.v1.StoreService/GetP2PStoreInfo": { + "post": { + "tags": [ + "Store Service" + ], + "summary": "Inspect go-header stores", + "description": "Returns head/tail information for the header and data go-header stores used by P2P sync.", + "operationId": "getP2PStoreInfo", + "requestBody": { + "description": "Empty request", + "required": true, + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/Empty" + }, + "examples": { + "default": { + "summary": "Get go-header 
store snapshots", + "value": {} + } + } + } + } + }, + "responses": { + "200": { + "description": "Snapshots returned successfully", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/GetP2PStoreInfoResponse" + } + } + } + }, + "500": { + "$ref": "#/components/responses/InternalError" + } + } + } } }, "components": { @@ -1012,6 +1054,81 @@ "description": "Additional error details" } } + }, + "P2PStoreEntry": { + "type": "object", + "description": "Head or tail entry for a go-header store", + "required": [ + "height", + "hash" + ], + "properties": { + "height": { + "type": "integer", + "format": "int64", + "description": "Block height" + }, + "hash": { + "type": "string", + "format": "byte", + "description": "Header/data hash (base64-encoded)" + }, + "time": { + "type": "string", + "format": "date-time", + "description": "Entry timestamp" + } + } + }, + "P2PStoreSnapshot": { + "type": "object", + "description": "Snapshot of a go-header store", + "required": [ + "label", + "height", + "head_present", + "tail_present" + ], + "properties": { + "label": { + "type": "string", + "description": "Human friendly store label" + }, + "height": { + "type": "integer", + "format": "int64", + "description": "Highest contiguous height" + }, + "head_present": { + "type": "boolean", + "description": "Whether a head entry exists" + }, + "head": { + "$ref": "#/components/schemas/P2PStoreEntry" + }, + "tail_present": { + "type": "boolean", + "description": "Whether a tail entry exists" + }, + "tail": { + "$ref": "#/components/schemas/P2PStoreEntry" + } + } + }, + "GetP2PStoreInfoResponse": { + "type": "object", + "description": "Snapshot of the header and data go-header stores", + "required": [ + "stores" + ], + "properties": { + "stores": { + "type": "array", + "items": { + "$ref": "#/components/schemas/P2PStoreSnapshot" + } + } + } } } } diff --git a/node/full.go b/node/full.go index 2097c24b59..6d03a87c04 100644 --- a/node/full.go +++ b/node/full.go 
@@ -291,7 +291,16 @@ func (n *FullNode) Run(parentCtx context.Context) error { return min(hHeight, dHeight) } - handler, err := rpcserver.NewServiceHandler(n.Store, n.p2pClient, n.genesis.ProposerAddress, n.Logger, n.nodeConfig, bestKnownHeightProvider) + handler, err := rpcserver.NewServiceHandler( + n.Store, + n.hSyncService.Store(), + n.dSyncService.Store(), + n.p2pClient, + n.genesis.ProposerAddress, + n.Logger, + n.nodeConfig, + bestKnownHeightProvider, + ) if err != nil { return fmt.Errorf("error creating RPC handler: %w", err) } diff --git a/node/light.go b/node/light.go index 89fc0230f6..cc92b3fd8f 100644 --- a/node/light.go +++ b/node/light.go @@ -84,7 +84,16 @@ func (ln *LightNode) Run(parentCtx context.Context) error { return ln.hSyncService.Store().Height() } - handler, err := rpcserver.NewServiceHandler(ln.Store, ln.P2P, nil, ln.Logger, ln.nodeConfig, bestKnown) + handler, err := rpcserver.NewServiceHandler( + ln.Store, + ln.hSyncService.Store(), + nil, + ln.P2P, + nil, + ln.Logger, + ln.nodeConfig, + bestKnown, + ) if err != nil { return fmt.Errorf("error creating RPC handler: %w", err) } diff --git a/pkg/rpc/client/client.go b/pkg/rpc/client/client.go index f7a03d536f..54a46eaf1e 100644 --- a/pkg/rpc/client/client.go +++ b/pkg/rpc/client/client.go @@ -89,6 +89,16 @@ func (c *Client) GetMetadata(ctx context.Context, key string) ([]byte, error) { return resp.Msg.Value, nil } +// GetP2PStoreInfo returns head/tail metadata for the go-header stores +func (c *Client) GetP2PStoreInfo(ctx context.Context) ([]*pb.P2PStoreSnapshot, error) { + req := connect.NewRequest(&emptypb.Empty{}) + resp, err := c.storeClient.GetP2PStoreInfo(ctx, req) + if err != nil { + return nil, err + } + return resp.Msg.Stores, nil +} + // GetPeerInfo returns information about the connected peers func (c *Client) GetPeerInfo(ctx context.Context) ([]*pb.PeerInfo, error) { req := connect.NewRequest(&emptypb.Empty{}) diff --git a/pkg/rpc/client/client_test.go 
b/pkg/rpc/client/client_test.go index db375dee5b..3429daaf72 100644 --- a/pkg/rpc/client/client_test.go +++ b/pkg/rpc/client/client_test.go @@ -7,6 +7,7 @@ import ( "testing" "time" + goheader "github.com/celestiaorg/go-header" "github.com/libp2p/go-libp2p/core/peer" "github.com/multiformats/go-multiaddr" "github.com/rs/zerolog" @@ -19,15 +20,22 @@ import ( "github.com/evstack/ev-node/pkg/p2p" "github.com/evstack/ev-node/pkg/rpc/server" "github.com/evstack/ev-node/test/mocks" + headerstoremocks "github.com/evstack/ev-node/test/mocks/external" "github.com/evstack/ev-node/types" rpc "github.com/evstack/ev-node/types/pb/evnode/v1/v1connect" ) -func setupTestServer(t *testing.T, mockStore *mocks.MockStore, mockP2P *mocks.MockP2PRPC) (*httptest.Server, *Client) { +func setupTestServer( + t *testing.T, + mockStore *mocks.MockStore, + headerStore goheader.Store[*types.SignedHeader], + dataStore goheader.Store[*types.Data], + mockP2P *mocks.MockP2PRPC, +) (*httptest.Server, *Client) { mux := http.NewServeMux() logger := zerolog.Nop() - storeServer := server.NewStoreServer(mockStore, logger) + storeServer := server.NewStoreServer(mockStore, headerStore, dataStore, logger) p2pServer := server.NewP2PServer(mockP2P) testConfig := config.DefaultConfig() @@ -62,7 +70,7 @@ func TestClientGetState(t *testing.T) { mockStore.On("GetState", mock.Anything).Return(state, nil) - testServer, client := setupTestServer(t, mockStore, mockP2P) + testServer, client := setupTestServer(t, mockStore, nil, nil, mockP2P) defer testServer.Close() resultState, err := client.GetState(context.Background()) @@ -84,7 +92,7 @@ func TestClientGetMetadata(t *testing.T) { mockStore.On("GetMetadata", mock.Anything, key).Return(value, nil) - testServer, client := setupTestServer(t, mockStore, mockP2P) + testServer, client := setupTestServer(t, mockStore, nil, nil, mockP2P) defer testServer.Close() resultValue, err := client.GetMetadata(context.Background(), key) @@ -94,6 +102,44 @@ func 
TestClientGetMetadata(t *testing.T) { mockStore.AssertExpectations(t) } +func TestClientGetP2PStoreInfo(t *testing.T) { + mockStore := mocks.NewMockStore(t) + mockP2P := mocks.NewMockP2PRPC(t) + headerStore := headerstoremocks.NewMockStore[*types.SignedHeader](t) + dataStore := headerstoremocks.NewMockStore[*types.Data](t) + + now := time.Now().UTC() + + headerHead := testSignedHeader(10, now) + headerTail := testSignedHeader(5, now.Add(-time.Minute)) + headerStore.On("Height").Return(uint64(10)) + headerStore.On("Head", mock.Anything).Return(headerHead, nil) + headerStore.On("Tail", mock.Anything).Return(headerTail, nil) + + dataHead := testData(8, now.Add(-30*time.Second)) + dataTail := testData(4, now.Add(-2*time.Minute)) + dataStore.On("Height").Return(uint64(8)) + dataStore.On("Head", mock.Anything).Return(dataHead, nil) + dataStore.On("Tail", mock.Anything).Return(dataTail, nil) + + testServer, client := setupTestServer(t, mockStore, headerStore, dataStore, mockP2P) + defer testServer.Close() + + stores, err := client.GetP2PStoreInfo(context.Background()) + require.NoError(t, err) + require.Len(t, stores, 2) + + require.Equal(t, "Header Store", stores[0].Label) + require.True(t, stores[0].HeadPresent) + require.Equal(t, uint64(10), stores[0].Head.Height) + require.True(t, stores[0].TailPresent) + require.Equal(t, uint64(5), stores[0].Tail.Height) + + require.Equal(t, "Data Store", stores[1].Label) + require.Equal(t, uint64(8), stores[1].Height) + require.Equal(t, uint64(4), stores[1].Tail.Height) +} + func TestClientGetBlockByHeight(t *testing.T) { mockStore := mocks.NewMockStore(t) mockP2P := mocks.NewMockP2PRPC(t) @@ -104,7 +150,7 @@ func TestClientGetBlockByHeight(t *testing.T) { mockStore.On("GetBlockData", mock.Anything, height).Return(header, data, nil) - testServer, client := setupTestServer(t, mockStore, mockP2P) + testServer, client := setupTestServer(t, mockStore, nil, nil, mockP2P) defer testServer.Close() block, err := 
client.GetBlockByHeight(context.Background(), height) @@ -124,7 +170,7 @@ func TestClientGetBlockByHash(t *testing.T) { mockStore.On("GetBlockByHash", mock.Anything, hash).Return(header, data, nil) - testServer, client := setupTestServer(t, mockStore, mockP2P) + testServer, client := setupTestServer(t, mockStore, nil, nil, mockP2P) defer testServer.Close() block, err := client.GetBlockByHash(context.Background(), hash) @@ -154,7 +200,7 @@ func TestClientGetPeerInfo(t *testing.T) { mockP2P.On("GetPeers").Return(peers, nil) - testServer, client := setupTestServer(t, mockStore, mockP2P) + testServer, client := setupTestServer(t, mockStore, nil, nil, mockP2P) defer testServer.Close() resultPeers, err := client.GetPeerInfo(context.Background()) @@ -179,7 +225,7 @@ func TestClientGetNetInfo(t *testing.T) { mockP2P.On("GetNetworkInfo").Return(netInfo, nil) - testServer, client := setupTestServer(t, mockStore, mockP2P) + testServer, client := setupTestServer(t, mockStore, nil, nil, mockP2P) defer testServer.Close() resultNetInfo, err := client.GetNetInfo(context.Background()) @@ -194,7 +240,7 @@ func TestClientGetNamespace(t *testing.T) { mockStore := mocks.NewMockStore(t) mockP2P := mocks.NewMockP2PRPC(t) - testServer, client := setupTestServer(t, mockStore, mockP2P) + testServer, client := setupTestServer(t, mockStore, nil, nil, mockP2P) defer testServer.Close() namespaceResp, err := client.GetNamespace(context.Background()) @@ -205,3 +251,28 @@ func TestClientGetNamespace(t *testing.T) { require.NotEmpty(t, namespaceResp.HeaderNamespace) require.NotEmpty(t, namespaceResp.DataNamespace) } + +func testSignedHeader(height uint64, ts time.Time) *types.SignedHeader { + return &types.SignedHeader{ + Header: types.Header{ + BaseHeader: types.BaseHeader{ + Height: height, + Time: uint64(ts.UnixNano()), + ChainID: "test-chain", + }, + ProposerAddress: []byte{0x01}, + DataHash: []byte{0x02}, + AppHash: []byte{0x03}, + }, + } +} + +func testData(height uint64, ts time.Time) 
*types.Data { + return &types.Data{ + Metadata: &types.Metadata{ + ChainID: "test-chain", + Height: height, + Time: uint64(ts.UnixNano()), + }, + } +} diff --git a/pkg/rpc/example/example.go b/pkg/rpc/example/example.go index a1bc4cd7c0..f3c020fa87 100644 --- a/pkg/rpc/example/example.go +++ b/pkg/rpc/example/example.go @@ -22,7 +22,7 @@ func StartStoreServer(s store.Store, address string, logger zerolog.Logger) { rpcAddr := fmt.Sprintf("%s:%d", "localhost", 8080) cfg := config.DefaultConfig() - handler, err := server.NewServiceHandler(s, nil, nil, logger, cfg, nil) + handler, err := server.NewServiceHandler(s, nil, nil, nil, nil, logger, cfg, nil) if err != nil { panic(err) } @@ -82,7 +82,7 @@ func ExampleServer(s store.Store) { // Start RPC server rpcAddr := fmt.Sprintf("%s:%d", "localhost", 8080) cfg := config.DefaultConfig() - handler, err := server.NewServiceHandler(s, nil, nil, logger, cfg, nil) + handler, err := server.NewServiceHandler(s, nil, nil, nil, nil, logger, cfg, nil) if err != nil { panic(err) } diff --git a/pkg/rpc/server/server.go b/pkg/rpc/server/server.go index f649fda37d..e397aa8125 100644 --- a/pkg/rpc/server/server.go +++ b/pkg/rpc/server/server.go @@ -12,6 +12,7 @@ import ( "connectrpc.com/connect" "connectrpc.com/grpcreflect" + goheader "github.com/celestiaorg/go-header" coreda "github.com/evstack/ev-node/core/da" ds "github.com/ipfs/go-datastore" "github.com/rs/zerolog" @@ -32,15 +33,24 @@ var _ rpc.StoreServiceHandler = (*StoreServer)(nil) // StoreServer implements the StoreService defined in the proto file type StoreServer struct { - store store.Store - logger zerolog.Logger + store store.Store + headerStore goheader.Store[*types.SignedHeader] + dataStore goheader.Store[*types.Data] + logger zerolog.Logger } // NewStoreServer creates a new StoreServer instance -func NewStoreServer(store store.Store, logger zerolog.Logger) *StoreServer { +func NewStoreServer( + store store.Store, + headerStore goheader.Store[*types.SignedHeader], + 
dataStore goheader.Store[*types.Data], + logger zerolog.Logger, +) *StoreServer { return &StoreServer{ - store: store, - logger: logger, + store: store, + headerStore: headerStore, + dataStore: dataStore, + logger: logger, } } @@ -172,6 +182,34 @@ func (s *StoreServer) GetGenesisDaHeight( } +// GetP2PStoreInfo implements the GetP2PStoreInfo RPC method +func (s *StoreServer) GetP2PStoreInfo( + ctx context.Context, + _ *connect.Request[emptypb.Empty], +) (*connect.Response[pb.GetP2PStoreInfoResponse], error) { + snapshots := make([]*pb.P2PStoreSnapshot, 0, 2) + + if s.headerStore != nil { + snapshot, err := collectP2PStoreSnapshot(ctx, s.headerStore, "Header Store") + if err != nil { + return nil, connect.NewError(connect.CodeInternal, err) + } + snapshots = append(snapshots, snapshot) + } + + if s.dataStore != nil { + snapshot, err := collectP2PStoreSnapshot(ctx, s.dataStore, "Data Store") + if err != nil { + return nil, connect.NewError(connect.CodeInternal, err) + } + snapshots = append(snapshots, snapshot) + } + + return connect.NewResponse(&pb.GetP2PStoreInfoResponse{ + Stores: snapshots, + }), nil +} + // GetMetadata implements the GetMetadata RPC method func (s *StoreServer) GetMetadata( ctx context.Context, @@ -187,6 +225,50 @@ func (s *StoreServer) GetMetadata( }), nil } +func collectP2PStoreSnapshot[H goheader.Header[H]]( + ctx context.Context, + store goheader.Store[H], + label string, +) (*pb.P2PStoreSnapshot, error) { + snapshot := &pb.P2PStoreSnapshot{ + Label: label, + Height: store.Height(), + } + + if head, err := store.Head(ctx); err == nil { + snapshot.HeadPresent = true + snapshot.Head = toP2PStoreEntry(head) + } else if !errors.Is(err, goheader.ErrEmptyStore) && !errors.Is(err, goheader.ErrNotFound) { + return nil, fmt.Errorf("failed to read %s head: %w", label, err) + } + + if tail, err := store.Tail(ctx); err == nil { + snapshot.TailPresent = true + snapshot.Tail = toP2PStoreEntry(tail) + } else if !errors.Is(err, goheader.ErrEmptyStore) && 
!errors.Is(err, goheader.ErrNotFound) { + return nil, fmt.Errorf("failed to read %s tail: %w", label, err) + } + + return snapshot, nil +} + +func toP2PStoreEntry[H goheader.Header[H]](item H) *pb.P2PStoreEntry { + if any(item) == nil { + return nil + } + + entry := &pb.P2PStoreEntry{ + Height: item.Height(), + Hash: append([]byte(nil), item.Hash()...), + } + + if ts := item.Time(); !ts.IsZero() { + entry.Time = timestamppb.New(ts) + } + + return entry +} + type ConfigServer struct { config config.Config signer []byte @@ -288,8 +370,17 @@ func (p *P2PServer) GetNetInfo( } // NewServiceHandler creates a new HTTP handler for Store, P2P and Config services -func NewServiceHandler(store store.Store, peerManager p2p.P2PRPC, proposerAddress []byte, logger zerolog.Logger, config config.Config, bestKnown BestKnownHeightProvider) (http.Handler, error) { - storeServer := NewStoreServer(store, logger) +func NewServiceHandler( + store store.Store, + headerStore goheader.Store[*types.SignedHeader], + dataStore goheader.Store[*types.Data], + peerManager p2p.P2PRPC, + proposerAddress []byte, + logger zerolog.Logger, + config config.Config, + bestKnown BestKnownHeightProvider, +) (http.Handler, error) { + storeServer := NewStoreServer(store, headerStore, dataStore, logger) p2pServer := NewP2PServer(peerManager) configServer := NewConfigServer(config, proposerAddress, logger) diff --git a/pkg/rpc/server/server_test.go b/pkg/rpc/server/server_test.go index 1b0818e862..98eecc006a 100644 --- a/pkg/rpc/server/server_test.go +++ b/pkg/rpc/server/server_test.go @@ -23,6 +23,7 @@ import ( "github.com/evstack/ev-node/pkg/p2p" "github.com/evstack/ev-node/pkg/store" "github.com/evstack/ev-node/test/mocks" + headerstoremocks "github.com/evstack/ev-node/test/mocks/external" "github.com/evstack/ev-node/types" pb "github.com/evstack/ev-node/types/pb/evnode/v1" ) @@ -46,7 +47,7 @@ func TestGetBlock(t *testing.T) { // Create server with mock store logger := zerolog.Nop() - server := 
NewStoreServer(mockStore, logger) + server := NewStoreServer(mockStore, nil, nil, logger) // Test GetBlock with height - success case t.Run("by height with DA heights", func(t *testing.T) { @@ -138,7 +139,7 @@ func TestGetBlock_Latest(t *testing.T) { mockStore := mocks.NewMockStore(t) logger := zerolog.Nop() - server := NewStoreServer(mockStore, logger) + server := NewStoreServer(mockStore, nil, nil, logger) latestHeight := uint64(20) header := &types.SignedHeader{Header: types.Header{BaseHeader: types.BaseHeader{Height: latestHeight}}} @@ -194,7 +195,7 @@ func TestGetState(t *testing.T) { // Create server with mock store logger := zerolog.Nop() - server := NewStoreServer(mockStore, logger) + server := NewStoreServer(mockStore, nil, nil, logger) // Call GetState req := connect.NewRequest(&emptypb.Empty{}) @@ -217,7 +218,7 @@ func TestGetState_Error(t *testing.T) { mockStore := mocks.NewMockStore(t) mockStore.On("GetState", mock.Anything).Return(types.State{}, fmt.Errorf("state error")) logger := zerolog.Nop() - server := NewStoreServer(mockStore, logger) + server := NewStoreServer(mockStore, nil, nil, logger) resp, err := server.GetState(context.Background(), connect.NewRequest(&emptypb.Empty{})) require.Error(t, err) require.Nil(t, resp) @@ -236,7 +237,7 @@ func TestGetMetadata(t *testing.T) { // Create server with mock store logger := zerolog.Nop() - server := NewStoreServer(mockStore, logger) + server := NewStoreServer(mockStore, nil, nil, logger) // Call GetMetadata req := connect.NewRequest(&pb.GetMetadataRequest{ @@ -254,7 +255,7 @@ func TestGetMetadata_Error(t *testing.T) { mockStore := mocks.NewMockStore(t) mockStore.On("GetMetadata", mock.Anything, "bad").Return(nil, fmt.Errorf("meta error")) logger := zerolog.Nop() - server := NewStoreServer(mockStore, logger) + server := NewStoreServer(mockStore, nil, nil, logger) resp, err := server.GetMetadata(context.Background(), connect.NewRequest(&pb.GetMetadataRequest{Key: "bad"})) require.Error(t, err) 
require.Nil(t, resp) @@ -269,7 +270,7 @@ func TestGetGenesisDaHeight(t *testing.T) { mockStore.On("GetMetadata", mock.Anything, store.GenesisDAHeightKey).Return(heightBytes, nil).Once() logger := zerolog.Nop() - server := NewStoreServer(mockStore, logger) + server := NewStoreServer(mockStore, nil, nil, logger) t.Run("success", func(t *testing.T) { req := connect.NewRequest(&emptypb.Empty{}) @@ -287,7 +288,7 @@ func TestGetGenesisDaHeight_NotFound(t *testing.T) { mockStore.On("GetMetadata", mock.Anything, store.GenesisDAHeightKey).Return(nil, fmt.Errorf("genesis DA height not found")).Once() logger := zerolog.Nop() - server := NewStoreServer(mockStore, logger) + server := NewStoreServer(mockStore, nil, nil, logger) req := connect.NewRequest(&emptypb.Empty{}) resp, err := server.GetGenesisDaHeight(context.Background(), req) @@ -307,7 +308,7 @@ func TestGetGenesisDaHeight_InvalidLength(t *testing.T) { mockStore.On("GetMetadata", mock.Anything, store.GenesisDAHeightKey).Return(invalidBytes, nil).Once() logger := zerolog.Nop() - server := NewStoreServer(mockStore, logger) + server := NewStoreServer(mockStore, nil, nil, logger) req := connect.NewRequest(&emptypb.Empty{}) resp, err := server.GetGenesisDaHeight(context.Background(), req) @@ -321,6 +322,52 @@ func TestGetGenesisDaHeight_InvalidLength(t *testing.T) { mockStore.AssertExpectations(t) } +func TestGetP2PStoreInfo(t *testing.T) { + t.Run("returns snapshots for configured stores", func(t *testing.T) { + mockStore := mocks.NewMockStore(t) + headerStore := headerstoremocks.NewMockStore[*types.SignedHeader](t) + dataStore := headerstoremocks.NewMockStore[*types.Data](t) + logger := zerolog.Nop() + server := NewStoreServer(mockStore, headerStore, dataStore, logger) + + now := time.Now().UTC() + headerStore.On("Height").Return(uint64(12)) + headerStore.On("Head", mock.Anything).Return(makeTestSignedHeader(12, now), nil) + headerStore.On("Tail", mock.Anything).Return(makeTestSignedHeader(7, now.Add(-time.Minute)), nil) 
+ + dataStore.On("Height").Return(uint64(9)) + dataStore.On("Head", mock.Anything).Return(makeTestData(9, now.Add(-30*time.Second)), nil) + dataStore.On("Tail", mock.Anything).Return(makeTestData(4, now.Add(-2*time.Minute)), nil) + + resp, err := server.GetP2PStoreInfo(context.Background(), connect.NewRequest(&emptypb.Empty{})) + require.NoError(t, err) + require.Len(t, resp.Msg.Stores, 2) + + require.Equal(t, "Header Store", resp.Msg.Stores[0].Label) + require.Equal(t, uint64(12), resp.Msg.Stores[0].Height) + require.True(t, resp.Msg.Stores[0].HeadPresent) + require.True(t, resp.Msg.Stores[0].TailPresent) + + require.Equal(t, "Data Store", resp.Msg.Stores[1].Label) + require.Equal(t, uint64(9), resp.Msg.Stores[1].Height) + require.Equal(t, uint64(9), resp.Msg.Stores[1].Head.Height) + require.Equal(t, uint64(4), resp.Msg.Stores[1].Tail.Height) + }) + + t.Run("returns error when a store edge fails", func(t *testing.T) { + mockStore := mocks.NewMockStore(t) + headerStore := headerstoremocks.NewMockStore[*types.SignedHeader](t) + logger := zerolog.Nop() + headerStore.On("Height").Return(uint64(0)) + headerStore.On("Head", mock.Anything).Return((*types.SignedHeader)(nil), fmt.Errorf("boom")) + + server := NewStoreServer(mockStore, headerStore, nil, logger) + resp, err := server.GetP2PStoreInfo(context.Background(), connect.NewRequest(&emptypb.Empty{})) + require.Error(t, err) + require.Nil(t, resp) + }) +} + func TestP2PServer_GetPeerInfo(t *testing.T) { mockP2P := &mocks.MockP2PRPC{} addr, err := multiaddr.NewMultiaddr("/ip4/127.0.0.1/tcp/4001") @@ -371,7 +418,7 @@ func TestHealthLiveEndpoint(t *testing.T) { // Mock successful store access mockStore.On("Height", mock.Anything).Return(uint64(100), nil) - handler, err := NewServiceHandler(mockStore, mockP2PManager, nil, logger, testConfig, nil) + handler, err := NewServiceHandler(mockStore, nil, nil, mockP2PManager, nil, logger, testConfig, nil) require.NoError(t, err) server := httptest.NewServer(handler) @@ -396,7 
+443,7 @@ func TestHealthLiveEndpoint(t *testing.T) { // Mock store access failure mockStore.On("Height", mock.Anything).Return(uint64(0), fmt.Errorf("store unavailable")) - handler, err := NewServiceHandler(mockStore, mockP2PManager, nil, logger, testConfig, nil) + handler, err := NewServiceHandler(mockStore, nil, nil, mockP2PManager, nil, logger, testConfig, nil) require.NoError(t, err) server := httptest.NewServer(handler) @@ -421,7 +468,7 @@ func TestHealthLiveEndpoint(t *testing.T) { // Mock successful store access at genesis mockStore.On("Height", mock.Anything).Return(uint64(0), nil) - handler, err := NewServiceHandler(mockStore, mockP2PManager, nil, logger, testConfig, nil) + handler, err := NewServiceHandler(mockStore, nil, nil, mockP2PManager, nil, logger, testConfig, nil) require.NoError(t, err) server := httptest.NewServer(handler) @@ -498,7 +545,7 @@ func TestHealthReadyEndpoint(t *testing.T) { } bestKnown := func() uint64 { return tc.bestKnown } - handler, err := NewServiceHandler(mockStore, mockP2P, nil, logger, testConfig, bestKnown) + handler, err := NewServiceHandler(mockStore, nil, nil, mockP2P, nil, logger, testConfig, bestKnown) require.NoError(t, err) server := httptest.NewServer(handler) defer server.Close() @@ -539,7 +586,7 @@ func TestHealthReadyEndpoint(t *testing.T) { mockStore.On("GetState", mock.Anything).Return(state, nil) bestKnown := func() uint64 { return 100 } - handler, err := NewServiceHandler(mockStore, mockP2P, nil, logger, testConfig, bestKnown) + handler, err := NewServiceHandler(mockStore, nil, nil, mockP2P, nil, logger, testConfig, bestKnown) require.NoError(t, err) server := httptest.NewServer(handler) defer server.Close() @@ -569,7 +616,7 @@ func TestHealthReadyEndpoint(t *testing.T) { mockStore.On("GetState", mock.Anything).Return(state, nil) bestKnown := func() uint64 { return 100 } - handler, err := NewServiceHandler(mockStore, mockP2P, nil, logger, testConfig, bestKnown) + handler, err := NewServiceHandler(mockStore, 
nil, nil, mockP2P, nil, logger, testConfig, bestKnown) require.NoError(t, err) server := httptest.NewServer(handler) defer server.Close() @@ -581,3 +628,28 @@ func TestHealthReadyEndpoint(t *testing.T) { }) }) } + +func makeTestSignedHeader(height uint64, ts time.Time) *types.SignedHeader { + return &types.SignedHeader{ + Header: types.Header{ + BaseHeader: types.BaseHeader{ + Height: height, + Time: uint64(ts.UnixNano()), + ChainID: "test-chain", + }, + ProposerAddress: []byte{0x01}, + DataHash: []byte{0x02}, + AppHash: []byte{0x03}, + }, + } +} + +func makeTestData(height uint64, ts time.Time) *types.Data { + return &types.Data{ + Metadata: &types.Metadata{ + ChainID: "test-chain", + Height: height, + Time: uint64(ts.UnixNano()), + }, + } +} diff --git a/proto/evnode/v1/state_rpc.proto b/proto/evnode/v1/state_rpc.proto index 1b468b6ef8..e0997b3d9d 100644 --- a/proto/evnode/v1/state_rpc.proto +++ b/proto/evnode/v1/state_rpc.proto @@ -21,6 +21,9 @@ service StoreService { // GetGenesisDaHeight returns the DA height at which the first Evolve block was included. rpc GetGenesisDaHeight(google.protobuf.Empty) returns (GetGenesisDaHeightResponse) {} + + // GetP2PStoreInfo returns head/tail information for the go-header stores used by P2P sync. + rpc GetP2PStoreInfo(google.protobuf.Empty) returns (GetP2PStoreInfoResponse) {} } // Block contains all the components of a complete block @@ -64,3 +67,25 @@ message GetMetadataResponse { message GetGenesisDaHeightResponse { uint64 height = 3; } + +// P2PStoreEntry captures a single head or tail record from a go-header store. +message P2PStoreEntry { + uint64 height = 1; + bytes hash = 2; + google.protobuf.Timestamp time = 3; +} + +// P2PStoreSnapshot provides head/tail status for a go-header store. 
+message P2PStoreSnapshot { + string label = 1; + uint64 height = 2; + bool head_present = 3; + P2PStoreEntry head = 4; + bool tail_present = 5; + P2PStoreEntry tail = 6; +} + +// GetP2PStoreInfoResponse holds the snapshots for configured go-header stores. +message GetP2PStoreInfoResponse { + repeated P2PStoreSnapshot stores = 1; +} diff --git a/types/pb/evnode/v1/state_rpc.pb.go b/types/pb/evnode/v1/state_rpc.pb.go index e087a82897..a36fe4cd5a 100644 --- a/types/pb/evnode/v1/state_rpc.pb.go +++ b/types/pb/evnode/v1/state_rpc.pb.go @@ -10,7 +10,7 @@ import ( protoreflect "google.golang.org/protobuf/reflect/protoreflect" protoimpl "google.golang.org/protobuf/runtime/protoimpl" emptypb "google.golang.org/protobuf/types/known/emptypb" - _ "google.golang.org/protobuf/types/known/timestamppb" + timestamppb "google.golang.org/protobuf/types/known/timestamppb" reflect "reflect" sync "sync" unsafe "unsafe" @@ -402,6 +402,197 @@ func (x *GetGenesisDaHeightResponse) GetHeight() uint64 { return 0 } +// P2PStoreEntry captures a single head or tail record from a go-header store. 
+type P2PStoreEntry struct { + state protoimpl.MessageState `protogen:"open.v1"` + Height uint64 `protobuf:"varint,1,opt,name=height,proto3" json:"height,omitempty"` + Hash []byte `protobuf:"bytes,2,opt,name=hash,proto3" json:"hash,omitempty"` + Time *timestamppb.Timestamp `protobuf:"bytes,3,opt,name=time,proto3" json:"time,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *P2PStoreEntry) Reset() { + *x = P2PStoreEntry{} + mi := &file_evnode_v1_state_rpc_proto_msgTypes[7] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *P2PStoreEntry) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*P2PStoreEntry) ProtoMessage() {} + +func (x *P2PStoreEntry) ProtoReflect() protoreflect.Message { + mi := &file_evnode_v1_state_rpc_proto_msgTypes[7] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use P2PStoreEntry.ProtoReflect.Descriptor instead. +func (*P2PStoreEntry) Descriptor() ([]byte, []int) { + return file_evnode_v1_state_rpc_proto_rawDescGZIP(), []int{7} +} + +func (x *P2PStoreEntry) GetHeight() uint64 { + if x != nil { + return x.Height + } + return 0 +} + +func (x *P2PStoreEntry) GetHash() []byte { + if x != nil { + return x.Hash + } + return nil +} + +func (x *P2PStoreEntry) GetTime() *timestamppb.Timestamp { + if x != nil { + return x.Time + } + return nil +} + +// P2PStoreSnapshot provides head/tail status for a go-header store. 
+type P2PStoreSnapshot struct { + state protoimpl.MessageState `protogen:"open.v1"` + Label string `protobuf:"bytes,1,opt,name=label,proto3" json:"label,omitempty"` + Height uint64 `protobuf:"varint,2,opt,name=height,proto3" json:"height,omitempty"` + HeadPresent bool `protobuf:"varint,3,opt,name=head_present,json=headPresent,proto3" json:"head_present,omitempty"` + Head *P2PStoreEntry `protobuf:"bytes,4,opt,name=head,proto3" json:"head,omitempty"` + TailPresent bool `protobuf:"varint,5,opt,name=tail_present,json=tailPresent,proto3" json:"tail_present,omitempty"` + Tail *P2PStoreEntry `protobuf:"bytes,6,opt,name=tail,proto3" json:"tail,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *P2PStoreSnapshot) Reset() { + *x = P2PStoreSnapshot{} + mi := &file_evnode_v1_state_rpc_proto_msgTypes[8] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *P2PStoreSnapshot) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*P2PStoreSnapshot) ProtoMessage() {} + +func (x *P2PStoreSnapshot) ProtoReflect() protoreflect.Message { + mi := &file_evnode_v1_state_rpc_proto_msgTypes[8] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use P2PStoreSnapshot.ProtoReflect.Descriptor instead. 
+func (*P2PStoreSnapshot) Descriptor() ([]byte, []int) { + return file_evnode_v1_state_rpc_proto_rawDescGZIP(), []int{8} +} + +func (x *P2PStoreSnapshot) GetLabel() string { + if x != nil { + return x.Label + } + return "" +} + +func (x *P2PStoreSnapshot) GetHeight() uint64 { + if x != nil { + return x.Height + } + return 0 +} + +func (x *P2PStoreSnapshot) GetHeadPresent() bool { + if x != nil { + return x.HeadPresent + } + return false +} + +func (x *P2PStoreSnapshot) GetHead() *P2PStoreEntry { + if x != nil { + return x.Head + } + return nil +} + +func (x *P2PStoreSnapshot) GetTailPresent() bool { + if x != nil { + return x.TailPresent + } + return false +} + +func (x *P2PStoreSnapshot) GetTail() *P2PStoreEntry { + if x != nil { + return x.Tail + } + return nil +} + +// GetP2PStoreInfoResponse holds the snapshots for configured go-header stores. +type GetP2PStoreInfoResponse struct { + state protoimpl.MessageState `protogen:"open.v1"` + Stores []*P2PStoreSnapshot `protobuf:"bytes,1,rep,name=stores,proto3" json:"stores,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *GetP2PStoreInfoResponse) Reset() { + *x = GetP2PStoreInfoResponse{} + mi := &file_evnode_v1_state_rpc_proto_msgTypes[9] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *GetP2PStoreInfoResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*GetP2PStoreInfoResponse) ProtoMessage() {} + +func (x *GetP2PStoreInfoResponse) ProtoReflect() protoreflect.Message { + mi := &file_evnode_v1_state_rpc_proto_msgTypes[9] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use GetP2PStoreInfoResponse.ProtoReflect.Descriptor instead. 
+func (*GetP2PStoreInfoResponse) Descriptor() ([]byte, []int) { + return file_evnode_v1_state_rpc_proto_rawDescGZIP(), []int{9} +} + +func (x *GetP2PStoreInfoResponse) GetStores() []*P2PStoreSnapshot { + if x != nil { + return x.Stores + } + return nil +} + var File_evnode_v1_state_rpc_proto protoreflect.FileDescriptor const file_evnode_v1_state_rpc_proto_rawDesc = "" + @@ -426,12 +617,26 @@ const file_evnode_v1_state_rpc_proto_rawDesc = "" + "\x13GetMetadataResponse\x12\x14\n" + "\x05value\x18\x01 \x01(\fR\x05value\"4\n" + "\x1aGetGenesisDaHeightResponse\x12\x16\n" + - "\x06height\x18\x03 \x01(\x04R\x06height2\xbf\x02\n" + + "\x06height\x18\x03 \x01(\x04R\x06height\"k\n" + + "\rP2PStoreEntry\x12\x16\n" + + "\x06height\x18\x01 \x01(\x04R\x06height\x12\x12\n" + + "\x04hash\x18\x02 \x01(\fR\x04hash\x12.\n" + + "\x04time\x18\x03 \x01(\v2\x1a.google.protobuf.TimestampR\x04time\"\xe2\x01\n" + + "\x10P2PStoreSnapshot\x12\x14\n" + + "\x05label\x18\x01 \x01(\tR\x05label\x12\x16\n" + + "\x06height\x18\x02 \x01(\x04R\x06height\x12!\n" + + "\fhead_present\x18\x03 \x01(\bR\vheadPresent\x12,\n" + + "\x04head\x18\x04 \x01(\v2\x18.evnode.v1.P2PStoreEntryR\x04head\x12!\n" + + "\ftail_present\x18\x05 \x01(\bR\vtailPresent\x12,\n" + + "\x04tail\x18\x06 \x01(\v2\x18.evnode.v1.P2PStoreEntryR\x04tail\"N\n" + + "\x17GetP2PStoreInfoResponse\x123\n" + + "\x06stores\x18\x01 \x03(\v2\x1b.evnode.v1.P2PStoreSnapshotR\x06stores2\x90\x03\n" + "\fStoreService\x12E\n" + "\bGetBlock\x12\x1a.evnode.v1.GetBlockRequest\x1a\x1b.evnode.v1.GetBlockResponse\"\x00\x12A\n" + "\bGetState\x12\x16.google.protobuf.Empty\x1a\x1b.evnode.v1.GetStateResponse\"\x00\x12N\n" + "\vGetMetadata\x12\x1d.evnode.v1.GetMetadataRequest\x1a\x1e.evnode.v1.GetMetadataResponse\"\x00\x12U\n" + - "\x12GetGenesisDaHeight\x12\x16.google.protobuf.Empty\x1a%.evnode.v1.GetGenesisDaHeightResponse\"\x00B/Z-github.com/evstack/ev-node/types/pb/evnode/v1b\x06proto3" + 
"\x12GetGenesisDaHeight\x12\x16.google.protobuf.Empty\x1a%.evnode.v1.GetGenesisDaHeightResponse\"\x00\x12O\n" + + "\x0fGetP2PStoreInfo\x12\x16.google.protobuf.Empty\x1a\".evnode.v1.GetP2PStoreInfoResponse\"\x00B/Z-github.com/evstack/ev-node/types/pb/evnode/v1b\x06proto3" var ( file_evnode_v1_state_rpc_proto_rawDescOnce sync.Once @@ -445,7 +650,7 @@ func file_evnode_v1_state_rpc_proto_rawDescGZIP() []byte { return file_evnode_v1_state_rpc_proto_rawDescData } -var file_evnode_v1_state_rpc_proto_msgTypes = make([]protoimpl.MessageInfo, 7) +var file_evnode_v1_state_rpc_proto_msgTypes = make([]protoimpl.MessageInfo, 10) var file_evnode_v1_state_rpc_proto_goTypes = []any{ (*Block)(nil), // 0: evnode.v1.Block (*GetBlockRequest)(nil), // 1: evnode.v1.GetBlockRequest @@ -454,29 +659,39 @@ var file_evnode_v1_state_rpc_proto_goTypes = []any{ (*GetMetadataRequest)(nil), // 4: evnode.v1.GetMetadataRequest (*GetMetadataResponse)(nil), // 5: evnode.v1.GetMetadataResponse (*GetGenesisDaHeightResponse)(nil), // 6: evnode.v1.GetGenesisDaHeightResponse - (*SignedHeader)(nil), // 7: evnode.v1.SignedHeader - (*Data)(nil), // 8: evnode.v1.Data - (*State)(nil), // 9: evnode.v1.State - (*emptypb.Empty)(nil), // 10: google.protobuf.Empty + (*P2PStoreEntry)(nil), // 7: evnode.v1.P2PStoreEntry + (*P2PStoreSnapshot)(nil), // 8: evnode.v1.P2PStoreSnapshot + (*GetP2PStoreInfoResponse)(nil), // 9: evnode.v1.GetP2PStoreInfoResponse + (*SignedHeader)(nil), // 10: evnode.v1.SignedHeader + (*Data)(nil), // 11: evnode.v1.Data + (*State)(nil), // 12: evnode.v1.State + (*timestamppb.Timestamp)(nil), // 13: google.protobuf.Timestamp + (*emptypb.Empty)(nil), // 14: google.protobuf.Empty } var file_evnode_v1_state_rpc_proto_depIdxs = []int32{ - 7, // 0: evnode.v1.Block.header:type_name -> evnode.v1.SignedHeader - 8, // 1: evnode.v1.Block.data:type_name -> evnode.v1.Data + 10, // 0: evnode.v1.Block.header:type_name -> evnode.v1.SignedHeader + 11, // 1: evnode.v1.Block.data:type_name -> evnode.v1.Data 0, // 
2: evnode.v1.GetBlockResponse.block:type_name -> evnode.v1.Block - 9, // 3: evnode.v1.GetStateResponse.state:type_name -> evnode.v1.State - 1, // 4: evnode.v1.StoreService.GetBlock:input_type -> evnode.v1.GetBlockRequest - 10, // 5: evnode.v1.StoreService.GetState:input_type -> google.protobuf.Empty - 4, // 6: evnode.v1.StoreService.GetMetadata:input_type -> evnode.v1.GetMetadataRequest - 10, // 7: evnode.v1.StoreService.GetGenesisDaHeight:input_type -> google.protobuf.Empty - 2, // 8: evnode.v1.StoreService.GetBlock:output_type -> evnode.v1.GetBlockResponse - 3, // 9: evnode.v1.StoreService.GetState:output_type -> evnode.v1.GetStateResponse - 5, // 10: evnode.v1.StoreService.GetMetadata:output_type -> evnode.v1.GetMetadataResponse - 6, // 11: evnode.v1.StoreService.GetGenesisDaHeight:output_type -> evnode.v1.GetGenesisDaHeightResponse - 8, // [8:12] is the sub-list for method output_type - 4, // [4:8] is the sub-list for method input_type - 4, // [4:4] is the sub-list for extension type_name - 4, // [4:4] is the sub-list for extension extendee - 0, // [0:4] is the sub-list for field type_name + 12, // 3: evnode.v1.GetStateResponse.state:type_name -> evnode.v1.State + 13, // 4: evnode.v1.P2PStoreEntry.time:type_name -> google.protobuf.Timestamp + 7, // 5: evnode.v1.P2PStoreSnapshot.head:type_name -> evnode.v1.P2PStoreEntry + 7, // 6: evnode.v1.P2PStoreSnapshot.tail:type_name -> evnode.v1.P2PStoreEntry + 8, // 7: evnode.v1.GetP2PStoreInfoResponse.stores:type_name -> evnode.v1.P2PStoreSnapshot + 1, // 8: evnode.v1.StoreService.GetBlock:input_type -> evnode.v1.GetBlockRequest + 14, // 9: evnode.v1.StoreService.GetState:input_type -> google.protobuf.Empty + 4, // 10: evnode.v1.StoreService.GetMetadata:input_type -> evnode.v1.GetMetadataRequest + 14, // 11: evnode.v1.StoreService.GetGenesisDaHeight:input_type -> google.protobuf.Empty + 14, // 12: evnode.v1.StoreService.GetP2PStoreInfo:input_type -> google.protobuf.Empty + 2, // 13: 
evnode.v1.StoreService.GetBlock:output_type -> evnode.v1.GetBlockResponse + 3, // 14: evnode.v1.StoreService.GetState:output_type -> evnode.v1.GetStateResponse + 5, // 15: evnode.v1.StoreService.GetMetadata:output_type -> evnode.v1.GetMetadataResponse + 6, // 16: evnode.v1.StoreService.GetGenesisDaHeight:output_type -> evnode.v1.GetGenesisDaHeightResponse + 9, // 17: evnode.v1.StoreService.GetP2PStoreInfo:output_type -> evnode.v1.GetP2PStoreInfoResponse + 13, // [13:18] is the sub-list for method output_type + 8, // [8:13] is the sub-list for method input_type + 8, // [8:8] is the sub-list for extension type_name + 8, // [8:8] is the sub-list for extension extendee + 0, // [0:8] is the sub-list for field type_name } func init() { file_evnode_v1_state_rpc_proto_init() } @@ -496,7 +711,7 @@ func file_evnode_v1_state_rpc_proto_init() { GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: unsafe.Slice(unsafe.StringData(file_evnode_v1_state_rpc_proto_rawDesc), len(file_evnode_v1_state_rpc_proto_rawDesc)), NumEnums: 0, - NumMessages: 7, + NumMessages: 10, NumExtensions: 0, NumServices: 1, }, diff --git a/types/pb/evnode/v1/v1connect/state_rpc.connect.go b/types/pb/evnode/v1/v1connect/state_rpc.connect.go index 1fe049fa11..c86e260e90 100644 --- a/types/pb/evnode/v1/v1connect/state_rpc.connect.go +++ b/types/pb/evnode/v1/v1connect/state_rpc.connect.go @@ -44,6 +44,9 @@ const ( // StoreServiceGetGenesisDaHeightProcedure is the fully-qualified name of the StoreService's // GetGenesisDaHeight RPC. StoreServiceGetGenesisDaHeightProcedure = "/evnode.v1.StoreService/GetGenesisDaHeight" + // StoreServiceGetP2PStoreInfoProcedure is the fully-qualified name of the StoreService's + // GetP2PStoreInfo RPC. + StoreServiceGetP2PStoreInfoProcedure = "/evnode.v1.StoreService/GetP2PStoreInfo" ) // StoreServiceClient is a client for the evnode.v1.StoreService service. 
@@ -56,6 +59,8 @@ type StoreServiceClient interface { GetMetadata(context.Context, *connect.Request[v1.GetMetadataRequest]) (*connect.Response[v1.GetMetadataResponse], error) // GetGenesisDaHeight returns the DA height at which the first Evolve block was included. GetGenesisDaHeight(context.Context, *connect.Request[emptypb.Empty]) (*connect.Response[v1.GetGenesisDaHeightResponse], error) + // GetP2PStoreInfo returns head/tail information for the go-header stores used by P2P sync. + GetP2PStoreInfo(context.Context, *connect.Request[emptypb.Empty]) (*connect.Response[v1.GetP2PStoreInfoResponse], error) } // NewStoreServiceClient constructs a client for the evnode.v1.StoreService service. By default, it @@ -93,6 +98,12 @@ func NewStoreServiceClient(httpClient connect.HTTPClient, baseURL string, opts . connect.WithSchema(storeServiceMethods.ByName("GetGenesisDaHeight")), connect.WithClientOptions(opts...), ), + getP2PStoreInfo: connect.NewClient[emptypb.Empty, v1.GetP2PStoreInfoResponse]( + httpClient, + baseURL+StoreServiceGetP2PStoreInfoProcedure, + connect.WithSchema(storeServiceMethods.ByName("GetP2PStoreInfo")), + connect.WithClientOptions(opts...), + ), } } @@ -102,6 +113,7 @@ type storeServiceClient struct { getState *connect.Client[emptypb.Empty, v1.GetStateResponse] getMetadata *connect.Client[v1.GetMetadataRequest, v1.GetMetadataResponse] getGenesisDaHeight *connect.Client[emptypb.Empty, v1.GetGenesisDaHeightResponse] + getP2PStoreInfo *connect.Client[emptypb.Empty, v1.GetP2PStoreInfoResponse] } // GetBlock calls evnode.v1.StoreService.GetBlock. @@ -124,6 +136,11 @@ func (c *storeServiceClient) GetGenesisDaHeight(ctx context.Context, req *connec return c.getGenesisDaHeight.CallUnary(ctx, req) } +// GetP2PStoreInfo calls evnode.v1.StoreService.GetP2PStoreInfo. 
+func (c *storeServiceClient) GetP2PStoreInfo(ctx context.Context, req *connect.Request[emptypb.Empty]) (*connect.Response[v1.GetP2PStoreInfoResponse], error) { + return c.getP2PStoreInfo.CallUnary(ctx, req) +} + // StoreServiceHandler is an implementation of the evnode.v1.StoreService service. type StoreServiceHandler interface { // GetBlock returns a block by height or hash @@ -134,6 +151,8 @@ type StoreServiceHandler interface { GetMetadata(context.Context, *connect.Request[v1.GetMetadataRequest]) (*connect.Response[v1.GetMetadataResponse], error) // GetGenesisDaHeight returns the DA height at which the first Evolve block was included. GetGenesisDaHeight(context.Context, *connect.Request[emptypb.Empty]) (*connect.Response[v1.GetGenesisDaHeightResponse], error) + // GetP2PStoreInfo returns head/tail information for the go-header stores used by P2P sync. + GetP2PStoreInfo(context.Context, *connect.Request[emptypb.Empty]) (*connect.Response[v1.GetP2PStoreInfoResponse], error) } // NewStoreServiceHandler builds an HTTP handler from the service implementation. 
It returns the @@ -167,6 +186,12 @@ func NewStoreServiceHandler(svc StoreServiceHandler, opts ...connect.HandlerOpti connect.WithSchema(storeServiceMethods.ByName("GetGenesisDaHeight")), connect.WithHandlerOptions(opts...), ) + storeServiceGetP2PStoreInfoHandler := connect.NewUnaryHandler( + StoreServiceGetP2PStoreInfoProcedure, + svc.GetP2PStoreInfo, + connect.WithSchema(storeServiceMethods.ByName("GetP2PStoreInfo")), + connect.WithHandlerOptions(opts...), + ) return "/evnode.v1.StoreService/", http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { switch r.URL.Path { case StoreServiceGetBlockProcedure: @@ -177,6 +202,8 @@ func NewStoreServiceHandler(svc StoreServiceHandler, opts ...connect.HandlerOpti storeServiceGetMetadataHandler.ServeHTTP(w, r) case StoreServiceGetGenesisDaHeightProcedure: storeServiceGetGenesisDaHeightHandler.ServeHTTP(w, r) + case StoreServiceGetP2PStoreInfoProcedure: + storeServiceGetP2PStoreInfoHandler.ServeHTTP(w, r) default: http.NotFound(w, r) } @@ -201,3 +228,7 @@ func (UnimplementedStoreServiceHandler) GetMetadata(context.Context, *connect.Re func (UnimplementedStoreServiceHandler) GetGenesisDaHeight(context.Context, *connect.Request[emptypb.Empty]) (*connect.Response[v1.GetGenesisDaHeightResponse], error) { return nil, connect.NewError(connect.CodeUnimplemented, errors.New("evnode.v1.StoreService.GetGenesisDaHeight is not implemented")) } + +func (UnimplementedStoreServiceHandler) GetP2PStoreInfo(context.Context, *connect.Request[emptypb.Empty]) (*connect.Response[v1.GetP2PStoreInfoResponse], error) { + return nil, connect.NewError(connect.CodeUnimplemented, errors.New("evnode.v1.StoreService.GetP2PStoreInfo is not implemented")) +} From d85c1d8d7907b014b68091e058326bdac57b2e5b Mon Sep 17 00:00:00 2001 From: tac0turtle Date: Fri, 14 Nov 2025 12:09:52 +0100 Subject: [PATCH 07/23] lint --- pkg/cmd/store.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/cmd/store.go b/pkg/cmd/store.go index 
79e1fd3cc1..0f524325f2 100644 --- a/pkg/cmd/store.go +++ b/pkg/cmd/store.go @@ -205,7 +205,7 @@ func populateSnapshot[H goheader.Header[H]]( return fmt.Errorf("failed to read %s tail: %w", snapshot.Label, err) } - snapshot.Empty = !(snapshot.HeadPresent || snapshot.TailPresent) + snapshot.Empty = !snapshot.HeadPresent && !snapshot.TailPresent return nil } From f728e4dfadf0e2d01d802da3707b893e2eb5f06f Mon Sep 17 00:00:00 2001 From: Marko Date: Fri, 14 Nov 2025 12:10:23 +0100 Subject: [PATCH 08/23] refactor: remove trusted hash (#2838) ## Overview This PR removes the trusted hash approach to sync. This works for Celestia node since they do not reconstruct state, so they can jump to a height that is closer to the head. With Evolve this assumption is incorrect: we need to reconstruct state, meaning we either need to sync from genesis or download a db snapshot and then start the node from there. --- docs/learn/config.md | 19 -------- docs/learn/specs/header-sync.md | 7 ++- node/full_node_integration_test.go | 74 +----------------------------- pkg/cmd/run_node_test.go | 2 - pkg/config/config.go | 6 --- pkg/config/config_test.go | 4 +- pkg/config/defaults.go | 1 - pkg/sync/sync_service.go | 35 +++++++------- 8 files changed, 24 insertions(+), 124 deletions(-) diff --git a/docs/learn/config.md b/docs/learn/config.md index 9334741dc2..094eeafde6 100644 --- a/docs/learn/config.md +++ b/docs/learn/config.md @@ -17,7 +17,6 @@ This document provides a comprehensive reference for all configuration options a - [Maximum Pending Blocks](#maximum-pending-blocks) - [Lazy Mode (Lazy Aggregator)](#lazy-mode-lazy-aggregator) - [Lazy Block Interval](#lazy-block-interval) - - [Trusted Hash](#trusted-hash) - [Data Availability Configuration (`da`)](#data-availability-configuration-da) - [DA Service Address](#da-service-address) - [DA Authentication Token](#da-authentication-token) @@ -275,24 +274,6 @@ _Example:_ `--rollkit.node.lazy_block_interval 1m` _Default:_ `"30s"` _Constant:_
`FlagLazyBlockTime` -### Trusted Hash - -**Description:** -The initial trusted hash used to bootstrap the header exchange service. This allows nodes to start synchronizing from a specific, trusted point in the chain history instead of from the genesis block. When provided, the node will fetch the corresponding header/block from peers using this hash. If not provided, the node attempts to sync from genesis. - -**YAML:** - -```yaml -node: - trusted_hash: "YOUR_TRUSTED_HASH_HEX_STRING" -``` - -**Command-line Flag:** -`--rollkit.node.trusted_hash ` -_Example:_ `--rollkit.node.trusted_hash ABCDEF012345...` -_Default:_ `""` (empty, sync from genesis) -_Constant:_ `FlagTrustedHash` - ## Data Availability Configuration (`da`) Parameters for connecting and interacting with the Data Availability (DA) layer, which Evolve uses to publish block data. diff --git a/docs/learn/specs/header-sync.md b/docs/learn/specs/header-sync.md index d6a7a2da26..750f325933 100644 --- a/docs/learn/specs/header-sync.md +++ b/docs/learn/specs/header-sync.md @@ -68,10 +68,9 @@ The Executor component (in aggregator nodes) broadcasts headers and data in para - Chain IDs for pubsub topics are also separated: - Headers: `{chainID}-headerSync` creates topic like `/gm-headerSync/header-sub/v0.0.1` - Data: `{chainID}-dataSync` creates topic like `/gm-dataSync/header-sub/v0.0.1` -- Both stores must be initialized with genesis items before starting: - - Header store needs genesis header - - Data store needs genesis data (if applicable) -- Genesis items can be loaded via `NodeConfig.TrustedHash` or P2P network query +- Both stores must contain at least one item before the syncer starts: + - On first boot, the services fetch the configured genesis height from peers + - On restart, each store reuses its latest item to derive the initial height requested from peers - Sync services work only when connected to P2P network via `P2PConfig.Seeds` - Node context is passed to all components for graceful shutdown - 
Headers and data are linked through DataHash but synced independently diff --git a/node/full_node_integration_test.go b/node/full_node_integration_test.go index a214c13163..c13d6dc0d7 100644 --- a/node/full_node_integration_test.go +++ b/node/full_node_integration_test.go @@ -260,10 +260,9 @@ func TestSingleSequencerTwoFullNodesBlockSyncSpeed(t *testing.T) { // TestDataExchange verifies data exchange and synchronization between nodes in various network topologies. // -// This test runs three sub-tests: +// This test runs two sub-tests: // 1. Single sequencer and single full // 2. Single sequencer and two full nodes. -// 3. Single sequencer and single full node with trusted hash. // // Each sub-test checks data exchange and synchronization to ensure correct data propagation and consistency across nodes. func TestDataExchange(t *testing.T) { @@ -273,17 +272,13 @@ func TestDataExchange(t *testing.T) { t.Run("SingleSequencerTwoFullNodes", func(t *testing.T) { testSingleSequencerTwoFullNodes(t, Data) }) - t.Run("SingleSequencerSingleFullNodeTrustedHash", func(t *testing.T) { - testSingleSequencerSingleFullNodeTrustedHash(t, Data) - }) } // TestHeaderExchange verifies header exchange and synchronization between nodes in various network topologies. // -// This test runs three sub-tests: +// This test runs two sub-tests: // 1. Single sequencer and single full // 2. Single sequencer and two full nodes. -// 3. Single sequencer and single full node with trusted hash. // // Each sub-test checks header exchange and synchronization to ensure correct header propagation and consistency across nodes. 
func TestHeaderExchange(t *testing.T) { @@ -293,9 +288,6 @@ func TestHeaderExchange(t *testing.T) { t.Run("SingleSequencerTwoFullNodes", func(t *testing.T) { testSingleSequencerTwoFullNodes(t, Header) }) - t.Run("SingleSequencerSingleFullNodeTrustedHash", func(t *testing.T) { - testSingleSequencerSingleFullNodeTrustedHash(t, Header) - }) } // testSingleSequencerSingleFullNode sets up a single sequencer and a single full node, starts the sequencer, waits for it to produce a block, then starts the full @@ -392,68 +384,6 @@ func testSingleSequencerTwoFullNodes(t *testing.T, source Source) { shutdownAndWait(t, cancels, &runningWg, 5*time.Second) } -// testSingleSequencerSingleFullNodeTrustedHash sets up a single sequencer and a single full node with a trusted hash, starts the sequencer, waits for it to produce a block, then starts the full node with the trusted hash. -// It waits for both nodes to reach a target block height (using the provided 'source' to determine block inclusion), verifies that both nodes are fully synced, and then shuts them down. 
-func testSingleSequencerSingleFullNodeTrustedHash(t *testing.T, source Source) { - require := require.New(t) - - // Set up one sequencer and one full node - config := getTestConfig(t, 1) - numNodes := 2 - nodes, cleanups := createNodesWithCleanup(t, numNodes, config) - for _, cleanup := range cleanups { - defer cleanup() - } - - ctxs, cancels := createNodeContexts(numNodes) - var runningWg sync.WaitGroup - errChan := make(chan error, numNodes) - - // Start the sequencer first - startNodeInBackground(t, nodes, ctxs, &runningWg, 0, errChan) - - // Wait for the sequencer to produce at first block - require.NoError(waitForFirstBlock(nodes[0], source)) - - // Get the hash of the first block (using the correct source) - var trustedHash string - switch source { - case Data: - trustedHashValue, err := nodes[0].dSyncService.Store().GetByHeight(ctxs[0], 1) - require.NoError(err) - trustedHash = trustedHashValue.Hash().String() - case Header: - trustedHashValue, err := nodes[0].hSyncService.Store().GetByHeight(ctxs[0], 1) - require.NoError(err) - trustedHash = trustedHashValue.Hash().String() - default: - t.Fatalf("unsupported source for trusted hash test: %v", source) - } - - // Set the trusted hash in the full node - nodeConfig := nodes[1].nodeConfig - nodeConfig.Node.TrustedHash = trustedHash - - // Add a small delay to ensure P2P services are fully ready - time.Sleep(500 * time.Millisecond) - - // Start the full node - startNodeInBackground(t, nodes, ctxs, &runningWg, 1, errChan) - - blocksToWaitFor := uint64(3) - // Wait for both nodes to reach at least blocksToWaitFor blocks - for _, nodeItem := range nodes { - requireEmptyChan(t, errChan) - require.NoError(waitForAtLeastNBlocks(nodeItem, blocksToWaitFor, source)) - } - - // Verify both nodes are synced using the helper - require.NoError(verifyNodesSynced(nodes[0], nodes[1], source)) - - // Cancel all node contexts to signal shutdown and wait - shutdownAndWait(t, cancels, &runningWg, 5*time.Second) -} - // 
TestTwoChainsInOneNamespace verifies that two chains in the same namespace can coexist without any issues. func TestTwoChainsInOneNamespace(t *testing.T) { cases := []struct { diff --git a/pkg/cmd/run_node_test.go b/pkg/cmd/run_node_test.go index 2980f94741..16430ee450 100644 --- a/pkg/cmd/run_node_test.go +++ b/pkg/cmd/run_node_test.go @@ -69,7 +69,6 @@ func TestParseFlags(t *testing.T) { "--rollkit.node.lazy_block_interval", "2m", "--rollkit.node.light", "--rollkit.node.max_pending_headers_and_data", "100", - "--rollkit.node.trusted_hash", "abcdef1234567890", "--rollkit.da.submit_options", "custom-options", // Instrumentation flags "--rollkit.instrumentation.prometheus", "true", @@ -125,7 +124,6 @@ func TestParseFlags(t *testing.T) { {"LazyBlockTime", nodeConfig.Node.LazyBlockInterval.Duration, 2 * time.Minute}, {"Light", nodeConfig.Node.Light, true}, {"MaxPendingHeadersAndData", nodeConfig.Node.MaxPendingHeadersAndData, uint64(100)}, - {"TrustedHash", nodeConfig.Node.TrustedHash, "abcdef1234567890"}, {"DASubmitOptions", nodeConfig.DA.SubmitOptions, "custom-options"}, {"Prometheus", nodeConfig.Instrumentation.Prometheus, true}, {"PrometheusListenAddr", nodeConfig.Instrumentation.PrometheusListenAddr, ":26665"}, diff --git a/pkg/config/config.go b/pkg/config/config.go index aad4ce6b93..f6b7dff2ed 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -37,8 +37,6 @@ const ( FlagLight = FlagPrefixEvnode + "node.light" // FlagBlockTime is a flag for specifying the block time FlagBlockTime = FlagPrefixEvnode + "node.block_time" - // FlagTrustedHash is a flag for specifying the trusted hash - FlagTrustedHash = FlagPrefixEvnode + "node.trusted_hash" // FlagLazyAggregator is a flag for enabling lazy aggregation mode that only produces blocks when transactions are available FlagLazyAggregator = FlagPrefixEvnode + "node.lazy_mode" // FlagMaxPendingHeadersAndData is a flag to limit and pause block production when too many headers or data are waiting for DA 
confirmation @@ -192,9 +190,6 @@ type NodeConfig struct { LazyMode bool `mapstructure:"lazy_mode" yaml:"lazy_mode" comment:"Enables lazy aggregation mode, where blocks are only produced when transactions are available or after LazyBlockTime. Optimizes resources by avoiding empty block creation during periods of inactivity."` LazyBlockInterval DurationWrapper `mapstructure:"lazy_block_interval" yaml:"lazy_block_interval" comment:"Maximum interval between blocks in lazy aggregation mode (LazyAggregator). Ensures blocks are produced periodically even without transactions to keep the chain active. Generally larger than BlockTime."` - // Header configuration - TrustedHash string `mapstructure:"trusted_hash" yaml:"trusted_hash" comment:"Initial trusted hash used to bootstrap the header exchange service. Allows nodes to start synchronizing from a specific trusted point in the chain instead of genesis. When provided, the node will fetch the corresponding header/block from peers using this hash and use it as a starting point for synchronization. If not provided, the node will attempt to fetch the genesis block instead."` - // Readiness / health configuration ReadinessWindowSeconds uint64 `mapstructure:"readiness_window_seconds" yaml:"readiness_window_seconds" comment:"Time window in seconds used to calculate ReadinessMaxBlocksBehind based on block time. Default: 15 seconds."` ReadinessMaxBlocksBehind uint64 `mapstructure:"readiness_max_blocks_behind" yaml:"readiness_max_blocks_behind" comment:"How many blocks behind best-known head the node can be and still be considered ready. 
0 means must be exactly at head."` @@ -309,7 +304,6 @@ func AddFlags(cmd *cobra.Command) { cmd.Flags().Bool(FlagAggregator, def.Node.Aggregator, "run node in aggregator mode") cmd.Flags().Bool(FlagLight, def.Node.Light, "run light client") cmd.Flags().Duration(FlagBlockTime, def.Node.BlockTime.Duration, "block time (for aggregator mode)") - cmd.Flags().String(FlagTrustedHash, def.Node.TrustedHash, "initial trusted hash to start the header exchange service") cmd.Flags().Bool(FlagLazyAggregator, def.Node.LazyMode, "produce blocks only when transactions are available or after lazy block time") cmd.Flags().Uint64(FlagMaxPendingHeadersAndData, def.Node.MaxPendingHeadersAndData, "maximum headers or data pending DA confirmation before pausing block production (0 for no limit)") cmd.Flags().Duration(FlagLazyBlockTime, def.Node.LazyBlockInterval.Duration, "maximum interval between blocks in lazy aggregation mode") diff --git a/pkg/config/config_test.go b/pkg/config/config_test.go index 206d6040bd..7834e42aab 100644 --- a/pkg/config/config_test.go +++ b/pkg/config/config_test.go @@ -32,7 +32,6 @@ func TestDefaultConfig(t *testing.T) { assert.Equal(t, uint64(0), def.Node.MaxPendingHeadersAndData) assert.Equal(t, false, def.Node.LazyMode) assert.Equal(t, 60*time.Second, def.Node.LazyBlockInterval.Duration) - assert.Equal(t, "", def.Node.TrustedHash) assert.Equal(t, "file", def.Signer.SignerType) assert.Equal(t, "config", def.Signer.SignerPath) assert.Equal(t, "127.0.0.1:7331", def.RPC.Address) @@ -56,7 +55,6 @@ func TestAddFlags(t *testing.T) { assertFlagValue(t, flags, FlagAggregator, DefaultConfig().Node.Aggregator) assertFlagValue(t, flags, FlagLight, DefaultConfig().Node.Light) assertFlagValue(t, flags, FlagBlockTime, DefaultConfig().Node.BlockTime.Duration) - assertFlagValue(t, flags, FlagTrustedHash, DefaultConfig().Node.TrustedHash) assertFlagValue(t, flags, FlagLazyAggregator, DefaultConfig().Node.LazyMode) assertFlagValue(t, flags, FlagMaxPendingHeadersAndData, 
DefaultConfig().Node.MaxPendingHeadersAndData) assertFlagValue(t, flags, FlagLazyBlockTime, DefaultConfig().Node.LazyBlockInterval.Duration) @@ -101,7 +99,7 @@ func TestAddFlags(t *testing.T) { assertFlagValue(t, flags, FlagRPCAddress, DefaultConfig().RPC.Address) // Count the number of flags we're explicitly checking - expectedFlagCount := 38 // Update this number if you add more flag checks above + expectedFlagCount := 37 // Update this number if you add more flag checks above // Get the actual number of flags (both regular and persistent) actualFlagCount := 0 diff --git a/pkg/config/defaults.go b/pkg/config/defaults.go index 3410547531..6a6f813a3c 100644 --- a/pkg/config/defaults.go +++ b/pkg/config/defaults.go @@ -66,7 +66,6 @@ func DefaultConfig() Config { LazyMode: false, LazyBlockInterval: DurationWrapper{60 * time.Second}, Light: false, - TrustedHash: "", ReadinessWindowSeconds: defaultReadinessWindowSeconds, ReadinessMaxBlocksBehind: calculateReadinessMaxBlocksBehind(defaultBlockTime.Duration, defaultReadinessWindowSeconds), }, diff --git a/pkg/sync/sync_service.go b/pkg/sync/sync_service.go index c72883769b..7160064832 100644 --- a/pkg/sync/sync_service.go +++ b/pkg/sync/sync_service.go @@ -2,7 +2,6 @@ package sync import ( "context" - "encoding/hex" "errors" "fmt" "strings" @@ -309,8 +308,9 @@ func (syncService *SyncService[H]) startSubscriber(ctx context.Context) error { } // initFromP2PWithRetry initializes the syncer from P2P with a retry mechanism. -// If trusted hash is available, it fetches the trusted header/block (by hash) from peers. -// Otherwise, it tries to fetch the genesis header/block by height. +// It inspects the local store to determine the first height to request: +// - when the store already contains items, it reuses the latest height as the starting point; +// - otherwise, it falls back to the configured genesis height. 
func (syncService *SyncService[H]) initFromP2PWithRetry(ctx context.Context, peerIDs []peer.ID) error { if len(peerIDs) == 0 { return nil @@ -318,22 +318,23 @@ func (syncService *SyncService[H]) initFromP2PWithRetry(ctx context.Context, pee tryInit := func(ctx context.Context) (bool, error) { var ( - trusted H - err error + trusted H + err error + heightToQuery uint64 ) - if syncService.conf.Node.TrustedHash != "" { - trustedHashBytes, err := hex.DecodeString(syncService.conf.Node.TrustedHash) - if err != nil { - return false, fmt.Errorf("failed to parse the trusted hash for initializing the store: %w", err) - } - if trusted, err = syncService.ex.Get(ctx, trustedHashBytes); err != nil { - return false, fmt.Errorf("failed to fetch the trusted header/block for initializing the store: %w", err) - } - } else { - if trusted, err = syncService.ex.GetByHeight(ctx, syncService.genesis.InitialHeight); err != nil { - return false, fmt.Errorf("failed to fetch the genesis: %w", err) - } + head, headErr := syncService.store.Head(ctx) + switch { + case errors.Is(headErr, header.ErrNotFound), errors.Is(headErr, header.ErrEmptyStore): + heightToQuery = syncService.genesis.InitialHeight + case headErr != nil: + return false, fmt.Errorf("failed to inspect local store head: %w", headErr) + default: + heightToQuery = head.Height() + } + + if trusted, err = syncService.ex.GetByHeight(ctx, heightToQuery); err != nil { + return false, fmt.Errorf("failed to fetch height %d from peers: %w", heightToQuery, err) } if _, err := syncService.initStore(ctx, trusted); err != nil { From 453fb82105bcf6e6a06a3502ab55ea1d130c4cc1 Mon Sep 17 00:00:00 2001 From: tac0turtle Date: Fri, 14 Nov 2025 14:46:15 +0100 Subject: [PATCH 09/23] increase timeout --- block/internal/syncing/da_retriever.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/block/internal/syncing/da_retriever.go b/block/internal/syncing/da_retriever.go index de67e1fd1c..52dba78040 100644 --- 
a/block/internal/syncing/da_retriever.go +++ b/block/internal/syncing/da_retriever.go @@ -20,7 +20,7 @@ import ( ) // defaultDATimeout is the default timeout for DA retrieval operations -const defaultDATimeout = 10 * time.Second +const defaultDATimeout = 30 * time.Second // DARetriever handles DA retrieval operations for syncing type DARetriever struct { From f26536bb1275624dba96fc3b06d64c02adf8b1df Mon Sep 17 00:00:00 2001 From: tac0turtle Date: Fri, 14 Nov 2025 15:40:35 +0100 Subject: [PATCH 10/23] fix timeouts --- block/internal/syncing/da_retriever_test.go | 6 +++--- block/internal/syncing/syncer.go | 8 +++++--- 2 files changed, 8 insertions(+), 6 deletions(-) diff --git a/block/internal/syncing/da_retriever_test.go b/block/internal/syncing/da_retriever_test.go index c6e8daa78f..c398633d4b 100644 --- a/block/internal/syncing/da_retriever_test.go +++ b/block/internal/syncing/da_retriever_test.go @@ -128,9 +128,9 @@ func TestDARetriever_RetrieveFromDA_Timeout(t *testing.T) { assert.Contains(t, err.Error(), "context deadline exceeded") assert.Len(t, events, 0) - // Verify timeout occurred approximately at expected time (with some tolerance) - assert.Greater(t, duration, 9*time.Second, "should timeout after approximately 10 seconds") - assert.Less(t, duration, 12*time.Second, "should not take much longer than timeout") + // Verify timeout occurred approximately at the helper timeout (with some tolerance) + assert.Greater(t, duration, defaultDATimeout-2*time.Second, "should timeout close to the helper timeout") + assert.Less(t, duration, defaultDATimeout+time.Second, "should not take much longer than timeout") } func TestDARetriever_RetrieveFromDA_TimeoutFast(t *testing.T) { diff --git a/block/internal/syncing/syncer.go b/block/internal/syncing/syncer.go index d34dceca51..52f27960e5 100644 --- a/block/internal/syncing/syncer.go +++ b/block/internal/syncing/syncer.go @@ -299,11 +299,13 @@ func (s *Syncer) fetchDAUntilCaughtUp() error { daHeight := s.GetDAHeight() - 
// Create a new context with a timeout for the DA call - ctx, cancel := context.WithTimeout(s.ctx, 5*time.Second) - defer cancel() + // Create a cancellable context for the DA call that inherits the syncer's lifecycle. + // We intentionally avoid adding another timeout layer on top of the per-request timeout + // inside the DA retriever so large blob batches can finish downloading. + ctx, cancel := context.WithCancel(s.ctx) events, err := s.daRetriever.RetrieveFromDA(ctx, daHeight) + cancel() if err != nil { switch { case errors.Is(err, coreda.ErrBlobNotFound): From d8e9fa156ca2fbe32c370067cb5a36963ffcf4e2 Mon Sep 17 00:00:00 2001 From: tac0turtle Date: Fri, 14 Nov 2025 16:20:09 +0100 Subject: [PATCH 11/23] remove extra context --- block/internal/syncing/syncer.go | 8 +------- 1 file changed, 1 insertion(+), 7 deletions(-) diff --git a/block/internal/syncing/syncer.go b/block/internal/syncing/syncer.go index 52f27960e5..d91550cb77 100644 --- a/block/internal/syncing/syncer.go +++ b/block/internal/syncing/syncer.go @@ -299,13 +299,7 @@ func (s *Syncer) fetchDAUntilCaughtUp() error { daHeight := s.GetDAHeight() - // Create a cancellable context for the DA call that inherits the syncer's lifecycle. - // We intentionally avoid adding another timeout layer on top of the per-request timeout - // inside the DA retriever so large blob batches can finish downloading. 
- ctx, cancel := context.WithCancel(s.ctx) - - events, err := s.daRetriever.RetrieveFromDA(ctx, daHeight) - cancel() + events, err := s.daRetriever.RetrieveFromDA(s.ctx, daHeight) if err != nil { switch { case errors.Is(err, coreda.ErrBlobNotFound): From 5e0396a2e059bd3099f75b09e5184af7c7d0e082 Mon Sep 17 00:00:00 2001 From: tac0turtle Date: Sat, 15 Nov 2025 12:51:10 +0100 Subject: [PATCH 12/23] allow execution to be ahead as we will check apphash if anything is truly wrong --- block/internal/common/replay.go | 12 +++++++----- block/internal/common/replay_test.go | 8 +++++--- 2 files changed, 12 insertions(+), 8 deletions(-) diff --git a/block/internal/common/replay.go b/block/internal/common/replay.go index 4c4a4b26de..9c95bc3f36 100644 --- a/block/internal/common/replay.go +++ b/block/internal/common/replay.go @@ -42,7 +42,7 @@ func NewReplayer( // This is useful for crash recovery scenarios where ev-node is ahead of the execution layer. // // Returns: -// - error if sync fails or if execution layer is ahead of ev-node (unexpected state) +// - error if sync fails func (s *Replayer) SyncToHeight(ctx context.Context, targetHeight uint64) error { // Check if the executor implements HeightProvider execHeightProvider, ok := s.exec.(coreexecutor.HeightProvider) @@ -67,13 +67,15 @@ func (s *Replayer) SyncToHeight(ctx context.Context, targetHeight uint64) error Uint64("exec_layer_height", execHeight). Msg("execution layer height check") - // If execution layer is ahead, this is unexpected, fail hard + // If execution layer is ahead, skip syncing and continue. This can happen if execution + // progressed independently (e.g. after manual intervention). We log it for visibility but + // do not treat it as fatal. if execHeight > targetHeight { - s.logger.Error(). + s.logger.Warn(). Uint64("target_height", targetHeight). Uint64("exec_layer_height", execHeight). 
- Msg("execution layer is ahead of target height - this should not happen") - return fmt.Errorf("execution layer height (%d) is ahead of target height (%d)", execHeight, targetHeight) + Msg("execution layer is ahead of target height - skipping replay") + return nil } // If execution layer is behind, sync the missing blocks diff --git a/block/internal/common/replay_test.go b/block/internal/common/replay_test.go index cbbfac8ea8..6bc344d945 100644 --- a/block/internal/common/replay_test.go +++ b/block/internal/common/replay_test.go @@ -141,10 +141,12 @@ func TestReplayer_SyncToHeight_ExecutorAhead(t *testing.T) { mockExec.On("GetLatestHeight", mock.Anything).Return(execHeight, nil) - // Execute sync - should fail + // Execute sync - should just log and continue without error err := syncer.SyncToHeight(ctx, targetHeight) - require.Error(t, err) - require.Contains(t, err.Error(), "execution layer height (101) is ahead of target height (100)") + require.NoError(t, err) + + // No replay should be attempted + mockExec.AssertNotCalled(t, "ExecuteTxs", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything) } func TestReplayer_SyncToHeight_NoHeightProvider(t *testing.T) { From a15b177fec5aaadd0e58ef7bc616db304872d31e Mon Sep 17 00:00:00 2001 From: tac0turtle Date: Mon, 17 Nov 2025 11:03:21 +0100 Subject: [PATCH 13/23] implement atomic store initialization in SyncService --- pkg/sync/sync_service.go | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/pkg/sync/sync_service.go b/pkg/sync/sync_service.go index 7160064832..801e31b967 100644 --- a/pkg/sync/sync_service.go +++ b/pkg/sync/sync_service.go @@ -5,6 +5,7 @@ import ( "errors" "fmt" "strings" + "sync/atomic" "time" "github.com/celestiaorg/go-header" @@ -54,6 +55,7 @@ type SyncService[H header.Header[H]] struct { syncer *goheadersync.Syncer[H] syncerStatus *SyncerStatus topicSubscription header.Subscription[H] + storeInitialized atomic.Bool } // DataSyncService is the P2P 
Sync Service for blocks. @@ -134,9 +136,14 @@ func (syncService *SyncService[H]) WriteToStoreAndBroadcast(ctx context.Context, return fmt.Errorf("empty header/data cannot write to store or broadcast") } - storeInitialized, err := syncService.initStore(ctx, headerOrData) - if err != nil { - return fmt.Errorf("failed to initialize the store: %w", err) + storeInitialized := false + if !syncService.storeInitialized.Load() { + var err error + storeInitialized, err = syncService.initStore(ctx, headerOrData) + if err != nil { + return fmt.Errorf("failed to initialize the store: %w", err) + } + syncService.storeInitialized.Store(true) } firstStart := false @@ -340,6 +347,7 @@ func (syncService *SyncService[H]) initFromP2PWithRetry(ctx context.Context, pee if _, err := syncService.initStore(ctx, trusted); err != nil { return false, fmt.Errorf("failed to initialize the store: %w", err) } + syncService.storeInitialized.Store(true) if err := syncService.startSyncer(ctx); err != nil { return false, err } From 1f840cce0ee454e64b467c78101a676f53a9d05d Mon Sep 17 00:00:00 2001 From: tac0turtle Date: Tue, 18 Nov 2025 11:34:38 +0100 Subject: [PATCH 14/23] fix: reduce default DA timeout and clean up unused fields in P2P store snapshots --- block/internal/syncing/da_retriever.go | 2 +- docs/guides/full-node.md | 2 +- pkg/rpc/client/client_test.go | 2 -- pkg/rpc/server/server.go | 2 -- pkg/rpc/server/server_test.go | 2 -- pkg/sync/sync_service.go | 6 ------ proto/evnode/v1/state_rpc.proto | 18 ++++++++---------- types/pb/evnode/v1/state_rpc.pb.go | 24 +++--------------------- 8 files changed, 13 insertions(+), 45 deletions(-) diff --git a/block/internal/syncing/da_retriever.go b/block/internal/syncing/da_retriever.go index 52dba78040..de67e1fd1c 100644 --- a/block/internal/syncing/da_retriever.go +++ b/block/internal/syncing/da_retriever.go @@ -20,7 +20,7 @@ import ( ) // defaultDATimeout is the default timeout for DA retrieval operations -const defaultDATimeout = 30 * time.Second 
+const defaultDATimeout = 10 * time.Second // DARetriever handles DA retrieval operations for syncing type DARetriever struct { diff --git a/docs/guides/full-node.md b/docs/guides/full-node.md index eb123c227f..0022f65082 100644 --- a/docs/guides/full-node.md +++ b/docs/guides/full-node.md @@ -4,7 +4,7 @@ This guide covers how to set up a full node to run alongside a sequencer node in a Evolve-based blockchain network. A full node maintains a complete copy of the blockchain and helps validate transactions, improving the network's decentralization and security. -> **Note: The guide on how to run an evolve EVM full node can be found [here](./evm/single#setting-up-a-full-node).** +> **Note: The guide on how to run an evolve EVM full node can be found [in the evm section](./evm/single#setting-up-a-full-node).** ## Prerequisites diff --git a/pkg/rpc/client/client_test.go b/pkg/rpc/client/client_test.go index 3429daaf72..4b2b82e1b3 100644 --- a/pkg/rpc/client/client_test.go +++ b/pkg/rpc/client/client_test.go @@ -130,9 +130,7 @@ func TestClientGetP2PStoreInfo(t *testing.T) { require.Len(t, stores, 2) require.Equal(t, "Header Store", stores[0].Label) - require.True(t, stores[0].HeadPresent) require.Equal(t, uint64(10), stores[0].Head.Height) - require.True(t, stores[0].TailPresent) require.Equal(t, uint64(5), stores[0].Tail.Height) require.Equal(t, "Data Store", stores[1].Label) diff --git a/pkg/rpc/server/server.go b/pkg/rpc/server/server.go index e397aa8125..8af957ed9d 100644 --- a/pkg/rpc/server/server.go +++ b/pkg/rpc/server/server.go @@ -236,14 +236,12 @@ func collectP2PStoreSnapshot[H goheader.Header[H]]( } if head, err := store.Head(ctx); err == nil { - snapshot.HeadPresent = true snapshot.Head = toP2PStoreEntry(head) } else if !errors.Is(err, goheader.ErrEmptyStore) && !errors.Is(err, goheader.ErrNotFound) { return nil, fmt.Errorf("failed to read %s head: %w", label, err) } if tail, err := store.Tail(ctx); err == nil { - snapshot.TailPresent = true snapshot.Tail = 
toP2PStoreEntry(tail) } else if !errors.Is(err, goheader.ErrEmptyStore) && !errors.Is(err, goheader.ErrNotFound) { return nil, fmt.Errorf("failed to read %s tail: %w", label, err) diff --git a/pkg/rpc/server/server_test.go b/pkg/rpc/server/server_test.go index 98eecc006a..2ac3e004e7 100644 --- a/pkg/rpc/server/server_test.go +++ b/pkg/rpc/server/server_test.go @@ -345,8 +345,6 @@ func TestGetP2PStoreInfo(t *testing.T) { require.Equal(t, "Header Store", resp.Msg.Stores[0].Label) require.Equal(t, uint64(12), resp.Msg.Stores[0].Height) - require.True(t, resp.Msg.Stores[0].HeadPresent) - require.True(t, resp.Msg.Stores[0].TailPresent) require.Equal(t, "Data Store", resp.Msg.Stores[1].Label) require.Equal(t, uint64(9), resp.Msg.Stores[1].Height) diff --git a/pkg/sync/sync_service.go b/pkg/sync/sync_service.go index 801e31b967..f323a12188 100644 --- a/pkg/sync/sync_service.go +++ b/pkg/sync/sync_service.go @@ -154,11 +154,6 @@ func (syncService *SyncService[H]) WriteToStoreAndBroadcast(ctx context.Context, } } - if err := headerOrData.Validate(); err != nil { - syncService.logger.Error().Err(err).Msg("failed to validate header") - panic(err) - } - // Broadcast for subscribers if err := syncService.sub.Broadcast(ctx, headerOrData, opts...); err != nil { // for the first block when starting the app, broadcast error is expected @@ -174,7 +169,6 @@ func (syncService *SyncService[H]) WriteToStoreAndBroadcast(ctx context.Context, } syncService.logger.Error().Err(err).Msg("failed to broadcast") - panic(err) } return nil diff --git a/proto/evnode/v1/state_rpc.proto b/proto/evnode/v1/state_rpc.proto index e0997b3d9d..3c8e4bb403 100644 --- a/proto/evnode/v1/state_rpc.proto +++ b/proto/evnode/v1/state_rpc.proto @@ -65,24 +65,22 @@ message GetMetadataResponse { // GetGenesisDaHeightResponse defines the DA height at which the first Evolve block was included. 
message GetGenesisDaHeightResponse { - uint64 height = 3; + uint64 height = 3; } // P2PStoreEntry captures a single head or tail record from a go-header store. message P2PStoreEntry { - uint64 height = 1; - bytes hash = 2; - google.protobuf.Timestamp time = 3; + uint64 height = 1; + bytes hash = 2; + google.protobuf.Timestamp time = 3; } // P2PStoreSnapshot provides head/tail status for a go-header store. message P2PStoreSnapshot { - string label = 1; - uint64 height = 2; - bool head_present = 3; - P2PStoreEntry head = 4; - bool tail_present = 5; - P2PStoreEntry tail = 6; + string label = 1; + uint64 height = 2; + P2PStoreEntry head = 4; + P2PStoreEntry tail = 6; } // GetP2PStoreInfoResponse holds the snapshots for configured go-header stores. diff --git a/types/pb/evnode/v1/state_rpc.pb.go b/types/pb/evnode/v1/state_rpc.pb.go index a36fe4cd5a..6aa4c000c8 100644 --- a/types/pb/evnode/v1/state_rpc.pb.go +++ b/types/pb/evnode/v1/state_rpc.pb.go @@ -468,9 +468,7 @@ type P2PStoreSnapshot struct { state protoimpl.MessageState `protogen:"open.v1"` Label string `protobuf:"bytes,1,opt,name=label,proto3" json:"label,omitempty"` Height uint64 `protobuf:"varint,2,opt,name=height,proto3" json:"height,omitempty"` - HeadPresent bool `protobuf:"varint,3,opt,name=head_present,json=headPresent,proto3" json:"head_present,omitempty"` Head *P2PStoreEntry `protobuf:"bytes,4,opt,name=head,proto3" json:"head,omitempty"` - TailPresent bool `protobuf:"varint,5,opt,name=tail_present,json=tailPresent,proto3" json:"tail_present,omitempty"` Tail *P2PStoreEntry `protobuf:"bytes,6,opt,name=tail,proto3" json:"tail,omitempty"` unknownFields protoimpl.UnknownFields sizeCache protoimpl.SizeCache @@ -520,13 +518,6 @@ func (x *P2PStoreSnapshot) GetHeight() uint64 { return 0 } -func (x *P2PStoreSnapshot) GetHeadPresent() bool { - if x != nil { - return x.HeadPresent - } - return false -} - func (x *P2PStoreSnapshot) GetHead() *P2PStoreEntry { if x != nil { return x.Head @@ -534,13 +525,6 @@ func (x 
*P2PStoreSnapshot) GetHead() *P2PStoreEntry { return nil } -func (x *P2PStoreSnapshot) GetTailPresent() bool { - if x != nil { - return x.TailPresent - } - return false -} - func (x *P2PStoreSnapshot) GetTail() *P2PStoreEntry { if x != nil { return x.Tail @@ -621,13 +605,11 @@ const file_evnode_v1_state_rpc_proto_rawDesc = "" + "\rP2PStoreEntry\x12\x16\n" + "\x06height\x18\x01 \x01(\x04R\x06height\x12\x12\n" + "\x04hash\x18\x02 \x01(\fR\x04hash\x12.\n" + - "\x04time\x18\x03 \x01(\v2\x1a.google.protobuf.TimestampR\x04time\"\xe2\x01\n" + + "\x04time\x18\x03 \x01(\v2\x1a.google.protobuf.TimestampR\x04time\"\x9c\x01\n" + "\x10P2PStoreSnapshot\x12\x14\n" + "\x05label\x18\x01 \x01(\tR\x05label\x12\x16\n" + - "\x06height\x18\x02 \x01(\x04R\x06height\x12!\n" + - "\fhead_present\x18\x03 \x01(\bR\vheadPresent\x12,\n" + - "\x04head\x18\x04 \x01(\v2\x18.evnode.v1.P2PStoreEntryR\x04head\x12!\n" + - "\ftail_present\x18\x05 \x01(\bR\vtailPresent\x12,\n" + + "\x06height\x18\x02 \x01(\x04R\x06height\x12,\n" + + "\x04head\x18\x04 \x01(\v2\x18.evnode.v1.P2PStoreEntryR\x04head\x12,\n" + "\x04tail\x18\x06 \x01(\v2\x18.evnode.v1.P2PStoreEntryR\x04tail\"N\n" + "\x17GetP2PStoreInfoResponse\x123\n" + "\x06stores\x18\x01 \x03(\v2\x1b.evnode.v1.P2PStoreSnapshotR\x06stores2\x90\x03\n" + From 1b424f51d1b8d6b8143f1ec049098adf8a4bc6c4 Mon Sep 17 00:00:00 2001 From: tac0turtle Date: Tue, 18 Nov 2025 11:44:46 +0100 Subject: [PATCH 15/23] chore: update changelog --- CHANGELOG.md | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 30b04aea76..a434c17b75 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,15 +12,26 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added - Enhanced health check system with separate liveness (`/health/live`) and readiness (`/health/ready`) HTTP endpoints. Readiness endpoint includes P2P listening check and aggregator block production rate validation (5x block time threshold). 
([#2800](https://github.com/evstack/ev-node/pull/2800)) +- Added `store-info` CLI command to inspect go-header P2P stores and display head/tail information for both header and data stores ([#2835](https://github.com/evstack/ev-node/pull/2835)) +- Added `GetP2PStoreInfo` RPC method to retrieve head/tail metadata for go-header stores used by P2P sync ([#2835](https://github.com/evstack/ev-node/pull/2835)) +- Added protobuf definitions for `P2PStoreEntry` and `P2PStoreSnapshot` messages to support P2P store inspection ### Changed - Remove GasPrice and GasMultiplier from DA interface and configuration to use celestia-node's native fee estimation. ([#2822](https://github.com/evstack/ev-node/pull/2822)) - Use cache instead of in memory store for reaper. Persist cache on reload. Autoclean after 24 hours. ([#2811](https://github.com/evstack/ev-node/pull/2811)) +- Improved P2P sync service store initialization to be atomic and prevent race conditions ([#2838](https://github.com/evstack/ev-node/pull/2838)) +- Enhanced P2P bootstrap behavior to intelligently detect starting height from local store instead of requiring trusted hash +- Relaxed execution layer height validation in block replay to allow execution to be ahead of target height, enabling recovery from manual intervention scenarios ### Removed - **BREAKING:** Removed `evnode.v1.HealthService` gRPC endpoint. Use HTTP endpoints: `GET /health/live` and `GET /health/ready`. ([#2800](https://github.com/evstack/ev-node/pull/2800)) +- **BREAKING:** Removed `TrustedHash` configuration option and `--evnode.node.trusted_hash` flag. 
Sync service now automatically determines starting height from local store state ([#2838](https://github.com/evstack/ev-node/pull/2838)) + +### Fixed + +- Fixed sync service initialization issue when node is not on genesis but has an empty store ## v1.0.0-beta.9 From 312c563b67296b2e1799c49d89dd5a1311177866 Mon Sep 17 00:00:00 2001 From: tac0turtle Date: Tue, 18 Nov 2025 12:23:47 +0100 Subject: [PATCH 16/23] fix: prevent race condition during store initialization in WriteToStoreAndBroadcast --- pkg/sync/sync_service.go | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/pkg/sync/sync_service.go b/pkg/sync/sync_service.go index f323a12188..a4f7abd016 100644 --- a/pkg/sync/sync_service.go +++ b/pkg/sync/sync_service.go @@ -137,13 +137,13 @@ func (syncService *SyncService[H]) WriteToStoreAndBroadcast(ctx context.Context, } storeInitialized := false - if !syncService.storeInitialized.Load() { + if syncService.storeInitialized.CompareAndSwap(false, true) { var err error storeInitialized, err = syncService.initStore(ctx, headerOrData) if err != nil { + syncService.storeInitialized.Store(false) return fmt.Errorf("failed to initialize the store: %w", err) } - syncService.storeInitialized.Store(true) } firstStart := false @@ -338,10 +338,15 @@ func (syncService *SyncService[H]) initFromP2PWithRetry(ctx context.Context, pee return false, fmt.Errorf("failed to fetch height %d from peers: %w", heightToQuery, err) } - if _, err := syncService.initStore(ctx, trusted); err != nil { - return false, fmt.Errorf("failed to initialize the store: %w", err) + // Use CompareAndSwap to atomically check and set initialization flag + // This prevents race condition where concurrent calls could both initialize the store + if syncService.storeInitialized.CompareAndSwap(false, true) { + if _, err := syncService.initStore(ctx, trusted); err != nil { + // Revert the flag on error so initialization can be retried + syncService.storeInitialized.Store(false) + return 
false, fmt.Errorf("failed to initialize the store: %w", err) + } } - syncService.storeInitialized.Store(true) if err := syncService.startSyncer(ctx); err != nil { return false, err } From 4a66f9ee56468e7128b3c2414d5f54322937e5dd Mon Sep 17 00:00:00 2001 From: tac0turtle Date: Tue, 18 Nov 2025 18:06:57 +0100 Subject: [PATCH 17/23] remove redundant race condition handling in initFromP2PWithRetry --- pkg/sync/sync_service.go | 3 --- 1 file changed, 3 deletions(-) diff --git a/pkg/sync/sync_service.go b/pkg/sync/sync_service.go index a4f7abd016..6a17a42a85 100644 --- a/pkg/sync/sync_service.go +++ b/pkg/sync/sync_service.go @@ -338,11 +338,8 @@ func (syncService *SyncService[H]) initFromP2PWithRetry(ctx context.Context, pee return false, fmt.Errorf("failed to fetch height %d from peers: %w", heightToQuery, err) } - // Use CompareAndSwap to atomically check and set initialization flag - // This prevents race condition where concurrent calls could both initialize the store if syncService.storeInitialized.CompareAndSwap(false, true) { if _, err := syncService.initStore(ctx, trusted); err != nil { - // Revert the flag on error so initialization can be retried syncService.storeInitialized.Store(false) return false, fmt.Errorf("failed to initialize the store: %w", err) } From 6c0654e9ed9daa7b8a2fb25c71934f4c2a518e04 Mon Sep 17 00:00:00 2001 From: tac0turtle Date: Wed, 19 Nov 2025 14:09:06 +0100 Subject: [PATCH 18/23] add read-only mode for key-value store and update store-info command --- pkg/cmd/store.go | 6 ++++-- pkg/store/kv.go | 16 +++++++++++++++- pkg/store/kv_test.go | 36 ++++++++++++++++++++++++++++++++++++ pkg/sync/sync_service.go | 1 + 4 files changed, 56 insertions(+), 3 deletions(-) create mode 100644 pkg/store/kv_test.go diff --git a/pkg/cmd/store.go b/pkg/cmd/store.go index 0f524325f2..f1cff048a4 100644 --- a/pkg/cmd/store.go +++ b/pkg/cmd/store.go @@ -71,7 +71,9 @@ var StoreP2PInspectCmd = &cobra.Command{ Use: "store-info", Short: "Inspect the go-header (P2P) 
stores and display their tail/head entries", Long: `Opens the datastore used by the node's go-header services and reports -the current height, head, and tail information for both the header and data stores.`, +the current height, head, and tail information for both the header and data stores. +The datastore is opened in read-only mode so that it can run against mounted snapshots +or other read-only environments without requiring write access.`, RunE: func(cmd *cobra.Command, args []string) error { nodeConfig, err := ParseConfig(cmd) if err != nil { @@ -85,7 +87,7 @@ the current height, head, and tail information for both the header and data stor dbName := resolveDBName(cmd) - rawStore, err := store.NewDefaultKVStore(nodeConfig.RootDir, nodeConfig.DBPath, dbName) + rawStore, err := store.NewDefaultReadOnlyKVStore(nodeConfig.RootDir, nodeConfig.DBPath, dbName) if err != nil { return fmt.Errorf("failed to open datastore: %w", err) } diff --git a/pkg/store/kv.go b/pkg/store/kv.go index e270962133..59e35df565 100644 --- a/pkg/store/kv.go +++ b/pkg/store/kv.go @@ -13,8 +13,22 @@ import ( // NewDefaultKVStore creates instance of default key-value store. func NewDefaultKVStore(rootDir, dbPath, dbName string) (ds.Batching, error) { + return newDefaultKVStore(rootDir, dbPath, dbName, nil) +} + +// NewDefaultReadOnlyKVStore creates a key-value store opened in badger's read-only mode. +// +// This is useful for tools that only inspect state (such as store-info) where the +// underlying data directory might be mounted read-only. 
+func NewDefaultReadOnlyKVStore(rootDir, dbPath, dbName string) (ds.Batching, error) { + opts := badger4.DefaultOptions + opts.Options = opts.Options.WithReadOnly(true) + return newDefaultKVStore(rootDir, dbPath, dbName, &opts) +} + +func newDefaultKVStore(rootDir, dbPath, dbName string, options *badger4.Options) (ds.Batching, error) { path := filepath.Join(rootify(rootDir, dbPath), dbName) - return badger4.NewDatastore(path, nil) + return badger4.NewDatastore(path, options) } // PrefixEntries retrieves all entries in the datastore whose keys have the supplied prefix diff --git a/pkg/store/kv_test.go b/pkg/store/kv_test.go new file mode 100644 index 0000000000..8b8ad1d0b0 --- /dev/null +++ b/pkg/store/kv_test.go @@ -0,0 +1,36 @@ +package store + +import ( + "context" + "testing" + + ds "github.com/ipfs/go-datastore" + "github.com/stretchr/testify/require" +) + +func TestNewDefaultReadOnlyKVStore(t *testing.T) { + t.Parallel() + + ctx := context.Background() + rootDir := t.TempDir() + const dbPath = "db" + const dbName = "test" + + writable, err := NewDefaultKVStore(rootDir, dbPath, dbName) + require.NoError(t, err) + require.NoError(t, writable.Put(ctx, ds.NewKey("/foo"), []byte("bar"))) + require.NoError(t, writable.Close()) + + readOnly, err := NewDefaultReadOnlyKVStore(rootDir, dbPath, dbName) + require.NoError(t, err) + t.Cleanup(func() { + _ = readOnly.Close() + }) + + val, err := readOnly.Get(ctx, ds.NewKey("/foo")) + require.NoError(t, err) + require.Equal(t, []byte("bar"), val) + + err = readOnly.Put(ctx, ds.NewKey("/foo"), []byte("baz")) + require.Error(t, err, "writing to a read-only store should fail") +} diff --git a/pkg/sync/sync_service.go b/pkg/sync/sync_service.go index 6a17a42a85..6b052845a8 100644 --- a/pkg/sync/sync_service.go +++ b/pkg/sync/sync_service.go @@ -169,6 +169,7 @@ func (syncService *SyncService[H]) WriteToStoreAndBroadcast(ctx context.Context, } syncService.logger.Error().Err(err).Msg("failed to broadcast") + panic(err) } return nil 
From 6787bedbe3157ace6e704e52fc7bfb1ad30f7263 Mon Sep 17 00:00:00 2001 From: tac0turtle Date: Wed, 19 Nov 2025 16:01:33 +0100 Subject: [PATCH 19/23] remove extra readonly opener --- pkg/cmd/store.go | 6 ++---- pkg/store/kv.go | 16 +--------------- pkg/store/kv_test.go | 36 ------------------------------------ 3 files changed, 3 insertions(+), 55 deletions(-) delete mode 100644 pkg/store/kv_test.go diff --git a/pkg/cmd/store.go b/pkg/cmd/store.go index f1cff048a4..0f524325f2 100644 --- a/pkg/cmd/store.go +++ b/pkg/cmd/store.go @@ -71,9 +71,7 @@ var StoreP2PInspectCmd = &cobra.Command{ Use: "store-info", Short: "Inspect the go-header (P2P) stores and display their tail/head entries", Long: `Opens the datastore used by the node's go-header services and reports -the current height, head, and tail information for both the header and data stores. -The datastore is opened in read-only mode so that it can run against mounted snapshots -or other read-only environments without requiring write access.`, +the current height, head, and tail information for both the header and data stores.`, RunE: func(cmd *cobra.Command, args []string) error { nodeConfig, err := ParseConfig(cmd) if err != nil { @@ -87,7 +85,7 @@ or other read-only environments without requiring write access.`, dbName := resolveDBName(cmd) - rawStore, err := store.NewDefaultReadOnlyKVStore(nodeConfig.RootDir, nodeConfig.DBPath, dbName) + rawStore, err := store.NewDefaultKVStore(nodeConfig.RootDir, nodeConfig.DBPath, dbName) if err != nil { return fmt.Errorf("failed to open datastore: %w", err) } diff --git a/pkg/store/kv.go b/pkg/store/kv.go index 59e35df565..e270962133 100644 --- a/pkg/store/kv.go +++ b/pkg/store/kv.go @@ -13,22 +13,8 @@ import ( // NewDefaultKVStore creates instance of default key-value store. 
func NewDefaultKVStore(rootDir, dbPath, dbName string) (ds.Batching, error) { - return newDefaultKVStore(rootDir, dbPath, dbName, nil) -} - -// NewDefaultReadOnlyKVStore creates a key-value store opened in badger's read-only mode. -// -// This is useful for tools that only inspect state (such as store-info) where the -// underlying data directory might be mounted read-only. -func NewDefaultReadOnlyKVStore(rootDir, dbPath, dbName string) (ds.Batching, error) { - opts := badger4.DefaultOptions - opts.Options = opts.Options.WithReadOnly(true) - return newDefaultKVStore(rootDir, dbPath, dbName, &opts) -} - -func newDefaultKVStore(rootDir, dbPath, dbName string, options *badger4.Options) (ds.Batching, error) { path := filepath.Join(rootify(rootDir, dbPath), dbName) - return badger4.NewDatastore(path, options) + return badger4.NewDatastore(path, nil) } // PrefixEntries retrieves all entries in the datastore whose keys have the supplied prefix diff --git a/pkg/store/kv_test.go b/pkg/store/kv_test.go deleted file mode 100644 index 8b8ad1d0b0..0000000000 --- a/pkg/store/kv_test.go +++ /dev/null @@ -1,36 +0,0 @@ -package store - -import ( - "context" - "testing" - - ds "github.com/ipfs/go-datastore" - "github.com/stretchr/testify/require" -) - -func TestNewDefaultReadOnlyKVStore(t *testing.T) { - t.Parallel() - - ctx := context.Background() - rootDir := t.TempDir() - const dbPath = "db" - const dbName = "test" - - writable, err := NewDefaultKVStore(rootDir, dbPath, dbName) - require.NoError(t, err) - require.NoError(t, writable.Put(ctx, ds.NewKey("/foo"), []byte("bar"))) - require.NoError(t, writable.Close()) - - readOnly, err := NewDefaultReadOnlyKVStore(rootDir, dbPath, dbName) - require.NoError(t, err) - t.Cleanup(func() { - _ = readOnly.Close() - }) - - val, err := readOnly.Get(ctx, ds.NewKey("/foo")) - require.NoError(t, err) - require.Equal(t, []byte("bar"), val) - - err = readOnly.Put(ctx, ds.NewKey("/foo"), []byte("baz")) - require.Error(t, err, "writing to a 
read-only store should fail") -} From b6284591053525d9546d6ce0700e4ed8e9b1564b Mon Sep 17 00:00:00 2001 From: tac0turtle Date: Wed, 19 Nov 2025 16:37:34 +0100 Subject: [PATCH 20/23] check the store and other things --- pkg/sync/sync_service.go | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/pkg/sync/sync_service.go b/pkg/sync/sync_service.go index 6b052845a8..92216da717 100644 --- a/pkg/sync/sync_service.go +++ b/pkg/sync/sync_service.go @@ -154,6 +154,18 @@ func (syncService *SyncService[H]) WriteToStoreAndBroadcast(ctx context.Context, } } + hh, err := syncService.store.Head(ctx) + if err != nil { + return fmt.Errorf("failed to get head from store: %w", err) + } + syncService.logger.Info().Uint64("height", hh.Height()). + Uint64("header_or_data", headerOrData.Height()). + Msg("writing to store and broadcasting") + + if err := hh.Verify(headerOrData); err != nil { + panic(fmt.Errorf("header verification failed: %w", err)) + } + // Broadcast for subscribers if err := syncService.sub.Broadcast(ctx, headerOrData, opts...); err != nil { // for the first block when starting the app, broadcast error is expected From 483456456dddbd680abfc49c82f16b104c0bbfe1 Mon Sep 17 00:00:00 2001 From: tac0turtle Date: Wed, 19 Nov 2025 20:14:18 +0100 Subject: [PATCH 21/23] remove panic and store cmd --- apps/evm/single/main.go | 1 - apps/grpc/single/main.go | 1 - apps/testapp/main.go | 1 - pkg/cmd/store.go | 179 --------------------------------------- pkg/cmd/store_test.go | 96 --------------------- pkg/sync/sync_service.go | 13 --- 6 files changed, 291 deletions(-) diff --git a/apps/evm/single/main.go b/apps/evm/single/main.go index 749cf9f3a0..a562ceff7c 100644 --- a/apps/evm/single/main.go +++ b/apps/evm/single/main.go @@ -31,7 +31,6 @@ func main() { rollcmd.VersionCmd, rollcmd.NetInfoCmd, rollcmd.StoreUnsafeCleanCmd, - rollcmd.StoreP2PInspectCmd, rollcmd.KeysCmd(), ) diff --git a/apps/grpc/single/main.go b/apps/grpc/single/main.go index 08f0145047..a0d7f934d0 100644 
--- a/apps/grpc/single/main.go +++ b/apps/grpc/single/main.go @@ -30,7 +30,6 @@ the Evolve execution gRPC interface.`, evcmd.VersionCmd, evcmd.NetInfoCmd, evcmd.StoreUnsafeCleanCmd, - evcmd.StoreP2PInspectCmd, evcmd.KeysCmd(), ) diff --git a/apps/testapp/main.go b/apps/testapp/main.go index 5c711479f7..cd8f019709 100644 --- a/apps/testapp/main.go +++ b/apps/testapp/main.go @@ -19,7 +19,6 @@ func main() { rollcmd.VersionCmd, rollcmd.NetInfoCmd, rollcmd.StoreUnsafeCleanCmd, - rollcmd.StoreP2PInspectCmd, rollcmd.KeysCmd(), cmds.NewRollbackCmd(), initCmd, diff --git a/pkg/cmd/store.go b/pkg/cmd/store.go index 0f524325f2..99ed8579b3 100644 --- a/pkg/cmd/store.go +++ b/pkg/cmd/store.go @@ -1,23 +1,13 @@ package cmd import ( - "context" - "errors" "fmt" "os" "path/filepath" - "time" - goheader "github.com/celestiaorg/go-header" - goheaderstore "github.com/celestiaorg/go-header/store" - ds "github.com/ipfs/go-datastore" - kt "github.com/ipfs/go-datastore/keytransform" "github.com/spf13/cobra" - "github.com/evstack/ev-node/node" "github.com/evstack/ev-node/pkg/config" - "github.com/evstack/ev-node/pkg/store" - "github.com/evstack/ev-node/types" ) // UnsafeCleanDataDir removes all contents of the specified data directory. @@ -66,175 +56,6 @@ This operation is unsafe and cannot be undone. Use with caution!`, }, } -// StoreP2PInspectCmd reports head/tail information for the go-header stores used by P2P sync. 
-var StoreP2PInspectCmd = &cobra.Command{ - Use: "store-info", - Short: "Inspect the go-header (P2P) stores and display their tail/head entries", - Long: `Opens the datastore used by the node's go-header services and reports -the current height, head, and tail information for both the header and data stores.`, - RunE: func(cmd *cobra.Command, args []string) error { - nodeConfig, err := ParseConfig(cmd) - if err != nil { - return fmt.Errorf("error parsing config: %w", err) - } - - ctx := cmd.Context() - if ctx == nil { - ctx = context.Background() - } - - dbName := resolveDBName(cmd) - - rawStore, err := store.NewDefaultKVStore(nodeConfig.RootDir, nodeConfig.DBPath, dbName) - if err != nil { - return fmt.Errorf("failed to open datastore: %w", err) - } - defer func() { - if closeErr := rawStore.Close(); closeErr != nil { - cmd.PrintErrf("warning: failed to close datastore: %v\n", closeErr) - } - }() - - mainStore := kt.Wrap(rawStore, &kt.PrefixTransform{ - Prefix: ds.NewKey(node.EvPrefix), - }) - - headerSnapshot, err := inspectP2PStore[*types.SignedHeader](ctx, mainStore, headerStorePrefix, "Header Store") - if err != nil { - return fmt.Errorf("failed to inspect header store: %w", err) - } - - dataSnapshot, err := inspectP2PStore[*types.Data](ctx, mainStore, dataStorePrefix, "Data Store") - if err != nil { - return fmt.Errorf("failed to inspect data store: %w", err) - } - - storePath := resolveStorePath(nodeConfig.RootDir, nodeConfig.DBPath, dbName) - - out := cmd.OutOrStdout() - fmt.Fprintf(out, "Inspecting go-header stores at %s\n", storePath) - printP2PStoreSnapshot(cmd, headerSnapshot) - printP2PStoreSnapshot(cmd, dataSnapshot) - - return nil - }, -} - -const ( - headerStorePrefix = "headerSync" - dataStorePrefix = "dataSync" -) - -type p2pStoreSnapshot struct { - Label string - Prefix string - Height uint64 - HeadHeight uint64 - HeadHash string - HeadTime time.Time - TailHeight uint64 - TailHash string - TailTime time.Time - HeadPresent bool - TailPresent bool 
- Empty bool -} - -func inspectP2PStore[H goheader.Header[H]]( - ctx context.Context, - datastore ds.Batching, - prefix string, - label string, -) (p2pStoreSnapshot, error) { - storeImpl, err := goheaderstore.NewStore[H]( - datastore, - goheaderstore.WithStorePrefix(prefix), - goheaderstore.WithMetrics(), - ) - if err != nil { - return p2pStoreSnapshot{}, fmt.Errorf("failed to open %s: %w", label, err) - } - - if err := storeImpl.Start(ctx); err != nil { - return p2pStoreSnapshot{}, fmt.Errorf("failed to start %s: %w", label, err) - } - defer func() { - _ = storeImpl.Stop(context.Background()) - }() - - snapshot := p2pStoreSnapshot{ - Label: label, - Prefix: prefix, - Height: storeImpl.Height(), - } - - if err := populateSnapshot(ctx, storeImpl, &snapshot); err != nil { - return p2pStoreSnapshot{}, err - } - - return snapshot, nil -} - -func populateSnapshot[H goheader.Header[H]]( - ctx context.Context, - storeImpl *goheaderstore.Store[H], - snapshot *p2pStoreSnapshot, -) error { - head, err := storeImpl.Head(ctx) - switch { - case err == nil: - snapshot.HeadPresent = true - snapshot.HeadHeight = head.Height() - snapshot.HeadHash = head.Hash().String() - snapshot.HeadTime = head.Time() - case errors.Is(err, goheader.ErrEmptyStore), errors.Is(err, goheader.ErrNotFound): - // store not initialized yet - default: - return fmt.Errorf("failed to read %s head: %w", snapshot.Label, err) - } - - tail, err := storeImpl.Tail(ctx) - switch { - case err == nil: - snapshot.TailPresent = true - snapshot.TailHeight = tail.Height() - snapshot.TailHash = tail.Hash().String() - snapshot.TailTime = tail.Time() - case errors.Is(err, goheader.ErrEmptyStore), errors.Is(err, goheader.ErrNotFound): - default: - return fmt.Errorf("failed to read %s tail: %w", snapshot.Label, err) - } - - snapshot.Empty = !snapshot.HeadPresent && !snapshot.TailPresent - - return nil -} - -func printP2PStoreSnapshot(cmd *cobra.Command, snapshot p2pStoreSnapshot) { - out := cmd.OutOrStdout() - 
fmt.Fprintf(out, "\n[%s]\n", snapshot.Label) - fmt.Fprintf(out, "prefix: %s\n", snapshot.Prefix) - fmt.Fprintf(out, "height: %d\n", snapshot.Height) - if snapshot.Empty { - fmt.Fprintln(out, "status: empty (no entries found)") - return - } - - if snapshot.TailPresent { - fmt.Fprintf(out, "tail: height=%d hash=%s%s\n", snapshot.TailHeight, snapshot.TailHash, formatTime(snapshot.TailTime)) - } - if snapshot.HeadPresent { - fmt.Fprintf(out, "head: height=%d hash=%s%s\n", snapshot.HeadHeight, snapshot.HeadHash, formatTime(snapshot.HeadTime)) - } -} - -func formatTime(t time.Time) string { - if t.IsZero() { - return "" - } - return fmt.Sprintf(" time=%s", t.UTC().Format(time.RFC3339)) -} - func resolveDBName(cmd *cobra.Command) string { if cmd == nil { return config.ConfigFileName diff --git a/pkg/cmd/store_test.go b/pkg/cmd/store_test.go index bf0b2c4bc3..d97b255c6a 100644 --- a/pkg/cmd/store_test.go +++ b/pkg/cmd/store_test.go @@ -2,25 +2,13 @@ package cmd import ( "bytes" - "context" - cryptoRand "crypto/rand" "fmt" "os" "path/filepath" "testing" - goheaderstore "github.com/celestiaorg/go-header/store" - ds "github.com/ipfs/go-datastore" - kt "github.com/ipfs/go-datastore/keytransform" - "github.com/libp2p/go-libp2p/core/crypto" "github.com/spf13/cobra" "github.com/stretchr/testify/require" - - "github.com/evstack/ev-node/node" - "github.com/evstack/ev-node/pkg/config" - "github.com/evstack/ev-node/pkg/signer/noop" - "github.com/evstack/ev-node/pkg/store" - "github.com/evstack/ev-node/types" ) func TestUnsafeCleanDataDir(t *testing.T) { @@ -97,87 +85,3 @@ func TestStoreUnsafeCleanCmd(t *testing.T) { // Check output message (optional) require.Contains(t, buf.String(), fmt.Sprintf("All contents of the data directory at %s have been removed.", dataDir)) } - -func TestStoreP2PInspectCmd(t *testing.T) { - tempDir := t.TempDir() - const appName = "testapp" - - // Seed the header store with a couple of entries. 
- seedHeaderStore(t, tempDir, appName) - - rootCmd := &cobra.Command{Use: appName} - rootCmd.PersistentFlags().String(config.FlagRootDir, tempDir, "root directory") - rootCmd.AddCommand(StoreP2PInspectCmd) - - buf := new(bytes.Buffer) - rootCmd.SetOut(buf) - rootCmd.SetErr(buf) - rootCmd.SetArgs([]string{"store-info"}) - - err := rootCmd.Execute() - require.NoError(t, err) - - output := buf.String() - require.Contains(t, output, "Inspecting go-header stores") - require.Contains(t, output, "[Header Store]") - require.Contains(t, output, "tail: height=1") - require.Contains(t, output, "head: height=2") - require.Contains(t, output, "[Data Store]") - require.Contains(t, output, "status: empty") -} - -func seedHeaderStore(t *testing.T, rootDir, dbName string) { - t.Helper() - - rawStore, err := store.NewDefaultKVStore(rootDir, "data", dbName) - require.NoError(t, err) - - mainStore := kt.Wrap(rawStore, &kt.PrefixTransform{ - Prefix: ds.NewKey(node.EvPrefix), - }) - - headerStore, err := goheaderstore.NewStore[*types.SignedHeader]( - mainStore, - goheaderstore.WithStorePrefix(headerStorePrefix), - goheaderstore.WithMetrics(), - ) - require.NoError(t, err) - - ctx := context.Background() - require.NoError(t, headerStore.Start(ctx)) - - defer func() { - require.NoError(t, headerStore.Stop(ctx)) - require.NoError(t, rawStore.Close()) - }() - - pk, _, err := crypto.GenerateEd25519Key(cryptoRand.Reader) - require.NoError(t, err) - noopSigner, err := noop.NewNoopSigner(pk) - require.NoError(t, err) - - chainID := "test-chain" - headerCfg := types.HeaderConfig{ - Height: 1, - DataHash: types.GetRandomBytes(32), - AppHash: types.GetRandomBytes(32), - Signer: noopSigner, - } - - first, err := types.GetRandomSignedHeaderCustom(&headerCfg, chainID) - require.NoError(t, err) - require.NoError(t, headerStore.Append(ctx, first)) - - next := &types.SignedHeader{ - Header: types.GetRandomNextHeader(first.Header, chainID), - Signer: first.Signer, - } - payload, err := 
next.Header.MarshalBinary() - require.NoError(t, err) - signature, err := noopSigner.Sign(payload) - require.NoError(t, err) - next.Signature = signature - - require.NoError(t, headerStore.Append(ctx, next)) - require.NoError(t, headerStore.Sync(ctx)) -} diff --git a/pkg/sync/sync_service.go b/pkg/sync/sync_service.go index 92216da717..6a17a42a85 100644 --- a/pkg/sync/sync_service.go +++ b/pkg/sync/sync_service.go @@ -154,18 +154,6 @@ func (syncService *SyncService[H]) WriteToStoreAndBroadcast(ctx context.Context, } } - hh, err := syncService.store.Head(ctx) - if err != nil { - return fmt.Errorf("failed to get head from store: %w", err) - } - syncService.logger.Info().Uint64("height", hh.Height()). - Uint64("header_or_data", headerOrData.Height()). - Msg("writing to store and broadcasting") - - if err := hh.Verify(headerOrData); err != nil { - panic(fmt.Errorf("header verification failed: %w", err)) - } - // Broadcast for subscribers if err := syncService.sub.Broadcast(ctx, headerOrData, opts...); err != nil { // for the first block when starting the app, broadcast error is expected @@ -181,7 +169,6 @@ func (syncService *SyncService[H]) WriteToStoreAndBroadcast(ctx context.Context, } syncService.logger.Error().Err(err).Msg("failed to broadcast") - panic(err) } return nil From b36171ed1341cb597b30bc5c776e8639ca134a73 Mon Sep 17 00:00:00 2001 From: tac0turtle Date: Thu, 20 Nov 2025 09:21:54 +0100 Subject: [PATCH 22/23] fix lint --- pkg/cmd/store.go | 21 --------------------- 1 file changed, 21 deletions(-) diff --git a/pkg/cmd/store.go b/pkg/cmd/store.go index 99ed8579b3..e68a0eaed6 100644 --- a/pkg/cmd/store.go +++ b/pkg/cmd/store.go @@ -6,8 +6,6 @@ import ( "path/filepath" "github.com/spf13/cobra" - - "github.com/evstack/ev-node/pkg/config" ) // UnsafeCleanDataDir removes all contents of the specified data directory. @@ -55,22 +53,3 @@ This operation is unsafe and cannot be undone. 
Use with caution!`, return nil }, } - -func resolveDBName(cmd *cobra.Command) string { - if cmd == nil { - return config.ConfigFileName - } - root := cmd.Root() - if root == nil || root.Name() == "" { - return config.ConfigFileName - } - return root.Name() -} - -func resolveStorePath(rootDir, dbPath, dbName string) string { - base := dbPath - if !filepath.IsAbs(dbPath) { - base = filepath.Join(rootDir, dbPath) - } - return filepath.Join(base, dbName) -} From 6e6d2e8ca6182855d0442ac736fb73289e42cd25 Mon Sep 17 00:00:00 2001 From: tac0turtle Date: Thu, 20 Nov 2025 09:23:15 +0100 Subject: [PATCH 23/23] remove changelog entry --- CHANGELOG.md | 1 - 1 file changed, 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index a434c17b75..3f6ac5f26c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,7 +12,6 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added - Enhanced health check system with separate liveness (`/health/live`) and readiness (`/health/ready`) HTTP endpoints. Readiness endpoint includes P2P listening check and aggregator block production rate validation (5x block time threshold). ([#2800](https://github.com/evstack/ev-node/pull/2800)) -- Added `store-info` CLI command to inspect go-header P2P stores and display head/tail information for both header and data stores ([#2835](https://github.com/evstack/ev-node/pull/2835)) - Added `GetP2PStoreInfo` RPC method to retrieve head/tail metadata for go-header stores used by P2P sync ([#2835](https://github.com/evstack/ev-node/pull/2835)) - Added protobuf definitions for `P2PStoreEntry` and `P2PStoreSnapshot` messages to support P2P store inspection