diff --git a/packages/daemon/__tests__/actors/MonitoringActor.test.ts b/packages/daemon/__tests__/actors/MonitoringActor.test.ts index 07ce898d..71a35a2d 100644 --- a/packages/daemon/__tests__/actors/MonitoringActor.test.ts +++ b/packages/daemon/__tests__/actors/MonitoringActor.test.ts @@ -10,6 +10,7 @@ import logger from '../../src/logger'; import { EventTypes } from '../../src/types/event'; import getConfig from '../../src/config'; import { addAlert, Severity } from '@wallet-service/common'; +import * as db from '../../src/db'; const MONITORING_IDLE_TIMEOUT_EVENT = { type: EventTypes.MONITORING_IDLE_TIMEOUT }; @@ -24,6 +25,10 @@ jest.mock('@wallet-service/common', () => ({ addAlert: jest.fn().mockResolvedValue(undefined), })); +jest.mock('../../src/db', () => ({ + getDbConnection: jest.fn(), +})); + const mockAddAlert = addAlert as jest.Mock; describe('MonitoringActor', () => { @@ -49,6 +54,10 @@ describe('MonitoringActor', () => { config['STUCK_PROCESSING_TIMEOUT_MS'] = 5 * 60 * 1000; // 5 min config['RECONNECTION_STORM_THRESHOLD'] = 3; // low threshold for tests config['RECONNECTION_STORM_WINDOW_MS'] = 5 * 60 * 1000; // 5 min + config['BALANCE_VALIDATION_ENABLED'] = false; + config['BALANCE_VALIDATION_INTERVAL_MS'] = 5000; + config['BALANCE_VALIDATION_WINDOW_MS'] = 900000; + config['BALANCE_VALIDATION_SAMPLE_LIMIT'] = 100; mockCallback = jest.fn(); mockReceive = jest.fn().mockImplementation((cb: any) => { @@ -330,4 +339,190 @@ describe('MonitoringActor', () => { ); expect(setInterval).not.toHaveBeenCalled(); }); + + // ── Balance validation ──────────────────────────────────────────────────── + + const flushPromises = () => new Promise(jest.requireActual('timers').setImmediate); + + describe('balance validation', () => { + let mockMysql: any; + + beforeEach(() => { + mockMysql = { + release: jest.fn(), + query: jest.fn().mockResolvedValue([[], []]), + }; + (db.getDbConnection as jest.Mock).mockResolvedValue(mockMysql); + }); + + it('should not start balance validation when disabled', () => { + config['BALANCE_VALIDATION_ENABLED'] = false; + MonitoringActor(mockCallback, mockReceive, config); + sendEvent('CONNECTED'); + + // Only the idle-check interval should fire; no validation interval. + expect(setInterval).toHaveBeenCalledTimes(1); + }); + + it('should start the validation interval on CONNECTED when enabled', () => { + config['BALANCE_VALIDATION_ENABLED'] = true; + MonitoringActor(mockCallback, mockReceive, config); + sendEvent('CONNECTED'); + + // Idle check + balance validation = 2 intervals. + expect(setInterval).toHaveBeenCalledTimes(2); + expect(setInterval).toHaveBeenCalledWith( + expect.any(Function), + config['BALANCE_VALIDATION_INTERVAL_MS'], + ); + }); + + it('should clear the validation interval on DISCONNECTED', () => { + config['BALANCE_VALIDATION_ENABLED'] = true; + MonitoringActor(mockCallback, mockReceive, config); + sendEvent('CONNECTED'); + sendEvent('DISCONNECTED'); + + // Idle check + balance validation = 2 cleared intervals. + expect(clearInterval).toHaveBeenCalledTimes(2); + }); + + it('should alert when the validation query returns mismatch rows', async () => { + config['BALANCE_VALIDATION_ENABLED'] = true; + + const mismatchRow = { + address: 'addr1', + tokenId: 'token1', + balanceSum: '100', + historySum: '200', + }; + mockMysql.query.mockResolvedValueOnce([[mismatchRow], []]); + + MonitoringActor(mockCallback, mockReceive, config); + sendEvent('CONNECTED'); + + jest.advanceTimersByTime(config['BALANCE_VALIDATION_INTERVAL_MS']); + await flushPromises(); + + expect(mockMysql.query).toHaveBeenCalledWith(expect.stringContaining('LEFT JOIN')); + // Scope-by-updated_at is load-bearing for perf (see follow-up #404); + // pin it so a future refactor doesn't silently drop the filter. + expect(mockMysql.query).toHaveBeenCalledWith(expect.stringContaining('ab.updated_at > NOW() - INTERVAL')); + expect(mockAddAlert).toHaveBeenCalledWith( + 'Balance validation found mismatches', + expect.stringContaining('1 balance mismatch'), + Severity.MAJOR, + expect.objectContaining({ + truncated: false, + samples: [mismatchRow], + }), + expect.anything(), + ); + expect(mockMysql.release).toHaveBeenCalled(); + }); + + it('should log info when no mismatches found', async () => { + config['BALANCE_VALIDATION_ENABLED'] = true; + const mockLoggerInfo = jest.spyOn(logger, 'info'); + + mockMysql.query.mockResolvedValueOnce([[], []]); + + MonitoringActor(mockCallback, mockReceive, config); + sendEvent('CONNECTED'); + + jest.advanceTimersByTime(config['BALANCE_VALIDATION_INTERVAL_MS']); + await flushPromises(); + + expect(mockAddAlert).not.toHaveBeenCalled(); + expect(mockLoggerInfo).toHaveBeenCalledWith( + expect.stringContaining('no mismatches found'), + ); + }); + + it('should mark the alert as truncated when the row count hits the LIMIT', async () => { + config['BALANCE_VALIDATION_ENABLED'] = true; + + // The actor's SAMPLE_LIMIT is 100; if exactly that many come back we + // assume more exist and surface "100+" + truncated:true. + const rows = Array.from({ length: 100 }, (_, i) => ({ + address: `addr${i}`, tokenId: 'tok', balanceSum: '1', historySum: '0', + })); + mockMysql.query.mockResolvedValueOnce([rows, []]); + + MonitoringActor(mockCallback, mockReceive, config); + sendEvent('CONNECTED'); + + jest.advanceTimersByTime(config['BALANCE_VALIDATION_INTERVAL_MS']); + await flushPromises(); + + expect(mockAddAlert).toHaveBeenCalledWith( + 'Balance validation found mismatches', + expect.stringContaining('100+'), + Severity.MAJOR, + expect.objectContaining({ truncated: true }), + expect.anything(), + ); + }); + + it('should handle DB errors without crashing', async () => { + config['BALANCE_VALIDATION_ENABLED'] = true; + const mockLoggerError = jest.spyOn(logger, 'error'); + + (db.getDbConnection as jest.Mock).mockRejectedValueOnce(new Error('DB connection failed')); + + MonitoringActor(mockCallback, mockReceive, config); + sendEvent('CONNECTED'); + + jest.advanceTimersByTime(config['BALANCE_VALIDATION_INTERVAL_MS']); + await flushPromises(); + + expect(mockLoggerError).toHaveBeenCalledWith( + expect.stringContaining('Balance validation error'), + ); + }); + + it('should refuse to schedule validation when interval is NaN', () => { + config['BALANCE_VALIDATION_ENABLED'] = true; + config['BALANCE_VALIDATION_INTERVAL_MS'] = NaN; + const mockLoggerError = jest.spyOn(logger, 'error'); + + MonitoringActor(mockCallback, mockReceive, config); + sendEvent('CONNECTED'); + + // Only the idle-check interval should fire; the validation interval + // must NOT be scheduled because the config is invalid. + expect(setInterval).toHaveBeenCalledTimes(1); + expect(mockLoggerError).toHaveBeenCalledWith( + expect.stringContaining('BALANCE_VALIDATION_INTERVAL_MS=NaN is invalid'), + ); + }); + + it('should refuse to schedule validation when interval is below the minimum', () => { + config['BALANCE_VALIDATION_ENABLED'] = true; + config['BALANCE_VALIDATION_INTERVAL_MS'] = 10; // below the 1000ms floor + const mockLoggerError = jest.spyOn(logger, 'error'); + + MonitoringActor(mockCallback, mockReceive, config); + sendEvent('CONNECTED'); + + expect(setInterval).toHaveBeenCalledTimes(1); + expect(mockLoggerError).toHaveBeenCalledWith( + expect.stringContaining('is invalid'), + ); + }); + + it('should refuse to schedule validation when sample limit is invalid', () => { + config['BALANCE_VALIDATION_ENABLED'] = true; + config['BALANCE_VALIDATION_SAMPLE_LIMIT'] = 0; // 0 would silently skip every row + const mockLoggerError = jest.spyOn(logger, 'error'); + + MonitoringActor(mockCallback, mockReceive, config); + sendEvent('CONNECTED'); + + expect(setInterval).toHaveBeenCalledTimes(1); + expect(mockLoggerError).toHaveBeenCalledWith( + expect.stringContaining('BALANCE_VALIDATION_SAMPLE_LIMIT=0 is invalid'), + ); + }); + }); }); diff --git a/packages/daemon/src/actors/MonitoringActor.ts b/packages/daemon/src/actors/MonitoringActor.ts index 89e36012..ec1df685 100644 --- a/packages/daemon/src/actors/MonitoringActor.ts +++ b/packages/daemon/src/actors/MonitoringActor.ts @@ -9,6 +9,7 @@ import logger from '../logger'; import getConfig from '../config'; import { addAlert, Severity } from '@wallet-service/common'; import { Event, EventTypes } from '../types'; +import { getDbConnection } from '../db'; /** * MonitoringActor @@ -33,6 +34,12 @@ import { Event, EventTypes } from '../types'; * reconnects more than RECONNECTION_STORM_THRESHOLD times within * RECONNECTION_STORM_WINDOW_MS. Duplicate alerts are suppressed for * STORM_ALERT_COOLDOWN_MS (1 min) to avoid spamming the alerting system. + * + * 4. Scheduled balance validation — when BALANCE_VALIDATION_ENABLED is true, + * periodically runs a single SQL query that joins address_balance against + * SUM(address_tx_history.balance) and reports rows where the two disagree. + * Bounded by LIMIT, so a catastrophic mismatch produces a sample, not a + * flood. Errors never crash the daemon. */ export default (callback: any, receive: any, config = getConfig()) => { logger.info('Starting monitoring actor'); @@ -150,6 +157,178 @@ export default (callback: any, receive: any, config = getConfig()) => { } }; + // ── Scheduled balance validation ────────────────────────────────────────────── + // + // Each tick runs a single SQL query that compares address_balance against the + // sum of non-voided address_tx_history for rows whose `updated_at` falls + // within the configured lookback window. The DB does the pairing (LEFT JOIN), + // the math (native BIGINT, no precision loss), and the consistency snapshot + // (single statement = one read view). + // + // Why `updated_at > NOW() - INTERVAL :window SECOND`: + // A full-table pass was benchmarked on production data (≈1.5M + // address_balance rows, ≈8.3M address_tx_history rows) and took tens of + // seconds per tick. The `updated_at` index scopes the outer set to recently + // changed rows, which is what a scheduled monitor actually needs — drift + // introduced by a bad write will be caught within one tick of the offending + // change. updated_at is `ON UPDATE CURRENT_TIMESTAMP` in the schema, so any + // write to the row bumps it; correctness of the scope is structural. + // + // Trade-off — hot addresses are still expensive: + // Scoping limits WHICH addresses we check per tick; it does NOT limit HOW + // MUCH history we sum per address. address_tx_history has no covering + // index that includes `balance`, so MySQL fetches every non-voided history + // row for each recently-changed address via a PK scan on `address`. For + // whale addresses with hundreds of thousands of history rows this is a + // multi-second per-address cost even when only a handful of addresses + // updated in the window. + // + // Because of this, `BALANCE_VALIDATION_ENABLED` is intended to stay + // `false` in production. The actor + query are here for ad-hoc / on-demand + // runs (local, testnet, or triggered manually), not for a scheduled + // in-production job. See #404 for the covering-index perf improvement that + // makes ad-hoc runs faster; it is not a prerequisite for "enabling" this + // feature, because enabling isn't planned. + // + // Long tail — slow drift on cold rows (balance changed long ago and the + // row never touched since) goes undetected by this validator. A separate + // full-table sweep is the right mechanism for that; out of scope. + // + // The `transactions > 0` filter is intentionally omitted: a row with + // `transactions = 0` AND non-zero balance is itself a bug (the void cleanup + // should have deleted it), and we want the validator to surface that. + // Genuinely-empty rows match `historySum=0` via COALESCE and HAVING drops + // them. + // + // If a run exceeds the interval the in-flight guard skips the next tick + // rather than overlapping. DISCONNECTED clears the timer; an in-flight + // SELECT runs to completion and releases its connection — harmless. + const sampleLimit = config.BALANCE_VALIDATION_SAMPLE_LIMIT; + const windowSeconds = Math.floor(config.BALANCE_VALIDATION_WINDOW_MS / 1000); + // Wrap the SIGNED BIGINT in CAST(... AS CHAR) so values transport to the + // client as strings. mysql2 returns BIGINT as JS Number by default, which + // loses precision above 2^53. HTR max supply is well below that, but we + // log this payload in alerts that humans and tools read — keeping the + // decimal string form avoids any silent rounding if the validator ever + // runs against a non-HTR token. HAVING compares the same CHAR expressions + // via `!=`, which is equivalent to numeric equality for canonical decimal + // strings produced by CAST(SIGNED AS CHAR). + const BALANCE_VALIDATION_SQL = ` + SELECT + ab.address, + ab.token_id AS tokenId, + CAST(CAST(ab.unlocked_balance + ab.locked_balance AS SIGNED) AS CHAR) AS balanceSum, + CAST(CAST(COALESCE(SUM(h.balance), 0) AS SIGNED) AS CHAR) AS historySum + FROM \`address_balance\` ab + LEFT JOIN \`address_tx_history\` h + ON h.address = ab.address + AND h.token_id = ab.token_id + AND h.voided = FALSE + WHERE ab.updated_at > NOW() - INTERVAL ${windowSeconds} SECOND + GROUP BY ab.address, ab.token_id + HAVING balanceSum != historySum + LIMIT ${sampleLimit} + `; + + let balanceValidationTimer: ReturnType | null = null; + let isValidating = false; + + const runBalanceValidation = async () => { + // eslint-disable-next-line @typescript-eslint/no-explicit-any + let mysql: any; + try { + mysql = await getDbConnection(); + const [rows] = await mysql.query(BALANCE_VALIDATION_SQL); + // eslint-disable-next-line @typescript-eslint/no-explicit-any + const samples = rows as any[]; + + if (samples.length > 0) { + const truncated = samples.length === sampleLimit; + const countLabel = truncated ? `${sampleLimit}+` : String(samples.length); + logger.error(`[monitoring] Balance validation found ${countLabel} mismatch(es)`, { samples }); + await addAlert( + 'Balance validation found mismatches', + `Found ${countLabel} balance mismatch(es)${truncated ? ' (sample capped)' : ''}`, + Severity.MAJOR, + { samples, truncated }, + logger, + ); + } else { + logger.info('[monitoring] Balance validation complete, no mismatches found'); + } + } catch (err) { + const detail = err instanceof Error ? (err.stack ?? err.message) : String(err); + logger.error(`[monitoring] Balance validation error: ${detail}`); + } finally { + if (mysql) { + try { + // eslint-disable-next-line @typescript-eslint/no-explicit-any + (mysql as any).release(); + } catch (releaseErr) { + const detail = releaseErr instanceof Error + ? (releaseErr.stack ?? releaseErr.message) + : String(releaseErr); + logger.warn(`[monitoring] Balance validation: connection release failed: ${detail}`); + } + } + } + }; + + // Minimum tick interval. Below this, we'd hammer the DB faster than a + // validation run can reasonably complete and risk cascading overruns. + const MIN_BALANCE_VALIDATION_INTERVAL_MS = 1000; + + const startBalanceValidation = () => { + if (!config.BALANCE_VALIDATION_ENABLED) return; + + const intervalMs = config.BALANCE_VALIDATION_INTERVAL_MS; + // Guard against misconfig: parseInt('abc') yields NaN, and setInterval(fn, NaN) + // behaves like delay=0 — a tight loop hammering the DB. Fail loud and stay + // disabled rather than silently substitute a default; operators should see + // this and fix the env var. + if (!Number.isFinite(intervalMs) || intervalMs < MIN_BALANCE_VALIDATION_INTERVAL_MS) { + logger.error( + `[monitoring] BALANCE_VALIDATION_INTERVAL_MS=${intervalMs} is invalid ` + + `(must be a finite number >= ${MIN_BALANCE_VALIDATION_INTERVAL_MS}). ` + + 'Scheduled balance validation will NOT run this session.', + ); + return; + } + + // Same guard for SAMPLE_LIMIT: it's interpolated directly into + // `LIMIT ${sampleLimit}`. NaN would produce a SQL syntax error; 0 or + // negative would return no rows and silently claim "no mismatches" + // without actually checking. Fail loud and stay disabled. + if (!Number.isFinite(sampleLimit) || sampleLimit < 1) { + logger.error( + `[monitoring] BALANCE_VALIDATION_SAMPLE_LIMIT=${sampleLimit} is invalid ` + + '(must be a finite number >= 1). Scheduled balance validation will NOT run this session.', + ); + return; + } + + stopBalanceValidation(); + + logger.info('[monitoring] Starting scheduled balance validation'); + balanceValidationTimer = setInterval(async () => { + if (isValidating) return; // prior run still going — skip this tick + isValidating = true; + try { + await runBalanceValidation(); + } finally { + isValidating = false; + } + }, intervalMs); + }; + + const stopBalanceValidation = () => { + if (balanceValidationTimer) { + clearInterval(balanceValidationTimer); + balanceValidationTimer = null; + } + }; + + // ── Event handling ──────────────────────────────────────────────────────────── receive((event: Event) => { if (event.type !== EventTypes.MONITORING_EVENT) { @@ -162,6 +341,7 @@ export default (callback: any, receive: any, config = getConfig()) => { logger.info('[monitoring] WebSocket connected — starting idle-event timer'); isConnected = true; startIdleCheck(); + startBalanceValidation(); break; case 'DISCONNECTED': @@ -169,6 +349,7 @@ export default (callback: any, receive: any, config = getConfig()) => { isConnected = false; stopIdleCheck(); clearStuckTimer(); + stopBalanceValidation(); break; case 'EVENT_RECEIVED': @@ -194,5 +375,6 @@ export default (callback: any, receive: any, config = getConfig()) => { logger.info('Stopping monitoring actor'); stopIdleCheck(); clearStuckTimer(); + stopBalanceValidation(); }; }; diff --git a/packages/daemon/src/config.ts b/packages/daemon/src/config.ts index 7321a1c1..08ecd6e3 100644 --- a/packages/daemon/src/config.ts +++ b/packages/daemon/src/config.ts @@ -100,6 +100,17 @@ export const RECONNECTION_STORM_WINDOW_MS = parseInt(process.env.RECONNECTION_ST // Other export const USE_SSL = process.env.USE_SSL === 'true'; +// Scheduled balance validation configuration +export const BALANCE_VALIDATION_ENABLED = process.env.BALANCE_VALIDATION_ENABLED === 'true'; +export const BALANCE_VALIDATION_INTERVAL_MS = parseInt(process.env.BALANCE_VALIDATION_INTERVAL_MS ?? '600000', 10); // 10 minutes +// Lookback window for recently-changed address_balance rows. Should be >= the +// tick interval so no row slips between ticks if one runs late. Default is +// interval + 50% slack. +export const BALANCE_VALIDATION_WINDOW_MS = parseInt(process.env.BALANCE_VALIDATION_WINDOW_MS ?? '900000', 10); // 15 minutes +// Max mismatch rows surfaced per tick. LIMIT bounds the alert payload size; +// does NOT reduce query execution cost. +export const BALANCE_VALIDATION_SAMPLE_LIMIT = parseInt(process.env.BALANCE_VALIDATION_SAMPLE_LIMIT ?? '100', 10); + // When false, skips the address balance validation after voiding transactions export const VALIDATE_ADDRESS_BALANCES = process.env.VALIDATE_ADDRESS_BALANCES !== 'false'; @@ -144,6 +155,10 @@ export default () => ({ STUCK_PROCESSING_TIMEOUT_MS, RECONNECTION_STORM_THRESHOLD, RECONNECTION_STORM_WINDOW_MS, + BALANCE_VALIDATION_ENABLED, + BALANCE_VALIDATION_INTERVAL_MS, + BALANCE_VALIDATION_WINDOW_MS, + BALANCE_VALIDATION_SAMPLE_LIMIT, REORG_SIZE_INFO, REORG_SIZE_MINOR, REORG_SIZE_MAJOR,