diff --git a/.vscode/launch.json b/.vscode/launch.json index 0415fe4..cee1dbe 100644 --- a/.vscode/launch.json +++ b/.vscode/launch.json @@ -28,6 +28,8 @@ "PGPORT": "5432", "PGUSER": "postgres", "PGPASSWORD": "postgres", + "STACKS_NODE_RPC_HOST": "127.0.0.1", + "STACKS_NODE_RPC_PORT": "20443", }, "killBehavior": "polite", "preLaunchTask": "npm: testenv:run", diff --git a/migrations/1729684505757_chain_tip_pox_info.ts b/migrations/1729684505757_chain_tip_pox_info.ts new file mode 100644 index 0000000..414c4d5 --- /dev/null +++ b/migrations/1729684505757_chain_tip_pox_info.ts @@ -0,0 +1,17 @@ +/* eslint-disable @typescript-eslint/naming-convention */ +import { MigrationBuilder, ColumnDefinitions } from 'node-pg-migrate'; + +export const shorthands: ColumnDefinitions | undefined = undefined; + +export function up(pgm: MigrationBuilder): void { + pgm.addColumns('chain_tip', { + first_burnchain_block_height: { + type: 'integer', + default: null, + }, + reward_cycle_length: { + type: 'integer', + default: null, + }, + }); +} diff --git a/src/env.ts b/src/env.ts index 82f6501..c448659 100644 --- a/src/env.ts +++ b/src/env.ts @@ -30,6 +30,9 @@ const schema = Type.Object({ /** Port in which to serve the profiler */ PROFILER_PORT: Type.Number({ default: 9119 }), + STACKS_NODE_RPC_HOST: Type.String(), + STACKS_NODE_RPC_PORT: Type.Number({ minimum: 0, maximum: 65535 }), + /** Hostname of the chainhook node we'll use to register predicates */ CHAINHOOK_NODE_RPC_HOST: Type.String({ default: '127.0.0.1' }), /** Control port of the chainhook node */ diff --git a/src/helpers.ts b/src/helpers.ts index 7a18818..46116d0 100644 --- a/src/helpers.ts +++ b/src/helpers.ts @@ -20,3 +20,16 @@ export function unixTimeSecondsToISO(timestampSeconds: number): string { export function normalizeHexString(hexString: string): string { return hexString.startsWith('0x') ? hexString : '0x' + hexString; } + +export function sleep(ms: number, signal?: AbortSignal): Promise { + return new Promise((resolve, reject) => { + if (signal?.aborted) { + return reject(signal.reason); + } + const timeout = setTimeout(() => resolve(), ms); + signal?.addEventListener('abort', () => { + clearTimeout(timeout); + reject(signal.reason); + }); + }); +} diff --git a/src/index.ts b/src/index.ts index 1f2e70e..385c305 100644 --- a/src/index.ts +++ b/src/index.ts @@ -4,6 +4,7 @@ import { ENV } from './env'; import { isProdEnv } from './helpers'; import { buildProfilerServer, logger, registerShutdownConfig } from '@hirosystems/api-toolkit'; import { closeChainhookServer, startChainhookServer } from './chainhook/server'; +import { startPoxInfoUpdater } from './stacks-core-rpc/pox-info-updater'; /** * Initializes background services. Only for `default` and `writeonly` run modes. @@ -12,6 +13,15 @@ import { closeChainhookServer, startChainhookServer } from './chainhook/server'; async function initBackgroundServices(db: PgStore) { logger.info('Initializing background services...'); + const poxInfoUpdater = startPoxInfoUpdater({ db }); + registerShutdownConfig({ + name: 'stacks-core RPC PoX fetcher', + forceKillable: false, + handler: () => { + poxInfoUpdater.close(); + }, + }); + const server = await startChainhookServer({ db }); registerShutdownConfig({ name: 'Chainhook Server', diff --git a/src/pg/pg-store.ts b/src/pg/pg-store.ts index 799865b..25c5355 100644 --- a/src/pg/pg-store.ts +++ b/src/pg/pg-store.ts @@ -46,6 +46,34 @@ export class PgStore extends BasePgStore { return result[0].block_height; } + async getPoxInfo() { + const result = await this.sql< + { first_burnchain_block_height: number | null; reward_cycle_length: number | null }[] + >` + SELECT first_burnchain_block_height, reward_cycle_length FROM chain_tip + `; + return result[0]; + } + + async updatePoxInfo(poxInfo: { + first_burnchain_block_height: number; + reward_cycle_length: number; + }): Promise<{ rowUpdated: boolean }> { + // Update the first_burnchain_block_height and reward_cycle_length columns in the chain_tip table only if + // they differ from the existing values. Return true if the row was updated, false otherwise. + // Should only update the row if the values are null (i.e. the first time the values are set). + const updateResult = await this.sql` + UPDATE chain_tip + SET + first_burnchain_block_height = ${poxInfo.first_burnchain_block_height}, + reward_cycle_length = ${poxInfo.reward_cycle_length} + WHERE + first_burnchain_block_height IS DISTINCT FROM ${poxInfo.first_burnchain_block_height} + OR reward_cycle_length IS DISTINCT FROM ${poxInfo.reward_cycle_length} + `; + return { rowUpdated: updateResult.count > 0 }; + } + async getRecentBlocks(limit: number, offset: number) { // The `blocks` table (and its associated block_signer_signatures table) is the source of truth that is // never missing blocks and does not contain duplicate rows per block. diff --git a/src/stacks-core-rpc/pox-info-updater.ts b/src/stacks-core-rpc/pox-info-updater.ts new file mode 100644 index 0000000..0945430 --- /dev/null +++ b/src/stacks-core-rpc/pox-info-updater.ts @@ -0,0 +1,79 @@ +import { logger } from '@hirosystems/api-toolkit'; +import { PgStore } from '../pg/pg-store'; +import { sleep } from '../helpers'; +import { ENV } from '../env'; + +// How long to wait between PoX rpc fetches when the database already has PoX info +const POX_INFO_UPDATE_INTERVAL_MS = 30000; + +// How long to wait between retries when fetching PoX info fails and the database is missing PoX info +const POX_INFO_UPDATE_CRITICAL_RETRY_INTERVAL_MS = 3000; + +export function startPoxInfoUpdater(args: { db: PgStore }) { + const abortController = new AbortController(); + void runPoxInfoBackgroundJob(args.db, abortController.signal); + return { + close: () => abortController.abort(), + }; +} + +async function runPoxInfoBackgroundJob(db: PgStore, abortSignal: AbortSignal) { + let isDbMissingPoxInfo: boolean | null = null; + while (!abortSignal.aborted) { + try { + // Check if isDbMissingPoxInfo is null, which means we haven't checked the database yet + if (isDbMissingPoxInfo === null) { + const dbPoxInfo = await db.getPoxInfo(); + isDbMissingPoxInfo = dbPoxInfo.reward_cycle_length === null; + } + + if (isDbMissingPoxInfo) { + logger.info( + `Database is missing PoX info, fetching from stacks-core RPC ${getStacksNodeUrl()}` + ); + } + const rpcPoxInfo = await fetchRpcPoxInfo(abortSignal); + if (isDbMissingPoxInfo) { + logger.info( + `Fetched PoX info from stacks-core RPC: first_burnchain_block_height=${rpcPoxInfo.first_burnchain_block_height}, reward_cycle_length=${rpcPoxInfo.reward_cycle_length}, storing in database` + ); + } + await db.updatePoxInfo(rpcPoxInfo); + isDbMissingPoxInfo = false; + await sleep(POX_INFO_UPDATE_INTERVAL_MS, abortSignal); + } catch (error) { + if (abortSignal.aborted) { + return; + } + if (isDbMissingPoxInfo) { + logger.error( + error, + `Failed to fetch PoX info from stacks-core RPC, retrying in ${POX_INFO_UPDATE_CRITICAL_RETRY_INTERVAL_MS}ms ...` + ); + await sleep(POX_INFO_UPDATE_CRITICAL_RETRY_INTERVAL_MS, abortSignal); + } else { + logger.warn( + error, + `Failed to update PoX info (database already has PoX info, this is not critical)` + ); + await sleep(POX_INFO_UPDATE_INTERVAL_MS, abortSignal); + } + } + } +} + +interface PoxInfo { + first_burnchain_block_height: number; + reward_cycle_length: number; +} + +function getStacksNodeUrl(): string { + return `http://${ENV.STACKS_NODE_RPC_HOST}:${ENV.STACKS_NODE_RPC_PORT}`; +} + +async function fetchRpcPoxInfo(abortSignal: AbortSignal) { + const url = `${getStacksNodeUrl()}/v2/pox`; + const res = await fetch(url, { signal: abortSignal }); + const json = await res.json(); + return json as PoxInfo; +}