Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 6 additions & 20 deletions app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -292,9 +292,6 @@ type App struct {

pendingTxListeners []evmante.PendingTxListener

// RPC address for late initialization (after ABCI handshake)
rpcAddress string

// keys to access the substores
keys map[string]*storetypes.KVStoreKey
tkeys map[string]*storetypes.TransientStoreKey
Expand Down Expand Up @@ -812,17 +809,6 @@ func New(
logger.Warn("Failed to initialize attestation local finality storage", "error", err)
}

// Store CometBFT RPC address for later initialization (after ABCI handshake)
// The BlockDataCollector needs WebSocket access to subscribe to block events
// This comes from the CometBFT RPC server, not the Cosmos SDK API server
rpcAddress := cast.ToString(appOpts.Get("rpc.laddr"))
if rpcAddress == "" {
// Default to localhost if not configured
rpcAddress = "tcp://localhost:26657"
}
app.rpcAddress = rpcAddress
logger.Debug("Configured block collector to use CometBFT RPC", "address", rpcAddress)

/**** Module Options ****/

// NOTE: we may consider parsing `appOpts` inside module constructors. For the moment
Expand Down Expand Up @@ -1121,12 +1107,6 @@ func New(
app.Logger().Error("failed to update blocklist", "error", err)
}

// Start block data collector after ABCI handshake completes
// The RPC server will be available shortly after this point
if app.AttestationKeeper.BlockCollector != nil && app.rpcAddress != "" {
// Start in a goroutine with retry logic since RPC server may not be immediately available
go app.startBlockDataCollectorWithRetry()
}
}

if blockSTMEnabled {
Expand Down Expand Up @@ -1389,6 +1369,12 @@ func (app *App) RegisterTendermintService(clientCtx client.Context) {
app.interfaceRegistry,
app.Query,
)

// Set the local RPC client for attestation keeper (no HTTP overhead)
if clientCtx.Client != nil {
app.AttestationKeeper.SetRPCClient(clientCtx.Client)
app.Logger().Info("Configured attestation local RPC client")
}
}

func (app *App) RegisterNodeService(clientCtx client.Context, cfg config.Config) {
Expand Down
122 changes: 0 additions & 122 deletions app/attestation.go
Original file line number Diff line number Diff line change
@@ -1,19 +1,14 @@
package app

import (
"context"
"fmt"
"os"
"path/filepath"
"time"

"cosmossdk.io/log"
rpchttp "github.com/cometbft/cometbft/rpc/client/http"
dbm "github.com/cosmos/cosmos-db"
servertypes "github.com/cosmos/cosmos-sdk/server/types"
"github.com/spf13/cast"

attestationcollector "github.com/crypto-org-chain/cronos/x/attestation/collector"
)

// setupAttestationFinalityStorage initializes the local finality storage for the attestation module
Expand Down Expand Up @@ -52,11 +47,6 @@ func setupAttestationFinalityStorage(app *App, homePath string, appOpts serverty
"cache_size", cacheSize,
)

// Initialize Block Data Collector (non-fatal if fails)
if err := setupBlockDataCollector(app, homePath, dbBackend, logger); err != nil {
return fmt.Errorf("failed to setup block data collector: %w", err)
}

return nil
}

Expand Down Expand Up @@ -103,118 +93,6 @@ func cleanupMismatchedDB(dbPath string, backend dbm.BackendType, logger log.Logg
return os.RemoveAll(filepath.Join(dbPath, "finality.db"))
}

// setupBlockDataCollector initializes the block data collector for full block attestation
// This collector runs in the background and subscribes to CometBFT events
func setupBlockDataCollector(app *App, homePath string, dbBackend dbm.BackendType, logger log.Logger) error {
// Create separate database for block data collection
blockDataDBPath := filepath.Join(homePath, "data", "attestation_blocks")

// Ensure the directory exists
if err := os.MkdirAll(blockDataDBPath, 0755); err != nil {
return fmt.Errorf("failed to create block data database directory: %w", err)
}

blockDataDB, err := dbm.NewDB("block_data", dbBackend, blockDataDBPath)
if err != nil {
return fmt.Errorf("failed to create block data database: %w", err)
}

logger.Info("Created attestation block data storage", "path", blockDataDBPath, "backend", dbBackend)

// Create block data collector without RPC client (set later)
collector := attestationcollector.NewBlockDataCollector(
app.appCodec,
blockDataDB,
nil, // RPC client will be set when starting the collector
logger, // Pass logger for proper logging
)

// Set collector in keeper
app.AttestationKeeper.SetBlockCollector(collector)

logger.Info("Initialized attestation block data collector (will start after RPC server is ready)")

return nil
}

// startBlockDataCollectorWithRetry starts the block data collector with retry logic
// This is called in a goroutine after ABCI handshake to wait for RPC server availability
func (app *App) startBlockDataCollectorWithRetry() {
maxRetries := 10
retryDelay := 500 * time.Millisecond

app.Logger().Debug("Starting block data collector with retry",
"rpc_address", app.rpcAddress,
"max_retries", maxRetries)

for attempt := 1; attempt <= maxRetries; attempt++ {
app.Logger().Debug("Attempting to start block data collector",
"attempt", attempt,
"max_retries", maxRetries)

err := app.startBlockDataCollectorOnce()
if err == nil {
app.Logger().Info("Successfully started attestation block data collector",
"rpc_address", app.rpcAddress,
"attempt", attempt)
return
}

app.Logger().Debug("Failed to start block data collector, will retry",
"attempt", attempt,
"error", err,
"retry_delay", retryDelay)

time.Sleep(retryDelay)
retryDelay *= 2 // Exponential backoff
}

app.Logger().Warn("Failed to start block data collector after all retries",
"rpc_address", app.rpcAddress,
"max_retries", maxRetries)
}

// startBlockDataCollectorOnce attempts to start the collector once
func (app *App) startBlockDataCollectorOnce() error {
if app.AttestationKeeper.BlockCollector == nil {
return fmt.Errorf("block collector not initialized")
}

// Type assert to get concrete collector type
collector, ok := app.AttestationKeeper.BlockCollector.(*attestationcollector.BlockDataCollector)
if !ok {
return fmt.Errorf("block collector is not a BlockDataCollector")
}

// Check if already running
if collector.IsRunning() {
return nil // Already started
}

// Create CometBFT HTTP client
rpcClient, err := rpchttp.New(app.rpcAddress, "/websocket")
if err != nil {
return fmt.Errorf("failed to create CometBFT RPC client: %w", err)
}

// Start the RPC client before using it
if err := rpcClient.Start(); err != nil {
return fmt.Errorf("failed to start CometBFT RPC client: %w", err)
}

// Set the client
collector.SetClient(rpcClient)

// Start the collector
ctx := context.Background()
if err := collector.Start(ctx); err != nil {
rpcClient.Stop() // Clean up on failure
return fmt.Errorf("failed to start block collector: %w", err)
}

return nil
}

// checkFinalityDatabaseExists checks if a finality database already exists
// and whether it matches the configured backend type
// Returns: (dbExists bool, backendMatches bool)
Expand Down
Loading
Loading