diff --git a/app/app.go b/app/app.go index 9e7f8bf490..269208513f 100644 --- a/app/app.go +++ b/app/app.go @@ -292,9 +292,6 @@ type App struct { pendingTxListeners []evmante.PendingTxListener - // RPC address for late initialization (after ABCI handshake) - rpcAddress string - // keys to access the substores keys map[string]*storetypes.KVStoreKey tkeys map[string]*storetypes.TransientStoreKey @@ -812,17 +809,6 @@ func New( logger.Warn("Failed to initialize attestation local finality storage", "error", err) } - // Store CometBFT RPC address for later initialization (after ABCI handshake) - // The BlockDataCollector needs WebSocket access to subscribe to block events - // This comes from the CometBFT RPC server, not the Cosmos SDK API server - rpcAddress := cast.ToString(appOpts.Get("rpc.laddr")) - if rpcAddress == "" { - // Default to localhost if not configured - rpcAddress = "tcp://localhost:26657" - } - app.rpcAddress = rpcAddress - logger.Debug("Configured block collector to use CometBFT RPC", "address", rpcAddress) - /**** Module Options ****/ // NOTE: we may consider parsing `appOpts` inside module constructors. 
For the moment @@ -1121,12 +1107,6 @@ func New( app.Logger().Error("failed to update blocklist", "error", err) } - // Start block data collector after ABCI handshake completes - // The RPC server will be available shortly after this point - if app.AttestationKeeper.BlockCollector != nil && app.rpcAddress != "" { - // Start in a goroutine with retry logic since RPC server may not be immediately available - go app.startBlockDataCollectorWithRetry() - } } if blockSTMEnabled { @@ -1389,6 +1369,12 @@ func (app *App) RegisterTendermintService(clientCtx client.Context) { app.interfaceRegistry, app.Query, ) + + // Set the local RPC client for attestation keeper (no HTTP overhead) + if clientCtx.Client != nil { + app.AttestationKeeper.SetRPCClient(clientCtx.Client) + app.Logger().Info("Configured attestation local RPC client") + } } func (app *App) RegisterNodeService(clientCtx client.Context, cfg config.Config) { diff --git a/app/attestation.go b/app/attestation.go index 9aa196cfdc..5b39d4e2f4 100644 --- a/app/attestation.go +++ b/app/attestation.go @@ -1,19 +1,14 @@ package app import ( - "context" "fmt" "os" "path/filepath" - "time" "cosmossdk.io/log" - rpchttp "github.com/cometbft/cometbft/rpc/client/http" dbm "github.com/cosmos/cosmos-db" servertypes "github.com/cosmos/cosmos-sdk/server/types" "github.com/spf13/cast" - - attestationcollector "github.com/crypto-org-chain/cronos/x/attestation/collector" ) // setupAttestationFinalityStorage initializes the local finality storage for the attestation module @@ -52,11 +47,6 @@ func setupAttestationFinalityStorage(app *App, homePath string, appOpts serverty "cache_size", cacheSize, ) - // Initialize Block Data Collector (non-fatal if fails) - if err := setupBlockDataCollector(app, homePath, dbBackend, logger); err != nil { - return fmt.Errorf("failed to setup block data collector: %w", err) - } - return nil } @@ -103,118 +93,6 @@ func cleanupMismatchedDB(dbPath string, backend dbm.BackendType, logger log.Logg return 
os.RemoveAll(filepath.Join(dbPath, "finality.db")) } -// setupBlockDataCollector initializes the block data collector for full block attestation -// This collector runs in the background and subscribes to CometBFT events -func setupBlockDataCollector(app *App, homePath string, dbBackend dbm.BackendType, logger log.Logger) error { - // Create separate database for block data collection - blockDataDBPath := filepath.Join(homePath, "data", "attestation_blocks") - - // Ensure the directory exists - if err := os.MkdirAll(blockDataDBPath, 0755); err != nil { - return fmt.Errorf("failed to create block data database directory: %w", err) - } - - blockDataDB, err := dbm.NewDB("block_data", dbBackend, blockDataDBPath) - if err != nil { - return fmt.Errorf("failed to create block data database: %w", err) - } - - logger.Info("Created attestation block data storage", "path", blockDataDBPath, "backend", dbBackend) - - // Create block data collector without RPC client (set later) - collector := attestationcollector.NewBlockDataCollector( - app.appCodec, - blockDataDB, - nil, // RPC client will be set when starting the collector - logger, // Pass logger for proper logging - ) - - // Set collector in keeper - app.AttestationKeeper.SetBlockCollector(collector) - - logger.Info("Initialized attestation block data collector (will start after RPC server is ready)") - - return nil -} - -// startBlockDataCollectorWithRetry starts the block data collector with retry logic -// This is called in a goroutine after ABCI handshake to wait for RPC server availability -func (app *App) startBlockDataCollectorWithRetry() { - maxRetries := 10 - retryDelay := 500 * time.Millisecond - - app.Logger().Debug("Starting block data collector with retry", - "rpc_address", app.rpcAddress, - "max_retries", maxRetries) - - for attempt := 1; attempt <= maxRetries; attempt++ { - app.Logger().Debug("Attempting to start block data collector", - "attempt", attempt, - "max_retries", maxRetries) - - err := 
app.startBlockDataCollectorOnce() - if err == nil { - app.Logger().Info("Successfully started attestation block data collector", - "rpc_address", app.rpcAddress, - "attempt", attempt) - return - } - - app.Logger().Debug("Failed to start block data collector, will retry", - "attempt", attempt, - "error", err, - "retry_delay", retryDelay) - - time.Sleep(retryDelay) - retryDelay *= 2 // Exponential backoff - } - - app.Logger().Warn("Failed to start block data collector after all retries", - "rpc_address", app.rpcAddress, - "max_retries", maxRetries) -} - -// startBlockDataCollectorOnce attempts to start the collector once -func (app *App) startBlockDataCollectorOnce() error { - if app.AttestationKeeper.BlockCollector == nil { - return fmt.Errorf("block collector not initialized") - } - - // Type assert to get concrete collector type - collector, ok := app.AttestationKeeper.BlockCollector.(*attestationcollector.BlockDataCollector) - if !ok { - return fmt.Errorf("block collector is not a BlockDataCollector") - } - - // Check if already running - if collector.IsRunning() { - return nil // Already started - } - - // Create CometBFT HTTP client - rpcClient, err := rpchttp.New(app.rpcAddress, "/websocket") - if err != nil { - return fmt.Errorf("failed to create CometBFT RPC client: %w", err) - } - - // Start the RPC client before using it - if err := rpcClient.Start(); err != nil { - return fmt.Errorf("failed to start CometBFT RPC client: %w", err) - } - - // Set the client - collector.SetClient(rpcClient) - - // Start the collector - ctx := context.Background() - if err := collector.Start(ctx); err != nil { - rpcClient.Stop() // Clean up on failure - return fmt.Errorf("failed to start block collector: %w", err) - } - - return nil -} - // checkFinalityDatabaseExists checks if a finality database already exists // and whether it matches the configured backend type // Returns: (dbExists bool, backendMatches bool) diff --git a/x/attestation/collector/collector.go 
b/x/attestation/collector/collector.go deleted file mode 100644 index 9706c71588..0000000000 --- a/x/attestation/collector/collector.go +++ /dev/null @@ -1,330 +0,0 @@ -package collector - -import ( - "context" - "fmt" - "sync" - - "cosmossdk.io/log" - dbm "github.com/cosmos/cosmos-db" - "github.com/cosmos/cosmos-sdk/codec" - sdk "github.com/cosmos/cosmos-sdk/types" - - rpcclient "github.com/cometbft/cometbft/rpc/client" - ctypes "github.com/cometbft/cometbft/rpc/core/types" - cmttypes "github.com/cometbft/cometbft/types" - - "github.com/crypto-org-chain/cronos/x/attestation/types" -) - -// BlockDataCollector collects full block data from CometBFT for attestation -// It subscribes to new block events and stores complete block data locally -type BlockDataCollector struct { - cdc codec.BinaryCodec - db dbm.DB - client rpcclient.Client - logger log.Logger - - mu sync.RWMutex - running bool - cancel context.CancelFunc -} - -// Ensure BlockDataCollector implements keeper.BlockDataCollector interface -var _ interface { - GetBlockData(height uint64) (*types.BlockAttestationData, error) - GetBlockDataRange(startHeight, endHeight uint64) ([]*types.BlockAttestationData, error) -} = (*BlockDataCollector)(nil) - -// NewBlockDataCollector creates a new block data collector -func NewBlockDataCollector( - cdc codec.BinaryCodec, - db dbm.DB, - client rpcclient.Client, - logger log.Logger, -) *BlockDataCollector { - return &BlockDataCollector{ - cdc: cdc, - db: db, - client: client, - logger: logger, - running: false, - } -} - -// SetClient sets the CometBFT RPC client (useful for late initialization) -func (c *BlockDataCollector) SetClient(client rpcclient.Client) { - c.mu.Lock() - defer c.mu.Unlock() - c.client = client -} - -// Start begins collecting block data from CometBFT -func (c *BlockDataCollector) Start(ctx context.Context) error { - c.mu.Lock() - if c.running { - c.mu.Unlock() - return fmt.Errorf("collector already running") - } - - if c.client == nil { - c.mu.Unlock() 
- return fmt.Errorf("CometBFT client not set") - } - - // Ensure the RPC client is started - if !c.client.IsRunning() { - if err := c.client.Start(); err != nil { - c.mu.Unlock() - return fmt.Errorf("failed to start CometBFT RPC client: %w", err) - } - } - - // Create cancellable context - collectorCtx, cancel := context.WithCancel(ctx) - c.cancel = cancel - c.running = true - c.mu.Unlock() - - // Subscribe to new block events - const subscriber = "block-data-collector" - const query = "tm.event='NewBlock'" - - eventCh, err := c.client.Subscribe(collectorCtx, subscriber, query, 100) - if err != nil { - c.mu.Lock() - c.running = false - c.cancel = nil - c.mu.Unlock() - return fmt.Errorf("failed to subscribe to blocks: %w", err) - } - - // Start collection goroutine - go c.collectLoop(collectorCtx, eventCh) - - return nil -} - -// Stop stops the block data collector -func (c *BlockDataCollector) Stop() error { - c.mu.Lock() - defer c.mu.Unlock() - - if !c.running { - return fmt.Errorf("collector not running") - } - - if c.cancel != nil { - c.cancel() - } - c.running = false - - return nil -} - -// collectLoop is the main collection loop -func (c *BlockDataCollector) collectLoop(ctx context.Context, eventCh <-chan ctypes.ResultEvent) { - c.logger.Debug("Block data collector loop started") - - for { - select { - case <-ctx.Done(): - c.logger.Debug("Block data collector loop stopped (context cancelled)") - return - - case event := <-eventCh: - // Extract block data from event - eventData, ok := event.Data.(cmttypes.EventDataNewBlock) - if !ok { - c.logger.Debug("Received non-block event, skipping", - "event_type", fmt.Sprintf("%T", event.Data)) - continue - } - - c.logger.Debug("Collecting block data", - "height", eventData.Block.Height, - "hash", fmt.Sprintf("%X", eventData.Block.Hash())) - - if err := c.collectAndStoreBlock(eventData); err != nil { - // Log error but continue collecting - c.logger.Error("failed to collect block", - "height", eventData.Block.Height, - 
"error", err, - ) - } else { - c.logger.Debug("Successfully collected block data", - "height", eventData.Block.Height) - } - } - } -} - -// collectAndStoreBlock collects full block data and stores it locally -func (c *BlockDataCollector) collectAndStoreBlock(eventData cmttypes.EventDataNewBlock) error { - block := eventData.Block - height := block.Height - - attestationData := &types.BlockAttestationData{ - BlockHeight: uint64(height), - AppHash: block.Header.AppHash, - } - - // Log field lengths for debugging - c.logger.Info("collected block attestation data", "height", height) - - // Store in local database - return c.storeBlockData(uint64(height), attestationData) -} - -// storeBlockData stores block attestation data in the local database -func (c *BlockDataCollector) storeBlockData(height uint64, data *types.BlockAttestationData) error { - key := getBlockDataKey(height) - bz := c.cdc.MustMarshal(data) - - return c.db.Set(key, bz) -} - -// GetBlockData retrieves stored block data by height -func (c *BlockDataCollector) GetBlockData(height uint64) (*types.BlockAttestationData, error) { - key := getBlockDataKey(height) - bz, err := c.db.Get(key) - if err != nil { - return nil, fmt.Errorf("failed to get block data for height %d: %w", height, err) - } - - if bz == nil { - return nil, fmt.Errorf("block data not found for height %d", height) - } - - var data types.BlockAttestationData - if err := c.cdc.Unmarshal(bz, &data); err != nil { - return nil, fmt.Errorf("failed to unmarshal block data: %w", err) - } - - return &data, nil -} - -// GetBlockDataRange retrieves a range of block data -// If the collector is not running, it will try to collect the blocks on-demand -func (c *BlockDataCollector) GetBlockDataRange(startHeight, endHeight uint64) ([]*types.BlockAttestationData, error) { - // Auto-start collector if it has a client but isn't running - c.ensureStarted() - - var result []*types.BlockAttestationData - - for h := startHeight; h <= endHeight; h++ { - data, err 
:= c.GetBlockData(h) - if err != nil { - // If collector is running and data is missing, try to fetch on-demand - if c.client != nil { - // Try to fetch this specific block - if fetchedData, fetchErr := c.fetchBlockOnDemand(h); fetchErr == nil { - result = append(result, fetchedData) - continue - } - } - // Skip missing block silently - it's before collector started or not available - continue - } - result = append(result, data) - } - - if len(result) == 0 { - c.logger.Debug("no block data available in requested range", - "start_height", startHeight, - "end_height", endHeight, - "collector_running", c.IsRunning(), - ) - return nil, fmt.Errorf("no block data found in range %d-%d", startHeight, endHeight) - } - - return result, nil -} - -// ensureStarted auto-starts the collector if it has a client but isn't running -func (c *BlockDataCollector) ensureStarted() { - c.mu.Lock() - defer c.mu.Unlock() - - // If already running or no client, nothing to do - if c.running || c.client == nil { - return - } - - // Ensure the RPC client is started - if !c.client.IsRunning() { - if err := c.client.Start(); err != nil { - c.logger.Error("failed to start RPC client for collector", "error", err) - return - } - } - - // Create cancellable context - collectorCtx, cancel := context.WithCancel(context.Background()) - c.cancel = cancel - c.running = true - - // Subscribe to new block events - const subscriber = "block-data-collector" - const query = "tm.event='NewBlock'" - - eventCh, err := c.client.Subscribe(collectorCtx, subscriber, query, 100) - if err != nil { - c.logger.Error("failed to auto-start collector subscription", "error", err) - c.running = false - c.cancel = nil - return - } - - // Start collection goroutine - go c.collectLoop(collectorCtx, eventCh) - - c.logger.Info("Block data collector auto-started") -} - -// fetchBlockOnDemand fetches a specific block's data on-demand via RPC -func (c *BlockDataCollector) fetchBlockOnDemand(height uint64) 
(*types.BlockAttestationData, error) { - c.mu.RLock() - client := c.client - c.mu.RUnlock() - - if client == nil { - return nil, fmt.Errorf("RPC client not available") - } - - // Query the block - h := int64(height) - blockRes, err := client.Block(context.Background(), &h) - if err != nil { - return nil, fmt.Errorf("failed to fetch block: %w", err) - } - - // Create attestation data - block := blockRes.Block - attestationData := &types.BlockAttestationData{ - BlockHeight: uint64(height), - AppHash: block.Header.AppHash, - } - - // Store it for future use - if err := c.storeBlockData(height, attestationData); err != nil { - c.logger.Warn("failed to store on-demand block data", "height", height, "error", err) - // Return data even if storage fails - } - - return attestationData, nil -} - -// getBlockDataKey returns the database key for block data at a given height -func getBlockDataKey(height uint64) []byte { - prefix := []byte("block_data:") - heightBytes := sdk.Uint64ToBigEndian(height) - return append(prefix, heightBytes...) 
-} - -// IsRunning returns whether the collector is currently running -func (c *BlockDataCollector) IsRunning() bool { - c.mu.RLock() - defer c.mu.RUnlock() - return c.running -} diff --git a/x/attestation/keeper/grpc_query.go b/x/attestation/keeper/grpc_query.go index 147423c7f7..f617b40c39 100644 --- a/x/attestation/keeper/grpc_query.go +++ b/x/attestation/keeper/grpc_query.go @@ -9,7 +9,7 @@ import ( var _ types.QueryServer = (*Keeper)(nil) // Params returns the module parameters -func (k Keeper) Params(c context.Context, req *types.QueryParamsRequest) (*types.QueryParamsResponse, error) { +func (k *Keeper) Params(c context.Context, req *types.QueryParamsRequest) (*types.QueryParamsResponse, error) { params, err := k.GetParams(c) if err != nil { return nil, err @@ -19,13 +19,13 @@ func (k Keeper) Params(c context.Context, req *types.QueryParamsRequest) (*types } // GetBlockAttestation returns block attestation data -func (k Keeper) GetBlockAttestation(c context.Context, req *types.QueryGetBlockAttestationRequest) (*types.QueryGetBlockAttestationResponse, error) { +func (k *Keeper) GetBlockAttestation(c context.Context, req *types.QueryGetBlockAttestationRequest) (*types.QueryGetBlockAttestationResponse, error) { // TODO: Implement when attestation storage is added return &types.QueryGetBlockAttestationResponse{}, nil } // GetBlockFinalityStatus returns finality status for a block -func (k Keeper) GetBlockFinalityStatus(c context.Context, req *types.QueryGetBlockFinalityStatusRequest) (*types.QueryGetBlockFinalityStatusResponse, error) { +func (k *Keeper) GetBlockFinalityStatus(c context.Context, req *types.QueryGetBlockFinalityStatusRequest) (*types.QueryGetBlockFinalityStatusResponse, error) { // Query from local storage status, err := k.GetFinalityStatusLocal(c, req.BlockHeight) if err != nil { diff --git a/x/attestation/keeper/ibc_module_v1.go b/x/attestation/keeper/ibc_module_v1.go index c4351d8de3..224f4231bb 100644 --- 
a/x/attestation/keeper/ibc_module_v1.go +++ b/x/attestation/keeper/ibc_module_v1.go @@ -223,9 +223,19 @@ func (im IBCModuleV1) OnAcknowledgementPacket( } // Process finality feedback for each attested block height + var minHeight, maxHeight uint64 for _, result := range ack.Results { // Store finality in LOCAL database only (no consensus storage) height := result.BlockHeight + + // Track min/max height + if minHeight == 0 || height < minHeight { + minHeight = height + } + if height > maxHeight { + maxHeight = height + } + if err := im.keeper.MarkBlockFinalizedLocal(ctx, height, ack.FinalizedAt); err != nil { im.keeper.Logger(ctx).Error("failed to store finality locally", "height", height, @@ -254,7 +264,11 @@ func (im IBCModuleV1) OnAcknowledgementPacket( } } - im.keeper.Logger(ctx).Info("processed finality feedback", "finalized_count", len(ack.Results)) + im.keeper.Logger(ctx).Info("processed finality feedback", + "finalized_count", len(ack.Results), + "min_height", minHeight, + "max_height", maxHeight, + ) return nil } diff --git a/x/attestation/keeper/keeper.go b/x/attestation/keeper/keeper.go index 5a0a3999ae..c3f4007950 100644 --- a/x/attestation/keeper/keeper.go +++ b/x/attestation/keeper/keeper.go @@ -6,6 +6,7 @@ import ( "cosmossdk.io/core/store" "cosmossdk.io/log" + coretypes "github.com/cometbft/cometbft/rpc/core/types" dbm "github.com/cosmos/cosmos-db" "github.com/cosmos/cosmos-sdk/codec" sdk "github.com/cosmos/cosmos-sdk/types" @@ -14,6 +15,12 @@ import ( "github.com/crypto-org-chain/cronos/x/attestation/types" ) +// BlockchainInfoClient is a minimal interface for fetching block metadata +// Compatible with both rpcclient.Client and client.CometRPC +type BlockchainInfoClient interface { + BlockchainInfo(ctx context.Context, minHeight, maxHeight int64) (*coretypes.ResultBlockchainInfo, error) +} + type Keeper struct { cdc codec.BinaryCodec storeService store.KVStoreService @@ -38,17 +45,8 @@ type Keeper struct { finalityDB dbm.DB // Local database 
(persistent, no consensus) finalityCache *FinalityCache // Memory cache (fast, no consensus) - // BlockCollector for full block attestation data (exported for module access) - BlockCollector BlockDataCollector -} - -// BlockDataCollector is an interface for collecting full block data -// This interface allows for different implementations (async collector, direct RPC, etc.) -type BlockDataCollector interface { - GetBlockData(height uint64) (*types.BlockAttestationData, error) - GetBlockDataRange(startHeight, endHeight uint64) ([]*types.BlockAttestationData, error) - Start(ctx context.Context) error - Stop() error + // RPC client for fetching block data (in-process local client) + rpcClient BlockchainInfoClient } // NewKeeper creates a new attestation Keeper instance @@ -73,12 +71,12 @@ func NewKeeper( } // GetAuthority returns the authority address for the attestation module -func (k Keeper) GetAuthority() string { +func (k *Keeper) GetAuthority() string { return k.authority } // GetIBCVersion returns the configured IBC version ("v1" or "v2") -func (k Keeper) GetIBCVersion() string { +func (k *Keeper) GetIBCVersion() string { return k.ibcVersion } @@ -102,9 +100,67 @@ func (k *Keeper) InitializeLocalStorage(dbPath string, cacheSize int, backend db return nil } -// SetBlockCollector sets the block data collector for retrieving full block data -func (k *Keeper) SetBlockCollector(collector BlockDataCollector) { - k.BlockCollector = collector +// SetRPCClient sets the CometBFT RPC client for fetching block data +func (k *Keeper) SetRPCClient(client BlockchainInfoClient) { + k.rpcClient = client +} + +// maxBlockchainInfoBlocks is the maximum number of blocks returned by BlockchainInfo RPC +const maxBlockchainInfoBlocks = 20 + +// GetBlockDataRange fetches block attestation data for a range of heights via RPC. +// BlockchainInfo returns at most 20 blocks in descending order, so this function +// handles pagination and returns results in ascending order. 
+func (k *Keeper) GetBlockDataRange(ctx context.Context, startHeight, endHeight uint64) ([]types.BlockAttestationData, error) { + if k.rpcClient == nil { + return nil, fmt.Errorf("RPC client not configured") + } + + if startHeight > endHeight { + return nil, fmt.Errorf("invalid range: startHeight %d > endHeight %d", startHeight, endHeight) + } + + totalBlocks := endHeight - startHeight + 1 + result := make([]types.BlockAttestationData, 0, totalBlocks) + + // Fetch blocks in chunks of maxBlockchainInfoBlocks, starting from startHeight + for chunkStart := startHeight; chunkStart <= endHeight; chunkStart += maxBlockchainInfoBlocks { + chunkEnd := chunkStart + maxBlockchainInfoBlocks - 1 + if chunkEnd > endHeight { + chunkEnd = endHeight + } + + blockchainInfo, err := k.rpcClient.BlockchainInfo(ctx, int64(chunkStart), int64(chunkEnd)) + if err != nil { + return nil, fmt.Errorf("failed to fetch blockchain info for range %d-%d: %w", chunkStart, chunkEnd, err) + } + + if len(blockchainInfo.BlockMetas) == 0 { + return nil, fmt.Errorf("no block data found in range %d-%d", chunkStart, chunkEnd) + } + + // BlockchainInfo returns blocks in descending order, so we iterate in reverse + // to append in ascending order + for i := len(blockchainInfo.BlockMetas) - 1; i >= 0; i-- { + meta := blockchainInfo.BlockMetas[i] + result = append(result, types.BlockAttestationData{ + BlockHeight: uint64(meta.Header.Height), + AppHash: meta.Header.AppHash, + }) + } + } + + // make sure the result length is equal to the total blocks + if len(result) != int(totalBlocks) { + return nil, fmt.Errorf("expected %d blocks, got %d", totalBlocks, len(result)) + } + + return result, nil +} + +// HasRPCClient returns true if an RPC client is configured +func (k *Keeper) HasRPCClient() bool { + return k.rpcClient != nil } // SetChannelKeeper sets the IBC v1 channel keeper for sending packets @@ -120,18 +176,18 @@ func (k *Keeper) SetChannelKeeperV2(channelKeeperV2 *channelkeeperv2.Keeper) { } // Logger 
returns a module-specific logger -func (k Keeper) Logger(ctx context.Context) log.Logger { +func (k *Keeper) Logger(ctx context.Context) log.Logger { sdkCtx := sdk.UnwrapSDKContext(ctx) return sdkCtx.Logger().With("module", "x/"+types.ModuleName) } // ChainID returns the chain ID -func (k Keeper) ChainID() string { +func (k *Keeper) ChainID() string { return k.chainID } // GetParams returns the module parameters -func (k Keeper) GetParams(ctx context.Context) (types.Params, error) { +func (k *Keeper) GetParams(ctx context.Context) (types.Params, error) { store := k.storeService.OpenKVStore(ctx) bz, err := store.Get(types.ParamsKey) if err != nil { @@ -149,7 +205,7 @@ func (k Keeper) GetParams(ctx context.Context) (types.Params, error) { } // SetParams sets the module parameters -func (k Keeper) SetParams(ctx context.Context, params types.Params) error { +func (k *Keeper) SetParams(ctx context.Context, params types.Params) error { if err := params.Validate(); err != nil { return err } @@ -160,7 +216,7 @@ func (k Keeper) SetParams(ctx context.Context, params types.Params) error { } // GetLastSentHeight retrieves the last block height sent for attestation -func (k Keeper) GetLastSentHeight(ctx context.Context) (uint64, error) { +func (k *Keeper) GetLastSentHeight(ctx context.Context) (uint64, error) { store := k.storeService.OpenKVStore(ctx) bz, err := store.Get(types.LastSentHeightKey) if err != nil { @@ -173,14 +229,14 @@ func (k Keeper) GetLastSentHeight(ctx context.Context) (uint64, error) { } // SetLastSentHeight stores the last block height sent for attestation -func (k Keeper) SetLastSentHeight(ctx context.Context, height uint64) error { +func (k *Keeper) SetLastSentHeight(ctx context.Context, height uint64) error { store := k.storeService.OpenKVStore(ctx) return store.Set(types.LastSentHeightKey, types.UintToBytes(height)) } // AddPendingAttestation adds a block attestation to the pending queue (local storage) // Pending attestations are tracked locally by 
each validator, not in consensus state -func (k Keeper) AddPendingAttestation(ctx context.Context, height uint64, attestation *types.BlockAttestationData) error { +func (k *Keeper) AddPendingAttestation(ctx context.Context, height uint64, attestation *types.BlockAttestationData) error { if k.finalityDB == nil { return fmt.Errorf("local finality database not initialized") } @@ -199,7 +255,7 @@ func (k Keeper) AddPendingAttestation(ctx context.Context, height uint64, attest } // GetPendingAttestation retrieves a pending attestation by height (from local storage) -func (k Keeper) GetPendingAttestation(ctx context.Context, height uint64) (*types.BlockAttestationData, error) { +func (k *Keeper) GetPendingAttestation(ctx context.Context, height uint64) (*types.BlockAttestationData, error) { if k.finalityDB == nil { return nil, fmt.Errorf("local finality database not initialized") } @@ -219,7 +275,7 @@ func (k Keeper) GetPendingAttestation(ctx context.Context, height uint64) (*type } // GetPendingAttestations retrieves all pending attestations in a height range (from local storage) -func (k Keeper) GetPendingAttestations(ctx context.Context, startHeight, endHeight uint64) ([]*types.BlockAttestationData, error) { +func (k *Keeper) GetPendingAttestations(ctx context.Context, startHeight, endHeight uint64) ([]*types.BlockAttestationData, error) { var attestations []*types.BlockAttestationData for height := startHeight; height <= endHeight; height++ { @@ -254,7 +310,7 @@ func (k *Keeper) RemovePendingAttestation(ctx context.Context, height uint64) er } // GetHighestFinalityHeight retrieves the highest finalized block height from consensus state -func (k Keeper) GetHighestFinalityHeight(ctx context.Context) (uint64, error) { +func (k *Keeper) GetHighestFinalityHeight(ctx context.Context) (uint64, error) { store := k.storeService.OpenKVStore(ctx) bz, err := store.Get(types.HighestFinalityHeightKey) if err != nil { @@ -267,7 +323,7 @@ func (k Keeper) GetHighestFinalityHeight(ctx 
context.Context) (uint64, error) { } // SetHighestFinalityHeight stores the highest finalized block height in consensus state -func (k Keeper) SetHighestFinalityHeight(ctx context.Context, height uint64) error { +func (k *Keeper) SetHighestFinalityHeight(ctx context.Context, height uint64) error { store := k.storeService.OpenKVStore(ctx) return store.Set(types.HighestFinalityHeightKey, types.UintToBytes(height)) } diff --git a/x/attestation/keeper/keeper_local_storage.go b/x/attestation/keeper/keeper_local_storage.go index 60912e704f..fe5c4cdabe 100644 --- a/x/attestation/keeper/keeper_local_storage.go +++ b/x/attestation/keeper/keeper_local_storage.go @@ -6,7 +6,6 @@ import ( "sync" dbm "github.com/cosmos/cosmos-db" - sdk "github.com/cosmos/cosmos-sdk/types" "github.com/crypto-org-chain/cronos/x/attestation/types" ) @@ -75,10 +74,10 @@ func (k *Keeper) SetFinalityCache(cache *FinalityCache) { // This method writes to a local database that does NOT participate in consensus. // Each validator node maintains its own copy. // Also updates the highest finality height in consensus state if this block is higher. -func (k Keeper) MarkBlockFinalizedLocal(ctx context.Context, height uint64, finalizedAt int64) error { +func (k *Keeper) MarkBlockFinalizedLocal(ctx context.Context, height uint64, finalizedAt int64) error { status := &types.FinalityStatus{ - BlockHeight: height, - FinalizedAt: finalizedAt, + BlockHeight: height, + FinalizedAt: finalizedAt, } // 1. Store in memory cache (if available) @@ -107,17 +106,7 @@ func (k Keeper) MarkBlockFinalizedLocal(ctx context.Context, height uint64, fina } } - // 4. 
Emit event (this IS part of consensus, but minimal overhead) - sdkCtx := sdk.UnwrapSDKContext(ctx) - sdkCtx.EventManager().EmitEvent( - sdk.NewEvent( - "block_finalized_local", - sdk.NewAttribute("block_height", fmt.Sprintf("%d", height)), - sdk.NewAttribute("finalized_at", fmt.Sprintf("%d", finalizedAt)), - ), - ) - - k.Logger(ctx).Debug("Stored finality locally (no consensus)", + k.Logger(ctx).Debug("Stored finality locally", "height", height, "finalized_at", finalizedAt, ) @@ -127,7 +116,7 @@ func (k Keeper) MarkBlockFinalizedLocal(ctx context.Context, height uint64, fina // GetFinalityStatusLocal retrieves finality from LOCAL storage (no consensus) // Uses tiered storage: memory cache -> local DB -> not found -func (k Keeper) GetFinalityStatusLocal(ctx context.Context, height uint64) (*types.FinalityStatus, error) { +func (k *Keeper) GetFinalityStatusLocal(ctx context.Context, height uint64) (*types.FinalityStatus, error) { // 1. Check memory cache first (fastest) if k.finalityCache != nil { if status, ok := k.finalityCache.Get(height); ok { @@ -170,7 +159,7 @@ func (k Keeper) GetFinalityStatusLocal(ctx context.Context, height uint64) (*typ } // ListFinalizedBlocksLocal lists finalized blocks from LOCAL database -func (k Keeper) ListFinalizedBlocksLocal(ctx context.Context, startHeight, endHeight uint64) ([]*types.FinalityStatus, error) { +func (k *Keeper) ListFinalizedBlocksLocal(ctx context.Context, startHeight, endHeight uint64) ([]*types.FinalityStatus, error) { if k.finalityDB == nil { return nil, fmt.Errorf("local finality DB not initialized") } @@ -202,7 +191,7 @@ func (k Keeper) ListFinalizedBlocksLocal(ctx context.Context, startHeight, endHe } // GetLatestFinalizedLocal returns the latest finalized block height from LOCAL storage -func (k Keeper) GetLatestFinalizedLocal(ctx context.Context) (uint64, error) { +func (k *Keeper) GetLatestFinalizedLocal(ctx context.Context) (uint64, error) { if k.finalityDB == nil { return 0, fmt.Errorf("local finality DB 
not initialized") } @@ -226,7 +215,7 @@ func (k Keeper) GetLatestFinalizedLocal(ctx context.Context) (uint64, error) { // PruneFinalizedBlocksLocal prunes old finalized blocks from LOCAL storage // This is safe because local storage doesn't affect consensus -func (k Keeper) PruneFinalizedBlocksLocal(ctx context.Context, beforeHeight uint64) (int, error) { +func (k *Keeper) PruneFinalizedBlocksLocal(ctx context.Context, beforeHeight uint64) (int, error) { if k.finalityDB == nil { return 0, fmt.Errorf("local finality DB not initialized") } @@ -257,7 +246,7 @@ func (k Keeper) PruneFinalizedBlocksLocal(ctx context.Context, beforeHeight uint } // GetFinalityStatsLocal returns statistics about local finality storage -func (k Keeper) GetFinalityStatsLocal(ctx context.Context) (*types.FinalityStats, error) { +func (k *Keeper) GetFinalityStatsLocal(ctx context.Context) (*types.FinalityStats, error) { if k.finalityDB == nil { return nil, fmt.Errorf("local finality DB not initialized") } @@ -298,7 +287,7 @@ func (k Keeper) GetFinalityStatsLocal(ctx context.Context) (*types.FinalityStats } // CloseFinalityDB closes the local finality database -func (k Keeper) CloseFinalityDB() error { +func (k *Keeper) CloseFinalityDB() error { if k.finalityDB != nil { return k.finalityDB.Close() } diff --git a/x/attestation/keeper/v1_sender.go b/x/attestation/keeper/v1_sender.go index 9bd03bdb17..75b41095e4 100644 --- a/x/attestation/keeper/v1_sender.go +++ b/x/attestation/keeper/v1_sender.go @@ -12,36 +12,22 @@ import ( // SendAttestationPacketV1 sends block attestations via IBC v1 to the attestation chain // This uses the traditional port/channel communication -func (k Keeper) SendAttestationPacketV1( +func (k *Keeper) SendAttestationPacketV1( ctx context.Context, + params types.Params, sourcePort string, sourceChannel string, - attestations []*types.BlockAttestationData, + attestations []types.BlockAttestationData, ) (uint64, error) { - params, err := k.GetParams(ctx) - if err != nil { - 
return 0, err - } - - if !params.AttestationEnabled { - return 0, types.ErrAttestationDisabled - } - // Check if channel keeper v1 is set if k.channelKeeper == nil { return 0, fmt.Errorf("IBC v1 channel keeper not initialized") } - // Convert pointer slice to value slice for proto - attestationValues := make([]types.BlockAttestationData, len(attestations)) - for i, att := range attestations { - attestationValues[i] = *att - } - // Create packet data packetData := &types.AttestationPacketData{ SourceChainId: k.chainID, - Attestations: attestationValues, + Attestations: attestations, } // Marshal packet data to JSON @@ -117,7 +103,7 @@ func (k Keeper) SendAttestationPacketV1( } // GetV1ChannelID returns the configured IBC v1 channel ID for attestation -func (k Keeper) GetV1ChannelID(ctx context.Context, key string) (string, error) { +func (k *Keeper) GetV1ChannelID(ctx context.Context, key string) (string, error) { store := k.storeService.OpenKVStore(ctx) bz, err := store.Get(append(types.V1ChannelIDPrefix, []byte(key)...)) if err != nil { @@ -130,13 +116,13 @@ func (k Keeper) GetV1ChannelID(ctx context.Context, key string) (string, error) } // SetV1ChannelID stores the IBC v1 channel ID for attestation -func (k Keeper) SetV1ChannelID(ctx context.Context, key string, channelID string) error { +func (k *Keeper) SetV1ChannelID(ctx context.Context, key string, channelID string) error { store := k.storeService.OpenKVStore(ctx) return store.Set(append(types.V1ChannelIDPrefix, []byte(key)...), []byte(channelID)) } // GetV1PortID returns the configured IBC v1 port ID for attestation -func (k Keeper) GetV1PortID(ctx context.Context, key string) (string, error) { +func (k *Keeper) GetV1PortID(ctx context.Context, key string) (string, error) { store := k.storeService.OpenKVStore(ctx) bz, err := store.Get(append(types.V1PortIDPrefix, []byte(key)...)) if err != nil { @@ -150,7 +136,7 @@ func (k Keeper) GetV1PortID(ctx context.Context, key string) (string, error) { } // 
SetV1PortID stores the IBC v1 port ID for attestation -func (k Keeper) SetV1PortID(ctx context.Context, key string, portID string) error { +func (k *Keeper) SetV1PortID(ctx context.Context, key string, portID string) error { store := k.storeService.OpenKVStore(ctx) return store.Set(append(types.V1PortIDPrefix, []byte(key)...), []byte(portID)) } diff --git a/x/attestation/keeper/v2_sender.go b/x/attestation/keeper/v2_sender.go index c8cd393bf1..4de1e60c33 100644 --- a/x/attestation/keeper/v2_sender.go +++ b/x/attestation/keeper/v2_sender.go @@ -12,11 +12,11 @@ import ( // SendAttestationPacketV2 sends block attestations via IBC v2 to the attestation chain // This uses the simplified client-to-client communication without port/channel -func (k Keeper) SendAttestationPacketV2( +func (k *Keeper) SendAttestationPacketV2( ctx context.Context, sourceClient string, destinationClient string, - attestations []*types.BlockAttestationData, + attestations []types.BlockAttestationData, ) (uint64, error) { params, err := k.GetParams(ctx) if err != nil { @@ -32,17 +32,11 @@ func (k Keeper) SendAttestationPacketV2( return 0, fmt.Errorf("IBC v2 channel keeper not initialized") } - // Convert pointer slice to value slice for proto - attestationValues := make([]types.BlockAttestationData, len(attestations)) - for i, att := range attestations { - attestationValues[i] = *att - } - // Create packet data // Note: IBC v2 handles relayer, signature, and nonce at the transport layer packetData := &types.AttestationPacketData{ SourceChainId: k.chainID, - Attestations: attestationValues, + Attestations: attestations, } // Marshal packet data to JSON for v2 payload @@ -126,7 +120,7 @@ func (k Keeper) SendAttestationPacketV2( } // GetV2ClientID returns the configured IBC v2 client ID for attestation -func (k Keeper) GetV2ClientID(ctx context.Context, key string) (string, error) { +func (k *Keeper) GetV2ClientID(ctx context.Context, key string) (string, error) { store := 
k.storeService.OpenKVStore(ctx) bz, err := store.Get(append(types.V2ClientIDPrefix, []byte(key)...)) if err != nil { @@ -139,7 +133,7 @@ func (k Keeper) GetV2ClientID(ctx context.Context, key string) (string, error) { } // SetV2ClientID stores the IBC v2 client ID for attestation -func (k Keeper) SetV2ClientID(ctx context.Context, key string, clientID string) error { +func (k *Keeper) SetV2ClientID(ctx context.Context, key string, clientID string) error { store := k.storeService.OpenKVStore(ctx) return store.Set(append(types.V2ClientIDPrefix, []byte(key)...), []byte(clientID)) } diff --git a/x/attestation/module.go b/x/attestation/module.go index 3bd2bb318f..a95026c928 100644 --- a/x/attestation/module.go +++ b/x/attestation/module.go @@ -248,163 +248,154 @@ func (am AppModule) endBlocker(ctx context.Context) error { return err } - // Send attestation if it's time - if currentHeight > lastSentHeight && (currentHeight-lastSentHeight >= params.AttestationInterval) { - am.keeper.Logger(ctx).Info("sending attestation", "current_height", currentHeight, "last_sent_height", lastSentHeight) - // Check if collector is available - if am.keeper.BlockCollector == nil { - am.keeper.Logger(ctx).Error("Block collector not initialized, skipping attestation") - return nil - } - - var v1PortID string - var v1ChannelID string - var v2ClientID string - - if am.keeper.GetIBCVersion() == "v1" { - v1PortID, err = am.keeper.GetV1PortID(ctx, "attestation-layer") - if err != nil { - am.keeper.Logger(ctx).Info("v1 port ID not configured yet, skipping attestation send", - "key", "attestation-layer", - "error", err, - ) - return nil - } - - v1ChannelID, err = am.keeper.GetV1ChannelID(ctx, "attestation-layer") - if err != nil { - am.keeper.Logger(ctx).Info("v1 channel ID not configured yet, skipping attestation send", - "key", "attestation-layer", - "error", err, - ) - return nil - } - - am.keeper.Logger(ctx).Info("Retrieved v1 channel configuration", - "port_id", v1PortID, - "channel_id", 
v1ChannelID, - ) + // Calculate the block range to attest + // Start from the block after last sent, send up to AttestationInterval blocks + startHeight := lastSentHeight + 1 + endHeight := startHeight + params.AttestationInterval - 1 - } else { - v2ClientID, err = am.keeper.GetV2ClientID(ctx, "attestation-layer") - am.keeper.Logger(ctx).Info("v2 client ID", "v2_client_id", v2ClientID) - if err != nil { - am.keeper.Logger(ctx).Debug("v2 client ID not configured yet, skipping attestation send", - "error", err, - ) - return nil - } - } + // Only send when we have all blocks in the interval finalized (endHeight < currentHeight) + if endHeight >= currentHeight { + return nil + } - // Collect attestations for blocks since last sent - startHeight := lastSentHeight + 1 - endHeight := currentHeight - 1 + am.keeper.Logger(ctx).Info("sending attestation", + "current_height", currentHeight, + "start_height", startHeight, + "end_height", endHeight, + ) - // Limit by interval - if endHeight-startHeight > params.AttestationInterval { - endHeight = startHeight + params.AttestationInterval - 1 - } + // Check if RPC client is available + if !am.keeper.HasRPCClient() { + am.keeper.Logger(ctx).Error("RPC client not initialized, skipping attestation") + return nil + } - am.keeper.Logger(ctx).Info("collecting block attestations", "start_height", startHeight, "end_height", endHeight) + var v1PortID string + var v1ChannelID string + var v2ClientID string - attestations, err := am.collectBlockAttestations(ctx, startHeight, endHeight) - if err != nil || len(attestations) == 0 { - am.keeper.Logger(ctx).Debug("Block data not available yet, skipping attestation", - "start_height", startHeight, - "end_height", endHeight, + if am.keeper.GetIBCVersion() == "v1" { + v1PortID, err = am.keeper.GetV1PortID(ctx, "attestation-layer") + if err != nil { + am.keeper.Logger(ctx).Info("v1 port ID not configured yet, skipping attestation send", + "key", "attestation-layer", "error", err, ) - // Update last 
sent height to current so we don't keep trying to collect old blocks - if err := am.keeper.SetLastSentHeight(ctx, currentHeight); err != nil { - am.keeper.Logger(ctx).Error("failed to update last sent height", "error", err) - } - return nil // Don't fail the block - collector might still be starting + return nil } - // Dispatch to appropriate IBC version based on keeper configuration - var sendError error - if am.keeper.GetIBCVersion() == "v1" { - am.keeper.Logger(ctx).Info("Sending using v1 channel configuration", - "chain_id", am.keeper.ChainID(), - "port_id", v1PortID, - "channel_id", v1ChannelID, - "start_height", startHeight, - "end_height", endHeight, - "attestations", len(attestations), - ) - - _, sendError = am.keeper.SendAttestationPacketV1( - ctx, - v1PortID, - v1ChannelID, - attestations, - ) - } else { - am.keeper.Logger(ctx).Info("Sending using v2 channel configuration", - "chain_id", am.keeper.ChainID(), - "client_id", v2ClientID, - "start_height", startHeight, - "end_height", endHeight, - "attestations", len(attestations), - ) - _, sendError = am.keeper.SendAttestationPacketV2( - ctx, - v2ClientID, // source client ID - v2ClientID, // destination client ID is the same as source client ID for testing - attestations, + v1ChannelID, err = am.keeper.GetV1ChannelID(ctx, "attestation-layer") + if err != nil { + am.keeper.Logger(ctx).Info("v1 channel ID not configured yet, skipping attestation send", + "key", "attestation-layer", + "error", err, ) + return nil } - if sendError != nil { - am.keeper.Logger(ctx).Error("failed to send attestation packet", - "start_height", startHeight, - "end_height", endHeight, - "ibc_version", am.keeper.GetIBCVersion(), + am.keeper.Logger(ctx).Info("Retrieved v1 channel configuration", + "port_id", v1PortID, + "channel_id", v1ChannelID, + ) + + } else { + v2ClientID, err = am.keeper.GetV2ClientID(ctx, "attestation-layer") + am.keeper.Logger(ctx).Info("v2 client ID", "v2_client_id", v2ClientID) + if err != nil { + 
am.keeper.Logger(ctx).Debug("v2 client ID not configured yet, skipping attestation send", "error", err, ) - return nil // Don't fail the block + return nil } + } - // Update last sent height - if err := am.keeper.SetLastSentHeight(ctx, endHeight); err != nil { - return err - } + am.keeper.Logger(ctx).Info("collecting block attestations", "start_height", startHeight, "end_height", endHeight) + + attestations, err := am.collectBlockAttestations(ctx, startHeight, endHeight) + if err != nil || len(attestations) == 0 { + am.keeper.Logger(ctx).Warn("Block data not available yet, skipping attestation", + "start_height", startHeight, + "end_height", endHeight, + "error", err, + ) + return nil // Don't fail the block - collector might still be starting + } + + // Dispatch to appropriate IBC version based on keeper configuration + var sendError error + if am.keeper.GetIBCVersion() == "v1" { + am.keeper.Logger(ctx).Info("Sending using v1 channel configuration", + "chain_id", am.keeper.ChainID(), + "port_id", v1PortID, + "channel_id", v1ChannelID, + "start_height", startHeight, + "end_height", endHeight, + "attestations", len(attestations), + ) - am.keeper.Logger(ctx).Info("last sent height updated", "last_sent_height", endHeight) + _, sendError = am.keeper.SendAttestationPacketV1( + ctx, + params, + v1PortID, + v1ChannelID, + attestations, + ) + } else { + am.keeper.Logger(ctx).Info("Sending using v2 channel configuration", + "chain_id", am.keeper.ChainID(), + "client_id", v2ClientID, + "start_height", startHeight, + "end_height", endHeight, + "attestations", len(attestations), + ) + _, sendError = am.keeper.SendAttestationPacketV2( + ctx, + v2ClientID, // source client ID + v2ClientID, // destination client ID is the same as source client ID for testing + attestations, + ) + } - // Emit event - sdkCtx.EventManager().EmitEvent( - sdk.NewEvent( - "attestation_sent", - sdk.NewAttribute("start_height", fmt.Sprintf("%d", startHeight)), - sdk.NewAttribute("end_height", 
fmt.Sprintf("%d", endHeight)), - sdk.NewAttribute("count", fmt.Sprintf("%d", len(attestations))), - ), + if sendError != nil { + am.keeper.Logger(ctx).Error("failed to send attestation packet", + "start_height", startHeight, + "end_height", endHeight, + "ibc_version", am.keeper.GetIBCVersion(), + "error", sendError, ) + return nil // Don't fail the block } + + // Update last sent height + if err := am.keeper.SetLastSentHeight(ctx, endHeight); err != nil { + return err + } + + am.keeper.Logger(ctx).Info("last sent height updated", "last_sent_height", endHeight) + + // Emit event + sdkCtx.EventManager().EmitEvent( + sdk.NewEvent( + "attestation_sent", + sdk.NewAttribute("start_height", fmt.Sprintf("%d", startHeight)), + sdk.NewAttribute("end_height", fmt.Sprintf("%d", endHeight)), + ), + ) + return nil } // collectBlockAttestations collects block attestation data for the specified height range -// This retrieves pre-collected block data from the BlockDataCollector -func (am AppModule) collectBlockAttestations(ctx context.Context, startHeight, endHeight uint64) ([]*types.BlockAttestationData, error) { - // Use the block collector to get full block data - // The collector subscribes to block events and stores complete block data - // including headers, transactions, results, evidence, etc. 
- - if am.keeper.BlockCollector == nil { - return nil, fmt.Errorf("block data collector not initialized") - } - - attestations, err := am.keeper.BlockCollector.GetBlockDataRange(startHeight, endHeight) +// This fetches block headers via RPC to get height and apphash +func (am AppModule) collectBlockAttestations(ctx context.Context, startHeight, endHeight uint64) ([]types.BlockAttestationData, error) { + attestations, err := am.keeper.GetBlockDataRange(ctx, startHeight, endHeight) if err != nil { - return nil, fmt.Errorf("failed to collect block data range %d-%d: %w", startHeight, endHeight, err) + return nil, fmt.Errorf("failed to fetch block data range %d-%d: %w", startHeight, endHeight, err) } - am.keeper.Logger(ctx).Debug("collected block attestations from collector", + am.keeper.Logger(ctx).Info("collected block attestations via RPC", "start_height", startHeight, "end_height", endHeight, - "count", len(attestations), ) return attestations, nil @@ -417,5 +408,3 @@ func (am AppModule) EndBlock(ctx context.Context) ([]abci.ValidatorUpdate, error } return []abci.ValidatorUpdate{}, nil } - -// No AutoCLIOptions for now