diff --git a/.gitignore b/.gitignore index 0b4715e0..ef1f9d32 100644 --- a/.gitignore +++ b/.gitignore @@ -15,3 +15,4 @@ packages/wallet-service/.env* packages/wallet-service/.warmup .yarn/ .env.* +*.tsbuildinfo diff --git a/package.json b/package.json index 62ca20a1..aa2957ea 100644 --- a/package.json +++ b/package.json @@ -4,7 +4,8 @@ "workspaces": [ "packages/common", "packages/daemon", - "packages/wallet-service" + "packages/wallet-service", + "packages/event-downloader" ], "engines": { "node": ">=22" diff --git a/packages/daemon/package.json b/packages/daemon/package.json index fdfb64a4..468cafae 100644 --- a/packages/daemon/package.json +++ b/packages/daemon/package.json @@ -13,6 +13,7 @@ "lint": "eslint .", "build": "tsc -b", "start": "node dist/index.js", + "replay-balance": "yarn dlx ts-node src/scripts/replay-balance.ts", "watch": "tsc -w", "test_images_up": "docker compose -f ./__tests__/integration/scripts/docker-compose.yml up -d", "test_images_down": "docker compose -f ./__tests__/integration/scripts/docker-compose.yml down", @@ -28,6 +29,7 @@ "author": "André Abadesso", "module": "dist/index.js", "devDependencies": { + "@types/better-sqlite3": "7.6.12", "@types/jest": "29.5.4", "@types/lodash": "4.14.199", "@types/mysql": "2.15.21", @@ -35,6 +37,7 @@ "@types/ws": "8.5.5", "@typescript-eslint/eslint-plugin": "6.7.3", "@typescript-eslint/parser": "6.7.3", + "better-sqlite3": "11.7.0", "eslint": "9.3.0", "eslint-config-airbnb-base": "15.0.0", "eslint-plugin-import": "2.29.1", diff --git a/packages/daemon/src/scripts/replay-balance.ts b/packages/daemon/src/scripts/replay-balance.ts new file mode 100644 index 00000000..fa33b1fd --- /dev/null +++ b/packages/daemon/src/scripts/replay-balance.ts @@ -0,0 +1,261 @@ +/** + * Copyright (c) Hathor Labs and its affiliates. + * + * This source code is licensed under the MIT license found in the + * LICENSE file in the root directory of this source tree. + */ + +/** + * Balance replay script for debugging wallet_balance discrepancies. + * + * Reads events from a SQLite database produced by the event-downloader, + * processes them in order using the daemon's own balance utilities, and + * computes the expected HTR balance for a given set of addresses. + * + * Usage: + * node dist/scripts/replay-balance.js [options] + * + * Options: + * --db Path to events.sqlite (default: ./events.sqlite) + * --addresses Path to addresses CSV (default: ./addresses.csv) + * --token Token UID to compute balance for (default: NATIVE_TOKEN_UID) + * --expected Expected balance in hatoshis for comparison + * --verbose Show per-transaction breakdown + */ + +import Database from 'better-sqlite3'; +import * as fs from 'fs'; +import { bigIntUtils, constants } from '@hathor/wallet-lib'; +import { prepareOutputs, prepareInputs } from '../utils/wallet'; + +// --------------------------------------------------------------------------- +// CLI argument parsing +// --------------------------------------------------------------------------- + +interface Opts { + db: string; + addresses: string; + token: string; + expected?: bigint; + verbose: boolean; +} + +function parseArgs(): Opts { + const args = process.argv.slice(2); + const opts: Opts = { + db: './events.sqlite', + addresses: './addresses.csv', + token: constants.NATIVE_TOKEN_UID, + verbose: false, + }; + + const readNext = (index: number, flag: string): string => { + const value = args[index + 1]; + if (!value || value.startsWith('--')) { + throw new Error(`Missing value for ${flag}`); + } + return value; + }; + + for (let i = 0; i < args.length; i++) { + switch (args[i]) { + case '--db': opts.db = readNext(i, '--db'); i++; break; + case '--addresses': opts.addresses = readNext(i, '--addresses'); i++; break; + case '--token': opts.token = readNext(i, '--token'); i++; break; + case '--expected': opts.expected = BigInt(readNext(i, '--expected')); i++; break; + case '--verbose': opts.verbose = true; break; + default: + throw new Error(`Unknown option: ${args[i]}`); + } + } + return opts; +} + +// --------------------------------------------------------------------------- +// Helpers +// --------------------------------------------------------------------------- + +function loadAddresses(csvPath: string): Set { + const lines = fs.readFileSync(csvPath, 'utf-8') + .split(/\r?\n/) + .map(l => l.trim()) + .filter(Boolean); + + const hasHeader = lines[0]?.toLowerCase().startsWith('address'); + const dataLines = hasHeader ? lines.slice(1) : lines; + + return new Set( + dataLines + .map(line => line.split(',')[0]?.trim()) + .filter((addr): addr is string => Boolean(addr)), + ); +} + +interface TxState { + hash: string; + voided: boolean; + outputs: any[]; + inputs: any[]; + tokens: string[]; + height: number; + timestamp: number; +} + +// --------------------------------------------------------------------------- +// Main +// --------------------------------------------------------------------------- + +function main() { + const opts = parseArgs(); + + const walletAddresses = loadAddresses(opts.addresses); + console.log(`Loaded ${walletAddresses.size} wallet addresses`); + console.log(`Token: ${opts.token === constants.NATIVE_TOKEN_UID ? 'HTR (native)' : opts.token}`); + + if (walletAddresses.size === 0) { + throw new Error('No wallet addresses found in CSV'); + } + + const sqlite = new Database(opts.db, { readonly: true }); + + // Build the WHERE clause — one parameterized LIKE condition per address + const addresses = Array.from(walletAddresses); + const conditions = addresses.map(() => 'data LIKE ?').join(' OR '); + const likeParams = addresses.map(addr => `%${addr}%`); + + const rows = sqlite.prepare(` + SELECT id, type, data + FROM events + WHERE type IN ('NEW_VERTEX_ACCEPTED', 'VERTEX_METADATA_CHANGED') + AND (${conditions}) + ORDER BY id ASC + `).all(...likeParams) as Array<{ id: number; type: string; data: string }>; + + sqlite.close(); + + console.log(`Found ${rows.length} relevant events`); + + // ------------------------------------------------------------------ + // Pass 1: build the final state of each transaction. + // Later events (VERTEX_METADATA_CHANGED) overwrite earlier ones so + // the map always holds the most recent voided_by for each tx. + // ------------------------------------------------------------------ + + const txStates = new Map(); + + for (const row of rows) { + const event = bigIntUtils.JSONBigInt.parse(row.data); + const data = event.event.data; + const hash: string = data.hash; + const voided: boolean = data.metadata.voided_by.length > 0; + + txStates.set(hash, { + hash, + voided, + outputs: data.outputs, + inputs: data.inputs, + tokens: data.tokens ?? [], + height: data.metadata.height, + timestamp: data.timestamp, + }); + } + + console.log(`Unique transactions: ${txStates.size}`); + + // ------------------------------------------------------------------ + // Pass 2: build the spending map. + // For each non-voided transaction, record which tx hash spends each + // UTXO referenced by its inputs. + // ------------------------------------------------------------------ + + const spentBy = new Map(); // "${txId}:${index}" -> spending tx hash + + for (const tx of txStates.values()) { + if (tx.voided) continue; + const inputs = prepareInputs(tx.inputs, tx.tokens); + for (const input of inputs) { + spentBy.set(`${input.tx_id}:${input.index}`, tx.hash); + } + } + + // ------------------------------------------------------------------ + // Pass 3: compute the wallet balance. + // Sum the value of every unspent HTR output in a non-voided tx that + // belongs to one of the wallet addresses. + // ------------------------------------------------------------------ + + let totalBalance = 0n; + + interface Contribution { + hash: string; + outputIndex: number; + address: string; + amount: bigint; + height: number; + timestamp: number; + } + + const unspentUtxos: Contribution[] = []; + + for (const tx of txStates.values()) { + if (tx.voided) continue; + + const outputs = prepareOutputs(tx.outputs, tx.tokens); + + for (const output of outputs) { + const address = output.decoded?.address; + if (!address || !walletAddresses.has(address)) continue; + if (output.token !== opts.token) continue; + + // Skip authority outputs (mint / melt) + const isAuthority = (output.token_data & 0x80) !== 0; + if (isAuthority) continue; + + const utxoKey = `${tx.hash}:${output.index}`; + if (spentBy.has(utxoKey)) continue; + + totalBalance += BigInt(output.value as unknown as number); + unspentUtxos.push({ + hash: tx.hash, + outputIndex: output.index, + address, + amount: BigInt(output.value as unknown as number), + height: tx.height, + timestamp: tx.timestamp, + }); + } + } + + // ------------------------------------------------------------------ + // Output + // ------------------------------------------------------------------ + + if (opts.verbose) { + console.log('\n--- Unspent HTR UTXOs ---'); + unspentUtxos + .sort((a, b) => a.height - b.height) + .forEach(u => { + console.log( + ` height=${u.height} ${u.hash.substring(0, 16)}... ` + + `output[${u.outputIndex}] ${u.address.substring(0, 8)}... ` + + `+${u.amount} hat`, + ); + }); + } + + console.log('\n=== RESULTS ==='); + console.log(`Computed balance : ${totalBalance} hatoshis`); + console.log(`Unspent UTXOs : ${unspentUtxos.length}`); + + if (opts.expected !== undefined) { + console.log(`Expected balance : ${opts.expected} hatoshis`); + if (totalBalance === opts.expected) { + console.log('✓ MATCH'); + } else { + const diff = totalBalance - opts.expected; + console.log(`✗ MISMATCH — diff: ${diff > 0n ? '+' : ''}${diff} hatoshis`); + } + } +} + +main(); diff --git a/packages/event-downloader/.gitignore b/packages/event-downloader/.gitignore new file mode 100644 index 00000000..02a03cf0 --- /dev/null +++ b/packages/event-downloader/.gitignore @@ -0,0 +1 @@ +events.* diff --git a/packages/event-downloader/package.json b/packages/event-downloader/package.json new file mode 100644 index 00000000..23ad34e1 --- /dev/null +++ b/packages/event-downloader/package.json @@ -0,0 +1,33 @@ +{ + "name": "event-downloader", + "license": "MIT", + "main": "dist/index.js", + "typings": "dist/index.d.ts", + "files": [ + "dist", + "src" + ], + "engines": { + "node": ">=22" + }, + "scripts": { + "build": "tsc -b", + "start": "node dist/index.js", + "lint": "eslint ." + }, + "peerDependencies": { + "@hathor/wallet-lib": ">=2.8.3" + }, + "dependencies": { + "better-sqlite3": "11.7.0", + "dotenv": "16.4.5", + "ws": "8.18.0", + "zod": "3.23.8" + }, + "devDependencies": { + "@types/better-sqlite3": "7.6.12", + "@types/node": "22.10.2", + "@types/ws": "8.5.13", + "typescript": "5.7.2" + } +} diff --git a/packages/event-downloader/src/config.ts b/packages/event-downloader/src/config.ts new file mode 100644 index 00000000..d5d921ed --- /dev/null +++ b/packages/event-downloader/src/config.ts @@ -0,0 +1,46 @@ +/** + * Copyright (c) Hathor Labs and its affiliates. + * + * This source code is licensed under the MIT license found in the + * LICENSE file in the root directory of this source tree. + */ + +import 'dotenv/config'; + +const requiredEnvs = [ + 'FULLNODE_HOST', +]; + +export const checkEnvVariables = () => { + const missingEnv = requiredEnvs.filter((envVar) => { + const value = process.env[envVar]; + return value === undefined || value.trim() === ''; + }); + + if (missingEnv.length > 0) { + throw new Error(`Missing required environment variables: ${missingEnv.join(', ')}`); + } +}; + +const parsePositiveInt = (envName: string, fallback: number): number => { + const raw = process.env[envName]; + const value = Number.parseInt(raw ?? String(fallback), 10); + if (!Number.isInteger(value) || value <= 0) { + throw new Error(`Invalid ${envName}: expected a positive integer, got "${raw}"`); + } + return value; +}; + +// Fullnode connection +export const FULLNODE_HOST = process.env.FULLNODE_HOST!; +export const USE_SSL = process.env.USE_SSL === 'true'; + +// Download configuration +export const BATCH_SIZE = parsePositiveInt('BATCH_SIZE', 5000); +export const PARALLEL_CONNECTIONS = parsePositiveInt('PARALLEL_CONNECTIONS', 5); +export const WINDOW_SIZE = parsePositiveInt('WINDOW_SIZE', 100); +export const CONNECTION_TIMEOUT_MS = parsePositiveInt('CONNECTION_TIMEOUT_MS', 60000); + +// Database configuration +export const DB_PATH = process.env.DB_PATH ?? './events.sqlite'; + diff --git a/packages/event-downloader/src/db.ts b/packages/event-downloader/src/db.ts new file mode 100644 index 00000000..61e8667b --- /dev/null +++ b/packages/event-downloader/src/db.ts @@ -0,0 +1,193 @@ +import Database, { Database as DatabaseType } from 'better-sqlite3'; + +// Types +export interface Event { + id: number; + type: string; + timestamp: number; + data: string; +} + +export interface TxEvent { + tx_hash: string; + event_id: number; + event_type: string; +} + +export interface BatchProgress { + batch_start: number; + batch_end: number; + last_downloaded: number | null; + status: string; + updated_at: string; +} + +// SQL statements +const CREATE_EVENTS_TABLE = ` + CREATE TABLE IF NOT EXISTS events ( + id INTEGER PRIMARY KEY, + type TEXT NOT NULL, + timestamp INTEGER NOT NULL, + data TEXT NOT NULL, + created_at DATETIME DEFAULT CURRENT_TIMESTAMP + ) +`; + +const CREATE_TX_EVENTS_TABLE = ` + CREATE TABLE IF NOT EXISTS tx_events ( + tx_hash TEXT NOT NULL, + event_id INTEGER NOT NULL, + event_type TEXT NOT NULL, + PRIMARY KEY (tx_hash, event_id) + ) +`; + +const CREATE_DOWNLOAD_PROGRESS_TABLE = ` + CREATE TABLE IF NOT EXISTS download_progress ( + batch_start INTEGER PRIMARY KEY, + batch_end INTEGER NOT NULL, + last_downloaded INTEGER, + status TEXT DEFAULT 'pending', + updated_at DATETIME DEFAULT CURRENT_TIMESTAMP + ) +`; + +const CREATE_TX_EVENTS_HASH_INDEX = ` + CREATE INDEX IF NOT EXISTS idx_tx_events_hash ON tx_events(tx_hash) +`; + +const CREATE_EVENTS_TYPE_INDEX = ` + CREATE INDEX IF NOT EXISTS idx_events_type ON events(type) +`; + +/** + * Initialize the database with all required tables and indexes. + * @param dbPath - Path to the SQLite database file + * @returns The database instance + */ +export function initDatabase(dbPath: string): DatabaseType { + const db = new Database(dbPath); + + // Enable WAL mode for better concurrent read performance + db.pragma('journal_mode = WAL'); + + // Create tables + db.exec(CREATE_EVENTS_TABLE); + db.exec(CREATE_TX_EVENTS_TABLE); + db.exec(CREATE_DOWNLOAD_PROGRESS_TABLE); + + // Create indexes + db.exec(CREATE_TX_EVENTS_HASH_INDEX); + db.exec(CREATE_EVENTS_TYPE_INDEX); + + return db; +} + +/** + * Batch insert events into the events table. + * Uses INSERT OR REPLACE to handle duplicates. + * @param db - Database instance + * @param events - Array of events to insert + */ +export function insertEvents(db: DatabaseType, events: Event[]): void { + if (events.length === 0) { + return; + } + + const insertStmt = db.prepare(` + INSERT OR REPLACE INTO events (id, type, timestamp, data) + VALUES (@id, @type, @timestamp, @data) + `); + + const insertMany = db.transaction((eventsToInsert: Event[]) => { + for (const event of eventsToInsert) { + insertStmt.run(event); + } + }); + + insertMany(events); +} + +/** + * Batch insert transaction event mappings into the tx_events table. + * Uses INSERT OR REPLACE to handle duplicates. + * @param db - Database instance + * @param txEvents - Array of transaction events to insert + */ +export function insertTxEvents(db: DatabaseType, txEvents: TxEvent[]): void { + if (txEvents.length === 0) { + return; + } + + const insertStmt = db.prepare(` + INSERT OR REPLACE INTO tx_events (tx_hash, event_id, event_type) + VALUES (@tx_hash, @event_id, @event_type) + `); + + const insertMany = db.transaction((txEventsToInsert: TxEvent[]) => { + for (const txEvent of txEventsToInsert) { + insertStmt.run(txEvent); + } + }); + + insertMany(txEvents); +} + +/** + * Get the progress for a specific batch. + * @param db - Database instance + * @param batchStart - The starting event ID of the batch + * @returns The batch progress record or undefined if not found + */ +export function getBatchProgress( + db: DatabaseType, + batchStart: number +): BatchProgress | undefined { + const stmt = db.prepare(` + SELECT batch_start, batch_end, last_downloaded, status, updated_at + FROM download_progress + WHERE batch_start = ? + `); + + return stmt.get(batchStart) as BatchProgress | undefined; +} + +/** + * Update or insert batch progress. + * Uses INSERT OR REPLACE to upsert the record. + * @param db - Database instance + * @param batchStart - The starting event ID of the batch + * @param batchEnd - The ending event ID of the batch + * @param lastDownloaded - The last successfully downloaded event ID (null if none) + * @param status - The status of the batch ('pending', 'in_progress', 'completed', etc.) + */ +export function updateBatchProgress( + db: DatabaseType, + batchStart: number, + batchEnd: number, + lastDownloaded: number | null, + status: string +): void { + const stmt = db.prepare(` + INSERT OR REPLACE INTO download_progress (batch_start, batch_end, last_downloaded, status, updated_at) + VALUES (?, ?, ?, ?, CURRENT_TIMESTAMP) + `); + + stmt.run(batchStart, batchEnd, lastDownloaded, status); +} + +/** + * Get all batch progress records. + * @param db - Database instance + * @returns Array of all batch progress records + */ +export function getAllBatchProgress(db: DatabaseType): BatchProgress[] { + const stmt = db.prepare(` + SELECT batch_start, batch_end, last_downloaded, status, updated_at + FROM download_progress + ORDER BY batch_start ASC + `); + + return stmt.all() as BatchProgress[]; +} + diff --git a/packages/event-downloader/src/event-parser.ts b/packages/event-downloader/src/event-parser.ts new file mode 100644 index 00000000..f2f65d31 --- /dev/null +++ b/packages/event-downloader/src/event-parser.ts @@ -0,0 +1,63 @@ +/** + * Copyright (c) Hathor Labs and its affiliates. + * + * This source code is licensed under the MIT license found in the + * LICENSE file in the root directory of this source tree. + */ + +import { FullNodeEvent, FullNodeEventTypes } from './types'; + +/** + * Extracts the transaction hash from a fullnode event. + * + * @param event - The fullnode event to extract the hash from + * @returns The transaction hash if the event contains one, null otherwise + */ +export function extractTxHash(event: FullNodeEvent): string | null { + const eventType = event.event.type; + const eventData = event.event.data as any; + + switch (eventType) { + case FullNodeEventTypes.NEW_VERTEX_ACCEPTED: + case FullNodeEventTypes.VERTEX_METADATA_CHANGED: + case FullNodeEventTypes.VERTEX_REMOVED: + return eventData?.hash ?? null; + + case FullNodeEventTypes.NC_EVENT: + return eventData?.vertex_id ?? null; + + case FullNodeEventTypes.LOAD_STARTED: + case FullNodeEventTypes.LOAD_FINISHED: + case FullNodeEventTypes.REORG_STARTED: + case FullNodeEventTypes.REORG_FINISHED: + case FullNodeEventTypes.FULL_NODE_CRASHED: + return null; + + default: + // Handle unknown event types by trying common patterns + // TOKEN_CREATED and other events might have different structures + + // Try standard hash field + if (eventData?.hash) { + return eventData.hash; + } + + // Try vertex_id (for vertex-related events) + if (eventData?.vertex_id) { + return eventData.vertex_id; + } + + // Try token_uid (for TOKEN_CREATED events) + if (eventData?.token_uid) { + return eventData.token_uid; + } + + // Try nc_exec_info.nc_tx (for nano contract events) + if (eventData?.nc_exec_info?.nc_tx) { + return eventData.nc_exec_info.nc_tx; + } + + // No recognizable hash found + return null; + } +} diff --git a/packages/event-downloader/src/index.ts b/packages/event-downloader/src/index.ts new file mode 100644 index 00000000..18275786 --- /dev/null +++ b/packages/event-downloader/src/index.ts @@ -0,0 +1,210 @@ +#!/usr/bin/env node +/** + * Copyright (c) Hathor Labs and its affiliates. + * + * This source code is licensed under the MIT license found in the + * LICENSE file in the root directory of this source tree. + */ + +import { checkEnvVariables, BATCH_SIZE, PARALLEL_CONNECTIONS, WINDOW_SIZE, DB_PATH, FULLNODE_HOST } from './config'; +import { downloadAllEvents, DownloadStats, WorkerStatus } from './orchestrator'; + +// ANSI escape codes for terminal formatting +const HIDE_CURSOR = '\x1b[?25l'; +const SHOW_CURSOR = '\x1b[?25h'; +const CLEAR_SCREEN = '\x1b[2J'; +const MOVE_HOME = '\x1b[H'; +const BOLD = '\x1b[1m'; +const RESET = '\x1b[0m'; +const GREEN = '\x1b[32m'; +const YELLOW = '\x1b[33m'; +const CYAN = '\x1b[36m'; + +const workerStatuses = new Map(); +let lastRenderTime = 0; +const RENDER_INTERVAL_MS = 500; // Only redraw every 500ms + +// For rate and ETA calculation +let startTime = 0; +let lastEventsDownloaded = 0; +let lastRateCheckTime = 0; +let currentRate = 0; // events per second + +function formatNumber(num: number): string { + return num.toLocaleString(); +} + +function formatDuration(seconds: number): string { + if (seconds < 60) { + return `${Math.round(seconds)}s`; + } + if (seconds < 3600) { + const mins = Math.floor(seconds / 60); + const secs = Math.round(seconds % 60); + return `${mins}m ${secs}s`; + } + const hours = Math.floor(seconds / 3600); + const mins = Math.floor((seconds % 3600) / 60); + return `${hours}h ${mins}m`; +} + +function formatRate(rate: number): string { + if (rate >= 1000) { + return `${(rate / 1000).toFixed(1)}k/s`; + } + return `${Math.round(rate)}/s`; +} + +function progressBar(percent: number, width: number = 40): string { + const filled = Math.round((percent / 100) * width); + const empty = width - filled; + return `[${'█'.repeat(filled)}${'░'.repeat(empty)}]`; +} + +let lastStats: DownloadStats | null = null; + +function render(): void { + if (!lastStats) return; + + const stats = lastStats; + const percent = stats.totalEvents > 0 + ? (stats.eventsDownloaded / stats.totalEvents) * 100 + : 0; + + // Calculate rate + const now = Date.now(); + if (lastRateCheckTime > 0) { + const timeDelta = (now - lastRateCheckTime) / 1000; + if (timeDelta > 0) { + const eventsDelta = stats.eventsDownloaded - lastEventsDownloaded; + // Smooth the rate with exponential moving average + const newRate = eventsDelta / timeDelta; + currentRate = currentRate === 0 ? newRate : currentRate * 0.7 + newRate * 0.3; + } + } + lastRateCheckTime = now; + lastEventsDownloaded = stats.eventsDownloaded; + + // Calculate ETA + const eventsRemaining = stats.totalEvents - stats.eventsDownloaded; + const etaSeconds = currentRate > 0 ? eventsRemaining / currentRate : 0; + + // Calculate elapsed time + const elapsedSeconds = startTime > 0 ? (now - startTime) / 1000 : 0; + + // Move cursor to home and clear screen + process.stdout.write(MOVE_HOME + CLEAR_SCREEN); + + // Header + console.log(`${BOLD}Event Downloader v1.0.0${RESET}`); + console.log('━'.repeat(60)); + console.log(`${CYAN}Fullnode:${RESET} ${FULLNODE_HOST}`); + console.log(`${CYAN}Database:${RESET} ${DB_PATH}`); + console.log(`${CYAN}Batch Size:${RESET} ${formatNumber(BATCH_SIZE)} | ${CYAN}Workers:${RESET} ${PARALLEL_CONNECTIONS} | ${CYAN}Window:${RESET} ${WINDOW_SIZE}`); + console.log('━'.repeat(60)); + console.log(); + + // Stats + console.log(`${CYAN}Latest event ID:${RESET} ${formatNumber(stats.totalEvents)}`); + console.log(`${CYAN}Total batches:${RESET} ${stats.totalBatches}`); + console.log( + `${GREEN}Completed:${RESET} ${stats.completedBatches} | ` + + `${YELLOW}In Progress:${RESET} ${stats.inProgressBatches} | ` + + `Pending: ${stats.pendingBatches}` + ); + console.log(); + console.log(`${progressBar(percent)} ${percent.toFixed(1)}%`); + console.log(`${CYAN}Events downloaded:${RESET} ${formatNumber(stats.eventsDownloaded)}`); + console.log( + `${CYAN}Rate:${RESET} ${formatRate(currentRate)} | ` + + `${CYAN}Elapsed:${RESET} ${formatDuration(elapsedSeconds)} | ` + + `${CYAN}ETA:${RESET} ${etaSeconds > 0 ? formatDuration(etaSeconds) : '--'}` + ); + console.log(); + + // Print worker statuses + const sortedWorkers = Array.from(workerStatuses.entries()).sort((a, b) => a[0] - b[0]); + for (const [workerId, status] of sortedWorkers) { + const batchSize = status.batchEnd - status.batchStart; + const workerPercent = batchSize > 0 + ? ((status.lastEventId - status.batchStart) / batchSize * 100).toFixed(0) + : '0'; + console.log( + `Worker ${workerId + 1}: ${formatNumber(status.batchStart)}-${formatNumber(status.batchEnd)} | ` + + `${formatNumber(status.eventsDownloaded)} events (${workerPercent}%)` + ); + } +} + +function printStats(stats: DownloadStats): void { + lastStats = stats; + + const now = Date.now(); + if (now - lastRenderTime < RENDER_INTERVAL_MS) { + return; // Throttle updates + } + lastRenderTime = now; + + render(); +} + +function handleWorkerUpdate(workerId: number, status: WorkerStatus): void { + workerStatuses.set(workerId, status); +} + +async function main(): Promise { + try { + // Check environment variables + checkEnvVariables(); + + // Hide cursor and clear screen + process.stdout.write(HIDE_CURSOR + CLEAR_SCREEN + MOVE_HOME); + + console.log(`${BOLD}Event Downloader v1.0.0${RESET}`); + console.log('Connecting to fullnode...'); + + // Initialize timing + startTime = Date.now(); + + // Handle graceful shutdown + let isShuttingDown = false; + const cleanup = () => { + if (isShuttingDown) return; + isShuttingDown = true; + process.stdout.write(SHOW_CURSOR); + console.log('\n\nDownload interrupted. Saving progress...'); + // Give a brief window for in-flight DB writes to complete + setTimeout(() => { + console.log('Progress has been saved.'); + process.exit(0); + }, 2000); + }; + + process.on('SIGINT', cleanup); + process.on('SIGTERM', cleanup); + + // Start download + await downloadAllEvents({ + onStatsUpdate: printStats, + onWorkerUpdate: handleWorkerUpdate, + onComplete: () => { + render(); // Final render + process.stdout.write(SHOW_CURSOR); + console.log(); + console.log('━'.repeat(60)); + console.log(`${GREEN}${BOLD}Download complete!${RESET}`); + console.log(`Database saved to: ${DB_PATH}`); + }, + onError: (error) => { + process.stdout.write(SHOW_CURSOR); + console.error(`\n${BOLD}Error:${RESET}`, error.message); + }, + }); + } catch (error) { + process.stdout.write(SHOW_CURSOR); + console.error('Fatal error:', error); + process.exit(1); + } +} + +main(); diff --git a/packages/event-downloader/src/orchestrator.ts b/packages/event-downloader/src/orchestrator.ts new file mode 100644 index 00000000..661f3779 --- /dev/null +++ b/packages/event-downloader/src/orchestrator.ts @@ -0,0 +1,401 @@ +/** + * Copyright (c) Hathor Labs and its affiliates. + * + * This source code is licensed under the MIT license found in the + * LICENSE file in the root directory of this source tree. + */ + +import { Database as DatabaseType } from 'better-sqlite3'; +import { bigIntUtils } from '@hathor/wallet-lib'; +import WebSocket from 'ws'; +import { + initDatabase, + insertEvents, + insertTxEvents, + updateBatchProgress, + getAllBatchProgress, + BatchProgress, + Event as DbEvent, + TxEvent, +} from './db'; +import { createWorker, BatchConfig } from './worker'; +import { extractTxHash } from './event-parser'; +import { FullNodeEvent } from './types'; +import { FULLNODE_HOST, USE_SSL, BATCH_SIZE, PARALLEL_CONNECTIONS, DB_PATH } from './config'; + +export interface DownloadStats { + totalEvents: number; + totalBatches: number; + completedBatches: number; + inProgressBatches: number; + pendingBatches: number; + eventsDownloaded: number; +} + +export interface WorkerStatus { + batchStart: number; + batchEnd: number; + eventsDownloaded: number; + lastEventId: number; +} + +export interface OrchestratorCallbacks { + onStatsUpdate: (stats: DownloadStats) => void; + onWorkerUpdate: (workerId: number, status: WorkerStatus) => void; + onComplete: () => void; + onError: (error: Error) => void; +} + +interface BatchInfo { + start: number; + end: number; + lastDownloaded?: number; +} + +/** + * Get the latest event ID from the fullnode via WebSocket. + */ +export async function getLatestEventId(): Promise { + return new Promise((resolve, reject) => { + const protocol = USE_SSL ? 'wss://' : 'ws://'; + const url = new URL(`${protocol}${FULLNODE_HOST}`); + url.pathname = '/v1a/event_ws'; + + const socket = new WebSocket(url.toString()); + const timeout = setTimeout(() => { + socket.close(); + reject(new Error('Timeout waiting for initial event')); + }, 30000); + + socket.onopen = () => { + const startMessage = { + type: 'START_STREAM', + window_size: 1, + }; + socket.send(bigIntUtils.JSONBigInt.stringify(startMessage)); + }; + + socket.onmessage = (event) => { + clearTimeout(timeout); + try { + const data = bigIntUtils.JSONBigInt.parse(event.data.toString()); + const rawLatestEventId = data?.latest_event_id; + const latestEventId = typeof rawLatestEventId === 'bigint' + ? Number(rawLatestEventId) + : rawLatestEventId; + if (!Number.isSafeInteger(latestEventId) || latestEventId < 0) { + throw new Error(`Invalid latest_event_id: ${String(rawLatestEventId)}`); + } + socket.close(); + resolve(latestEventId); + } catch (error) { + socket.close(); + reject(error); + } + }; + + socket.onerror = (error) => { + clearTimeout(timeout); + reject(new Error(`WebSocket error: ${error.message}`)); + }; + }); +} + +/** + * Calculate batches needed to download all events. + */ +function calculateBatches(latestEventId: number, batchSize: number): BatchInfo[] { + const batches: BatchInfo[] = []; + let start = 0; + + while (start <= latestEventId) { + const end = Math.min(start + batchSize - 1, latestEventId); + batches.push({ start, end }); + start = end + 1; + } + + return batches; +} + +/** + * Merge calculated batches with existing progress from database. + * Handles the case where a batch was completed with a smaller boundary + * (e.g., latestEventId was lower during a previous run). + */ +function mergeBatchesWithProgress( + batches: BatchInfo[], + existingProgress: BatchProgress[] +): BatchInfo[] { + const progressMap = new Map(); + for (const progress of existingProgress) { + progressMap.set(progress.batch_start, progress); + } + + return batches.map((batch) => { + const existing = progressMap.get(batch.start); + if (!existing) { + return batch; + } + + // Only consider fully complete if the stored batch_end covers the calculated batch_end + if (existing.status === 'completed' && existing.batch_end >= batch.end) { + return { ...batch, lastDownloaded: batch.end }; + } + + // Batch was "completed" but with a smaller boundary - resume from where it ended + if (existing.status === 'completed') { + return { ...batch, lastDownloaded: existing.batch_end }; + } + + // For in-progress or failed batches, resume from last_downloaded + if (existing.last_downloaded !== null) { + return { ...batch, lastDownloaded: existing.last_downloaded }; + } + + return batch; + }).filter((batch) => { + // Filter out completed batches + return batch.lastDownloaded === undefined || batch.lastDownloaded < batch.end; + }); +} + +/** + * Run a single worker for a batch. + */ +function runWorker( + db: DatabaseType, + batch: BatchInfo, + workerId: number, + callbacks: OrchestratorCallbacks +): Promise { + return new Promise((resolve, reject) => { + const eventBuffer: DbEvent[] = []; + const txEventBuffer: TxEvent[] = []; + let eventsDownloaded = 0; + let lastEventId = batch.lastDownloaded ?? batch.start - 1; + let isSettled = false; + + const settleResolve = () => { + if (!isSettled) { + isSettled = true; + resolve(); + } + }; + const settleReject = (error: Error) => { + if (!isSettled) { + isSettled = true; + reject(error); + } + }; + + const flushBuffers = () => { + if (eventBuffer.length > 0) { + insertEvents(db, eventBuffer); + eventBuffer.length = 0; + } + if (txEventBuffer.length > 0) { + insertTxEvents(db, txEventBuffer); + txEventBuffer.length = 0; + } + }; + + const config: BatchConfig = { + batchStart: batch.start, + batchEnd: batch.end, + lastDownloaded: batch.lastDownloaded, + }; + + const worker = createWorker(config, { + onEvent: (event: FullNodeEvent) => { + eventsDownloaded++; + lastEventId = event.event.id; + + // Buffer event for batch insert + eventBuffer.push({ + id: event.event.id, + type: event.event.type, + timestamp: event.event.timestamp, + data: bigIntUtils.JSONBigInt.stringify(event), + }); + + // Extract and buffer tx hash mapping + const txHash = extractTxHash(event); + if (txHash) { + txEventBuffer.push({ + tx_hash: txHash, + event_id: event.event.id, + event_type: event.event.type, + }); + } + + // Flush every 10 events to avoid memory buildup + if (eventBuffer.length >= 10) { + try { + flushBuffers(); + updateBatchProgress(db, batch.start, batch.end, lastEventId, 'in_progress'); + } catch (e) { + // Database might be closed, ignore + } + } + }, + + onProgress: (eventId: number) => { + callbacks.onWorkerUpdate(workerId, { + batchStart: batch.start, + batchEnd: batch.end, + eventsDownloaded, + lastEventId: eventId, + }); + }, + + onComplete: () => { + try { + flushBuffers(); + updateBatchProgress(db, batch.start, batch.end, batch.end, 'completed'); + settleResolve(); + } catch (error) { + settleReject(error instanceof Error ? error : new Error(String(error))); + } + }, + + onError: (error: Error) => { + // Save progress before failing (only if db is still open) + try { + flushBuffers(); + if (lastEventId >= batch.start) { + updateBatchProgress(db, batch.start, batch.end, lastEventId, 'failed'); + } + } catch (e) { + // Database might be closed already, ignore + } + settleReject(error); + }, + }); + + // Mark batch as in progress + updateBatchProgress(db, batch.start, batch.end, batch.lastDownloaded ?? null, 'in_progress'); + worker.start(); + }); +} + +/** + * Run workers with concurrency limit. + * Each worker slot picks up the next available batch when done. + */ +async function runWithConcurrency( + items: T[], + concurrency: number, + fn: (item: T, workerSlot: number) => Promise +): Promise { + const results: Promise[] = []; + const errors: Error[] = []; + let currentIndex = 0; + + const runWorkerSlot = async (workerSlot: number): Promise => { + while (currentIndex < items.length) { + const index = currentIndex++; + if (index >= items.length) { + break; + } + + try { + await fn(items[index], workerSlot); + } catch (error) { + const normalized = error instanceof Error ? error : new Error(String(error)); + errors.push(normalized); + } + } + }; + + // Start worker slots + const numWorkers = Math.min(concurrency, items.length); + for (let i = 0; i < numWorkers; i++) { + results.push(runWorkerSlot(i)); + } + + await Promise.all(results); + return errors; +} + +/** + * Main orchestrator function to download all events. + */ +export async function downloadAllEvents(callbacks: OrchestratorCallbacks): Promise { + // Initialize database + const db = initDatabase(DB_PATH); + + try { + // Get latest event ID from fullnode + console.log('Connecting to fullnode to get latest event ID...'); + const latestEventId = await getLatestEventId(); + console.log(`Latest event ID: ${latestEventId.toLocaleString()}`); + + // Calculate all batches + const allBatches = calculateBatches(latestEventId, BATCH_SIZE); + console.log(`Total batches: ${allBatches.length} (${BATCH_SIZE.toLocaleString()} events each)`); + + // Get existing progress and merge + const existingProgress = getAllBatchProgress(db); + const pendingBatches = mergeBatchesWithProgress(allBatches, existingProgress); + + const completedCount = allBatches.length - pendingBatches.length; + console.log(`Completed: ${completedCount} | Pending: ${pendingBatches.length}`); + + if (pendingBatches.length === 0) { + console.log('All batches already completed!'); + callbacks.onComplete(); + return; + } + + // Initialize progress tracking + let totalEventsDownloaded = 0; + let finishedPendingBatches = 0; + let activeWorkers = 0; + const workerStatuses = new Map(); + + const updateStats = () => { + callbacks.onStatsUpdate({ + totalEvents: latestEventId + 1, + totalBatches: allBatches.length, + completedBatches: completedCount + finishedPendingBatches, + inProgressBatches: activeWorkers, + pendingBatches: pendingBatches.length - finishedPendingBatches - activeWorkers, + eventsDownloaded: totalEventsDownloaded, + }); + }; + + // Run workers with concurrency limit + let completedEventsDownloaded = 0; + + const errors = await runWithConcurrency(pendingBatches, PARALLEL_CONNECTIONS, async (batch, workerSlot) => { + activeWorkers++; + try { + await runWorker(db, batch, workerSlot, { + ...callbacks, + onWorkerUpdate: (workerId, status) => { + workerStatuses.set(workerId, status); + const inProgressEvents = Array.from(workerStatuses.values()) + .reduce((sum, s) => sum + s.eventsDownloaded, 0); + totalEventsDownloaded = completedEventsDownloaded + inProgressEvents; + updateStats(); + callbacks.onWorkerUpdate(workerId, status); + }, + }); + completedEventsDownloaded += workerStatuses.get(workerSlot)?.eventsDownloaded ?? 0; + workerStatuses.delete(workerSlot); + finishedPendingBatches++; + } finally { + activeWorkers--; + } + updateStats(); + }); + + if (errors.length > 0) { + callbacks.onError(new Error(`${errors.length} batch(es) failed: ${errors.map(e => e.message).join('; ')}`)); + return; + } + callbacks.onComplete(); + } finally { + db.close(); + } +} diff --git a/packages/event-downloader/src/types.ts b/packages/event-downloader/src/types.ts new file mode 100644 index 00000000..eda2b821 --- /dev/null +++ b/packages/event-downloader/src/types.ts @@ -0,0 +1,200 @@ +/** + * Copyright (c) Hathor Labs and its affiliates. + * + * This source code is licensed under the MIT license found in the + * LICENSE file in the root directory of this source tree. + */ + +import z from 'zod'; +import { bigIntUtils } from '@hathor/wallet-lib'; + +// Use type assertion to handle zod version compatibility +const bigIntSchema = bigIntUtils.bigIntCoercibleSchema as unknown as z.ZodType; + +export type WebSocketSendEvent = + | { + type: 'START_STREAM'; + window_size: number; + last_ack_event_id?: number; + } + | { + type: 'ACK'; + window_size: number; + ack_event_id?: number; + }; + +export enum FullNodeEventTypes { + VERTEX_METADATA_CHANGED = 'VERTEX_METADATA_CHANGED', + VERTEX_REMOVED = 'VERTEX_REMOVED', + NEW_VERTEX_ACCEPTED = 'NEW_VERTEX_ACCEPTED', + LOAD_STARTED = 'LOAD_STARTED', + LOAD_FINISHED = 'LOAD_FINISHED', + REORG_STARTED = 'REORG_STARTED', + REORG_FINISHED = 'REORG_FINISHED', + NC_EVENT = 'NC_EVENT', + FULL_NODE_CRASHED = 'FULL_NODE_CRASHED', +} + +/** + * All events with transactions + */ +const StandardFullNodeEvents = z.union([ + z.literal('VERTEX_METADATA_CHANGED'), + z.literal('NEW_VERTEX_ACCEPTED'), +]); + +/** + * Events without data + */ +const EmptyDataFullNodeEvents = z.union([ + z.literal('LOAD_STARTED'), + z.literal('LOAD_FINISHED'), + z.literal('REORG_FINISHED'), + z.literal('FULL_NODE_CRASHED'), +]); + +export const FullNodeEventBaseSchema = z.object({ + stream_id: z.string(), + peer_id: z.string(), + network: z.string(), + type: z.string(), + latest_event_id: z.number(), +}); + +export type FullNodeEventBase = z.infer; + +export const EventTxOutputSchema = z.object({ + value: bigIntSchema, + token_data: z.number(), + script: z.string(), + locked: z.boolean().optional(), + decoded: z.union([ + z.object({ + type: z.string(), + address: z.string(), + timelock: z.number().nullable(), + }).passthrough().nullable(), + z.object({ + token_data: z.number().nullable(), + }), + z.object({}).strict(), + ]), +}); +export type EventTxOutput = z.infer; + +export const EventTxInputSchema = z.object({ + tx_id: z.string(), + index: z.number(), + spent_output: EventTxOutputSchema, +}); +export type EventTxInput = z.infer; + +export const EventTxNanoHeaderSchema = z.object({ + id: z.string(), + nc_seqnum: z.number(), + nc_id: z.string(), + nc_method: z.string(), + nc_address: z.string(), +}); +export type EventTxNanoHeader = z.infer; + +export const TxEventDataWithoutMetaSchema = z.object({ + hash: z.string(), + timestamp: z.number(), + version: z.number(), + weight: z.number(), + nonce: bigIntSchema, + inputs: EventTxInputSchema.array(), + outputs: EventTxOutputSchema.array(), + headers: EventTxNanoHeaderSchema.array().optional(), + parents: z.string().array(), + tokens: z.string().array(), + token_name: z.string().nullable(), + token_symbol: z.string().nullable(), + signal_bits: z.number(), +}); + +export const TxEventDataSchema = TxEventDataWithoutMetaSchema.extend({ + metadata: z.object({ + hash: z.string(), + voided_by: z.string().array(), + first_block: z.string().nullable(), + height: z.number(), + }), +}); + +export const StandardFullNodeEventSchema = FullNodeEventBaseSchema.extend({ + event: z.object({ + id: z.number(), + timestamp: z.number(), + type: StandardFullNodeEvents, + data: TxEventDataSchema, + }), +}); + +export type StandardFullNodeEvent = z.infer; + +export const ReorgFullNodeEventSchema = FullNodeEventBaseSchema.extend({ + event: z.object({ + id: z.number(), + timestamp: z.number(), + type: z.literal('REORG_STARTED'), + data: z.object({ + reorg_size: z.number(), + previous_best_block: z.string(), + new_best_block: z.string(), + common_block: z.string(), + }), + group_id: z.number(), + }), +}); +export type ReorgFullNodeEvent = z.infer; + +export const EmptyDataFullNodeEventSchema = FullNodeEventBaseSchema.extend({ + event: z.object({ + id: z.number(), + timestamp: z.number(), + type: EmptyDataFullNodeEvents, + data: z.object({}).optional(), + }), +}); + +export const TxDataWithoutMetaFullNodeEventSchema = FullNodeEventBaseSchema.extend({ + event: z.object({ + id: z.number(), + timestamp: z.number(), + type: z.literal('VERTEX_REMOVED'), + data: TxEventDataWithoutMetaSchema, + }), +}); + +export const NcEventSchema = FullNodeEventBaseSchema.extend({ + event: z.object({ + id: z.number(), + timestamp: z.number(), + type: z.literal('NC_EVENT'), + data: z.object({ + vertex_id: z.string(), + nc_id: z.string(), + nc_execution: z.union([ + z.literal('pending'), + z.literal('success'), + z.literal('failure'), + z.literal('skipped'), + ]), + first_block: z.string(), + data_hex: z.string(), + }), + group_id: z.number().nullish(), + }), +}); +export type NcEvent = z.infer; + +export const FullNodeEventSchema = z.union([ + TxDataWithoutMetaFullNodeEventSchema, + StandardFullNodeEventSchema, + ReorgFullNodeEventSchema, + EmptyDataFullNodeEventSchema, + NcEventSchema, +]); +export type FullNodeEvent = z.infer; diff --git a/packages/event-downloader/src/worker.ts b/packages/event-downloader/src/worker.ts new file mode 100644 index 00000000..19840710 --- /dev/null +++ b/packages/event-downloader/src/worker.ts @@ -0,0 +1,186 @@ +/** + * Copyright (c) Hathor Labs and its affiliates. + * + * This source code is licensed under the MIT license found in the + * LICENSE file in the root directory of this source tree. + */ + +import { WebSocket, MessageEvent, ErrorEvent } from 'ws'; +import { bigIntUtils } from '@hathor/wallet-lib'; +import { FullNodeEvent, WebSocketSendEvent } from './types'; +import { FULLNODE_HOST, USE_SSL, WINDOW_SIZE, CONNECTION_TIMEOUT_MS } from './config'; + +export interface BatchConfig { + batchStart: number; + batchEnd: number; + lastDownloaded?: number; // Resume from this event if set +} + +export interface WorkerCallbacks { + onEvent: (event: FullNodeEvent) => void; + onProgress: (eventId: number) => void; + onComplete: () => void; + onError: (error: Error) => void; +} + +export interface Worker { + start: () => void; + stop: () => void; +} + +/** + * Creates a WebSocket worker that downloads a batch of events from the fullnode. + * + * @param config - Configuration for the batch to download + * @param callbacks - Callback functions for event handling + * @returns Worker object with start and stop methods + */ +export function createWorker(config: BatchConfig, callbacks: WorkerCallbacks): Worker { + const { batchStart, batchEnd, lastDownloaded } = config; + const { onEvent, onProgress, onComplete, onError } = callbacks; + + let socket: WebSocket | null = null; + let isRunning = false; + let isDone = false; + let hasFailed = false; + let eventsSinceLastAck = 0; + let lastReceivedEventId = 0; + let activityTimeout: ReturnType | null = null; + + const fail = (error: Error): void => { + if (hasFailed || isDone) return; + hasFailed = true; + onError(error); + stop(); + }; + + const resetActivityTimeout = (): void => { + if (activityTimeout) { + clearTimeout(activityTimeout); + } + if (isRunning && CONNECTION_TIMEOUT_MS > 0) { + activityTimeout = setTimeout(() => { + if (isRunning) { + fail(new Error(`Connection timeout: no activity for ${CONNECTION_TIMEOUT_MS}ms`)); + } + }, CONNECTION_TIMEOUT_MS); + } + }; + + const clearActivityTimeout = (): void => { + if (activityTimeout) { + clearTimeout(activityTimeout); + activityTimeout = null; + } + }; + + const getWsUrl = (): string => { + const protocol = USE_SSL ? 'wss://' : 'ws://'; + const fullNodeUrl = new URL(`${protocol}${FULLNODE_HOST}`); + fullNodeUrl.pathname = '/v1a/event_ws'; + return fullNodeUrl.toString(); + }; + + const sendMessage = (message: WebSocketSendEvent): void => { + if (socket && socket.readyState === WebSocket.OPEN) { + const payload = bigIntUtils.JSONBigInt.stringify(message); + socket.send(payload); + } + }; + + const start = (): void => { + if (isRunning) { + return; + } + + isRunning = true; + socket = new WebSocket(getWsUrl()); + + socket.onopen = () => { + const lastAckEventId = lastDownloaded ?? (batchStart > 0 ? batchStart - 1 : undefined); + const startMessage: WebSocketSendEvent = { + type: 'START_STREAM', + window_size: WINDOW_SIZE, + ...(lastAckEventId !== undefined && { last_ack_event_id: lastAckEventId }), + }; + sendMessage(startMessage); + resetActivityTimeout(); + }; + + socket.onmessage = (socketEvent: MessageEvent) => { + if (isDone) return; + resetActivityTimeout(); + try { + const rawData = bigIntUtils.JSONBigInt.parse(socketEvent.data.toString()); + + // Check if this is an event message (not a handshake or control message) + // Event messages have: event.id, event.type, event.timestamp + if (!rawData.event || typeof rawData.event.id !== 'number' || typeof rawData.event.type !== 'string') { + // Skip non-event messages (handshake, control messages, etc.) + console.log(`Skipping non-event message: ${JSON.stringify(rawData).substring(0, 100)}...`); + return; + } + + // Accept ALL events regardless of type - store them as-is + // TypeScript requires us to cast, but we're intentionally accepting any event structure + const event = rawData as any; + const eventId = event.event.id; + + // Call the event callback + onEvent(event); + + // Track events for batched ACKs + eventsSinceLastAck++; + lastReceivedEventId = eventId; + + // Report progress + onProgress(eventId); + + // Check if we've reached the end of the batch + const isComplete = eventId >= batchEnd; + + // Send ACK after receiving WINDOW_SIZE events, or when batch is complete + if (eventsSinceLastAck >= WINDOW_SIZE || isComplete) { + const ackMessage: WebSocketSendEvent = { + type: 'ACK', + window_size: WINDOW_SIZE, + ack_event_id: lastReceivedEventId, + }; + sendMessage(ackMessage); + eventsSinceLastAck = 0; + } + + if (isComplete) { + isDone = true; + onComplete(); + stop(); + } + } catch (error) { + fail(error instanceof Error ? error : new Error(String(error))); + } + }; + + socket.onerror = (error: ErrorEvent) => { + fail(new Error(`WebSocket error: ${error.message}`)); + }; + + socket.onclose = () => { + if (isRunning && !isDone) { + fail(new Error('WebSocket connection closed unexpectedly')); + } + isRunning = false; + socket = null; + }; + }; + + const stop = (): void => { + isRunning = false; + clearActivityTimeout(); + if (socket) { + socket.close(); + socket = null; + } + }; + + return { start, stop }; +} diff --git a/packages/event-downloader/tsconfig.json b/packages/event-downloader/tsconfig.json new file mode 100644 index 00000000..200c8a6c --- /dev/null +++ b/packages/event-downloader/tsconfig.json @@ -0,0 +1,17 @@ +{ + "compilerOptions": { + "target": "ES2020", + "module": "commonjs", + "lib": ["ES2020"], + "outDir": "./dist", + "rootDir": "./src", + "strict": true, + "esModuleInterop": true, + "skipLibCheck": true, + "forceConsistentCasingInFileNames": true, + "resolveJsonModule": true, + "declaration": true + }, + "include": ["src/**/*"], + "exclude": ["node_modules", "dist"] +} diff --git a/yarn.lock b/yarn.lock index 472fcd71..055c5139 100644 --- a/yarn.lock +++ b/yarn.lock @@ -6493,6 +6493,15 @@ __metadata: languageName: node linkType: hard +"@types/better-sqlite3@npm:7.6.12": + version: 7.6.12 + resolution: "@types/better-sqlite3@npm:7.6.12" + dependencies: + "@types/node": "npm:*" + checksum: 10/a442231518f1a3e28e0ee6efe2581807e5cfaa88a8af4513da34e6ae303ce82f5666e8a0048a1d2738fa248b1b369ee28e59a75189ae0b43e28e44a243a2f24b + languageName: node + linkType: hard + "@types/body-parser@npm:*": version: 1.19.3 resolution: "@types/body-parser@npm:1.19.3" @@ -6804,6 +6813,15 @@ __metadata: languageName: node linkType: hard +"@types/node@npm:22.10.2": + version: 22.10.2 + resolution: "@types/node@npm:22.10.2" + dependencies: + undici-types: "npm:~6.20.0" + checksum: 10/451adfefed4add58b069407173e616220fd4aaa3307cdde1bb701aa053b65b54ced8483db2f870dcedec7a58cb3b06101fbc19d85852716672ec1fd3660947fa + languageName: node + linkType: hard + "@types/node@npm:^22.8.7": version: 22.9.0 resolution: "@types/node@npm:22.9.0" @@ -6927,6 +6945,15 @@ __metadata: languageName: node linkType: hard +"@types/ws@npm:8.5.13": + version: 8.5.13 + resolution: "@types/ws@npm:8.5.13" + dependencies: + "@types/node": "npm:*" + checksum: 10/21369beafa75c91ae3b00d3a2671c7408fceae1d492ca2abd5ac7c8c8bf4596d513c1599ebbddeae82c27c4a2d248976d0d714c4b3d34362b2ae35b964e2e637 + languageName: node + linkType: hard + "@types/ws@npm:8.5.5": version: 8.5.5 resolution: "@types/ws@npm:8.5.5" @@ -8367,6 +8394,17 @@ __metadata: languageName: node linkType: hard +"better-sqlite3@npm:11.7.0": + version: 11.7.0 + resolution: "better-sqlite3@npm:11.7.0" + dependencies: + bindings: "npm:^1.5.0" + node-gyp: "npm:latest" + prebuild-install: "npm:^7.1.1" + checksum: 10/a09bb28c0292bb7c037896ee99197815841275bca2d14f63b58994188239f292642c8c7ea3e0206d8ea6c7530d7b03d7343138ebeb9a4cc855c0b3663e00c812 + languageName: node + linkType: hard + "bigi@npm:^1.1.0, bigi@npm:^1.4.2": version: 1.4.2 resolution: "bigi@npm:1.4.2" @@ -9923,6 +9961,13 @@ __metadata: languageName: node linkType: hard +"dotenv@npm:16.4.5, dotenv@npm:^16.4.5": + version: 16.4.5 + resolution: "dotenv@npm:16.4.5" + checksum: 10/55a3134601115194ae0f924e54473459ed0d9fc340ae610b676e248cca45aa7c680d86365318ea964e6da4e2ea80c4514c1adab5adb43d6867fb57ff068f95c8 + languageName: node + linkType: hard + "dotenv@npm:8.2.0": version: 8.2.0 resolution: "dotenv@npm:8.2.0" @@ -9937,13 +9982,6 @@ __metadata: languageName: node linkType: hard -"dotenv@npm:^16.4.5": - version: 16.4.5 - resolution: "dotenv@npm:16.4.5" - checksum: 10/55a3134601115194ae0f924e54473459ed0d9fc340ae610b676e248cca45aa7c680d86365318ea964e6da4e2ea80c4514c1adab5adb43d6867fb57ff068f95c8 - languageName: node - linkType: hard - "dottie@npm:^2.0.6": version: 2.0.6 resolution: "dottie@npm:2.0.6" @@ -10966,6 +11004,23 @@ __metadata: languageName: node linkType: hard +"event-downloader@workspace:packages/event-downloader": + version: 0.0.0-use.local + resolution: "event-downloader@workspace:packages/event-downloader" + dependencies: + "@types/better-sqlite3": "npm:7.6.12" + "@types/node": "npm:22.10.2" + "@types/ws": "npm:8.5.13" + better-sqlite3: "npm:11.7.0" + dotenv: "npm:16.4.5" + typescript: "npm:5.7.2" + ws: "npm:8.18.0" + zod: "npm:3.23.8" + peerDependencies: + "@hathor/wallet-lib": ">=2.8.3" + languageName: unknown + linkType: soft + "event-emitter@npm:^0.3.5": version: 0.3.5 resolution: "event-emitter@npm:0.3.5" @@ -17788,6 +17843,7 @@ __metadata: dependencies: "@aws-sdk/client-lambda": "npm:3.540.0" "@aws-sdk/client-sqs": "npm:3.540.0" + "@types/better-sqlite3": "npm:7.6.12" "@types/jest": "npm:29.5.4" "@types/lodash": "npm:4.14.199" "@types/mysql": "npm:2.15.21" @@ -17798,6 +17854,7 @@ __metadata: assert: "npm:2.1.0" aws-sdk: "npm:2.1454.0" axios: "npm:1.6.2" + better-sqlite3: "npm:11.7.0" dotenv: "npm:8.2.0" eslint: "npm:9.3.0" eslint-config-airbnb-base: "npm:15.0.0" @@ -18524,6 +18581,16 @@ __metadata: languageName: node linkType: hard +"typescript@npm:5.7.2": + version: 5.7.2 + resolution: "typescript@npm:5.7.2" + bin: + tsc: bin/tsc + tsserver: bin/tsserver + checksum: 10/4caa3904df69db9d4a8bedc31bafc1e19ffb7b24fbde2997a1633ae1398d0de5bdbf8daf602ccf3b23faddf1aeeb9b795223a2ed9c9a4fdcaf07bfde114a401a + languageName: node + linkType: hard + "typescript@npm:^5.8.2": version: 5.8.2 resolution: "typescript@npm:5.8.2" @@ -18564,6 +18631,16 @@ __metadata: languageName: node linkType: hard +"typescript@patch:typescript@npm%3A5.7.2#optional!builtin": + version: 5.7.2 + resolution: "typescript@patch:typescript@npm%3A5.7.2#optional!builtin::version=5.7.2&hash=5786d5" + bin: + tsc: bin/tsc + tsserver: bin/tsserver + checksum: 10/d75ca10141afc64fd3474b41a8b082b640555bed388d237558aed64e5827ddadb48f90932c7f4205883f18f5bcab8b6a739a2cfac95855604b0dfeb34bc2f3eb + languageName: node + linkType: hard + "typescript@patch:typescript@npm%3A^5.8.2#optional!builtin": version: 5.8.2 resolution: "typescript@patch:typescript@npm%3A5.8.2#optional!builtin::version=5.8.2&hash=5786d5" @@ -18626,6 +18703,13 @@ __metadata: languageName: node linkType: hard +"undici-types@npm:~6.20.0": + version: 6.20.0 + resolution: "undici-types@npm:6.20.0" + checksum: 10/583ac7bbf4ff69931d3985f4762cde2690bb607844c16a5e2fbb92ed312fe4fa1b365e953032d469fa28ba8b224e88a595f0b10a449332f83fa77c695e567dbe + languageName: node + linkType: hard + "uni-global@npm:^1.0.0": version: 1.0.0 resolution: "uni-global@npm:1.0.0" @@ -19276,6 +19360,21 @@ __metadata: languageName: node linkType: hard +"ws@npm:8.18.0": + version: 8.18.0 + resolution: "ws@npm:8.18.0" + peerDependencies: + bufferutil: ^4.0.1 + utf-8-validate: ">=5.0.2" + peerDependenciesMeta: + bufferutil: + optional: true + utf-8-validate: + optional: true + checksum: 10/70dfe53f23ff4368d46e4c0b1d4ca734db2c4149c6f68bc62cb16fc21f753c47b35fcc6e582f3bdfba0eaeb1c488cddab3c2255755a5c3eecb251431e42b3ff6 + languageName: node + linkType: hard + "ws@npm:^7.5.3, ws@npm:^7.5.9": version: 7.5.9 resolution: "ws@npm:7.5.9"