From 78af376618be71fe82fd8adf8c00bcb232490046 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Abadesso?= Date: Mon, 5 Jan 2026 11:16:01 -0300 Subject: [PATCH 1/9] feat(event-downloader): added event downloader --- package.json | 3 +- packages/event-downloader/.gitignore | 1 + packages/event-downloader/COE_HEIGHT_ZERO.md | 318 ++++++++++++++++ packages/event-downloader/package.json | 33 ++ packages/event-downloader/src/config.ts | 47 +++ packages/event-downloader/src/db.ts | 222 ++++++++++++ packages/event-downloader/src/event-parser.ts | 38 ++ packages/event-downloader/src/index.ts | 200 ++++++++++ packages/event-downloader/src/orchestrator.ts | 343 ++++++++++++++++++ packages/event-downloader/src/types.ts | 200 ++++++++++ packages/event-downloader/src/worker.ts | 150 ++++++++ packages/event-downloader/tsconfig.json | 17 + .../event-downloader/tsconfig.tsbuildinfo | 1 + yarn.lock | 89 +++++ 14 files changed, 1661 insertions(+), 1 deletion(-) create mode 100644 packages/event-downloader/.gitignore create mode 100644 packages/event-downloader/COE_HEIGHT_ZERO.md create mode 100644 packages/event-downloader/package.json create mode 100644 packages/event-downloader/src/config.ts create mode 100644 packages/event-downloader/src/db.ts create mode 100644 packages/event-downloader/src/event-parser.ts create mode 100644 packages/event-downloader/src/index.ts create mode 100644 packages/event-downloader/src/orchestrator.ts create mode 100644 packages/event-downloader/src/types.ts create mode 100644 packages/event-downloader/src/worker.ts create mode 100644 packages/event-downloader/tsconfig.json create mode 100644 packages/event-downloader/tsconfig.tsbuildinfo diff --git a/package.json b/package.json index 0acc2910..effa09e6 100644 --- a/package.json +++ b/package.json @@ -4,7 +4,8 @@ "workspaces": [ "packages/common", "packages/daemon", - "packages/wallet-service" + "packages/wallet-service", + "packages/event-downloader" ], "engines": { "node": ">=22" diff --git a/packages/event-downloader/.gitignore b/packages/event-downloader/.gitignore new file mode 100644 index 00000000..02a03cf0 --- /dev/null +++ b/packages/event-downloader/.gitignore @@ -0,0 +1 @@ +events.* diff --git a/packages/event-downloader/COE_HEIGHT_ZERO.md b/packages/event-downloader/COE_HEIGHT_ZERO.md new file mode 100644 index 00000000..96c64cba --- /dev/null +++ b/packages/event-downloader/COE_HEIGHT_ZERO.md @@ -0,0 +1,318 @@ +# COE: Transactions Stored with height = 0 in Wallet Service Database + +## Summary + +Over 1.4 million transactions in the wallet-service database have `height = 0` instead of the correct block height or `NULL`. In the wallet-service architecture, the `height` field on a transaction represents the height of the block that confirmed it. Unconfirmed transactions should have `height = NULL`, and confirmed transactions should have the height of their confirming block. + +The sync-daemon is responsible for processing fullnode events via WebSocket and updating the database accordingly. Investigation revealed two bugs: + +1. The fullnode sends `metadata.height = 0` for transactions even when they have a `first_block` (confirming block), and the daemon blindly trusts this value. +2. When a transaction loses its confirmation (`first_block` goes back to `null`), the daemon ignores the event entirely and never resets the height to `NULL`. + +**Discovery**: Manual database audit on 2025-12-31. + +**Root Causes**: +1. The fullnode sends `height: 0` in `VERTEX_METADATA_CHANGED` events even when `first_block` is populated. The daemon's `handleTxFirstBlock` function uses this value directly without validation. +2. The daemon's `metadataDiff` function has no handling for when `first_block` changes from a block hash back to `null`. These events are ignored, leaving the height at `0` instead of resetting it to `NULL`. + +## Impact + +- **1,408,080 transactions** stored with incorrect `height = 0` +- Transaction versions affected: + - Version 1 (regular transactions): 1,159,985 + - Version 2 (token creation): 248,091 + - Version 6 (nano contracts): 3 + - Version 0 (genesis block): 1 (expected) +- **Customer Impact**: APIs and queries that rely on transaction height for confirmation status may return incorrect data +- **Ongoing**: ~50-200 new transactions per day continue to be affected + +## Timeline + +- **Unknown**: Issue began occurring (likely since daemon deployment or fullnode update) +- **2025-12-31 ~15:00 UTC**: Anomaly discovered via database query showing 1.4M transactions with `height = 0` +- **2025-12-31 ~16:00 UTC**: Created `event-downloader` package to download all fullnode events for analysis +- **2025-12-31 ~19:00 UTC**: Downloaded all 7.8M events from fullnode +- **2025-12-31 ~20:00 UTC**: Identified root cause by analyzing events for sample transaction `000000b36b93b2e3088a63108882097c1dfc45e6303ec88ab34d99750029f871` + +## Metrics + +```sql +-- Total affected transactions +SELECT COUNT(*) FROM transaction WHERE height = 0; +-- Result: 1,408,080 + +-- Breakdown by version +SELECT version, COUNT(*) FROM transaction WHERE height = 0 GROUP BY version; +-- 0: 1 (genesis block - expected) +-- 1: 1,159,985 +-- 2: 248,091 +-- 6: 3 + +-- Daily rate of new affected transactions +SELECT DATE(FROM_UNIXTIME(timestamp)) as date, COUNT(*) +FROM transaction WHERE height = 0 +GROUP BY date ORDER BY date DESC LIMIT 10; +``` + +## Investigation Details + +### Background: Daemon Architecture + +The sync-daemon connects to the fullnode via WebSocket and processes events: + +1. `NEW_VERTEX_ACCEPTED`: A new transaction/block was added to the DAG +2. `VERTEX_METADATA_CHANGED`: Transaction metadata changed (e.g., got confirmed by a block) + +When processing `NEW_VERTEX_ACCEPTED` for a transaction (`packages/daemon/src/services/index.ts:246-250`): + +```typescript +let height: number | null = metadata.height; + +if (!isBlock(version) && !metadata.first_block) { + height = null; // Unconfirmed tx gets NULL height +} +``` + +When a `VERTEX_METADATA_CHANGED` event indicates a transaction got its first confirmation, `handleTxFirstBlock` is called (`packages/daemon/src/services/index.ts:689`): + +```typescript +const height: number | null = metadata.height; // Blindly trusts fullnode +await addOrUpdateTx(mysql, hash, height, timestamp, version, weight); +``` + +### Event Analysis Tool + +We created the `event-downloader` package to download all fullnode events and correlate them with transactions: + +- Downloaded 7,835,577 events from mainnet fullnode +- Stored in SQLite with transaction hash indexing +- Enables querying all events that affected a specific transaction + +### Sample Transaction Analysis + +Transaction: `000000b36b93b2e3088a63108882097c1dfc45e6303ec88ab34d99750029f871` + +Events in chronological order: + +#### Event 7762975 - VERTEX_METADATA_CHANGED +```json +{ + "event": { + "id": 7762975, + "timestamp": 1766182695.3963125, + "type": "VERTEX_METADATA_CHANGED", + "data": { + "hash": "000000b36b93b2e3088a63108882097c1dfc45e6303ec88ab34d99750029f871", + "version": 1, + "metadata": { + "voided_by": [], + "first_block": null, + "height": 0 + } + } + } +} +``` + +#### Event 7762979 - NEW_VERTEX_ACCEPTED +```json +{ + "event": { + "id": 7762979, + "timestamp": 1766182695.5350096, + "type": "NEW_VERTEX_ACCEPTED", + "data": { + "hash": "000000b36b93b2e3088a63108882097c1dfc45e6303ec88ab34d99750029f871", + "version": 1, + "metadata": { + "voided_by": [], + "first_block": null, + "height": 0 + } + } + } +} +``` +**Daemon behavior**: Sets `height = NULL` because `first_block` is null (correct). + +#### Event 7762981 - VERTEX_METADATA_CHANGED (TX_FIRST_BLOCK) +```json +{ + "event": { + "id": 7762981, + "timestamp": 1766182710.9933312, + "type": "VERTEX_METADATA_CHANGED", + "data": { + "hash": "000000b36b93b2e3088a63108882097c1dfc45e6303ec88ab34d99750029f871", + "version": 1, + "metadata": { + "voided_by": [], + "first_block": "000000000000000009800ba7eda9ee1734940d45dcfb568ac51d1426a290e761", + "height": 0 + } + } + } +} +``` +**Daemon behavior**: Detects `first_block` is now set, calls `handleTxFirstBlock`, stores `height = 0` (BUG - should be block's height). + +#### Event 7762993 - VERTEX_METADATA_CHANGED +```json +{ + "event": { + "id": 7762993, + "timestamp": 1766184357.769475, + "type": "VERTEX_METADATA_CHANGED", + "data": { + "hash": "000000b36b93b2e3088a63108882097c1dfc45e6303ec88ab34d99750029f871", + "version": 1, + "metadata": { + "voided_by": [], + "first_block": null, + "height": 0 + } + } + } +} +``` +**Note**: `first_block` returned to null because the transaction was sent back to the mempool. + +#### Event 7763176 - VERTEX_METADATA_CHANGED +```json +{ + "event": { + "id": 7763176, + "timestamp": 1766185267.1527846, + "type": "VERTEX_METADATA_CHANGED", + "data": { + "hash": "000000b36b93b2e3088a63108882097c1dfc45e6303ec88ab34d99750029f871", + "version": 1, + "metadata": { + "voided_by": [], + "first_block": "00000000000000000cf169d68d55d2c8f850a602623fa3dd54ff0bd332e9cbc3", + "height": 0 + } + } + } +} +``` +**Note**: Different `first_block`, still `height: 0`. + +#### Event 7763190 - VERTEX_METADATA_CHANGED +```json +{ + "event": { + "id": 7763190, + "timestamp": 1766186968.6764472, + "type": "VERTEX_METADATA_CHANGED", + "data": { + "hash": "000000b36b93b2e3088a63108882097c1dfc45e6303ec88ab34d99750029f871", + "version": 1, + "metadata": { + "voided_by": [], + "first_block": null, + "height": 0 + } + } + } +} +``` +**Daemon behavior**: `first_block` is null, but `metadataDiff` returns `IGNORE` because there's no handling for this case (BUG #2 - should reset height to `NULL`). + +### Key Findings + +1. The fullnode **always** sends `metadata.height = 0` for transactions, regardless of whether `first_block` is set. The `height` field in transaction metadata does not represent the confirming block's height. + +2. The daemon's `metadataDiff` function only handles the case when `first_block` becomes set (`TX_FIRST_BLOCK`), but has no handling for when `first_block` becomes `null` again. This means transactions that lose their confirmation keep `height = 0` instead of being reset to `height = NULL`. + +### Bug #2: Missing Handler for Lost Confirmation + +The `metadataDiff` function at `packages/daemon/src/services/index.ts:150-169`: + +```typescript +if (first_block + && first_block.length + && first_block.length > 0) { + if (!dbTx.height) { + return { + type: METADATA_DIFF_EVENT_TYPES.TX_FIRST_BLOCK, + originalEvent: event, + }; + } + + return { + type: METADATA_DIFF_EVENT_TYPES.IGNORE, + originalEvent: event, + }; +} + +return { + type: METADATA_DIFF_EVENT_TYPES.IGNORE, // <-- BUG: Should handle first_block becoming null + originalEvent: event, +}; +``` + +When `first_block` is `null`: +- The condition `if (first_block && first_block.length > 0)` is **FALSE** +- Falls through to return `IGNORE` +- No `TX_LOST_FIRST_BLOCK` event type exists + +This means even if Bug #1 were fixed and the correct height was stored initially, transactions that get sent back to the mempool would retain their old height instead of being reset to `NULL`. + +## Root Cause Analysis (5 Whys) + +### Bug #1: Wrong height stored on confirmation + +**Problem**: Transactions have `height = 0` instead of the correct block height when confirmed. + +1. **Why do transactions have `height = 0`?** + - The daemon's `handleTxFirstBlock` function stores `metadata.height` directly from the fullnode event. + +2. **Why does `handleTxFirstBlock` store the wrong value?** + - It assumes `metadata.height` contains the height of the confirming block when `first_block` is set. + +3. **Why is that assumption wrong?** + - The fullnode sends `metadata.height = 0` for transactions even when they have a `first_block`. + +4. **Why does the fullnode send `height = 0` for confirmed transactions?** + - The `height` field in transaction metadata represents something different. + +5. **Why wasn't this validated?** + - The daemon blindly trusts the fullnode data without validating that `height > 0` when `first_block` is present. + +### Bug #2: Height not reset when transaction loses confirmation + +**Problem**: Transactions that lose their confirmation (sent back to mempool) retain `height = 0` instead of being reset to `NULL`. + +1. **Why do transactions keep `height = 0` when sent back to mempool?** + - The daemon ignores `VERTEX_METADATA_CHANGED` events where `first_block` becomes `null`. + +2. **Why does the daemon ignore these events?** + - The `metadataDiff` function returns `IGNORE` when `first_block` is null. + +3. **Why does `metadataDiff` ignore null `first_block`?** + - It only handles the case when `first_block` becomes set (`TX_FIRST_BLOCK`), not when it becomes null. + +4. **Why is there no handler for lost confirmations?** + - No `TX_LOST_FIRST_BLOCK` event type was implemented. + +5. **Why wasn't this case considered?** + - Implementation oversight. The case where `first_block` becomes `null` was not handled. + +## Action Items + +| Priority | Action | Owner | Due Date | +|----------|--------|-------|----------| +| P1 | Fix `handleTxFirstBlock` to fetch block height from fullnode API when `first_block` is set | TBD | TBD | +| P1 | Add `TX_LOST_FIRST_BLOCK` handling in `metadataDiff` to reset height to `NULL` when `first_block` becomes null | TBD | TBD | +| P1 | Create migration script to fix existing 1.4M transactions with `height = 0` | TBD | TBD | +| P2 | Add validation: reject/warn when `first_block` is set but `height = 0` | TBD | TBD | +| P2 | Report bug to fullnode team: `metadata.height` should contain block height when `first_block` is set | TBD | TBD | +| P3 | Add monitoring alert for transactions stored with `height = 0` | TBD | TBD | + +## Related Items + +- `packages/daemon/src/services/index.ts` - `handleTxFirstBlock` function (line 674-707) +- `packages/daemon/src/services/index.ts` - `handleVertexAccepted` function (line 192-479) +- `packages/daemon/src/services/index.ts` - `metadataDiff` function (line 98-178) +- `packages/event-downloader/` - Tool created for this investigation diff --git a/packages/event-downloader/package.json b/packages/event-downloader/package.json new file mode 100644 index 00000000..d1025d26 --- /dev/null +++ b/packages/event-downloader/package.json @@ -0,0 +1,33 @@ +{ + "name": "event-downloader", + "license": "MIT", + "main": "dist/index.js", + "typings": "dist/index.d.ts", + "files": [ + "dist", + "src" + ], + "engines": { + "node": ">=18" + }, + "scripts": { + "build": "tsc -b", + "start": "node dist/index.js", + "lint": "eslint ." + }, + "peerDependencies": { + "@hathor/wallet-lib": "2.8.3" + }, + "dependencies": { + "better-sqlite3": "^11.7.0", + "dotenv": "^16.4.5", + "ws": "^8.18.0", + "zod": "^3.23.8" + }, + "devDependencies": { + "@types/better-sqlite3": "^7.6.12", + "@types/node": "^22.10.2", + "@types/ws": "^8.5.13", + "typescript": "^5.7.2" + } +} diff --git a/packages/event-downloader/src/config.ts b/packages/event-downloader/src/config.ts new file mode 100644 index 00000000..400f7086 --- /dev/null +++ b/packages/event-downloader/src/config.ts @@ -0,0 +1,47 @@ +/** + * Copyright (c) Hathor Labs and its affiliates. + * + * This source code is licensed under the MIT license found in the + * LICENSE file in the root directory of this source tree. + */ + +import 'dotenv/config'; + +const requiredEnvs = [ + 'FULLNODE_HOST', +]; + +export const checkEnvVariables = () => { + const missingEnv = requiredEnvs.filter(envVar => process.env[envVar] === undefined); + + if (missingEnv.length > 0) { + throw new Error(`Missing required environment variables: ${missingEnv.join(', ')}`); + } +}; + +// Fullnode connection +export const FULLNODE_HOST = process.env.FULLNODE_HOST!; +export const USE_SSL = process.env.USE_SSL === 'true'; + +// Download configuration +export const BATCH_SIZE = parseInt(process.env.BATCH_SIZE ?? '5000', 10); +export const PARALLEL_CONNECTIONS = parseInt(process.env.PARALLEL_CONNECTIONS ?? '5', 10); +export const WINDOW_SIZE = parseInt(process.env.WINDOW_SIZE ?? '100', 10); + +// Database configuration +export const DB_PATH = process.env.DB_PATH ?? './events.sqlite'; + +export const getConfig = () => { + checkEnvVariables(); + + return { + FULLNODE_HOST, + USE_SSL, + BATCH_SIZE, + PARALLEL_CONNECTIONS, + WINDOW_SIZE, + DB_PATH, + }; +}; + +export default getConfig; diff --git a/packages/event-downloader/src/db.ts b/packages/event-downloader/src/db.ts new file mode 100644 index 00000000..587c4fc3 --- /dev/null +++ b/packages/event-downloader/src/db.ts @@ -0,0 +1,222 @@ +import Database, { Database as DatabaseType } from 'better-sqlite3'; + +// Types +export interface Event { + id: number; + type: string; + timestamp: number; + data: string; +} + +export interface TxEvent { + tx_hash: string; + event_id: number; + event_type: string; +} + +export interface BatchProgress { + batch_start: number; + batch_end: number; + last_downloaded: number | null; + status: string; + updated_at: string; +} + +// SQL statements +const CREATE_EVENTS_TABLE = ` + CREATE TABLE IF NOT EXISTS events ( + id INTEGER PRIMARY KEY, + type TEXT NOT NULL, + timestamp INTEGER NOT NULL, + data TEXT NOT NULL, + created_at DATETIME DEFAULT CURRENT_TIMESTAMP + ) +`; + +const CREATE_TX_EVENTS_TABLE = ` + CREATE TABLE IF NOT EXISTS tx_events ( + tx_hash TEXT NOT NULL, + event_id INTEGER NOT NULL, + event_type TEXT NOT NULL, + PRIMARY KEY (tx_hash, event_id) + ) +`; + +const CREATE_DOWNLOAD_PROGRESS_TABLE = ` + CREATE TABLE IF NOT EXISTS download_progress ( + batch_start INTEGER PRIMARY KEY, + batch_end INTEGER NOT NULL, + last_downloaded INTEGER, + status TEXT DEFAULT 'pending', + updated_at DATETIME DEFAULT CURRENT_TIMESTAMP + ) +`; + +const CREATE_TX_EVENTS_HASH_INDEX = ` + CREATE INDEX IF NOT EXISTS idx_tx_events_hash ON tx_events(tx_hash) +`; + +const CREATE_EVENTS_TYPE_INDEX = ` + CREATE INDEX IF NOT EXISTS idx_events_type ON events(type) +`; + +/** + * Initialize the database with all required tables and indexes. + * @param dbPath - Path to the SQLite database file + * @returns The database instance + */ +export function initDatabase(dbPath: string): DatabaseType { + const db = new Database(dbPath); + + // Enable WAL mode for better concurrent read performance + db.pragma('journal_mode = WAL'); + + // Create tables + db.exec(CREATE_EVENTS_TABLE); + db.exec(CREATE_TX_EVENTS_TABLE); + db.exec(CREATE_DOWNLOAD_PROGRESS_TABLE); + + // Create indexes + db.exec(CREATE_TX_EVENTS_HASH_INDEX); + db.exec(CREATE_EVENTS_TYPE_INDEX); + + return db; +} + +/** + * Batch insert events into the events table. + * Uses INSERT OR REPLACE to handle duplicates. + * @param db - Database instance + * @param events - Array of events to insert + */ +export function insertEvents(db: DatabaseType, events: Event[]): void { + if (events.length === 0) { + return; + } + + const insertStmt = db.prepare(` + INSERT OR REPLACE INTO events (id, type, timestamp, data) + VALUES (@id, @type, @timestamp, @data) + `); + + const insertMany = db.transaction((eventsToInsert: Event[]) => { + for (const event of eventsToInsert) { + insertStmt.run(event); + } + }); + + insertMany(events); +} + +/** + * Batch insert transaction event mappings into the tx_events table. + * Uses INSERT OR REPLACE to handle duplicates. + * @param db - Database instance + * @param txEvents - Array of transaction events to insert + */ +export function insertTxEvents(db: DatabaseType, txEvents: TxEvent[]): void { + if (txEvents.length === 0) { + return; + } + + const insertStmt = db.prepare(` + INSERT OR REPLACE INTO tx_events (tx_hash, event_id, event_type) + VALUES (@tx_hash, @event_id, @event_type) + `); + + const insertMany = db.transaction((txEventsToInsert: TxEvent[]) => { + for (const txEvent of txEventsToInsert) { + insertStmt.run(txEvent); + } + }); + + insertMany(txEvents); +} + +/** + * Get the progress for a specific batch. + * @param db - Database instance + * @param batchStart - The starting event ID of the batch + * @returns The batch progress record or undefined if not found + */ +export function getBatchProgress( + db: DatabaseType, + batchStart: number +): BatchProgress | undefined { + const stmt = db.prepare(` + SELECT batch_start, batch_end, last_downloaded, status, updated_at + FROM download_progress + WHERE batch_start = ? + `); + + return stmt.get(batchStart) as BatchProgress | undefined; +} + +/** + * Update or insert batch progress. + * Uses INSERT OR REPLACE to upsert the record. + * @param db - Database instance + * @param batchStart - The starting event ID of the batch + * @param batchEnd - The ending event ID of the batch + * @param lastDownloaded - The last successfully downloaded event ID (null if none) + * @param status - The status of the batch ('pending', 'in_progress', 'completed', etc.) + */ +export function updateBatchProgress( + db: DatabaseType, + batchStart: number, + batchEnd: number, + lastDownloaded: number | null, + status: string +): void { + const stmt = db.prepare(` + INSERT OR REPLACE INTO download_progress (batch_start, batch_end, last_downloaded, status, updated_at) + VALUES (?, ?, ?, ?, CURRENT_TIMESTAMP) + `); + + stmt.run(batchStart, batchEnd, lastDownloaded, status); +} + +/** + * Get all batches that are not marked as 'completed'. + * @param db - Database instance + * @returns Array of pending batch progress records + */ +export function getPendingBatches(db: DatabaseType): BatchProgress[] { + const stmt = db.prepare(` + SELECT batch_start, batch_end, last_downloaded, status, updated_at + FROM download_progress + WHERE status != 'completed' + ORDER BY batch_start ASC + `); + + return stmt.all() as BatchProgress[]; +} + +/** + * Get all batch progress records. + * @param db - Database instance + * @returns Array of all batch progress records + */ +export function getAllBatchProgress(db: DatabaseType): BatchProgress[] { + const stmt = db.prepare(` + SELECT batch_start, batch_end, last_downloaded, status, updated_at + FROM download_progress + ORDER BY batch_start ASC + `); + + return stmt.all() as BatchProgress[]; +} + +/** + * Get the highest event ID currently stored in the database. + * @param db - Database instance + * @returns The highest event ID or null if no events exist + */ +export function getLastEventId(db: DatabaseType): number | null { + const stmt = db.prepare(` + SELECT MAX(id) as last_id FROM events + `); + + const result = stmt.get() as { last_id: number | null }; + return result?.last_id ?? null; +} diff --git a/packages/event-downloader/src/event-parser.ts b/packages/event-downloader/src/event-parser.ts new file mode 100644 index 00000000..48ac1200 --- /dev/null +++ b/packages/event-downloader/src/event-parser.ts @@ -0,0 +1,38 @@ +/** + * Copyright (c) Hathor Labs and its affiliates. + * + * This source code is licensed under the MIT license found in the + * LICENSE file in the root directory of this source tree. + */ + +import { FullNodeEvent, FullNodeEventTypes } from './types'; + +/** + * Extracts the transaction hash from a fullnode event. + * + * @param event - The fullnode event to extract the hash from + * @returns The transaction hash if the event contains one, null otherwise + */ +export function extractTxHash(event: FullNodeEvent): string | null { + const eventType = event.event.type; + + switch (eventType) { + case FullNodeEventTypes.NEW_VERTEX_ACCEPTED: + case FullNodeEventTypes.VERTEX_METADATA_CHANGED: + case FullNodeEventTypes.VERTEX_REMOVED: + return event.event.data.hash; + + case FullNodeEventTypes.NC_EVENT: + return event.event.data.vertex_id; + + case FullNodeEventTypes.LOAD_STARTED: + case FullNodeEventTypes.LOAD_FINISHED: + case FullNodeEventTypes.REORG_STARTED: + case FullNodeEventTypes.REORG_FINISHED: + case FullNodeEventTypes.FULL_NODE_CRASHED: + return null; + + default: + return null; + } +} diff --git a/packages/event-downloader/src/index.ts b/packages/event-downloader/src/index.ts new file mode 100644 index 00000000..70da84cb --- /dev/null +++ b/packages/event-downloader/src/index.ts @@ -0,0 +1,200 @@ +#!/usr/bin/env node +/** + * Copyright (c) Hathor Labs and its affiliates. + * + * This source code is licensed under the MIT license found in the + * LICENSE file in the root directory of this source tree. + */ + +import { checkEnvVariables, BATCH_SIZE, PARALLEL_CONNECTIONS, WINDOW_SIZE, DB_PATH, FULLNODE_HOST } from './config'; +import { downloadAllEvents, DownloadStats, WorkerStatus } from './orchestrator'; + +// ANSI escape codes for terminal formatting +const HIDE_CURSOR = '\x1b[?25l'; +const SHOW_CURSOR = '\x1b[?25h'; +const CLEAR_SCREEN = '\x1b[2J'; +const MOVE_HOME = '\x1b[H'; +const BOLD = '\x1b[1m'; +const RESET = '\x1b[0m'; +const GREEN = '\x1b[32m'; +const YELLOW = '\x1b[33m'; +const CYAN = '\x1b[36m'; + +const workerStatuses = new Map(); +let lastRenderTime = 0; +const RENDER_INTERVAL_MS = 500; // Only redraw every 500ms + +// For rate and ETA calculation +let startTime = 0; +let lastEventsDownloaded = 0; +let lastRateCheckTime = 0; +let currentRate = 0; // events per second + +function formatNumber(num: number): string { + return num.toLocaleString(); +} + +function formatDuration(seconds: number): string { + if (seconds < 60) { + return `${Math.round(seconds)}s`; + } + if (seconds < 3600) { + const mins = Math.floor(seconds / 60); + const secs = Math.round(seconds % 60); + return `${mins}m ${secs}s`; + } + const hours = Math.floor(seconds / 3600); + const mins = Math.floor((seconds % 3600) / 60); + return `${hours}h ${mins}m`; +} + +function formatRate(rate: number): string { + if (rate >= 1000) { + return `${(rate / 1000).toFixed(1)}k/s`; + } + return `${Math.round(rate)}/s`; +} + +function progressBar(percent: number, width: number = 40): string { + const filled = Math.round((percent / 100) * width); + const empty = width - filled; + return `[${'█'.repeat(filled)}${'░'.repeat(empty)}]`; +} + +let lastStats: DownloadStats | null = null; + +function render(): void { + if (!lastStats) return; + + const stats = lastStats; + const percent = stats.totalEvents > 0 + ? (stats.eventsDownloaded / stats.totalEvents) * 100 + : 0; + + // Calculate rate + const now = Date.now(); + if (lastRateCheckTime > 0) { + const timeDelta = (now - lastRateCheckTime) / 1000; + if (timeDelta > 0) { + const eventsDelta = stats.eventsDownloaded - lastEventsDownloaded; + // Smooth the rate with exponential moving average + const newRate = eventsDelta / timeDelta; + currentRate = currentRate === 0 ? newRate : currentRate * 0.7 + newRate * 0.3; + } + } + lastRateCheckTime = now; + lastEventsDownloaded = stats.eventsDownloaded; + + // Calculate ETA + const eventsRemaining = stats.totalEvents - stats.eventsDownloaded; + const etaSeconds = currentRate > 0 ? eventsRemaining / currentRate : 0; + + // Calculate elapsed time + const elapsedSeconds = startTime > 0 ? (now - startTime) / 1000 : 0; + + // Move cursor to home and clear screen + process.stdout.write(MOVE_HOME + CLEAR_SCREEN); + + // Header + console.log(`${BOLD}Event Downloader v1.0.0${RESET}`); + console.log('━'.repeat(60)); + console.log(`${CYAN}Fullnode:${RESET} ${FULLNODE_HOST}`); + console.log(`${CYAN}Database:${RESET} ${DB_PATH}`); + console.log(`${CYAN}Batch Size:${RESET} ${formatNumber(BATCH_SIZE)} | ${CYAN}Workers:${RESET} ${PARALLEL_CONNECTIONS} | ${CYAN}Window:${RESET} ${WINDOW_SIZE}`); + console.log('━'.repeat(60)); + console.log(); + + // Stats + console.log(`${CYAN}Latest event ID:${RESET} ${formatNumber(stats.totalEvents)}`); + console.log(`${CYAN}Total batches:${RESET} ${stats.totalBatches}`); + console.log( + `${GREEN}Completed:${RESET} ${stats.completedBatches} | ` + + `${YELLOW}In Progress:${RESET} ${stats.inProgressBatches} | ` + + `Pending: ${stats.pendingBatches}` + ); + console.log(); + console.log(`${progressBar(percent)} ${percent.toFixed(1)}%`); + console.log(`${CYAN}Events downloaded:${RESET} ${formatNumber(stats.eventsDownloaded)}`); + console.log( + `${CYAN}Rate:${RESET} ${formatRate(currentRate)} | ` + + `${CYAN}Elapsed:${RESET} ${formatDuration(elapsedSeconds)} | ` + + `${CYAN}ETA:${RESET} ${etaSeconds > 0 ? formatDuration(etaSeconds) : '--'}` + ); + console.log(); + + // Print worker statuses + const sortedWorkers = Array.from(workerStatuses.entries()).sort((a, b) => a[0] - b[0]); + for (const [workerId, status] of sortedWorkers) { + const workerPercent = ((status.lastEventId - status.batchStart) / (status.batchEnd - status.batchStart) * 100).toFixed(0); + console.log( + `Worker ${workerId + 1}: ${formatNumber(status.batchStart)}-${formatNumber(status.batchEnd)} | ` + + `${formatNumber(status.eventsDownloaded)} events (${workerPercent}%)` + ); + } +} + +function printStats(stats: DownloadStats): void { + lastStats = stats; + + const now = Date.now(); + if (now - lastRenderTime < RENDER_INTERVAL_MS) { + return; // Throttle updates + } + lastRenderTime = now; + + render(); +} + +function handleWorkerUpdate(workerId: number, status: WorkerStatus): void { + workerStatuses.set(workerId, status); +} + +async function main(): Promise { + try { + // Check environment variables + checkEnvVariables(); + + // Hide cursor and clear screen + process.stdout.write(HIDE_CURSOR + CLEAR_SCREEN + MOVE_HOME); + + console.log(`${BOLD}Event Downloader v1.0.0${RESET}`); + console.log('Connecting to fullnode...'); + + // Initialize timing + startTime = Date.now(); + + // Handle graceful shutdown + const cleanup = () => { + process.stdout.write(SHOW_CURSOR); + console.log('\n\nDownload interrupted. Progress has been saved.'); + process.exit(0); + }; + + process.on('SIGINT', cleanup); + process.on('SIGTERM', cleanup); + + // Start download + await downloadAllEvents({ + onStatsUpdate: printStats, + onWorkerUpdate: handleWorkerUpdate, + onComplete: () => { + render(); // Final render + process.stdout.write(SHOW_CURSOR); + console.log(); + console.log('━'.repeat(60)); + console.log(`${GREEN}${BOLD}Download complete!${RESET}`); + console.log(`Database saved to: ${DB_PATH}`); + }, + onError: (error) => { + process.stdout.write(SHOW_CURSOR); + console.error(`\n${BOLD}Error:${RESET}`, error.message); + }, + }); + } catch (error) { + process.stdout.write(SHOW_CURSOR); + console.error('Fatal error:', error); + process.exit(1); + } +} + +main(); diff --git a/packages/event-downloader/src/orchestrator.ts b/packages/event-downloader/src/orchestrator.ts new file mode 100644 index 00000000..4c9c7330 --- /dev/null +++ b/packages/event-downloader/src/orchestrator.ts @@ -0,0 +1,343 @@ +/** + * Copyright (c) Hathor Labs and its affiliates. + * + * This source code is licensed under the MIT license found in the + * LICENSE file in the root directory of this source tree. + */ + +import { Database as DatabaseType } from 'better-sqlite3'; +import { bigIntUtils } from '@hathor/wallet-lib'; +import WebSocket from 'ws'; +import { + initDatabase, + insertEvents, + insertTxEvents, + updateBatchProgress, + getAllBatchProgress, + BatchProgress, + Event as DbEvent, + TxEvent, +} from './db'; +import { createWorker, BatchConfig } from './worker'; +import { extractTxHash } from './event-parser'; +import { FullNodeEvent } from './types'; +import { FULLNODE_HOST, USE_SSL, BATCH_SIZE, PARALLEL_CONNECTIONS, DB_PATH } from './config'; + +export interface DownloadStats { + totalEvents: number; + totalBatches: number; + completedBatches: number; + inProgressBatches: number; + pendingBatches: number; + eventsDownloaded: number; +} + +export interface WorkerStatus { + batchStart: number; + batchEnd: number; + eventsDownloaded: number; + lastEventId: number; +} + +export interface OrchestratorCallbacks { + onStatsUpdate: (stats: DownloadStats) => void; + onWorkerUpdate: (workerId: number, status: WorkerStatus) => void; + onComplete: () => void; + onError: (error: Error) => void; +} + +interface BatchInfo { + start: number; + end: number; + lastDownloaded?: number; +} + +/** + * Get the latest event ID from the fullnode via WebSocket. + */ +export async function getLatestEventId(): Promise { + return new Promise((resolve, reject) => { + const protocol = USE_SSL ? 'wss://' : 'ws://'; + const url = new URL(`${protocol}${FULLNODE_HOST}`); + url.pathname = '/v1a/event_ws'; + + const socket = new WebSocket(url.toString()); + const timeout = setTimeout(() => { + socket.close(); + reject(new Error('Timeout waiting for initial event')); + }, 30000); + + socket.onopen = () => { + const startMessage = { + type: 'START_STREAM', + window_size: 1, + }; + socket.send(bigIntUtils.JSONBigInt.stringify(startMessage)); + }; + + socket.onmessage = (event) => { + clearTimeout(timeout); + try { + const data = bigIntUtils.JSONBigInt.parse(event.data.toString()); + const latestEventId = data.latest_event_id; + socket.close(); + resolve(latestEventId); + } catch (error) { + socket.close(); + reject(error); + } + }; + + socket.onerror = (error) => { + clearTimeout(timeout); + reject(new Error(`WebSocket error: ${error.message}`)); + }; + }); +} + +/** + * Calculate batches needed to download all events. + */ +function calculateBatches(latestEventId: number, batchSize: number): BatchInfo[] { + const batches: BatchInfo[] = []; + let start = 0; + + while (start < latestEventId) { + const end = Math.min(start + batchSize - 1, latestEventId); + batches.push({ start, end }); + start = end + 1; + } + + return batches; +} + +/** + * Merge calculated batches with existing progress from database. + */ +function mergeBatchesWithProgress( + batches: BatchInfo[], + existingProgress: BatchProgress[] +): BatchInfo[] { + const progressMap = new Map(); + for (const progress of existingProgress) { + progressMap.set(progress.batch_start, progress); + } + + return batches.map((batch) => { + const existing = progressMap.get(batch.start); + if (existing && existing.status === 'completed') { + return { ...batch, lastDownloaded: batch.end }; // Mark as complete + } + if (existing && existing.last_downloaded !== null) { + return { ...batch, lastDownloaded: existing.last_downloaded }; + } + return batch; + }).filter((batch) => { + // Filter out completed batches + return batch.lastDownloaded === undefined || batch.lastDownloaded < batch.end; + }); +} + +/** + * Run a single worker for a batch. + */ +function runWorker( + db: DatabaseType, + batch: BatchInfo, + workerId: number, + callbacks: OrchestratorCallbacks +): Promise { + return new Promise((resolve, reject) => { + const eventBuffer: DbEvent[] = []; + const txEventBuffer: TxEvent[] = []; + let eventsDownloaded = 0; + let lastEventId = batch.lastDownloaded ?? batch.start - 1; + + const flushBuffers = () => { + if (eventBuffer.length > 0) { + insertEvents(db, eventBuffer); + eventBuffer.length = 0; + } + if (txEventBuffer.length > 0) { + insertTxEvents(db, txEventBuffer); + txEventBuffer.length = 0; + } + }; + + const config: BatchConfig = { + batchStart: batch.start, + batchEnd: batch.end, + lastDownloaded: batch.lastDownloaded, + }; + + const worker = createWorker(config, { + onEvent: (event: FullNodeEvent) => { + eventsDownloaded++; + lastEventId = event.event.id; + + // Buffer event for batch insert + eventBuffer.push({ + id: event.event.id, + type: event.event.type, + timestamp: event.event.timestamp, + data: bigIntUtils.JSONBigInt.stringify(event), + }); + + // Extract and buffer tx hash mapping + const txHash = extractTxHash(event); + if (txHash) { + txEventBuffer.push({ + tx_hash: txHash, + event_id: event.event.id, + event_type: event.event.type, + }); + } + + // Flush every 10 events to avoid memory buildup + if (eventBuffer.length >= 10) { + try { + flushBuffers(); + updateBatchProgress(db, batch.start, batch.end, lastEventId, 'in_progress'); + } catch (e) { + // Database might be closed, ignore + } + } + }, + + onProgress: (eventId: number) => { + callbacks.onWorkerUpdate(workerId, { + batchStart: batch.start, + batchEnd: batch.end, + eventsDownloaded, + lastEventId: eventId, + }); + }, + + onComplete: () => { + flushBuffers(); + updateBatchProgress(db, batch.start, batch.end, batch.end, 'completed'); + resolve(); + }, + + onError: (error: Error) => { + // Save progress before failing (only if db is still open) + try { + flushBuffers(); + if (lastEventId >= batch.start) { + updateBatchProgress(db, batch.start, batch.end, lastEventId, 'failed'); + } + } catch (e) { + // Database might be closed already, ignore + } + reject(error); + }, + }); + + // Mark batch as in progress + updateBatchProgress(db, batch.start, batch.end, batch.lastDownloaded ?? null, 'in_progress'); + worker.start(); + }); +} + +/** + * Run workers with concurrency limit. + * Each worker slot picks up the next available batch when done. + */ +async function runWithConcurrency( + items: T[], + concurrency: number, + fn: (item: T, workerSlot: number) => Promise +): Promise { + const results: Promise[] = []; + let currentIndex = 0; + + const runWorkerSlot = async (workerSlot: number): Promise => { + while (currentIndex < items.length) { + const index = currentIndex++; + if (index >= items.length) { + break; + } + + try { + await fn(items[index], workerSlot); + } catch (error) { + // Log but continue with other batches + console.error(`Worker ${workerSlot} failed:`, error); + } + } + }; + + // Start worker slots + const numWorkers = Math.min(concurrency, items.length); + for (let i = 0; i < numWorkers; i++) { + results.push(runWorkerSlot(i)); + } + + await Promise.all(results); +} + +/** + * Main orchestrator function to download all events. + */ +export async function downloadAllEvents(callbacks: OrchestratorCallbacks): Promise { + // Initialize database + const db = initDatabase(DB_PATH); + + try { + // Get latest event ID from fullnode + console.log('Connecting to fullnode to get latest event ID...'); + const latestEventId = await getLatestEventId(); + console.log(`Latest event ID: ${latestEventId.toLocaleString()}`); + + // Calculate all batches + const allBatches = calculateBatches(latestEventId, BATCH_SIZE); + console.log(`Total batches: ${allBatches.length} (${BATCH_SIZE.toLocaleString()} events each)`); + + // Get existing progress and merge + const existingProgress = getAllBatchProgress(db); + const pendingBatches = mergeBatchesWithProgress(allBatches, existingProgress); + + const completedCount = allBatches.length - pendingBatches.length; + console.log(`Completed: ${completedCount} | Pending: ${pendingBatches.length}`); + + if (pendingBatches.length === 0) { + console.log('All batches already completed!'); + callbacks.onComplete(); + return; + } + + // Initialize progress tracking + let totalEventsDownloaded = 0; + const workerStatuses = new Map(); + + const updateStats = () => { + callbacks.onStatsUpdate({ + totalEvents: latestEventId, + totalBatches: allBatches.length, + completedBatches: completedCount + (allBatches.length - pendingBatches.length), + inProgressBatches: workerStatuses.size, + pendingBatches: pendingBatches.length - workerStatuses.size, + eventsDownloaded: totalEventsDownloaded, + }); + }; + + // Run workers with concurrency limit + await runWithConcurrency(pendingBatches, PARALLEL_CONNECTIONS, async (batch, workerSlot) => { + await runWorker(db, batch, workerSlot, { + ...callbacks, + onWorkerUpdate: (workerId, status) => { + workerStatuses.set(workerId, status); + totalEventsDownloaded = Array.from(workerStatuses.values()) + .reduce((sum, s) => sum + s.eventsDownloaded, 0); + updateStats(); + callbacks.onWorkerUpdate(workerId, status); + }, + }); + // Don't delete - worker slot will be reused for next batch + }); + + callbacks.onComplete(); + } finally { + db.close(); + } +} diff --git a/packages/event-downloader/src/types.ts b/packages/event-downloader/src/types.ts new file mode 100644 index 00000000..eda2b821 --- /dev/null +++ b/packages/event-downloader/src/types.ts @@ -0,0 +1,200 @@ +/** + * Copyright (c) Hathor Labs and its affiliates. + * + * This source code is licensed under the MIT license found in the + * LICENSE file in the root directory of this source tree. + */ + +import z from 'zod'; +import { bigIntUtils } from '@hathor/wallet-lib'; + +// Use type assertion to handle zod version compatibility +const bigIntSchema = bigIntUtils.bigIntCoercibleSchema as unknown as z.ZodType; + +export type WebSocketSendEvent = + | { + type: 'START_STREAM'; + window_size: number; + last_ack_event_id?: number; + } + | { + type: 'ACK'; + window_size: number; + ack_event_id?: number; + }; + +export enum FullNodeEventTypes { + VERTEX_METADATA_CHANGED = 'VERTEX_METADATA_CHANGED', + VERTEX_REMOVED = 'VERTEX_REMOVED', + NEW_VERTEX_ACCEPTED = 'NEW_VERTEX_ACCEPTED', + LOAD_STARTED = 'LOAD_STARTED', + LOAD_FINISHED = 'LOAD_FINISHED', + REORG_STARTED = 'REORG_STARTED', + REORG_FINISHED = 'REORG_FINISHED', + NC_EVENT = 'NC_EVENT', + FULL_NODE_CRASHED = 'FULL_NODE_CRASHED', +} + +/** + * All events with transactions + */ +const StandardFullNodeEvents = z.union([ + z.literal('VERTEX_METADATA_CHANGED'), + z.literal('NEW_VERTEX_ACCEPTED'), +]); + +/** + * Events without data + */ +const EmptyDataFullNodeEvents = z.union([ + z.literal('LOAD_STARTED'), + z.literal('LOAD_FINISHED'), + z.literal('REORG_FINISHED'), + z.literal('FULL_NODE_CRASHED'), +]); + +export const FullNodeEventBaseSchema = z.object({ + stream_id: z.string(), + peer_id: z.string(), + network: z.string(), + type: z.string(), + latest_event_id: z.number(), +}); + +export type FullNodeEventBase = z.infer; + +export const EventTxOutputSchema = z.object({ + value: bigIntSchema, + token_data: z.number(), + script: z.string(), + locked: z.boolean().optional(), + decoded: z.union([ + z.object({ + type: z.string(), + address: z.string(), + timelock: z.number().nullable(), + }).passthrough().nullable(), + z.object({ + token_data: z.number().nullable(), + }), + z.object({}).strict(), + ]), +}); +export type EventTxOutput = z.infer; + +export const EventTxInputSchema = z.object({ + tx_id: z.string(), + index: z.number(), + spent_output: EventTxOutputSchema, +}); +export type EventTxInput = z.infer; + +export const EventTxNanoHeaderSchema = z.object({ + id: z.string(), + nc_seqnum: z.number(), + nc_id: z.string(), + nc_method: z.string(), + nc_address: z.string(), +}); +export type EventTxNanoHeader = z.infer; + +export const TxEventDataWithoutMetaSchema = z.object({ + hash: z.string(), + timestamp: z.number(), + version: z.number(), + weight: z.number(), + nonce: bigIntSchema, + inputs: EventTxInputSchema.array(), + outputs: EventTxOutputSchema.array(), + headers: EventTxNanoHeaderSchema.array().optional(), + parents: z.string().array(), + tokens: z.string().array(), + token_name: z.string().nullable(), + token_symbol: z.string().nullable(), + signal_bits: z.number(), +}); + +export const TxEventDataSchema = TxEventDataWithoutMetaSchema.extend({ + metadata: z.object({ + hash: z.string(), + voided_by: z.string().array(), + first_block: z.string().nullable(), + height: z.number(), + }), +}); + +export const StandardFullNodeEventSchema = FullNodeEventBaseSchema.extend({ + event: z.object({ + id: z.number(), + timestamp: z.number(), + type: StandardFullNodeEvents, + data: TxEventDataSchema, + }), +}); + +export type StandardFullNodeEvent = z.infer; + +export const ReorgFullNodeEventSchema = FullNodeEventBaseSchema.extend({ + event: z.object({ + id: z.number(), + timestamp: z.number(), + type: z.literal('REORG_STARTED'), + data: z.object({ + reorg_size: z.number(), + previous_best_block: z.string(), + new_best_block: z.string(), + common_block: z.string(), + }), + group_id: z.number(), + }), +}); +export type ReorgFullNodeEvent = z.infer; + +export const EmptyDataFullNodeEventSchema = FullNodeEventBaseSchema.extend({ + event: z.object({ + id: z.number(), + timestamp: z.number(), + type: EmptyDataFullNodeEvents, + data: z.object({}).optional(), + }), +}); + +export const TxDataWithoutMetaFullNodeEventSchema = FullNodeEventBaseSchema.extend({ + event: z.object({ + id: z.number(), + timestamp: z.number(), + type: z.literal('VERTEX_REMOVED'), + data: TxEventDataWithoutMetaSchema, + }), +}); + +export const NcEventSchema = FullNodeEventBaseSchema.extend({ + event: z.object({ + id: z.number(), + timestamp: z.number(), + type: z.literal('NC_EVENT'), + data: z.object({ + vertex_id: z.string(), + nc_id: z.string(), + nc_execution: z.union([ + z.literal('pending'), + z.literal('success'), + z.literal('failure'), + z.literal('skipped'), + ]), + first_block: z.string(), + data_hex: z.string(), + }), + group_id: z.number().nullish(), + }), +}); +export type NcEvent = z.infer; + +export const FullNodeEventSchema = z.union([ + TxDataWithoutMetaFullNodeEventSchema, + StandardFullNodeEventSchema, + ReorgFullNodeEventSchema, + EmptyDataFullNodeEventSchema, + NcEventSchema, +]); +export type FullNodeEvent = z.infer; diff --git a/packages/event-downloader/src/worker.ts b/packages/event-downloader/src/worker.ts new file mode 100644 index 00000000..462b86df --- /dev/null +++ b/packages/event-downloader/src/worker.ts @@ -0,0 +1,150 @@ +/** + * Copyright (c) Hathor Labs and its affiliates. + * + * This source code is licensed under the MIT license found in the + * LICENSE file in the root directory of this source tree. + */ + +import { WebSocket, MessageEvent, ErrorEvent } from 'ws'; +import { bigIntUtils } from '@hathor/wallet-lib'; +import { FullNodeEvent, FullNodeEventSchema, WebSocketSendEvent } from './types'; +import { FULLNODE_HOST, USE_SSL, WINDOW_SIZE } from './config'; + +export interface BatchConfig { + batchStart: number; + batchEnd: number; + lastDownloaded?: number; // Resume from this event if set +} + +export interface WorkerCallbacks { + onEvent: (event: FullNodeEvent) => void; + onProgress: (eventId: number) => void; + onComplete: () => void; + onError: (error: Error) => void; +} + +export interface Worker { + start: () => void; + stop: () => void; +} + +/** + * Creates a WebSocket worker that downloads a batch of events from the fullnode. + * + * @param config - Configuration for the batch to download + * @param callbacks - Callback functions for event handling + * @returns Worker object with start and stop methods + */ +export function createWorker(config: BatchConfig, callbacks: WorkerCallbacks): Worker { + const { batchStart, batchEnd, lastDownloaded } = config; + const { onEvent, onProgress, onComplete, onError } = callbacks; + + let socket: WebSocket | null = null; + let isRunning = false; + let eventsSinceLastAck = 0; + let lastReceivedEventId = 0; + + const getWsUrl = (): string => { + const protocol = USE_SSL ? 'wss://' : 'ws://'; + const fullNodeUrl = new URL(`${protocol}${FULLNODE_HOST}`); + fullNodeUrl.pathname = '/v1a/event_ws'; + return fullNodeUrl.toString(); + }; + + const sendMessage = (message: WebSocketSendEvent): void => { + if (socket && socket.readyState === WebSocket.OPEN) { + const payload = bigIntUtils.JSONBigInt.stringify(message); + socket.send(payload); + } + }; + + const start = (): void => { + if (isRunning) { + return; + } + + isRunning = true; + socket = new WebSocket(getWsUrl()); + + socket.onopen = () => { + const lastAckEventId = lastDownloaded ?? (batchStart > 0 ? batchStart - 1 : undefined); + const startMessage: WebSocketSendEvent = { + type: 'START_STREAM', + window_size: WINDOW_SIZE, + ...(lastAckEventId !== undefined && { last_ack_event_id: lastAckEventId }), + }; + sendMessage(startMessage); + }; + + socket.onmessage = (socketEvent: MessageEvent) => { + try { + const rawData = bigIntUtils.JSONBigInt.parse(socketEvent.data.toString()); + const parseResult = FullNodeEventSchema.safeParse(rawData); + + if (!parseResult.success) { + // Skip messages that don't conform to event schema (e.g., handshake messages) + // These are expected at the start of a connection + console.log(`Skipping non-event message: ${JSON.stringify(rawData).substring(0, 100)}...`); + return; + } + + const event = parseResult.data; + const eventId = event.event.id; + + // Call the event callback + onEvent(event); + + // Track events for batched ACKs + eventsSinceLastAck++; + lastReceivedEventId = eventId; + + // Report progress + onProgress(eventId); + + // Check if we've reached the end of the batch + const isComplete = eventId >= batchEnd; + + // Send ACK after receiving WINDOW_SIZE events, or when batch is complete + if (eventsSinceLastAck >= WINDOW_SIZE || isComplete) { + const ackMessage: WebSocketSendEvent = { + type: 'ACK', + window_size: WINDOW_SIZE, + ack_event_id: lastReceivedEventId, + }; + sendMessage(ackMessage); + eventsSinceLastAck = 0; + } + + if (isComplete) { + onComplete(); + stop(); + } + } catch (error) { + onError(error instanceof Error ? error : new Error(String(error))); + } + }; + + socket.onerror = (error: ErrorEvent) => { + onError(new Error(`WebSocket error: ${error.message}`)); + }; + + socket.onclose = () => { + if (isRunning) { + // Unexpected close - report as error + onError(new Error('WebSocket connection closed unexpectedly')); + } + isRunning = false; + socket = null; + }; + }; + + const stop = (): void => { + isRunning = false; + if (socket) { + socket.close(); + socket = null; + } + }; + + return { start, stop }; +} diff --git a/packages/event-downloader/tsconfig.json b/packages/event-downloader/tsconfig.json new file mode 100644 index 00000000..200c8a6c --- /dev/null +++ b/packages/event-downloader/tsconfig.json @@ -0,0 +1,17 @@ +{ + "compilerOptions": { + "target": "ES2020", + "module": "commonjs", + "lib": ["ES2020"], + "outDir": "./dist", + "rootDir": "./src", + "strict": true, + "esModuleInterop": true, + "skipLibCheck": true, + "forceConsistentCasingInFileNames": true, + "resolveJsonModule": true, + "declaration": true + }, + "include": ["src/**/*"], + "exclude": ["node_modules", "dist"] +} diff --git a/packages/event-downloader/tsconfig.tsbuildinfo b/packages/event-downloader/tsconfig.tsbuildinfo new file mode 100644 index 00000000..92d3ad63 --- /dev/null +++ b/packages/event-downloader/tsconfig.tsbuildinfo @@ -0,0 +1 @@ +{"root":["./src/config.ts","./src/db.ts","./src/event-parser.ts","./src/index.ts","./src/orchestrator.ts","./src/types.ts","./src/worker.ts"],"version":"5.9.3"} \ No newline at end of file diff --git a/yarn.lock b/yarn.lock index 972274fa..f4e79a7b 100644 --- a/yarn.lock +++ b/yarn.lock @@ -6493,6 +6493,15 @@ __metadata: languageName: node linkType: hard +"@types/better-sqlite3@npm:^7.6.12": + version: 7.6.13 + resolution: "@types/better-sqlite3@npm:7.6.13" + dependencies: + "@types/node": "npm:*" + checksum: 10/c74dafa3c550ac866737870016d7b1a735c7d450c16d40962eeb54510fa150e91752bfdf678f55e91894d8853771b95f909b0062122116cddac4d80491b74411 + languageName: node + linkType: hard + "@types/body-parser@npm:*": version: 1.19.3 resolution: "@types/body-parser@npm:1.19.3" @@ -6804,6 +6813,15 @@ __metadata: languageName: node linkType: hard +"@types/node@npm:^22.10.2": + version: 22.19.3 + resolution: "@types/node@npm:22.19.3" + dependencies: + undici-types: "npm:~6.21.0" + checksum: 10/ffee06ce6d741fde98a40bc65a57394ed2283c521f57f9143d2356513181162bd4108809be6902a861d098b35e35569f61f14c64d3032e48a0289b74f917669a + languageName: node + linkType: hard + "@types/node@npm:^22.8.7": version: 22.9.0 resolution: "@types/node@npm:22.9.0" @@ -6936,6 +6954,15 @@ __metadata: languageName: node linkType: hard +"@types/ws@npm:^8.5.13": + version: 8.18.1 + resolution: "@types/ws@npm:8.18.1" + dependencies: + "@types/node": "npm:*" + checksum: 10/1ce05e3174dcacf28dae0e9b854ef1c9a12da44c7ed73617ab6897c5cbe4fccbb155a20be5508ae9a7dde2f83bd80f5cf3baa386b934fc4b40889ec963e94f3a + languageName: node + linkType: hard + "@types/yargs-parser@npm:*": version: 21.0.1 resolution: "@types/yargs-parser@npm:21.0.1" @@ -8367,6 +8394,17 @@ __metadata: languageName: node linkType: hard +"better-sqlite3@npm:^11.7.0": + version: 11.10.0 + resolution: "better-sqlite3@npm:11.10.0" + dependencies: + bindings: "npm:^1.5.0" + node-gyp: "npm:latest" + prebuild-install: "npm:^7.1.1" + checksum: 10/5e4c7437c4fe6033335a79c82974d7ab29f33c51c36f48b73e87e087d21578468575de1c56a7badd4f76f17255e25abefddaeacf018e5eeb9e0cb8d6e3e4a5e1 + languageName: node + linkType: hard + "bigi@npm:^1.1.0, bigi@npm:^1.4.2": version: 1.4.2 resolution: "bigi@npm:1.4.2" @@ -10966,6 +11004,23 @@ __metadata: languageName: node linkType: hard +"event-downloader@workspace:packages/event-downloader": + version: 0.0.0-use.local + resolution: "event-downloader@workspace:packages/event-downloader" + dependencies: + "@types/better-sqlite3": "npm:^7.6.12" + "@types/node": "npm:^22.10.2" + "@types/ws": "npm:^8.5.13" + better-sqlite3: "npm:^11.7.0" + dotenv: "npm:^16.4.5" + typescript: "npm:^5.7.2" + ws: "npm:^8.18.0" + zod: "npm:^3.23.8" + peerDependencies: + "@hathor/wallet-lib": 2.8.3 + languageName: unknown + linkType: soft + "event-emitter@npm:^0.3.5": version: 0.3.5 resolution: "event-emitter@npm:0.3.5" @@ -18524,6 +18579,16 @@ __metadata: languageName: node linkType: hard +"typescript@npm:^5.7.2": + version: 5.9.3 + resolution: "typescript@npm:5.9.3" + bin: + tsc: bin/tsc + tsserver: bin/tsserver + checksum: 10/c089d9d3da2729fd4ac517f9b0e0485914c4b3c26f80dc0cffcb5de1719a17951e92425d55db59515c1a7ddab65808466debb864d0d56dcf43f27007d0709594 + languageName: node + linkType: hard + "typescript@npm:^5.8.2": version: 5.8.2 resolution: "typescript@npm:5.8.2" @@ -18564,6 +18629,16 @@ __metadata: languageName: node linkType: hard +"typescript@patch:typescript@npm%3A^5.7.2#optional!builtin": + version: 5.9.3 + resolution: "typescript@patch:typescript@npm%3A5.9.3#optional!builtin::version=5.9.3&hash=5786d5" + bin: + tsc: bin/tsc + tsserver: bin/tsserver + checksum: 10/696e1b017bc2635f4e0c94eb4435357701008e2f272f553d06e35b494b8ddc60aa221145e286c28ace0c89ee32827a28c2040e3a69bdc108b1a5dc8fb40b72e3 + languageName: node + linkType: hard + "typescript@patch:typescript@npm%3A^5.8.2#optional!builtin": version: 5.8.2 resolution: "typescript@patch:typescript@npm%3A5.8.2#optional!builtin::version=5.8.2&hash=5786d5" @@ -18626,6 +18701,13 @@ __metadata: languageName: node linkType: hard +"undici-types@npm:~6.21.0": + version: 6.21.0 + resolution: "undici-types@npm:6.21.0" + checksum: 10/ec8f41aa4359d50f9b59fa61fe3efce3477cc681908c8f84354d8567bb3701fafdddf36ef6bff307024d3feb42c837cf6f670314ba37fc8145e219560e473d14 + languageName: node + linkType: hard + "uni-global@npm:^1.0.0": version: 1.0.0 resolution: "uni-global@npm:1.0.0" @@ -19477,3 +19559,10 @@ __metadata: checksum: 10/846fd73e1af0def79c19d510ea9e4a795544a67d5b34b7e1c4d0425bf6bfd1c719446d94cdfa1721c1987d891321d61f779e8236fde517dc0e524aa851a6eff1 languageName: node linkType: hard + +"zod@npm:^3.23.8": + version: 3.25.76 + resolution: "zod@npm:3.25.76" + checksum: 10/f0c963ec40cd96858451d1690404d603d36507c1fc9682f2dae59ab38b578687d542708a7fdbf645f77926f78c9ed558f57c3d3aa226c285f798df0c4da16995 + languageName: node + linkType: hard From 42b39371bfefc23e853c9df8890a54b78a22f1e5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Abadesso?= Date: Thu, 8 Jan 2026 13:48:24 -0300 Subject: [PATCH 2/9] fix(events-downloader): only consider a batch as complete if we downloaded all events from it --- packages/event-downloader/COE_HEIGHT_ZERO.md | 318 ------------------ packages/event-downloader/src/orchestrator.ts | 21 +- 2 files changed, 18 insertions(+), 321 deletions(-) delete mode 100644 packages/event-downloader/COE_HEIGHT_ZERO.md diff --git a/packages/event-downloader/COE_HEIGHT_ZERO.md b/packages/event-downloader/COE_HEIGHT_ZERO.md deleted file mode 100644 index 96c64cba..00000000 --- a/packages/event-downloader/COE_HEIGHT_ZERO.md +++ /dev/null @@ -1,318 +0,0 @@ -# COE: Transactions Stored with height = 0 in Wallet Service Database - -## Summary - -Over 1.4 million transactions in the wallet-service database have `height = 0` instead of the correct block height or `NULL`. In the wallet-service architecture, the `height` field on a transaction represents the height of the block that confirmed it. Unconfirmed transactions should have `height = NULL`, and confirmed transactions should have the height of their confirming block. - -The sync-daemon is responsible for processing fullnode events via WebSocket and updating the database accordingly. Investigation revealed two bugs: - -1. The fullnode sends `metadata.height = 0` for transactions even when they have a `first_block` (confirming block), and the daemon blindly trusts this value. -2. When a transaction loses its confirmation (`first_block` goes back to `null`), the daemon ignores the event entirely and never resets the height to `NULL`. - -**Discovery**: Manual database audit on 2025-12-31. - -**Root Causes**: -1. The fullnode sends `height: 0` in `VERTEX_METADATA_CHANGED` events even when `first_block` is populated. The daemon's `handleTxFirstBlock` function uses this value directly without validation. -2. The daemon's `metadataDiff` function has no handling for when `first_block` changes from a block hash back to `null`. These events are ignored, leaving the height at `0` instead of resetting it to `NULL`. - -## Impact - -- **1,408,080 transactions** stored with incorrect `height = 0` -- Transaction versions affected: - - Version 1 (regular transactions): 1,159,985 - - Version 2 (token creation): 248,091 - - Version 6 (nano contracts): 3 - - Version 0 (genesis block): 1 (expected) -- **Customer Impact**: APIs and queries that rely on transaction height for confirmation status may return incorrect data -- **Ongoing**: ~50-200 new transactions per day continue to be affected - -## Timeline - -- **Unknown**: Issue began occurring (likely since daemon deployment or fullnode update) -- **2025-12-31 ~15:00 UTC**: Anomaly discovered via database query showing 1.4M transactions with `height = 0` -- **2025-12-31 ~16:00 UTC**: Created `event-downloader` package to download all fullnode events for analysis -- **2025-12-31 ~19:00 UTC**: Downloaded all 7.8M events from fullnode -- **2025-12-31 ~20:00 UTC**: Identified root cause by analyzing events for sample transaction `000000b36b93b2e3088a63108882097c1dfc45e6303ec88ab34d99750029f871` - -## Metrics - -```sql --- Total affected transactions -SELECT COUNT(*) FROM transaction WHERE height = 0; --- Result: 1,408,080 - --- Breakdown by version -SELECT version, COUNT(*) FROM transaction WHERE height = 0 GROUP BY version; --- 0: 1 (genesis block - expected) --- 1: 1,159,985 --- 2: 248,091 --- 6: 3 - --- Daily rate of new affected transactions -SELECT DATE(FROM_UNIXTIME(timestamp)) as date, COUNT(*) -FROM transaction WHERE height = 0 -GROUP BY date ORDER BY date DESC LIMIT 10; -``` - -## Investigation Details - -### Background: Daemon Architecture - -The sync-daemon connects to the fullnode via WebSocket and processes events: - -1. `NEW_VERTEX_ACCEPTED`: A new transaction/block was added to the DAG -2. `VERTEX_METADATA_CHANGED`: Transaction metadata changed (e.g., got confirmed by a block) - -When processing `NEW_VERTEX_ACCEPTED` for a transaction (`packages/daemon/src/services/index.ts:246-250`): - -```typescript -let height: number | null = metadata.height; - -if (!isBlock(version) && !metadata.first_block) { - height = null; // Unconfirmed tx gets NULL height -} -``` - -When a `VERTEX_METADATA_CHANGED` event indicates a transaction got its first confirmation, `handleTxFirstBlock` is called (`packages/daemon/src/services/index.ts:689`): - -```typescript -const height: number | null = metadata.height; // Blindly trusts fullnode -await addOrUpdateTx(mysql, hash, height, timestamp, version, weight); -``` - -### Event Analysis Tool - -We created the `event-downloader` package to download all fullnode events and correlate them with transactions: - -- Downloaded 7,835,577 events from mainnet fullnode -- Stored in SQLite with transaction hash indexing -- Enables querying all events that affected a specific transaction - -### Sample Transaction Analysis - -Transaction: `000000b36b93b2e3088a63108882097c1dfc45e6303ec88ab34d99750029f871` - -Events in chronological order: - -#### Event 7762975 - VERTEX_METADATA_CHANGED -```json -{ - "event": { - "id": 7762975, - "timestamp": 1766182695.3963125, - "type": "VERTEX_METADATA_CHANGED", - "data": { - "hash": "000000b36b93b2e3088a63108882097c1dfc45e6303ec88ab34d99750029f871", - "version": 1, - "metadata": { - "voided_by": [], - "first_block": null, - "height": 0 - } - } - } -} -``` - -#### Event 7762979 - NEW_VERTEX_ACCEPTED -```json -{ - "event": { - "id": 7762979, - "timestamp": 1766182695.5350096, - "type": "NEW_VERTEX_ACCEPTED", - "data": { - "hash": "000000b36b93b2e3088a63108882097c1dfc45e6303ec88ab34d99750029f871", - "version": 1, - "metadata": { - "voided_by": [], - "first_block": null, - "height": 0 - } - } - } -} -``` -**Daemon behavior**: Sets `height = NULL` because `first_block` is null (correct). - -#### Event 7762981 - VERTEX_METADATA_CHANGED (TX_FIRST_BLOCK) -```json -{ - "event": { - "id": 7762981, - "timestamp": 1766182710.9933312, - "type": "VERTEX_METADATA_CHANGED", - "data": { - "hash": "000000b36b93b2e3088a63108882097c1dfc45e6303ec88ab34d99750029f871", - "version": 1, - "metadata": { - "voided_by": [], - "first_block": "000000000000000009800ba7eda9ee1734940d45dcfb568ac51d1426a290e761", - "height": 0 - } - } - } -} -``` -**Daemon behavior**: Detects `first_block` is now set, calls `handleTxFirstBlock`, stores `height = 0` (BUG - should be block's height). - -#### Event 7762993 - VERTEX_METADATA_CHANGED -```json -{ - "event": { - "id": 7762993, - "timestamp": 1766184357.769475, - "type": "VERTEX_METADATA_CHANGED", - "data": { - "hash": "000000b36b93b2e3088a63108882097c1dfc45e6303ec88ab34d99750029f871", - "version": 1, - "metadata": { - "voided_by": [], - "first_block": null, - "height": 0 - } - } - } -} -``` -**Note**: `first_block` returned to null because the transaction was sent back to the mempool. - -#### Event 7763176 - VERTEX_METADATA_CHANGED -```json -{ - "event": { - "id": 7763176, - "timestamp": 1766185267.1527846, - "type": "VERTEX_METADATA_CHANGED", - "data": { - "hash": "000000b36b93b2e3088a63108882097c1dfc45e6303ec88ab34d99750029f871", - "version": 1, - "metadata": { - "voided_by": [], - "first_block": "00000000000000000cf169d68d55d2c8f850a602623fa3dd54ff0bd332e9cbc3", - "height": 0 - } - } - } -} -``` -**Note**: Different `first_block`, still `height: 0`. - -#### Event 7763190 - VERTEX_METADATA_CHANGED -```json -{ - "event": { - "id": 7763190, - "timestamp": 1766186968.6764472, - "type": "VERTEX_METADATA_CHANGED", - "data": { - "hash": "000000b36b93b2e3088a63108882097c1dfc45e6303ec88ab34d99750029f871", - "version": 1, - "metadata": { - "voided_by": [], - "first_block": null, - "height": 0 - } - } - } -} -``` -**Daemon behavior**: `first_block` is null, but `metadataDiff` returns `IGNORE` because there's no handling for this case (BUG #2 - should reset height to `NULL`). - -### Key Findings - -1. The fullnode **always** sends `metadata.height = 0` for transactions, regardless of whether `first_block` is set. The `height` field in transaction metadata does not represent the confirming block's height. - -2. The daemon's `metadataDiff` function only handles the case when `first_block` becomes set (`TX_FIRST_BLOCK`), but has no handling for when `first_block` becomes `null` again. This means transactions that lose their confirmation keep `height = 0` instead of being reset to `height = NULL`. - -### Bug #2: Missing Handler for Lost Confirmation - -The `metadataDiff` function at `packages/daemon/src/services/index.ts:150-169`: - -```typescript -if (first_block - && first_block.length - && first_block.length > 0) { - if (!dbTx.height) { - return { - type: METADATA_DIFF_EVENT_TYPES.TX_FIRST_BLOCK, - originalEvent: event, - }; - } - - return { - type: METADATA_DIFF_EVENT_TYPES.IGNORE, - originalEvent: event, - }; -} - -return { - type: METADATA_DIFF_EVENT_TYPES.IGNORE, // <-- BUG: Should handle first_block becoming null - originalEvent: event, -}; -``` - -When `first_block` is `null`: -- The condition `if (first_block && first_block.length > 0)` is **FALSE** -- Falls through to return `IGNORE` -- No `TX_LOST_FIRST_BLOCK` event type exists - -This means even if Bug #1 were fixed and the correct height was stored initially, transactions that get sent back to the mempool would retain their old height instead of being reset to `NULL`. - -## Root Cause Analysis (5 Whys) - -### Bug #1: Wrong height stored on confirmation - -**Problem**: Transactions have `height = 0` instead of the correct block height when confirmed. - -1. **Why do transactions have `height = 0`?** - - The daemon's `handleTxFirstBlock` function stores `metadata.height` directly from the fullnode event. - -2. **Why does `handleTxFirstBlock` store the wrong value?** - - It assumes `metadata.height` contains the height of the confirming block when `first_block` is set. - -3. **Why is that assumption wrong?** - - The fullnode sends `metadata.height = 0` for transactions even when they have a `first_block`. - -4. **Why does the fullnode send `height = 0` for confirmed transactions?** - - The `height` field in transaction metadata represents something different. - -5. **Why wasn't this validated?** - - The daemon blindly trusts the fullnode data without validating that `height > 0` when `first_block` is present. - -### Bug #2: Height not reset when transaction loses confirmation - -**Problem**: Transactions that lose their confirmation (sent back to mempool) retain `height = 0` instead of being reset to `NULL`. - -1. **Why do transactions keep `height = 0` when sent back to mempool?** - - The daemon ignores `VERTEX_METADATA_CHANGED` events where `first_block` becomes `null`. - -2. **Why does the daemon ignore these events?** - - The `metadataDiff` function returns `IGNORE` when `first_block` is null. - -3. **Why does `metadataDiff` ignore null `first_block`?** - - It only handles the case when `first_block` becomes set (`TX_FIRST_BLOCK`), not when it becomes null. - -4. **Why is there no handler for lost confirmations?** - - No `TX_LOST_FIRST_BLOCK` event type was implemented. - -5. **Why wasn't this case considered?** - - Implementation oversight. The case where `first_block` becomes `null` was not handled. - -## Action Items - -| Priority | Action | Owner | Due Date | -|----------|--------|-------|----------| -| P1 | Fix `handleTxFirstBlock` to fetch block height from fullnode API when `first_block` is set | TBD | TBD | -| P1 | Add `TX_LOST_FIRST_BLOCK` handling in `metadataDiff` to reset height to `NULL` when `first_block` becomes null | TBD | TBD | -| P1 | Create migration script to fix existing 1.4M transactions with `height = 0` | TBD | TBD | -| P2 | Add validation: reject/warn when `first_block` is set but `height = 0` | TBD | TBD | -| P2 | Report bug to fullnode team: `metadata.height` should contain block height when `first_block` is set | TBD | TBD | -| P3 | Add monitoring alert for transactions stored with `height = 0` | TBD | TBD | - -## Related Items - -- `packages/daemon/src/services/index.ts` - `handleTxFirstBlock` function (line 674-707) -- `packages/daemon/src/services/index.ts` - `handleVertexAccepted` function (line 192-479) -- `packages/daemon/src/services/index.ts` - `metadataDiff` function (line 98-178) -- `packages/event-downloader/` - Tool created for this investigation diff --git a/packages/event-downloader/src/orchestrator.ts b/packages/event-downloader/src/orchestrator.ts index 4c9c7330..0b498785 100644 --- a/packages/event-downloader/src/orchestrator.ts +++ b/packages/event-downloader/src/orchestrator.ts @@ -113,6 +113,8 @@ function calculateBatches(latestEventId: number, batchSize: number): BatchInfo[] /** * Merge calculated batches with existing progress from database. + * Handles the case where a batch was completed with a smaller boundary + * (e.g., latestEventId was lower during a previous run). */ function mergeBatchesWithProgress( batches: BatchInfo[], @@ -125,12 +127,25 @@ function mergeBatchesWithProgress( return batches.map((batch) => { const existing = progressMap.get(batch.start); - if (existing && existing.status === 'completed') { - return { ...batch, lastDownloaded: batch.end }; // Mark as complete + if (!existing) { + return batch; } - if (existing && existing.last_downloaded !== null) { + + // Only consider fully complete if the stored batch_end covers the calculated batch_end + if (existing.status === 'completed' && existing.batch_end >= batch.end) { + return { ...batch, lastDownloaded: batch.end }; + } + + // Batch was "completed" but with a smaller boundary - resume from where it ended + if (existing.status === 'completed') { + return { ...batch, lastDownloaded: existing.batch_end }; + } + + // For in-progress or failed batches, resume from last_downloaded + if (existing.last_downloaded !== null) { return { ...batch, lastDownloaded: existing.last_downloaded }; } + return batch; }).filter((batch) => { // Filter out completed batches From 62cd1941afc01edca2a469cc66309a486ae69fc8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Abadesso?= Date: Thu, 15 Jan 2026 16:51:30 -0300 Subject: [PATCH 3/9] chore: added timeout to ws connection --- packages/event-downloader/src/config.ts | 2 ++ packages/event-downloader/src/worker.ts | 27 ++++++++++++++++++- .../event-downloader/tsconfig.tsbuildinfo | 2 +- 3 files changed, 29 insertions(+), 2 deletions(-) diff --git a/packages/event-downloader/src/config.ts b/packages/event-downloader/src/config.ts index 400f7086..ef75f074 100644 --- a/packages/event-downloader/src/config.ts +++ b/packages/event-downloader/src/config.ts @@ -27,6 +27,7 @@ export const USE_SSL = process.env.USE_SSL === 'true'; export const BATCH_SIZE = parseInt(process.env.BATCH_SIZE ?? '5000', 10); export const PARALLEL_CONNECTIONS = parseInt(process.env.PARALLEL_CONNECTIONS ?? '5', 10); export const WINDOW_SIZE = parseInt(process.env.WINDOW_SIZE ?? '100', 10); +export const CONNECTION_TIMEOUT_MS = parseInt(process.env.CONNECTION_TIMEOUT_MS ?? '60000', 10); // Database configuration export const DB_PATH = process.env.DB_PATH ?? './events.sqlite'; @@ -41,6 +42,7 @@ export const getConfig = () => { PARALLEL_CONNECTIONS, WINDOW_SIZE, DB_PATH, + CONNECTION_TIMEOUT_MS, }; }; diff --git a/packages/event-downloader/src/worker.ts b/packages/event-downloader/src/worker.ts index 462b86df..6c3885b8 100644 --- a/packages/event-downloader/src/worker.ts +++ b/packages/event-downloader/src/worker.ts @@ -8,7 +8,7 @@ import { WebSocket, MessageEvent, ErrorEvent } from 'ws'; import { bigIntUtils } from '@hathor/wallet-lib'; import { FullNodeEvent, FullNodeEventSchema, WebSocketSendEvent } from './types'; -import { FULLNODE_HOST, USE_SSL, WINDOW_SIZE } from './config'; +import { FULLNODE_HOST, USE_SSL, WINDOW_SIZE, CONNECTION_TIMEOUT_MS } from './config'; export interface BatchConfig { batchStart: number; @@ -43,6 +43,28 @@ export function createWorker(config: BatchConfig, callbacks: WorkerCallbacks): W let isRunning = false; let eventsSinceLastAck = 0; let lastReceivedEventId = 0; + let activityTimeout: ReturnType | null = null; + + const resetActivityTimeout = (): void => { + if (activityTimeout) { + clearTimeout(activityTimeout); + } + if (isRunning && CONNECTION_TIMEOUT_MS > 0) { + activityTimeout = setTimeout(() => { + if (isRunning) { + onError(new Error(`Connection timeout: no activity for ${CONNECTION_TIMEOUT_MS}ms`)); + stop(); + } + }, CONNECTION_TIMEOUT_MS); + } + }; + + const clearActivityTimeout = (): void => { + if (activityTimeout) { + clearTimeout(activityTimeout); + activityTimeout = null; + } + }; const getWsUrl = (): string => { const protocol = USE_SSL ? 'wss://' : 'ws://'; @@ -74,9 +96,11 @@ export function createWorker(config: BatchConfig, callbacks: WorkerCallbacks): W ...(lastAckEventId !== undefined && { last_ack_event_id: lastAckEventId }), }; sendMessage(startMessage); + resetActivityTimeout(); }; socket.onmessage = (socketEvent: MessageEvent) => { + resetActivityTimeout(); try { const rawData = bigIntUtils.JSONBigInt.parse(socketEvent.data.toString()); const parseResult = FullNodeEventSchema.safeParse(rawData); @@ -140,6 +164,7 @@ export function createWorker(config: BatchConfig, callbacks: WorkerCallbacks): W const stop = (): void => { isRunning = false; + clearActivityTimeout(); if (socket) { socket.close(); socket = null; diff --git a/packages/event-downloader/tsconfig.tsbuildinfo b/packages/event-downloader/tsconfig.tsbuildinfo index 92d3ad63..53cdbad9 100644 --- a/packages/event-downloader/tsconfig.tsbuildinfo +++ b/packages/event-downloader/tsconfig.tsbuildinfo @@ -1 +1 @@ -{"root":["./src/config.ts","./src/db.ts","./src/event-parser.ts","./src/index.ts","./src/orchestrator.ts","./src/types.ts","./src/worker.ts"],"version":"5.9.3"} \ No newline at end of file +{"root":["./src/config.ts","./src/db.ts","./src/event-parser.ts","./src/index.ts","./src/orchestrator.ts","./src/types.ts","./src/worker.ts"],"version":"5.7.2"} \ No newline at end of file From 5d64c789ccdda0c5762baa8e137d2d7cf6950621 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Abadesso?= Date: Fri, 16 Jan 2026 16:19:39 -0300 Subject: [PATCH 4/9] fix: stop skipping events --- packages/event-downloader/src/event-parser.ts | 30 +++++++++++++++++-- packages/event-downloader/src/worker.ts | 14 +++++---- 2 files changed, 36 insertions(+), 8 deletions(-) diff --git a/packages/event-downloader/src/event-parser.ts b/packages/event-downloader/src/event-parser.ts index 48ac1200..436d3764 100644 --- a/packages/event-downloader/src/event-parser.ts +++ b/packages/event-downloader/src/event-parser.ts @@ -9,21 +9,23 @@ import { FullNodeEvent, FullNodeEventTypes } from './types'; /** * Extracts the transaction hash from a fullnode event. + * Now handles ALL event types, not just predefined ones. * * @param event - The fullnode event to extract the hash from * @returns The transaction hash if the event contains one, null otherwise */ export function extractTxHash(event: FullNodeEvent): string | null { const eventType = event.event.type; + const eventData = event.event.data as any; switch (eventType) { case FullNodeEventTypes.NEW_VERTEX_ACCEPTED: case FullNodeEventTypes.VERTEX_METADATA_CHANGED: case FullNodeEventTypes.VERTEX_REMOVED: - return event.event.data.hash; + return eventData?.hash ?? null; case FullNodeEventTypes.NC_EVENT: - return event.event.data.vertex_id; + return eventData?.vertex_id ?? null; case FullNodeEventTypes.LOAD_STARTED: case FullNodeEventTypes.LOAD_FINISHED: @@ -33,6 +35,30 @@ export function extractTxHash(event: FullNodeEvent): string | null { return null; default: + // Handle unknown event types by trying common patterns + // TOKEN_CREATED and other events might have different structures + + // Try standard hash field + if (eventData?.hash) { + return eventData.hash; + } + + // Try vertex_id (for vertex-related events) + if (eventData?.vertex_id) { + return eventData.vertex_id; + } + + // Try token_uid (for TOKEN_CREATED events) + if (eventData?.token_uid) { + return eventData.token_uid; + } + + // Try nc_exec_info.nc_tx (for nano contract events) + if (eventData?.nc_exec_info?.nc_tx) { + return eventData.nc_exec_info.nc_tx; + } + + // No recognizable hash found return null; } } diff --git a/packages/event-downloader/src/worker.ts b/packages/event-downloader/src/worker.ts index 6c3885b8..2687bd62 100644 --- a/packages/event-downloader/src/worker.ts +++ b/packages/event-downloader/src/worker.ts @@ -7,7 +7,7 @@ import { WebSocket, MessageEvent, ErrorEvent } from 'ws'; import { bigIntUtils } from '@hathor/wallet-lib'; -import { FullNodeEvent, FullNodeEventSchema, WebSocketSendEvent } from './types'; +import { FullNodeEvent, WebSocketSendEvent } from './types'; import { FULLNODE_HOST, USE_SSL, WINDOW_SIZE, CONNECTION_TIMEOUT_MS } from './config'; export interface BatchConfig { @@ -103,16 +103,18 @@ export function createWorker(config: BatchConfig, callbacks: WorkerCallbacks): W resetActivityTimeout(); try { const rawData = bigIntUtils.JSONBigInt.parse(socketEvent.data.toString()); - const parseResult = FullNodeEventSchema.safeParse(rawData); - if (!parseResult.success) { - // Skip messages that don't conform to event schema (e.g., handshake messages) - // These are expected at the start of a connection + // Check if this is an event message (not a handshake or control message) + // Event messages have: event.id, event.type, event.timestamp + if (!rawData.event || typeof rawData.event.id !== 'number' || typeof rawData.event.type !== 'string') { + // Skip non-event messages (handshake, control messages, etc.) console.log(`Skipping non-event message: ${JSON.stringify(rawData).substring(0, 100)}...`); return; } - const event = parseResult.data; + // Accept ALL events regardless of type - store them as-is + // TypeScript requires us to cast, but we're intentionally accepting any event structure + const event = rawData as any; const eventId = event.event.id; // Call the event callback From 19ccc98737143e49d6227ce1da986524ad2e5b51 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Abadesso?= Date: Wed, 25 Feb 2026 10:36:23 -0300 Subject: [PATCH 5/9] chore: code review fixes for event-downloader and replay-balance script - Fix completedBatches double-counting in orchestrator stats - Fix pendingBatches going negative by tracking activeWorkers separately - Guard onError from firing after onComplete with isCompleted flag - Guard worker onmessage after batch completion with isDone flag - Remove dead exports: getPendingBatches, getLastEventId, getConfig - Add *.tsbuildinfo to .gitignore and untrack build artifact - Relax @hathor/wallet-lib peer dep to >=2.8.3 in event-downloader - Use yarn dlx ts-node for replay-balance script instead of full build - Add better-sqlite3 as dep to daemon for replay-balance script Co-Authored-By: Claude Sonnet 4.6 --- .gitignore | 1 + packages/daemon/package.json | 3 + packages/daemon/src/scripts/replay-balance.ts | 232 ++++++++++++++++++ packages/event-downloader/package.json | 2 +- packages/event-downloader/src/config.ts | 15 -- packages/event-downloader/src/db.ts | 29 --- packages/event-downloader/src/orchestrator.ts | 39 +-- packages/event-downloader/src/worker.ts | 3 + .../event-downloader/tsconfig.tsbuildinfo | 1 - yarn.lock | 2 + 10 files changed, 267 insertions(+), 60 deletions(-) create mode 100644 packages/daemon/src/scripts/replay-balance.ts delete mode 100644 packages/event-downloader/tsconfig.tsbuildinfo diff --git a/.gitignore b/.gitignore index 0b4715e0..ef1f9d32 100644 --- a/.gitignore +++ b/.gitignore @@ -15,3 +15,4 @@ packages/wallet-service/.env* packages/wallet-service/.warmup .yarn/ .env.* +*.tsbuildinfo diff --git a/packages/daemon/package.json b/packages/daemon/package.json index fdfb64a4..719ef7b2 100644 --- a/packages/daemon/package.json +++ b/packages/daemon/package.json @@ -13,6 +13,7 @@ "lint": "eslint .", "build": "tsc -b", "start": "node dist/index.js", + "replay-balance": "yarn dlx ts-node src/scripts/replay-balance.ts", "watch": "tsc -w", "test_images_up": "docker compose -f ./__tests__/integration/scripts/docker-compose.yml up -d", "test_images_down": "docker compose -f ./__tests__/integration/scripts/docker-compose.yml down", @@ -28,6 +29,7 @@ "author": "André Abadesso", "module": "dist/index.js", "devDependencies": { + "@types/better-sqlite3": "^7.6.12", "@types/jest": "29.5.4", "@types/lodash": "4.14.199", "@types/mysql": "2.15.21", @@ -55,6 +57,7 @@ "assert": "2.1.0", "aws-sdk": "2.1454.0", "axios": "1.6.2", + "better-sqlite3": "^11.7.0", "dotenv": "8.2.0", "lodash": "4.17.21", "mysql2": "3.5.2", diff --git a/packages/daemon/src/scripts/replay-balance.ts b/packages/daemon/src/scripts/replay-balance.ts new file mode 100644 index 00000000..094fd104 --- /dev/null +++ b/packages/daemon/src/scripts/replay-balance.ts @@ -0,0 +1,232 @@ +/** + * Copyright (c) Hathor Labs and its affiliates. + * + * This source code is licensed under the MIT license found in the + * LICENSE file in the root directory of this source tree. + */ + +/** + * Balance replay script for debugging wallet_balance discrepancies. + * + * Reads events from a SQLite database produced by the event-downloader, + * processes them in order using the daemon's own balance utilities, and + * computes the expected HTR balance for a given set of addresses. + * + * Usage: + * node dist/scripts/replay-balance.js [options] + * + * Options: + * --db Path to events.sqlite (default: ./events.sqlite) + * --addresses Path to addresses CSV (default: ./addresses.csv) + * --expected Expected balance in hatoshis for comparison + * --verbose Show per-transaction breakdown + */ + +import Database from 'better-sqlite3'; +import * as fs from 'fs'; +import { bigIntUtils, constants } from '@hathor/wallet-lib'; +import { prepareOutputs, prepareInputs } from '../utils/wallet'; + +// --------------------------------------------------------------------------- +// CLI argument parsing +// --------------------------------------------------------------------------- + +interface Opts { + db: string; + addresses: string; + expected?: bigint; + verbose: boolean; +} + +function parseArgs(): Opts { + const args = process.argv.slice(2); + const opts: Opts = { + db: './events.sqlite', + addresses: './addresses.csv', + verbose: false, + }; + + for (let i = 0; i < args.length; i++) { + switch (args[i]) { + case '--db': opts.db = args[++i]; break; + case '--addresses': opts.addresses = args[++i]; break; + case '--expected': opts.expected = BigInt(args[++i]); break; + case '--verbose': opts.verbose = true; break; + } + } + return opts; +} + +// --------------------------------------------------------------------------- +// Helpers +// --------------------------------------------------------------------------- + +function loadAddresses(csvPath: string): Set { + const lines = fs.readFileSync(csvPath, 'utf-8').trim().split('\n'); + // Skip header row + return new Set(lines.slice(1).map(l => l.trim()).filter(Boolean)); +} + +interface TxState { + hash: string; + voided: boolean; + outputs: any[]; + inputs: any[]; + tokens: string[]; + height: number; + timestamp: number; +} + +// --------------------------------------------------------------------------- +// Main +// --------------------------------------------------------------------------- + +function main() { + const opts = parseArgs(); + + const walletAddresses = loadAddresses(opts.addresses); + console.log(`Loaded ${walletAddresses.size} wallet addresses`); + + const sqlite = new Database(opts.db, { readonly: true }); + + // Build the WHERE clause — one LIKE condition per address + const conditions = Array.from(walletAddresses) + .map(addr => `data LIKE '%${addr}%'`) + .join(' OR '); + + const rows = sqlite.prepare(` + SELECT id, type, data + FROM events + WHERE type IN ('NEW_VERTEX_ACCEPTED', 'VERTEX_METADATA_CHANGED') + AND (${conditions}) + ORDER BY id ASC + `).all() as Array<{ id: number; type: string; data: string }>; + + sqlite.close(); + + console.log(`Found ${rows.length} relevant events`); + + // ------------------------------------------------------------------ + // Pass 1: build the final state of each transaction. + // Later events (VERTEX_METADATA_CHANGED) overwrite earlier ones so + // the map always holds the most recent voided_by for each tx. + // ------------------------------------------------------------------ + + const txStates = new Map(); + + for (const row of rows) { + const event = bigIntUtils.JSONBigInt.parse(row.data); + const data = event.event.data; + const hash: string = data.hash; + const voided: boolean = data.metadata.voided_by.length > 0; + + txStates.set(hash, { + hash, + voided, + outputs: data.outputs, + inputs: data.inputs, + tokens: data.tokens ?? [], + height: data.metadata.height, + timestamp: data.timestamp, + }); + } + + console.log(`Unique transactions: ${txStates.size}`); + + // ------------------------------------------------------------------ + // Pass 2: build the spending map. + // For each non-voided transaction, record which tx hash spends each + // UTXO referenced by its inputs. + // ------------------------------------------------------------------ + + const spentBy = new Map(); // "${txId}:${index}" -> spending tx hash + + for (const tx of txStates.values()) { + if (tx.voided) continue; + const inputs = prepareInputs(tx.inputs, tx.tokens); + for (const input of inputs) { + spentBy.set(`${input.tx_id}:${input.index}`, tx.hash); + } + } + + // ------------------------------------------------------------------ + // Pass 3: compute the wallet balance. + // Sum the value of every unspent HTR output in a non-voided tx that + // belongs to one of the wallet addresses. + // ------------------------------------------------------------------ + + let totalBalance = 0n; + + interface Contribution { + hash: string; + outputIndex: number; + address: string; + amount: bigint; + height: number; + timestamp: number; + } + + const unspentUtxos: Contribution[] = []; + + for (const tx of txStates.values()) { + if (tx.voided) continue; + + const outputs = prepareOutputs(tx.outputs, tx.tokens); + + for (const output of outputs) { + const address = output.decoded?.address; + if (!address || !walletAddresses.has(address)) continue; + if (output.token !== constants.NATIVE_TOKEN_UID) continue; + + // Skip authority outputs (mint / melt) + const isAuthority = (output.token_data & 0x80) !== 0; + if (isAuthority) continue; + + const utxoKey = `${tx.hash}:${output.index}`; + if (spentBy.has(utxoKey)) continue; + + totalBalance += BigInt(output.value as unknown as number); + unspentUtxos.push({ + hash: tx.hash, + outputIndex: output.index, + address, + amount: BigInt(output.value as unknown as number), + height: tx.height, + timestamp: tx.timestamp, + }); + } + } + + // ------------------------------------------------------------------ + // Output + // ------------------------------------------------------------------ + + if (opts.verbose) { + console.log('\n--- Unspent HTR UTXOs ---'); + unspentUtxos + .sort((a, b) => a.height - b.height) + .forEach(u => { + console.log( + ` height=${u.height} ${u.hash.substring(0, 16)}... ` + + `output[${u.outputIndex}] ${u.address.substring(0, 8)}... ` + + `+${u.amount} hat`, + ); + }); + } + + console.log('\n=== RESULTS ==='); + console.log(`Computed balance : ${totalBalance} hatoshis`); + console.log(`Unspent UTXOs : ${unspentUtxos.length}`); + + if (opts.expected !== undefined) { + console.log(`Expected balance : ${opts.expected} hatoshis`); + if (totalBalance === opts.expected) { + console.log('✓ MATCH'); + } else { + const diff = totalBalance - opts.expected; + console.log(`✗ MISMATCH — diff: ${diff > 0n ? '+' : ''}${diff} hatoshis`); + } + } +} + +main(); diff --git a/packages/event-downloader/package.json b/packages/event-downloader/package.json index d1025d26..eb09a886 100644 --- a/packages/event-downloader/package.json +++ b/packages/event-downloader/package.json @@ -16,7 +16,7 @@ "lint": "eslint ." }, "peerDependencies": { - "@hathor/wallet-lib": "2.8.3" + "@hathor/wallet-lib": ">=2.8.3" }, "dependencies": { "better-sqlite3": "^11.7.0", diff --git a/packages/event-downloader/src/config.ts b/packages/event-downloader/src/config.ts index ef75f074..ab38a8a7 100644 --- a/packages/event-downloader/src/config.ts +++ b/packages/event-downloader/src/config.ts @@ -32,18 +32,3 @@ export const CONNECTION_TIMEOUT_MS = parseInt(process.env.CONNECTION_TIMEOUT_MS // Database configuration export const DB_PATH = process.env.DB_PATH ?? './events.sqlite'; -export const getConfig = () => { - checkEnvVariables(); - - return { - FULLNODE_HOST, - USE_SSL, - BATCH_SIZE, - PARALLEL_CONNECTIONS, - WINDOW_SIZE, - DB_PATH, - CONNECTION_TIMEOUT_MS, - }; -}; - -export default getConfig; diff --git a/packages/event-downloader/src/db.ts b/packages/event-downloader/src/db.ts index 587c4fc3..61e8667b 100644 --- a/packages/event-downloader/src/db.ts +++ b/packages/event-downloader/src/db.ts @@ -176,22 +176,6 @@ export function updateBatchProgress( stmt.run(batchStart, batchEnd, lastDownloaded, status); } -/** - * Get all batches that are not marked as 'completed'. - * @param db - Database instance - * @returns Array of pending batch progress records - */ -export function getPendingBatches(db: DatabaseType): BatchProgress[] { - const stmt = db.prepare(` - SELECT batch_start, batch_end, last_downloaded, status, updated_at - FROM download_progress - WHERE status != 'completed' - ORDER BY batch_start ASC - `); - - return stmt.all() as BatchProgress[]; -} - /** * Get all batch progress records. * @param db - Database instance @@ -207,16 +191,3 @@ export function getAllBatchProgress(db: DatabaseType): BatchProgress[] { return stmt.all() as BatchProgress[]; } -/** - * Get the highest event ID currently stored in the database. - * @param db - Database instance - * @returns The highest event ID or null if no events exist - */ -export function getLastEventId(db: DatabaseType): number | null { - const stmt = db.prepare(` - SELECT MAX(id) as last_id FROM events - `); - - const result = stmt.get() as { last_id: number | null }; - return result?.last_id ?? null; -} diff --git a/packages/event-downloader/src/orchestrator.ts b/packages/event-downloader/src/orchestrator.ts index 0b498785..fcc1596e 100644 --- a/packages/event-downloader/src/orchestrator.ts +++ b/packages/event-downloader/src/orchestrator.ts @@ -167,6 +167,7 @@ function runWorker( const txEventBuffer: TxEvent[] = []; let eventsDownloaded = 0; let lastEventId = batch.lastDownloaded ?? batch.start - 1; + let isCompleted = false; const flushBuffers = () => { if (eventBuffer.length > 0) { @@ -229,12 +230,14 @@ function runWorker( }, onComplete: () => { + isCompleted = true; flushBuffers(); updateBatchProgress(db, batch.start, batch.end, batch.end, 'completed'); resolve(); }, onError: (error: Error) => { + if (isCompleted) return; // Save progress before failing (only if db is still open) try { flushBuffers(); @@ -323,32 +326,40 @@ export async function downloadAllEvents(callbacks: OrchestratorCallbacks): Promi // Initialize progress tracking let totalEventsDownloaded = 0; + let finishedPendingBatches = 0; + let activeWorkers = 0; const workerStatuses = new Map(); const updateStats = () => { callbacks.onStatsUpdate({ totalEvents: latestEventId, totalBatches: allBatches.length, - completedBatches: completedCount + (allBatches.length - pendingBatches.length), - inProgressBatches: workerStatuses.size, - pendingBatches: pendingBatches.length - workerStatuses.size, + completedBatches: completedCount + finishedPendingBatches, + inProgressBatches: activeWorkers, + pendingBatches: pendingBatches.length - finishedPendingBatches - activeWorkers, eventsDownloaded: totalEventsDownloaded, }); }; // Run workers with concurrency limit await runWithConcurrency(pendingBatches, PARALLEL_CONNECTIONS, async (batch, workerSlot) => { - await runWorker(db, batch, workerSlot, { - ...callbacks, - onWorkerUpdate: (workerId, status) => { - workerStatuses.set(workerId, status); - totalEventsDownloaded = Array.from(workerStatuses.values()) - .reduce((sum, s) => sum + s.eventsDownloaded, 0); - updateStats(); - callbacks.onWorkerUpdate(workerId, status); - }, - }); - // Don't delete - worker slot will be reused for next batch + activeWorkers++; + try { + await runWorker(db, batch, workerSlot, { + ...callbacks, + onWorkerUpdate: (workerId, status) => { + workerStatuses.set(workerId, status); + totalEventsDownloaded = Array.from(workerStatuses.values()) + .reduce((sum, s) => sum + s.eventsDownloaded, 0); + updateStats(); + callbacks.onWorkerUpdate(workerId, status); + }, + }); + finishedPendingBatches++; + } finally { + activeWorkers--; + } + updateStats(); }); callbacks.onComplete(); diff --git a/packages/event-downloader/src/worker.ts b/packages/event-downloader/src/worker.ts index 2687bd62..3fa2ea14 100644 --- a/packages/event-downloader/src/worker.ts +++ b/packages/event-downloader/src/worker.ts @@ -41,6 +41,7 @@ export function createWorker(config: BatchConfig, callbacks: WorkerCallbacks): W let socket: WebSocket | null = null; let isRunning = false; + let isDone = false; let eventsSinceLastAck = 0; let lastReceivedEventId = 0; let activityTimeout: ReturnType | null = null; @@ -100,6 +101,7 @@ export function createWorker(config: BatchConfig, callbacks: WorkerCallbacks): W }; socket.onmessage = (socketEvent: MessageEvent) => { + if (isDone) return; resetActivityTimeout(); try { const rawData = bigIntUtils.JSONBigInt.parse(socketEvent.data.toString()); @@ -142,6 +144,7 @@ export function createWorker(config: BatchConfig, callbacks: WorkerCallbacks): W } if (isComplete) { + isDone = true; onComplete(); stop(); } diff --git a/packages/event-downloader/tsconfig.tsbuildinfo b/packages/event-downloader/tsconfig.tsbuildinfo deleted file mode 100644 index 53cdbad9..00000000 --- a/packages/event-downloader/tsconfig.tsbuildinfo +++ /dev/null @@ -1 +0,0 @@ -{"root":["./src/config.ts","./src/db.ts","./src/event-parser.ts","./src/index.ts","./src/orchestrator.ts","./src/types.ts","./src/worker.ts"],"version":"5.7.2"} \ No newline at end of file diff --git a/yarn.lock b/yarn.lock index f2d57eec..5e876d36 100644 --- a/yarn.lock +++ b/yarn.lock @@ -17843,6 +17843,7 @@ __metadata: dependencies: "@aws-sdk/client-lambda": "npm:3.540.0" "@aws-sdk/client-sqs": "npm:3.540.0" + "@types/better-sqlite3": "npm:^7.6.12" "@types/jest": "npm:29.5.4" "@types/lodash": "npm:4.14.199" "@types/mysql": "npm:2.15.21" @@ -17853,6 +17854,7 @@ __metadata: assert: "npm:2.1.0" aws-sdk: "npm:2.1454.0" axios: "npm:1.6.2" + better-sqlite3: "npm:^11.7.0" dotenv: "npm:8.2.0" eslint: "npm:9.3.0" eslint-config-airbnb-base: "npm:15.0.0" From e061d44389b75899870af7dd663e10d5797f6a86 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Abadesso?= Date: Fri, 27 Feb 2026 11:10:25 -0300 Subject: [PATCH 6/9] chore: updated yarn --- yarn.lock | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/yarn.lock b/yarn.lock index 5e876d36..17d21a8f 100644 --- a/yarn.lock +++ b/yarn.lock @@ -11017,7 +11017,7 @@ __metadata: ws: "npm:^8.18.0" zod: "npm:^3.23.8" peerDependencies: - "@hathor/wallet-lib": 2.8.3 + "@hathor/wallet-lib": ">=2.8.3" languageName: unknown linkType: soft From 9c3be7cc7123ce5edcb58720d9968b7e9b961456 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Abadesso?= Date: Mon, 2 Mar 2026 13:05:57 -0300 Subject: [PATCH 7/9] fix: address CodeRabbit review feedback - replay-balance: validate CLI args, detect CSV headers, parameterize SQL - config: validate numeric env vars, treat empty strings as missing - index: guard division by zero in progress, improve signal handler - orchestrator: validate latest_event_id, fix off-by-one in batches, settle promises exactly once, propagate worker errors, fix cumulative event count - worker: single-shot error handling via fail() helper Co-Authored-By: Claude Opus 4.6 --- packages/daemon/src/scripts/replay-balance.ts | 46 ++++++++++---- packages/event-downloader/src/config.ts | 22 +++++-- packages/event-downloader/src/index.ts | 16 ++++- packages/event-downloader/src/orchestrator.ts | 62 ++++++++++++++----- packages/event-downloader/src/worker.ts | 20 +++--- 5 files changed, 125 insertions(+), 41 deletions(-) diff --git a/packages/daemon/src/scripts/replay-balance.ts b/packages/daemon/src/scripts/replay-balance.ts index 094fd104..13f6accf 100644 --- a/packages/daemon/src/scripts/replay-balance.ts +++ b/packages/daemon/src/scripts/replay-balance.ts @@ -46,12 +46,22 @@ function parseArgs(): Opts { verbose: false, }; + const readNext = (index: number, flag: string): string => { + const value = args[index + 1]; + if (!value || value.startsWith('--')) { + throw new Error(`Missing value for ${flag}`); + } + return value; + }; + for (let i = 0; i < args.length; i++) { switch (args[i]) { - case '--db': opts.db = args[++i]; break; - case '--addresses': opts.addresses = args[++i]; break; - case '--expected': opts.expected = BigInt(args[++i]); break; + case '--db': opts.db = readNext(i, '--db'); i++; break; + case '--addresses': opts.addresses = readNext(i, '--addresses'); i++; break; + case '--expected': opts.expected = BigInt(readNext(i, '--expected')); i++; break; case '--verbose': opts.verbose = true; break; + default: + throw new Error(`Unknown option: ${args[i]}`); } } return opts; @@ -62,9 +72,19 @@ function parseArgs(): Opts { // --------------------------------------------------------------------------- function loadAddresses(csvPath: string): Set { - const lines = fs.readFileSync(csvPath, 'utf-8').trim().split('\n'); - // Skip header row - return new Set(lines.slice(1).map(l => l.trim()).filter(Boolean)); + const lines = fs.readFileSync(csvPath, 'utf-8') + .split(/\r?\n/) + .map(l => l.trim()) + .filter(Boolean); + + const hasHeader = lines[0]?.toLowerCase().startsWith('address'); + const dataLines = hasHeader ? lines.slice(1) : lines; + + return new Set( + dataLines + .map(line => line.split(',')[0]?.trim()) + .filter((addr): addr is string => Boolean(addr)), + ); } interface TxState { @@ -87,12 +107,16 @@ function main() { const walletAddresses = loadAddresses(opts.addresses); console.log(`Loaded ${walletAddresses.size} wallet addresses`); + if (walletAddresses.size === 0) { + throw new Error('No wallet addresses found in CSV'); + } + const sqlite = new Database(opts.db, { readonly: true }); - // Build the WHERE clause — one LIKE condition per address - const conditions = Array.from(walletAddresses) - .map(addr => `data LIKE '%${addr}%'`) - .join(' OR '); + // Build the WHERE clause — one parameterized LIKE condition per address + const addresses = Array.from(walletAddresses); + const conditions = addresses.map(() => 'data LIKE ?').join(' OR '); + const likeParams = addresses.map(addr => `%${addr}%`); const rows = sqlite.prepare(` SELECT id, type, data @@ -100,7 +124,7 @@ function main() { WHERE type IN ('NEW_VERTEX_ACCEPTED', 'VERTEX_METADATA_CHANGED') AND (${conditions}) ORDER BY id ASC - `).all() as Array<{ id: number; type: string; data: string }>; + `).all(...likeParams) as Array<{ id: number; type: string; data: string }>; sqlite.close(); diff --git a/packages/event-downloader/src/config.ts b/packages/event-downloader/src/config.ts index ab38a8a7..d5d921ed 100644 --- a/packages/event-downloader/src/config.ts +++ b/packages/event-downloader/src/config.ts @@ -12,22 +12,34 @@ const requiredEnvs = [ ]; export const checkEnvVariables = () => { - const missingEnv = requiredEnvs.filter(envVar => process.env[envVar] === undefined); + const missingEnv = requiredEnvs.filter((envVar) => { + const value = process.env[envVar]; + return value === undefined || value.trim() === ''; + }); if (missingEnv.length > 0) { throw new Error(`Missing required environment variables: ${missingEnv.join(', ')}`); } }; +const parsePositiveInt = (envName: string, fallback: number): number => { + const raw = process.env[envName]; + const value = Number.parseInt(raw ?? String(fallback), 10); + if (!Number.isInteger(value) || value <= 0) { + throw new Error(`Invalid ${envName}: expected a positive integer, got "${raw}"`); + } + return value; +}; + // Fullnode connection export const FULLNODE_HOST = process.env.FULLNODE_HOST!; export const USE_SSL = process.env.USE_SSL === 'true'; // Download configuration -export const BATCH_SIZE = parseInt(process.env.BATCH_SIZE ?? '5000', 10); -export const PARALLEL_CONNECTIONS = parseInt(process.env.PARALLEL_CONNECTIONS ?? '5', 10); -export const WINDOW_SIZE = parseInt(process.env.WINDOW_SIZE ?? '100', 10); -export const CONNECTION_TIMEOUT_MS = parseInt(process.env.CONNECTION_TIMEOUT_MS ?? '60000', 10); +export const BATCH_SIZE = parsePositiveInt('BATCH_SIZE', 5000); +export const PARALLEL_CONNECTIONS = parsePositiveInt('PARALLEL_CONNECTIONS', 5); +export const WINDOW_SIZE = parsePositiveInt('WINDOW_SIZE', 100); +export const CONNECTION_TIMEOUT_MS = parsePositiveInt('CONNECTION_TIMEOUT_MS', 60000); // Database configuration export const DB_PATH = process.env.DB_PATH ?? './events.sqlite'; diff --git a/packages/event-downloader/src/index.ts b/packages/event-downloader/src/index.ts index 70da84cb..18275786 100644 --- a/packages/event-downloader/src/index.ts +++ b/packages/event-downloader/src/index.ts @@ -125,7 +125,10 @@ function render(): void { // Print worker statuses const sortedWorkers = Array.from(workerStatuses.entries()).sort((a, b) => a[0] - b[0]); for (const [workerId, status] of sortedWorkers) { - const workerPercent = ((status.lastEventId - status.batchStart) / (status.batchEnd - status.batchStart) * 100).toFixed(0); + const batchSize = status.batchEnd - status.batchStart; + const workerPercent = batchSize > 0 + ? ((status.lastEventId - status.batchStart) / batchSize * 100).toFixed(0) + : '0'; console.log( `Worker ${workerId + 1}: ${formatNumber(status.batchStart)}-${formatNumber(status.batchEnd)} | ` + `${formatNumber(status.eventsDownloaded)} events (${workerPercent}%)` @@ -164,10 +167,17 @@ async function main(): Promise { startTime = Date.now(); // Handle graceful shutdown + let isShuttingDown = false; const cleanup = () => { + if (isShuttingDown) return; + isShuttingDown = true; process.stdout.write(SHOW_CURSOR); - console.log('\n\nDownload interrupted. Progress has been saved.'); - process.exit(0); + console.log('\n\nDownload interrupted. Saving progress...'); + // Give a brief window for in-flight DB writes to complete + setTimeout(() => { + console.log('Progress has been saved.'); + process.exit(0); + }, 2000); }; process.on('SIGINT', cleanup); diff --git a/packages/event-downloader/src/orchestrator.ts b/packages/event-downloader/src/orchestrator.ts index fcc1596e..661f3779 100644 --- a/packages/event-downloader/src/orchestrator.ts +++ b/packages/event-downloader/src/orchestrator.ts @@ -79,7 +79,13 @@ export async function getLatestEventId(): Promise { clearTimeout(timeout); try { const data = bigIntUtils.JSONBigInt.parse(event.data.toString()); - const latestEventId = data.latest_event_id; + const rawLatestEventId = data?.latest_event_id; + const latestEventId = typeof rawLatestEventId === 'bigint' + ? Number(rawLatestEventId) + : rawLatestEventId; + if (!Number.isSafeInteger(latestEventId) || latestEventId < 0) { + throw new Error(`Invalid latest_event_id: ${String(rawLatestEventId)}`); + } socket.close(); resolve(latestEventId); } catch (error) { @@ -102,7 +108,7 @@ function calculateBatches(latestEventId: number, batchSize: number): BatchInfo[] const batches: BatchInfo[] = []; let start = 0; - while (start < latestEventId) { + while (start <= latestEventId) { const end = Math.min(start + batchSize - 1, latestEventId); batches.push({ start, end }); start = end + 1; @@ -167,7 +173,20 @@ function runWorker( const txEventBuffer: TxEvent[] = []; let eventsDownloaded = 0; let lastEventId = batch.lastDownloaded ?? batch.start - 1; - let isCompleted = false; + let isSettled = false; + + const settleResolve = () => { + if (!isSettled) { + isSettled = true; + resolve(); + } + }; + const settleReject = (error: Error) => { + if (!isSettled) { + isSettled = true; + reject(error); + } + }; const flushBuffers = () => { if (eventBuffer.length > 0) { @@ -230,14 +249,16 @@ function runWorker( }, onComplete: () => { - isCompleted = true; - flushBuffers(); - updateBatchProgress(db, batch.start, batch.end, batch.end, 'completed'); - resolve(); + try { + flushBuffers(); + updateBatchProgress(db, batch.start, batch.end, batch.end, 'completed'); + settleResolve(); + } catch (error) { + settleReject(error instanceof Error ? error : new Error(String(error))); + } }, onError: (error: Error) => { - if (isCompleted) return; // Save progress before failing (only if db is still open) try { flushBuffers(); @@ -247,7 +268,7 @@ function runWorker( } catch (e) { // Database might be closed already, ignore } - reject(error); + settleReject(error); }, }); @@ -265,8 +286,9 @@ async function runWithConcurrency( items: T[], concurrency: number, fn: (item: T, workerSlot: number) => Promise -): Promise { +): Promise { const results: Promise[] = []; + const errors: Error[] = []; let currentIndex = 0; const runWorkerSlot = async (workerSlot: number): Promise => { @@ -279,8 +301,8 @@ async function runWithConcurrency( try { await fn(items[index], workerSlot); } catch (error) { - // Log but continue with other batches - console.error(`Worker ${workerSlot} failed:`, error); + const normalized = error instanceof Error ? error : new Error(String(error)); + errors.push(normalized); } } }; @@ -292,6 +314,7 @@ async function runWithConcurrency( } await Promise.all(results); + return errors; } /** @@ -332,7 +355,7 @@ export async function downloadAllEvents(callbacks: OrchestratorCallbacks): Promi const updateStats = () => { callbacks.onStatsUpdate({ - totalEvents: latestEventId, + totalEvents: latestEventId + 1, totalBatches: allBatches.length, completedBatches: completedCount + finishedPendingBatches, inProgressBatches: activeWorkers, @@ -342,19 +365,24 @@ export async function downloadAllEvents(callbacks: OrchestratorCallbacks): Promi }; // Run workers with concurrency limit - await runWithConcurrency(pendingBatches, PARALLEL_CONNECTIONS, async (batch, workerSlot) => { + let completedEventsDownloaded = 0; + + const errors = await runWithConcurrency(pendingBatches, PARALLEL_CONNECTIONS, async (batch, workerSlot) => { activeWorkers++; try { await runWorker(db, batch, workerSlot, { ...callbacks, onWorkerUpdate: (workerId, status) => { workerStatuses.set(workerId, status); - totalEventsDownloaded = Array.from(workerStatuses.values()) + const inProgressEvents = Array.from(workerStatuses.values()) .reduce((sum, s) => sum + s.eventsDownloaded, 0); + totalEventsDownloaded = completedEventsDownloaded + inProgressEvents; updateStats(); callbacks.onWorkerUpdate(workerId, status); }, }); + completedEventsDownloaded += workerStatuses.get(workerSlot)?.eventsDownloaded ?? 0; + workerStatuses.delete(workerSlot); finishedPendingBatches++; } finally { activeWorkers--; @@ -362,6 +390,10 @@ export async function downloadAllEvents(callbacks: OrchestratorCallbacks): Promi updateStats(); }); + if (errors.length > 0) { + callbacks.onError(new Error(`${errors.length} batch(es) failed: ${errors.map(e => e.message).join('; ')}`)); + return; + } callbacks.onComplete(); } finally { db.close(); diff --git a/packages/event-downloader/src/worker.ts b/packages/event-downloader/src/worker.ts index 3fa2ea14..19840710 100644 --- a/packages/event-downloader/src/worker.ts +++ b/packages/event-downloader/src/worker.ts @@ -42,10 +42,18 @@ export function createWorker(config: BatchConfig, callbacks: WorkerCallbacks): W let socket: WebSocket | null = null; let isRunning = false; let isDone = false; + let hasFailed = false; let eventsSinceLastAck = 0; let lastReceivedEventId = 0; let activityTimeout: ReturnType | null = null; + const fail = (error: Error): void => { + if (hasFailed || isDone) return; + hasFailed = true; + onError(error); + stop(); + }; + const resetActivityTimeout = (): void => { if (activityTimeout) { clearTimeout(activityTimeout); @@ -53,8 +61,7 @@ export function createWorker(config: BatchConfig, callbacks: WorkerCallbacks): W if (isRunning && CONNECTION_TIMEOUT_MS > 0) { activityTimeout = setTimeout(() => { if (isRunning) { - onError(new Error(`Connection timeout: no activity for ${CONNECTION_TIMEOUT_MS}ms`)); - stop(); + fail(new Error(`Connection timeout: no activity for ${CONNECTION_TIMEOUT_MS}ms`)); } }, CONNECTION_TIMEOUT_MS); } @@ -149,18 +156,17 @@ export function createWorker(config: BatchConfig, callbacks: WorkerCallbacks): W stop(); } } catch (error) { - onError(error instanceof Error ? error : new Error(String(error))); + fail(error instanceof Error ? error : new Error(String(error))); } }; socket.onerror = (error: ErrorEvent) => { - onError(new Error(`WebSocket error: ${error.message}`)); + fail(new Error(`WebSocket error: ${error.message}`)); }; socket.onclose = () => { - if (isRunning) { - // Unexpected close - report as error - onError(new Error('WebSocket connection closed unexpectedly')); + if (isRunning && !isDone) { + fail(new Error('WebSocket connection closed unexpectedly')); } isRunning = false; socket = null; From 568292a52befdedf85edc9bf492c14cbc068b6ff Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Abadesso?= Date: Tue, 3 Mar 2026 09:30:09 -0300 Subject: [PATCH 8/9] fix: address code review feedback - Move better-sqlite3 to devDependency in daemon package - Fix all dependency versions (remove ^ prefixes) - Add --token parameter to replay-balance script --- packages/daemon/package.json | 4 +- packages/daemon/src/scripts/replay-balance.ts | 7 +- packages/event-downloader/package.json | 16 +-- yarn.lock | 122 ++++++++++-------- 4 files changed, 81 insertions(+), 68 deletions(-) diff --git a/packages/daemon/package.json b/packages/daemon/package.json index 719ef7b2..468cafae 100644 --- a/packages/daemon/package.json +++ b/packages/daemon/package.json @@ -29,7 +29,7 @@ "author": "André Abadesso", "module": "dist/index.js", "devDependencies": { - "@types/better-sqlite3": "^7.6.12", + "@types/better-sqlite3": "7.6.12", "@types/jest": "29.5.4", "@types/lodash": "4.14.199", "@types/mysql": "2.15.21", @@ -37,6 +37,7 @@ "@types/ws": "8.5.5", "@typescript-eslint/eslint-plugin": "6.7.3", "@typescript-eslint/parser": "6.7.3", + "better-sqlite3": "11.7.0", "eslint": "9.3.0", "eslint-config-airbnb-base": "15.0.0", "eslint-plugin-import": "2.29.1", @@ -57,7 +58,6 @@ "assert": "2.1.0", "aws-sdk": "2.1454.0", "axios": "1.6.2", - "better-sqlite3": "^11.7.0", "dotenv": "8.2.0", "lodash": "4.17.21", "mysql2": "3.5.2", diff --git a/packages/daemon/src/scripts/replay-balance.ts b/packages/daemon/src/scripts/replay-balance.ts index 13f6accf..fa33b1fd 100644 --- a/packages/daemon/src/scripts/replay-balance.ts +++ b/packages/daemon/src/scripts/replay-balance.ts @@ -18,6 +18,7 @@ * Options: * --db Path to events.sqlite (default: ./events.sqlite) * --addresses Path to addresses CSV (default: ./addresses.csv) + * --token Token UID to compute balance for (default: NATIVE_TOKEN_UID) * --expected Expected balance in hatoshis for comparison * --verbose Show per-transaction breakdown */ @@ -34,6 +35,7 @@ import { prepareOutputs, prepareInputs } from '../utils/wallet'; interface Opts { db: string; addresses: string; + token: string; expected?: bigint; verbose: boolean; } @@ -43,6 +45,7 @@ function parseArgs(): Opts { const opts: Opts = { db: './events.sqlite', addresses: './addresses.csv', + token: constants.NATIVE_TOKEN_UID, verbose: false, }; @@ -58,6 +61,7 @@ function parseArgs(): Opts { switch (args[i]) { case '--db': opts.db = readNext(i, '--db'); i++; break; case '--addresses': opts.addresses = readNext(i, '--addresses'); i++; break; + case '--token': opts.token = readNext(i, '--token'); i++; break; case '--expected': opts.expected = BigInt(readNext(i, '--expected')); i++; break; case '--verbose': opts.verbose = true; break; default: @@ -106,6 +110,7 @@ function main() { const walletAddresses = loadAddresses(opts.addresses); console.log(`Loaded ${walletAddresses.size} wallet addresses`); + console.log(`Token: ${opts.token === constants.NATIVE_TOKEN_UID ? 'HTR (native)' : opts.token}`); if (walletAddresses.size === 0) { throw new Error('No wallet addresses found in CSV'); @@ -200,7 +205,7 @@ function main() { for (const output of outputs) { const address = output.decoded?.address; if (!address || !walletAddresses.has(address)) continue; - if (output.token !== constants.NATIVE_TOKEN_UID) continue; + if (output.token !== opts.token) continue; // Skip authority outputs (mint / melt) const isAuthority = (output.token_data & 0x80) !== 0; diff --git a/packages/event-downloader/package.json b/packages/event-downloader/package.json index eb09a886..1d443349 100644 --- a/packages/event-downloader/package.json +++ b/packages/event-downloader/package.json @@ -19,15 +19,15 @@ "@hathor/wallet-lib": ">=2.8.3" }, "dependencies": { - "better-sqlite3": "^11.7.0", - "dotenv": "^16.4.5", - "ws": "^8.18.0", - "zod": "^3.23.8" + "better-sqlite3": "11.7.0", + "dotenv": "16.4.5", + "ws": "8.18.0", + "zod": "3.23.8" }, "devDependencies": { - "@types/better-sqlite3": "^7.6.12", - "@types/node": "^22.10.2", - "@types/ws": "^8.5.13", - "typescript": "^5.7.2" + "@types/better-sqlite3": "7.6.12", + "@types/node": "22.10.2", + "@types/ws": "8.5.13", + "typescript": "5.7.2" } } diff --git a/yarn.lock b/yarn.lock index 17d21a8f..055c5139 100644 --- a/yarn.lock +++ b/yarn.lock @@ -6493,12 +6493,12 @@ __metadata: languageName: node linkType: hard -"@types/better-sqlite3@npm:^7.6.12": - version: 7.6.13 - resolution: "@types/better-sqlite3@npm:7.6.13" +"@types/better-sqlite3@npm:7.6.12": + version: 7.6.12 + resolution: "@types/better-sqlite3@npm:7.6.12" dependencies: "@types/node": "npm:*" - checksum: 10/c74dafa3c550ac866737870016d7b1a735c7d450c16d40962eeb54510fa150e91752bfdf678f55e91894d8853771b95f909b0062122116cddac4d80491b74411 + checksum: 10/a442231518f1a3e28e0ee6efe2581807e5cfaa88a8af4513da34e6ae303ce82f5666e8a0048a1d2738fa248b1b369ee28e59a75189ae0b43e28e44a243a2f24b languageName: node linkType: hard @@ -6813,12 +6813,12 @@ __metadata: languageName: node linkType: hard -"@types/node@npm:^22.10.2": - version: 22.19.3 - resolution: "@types/node@npm:22.19.3" +"@types/node@npm:22.10.2": + version: 22.10.2 + resolution: "@types/node@npm:22.10.2" dependencies: - undici-types: "npm:~6.21.0" - checksum: 10/ffee06ce6d741fde98a40bc65a57394ed2283c521f57f9143d2356513181162bd4108809be6902a861d098b35e35569f61f14c64d3032e48a0289b74f917669a + undici-types: "npm:~6.20.0" + checksum: 10/451adfefed4add58b069407173e616220fd4aaa3307cdde1bb701aa053b65b54ced8483db2f870dcedec7a58cb3b06101fbc19d85852716672ec1fd3660947fa languageName: node linkType: hard @@ -6945,21 +6945,21 @@ __metadata: languageName: node linkType: hard -"@types/ws@npm:8.5.5": - version: 8.5.5 - resolution: "@types/ws@npm:8.5.5" +"@types/ws@npm:8.5.13": + version: 8.5.13 + resolution: "@types/ws@npm:8.5.13" dependencies: "@types/node": "npm:*" - checksum: 10/b2d7da5bd469c2ff1ddcfba1da33a556dc02c539e727001e7dc7b4182935154143e96a101cc091686acefb4e115c8ee38111c6634934748b8dd2db0c851c50ab + checksum: 10/21369beafa75c91ae3b00d3a2671c7408fceae1d492ca2abd5ac7c8c8bf4596d513c1599ebbddeae82c27c4a2d248976d0d714c4b3d34362b2ae35b964e2e637 languageName: node linkType: hard -"@types/ws@npm:^8.5.13": - version: 8.18.1 - resolution: "@types/ws@npm:8.18.1" +"@types/ws@npm:8.5.5": + version: 8.5.5 + resolution: "@types/ws@npm:8.5.5" dependencies: "@types/node": "npm:*" - checksum: 10/1ce05e3174dcacf28dae0e9b854ef1c9a12da44c7ed73617ab6897c5cbe4fccbb155a20be5508ae9a7dde2f83bd80f5cf3baa386b934fc4b40889ec963e94f3a + checksum: 10/b2d7da5bd469c2ff1ddcfba1da33a556dc02c539e727001e7dc7b4182935154143e96a101cc091686acefb4e115c8ee38111c6634934748b8dd2db0c851c50ab languageName: node linkType: hard @@ -8394,14 +8394,14 @@ __metadata: languageName: node linkType: hard -"better-sqlite3@npm:^11.7.0": - version: 11.10.0 - resolution: "better-sqlite3@npm:11.10.0" +"better-sqlite3@npm:11.7.0": + version: 11.7.0 + resolution: "better-sqlite3@npm:11.7.0" dependencies: bindings: "npm:^1.5.0" node-gyp: "npm:latest" prebuild-install: "npm:^7.1.1" - checksum: 10/5e4c7437c4fe6033335a79c82974d7ab29f33c51c36f48b73e87e087d21578468575de1c56a7badd4f76f17255e25abefddaeacf018e5eeb9e0cb8d6e3e4a5e1 + checksum: 10/a09bb28c0292bb7c037896ee99197815841275bca2d14f63b58994188239f292642c8c7ea3e0206d8ea6c7530d7b03d7343138ebeb9a4cc855c0b3663e00c812 languageName: node linkType: hard @@ -9961,6 +9961,13 @@ __metadata: languageName: node linkType: hard +"dotenv@npm:16.4.5, dotenv@npm:^16.4.5": + version: 16.4.5 + resolution: "dotenv@npm:16.4.5" + checksum: 10/55a3134601115194ae0f924e54473459ed0d9fc340ae610b676e248cca45aa7c680d86365318ea964e6da4e2ea80c4514c1adab5adb43d6867fb57ff068f95c8 + languageName: node + linkType: hard + "dotenv@npm:8.2.0": version: 8.2.0 resolution: "dotenv@npm:8.2.0" @@ -9975,13 +9982,6 @@ __metadata: languageName: node linkType: hard -"dotenv@npm:^16.4.5": - version: 16.4.5 - resolution: "dotenv@npm:16.4.5" - checksum: 10/55a3134601115194ae0f924e54473459ed0d9fc340ae610b676e248cca45aa7c680d86365318ea964e6da4e2ea80c4514c1adab5adb43d6867fb57ff068f95c8 - languageName: node - linkType: hard - "dottie@npm:^2.0.6": version: 2.0.6 resolution: "dottie@npm:2.0.6" @@ -11008,14 +11008,14 @@ __metadata: version: 0.0.0-use.local resolution: "event-downloader@workspace:packages/event-downloader" dependencies: - "@types/better-sqlite3": "npm:^7.6.12" - "@types/node": "npm:^22.10.2" - "@types/ws": "npm:^8.5.13" - better-sqlite3: "npm:^11.7.0" - dotenv: "npm:^16.4.5" - typescript: "npm:^5.7.2" - ws: "npm:^8.18.0" - zod: "npm:^3.23.8" + "@types/better-sqlite3": "npm:7.6.12" + "@types/node": "npm:22.10.2" + "@types/ws": "npm:8.5.13" + better-sqlite3: "npm:11.7.0" + dotenv: "npm:16.4.5" + typescript: "npm:5.7.2" + ws: "npm:8.18.0" + zod: "npm:3.23.8" peerDependencies: "@hathor/wallet-lib": ">=2.8.3" languageName: unknown @@ -17843,7 +17843,7 @@ __metadata: dependencies: "@aws-sdk/client-lambda": "npm:3.540.0" "@aws-sdk/client-sqs": "npm:3.540.0" - "@types/better-sqlite3": "npm:^7.6.12" + "@types/better-sqlite3": "npm:7.6.12" "@types/jest": "npm:29.5.4" "@types/lodash": "npm:4.14.199" "@types/mysql": "npm:2.15.21" @@ -17854,7 +17854,7 @@ __metadata: assert: "npm:2.1.0" aws-sdk: "npm:2.1454.0" axios: "npm:1.6.2" - better-sqlite3: "npm:^11.7.0" + better-sqlite3: "npm:11.7.0" dotenv: "npm:8.2.0" eslint: "npm:9.3.0" eslint-config-airbnb-base: "npm:15.0.0" @@ -18581,13 +18581,13 @@ __metadata: languageName: node linkType: hard -"typescript@npm:^5.7.2": - version: 5.9.3 - resolution: "typescript@npm:5.9.3" +"typescript@npm:5.7.2": + version: 5.7.2 + resolution: "typescript@npm:5.7.2" bin: tsc: bin/tsc tsserver: bin/tsserver - checksum: 10/c089d9d3da2729fd4ac517f9b0e0485914c4b3c26f80dc0cffcb5de1719a17951e92425d55db59515c1a7ddab65808466debb864d0d56dcf43f27007d0709594 + checksum: 10/4caa3904df69db9d4a8bedc31bafc1e19ffb7b24fbde2997a1633ae1398d0de5bdbf8daf602ccf3b23faddf1aeeb9b795223a2ed9c9a4fdcaf07bfde114a401a languageName: node linkType: hard @@ -18631,13 +18631,13 @@ __metadata: languageName: node linkType: hard -"typescript@patch:typescript@npm%3A^5.7.2#optional!builtin": - version: 5.9.3 - resolution: "typescript@patch:typescript@npm%3A5.9.3#optional!builtin::version=5.9.3&hash=5786d5" +"typescript@patch:typescript@npm%3A5.7.2#optional!builtin": + version: 5.7.2 + resolution: "typescript@patch:typescript@npm%3A5.7.2#optional!builtin::version=5.7.2&hash=5786d5" bin: tsc: bin/tsc tsserver: bin/tsserver - checksum: 10/696e1b017bc2635f4e0c94eb4435357701008e2f272f553d06e35b494b8ddc60aa221145e286c28ace0c89ee32827a28c2040e3a69bdc108b1a5dc8fb40b72e3 + checksum: 10/d75ca10141afc64fd3474b41a8b082b640555bed388d237558aed64e5827ddadb48f90932c7f4205883f18f5bcab8b6a739a2cfac95855604b0dfeb34bc2f3eb languageName: node linkType: hard @@ -18703,10 +18703,10 @@ __metadata: languageName: node linkType: hard -"undici-types@npm:~6.21.0": - version: 6.21.0 - resolution: "undici-types@npm:6.21.0" - checksum: 10/ec8f41aa4359d50f9b59fa61fe3efce3477cc681908c8f84354d8567bb3701fafdddf36ef6bff307024d3feb42c837cf6f670314ba37fc8145e219560e473d14 +"undici-types@npm:~6.20.0": + version: 6.20.0 + resolution: "undici-types@npm:6.20.0" + checksum: 10/583ac7bbf4ff69931d3985f4762cde2690bb607844c16a5e2fbb92ed312fe4fa1b365e953032d469fa28ba8b224e88a595f0b10a449332f83fa77c695e567dbe languageName: node linkType: hard @@ -19360,6 +19360,21 @@ __metadata: languageName: node linkType: hard +"ws@npm:8.18.0": + version: 8.18.0 + resolution: "ws@npm:8.18.0" + peerDependencies: + bufferutil: ^4.0.1 + utf-8-validate: ">=5.0.2" + peerDependenciesMeta: + bufferutil: + optional: true + utf-8-validate: + optional: true + checksum: 10/70dfe53f23ff4368d46e4c0b1d4ca734db2c4149c6f68bc62cb16fc21f753c47b35fcc6e582f3bdfba0eaeb1c488cddab3c2255755a5c3eecb251431e42b3ff6 + languageName: node + linkType: hard + "ws@npm:^7.5.3, ws@npm:^7.5.9": version: 7.5.9 resolution: "ws@npm:7.5.9" @@ -19561,10 +19576,3 @@ __metadata: checksum: 10/846fd73e1af0def79c19d510ea9e4a795544a67d5b34b7e1c4d0425bf6bfd1c719446d94cdfa1721c1987d891321d61f779e8236fde517dc0e524aa851a6eff1 languageName: node linkType: hard - -"zod@npm:^3.23.8": - version: 3.25.76 - resolution: "zod@npm:3.25.76" - checksum: 10/f0c963ec40cd96858451d1690404d603d36507c1fc9682f2dae59ab38b578687d542708a7fdbf645f77926f78c9ed558f57c3d3aa226c285f798df0c4da16995 - languageName: node - linkType: hard From 46ffc9e51ae0da9b864c85cf84cbc7e363a32c88 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Abadesso?= Date: Wed, 4 Mar 2026 10:51:58 -0300 Subject: [PATCH 9/9] fix: address PR review feedback - Bump node engine to >=22 to align with @types/node@22.10.2 - Remove stale comment in event-parser.ts --- packages/event-downloader/package.json | 2 +- packages/event-downloader/src/event-parser.ts | 1 - 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/packages/event-downloader/package.json b/packages/event-downloader/package.json index 1d443349..23ad34e1 100644 --- a/packages/event-downloader/package.json +++ b/packages/event-downloader/package.json @@ -8,7 +8,7 @@ "src" ], "engines": { - "node": ">=18" + "node": ">=22" }, "scripts": { "build": "tsc -b", diff --git a/packages/event-downloader/src/event-parser.ts b/packages/event-downloader/src/event-parser.ts index 436d3764..f2f65d31 100644 --- a/packages/event-downloader/src/event-parser.ts +++ b/packages/event-downloader/src/event-parser.ts @@ -9,7 +9,6 @@ import { FullNodeEvent, FullNodeEventTypes } from './types'; /** * Extracts the transaction hash from a fullnode event. - * Now handles ALL event types, not just predefined ones. * * @param event - The fullnode event to extract the hash from * @returns The transaction hash if the event contains one, null otherwise