diff --git a/README.md b/README.md index 685c376f..9d0e3084 100644 --- a/README.md +++ b/README.md @@ -73,7 +73,7 @@ In this example, we are passing the env variables to the container and running a ## State Machine -The state machine diagram can be visualized at https://xstate.js.org/viz/?gist=7299c0ed0ce189bc121a06dce1e11638 +The state machine diagram can be visualized at https://xstate.js.org/viz/?gist=19dd8bc6d62533add23e124ef31adb78 ## States: diff --git a/src/api/fullnode.ts b/src/api/fullnode.ts index 5a5906db..0dbd2172 100644 --- a/src/api/fullnode.ts +++ b/src/api/fullnode.ts @@ -16,6 +16,7 @@ import { } from '../types'; import axios from 'axios'; import { globalCache } from '../utils'; +import logger from '../logger'; const DEFAULT_SERVER = process.env.DEFAULT_SERVER || 'https://node1.foxtrot.testnet.hathor.network/v1a/'; @@ -39,6 +40,22 @@ export const downloadTx = async (txId: string, noCache: boolean = false) => { return response.data; }; +/** + * Returns a list of transactions on the mempool + * + */ +export const downloadMempool = async () => { + const response = await axios.get(`${DEFAULT_SERVER}mempool`); + + const data = response.data; + + if (!data.success) { + logger.error(data); + throw new Error('Mempool API failure'); + } + return data; +}; + /** * Returns a `FullBlock` downloaded from the full_node * diff --git a/src/index.ts b/src/index.ts index abe158e5..b6655680 100644 --- a/src/index.ts +++ b/src/index.ts @@ -13,11 +13,9 @@ import { Connection } from '@hathor/wallet-lib'; import logger from './logger'; // @ts-ignore -const machine = interpret(SyncMachine).start(); - -machine.onTransition(state => { - if (state.changed && state.value) { - logger.debug(`Transitioned to state: ${state.value}`); +const machine = interpret(SyncMachine).onTransition(state => { + if (state.changed) { + logger.debug('Transitioned to state: ', state.value); } }); @@ -31,11 +29,14 @@ const handleMessage = (message: any) => { * full node's best block height */ case 'network:new_tx_accepted': - if (!message.is_block) return; if (message.is_voided) return; - if (message.type === 'network:new_tx_accepted') { - machine.send({ type: 'NEW_BLOCK' }); + if (!message.is_block) { + // identify the tx as a mempool tx + if (message.first_block) return; + machine.send({ type: 'MEMPOOL_UPDATE' }); + return; } + machine.send({ type: 'NEW_BLOCK' }); break; case 'state_update': @@ -46,10 +47,12 @@ const handleMessage = (message: any) => { if (message.state === Connection.CONNECTED) { machine.send({ type: 'NEW_BLOCK' }); } - break; + break; } }; +machine.start(); + const DEFAULT_SERVER = process.env.DEFAULT_SERVER; const conn = new Connection({ network: process.env.NETWORK, servers: [DEFAULT_SERVER] }); diff --git a/src/machine.ts b/src/machine.ts index db194c21..40db28fb 100644 --- a/src/machine.ts +++ b/src/machine.ts @@ -10,13 +10,14 @@ import { assign, send, } from 'xstate'; -import { syncToLatestBlock } from './utils'; +import { syncToLatestBlock, syncLatestMempool } from './utils'; import { - SyncSchema, - SyncContext, - StatusEvent, - HandlerEvent, GeneratorYieldResult, + HandlerEvent, + StatusEvent, + MempoolEvent, + SyncContext, + SyncSchema, } from './types'; import logger from './logger'; import { invokeReorg } from './api/lambda'; @@ -81,6 +82,53 @@ export const syncHandler = () => (callback, onReceive) => { }; }; +// @ts-ignore +export const mempoolHandler = () => (callback, onReceive) => { + logger.debug('Mempool handler instantiated'); + const iterator = syncLatestMempool(); + const asyncCall: () => void = async () => { + for (;;) { + const txResult: GeneratorYieldResult = await iterator.next(); + const { value, done } = txResult; + + if (done) { + // The generator reached its end, we should end this handler + logger.debug('Done.', value) + break; + } + + if (value && !value.success) { + logger.error(value.message); + callback('ERROR'); + return; + } + + if (value.type === 'finished') { + logger.info('Sync generator finished.'); + callback('DONE'); + return; + } else if (value.type === 'tx_success') { + logger.info('Mempool tx synced!'); + } else { + logger.warn(`Unhandled type received from sync generator: ${value.type}`); + } + } + + return; + }; + + onReceive((e: HandlerEvent) => { + if (e.type === 'START') { + asyncCall(); + } + }); + + return () => { + logger.debug('Stopping the iterator.'); + iterator.return('finished'); + }; +}; + /* See README for an explanation on how the machine works. * TODO: We need to type the Event */ @@ -89,13 +137,49 @@ export const SyncMachine = Machine({ initial: 'idle', context: { hasMoreBlocks: false, + hasMempoolUpdate: false, }, states: { idle: { always: [ + // Conditions are tested in order, the first valid one is taken, if any are valid + // https://xstate.js.org/docs/guides/guards.html#multiple-guards { target: 'syncing', cond: 'hasMoreBlocks' }, + { target: 'mempoolsync', cond: 'hasMempoolUpdate' }, + ], + on: { + NEW_BLOCK: 'syncing', + MEMPOOL_UPDATE: 'mempoolsync', + }, + }, + mempoolsync: { + invoke: { + id: 'syncLatestMempool', + src: 'mempoolHandler', + }, + on: { + MEMPOOL_UPDATE: { + actions: ['setMempoolUpdate'], + }, + // Stop mempool sync when a block arrives + // this means that the mempool may not be fully synced when it leaves this state + // giving priority to blocks means the mempool may change between syncs + NEW_BLOCK: { + target: 'syncing', + // When block sync finishes, go back to mempool sync + actions: ['setMempoolUpdate'], + }, + STOP: 'idle', + DONE: 'idle', + // Errors on mempool sync are "ignored" since next sync (either block or mempool) should fix it + ERROR: 'idle', + }, + entry: [ + 'resetMempoolUpdate', + send('START', { + to: 'syncLatestMempool', + }), ], - on: { NEW_BLOCK: 'syncing' }, }, syncing: { invoke: { @@ -146,6 +230,7 @@ export const SyncMachine = Machine({ }, { guards: { hasMoreBlocks: (ctx) => ctx.hasMoreBlocks, + hasMempoolUpdate: (ctx) => ctx.hasMempoolUpdate, }, actions: { // @ts-ignore @@ -156,8 +241,17 @@ export const SyncMachine = Machine({ setMoreBlocks: assign({ hasMoreBlocks: () => true, }), + // @ts-ignore + resetMempoolUpdate: assign({ + hasMempoolUpdate: () => false, + }), + // @ts-ignore + setMempoolUpdate: assign({ + hasMempoolUpdate: () => true, + }), }, services: { syncHandler, + mempoolHandler, }, }); diff --git a/src/types.ts b/src/types.ts index ff503ad9..5266b07f 100644 --- a/src/types.ts +++ b/src/types.ts @@ -87,6 +87,7 @@ export interface DownloadBlockApiResponse extends ApiResponse { export interface SyncSchema { states: { idle: {}; + mempoolsync: {}; syncing: {}; failure: {}; reorg: {}; @@ -95,6 +96,7 @@ export interface SyncSchema { export interface SyncContext { hasMoreBlocks: boolean; + hasMempoolUpdate: boolean; error?: {}; } @@ -135,6 +137,26 @@ export type StatusEvent = { error?: string; } +export type MempoolEvent = { + type: 'finished'; + success: boolean; + message?: string; +} | { + type: 'tx_success'; + success: boolean; + txId: string; + message?: string; +} | { + type: 'wait'; + success: boolean; + message?: string; +} | { + type: 'error'; + success: boolean; + message?: string; + error?: string; +} + /* export interface StatusEvent { type: string; success: boolean; diff --git a/src/utils.ts b/src/utils.ts index 73c73125..97d07ae4 100644 --- a/src/utils.ts +++ b/src/utils.ts @@ -14,6 +14,7 @@ import { DecodedScript, FullTx, StatusEvent, + MempoolEvent, PreparedTx, PreparedInput, PreparedOutput, @@ -25,6 +26,7 @@ import { } from './types'; import { downloadTx, + downloadMempool, getBlockByTxId, getFullNodeBestBlock, downloadBlockByHeight, @@ -56,6 +58,31 @@ export const IGNORE_TXS: Map = new Map([ const TX_CACHE_SIZE: number = parseInt(process.env.TX_CACHE_SIZE as string) || 200; +/** + * Download and parse a tx by it's id + * + * @param txId - the id of the tx to be downloaded + */ +export const downloadTxFromId = async (txId: string): Promise => { + + const network: string = process.env.NETWORK || 'mainnet'; + + // Do not download genesis transactions + if (IGNORE_TXS.has(network)) { + const networkTxs: string[] = IGNORE_TXS.get(network) as string[]; + + if (networkTxs.includes(txId)) { + // Skip + return null; + } + } + + const txData: RawTxResponse = await downloadTx(txId); + const { tx, meta } = txData; + return parseTx(tx); + +}; + /** * Recursively downloads all transactions that were confirmed by a given block * @@ -251,6 +278,69 @@ export const parseTx = (tx: RawTx): FullTx => { return parsedTx; }; +/** + * Syncs the latest mempool + * + * @generator + * @yields {MempoolEvent} + */ +export async function* syncLatestMempool(): AsyncGenerator { + + logger.info(`Downloading mempool...`); + let mempoolResp; + try { + mempoolResp = await downloadMempool(); + } catch (e) { + yield { + success: false, + type: 'error', + message: 'Could not download from mempool api', + }; + return; + } + + for (let i = 0; i < mempoolResp.transactions.length; i++) { + const tx = await downloadTxFromId(mempoolResp.transactions[i]); + + if (tx === null) { + yield { + type: 'error', + success: false, + message: `Failure on transaction ${mempoolResp.transactions[i]} in mempool`, + }; + return; + } + + const preparedTx: PreparedTx = prepareTx(tx); + + try { + const sendTxResponse: ApiResponse = await sendTx(preparedTx); + if (!sendTxResponse.success) { + logger.error(sendTxResponse); + throw new Error(sendTxResponse.message); + } + } catch (e) { + yield { + type: 'error', + success: false, + message: `Failure on transaction ${preparedTx.tx_id} in mempool`, + }; + return; + } + + yield { + type: 'tx_success', + success: true, + txId: preparedTx.tx_id, + }; + } + + yield { + success: true, + type: 'finished', + }; +} + /** * Syncs to the latest block * diff --git a/test/machine.test.ts b/test/machine.test.ts index 5b06aa6b..9a770e15 100644 --- a/test/machine.test.ts +++ b/test/machine.test.ts @@ -242,3 +242,103 @@ test('The SyncMachine should transition to \'reorg\' state when a reorg is detec expect(syncMachine.state.value).toStrictEqual('reorg'); }, 500); + +test('Mempool: transition to \'idle\' on ERROR event', async () => { + const mockCleanupFunction = jest.fn(); + + const TestSyncMachine = SyncMachine.withConfig({ + services: { + syncHandler: () => () => { + return mockCleanupFunction; + }, + } + }); + + // @ts-ignore + const syncMachine = interpret(TestSyncMachine).start(); + + syncMachine.send({ type: 'MEMPOOL_UPDATE' }); + + expect(syncMachine.state.value).toStrictEqual('mempoolsync'); + + syncMachine.send({ type: 'ERROR' }); + + expect(syncMachine.state.value).toStrictEqual('idle'); +}, 500); + +test('Mempool: transition to \'idle\' on DONE event', async () => { + const mockCleanupFunction = jest.fn(); + + const TestSyncMachine = SyncMachine.withConfig({ + services: { + syncHandler: () => () => { + return mockCleanupFunction; + }, + } + }); + + // @ts-ignore + const syncMachine = interpret(TestSyncMachine).start(); + + syncMachine.send({ type: 'MEMPOOL_UPDATE' }); + + expect(syncMachine.state.value).toStrictEqual('mempoolsync'); + + syncMachine.send({ type: 'DONE' }); + + expect(syncMachine.state.value).toStrictEqual('idle'); +}, 500); + +test('Mempool: transition to \'idle\' on STOP event', async () => { + const mockCleanupFunction = jest.fn(); + + const TestSyncMachine = SyncMachine.withConfig({ + services: { + syncHandler: () => () => { + return mockCleanupFunction; + }, + } + }); + + // @ts-ignore + const syncMachine = interpret(TestSyncMachine).start(); + + syncMachine.send({ type: 'MEMPOOL_UPDATE' }); + + expect(syncMachine.state.value).toStrictEqual('mempoolsync'); + + syncMachine.send({ type: 'STOP' }); + + expect(syncMachine.state.value).toStrictEqual('idle'); +}, 500); + +test('Mempool: transition to \'syncing\' on NEW_BLOCK event and back to \'mempoolsync\' when DONE with block sync', async () => { + const mockCleanupFunction = jest.fn(); + + const TestSyncMachine = SyncMachine.withConfig({ + services: { + syncHandler: () => () => { + return mockCleanupFunction; + }, + } + }); + + // @ts-ignore + const syncMachine = interpret(TestSyncMachine).start(); + + syncMachine.send({ type: 'MEMPOOL_UPDATE' }); + + expect(syncMachine.state.value).toStrictEqual('mempoolsync'); + + syncMachine.send({ type: 'NEW_BLOCK' }); + + expect(syncMachine.state.value).toStrictEqual('syncing'); + + expect(syncMachine.state.context.hasMempoolUpdate).toStrictEqual(true); + + syncMachine.send({ type: 'DONE' }); + + // Will go to idle and straight back to mempoolsync, setting hasMempoolUpdate to false + expect(syncMachine.state.context.hasMempoolUpdate).toStrictEqual(false); + expect(syncMachine.state.value).toStrictEqual('mempoolsync'); +}, 500); \ No newline at end of file