diff --git a/.gitignore b/.gitignore index ef1f9d32..2505335f 100644 --- a/.gitignore +++ b/.gitignore @@ -16,3 +16,6 @@ packages/wallet-service/.warmup .yarn/ .env.* *.tsbuildinfo + +# Local benchmark output (produced by packages/daemon/src/scripts/bench-void-tx.ts) +bench-results-*.json diff --git a/db/migrations/20251201104138-add-tx-output-utxo-lookup-index.js b/db/migrations/20251201104138-add-tx-output-utxo-lookup-index.js index 6bfb5a3e..f66c503f 100644 --- a/db/migrations/20251201104138-add-tx-output-utxo-lookup-index.js +++ b/db/migrations/20251201104138-add-tx-output-utxo-lookup-index.js @@ -12,7 +12,7 @@ module.exports = { `); // Only create if it doesn't exist - if (indexes[0].count === 0) { + if (Number(indexes[0].count) === 0) { await queryInterface.sequelize.query(` CREATE INDEX idx_tx_output_utxo_lookup ON tx_output (address, token_id, spent_by, voided, locked, authorities); @@ -30,7 +30,7 @@ module.exports = { AND index_name = 'idx_tx_output_utxo_lookup'; `); - if (indexes[0].count > 0) { + if (Number(indexes[0].count) > 0) { await queryInterface.sequelize.query(` DROP INDEX idx_tx_output_utxo_lookup ON tx_output; `); diff --git a/db/migrations/20260108100000-add-tx-output-locked-heightlock-index.js b/db/migrations/20260108100000-add-tx-output-locked-heightlock-index.js index 08ab85a4..29f0b047 100644 --- a/db/migrations/20260108100000-add-tx-output-locked-heightlock-index.js +++ b/db/migrations/20260108100000-add-tx-output-locked-heightlock-index.js @@ -12,7 +12,7 @@ module.exports = { `); // Only create if it doesn't exist - if (indexes[0].count === 0) { + if (Number(indexes[0].count) === 0) { await queryInterface.sequelize.query(` CREATE INDEX idx_tx_output_locked_heightlock ON tx_output (locked, heightlock); @@ -30,7 +30,7 @@ module.exports = { AND index_name = 'idx_tx_output_locked_heightlock'; `); - if (indexes[0].count > 0) { + if (Number(indexes[0].count) > 0) { await queryInterface.sequelize.query(` DROP INDEX idx_tx_output_locked_heightlock ON tx_output; `); diff --git a/db/migrations/20260108100001-add-tx-output-locked-timelock-index.js b/db/migrations/20260108100001-add-tx-output-locked-timelock-index.js index a5477656..8580da80 100644 --- a/db/migrations/20260108100001-add-tx-output-locked-timelock-index.js +++ b/db/migrations/20260108100001-add-tx-output-locked-timelock-index.js @@ -12,7 +12,7 @@ module.exports = { `); // Only create if it doesn't exist - if (indexes[0].count === 0) { + if (Number(indexes[0].count) === 0) { await queryInterface.sequelize.query(` CREATE INDEX idx_tx_output_locked_timelock ON tx_output (locked, timelock); @@ -30,7 +30,7 @@ module.exports = { AND index_name = 'idx_tx_output_locked_timelock'; `); - if (indexes[0].count > 0) { + if (Number(indexes[0].count) > 0) { await queryInterface.sequelize.query(` DROP INDEX idx_tx_output_locked_timelock ON tx_output; `); diff --git a/db/migrations/20260108100002-add-address-tx-history-addr-voided-token-index.js b/db/migrations/20260108100002-add-address-tx-history-addr-voided-token-index.js index 50e99de9..943a0cf9 100644 --- a/db/migrations/20260108100002-add-address-tx-history-addr-voided-token-index.js +++ b/db/migrations/20260108100002-add-address-tx-history-addr-voided-token-index.js @@ -12,7 +12,7 @@ module.exports = { `); // Only create if it doesn't exist - if (indexes[0].count === 0) { + if (Number(indexes[0].count) === 0) { await queryInterface.sequelize.query(` CREATE INDEX idx_address_tx_history_addr_voided_token ON address_tx_history (address, voided, token_id); @@ -30,7 +30,7 @@ module.exports = { AND index_name = 'idx_address_tx_history_addr_voided_token'; `); - if (indexes[0].count > 0) { + if (Number(indexes[0].count) > 0) { await queryInterface.sequelize.query(` DROP INDEX idx_address_tx_history_addr_voided_token ON address_tx_history; `); diff --git a/package.json b/package.json index 6764b5af..7eafa6a3 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "hathor-wallet-service", - "version": "1.13.0", + "version": "1.14.0", "workspaces": [ "packages/common", "packages/daemon", @@ -18,17 +18,17 @@ "private": true, "devDependencies": { "@types/jest": "29.5.13", - "@typescript-eslint/eslint-plugin": "^7.4.0", - "@typescript-eslint/parser": "^7.4.0", - "dotenv": "^16.4.5", - "eslint": "^8.57.0", - "eslint-config-airbnb-base": "^15.0.0", - "eslint-plugin-import": "^2.29.1", - "eslint-plugin-jest": "^27.9.0", - "mysql2": "^3.9.3", - "sequelize": "^6.37.2", - "sequelize-cli": "^6.6.2", - "typescript": "^5.8.2" + "@typescript-eslint/eslint-plugin": "8.58.2", + "@typescript-eslint/parser": "8.58.2", + "dotenv": "16.4.5", + "eslint": "9.39.4", + "eslint-config-airbnb-base": "15.0.0", + "eslint-plugin-import": "2.29.1", + "eslint-plugin-jest": "28.14.0", + "mysql2": "3.22.0", + "sequelize": "6.37.2", + "sequelize-cli": "6.6.2", + "typescript": "5.8.2" }, "packageManager": "yarn@4.7.0", "dependencies": { @@ -37,11 +37,11 @@ "@aws-sdk/client-sqs": "3.540.0", "@hathor/wallet-lib": "2.12.0", "@wallet-service/common": "1.5.0", - "bip32": "^4.0.0", - "bitcoinjs-lib": "^6.1.5", - "bitcoinjs-message": "^2.2.0", - "jest": "^29.7.0", - "tiny-secp256k1": "^2.2.3", + "bip32": "4.0.0", + "bitcoinjs-lib": "6.1.5", + "bitcoinjs-message": "2.2.0", + "jest": "29.7.0", + "tiny-secp256k1": "2.2.3", "winston": "3.13.0" } } diff --git a/packages/daemon/__tests__/actors/MonitoringActor.test.ts b/packages/daemon/__tests__/actors/MonitoringActor.test.ts index 07ce898d..71a35a2d 100644 --- a/packages/daemon/__tests__/actors/MonitoringActor.test.ts +++ b/packages/daemon/__tests__/actors/MonitoringActor.test.ts @@ -10,6 +10,7 @@ import logger from '../../src/logger'; import { EventTypes } from '../../src/types/event'; import getConfig from '../../src/config'; import { addAlert, Severity } from '@wallet-service/common'; +import * as db from '../../src/db'; const MONITORING_IDLE_TIMEOUT_EVENT = { type: EventTypes.MONITORING_IDLE_TIMEOUT }; @@ -24,6 +25,10 @@ jest.mock('@wallet-service/common', () => ({ addAlert: jest.fn().mockResolvedValue(undefined), })); +jest.mock('../../src/db', () => ({ + getDbConnection: jest.fn(), +})); + const mockAddAlert = addAlert as jest.Mock; describe('MonitoringActor', () => { @@ -49,6 +54,10 @@ describe('MonitoringActor', () => { config['STUCK_PROCESSING_TIMEOUT_MS'] = 5 * 60 * 1000; // 5 min config['RECONNECTION_STORM_THRESHOLD'] = 3; // low threshold for tests config['RECONNECTION_STORM_WINDOW_MS'] = 5 * 60 * 1000; // 5 min + config['BALANCE_VALIDATION_ENABLED'] = false; + config['BALANCE_VALIDATION_INTERVAL_MS'] = 5000; + config['BALANCE_VALIDATION_WINDOW_MS'] = 900000; + config['BALANCE_VALIDATION_SAMPLE_LIMIT'] = 100; mockCallback = jest.fn(); mockReceive = jest.fn().mockImplementation((cb: any) => { @@ -330,4 +339,190 @@ describe('MonitoringActor', () => { ); expect(setInterval).not.toHaveBeenCalled(); }); + + // ── Balance validation ──────────────────────────────────────────────────── + + const flushPromises = () => new Promise(jest.requireActual('timers').setImmediate); + + describe('balance validation', () => { + let mockMysql: any; + + beforeEach(() => { + mockMysql = { + release: jest.fn(), + query: jest.fn().mockResolvedValue([[], []]), + }; + (db.getDbConnection as jest.Mock).mockResolvedValue(mockMysql); + }); + + it('should not start balance validation when disabled', () => { + config['BALANCE_VALIDATION_ENABLED'] = false; + MonitoringActor(mockCallback, mockReceive, config); + sendEvent('CONNECTED'); + + // Only the idle-check interval should fire; no validation interval. + expect(setInterval).toHaveBeenCalledTimes(1); + }); + + it('should start the validation interval on CONNECTED when enabled', () => { + config['BALANCE_VALIDATION_ENABLED'] = true; + MonitoringActor(mockCallback, mockReceive, config); + sendEvent('CONNECTED'); + + // Idle check + balance validation = 2 intervals. + expect(setInterval).toHaveBeenCalledTimes(2); + expect(setInterval).toHaveBeenCalledWith( + expect.any(Function), + config['BALANCE_VALIDATION_INTERVAL_MS'], + ); + }); + + it('should clear the validation interval on DISCONNECTED', () => { + config['BALANCE_VALIDATION_ENABLED'] = true; + MonitoringActor(mockCallback, mockReceive, config); + sendEvent('CONNECTED'); + sendEvent('DISCONNECTED'); + + // Idle check + balance validation = 2 cleared intervals. + expect(clearInterval).toHaveBeenCalledTimes(2); + }); + + it('should alert when the validation query returns mismatch rows', async () => { + config['BALANCE_VALIDATION_ENABLED'] = true; + + const mismatchRow = { + address: 'addr1', + tokenId: 'token1', + balanceSum: '100', + historySum: '200', + }; + mockMysql.query.mockResolvedValueOnce([[mismatchRow], []]); + + MonitoringActor(mockCallback, mockReceive, config); + sendEvent('CONNECTED'); + + jest.advanceTimersByTime(config['BALANCE_VALIDATION_INTERVAL_MS']); + await flushPromises(); + + expect(mockMysql.query).toHaveBeenCalledWith(expect.stringContaining('LEFT JOIN')); + // Scope-by-updated_at is load-bearing for perf (see follow-up #404); + // pin it so a future refactor doesn't silently drop the filter. + expect(mockMysql.query).toHaveBeenCalledWith(expect.stringContaining('ab.updated_at > NOW() - INTERVAL')); + expect(mockAddAlert).toHaveBeenCalledWith( + 'Balance validation found mismatches', + expect.stringContaining('1 balance mismatch'), + Severity.MAJOR, + expect.objectContaining({ + truncated: false, + samples: [mismatchRow], + }), + expect.anything(), + ); + expect(mockMysql.release).toHaveBeenCalled(); + }); + + it('should log info when no mismatches found', async () => { + config['BALANCE_VALIDATION_ENABLED'] = true; + const mockLoggerInfo = jest.spyOn(logger, 'info'); + + mockMysql.query.mockResolvedValueOnce([[], []]); + + MonitoringActor(mockCallback, mockReceive, config); + sendEvent('CONNECTED'); + + jest.advanceTimersByTime(config['BALANCE_VALIDATION_INTERVAL_MS']); + await flushPromises(); + + expect(mockAddAlert).not.toHaveBeenCalled(); + expect(mockLoggerInfo).toHaveBeenCalledWith( + expect.stringContaining('no mismatches found'), + ); + }); + + it('should mark the alert as truncated when the row count hits the LIMIT', async () => { + config['BALANCE_VALIDATION_ENABLED'] = true; + + // The actor's SAMPLE_LIMIT is 100; if exactly that many come back we + // assume more exist and surface "100+" + truncated:true. + const rows = Array.from({ length: 100 }, (_, i) => ({ + address: `addr${i}`, tokenId: 'tok', balanceSum: '1', historySum: '0', + })); + mockMysql.query.mockResolvedValueOnce([rows, []]); + + MonitoringActor(mockCallback, mockReceive, config); + sendEvent('CONNECTED'); + + jest.advanceTimersByTime(config['BALANCE_VALIDATION_INTERVAL_MS']); + await flushPromises(); + + expect(mockAddAlert).toHaveBeenCalledWith( + 'Balance validation found mismatches', + expect.stringContaining('100+'), + Severity.MAJOR, + expect.objectContaining({ truncated: true }), + expect.anything(), + ); + }); + + it('should handle DB errors without crashing', async () => { + config['BALANCE_VALIDATION_ENABLED'] = true; + const mockLoggerError = jest.spyOn(logger, 'error'); + + (db.getDbConnection as jest.Mock).mockRejectedValueOnce(new Error('DB connection failed')); + + MonitoringActor(mockCallback, mockReceive, config); + sendEvent('CONNECTED'); + + jest.advanceTimersByTime(config['BALANCE_VALIDATION_INTERVAL_MS']); + await flushPromises(); + + expect(mockLoggerError).toHaveBeenCalledWith( + expect.stringContaining('Balance validation error'), + ); + }); + + it('should refuse to schedule validation when interval is NaN', () => { + config['BALANCE_VALIDATION_ENABLED'] = true; + config['BALANCE_VALIDATION_INTERVAL_MS'] = NaN; + const mockLoggerError = jest.spyOn(logger, 'error'); + + MonitoringActor(mockCallback, mockReceive, config); + sendEvent('CONNECTED'); + + // Only the idle-check interval should fire; the validation interval + // must NOT be scheduled because the config is invalid. + expect(setInterval).toHaveBeenCalledTimes(1); + expect(mockLoggerError).toHaveBeenCalledWith( + expect.stringContaining('BALANCE_VALIDATION_INTERVAL_MS=NaN is invalid'), + ); + }); + + it('should refuse to schedule validation when interval is below the minimum', () => { + config['BALANCE_VALIDATION_ENABLED'] = true; + config['BALANCE_VALIDATION_INTERVAL_MS'] = 10; // below the 1000ms floor + const mockLoggerError = jest.spyOn(logger, 'error'); + + MonitoringActor(mockCallback, mockReceive, config); + sendEvent('CONNECTED'); + + expect(setInterval).toHaveBeenCalledTimes(1); + expect(mockLoggerError).toHaveBeenCalledWith( + expect.stringContaining('is invalid'), + ); + }); + + it('should refuse to schedule validation when sample limit is invalid', () => { + config['BALANCE_VALIDATION_ENABLED'] = true; + config['BALANCE_VALIDATION_SAMPLE_LIMIT'] = 0; // 0 would silently skip every row + const mockLoggerError = jest.spyOn(logger, 'error'); + + MonitoringActor(mockCallback, mockReceive, config); + sendEvent('CONNECTED'); + + expect(setInterval).toHaveBeenCalledTimes(1); + expect(mockLoggerError).toHaveBeenCalledWith( + expect.stringContaining('BALANCE_VALIDATION_SAMPLE_LIMIT=0 is invalid'), + ); + }); + }); }); diff --git a/packages/daemon/__tests__/db/index.test.ts b/packages/daemon/__tests__/db/index.test.ts index 998f5159..1bc57cf2 100644 --- a/packages/daemon/__tests__/db/index.test.ts +++ b/packages/daemon/__tests__/db/index.test.ts @@ -105,7 +105,9 @@ describe('transaction methods', () => { await addOrUpdateTx(mysql, 'txId1', null, 1, 1, 65.4321); const tx = await getTransactionById(mysql, 'txId1'); - expect(tx?.weight).toStrictEqual(65.4321); + // `weight` is stored as FLOAT (32-bit); prepared statements return the + // true binary value rather than the text-protocol's rounded display. + expect(tx?.weight).toBeCloseTo(65.4321, 4); }); test('db which is not on our database should return null', async () => { diff --git a/packages/daemon/__tests__/integration/balances.test.ts b/packages/daemon/__tests__/integration/balances.test.ts index 488ffb8c..1daeb6b5 100644 --- a/packages/daemon/__tests__/integration/balances.test.ts +++ b/packages/daemon/__tests__/integration/balances.test.ts @@ -468,13 +468,14 @@ describe('voided token authority scenario', () => { ), ( 'cafecafe', - 'xpub6F81iNtH5HVknoJ65cK2XAGA5F3okdJK7WHwVAAPZnSir2sfwbhvB9ffNKQ4wLor75QxPe9p12tqt8xUZSG8i8AAPMpkFho7fbWkBJQ5s1x', + -- Distinct xpub from deafbeef so gap discovery doesn't collide on the global address PK. + 'xpub6GwCmKUTKBzEWNM9Zt77NTTsu6DNx6uzQP4TJm7yH5UpaEJ2fKioET7MrXNp584rNDyJWHqeNdEAZU5shWzSDQYs8bNtXAbVw1T1HKj4QjW', 'ready', 20, UNIX_TIMESTAMP(), UNIX_TIMESTAMP(), 0, - 'xpub6F81iNtH5HVknoJ65cK2XAGA5F3okdJK7WHwVAAPZnSir2sfwbhvB9ffNKQ4wLor75QxPe9p12tqt8xUZSG8i8AAPMpkFho7fbWkBJQ5s1x', + 'xpub6GwCmKUTKBzEWNM9Zt77NTTsu6DNx6uzQP4TJm7yH5UpaEJ2fKioET7MrXNp584rNDyJWHqeNdEAZU5shWzSDQYs8bNtXAbVw1T1HKj4QjW', -1 )`; diff --git a/packages/daemon/__tests__/services/services.test.ts b/packages/daemon/__tests__/services/services.test.ts index b7226851..c5f08e91 100644 --- a/packages/daemon/__tests__/services/services.test.ts +++ b/packages/daemon/__tests__/services/services.test.ts @@ -15,6 +15,7 @@ import { getLastSyncedEvent, updateLastSyncedEvent as dbUpdateLastSyncedEvent, getTxOutputsFromTx, + getTxOutputs, voidTransaction, voidAddressTransaction, getTransactionById, @@ -74,6 +75,7 @@ jest.mock('../../src/db', () => ({ updateLastSyncedEvent: jest.fn(), addOrUpdateTx: jest.fn(), getTxOutputsFromTx: jest.fn(), + getTxOutputs: jest.fn().mockResolvedValue([]), getTxOutput: jest.fn(), voidTransaction: jest.fn(), voidAddressTransaction: jest.fn(), @@ -194,7 +196,7 @@ describe('fetchInitialState', () => { it('should return the last event id', async () => { // Mock the return values of the dependencies - const mockDb = { destroy: jest.fn() }; + const mockDb = { release: jest.fn() }; // @ts-ignore getDbConnection.mockReturnValue(mockDb); @@ -211,12 +213,12 @@ describe('fetchInitialState', () => { lastEventId: 123, rewardMinBlocks: expect.any(Number), }); - expect(mockDb.destroy).toHaveBeenCalled(); + expect(mockDb.release).toHaveBeenCalled(); }); it('should return the fullnode\'s reward spend min blocks', async () => { // Mock the return values of the dependencies - const mockDb = { destroy: jest.fn() }; + const mockDb = { release: jest.fn() }; // @ts-ignore getDbConnection.mockReturnValue(mockDb); @@ -234,12 +236,12 @@ describe('fetchInitialState', () => { rewardMinBlocks: 300, }); - expect(mockDb.destroy).toHaveBeenCalled(); + expect(mockDb.release).toHaveBeenCalled(); }); it('should not fail if reward spend min blocks is 0', async () => { // Mock the return values of the dependencies - const mockDb = { destroy: jest.fn() }; + const mockDb = { release: jest.fn() }; // @ts-ignore axios.get.mockResolvedValue({ @@ -274,11 +276,11 @@ describe('fetchInitialState', () => { rewardMinBlocks: 0, }); - expect(mockDb.destroy).toHaveBeenCalled(); + expect(mockDb.release).toHaveBeenCalled(); }); it('should return undefined if no last event is found', async () => { - const mockDb = { destroy: jest.fn() }; + const mockDb = { release: jest.fn() }; // @ts-ignore getDbConnection.mockResolvedValue(mockDb); // @ts-ignore @@ -290,12 +292,12 @@ describe('fetchInitialState', () => { lastEventId: undefined, rewardMinBlocks: expect.any(Number), }); - expect(mockDb.destroy).toHaveBeenCalled(); + expect(mockDb.release).toHaveBeenCalled(); }); }); describe('updateLastSyncedEvent', () => { - const mockDb = { destroy: jest.fn() }; + const mockDb = { release: jest.fn() }; beforeEach(() => { jest.clearAllMocks(); @@ -309,7 +311,7 @@ describe('updateLastSyncedEvent', () => { await updateLastSyncedEvent({ event: { event: { id: 101 } } }); expect(dbUpdateLastSyncedEvent).toHaveBeenCalledWith(mockDb, 101); - expect(mockDb.destroy).toHaveBeenCalled(); + expect(mockDb.release).toHaveBeenCalled(); expect(logger.error).not.toHaveBeenCalled(); }); @@ -320,7 +322,7 @@ describe('updateLastSyncedEvent', () => { await expect(updateLastSyncedEvent({ event: { event: { id: 100 } } })).rejects.toThrow('Event lower than stored one.'); expect(dbUpdateLastSyncedEvent).not.toHaveBeenCalled(); - expect(mockDb.destroy).toHaveBeenCalled(); + expect(mockDb.release).toHaveBeenCalled(); expect(logger.error).toHaveBeenCalledWith('Tried to store an event lower than the one on the database', { lastEventId: 100, lastDbSyncedEvent: JSON.stringify({ last_event_id: 102 }), @@ -333,7 +335,7 @@ describe('handleTxFirstBlock', () => { beginTransaction: jest.fn(), commit: jest.fn(), rollback: jest.fn(), - destroy: jest.fn(), + release: jest.fn(), }; beforeEach(() => { @@ -366,7 +368,7 @@ describe('handleTxFirstBlock', () => { expect(dbUpdateLastSyncedEvent).toHaveBeenCalledWith(mockDb, 'idValue'); expect(logger.debug).toHaveBeenCalledWith('Confirmed tx hashValue in block blockHash123: idValue'); expect(mockDb.commit).toHaveBeenCalled(); - expect(mockDb.destroy).toHaveBeenCalled(); + expect(mockDb.release).toHaveBeenCalled(); }); it('should handle tx going back to mempool (first_block is null)', async () => { @@ -395,7 +397,7 @@ describe('handleTxFirstBlock', () => { expect(dbUpdateLastSyncedEvent).toHaveBeenCalledWith(mockDb, 'idValue'); expect(logger.debug).toHaveBeenCalledWith('Tx hashValue back to mempool (first_block=null): idValue'); expect(mockDb.commit).toHaveBeenCalled(); - expect(mockDb.destroy).toHaveBeenCalled(); + expect(mockDb.release).toHaveBeenCalled(); }); it('should handle tx going back to mempool (first_block is undefined)', async () => { @@ -424,7 +426,7 @@ describe('handleTxFirstBlock', () => { expect(dbUpdateLastSyncedEvent).toHaveBeenCalledWith(mockDb, 'idValue'); expect(logger.debug).toHaveBeenCalledWith('Tx hashValue back to mempool (first_block=null): idValue'); expect(mockDb.commit).toHaveBeenCalled(); - expect(mockDb.destroy).toHaveBeenCalled(); + expect(mockDb.release).toHaveBeenCalled(); }); it('should rollback on error and rethrow', async () => { @@ -451,7 +453,7 @@ describe('handleTxFirstBlock', () => { await expect(handleTxFirstBlock(context as any)).rejects.toThrow('Test error'); expect(logger.error).toHaveBeenCalledWith('E: ', expect.any(Error)); expect(mockDb.rollback).toHaveBeenCalled(); - expect(mockDb.destroy).toHaveBeenCalled(); + expect(mockDb.release).toHaveBeenCalled(); }); }); @@ -460,7 +462,7 @@ describe('handleVoidedTx', () => { beginTransaction: jest.fn(), commit: jest.fn(), rollback: jest.fn(), - destroy: jest.fn(), + release: jest.fn(), }; beforeEach(() => { @@ -475,7 +477,7 @@ describe('handleVoidedTx', () => { data: { hash: 'hashValue', outputs: 'outputsValue', - inputs: 'inputsValue', + inputs: [], tokens: 'tokensValue', }, id: 'idValue', @@ -496,7 +498,7 @@ describe('handleVoidedTx', () => { expect(logger.debug).toHaveBeenCalledWith('Voided tx hashValue'); expect(mockDb.beginTransaction).toHaveBeenCalled(); expect(mockDb.commit).toHaveBeenCalled(); - expect(mockDb.destroy).toHaveBeenCalled(); + expect(mockDb.release).toHaveBeenCalled(); }); it('should throw an error if transaction output is different from database output', async () => { @@ -506,7 +508,7 @@ describe('handleVoidedTx', () => { data: { hash: 'hashValue', outputs: 'outputsValue', - inputs: 'inputsValue', + inputs: [], tokens: 'tokensValue', }, id: 'idValue', @@ -530,7 +532,7 @@ describe('handleVoidedTx', () => { // Now, when handleVoidedTx is called, it should throw the error because of the mismatch await expect(handleVoidedTx(context as any)).rejects.toThrow('Transaction output different from database output!'); expect(mockDb.rollback).toHaveBeenCalled(); - expect(mockDb.destroy).toHaveBeenCalled(); + expect(mockDb.release).toHaveBeenCalled(); }); it('should rollback on error and rethrow', async () => { @@ -542,7 +544,7 @@ describe('handleVoidedTx', () => { data: { hash: 'hashValue', outputs: 'outputsValue', - inputs: 'inputsValue', + inputs: [], tokens: 'tokensValue', }, id: 'idValue', @@ -554,7 +556,7 @@ describe('handleVoidedTx', () => { expect(logger.debug).toHaveBeenCalledWith(expect.any(Error)); expect(mockDb.beginTransaction).toHaveBeenCalled(); expect(mockDb.rollback).toHaveBeenCalled(); - expect(mockDb.destroy).toHaveBeenCalled(); + expect(mockDb.release).toHaveBeenCalled(); }); }); @@ -563,7 +565,7 @@ describe('handleVertexAccepted', () => { beginTransaction: jest.fn(), commit: jest.fn(), rollback: jest.fn(), - destroy: jest.fn(), + release: jest.fn(), }; beforeEach(() => { @@ -599,7 +601,7 @@ describe('handleVertexAccepted', () => { version: 'versionValue', weight: 'weightValue', outputs: 'outputsValue', - inputs: 'inputsValue', + inputs: [], tokens: 'tokensValue', token_name: 'tokenName', token_symbol: 'tokenSymbol', @@ -636,7 +638,7 @@ describe('handleVertexAccepted', () => { expect(getTransactionById).toHaveBeenCalledWith(mockDb, 'hashValue'); expect(logger.debug).toHaveBeenCalledWith('Will add the tx with height', 123); expect(mockDb.commit).toHaveBeenCalled(); - expect(mockDb.destroy).toHaveBeenCalled(); + expect(mockDb.release).toHaveBeenCalled(); }); it('should handle call the push notification lambda if PUSH_NOTIFICATION_ENABLED is true', async () => { @@ -693,7 +695,7 @@ describe('handleVertexAccepted', () => { expect(invokeOnTxPushNotificationRequestedLambda).toHaveBeenCalled(); expect(mockDb.commit).toHaveBeenCalled(); - expect(mockDb.destroy).toHaveBeenCalled(); + expect(mockDb.release).toHaveBeenCalled(); }); it('should handle token creation tx without storing token info (tokens created via TOKEN_CREATED event)', async () => { @@ -748,11 +750,16 @@ describe('handleVertexAccepted', () => { expect(storeTokenInformation).not.toHaveBeenCalled(); expect(mockDb.commit).toHaveBeenCalled(); - expect(mockDb.destroy).toHaveBeenCalled(); + expect(mockDb.release).toHaveBeenCalled(); }); it('should rollback on error and rethrow', async () => { - (getTransactionById as jest.Mock).mockRejectedValue(new Error('Test error')); + // getTransactionById runs BEFORE beginTransaction as a read-only duplicate check, + // so to exercise the rollback path we throw from a call that happens inside the + // transaction. getLockedUtxoFromInputs is the first DB call after beginTransaction + // for a non-block tx. + (getTransactionById as jest.Mock).mockResolvedValue(null); + (getLockedUtxoFromInputs as jest.Mock).mockRejectedValue(new Error('Test error')); const context = { rewardMinBlocks: 5, @@ -760,9 +767,18 @@ describe('handleVertexAccepted', () => { event: { data: { hash: 'hashValue', - outputs: 'outputsValue', - inputs: 'inputsValue', - tokens: 'tokensValue', + metadata: { height: 1, first_block: null, voided_by: [] }, + timestamp: 0, + version: 1, + weight: 0, + outputs: [], + inputs: [], + nonce: 0, + tokens: [], + token_name: null, + token_symbol: null, + parents: [], + headers: [], }, id: 'idValue', }, @@ -772,7 +788,7 @@ describe('handleVertexAccepted', () => { await expect(handleVertexAccepted(context as any, {} as any)).rejects.toThrow('Test error'); expect(mockDb.beginTransaction).toHaveBeenCalled(); expect(mockDb.rollback).toHaveBeenCalled(); - expect(mockDb.destroy).toHaveBeenCalled(); + expect(mockDb.release).toHaveBeenCalled(); }); it('should handle PoA blocks with empty outputs without crashing', async () => { @@ -837,7 +853,7 @@ describe('handleVertexAccepted', () => { null, // firstBlock ); expect(mockDb.commit).toHaveBeenCalled(); - expect(mockDb.destroy).toHaveBeenCalled(); + expect(mockDb.release).toHaveBeenCalled(); }); it('should pass first_block when inserting transaction', async () => { @@ -895,7 +911,7 @@ describe('handleVertexAccepted', () => { describe('metadataDiff', () => { const mockDb = { - destroy: jest.fn(), + release: jest.fn(), }; beforeEach(() => { @@ -1221,7 +1237,7 @@ describe('metadataDiff', () => { expect(result.types).toHaveLength(2); }); - it('should handle errors and destroy the database connection', async () => { + it('should handle errors and release the database connection', async () => { const event = { event: { event: { @@ -1236,7 +1252,7 @@ describe('metadataDiff', () => { (getTransactionById as jest.Mock).mockRejectedValue(new Error('Mock Error')); await expect(metadataDiff({} as any, event as any)).rejects.toThrow('Mock Error'); - expect(mockDb.destroy).toHaveBeenCalled(); + expect(mockDb.release).toHaveBeenCalled(); expect(logger.error).toHaveBeenCalledWith('metadataDiff error', { eventId: 123, error: new Error('Mock Error') }); }); @@ -1766,7 +1782,7 @@ describe('handleNcExecVoided', () => { beginTransaction: jest.fn(), commit: jest.fn(), rollback: jest.fn(), - destroy: jest.fn(), + release: jest.fn(), }; const createContext = (txHash: string, firstBlock: string | null = null) => ({ diff --git a/packages/daemon/package.json b/packages/daemon/package.json index a37989cf..2f2e89cd 100644 --- a/packages/daemon/package.json +++ b/packages/daemon/package.json @@ -68,14 +68,14 @@ "@opentelemetry/sdk-trace-node": "1.30.1", "assert": "2.1.0", "aws-sdk": "2.1454.0", - "axios": "1.6.2", + "axios": "1.13.5", "dotenv": "8.2.0", - "lodash": "4.17.21", - "mysql2": "3.5.2", + "lodash": "4.17.23", + "mysql2": "3.9.8", "sequelize": "6.33.0", "websocket": "1.0.33", "winston": "3.13.0", - "ws": "8.13.0", + "ws": "8.17.1", "xstate": "4.38.2", "zod": "3.23.8" } diff --git a/packages/daemon/src/actors/MonitoringActor.ts b/packages/daemon/src/actors/MonitoringActor.ts index 89e36012..ec1df685 100644 --- a/packages/daemon/src/actors/MonitoringActor.ts +++ b/packages/daemon/src/actors/MonitoringActor.ts @@ -9,6 +9,7 @@ import logger from '../logger'; import getConfig from '../config'; import { addAlert, Severity } from '@wallet-service/common'; import { Event, EventTypes } from '../types'; +import { getDbConnection } from '../db'; /** * MonitoringActor @@ -33,6 +34,12 @@ import { Event, EventTypes } from '../types'; * reconnects more than RECONNECTION_STORM_THRESHOLD times within * RECONNECTION_STORM_WINDOW_MS. Duplicate alerts are suppressed for * STORM_ALERT_COOLDOWN_MS (1 min) to avoid spamming the alerting system. + * + * 4. Scheduled balance validation — when BALANCE_VALIDATION_ENABLED is true, + * periodically runs a single SQL query that joins address_balance against + * SUM(address_tx_history.balance) and reports rows where the two disagree. + * Bounded by LIMIT, so a catastrophic mismatch produces a sample, not a + * flood. Errors never crash the daemon. */ export default (callback: any, receive: any, config = getConfig()) => { logger.info('Starting monitoring actor'); @@ -150,6 +157,178 @@ export default (callback: any, receive: any, config = getConfig()) => { } }; + // ── Scheduled balance validation ────────────────────────────────────────────── + // + // Each tick runs a single SQL query that compares address_balance against the + // sum of non-voided address_tx_history for rows whose `updated_at` falls + // within the configured lookback window. The DB does the pairing (LEFT JOIN), + // the math (native BIGINT, no precision loss), and the consistency snapshot + // (single statement = one read view). + // + // Why `updated_at > NOW() - INTERVAL :window SECOND`: + // A full-table pass was benchmarked on production data (≈1.5M + // address_balance rows, ≈8.3M address_tx_history rows) and took tens of + // seconds per tick. The `updated_at` index scopes the outer set to recently + // changed rows, which is what a scheduled monitor actually needs — drift + // introduced by a bad write will be caught within one tick of the offending + // change. updated_at is `ON UPDATE CURRENT_TIMESTAMP` in the schema, so any + // write to the row bumps it; correctness of the scope is structural. + // + // Trade-off — hot addresses are still expensive: + // Scoping limits WHICH addresses we check per tick; it does NOT limit HOW + // MUCH history we sum per address. address_tx_history has no covering + // index that includes `balance`, so MySQL fetches every non-voided history + // row for each recently-changed address via a PK scan on `address`. For + // whale addresses with hundreds of thousands of history rows this is a + // multi-second per-address cost even when only a handful of addresses + // updated in the window. + // + // Because of this, `BALANCE_VALIDATION_ENABLED` is intended to stay + // `false` in production. The actor + query are here for ad-hoc / on-demand + // runs (local, testnet, or triggered manually), not for a scheduled + // in-production job. See #404 for the covering-index perf improvement that + // makes ad-hoc runs faster; it is not a prerequisite for "enabling" this + // feature, because enabling isn't planned. + // + // Long tail — slow drift on cold rows (balance changed long ago and the + // row never touched since) goes undetected by this validator. A separate + // full-table sweep is the right mechanism for that; out of scope. + // + // The `transactions > 0` filter is intentionally omitted: a row with + // `transactions = 0` AND non-zero balance is itself a bug (the void cleanup + // should have deleted it), and we want the validator to surface that. + // Genuinely-empty rows match `historySum=0` via COALESCE and HAVING drops + // them. + // + // If a run exceeds the interval the in-flight guard skips the next tick + // rather than overlapping. DISCONNECTED clears the timer; an in-flight + // SELECT runs to completion and releases its connection — harmless. + const sampleLimit = config.BALANCE_VALIDATION_SAMPLE_LIMIT; + const windowSeconds = Math.floor(config.BALANCE_VALIDATION_WINDOW_MS / 1000); + // Wrap the SIGNED BIGINT in CAST(... AS CHAR) so values transport to the + // client as strings. mysql2 returns BIGINT as JS Number by default, which + // loses precision above 2^53. HTR max supply is well below that, but we + // log this payload in alerts that humans and tools read — keeping the + // decimal string form avoids any silent rounding if the validator ever + // runs against a non-HTR token. HAVING compares the same CHAR expressions + // via `!=`, which is equivalent to numeric equality for canonical decimal + // strings produced by CAST(SIGNED AS CHAR). + const BALANCE_VALIDATION_SQL = ` + SELECT + ab.address, + ab.token_id AS tokenId, + CAST(CAST(ab.unlocked_balance + ab.locked_balance AS SIGNED) AS CHAR) AS balanceSum, + CAST(CAST(COALESCE(SUM(h.balance), 0) AS SIGNED) AS CHAR) AS historySum + FROM \`address_balance\` ab + LEFT JOIN \`address_tx_history\` h + ON h.address = ab.address + AND h.token_id = ab.token_id + AND h.voided = FALSE + WHERE ab.updated_at > NOW() - INTERVAL ${windowSeconds} SECOND + GROUP BY ab.address, ab.token_id + HAVING balanceSum != historySum + LIMIT ${sampleLimit} + `; + + let balanceValidationTimer: ReturnType | null = null; + let isValidating = false; + + const runBalanceValidation = async () => { + // eslint-disable-next-line @typescript-eslint/no-explicit-any + let mysql: any; + try { + mysql = await getDbConnection(); + const [rows] = await mysql.query(BALANCE_VALIDATION_SQL); + // eslint-disable-next-line @typescript-eslint/no-explicit-any + const samples = rows as any[]; + + if (samples.length > 0) { + const truncated = samples.length === sampleLimit; + const countLabel = truncated ? `${sampleLimit}+` : String(samples.length); + logger.error(`[monitoring] Balance validation found ${countLabel} mismatch(es)`, { samples }); + await addAlert( + 'Balance validation found mismatches', + `Found ${countLabel} balance mismatch(es)${truncated ? ' (sample capped)' : ''}`, + Severity.MAJOR, + { samples, truncated }, + logger, + ); + } else { + logger.info('[monitoring] Balance validation complete, no mismatches found'); + } + } catch (err) { + const detail = err instanceof Error ? (err.stack ?? err.message) : String(err); + logger.error(`[monitoring] Balance validation error: ${detail}`); + } finally { + if (mysql) { + try { + // eslint-disable-next-line @typescript-eslint/no-explicit-any + (mysql as any).release(); + } catch (releaseErr) { + const detail = releaseErr instanceof Error + ? (releaseErr.stack ?? releaseErr.message) + : String(releaseErr); + logger.warn(`[monitoring] Balance validation: connection release failed: ${detail}`); + } + } + } + }; + + // Minimum tick interval. Below this, we'd hammer the DB faster than a + // validation run can reasonably complete and risk cascading overruns. + const MIN_BALANCE_VALIDATION_INTERVAL_MS = 1000; + + const startBalanceValidation = () => { + if (!config.BALANCE_VALIDATION_ENABLED) return; + + const intervalMs = config.BALANCE_VALIDATION_INTERVAL_MS; + // Guard against misconfig: parseInt('abc') yields NaN, and setInterval(fn, NaN) + // behaves like delay=0 — a tight loop hammering the DB. Fail loud and stay + // disabled rather than silently substitute a default; operators should see + // this and fix the env var. + if (!Number.isFinite(intervalMs) || intervalMs < MIN_BALANCE_VALIDATION_INTERVAL_MS) { + logger.error( + `[monitoring] BALANCE_VALIDATION_INTERVAL_MS=${intervalMs} is invalid ` + + `(must be a finite number >= ${MIN_BALANCE_VALIDATION_INTERVAL_MS}). ` + + 'Scheduled balance validation will NOT run this session.', + ); + return; + } + + // Same guard for SAMPLE_LIMIT: it's interpolated directly into + // `LIMIT ${sampleLimit}`. NaN would produce a SQL syntax error; 0 or + // negative would return no rows and silently claim "no mismatches" + // without actually checking. Fail loud and stay disabled. + if (!Number.isFinite(sampleLimit) || sampleLimit < 1) { + logger.error( + `[monitoring] BALANCE_VALIDATION_SAMPLE_LIMIT=${sampleLimit} is invalid ` + + '(must be a finite number >= 1). Scheduled balance validation will NOT run this session.', + ); + return; + } + + stopBalanceValidation(); + + logger.info('[monitoring] Starting scheduled balance validation'); + balanceValidationTimer = setInterval(async () => { + if (isValidating) return; // prior run still going — skip this tick + isValidating = true; + try { + await runBalanceValidation(); + } finally { + isValidating = false; + } + }, intervalMs); + }; + + const stopBalanceValidation = () => { + if (balanceValidationTimer) { + clearInterval(balanceValidationTimer); + balanceValidationTimer = null; + } + }; + + // ── Event handling ──────────────────────────────────────────────────────────── receive((event: Event) => { if (event.type !== EventTypes.MONITORING_EVENT) { @@ -162,6 +341,7 @@ export default (callback: any, receive: any, config = getConfig()) => { logger.info('[monitoring] WebSocket connected — starting idle-event timer'); isConnected = true; startIdleCheck(); + startBalanceValidation(); break; case 'DISCONNECTED': @@ -169,6 +349,7 @@ export default (callback: any, receive: any, config = getConfig()) => { isConnected = false; stopIdleCheck(); clearStuckTimer(); + stopBalanceValidation(); break; case 'EVENT_RECEIVED': @@ -194,5 +375,6 @@ export default (callback: any, receive: any, config = getConfig()) => { logger.info('Stopping monitoring actor'); stopIdleCheck(); clearStuckTimer(); + stopBalanceValidation(); }; }; diff --git a/packages/daemon/src/config.ts b/packages/daemon/src/config.ts index 8a5d683a..08ecd6e3 100644 --- a/packages/daemon/src/config.ts +++ b/packages/daemon/src/config.ts @@ -100,6 +100,20 @@ export const RECONNECTION_STORM_WINDOW_MS = parseInt(process.env.RECONNECTION_ST // Other export const USE_SSL = process.env.USE_SSL === 'true'; +// Scheduled balance validation configuration +export const BALANCE_VALIDATION_ENABLED = process.env.BALANCE_VALIDATION_ENABLED === 'true'; +export const BALANCE_VALIDATION_INTERVAL_MS = parseInt(process.env.BALANCE_VALIDATION_INTERVAL_MS ?? '600000', 10); // 10 minutes +// Lookback window for recently-changed address_balance rows. Should be >= the +// tick interval so no row slips between ticks if one runs late. Default is +// interval + 50% slack. +export const BALANCE_VALIDATION_WINDOW_MS = parseInt(process.env.BALANCE_VALIDATION_WINDOW_MS ?? '900000', 10); // 15 minutes +// Max mismatch rows surfaced per tick. LIMIT bounds the alert payload size; +// does NOT reduce query execution cost. +export const BALANCE_VALIDATION_SAMPLE_LIMIT = parseInt(process.env.BALANCE_VALIDATION_SAMPLE_LIMIT ?? '100', 10); + +// When false, skips the address balance validation after voiding transactions +export const VALIDATE_ADDRESS_BALANCES = process.env.VALIDATE_ADDRESS_BALANCES !== 'false'; + // Reorg size thresholds for different alert levels export const REORG_SIZE_INFO = parseInt(process.env.REORG_SIZE_INFO ?? '1', 10); export const REORG_SIZE_MINOR = parseInt(process.env.REORG_SIZE_MINOR ?? '3', 10); @@ -141,8 +155,13 @@ export default () => ({ STUCK_PROCESSING_TIMEOUT_MS, RECONNECTION_STORM_THRESHOLD, RECONNECTION_STORM_WINDOW_MS, + BALANCE_VALIDATION_ENABLED, + BALANCE_VALIDATION_INTERVAL_MS, + BALANCE_VALIDATION_WINDOW_MS, + BALANCE_VALIDATION_SAMPLE_LIMIT, REORG_SIZE_INFO, REORG_SIZE_MINOR, REORG_SIZE_MAJOR, REORG_SIZE_CRITICAL, + VALIDATE_ADDRESS_BALANCES, }); diff --git a/packages/daemon/src/db/index.ts b/packages/daemon/src/db/index.ts index 98d72dc2..507b3b83 100644 --- a/packages/daemon/src/db/index.ts +++ b/packages/daemon/src/db/index.ts @@ -4,7 +4,7 @@ * This source code is licensed under the MIT license found in the * LICENSE file in the root directory of this source tree. */ -import mysql, { Connection as MysqlConnection, Pool, ResultSetHeader } from 'mysql2/promise'; +import mysql, { Connection as MysqlConnection, Pool, PoolConnection, ResultSetHeader } from 'mysql2/promise'; import { DbTxOutput, StringMap, @@ -26,6 +26,7 @@ import { TxInput, TokenBalanceMap, TxOutputWithIndex, + Balance, } from '@wallet-service/common'; import { isAuthority, toTokenVersion } from '@wallet-service/common'; import { getWalletBalanceMap } from '../utils/wallet'; @@ -42,6 +43,77 @@ import { import getConfig from '../config'; import { constants } from '@hathor/wallet-lib'; +/** + * Column definition for buildBatchCaseUpdate. + * + * - 'subtract': `column = column - CASE WHEN key1=? AND key2=? THEN ? ELSE 0 END` + * - 'bitor': `column = column | CASE WHEN key1=? AND key2=? THEN ? ELSE 0 END` + * - 'custom': `column = CASE WHEN key1=? AND key2=? THEN ELSE column END` + * (customSql uses `?` placeholders; getValue returns the params array for each pair) + */ +type BatchColumnDef = + | { column: string; op: 'subtract' | 'bitor'; getValue: (item: T) => any } + | { column: string; op: 'custom'; customSql: string; getValue: (item: T) => any[] }; + +/** + * Builds a single batched UPDATE statement using CASE WHEN expressions. + * + * Instead of running one UPDATE per (key1, key2) pair, this builds a single UPDATE + * that handles all pairs at once via CASE WHEN clauses. + * + * @param table - Table name (backtick-escaped) + * @param keyColumns - The two key column names used in WHERE and CASE WHEN + * @param pairs - Array of items to update + * @param getKeys - Extracts the two key values from each item + * @param columns - Column definitions describing how each column should be updated + * @param tableAlias - Optional table alias for the UPDATE statement + * @returns SQL string and params array ready for mysql.query() + */ +function buildBatchCaseUpdate( + table: string, + keyColumns: [string, string], + pairs: T[], + getKeys: (item: T) => [any, any], + columns: BatchColumnDef[], + tableAlias?: string, +): { sql: string; params: any[] } { + const params: any[] = []; + const alias = tableAlias ? ` ${tableAlias}` : ''; + const colPrefix = tableAlias ? `${tableAlias}.` : ''; + const [key1, key2] = keyColumns; + + const setClauses = columns.map((col) => { + let caseExpr = ''; + for (const item of pairs) { + const [k1, k2] = getKeys(item); + if (col.op === 'custom') { + caseExpr += ` WHEN ${colPrefix}${key1} = ? AND ${colPrefix}${key2} = ? THEN ${col.customSql}`; + params.push(k1, k2, ...col.getValue(item)); + } else { + caseExpr += ` WHEN ${colPrefix}${key1} = ? AND ${colPrefix}${key2} = ? THEN ?`; + params.push(k1, k2, col.getValue(item)); + } + } + + if (col.op === 'subtract') { + return `${colPrefix}${col.column} = ${colPrefix}${col.column} - CASE${caseExpr} ELSE 0 END`; + } else if (col.op === 'bitor') { + return `${colPrefix}${col.column} = ${colPrefix}${col.column} | CASE${caseExpr} ELSE 0 END`; + } else { + return `${colPrefix}${col.column} = CASE${caseExpr} ELSE ${colPrefix}${col.column} END`; + } + }); + + const keyPairs = pairs.map(getKeys); + params.push(keyPairs); + + const sql = `UPDATE ${table}${alias} SET + ${setClauses.join(',\n ')} + WHERE (${colPrefix}${key1}, ${colPrefix}${key2}) IN (?)`; + + return { sql, params }; +} + let pool: Pool; /** @@ -49,7 +121,7 @@ let pool: Pool; * * @returns The database connection */ -export const getDbConnection = async (): Promise => { +export const getDbConnection = async (): Promise => { if (!pool) { const { DB_ENDPOINT, @@ -213,7 +285,7 @@ export const getTxOutputsFromTx = async ( mysql: any, txId: string, ): Promise => { - const [results] = await mysql.query( + const [results] = await mysql.execute( `SELECT * FROM \`tx_output\` WHERE \`tx_id\` = ?`, @@ -252,13 +324,15 @@ export const getTxOutputsFromTx = async ( export const getTxOutputs = async ( mysql: any, inputs: { txId: string, index: number }[], + skipVoided: boolean = false, ): Promise => { if (inputs.length <= 0) return []; const txIdIndexPair = inputs.map((utxo) => [utxo.txId, utxo.index]); const [results] = await mysql.query( `SELECT * FROM \`tx_output\` - WHERE (\`tx_id\`, \`index\`) IN (?)`, + WHERE (\`tx_id\`, \`index\`) IN (?) + ${skipVoided ? 'AND `voided` = FALSE' : ''}`, [txIdIndexPair], ); @@ -299,7 +373,7 @@ export const getTxOutput = async ( index: number, skipSpent: boolean, ): Promise => { - const [results] = await mysql.query( + const [results] = await mysql.execute( `SELECT * FROM \`tx_output\` WHERE \`tx_id\` = ? @@ -332,7 +406,7 @@ export const getTxOutputsAtHeight = async ( mysql: MysqlConnection, height: number, ): Promise => { - const [results] = await mysql.query( + const [results] = await mysql.execute( `SELECT * FROM \`tx_output\` WHERE \`tx_id\` IN ( @@ -388,7 +462,7 @@ export const voidAddressTransaction = async ( mysql: any, txId: string, addressBalanceMap: StringMap, - version: number, + _version: number, ): Promise => { const addressEntries = Object.keys(addressBalanceMap).map((address) => [address, 0]); @@ -401,105 +475,104 @@ export const voidAddressTransaction = async ( ); } - // Check if this is a token creation transaction - const isCreateTokenTx = version === constants.CREATE_TOKEN_TX_VERSION; - + // Collect all (address, token) pairs and their balances + const pairs: { address: string; token: string; balance: Balance }[] = []; for (const [address, tokenMap] of Object.entries(addressBalanceMap)) { for (const [token, tokenBalance] of tokenMap.iterator()) { - // Check if address_balance entry exists first - const [existingRows] = await mysql.query( - 'SELECT * FROM address_balance WHERE address = ? AND token_id = ?', - [address, token] - ); + pairs.push({ address, token, balance: tokenBalance }); + } + } - if (existingRows.length > 0) { - // Entry exists, perform UPDATE to subtract values - await mysql.query( - `UPDATE address_balance - SET total_received = total_received - ?, - unlocked_balance = unlocked_balance - ?, - locked_balance = locked_balance - ?, - transactions = transactions - 1, - timelock_expires = CASE - WHEN timelock_expires IS NULL THEN ? - WHEN ? IS NULL THEN timelock_expires - ELSE LEAST(timelock_expires, ?) - END, - unlocked_authorities = (unlocked_authorities | ?), - locked_authorities = locked_authorities | ? - WHERE address = ? AND token_id = ?`, - [ - tokenBalance.totalAmountSent, - tokenBalance.unlockedAmount, - tokenBalance.lockedAmount, - tokenBalance.lockExpires, - tokenBalance.lockExpires, - tokenBalance.lockExpires, - tokenBalance.unlockedAuthorities.toUnsignedInteger(), - tokenBalance.lockedAuthorities.toUnsignedInteger(), - address, - token - ] - ); - } else { - // Entry doesn't exist, this means the balance was never added in the first place - // This shouldn't happen since we receive events in order - console.warn(`warning: Trying to void transaction for address ${address} token ${token} but no balance entry exists`); - } + if (pairs.length === 0) { + await mysql.query( + `DELETE FROM \`address_tx_history\` + WHERE \`tx_id\` = ?`, + [txId], + ); + return; + } - // if we're removing any of the authorities, we need to refresh the authority columns. Unlike the values, - // we cannot only sum/subtract, as authorities are binary: you have it or you don't. We might be spending - // an authority output in this tx without creating a new one, but it doesn't mean this address does not - // have this authority anymore, as it might have other authority outputs - if (!tokenBalance.unlockedAuthorities.hasNegativeValue()) { - await mysql.query( - `UPDATE \`address_balance\` - SET \`unlocked_authorities\` = ( - SELECT BIT_OR(\`authorities\`) - FROM \`tx_output\` - WHERE \`address\` = ? - AND \`token_id\` = ? - AND \`locked\` = FALSE - AND \`spent_by\` IS NULL - AND \`voided\` = FALSE - ) - WHERE \`address\` = ? - AND \`token_id\` = ?`, - [address, token, address, token], - ); - } - // for locked authorities, it doesn't make sense to perform the same operation. The authority needs to be - // unlocked before it can be spent. In case we're just adding new locked authorities, this will be taken - // care by the first sql query. + // Single batched UPDATE for all balance decrements (no-op if no row matches) + type AddrPair = { address: string; token: string; balance: Balance }; + const getAddrKeys = (p: AddrPair): [string, string] => [p.address, p.token]; + + const { sql: updateSql, params: updateParams } = buildBatchCaseUpdate( + '`address_balance`', + ['`address`', '`token_id`'], + pairs, + getAddrKeys, + [ + { column: '`total_received`', op: 'subtract', getValue: (p) => p.balance.totalAmountSent }, + { column: '`unlocked_balance`', op: 'subtract', getValue: (p) => p.balance.unlockedAmount }, + { column: '`locked_balance`', op: 'subtract', getValue: (p) => p.balance.lockedAmount }, + { + column: '`timelock_expires`', op: 'custom', + customSql: 'CASE WHEN `timelock_expires` IS NULL THEN ? WHEN ? IS NULL THEN `timelock_expires` ELSE LEAST(`timelock_expires`, ?) END', + getValue: (p) => [p.balance.lockExpires, p.balance.lockExpires, p.balance.lockExpires], + }, + { column: '`unlocked_authorities`', op: 'bitor', getValue: (p) => p.balance.unlockedAuthorities.toUnsignedInteger() }, + // locked_authorities: OR only, no recalculation needed — locked authorities can't be spent before unlocking + { column: '`locked_authorities`', op: 'bitor', getValue: (p) => p.balance.lockedAuthorities.toUnsignedInteger() }, + // Decrement transactions by 1 for each voided (address, token) pair. + { column: '`transactions`', op: 'subtract', getValue: () => 1 }, + ], + ); - // If the address_balance is now zeroed and the number of transactions - // is also zero, it means that the transaction was removed from address_tx_history - // so we need to remove it from the `address_balance` table. - await mysql.query( - `DELETE FROM address_balance - WHERE address = ? - AND token_id = ? - AND total_received = 0 - AND unlocked_balance = 0 - AND locked_balance = 0 - AND unlocked_authorities = 0 - AND locked_authorities = 0 - AND transactions = 0`, - [address, token] - ); + const [updateResult]: [ResultSetHeader] = await mysql.query(updateSql, updateParams); + + if (updateResult.affectedRows < pairs.length) { + // Each pair in `pairs` corresponds to an (address, token) whose balance + // row should exist — it was created when the tx was originally indexed. + // If we update fewer rows than expected, those rows were already gone: + // - sync inconsistency (the original add was never applied), or + // - the tx was already partially voided (duplicate / out-of-order event). + // Log the count and a truncated sample of the pairs checked so operators + // can diagnose without flooding logs on large txs. + const SAMPLE_LIMIT = 10; + const sample = pairs.slice(0, SAMPLE_LIMIT).map(getAddrKeys); + const truncated = pairs.length > SAMPLE_LIMIT ? ` (+${pairs.length - SAMPLE_LIMIT} more)` : ''; + console.warn( + `[voidAddressTransaction] tx ${txId}: updated ${updateResult.affectedRows}/${pairs.length} address_balance rows. ` + + 'This usually indicates a data inconsistency — the balance row was never added, ' + + 'or the tx was already (partially) voided. ' + + `Pairs checked: ${JSON.stringify(sample)}${truncated}`, + ); + } - if (isCreateTokenTx) { - // The transaction that created the token was voided, so we can remove - // it from the tokens table as well. - await mysql.query( - `DELETE FROM token - WHERE id = ?`, - [token] - ); - } - } + // Recalculate unlocked_authorities from tx_output for pairs that need it + const authorityPairs = pairs.filter(({ balance }) => !balance.unlockedAuthorities.hasNegativeValue()); + if (authorityPairs.length > 0) { + const authPairKeys = authorityPairs.map(getAddrKeys); + await mysql.query( + `UPDATE \`address_balance\` ab + SET ab.\`unlocked_authorities\` = ( + SELECT COALESCE(BIT_OR(txo.\`authorities\`), 0) + FROM \`tx_output\` txo + WHERE txo.\`address\` = ab.\`address\` + AND txo.\`token_id\` = ab.\`token_id\` + AND txo.\`locked\` = FALSE + AND txo.\`spent_by\` IS NULL + AND txo.\`voided\` = FALSE + ) + WHERE (ab.\`address\`, ab.\`token_id\`) IN (?)`, + [authPairKeys], + ); } + // Clean up fully-zeroed address_balance rows + const addressTokenPairs = pairs.map(getAddrKeys); + await mysql.query( + `DELETE FROM \`address_balance\` + WHERE (\`address\`, \`token_id\`) IN (?) + AND \`total_received\` = 0 + AND \`unlocked_balance\` = 0 + AND \`locked_balance\` = 0 + AND \`unlocked_authorities\` = 0 + AND \`locked_authorities\` = 0 + AND \`transactions\` = 0`, + [addressTokenPairs], + ); + await mysql.query( `DELETE FROM \`address_tx_history\` WHERE \`tx_id\` = ?`, @@ -519,7 +592,7 @@ export const voidTransaction = async ( mysql: any, txId: string, ): Promise => { - const [result]: [ResultSetHeader] = await mysql.query( + const [result]: [ResultSetHeader] = await mysql.execute( `UPDATE \`transaction\` SET \`voided\` = TRUE WHERE \`tx_id\` = ?`, @@ -571,73 +644,74 @@ export const voidWalletTransaction = async ( return; } + // Collect all (walletId, token) pairs and their balances + const pairs: { walletId: string; token: string; balance: Balance }[] = []; for (const [walletId, tokenMap] of Object.entries(walletBalanceMap)) { for (const [token, tokenBalance] of tokenMap.iterator()) { - // Update wallet_balance table by reversing the transaction's impact - await mysql.query( - `UPDATE \`wallet_balance\` - SET total_received = total_received - ?, - unlocked_balance = unlocked_balance - ?, - locked_balance = locked_balance - ?, - transactions = transactions - 1, - unlocked_authorities = (unlocked_authorities | ?), - locked_authorities = locked_authorities | ? - WHERE wallet_id = ? AND token_id = ?`, - [ - tokenBalance.totalAmountSent, - tokenBalance.unlockedAmount, - tokenBalance.lockedAmount, - tokenBalance.unlockedAuthorities.toUnsignedInteger(), - tokenBalance.lockedAuthorities.toUnsignedInteger(), - walletId, - token - ], - ); + pairs.push({ walletId, token, balance: tokenBalance }); + } + } - // If we're removing any of the authorities, we need to refresh the - // authority columns because we might have more than one, so we need to - // calculate the complete state from the complete wallet point of view, - // not just from a single transaction balance point of view. + if (pairs.length > 0) { + type WalletPair = { walletId: string; token: string; balance: Balance }; + const getWalletKeys = (p: WalletPair): [string, string] => [p.walletId, p.token]; + + // Single batched UPDATE for all wallet balance decrements + const { sql: updateSql, params: updateParams } = buildBatchCaseUpdate( + '`wallet_balance`', + ['`wallet_id`', '`token_id`'], + pairs, + getWalletKeys, + [ + { column: '`total_received`', op: 'subtract', getValue: (p) => p.balance.totalAmountSent }, + { column: '`unlocked_balance`', op: 'subtract', getValue: (p) => p.balance.unlockedAmount }, + { column: '`locked_balance`', op: 'subtract', getValue: (p) => p.balance.lockedAmount }, + { column: '`unlocked_authorities`', op: 'bitor', getValue: (p) => p.balance.unlockedAuthorities.toUnsignedInteger() }, + // locked_authorities: OR only, no recalculation needed — locked authorities can't be spent before unlocking + { column: '`locked_authorities`', op: 'bitor', getValue: (p) => p.balance.lockedAuthorities.toUnsignedInteger() }, + // Decrement transactions by 1 for each voided (wallet, token) pair. + { column: '`transactions`', op: 'subtract', getValue: () => 1 }, + ], + ); - // NOTE: No need to do the same for locked authorities as they can't be - // spent before being unlocked and we trust the fullnode - if (!tokenBalance.unlockedAuthorities.hasNegativeValue()) { - await mysql.query( - `UPDATE \`wallet_balance\` - SET \`unlocked_authorities\` = ( - SELECT BIT_OR(\`unlocked_authorities\`) - FROM \`address_balance\` - WHERE \`address\` IN ( - SELECT \`address\` - FROM \`address\` - WHERE \`wallet_id\` = ?) - AND \`token_id\` = ?) - WHERE \`wallet_id\` = ? - AND \`token_id\` = ?`, - [walletId, token, walletId, token], - ); - } + await mysql.query(updateSql, updateParams); - // If the number of transactions is zero, it means that this transaction - // was removed from the wallet_tx_history as well, so we must delete the - // row + // Recalculate unlocked_authorities from address_balance for pairs that need it + const authorityPairs = pairs.filter(({ balance }) => !balance.unlockedAuthorities.hasNegativeValue()); + if (authorityPairs.length > 0) { + const authPairKeys = authorityPairs.map(getWalletKeys); await mysql.query( - `DELETE FROM wallet_balance - WHERE wallet_id = ? - AND token_id = ? - AND total_received = 0 - AND unlocked_balance = 0 - AND locked_balance = 0 - AND unlocked_authorities = 0 - AND locked_authorities = 0 - AND transactions = 0`, - [walletId, token] + `UPDATE \`wallet_balance\` wb + SET wb.\`unlocked_authorities\` = ( + SELECT COALESCE(BIT_OR(ab.\`unlocked_authorities\`), 0) + FROM \`address_balance\` ab + WHERE ab.\`address\` IN ( + SELECT a.\`address\` + FROM \`address\` a + WHERE a.\`wallet_id\` = wb.\`wallet_id\`) + AND ab.\`token_id\` = wb.\`token_id\`) + WHERE (wb.\`wallet_id\`, wb.\`token_id\`) IN (?)`, + [authPairKeys], ); } + + // Clean up fully-zeroed wallet_balance rows + const walletTokenPairs = pairs.map(getWalletKeys); + await mysql.query( + `DELETE FROM \`wallet_balance\` + WHERE (\`wallet_id\`, \`token_id\`) IN (?) + AND \`total_received\` = 0 + AND \`unlocked_balance\` = 0 + AND \`locked_balance\` = 0 + AND \`unlocked_authorities\` = 0 + AND \`locked_authorities\` = 0 + AND \`transactions\` = 0`, + [walletTokenPairs], + ); } // Delete wallet transaction history entries for the voided transaction - await mysql.query( + await mysql.execute( `DELETE FROM \`wallet_tx_history\` WHERE \`tx_id\` = ?`, [txId], @@ -727,7 +801,7 @@ export const updateAddressTablesWithTx = async ( // an authority output in this tx without creating a new one, but it doesn't mean this address does not // have this authority anymore, as it might have other authority outputs if (tokenBalance.unlockedAuthorities.hasNegativeValue()) { - await mysql.query( + await mysql.execute( `UPDATE \`address_balance\` SET \`unlocked_authorities\` = ( SELECT BIT_OR(\`authorities\`) @@ -772,7 +846,7 @@ export const getTransactionById = async ( mysql: MysqlConnection, txId: string, ): Promise => { - const [result] = await mysql.query(` + const [result] = await mysql.execute(` SELECT * FROM transaction WHERE tx_id = ? @@ -805,7 +879,7 @@ export const getUtxosLockedAtHeight = async ( ): Promise => { const utxos = []; if (height >= 0) { - const [results] = await mysql.query( + const [results] = await mysql.execute( `SELECT * FROM \`tx_output\` WHERE \`heightlock\` = ? @@ -873,7 +947,7 @@ export const updateAddressLockedBalance = async ( ): Promise => { for (const [address, tokenBalanceMap] of Object.entries(addressBalanceMap)) { for (const [token, tokenBalance] of tokenBalanceMap.iterator()) { - await mysql.query( + await mysql.execute( `UPDATE \`address_balance\` SET \`unlocked_balance\` = \`unlocked_balance\` + ?, \`locked_balance\` = \`locked_balance\` - ?, @@ -890,7 +964,7 @@ export const updateAddressLockedBalance = async ( // if any authority has been unlocked, we have to refresh the locked authorities if (tokenBalance.unlockedAuthorities.toInteger() > 0) { - await mysql.query( + await mysql.execute( `UPDATE \`address_balance\` SET \`locked_authorities\` = ( SELECT BIT_OR(\`authorities\`) @@ -908,7 +982,7 @@ export const updateAddressLockedBalance = async ( // if this is being unlocked due to a timelock, also update the timelock_expires column if (updateTimelocks) { - await mysql.query(` + await mysql.execute(` UPDATE \`address_balance\` SET \`timelock_expires\` = ( SELECT MIN(\`timelock\`) @@ -988,7 +1062,7 @@ export const updateWalletLockedBalance = async ( ): Promise => { for (const [walletId, tokenBalanceMap] of Object.entries(walletBalanceMap)) { for (const [token, tokenBalance] of tokenBalanceMap.iterator()) { - await mysql.query( + await mysql.execute( `UPDATE \`wallet_balance\` SET \`unlocked_balance\` = \`unlocked_balance\` + ?, \`locked_balance\` = \`locked_balance\` - ?, @@ -1001,7 +1075,7 @@ export const updateWalletLockedBalance = async ( // if any authority has been unlocked, we have to refresh the locked authorities if (tokenBalance.unlockedAuthorities.toInteger() > 0) { - await mysql.query( + await mysql.execute( `UPDATE \`wallet_balance\` SET \`locked_authorities\` = ( SELECT BIT_OR(\`locked_authorities\`) @@ -1019,7 +1093,7 @@ export const updateWalletLockedBalance = async ( // if this is being unlocked due to a timelock, also update the timelock_expires column if (updateTimelocks) { - await mysql.query( + await mysql.execute( `UPDATE \`wallet_balance\` SET \`timelock_expires\` = ( SELECT MIN(\`timelock_expires\`) @@ -1048,7 +1122,7 @@ export const addMiner = async ( address: string, txId: string, ): Promise => { - await mysql.query( + await mysql.execute( `INSERT INTO \`miner\` (address, first_block, last_block, count) VALUES (?, ?, ?, 1) ON DUPLICATE KEY UPDATE last_block = ?, count = count + 1`, @@ -1066,7 +1140,7 @@ export const addMiner = async ( export const getMinersList = async ( mysql: MysqlConnection, ): Promise => { - const [results] = await mysql.query(` + const [results] = await mysql.execute(` SELECT address, first_block, last_block, count FROM miner; `); @@ -1097,7 +1171,7 @@ export const getExpiredTimelocksUtxos = async ( mysql: MysqlConnection, now: number, ): Promise => { - const [results] = await mysql.query(` + const [results] = await mysql.execute(` SELECT * FROM tx_output WHERE locked = TRUE @@ -1195,7 +1269,7 @@ export const getTokensCreatedByTx = async ( mysql: MysqlConnection, txId: string, ): Promise => { - const [rows] = await mysql.query( + const [rows] = await mysql.execute( 'SELECT `token_id` FROM `token_creation` WHERE `tx_id` = ?', [txId], ); @@ -1222,7 +1296,7 @@ export const getReexecNanoTokens = async ( txId: string, currentFirstBlock: string | null, ): Promise => { - const [rows] = await mysql.query( + const [rows] = await mysql.execute( 'SELECT `token_id` FROM `token_creation` WHERE `tx_id` = ? AND `token_id` != `tx_id` AND NOT (`first_block` <=> ?)', [txId, currentFirstBlock], ); @@ -1344,7 +1418,7 @@ export const addNewAddresses = async ( ); // Store on the wallet table the highest used index - await mysql.query( + await mysql.execute( `UPDATE \`wallet\` SET \`last_used_address_index\` = ? WHERE \`id\` = ?`, @@ -1415,7 +1489,7 @@ export const updateWalletTablesWithTx = async ( // value. // To do that, we get all unlocked_authorities from all addresses (querying by wallet and token_id) and // bitwise OR them with each other. - await mysql.query( + await mysql.execute( `UPDATE \`wallet_balance\` SET \`unlocked_authorities\` = ( SELECT BIT_OR(\`unlocked_authorities\`) @@ -1523,7 +1597,7 @@ export const getUtxosSpentByTx = async ( mysql: MysqlConnection, spendingTxId: string, ): Promise => { - const [results] = await mysql.query( + const [results] = await mysql.execute( `SELECT * FROM \`tx_output\` WHERE \`spent_by\` = ?`, @@ -1580,7 +1654,7 @@ export const updateLastSyncedEvent = async ( mysql: MysqlConnection, lastEventId: number, ): Promise => { - await mysql.query(` + await mysql.execute(` INSERT INTO \`sync_metadata\` (\`id\`, \`last_event_id\`) VALUES (0, ?) ON DUPLICATE KEY @@ -1591,7 +1665,7 @@ ON DUPLICATE KEY export const getLastSyncedEvent = async ( mysql: MysqlConnection, ): Promise => { - const [results] = await mysql.query( + const [results] = await mysql.execute( `SELECT * FROM \`sync_metadata\` LIMIT 1`, [], ); @@ -1612,7 +1686,7 @@ export const getLastSyncedEvent = async ( export const getBestBlockHeight = async ( mysql: MysqlConnection, ): Promise => { - const [results] = await mysql.query( + const [results] = await mysql.execute( `SELECT MAX(height) AS height FROM \`transaction\` LIMIT 1`, @@ -1697,7 +1771,7 @@ export const getTxOutputsHeightUnlockedAtHeight = async ( mysql: MysqlConnection, height: number, ): Promise => { - const [results] = await mysql.query( + const [results] = await mysql.execute( `SELECT * FROM \`tx_output\` WHERE \`heightlock\` = ? @@ -1738,7 +1812,7 @@ export const getTokenInformation = async ( mysql: MysqlConnection, tokenId: string, ): Promise => { - const [results] = await mysql.query( + const [results] = await mysql.execute( 'SELECT * FROM `token` WHERE `id` = ?', [tokenId], ); @@ -1765,28 +1839,28 @@ export const getTokenInformation = async ( * @param txId - The transaction to clear from database */ export const cleanupVoidedTx = async (mysql: MysqlConnection, txId: string): Promise => { - await mysql.query( + await mysql.execute( `DELETE FROM \`transaction\` WHERE tx_id = ? AND voided = true`, [txId], ); - await mysql.query( + await mysql.execute( `DELETE FROM \`tx_output\` WHERE tx_id = ? AND voided = true`, [txId], ); - await mysql.query( + await mysql.execute( `DELETE FROM \`address_tx_history\` WHERE tx_id = ? AND voided = true`, [txId], ); - await mysql.query( + await mysql.execute( `DELETE FROM \`wallet_tx_history\` WHERE tx_id = ? AND voided = true`, @@ -1812,7 +1886,7 @@ export const clearTxProposalForVoidedTx = async ( const whereClauses = txInputs.map(() => '(tx_id = ? AND `index` = ?)').join(' OR '); const params = txInputs.flatMap(input => [input.tx_id, input.index]); - await mysql.query( + await mysql.execute( `UPDATE \`tx_output\` SET \`tx_proposal\` = NULL, \`tx_proposal_index\` = NULL @@ -1913,7 +1987,7 @@ export const getMaxIndicesForWallets = async ( * @returns Address information if address is known or null */ export async function getAddressInfo(mysql: MysqlConnection, address: string): Promise { - const [results] = await mysql.query( + const [results] = await mysql.execute( 'SELECT * FROM address WHERE address = ?', [address], ); diff --git a/packages/daemon/src/scripts/bench-void-tx.ts b/packages/daemon/src/scripts/bench-void-tx.ts new file mode 100644 index 00000000..9e0ec390 --- /dev/null +++ b/packages/daemon/src/scripts/bench-void-tx.ts @@ -0,0 +1,426 @@ +/** + * Benchmark harness for voidTx performance. + * + * Seeds a synthetic large voidable transaction (N inputs, M address-token pairs), + * calls voidTx K times (re-seeding between runs), and records OTel span durations. + * + * Usage: + * export DB_ENDPOINT=localhost DB_NAME=wallet_service DB_USER=hathor DB_PASS=hathor DB_PORT=3306 + * export OTEL_EXPORTER_OTLP_ENDPOINT=http://localhost:4318 # optional, for Jaeger viz + * npx ts-node src/scripts/bench-void-tx.ts --inputs 200 --pairs 200 --runs 10 --label branch + * + * Writes JSON result to ./bench-results-