Skip to content
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ In this example, we are passing the env variables to the container and running a

## State Machine

The state machine diagram can be visualized at https://xstate.js.org/viz/?gist=7299c0ed0ce189bc121a06dce1e11638
The state machine diagram can be visualized at https://xstate.js.org/viz/?gist=19dd8bc6d62533add23e124ef31adb78

## States:

Expand Down
17 changes: 17 additions & 0 deletions src/api/fullnode.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import {
} from '../types';
import axios from 'axios';
import { globalCache } from '../utils';
import logger from '../logger';

const DEFAULT_SERVER = process.env.DEFAULT_SERVER || 'https://node1.foxtrot.testnet.hathor.network/v1a/';

Expand All @@ -39,6 +40,22 @@ export const downloadTx = async (txId: string, noCache: boolean = false) => {
return response.data;
};

/**
* Returns a list of transactions on the mempool
*
*/
export const downloadMempool = async () => {
const response = await axios.get(`${DEFAULT_SERVER}mempool`);

const data = response.data;

if (!data.success) {
logger.error(data);
throw new Error('Mempool API failure');
Comment thread
r4mmer marked this conversation as resolved.
}
return data;
};

/**
* Returns a `FullBlock` downloaded from the full_node
*
Expand Down
21 changes: 12 additions & 9 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,9 @@ import { Connection } from '@hathor/wallet-lib';
import logger from './logger';

// @ts-ignore
const machine = interpret(SyncMachine).start();

machine.onTransition(state => {
if (state.changed && state.value) {
logger.debug(`Transitioned to state: ${state.value}`);
const machine = interpret(SyncMachine).onTransition(state => {
if (state.changed) {
logger.debug('Transitioned to state: ', state.value);
}
});

Expand All @@ -31,11 +29,14 @@ const handleMessage = (message: any) => {
* full node's best block height
*/
case 'network:new_tx_accepted':
if (!message.is_block) return;
if (message.is_voided) return;
if (message.type === 'network:new_tx_accepted') {
machine.send({ type: 'NEW_BLOCK' });
if (!message.is_block) {
// identify the tx as a mempool tx
if (message.first_block) return;
machine.send({ type: 'MEMPOOL_UPDATE' });
return;
}
machine.send({ type: 'NEW_BLOCK' });
break;

case 'state_update':
Expand All @@ -46,10 +47,12 @@ const handleMessage = (message: any) => {
if (message.state === Connection.CONNECTED) {
machine.send({ type: 'NEW_BLOCK' });
}
break;
break;
}
};

machine.start();

const DEFAULT_SERVER = process.env.DEFAULT_SERVER;
const conn = new Connection({ network: process.env.NETWORK, servers: [DEFAULT_SERVER] });

Expand Down
106 changes: 100 additions & 6 deletions src/machine.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,14 @@ import {
assign,
send,
} from 'xstate';
import { syncToLatestBlock } from './utils';
import { syncToLatestBlock, syncLatestMempool } from './utils';
import {
SyncSchema,
SyncContext,
StatusEvent,
HandlerEvent,
GeneratorYieldResult,
HandlerEvent,
StatusEvent,
MempoolEvent,
SyncContext,
SyncSchema,
} from './types';
import logger from './logger';
import { invokeReorg } from './api/lambda';
Expand Down Expand Up @@ -81,6 +82,53 @@ export const syncHandler = () => (callback, onReceive) => {
};
};

// @ts-ignore
export const mempoolHandler = () => (callback, onReceive) => {
logger.debug('Mempool handler instantiated');
const iterator = syncLatestMempool();
const asyncCall: () => void = async () => {
for (;;) {
const txResult: GeneratorYieldResult<MempoolEvent> = await iterator.next();
const { value, done } = txResult;

if (done) {
// The generator reached its end, we should end this handler
logger.debug('Done.', value)
break;
}

if (value && !value.success) {
logger.error(value.message);
callback('ERROR');
Comment thread
r4mmer marked this conversation as resolved.
return;
}

if (value.type === 'finished') {
logger.info('Sync generator finished.');
callback('DONE');
return;
} else if (value.type === 'tx_success') {
logger.info('Mempool tx synced!');
} else {
logger.warn(`Unhandled type received from sync generator: ${value.type}`);
}
}

return;
};

onReceive((e: HandlerEvent) => {
if (e.type === 'START') {
asyncCall();
}
});

return () => {
logger.debug('Stopping the iterator.');
iterator.return('finished');
};
};

/* See README for an explanation on how the machine works.
* TODO: We need to type the Event
*/
Expand All @@ -89,13 +137,49 @@ export const SyncMachine = Machine<SyncContext, SyncSchema>({
initial: 'idle',
context: {
hasMoreBlocks: false,
hasMempoolUpdate: false,
},
states: {
idle: {
always: [
// Conditions are tested in order, the first valid one is taken, if any are valid
// https://xstate.js.org/docs/guides/guards.html#multiple-guards
{ target: 'syncing', cond: 'hasMoreBlocks' },
{ target: 'mempoolsync', cond: 'hasMempoolUpdate' },
Comment thread
r4mmer marked this conversation as resolved.
],
on: {
NEW_BLOCK: 'syncing',
MEMPOOL_UPDATE: 'mempoolsync',
},
},
mempoolsync: {
invoke: {
id: 'syncLatestMempool',
src: 'mempoolHandler',
},
on: {
MEMPOOL_UPDATE: {
actions: ['setMempoolUpdate'],
},
// Stop mempool sync when a block arrives
// this means that the mempool may not be fully synced when it leaves this state
// giving priority to blocks means the mempool may change between syncs
NEW_BLOCK: {
target: 'syncing',
// When block sync finishes, go back to mempool sync
actions: ['setMempoolUpdate'],
},
STOP: 'idle',
DONE: 'idle',
// Errors on mempool sync are "ignored" since next sync (either block or mempool) should fix it
ERROR: 'idle',
},
entry: [
'resetMempoolUpdate',
send('START', {
to: 'syncLatestMempool',
}),
],
on: { NEW_BLOCK: 'syncing' },
},
syncing: {
invoke: {
Expand Down Expand Up @@ -146,6 +230,7 @@ export const SyncMachine = Machine<SyncContext, SyncSchema>({
}, {
guards: {
hasMoreBlocks: (ctx) => ctx.hasMoreBlocks,
hasMempoolUpdate: (ctx) => ctx.hasMempoolUpdate,
},
actions: {
// @ts-ignore
Expand All @@ -156,8 +241,17 @@ export const SyncMachine = Machine<SyncContext, SyncSchema>({
setMoreBlocks: assign({
hasMoreBlocks: () => true,
}),
// @ts-ignore
resetMempoolUpdate: assign({
hasMempoolUpdate: () => false,
}),
// @ts-ignore
setMempoolUpdate: assign({
hasMempoolUpdate: () => true,
}),
},
services: {
syncHandler,
mempoolHandler,
},
});
22 changes: 22 additions & 0 deletions src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ export interface DownloadBlockApiResponse extends ApiResponse {
export interface SyncSchema {
states: {
idle: {};
mempoolsync: {};
syncing: {};
failure: {};
reorg: {};
Expand All @@ -95,6 +96,7 @@ export interface SyncSchema {

export interface SyncContext {
hasMoreBlocks: boolean;
hasMempoolUpdate: boolean;
error?: {};
}

Expand Down Expand Up @@ -135,6 +137,26 @@ export type StatusEvent = {
error?: string;
}

export type MempoolEvent = {
type: 'finished';
success: boolean;
message?: string;
} | {
type: 'tx_success';
success: boolean;
txId: string;
message?: string;
} | {
type: 'wait';
success: boolean;
message?: string;
} | {
type: 'error';
success: boolean;
message?: string;
error?: string;
}

/* export interface StatusEvent {
type: string;
success: boolean;
Expand Down
90 changes: 90 additions & 0 deletions src/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import {
DecodedScript,
FullTx,
StatusEvent,
MempoolEvent,
PreparedTx,
PreparedInput,
PreparedOutput,
Expand All @@ -25,6 +26,7 @@ import {
} from './types';
import {
downloadTx,
downloadMempool,
getBlockByTxId,
getFullNodeBestBlock,
downloadBlockByHeight,
Expand Down Expand Up @@ -56,6 +58,31 @@ export const IGNORE_TXS: Map<string, string[]> = new Map<string, string[]>([

const TX_CACHE_SIZE: number = parseInt(process.env.TX_CACHE_SIZE as string) || 200;

/**
* Download and parse a tx by it's id
*
* @param txId - the id of the tx to be downloaded
*/
export const downloadTxFromId = async (txId: string): Promise<FullTx | null> => {

const network: string = process.env.NETWORK || 'mainnet';

// Do not download genesis transactions
if (IGNORE_TXS.has(network)) {
const networkTxs: string[] = IGNORE_TXS.get(network) as string[];

if (networkTxs.includes(txId)) {
// Skip
return null;
}
}

const txData: RawTxResponse = await downloadTx(txId);
const { tx, meta } = txData;
return parseTx(tx);

};

/**
* Recursively downloads all transactions that were confirmed by a given block
*
Expand Down Expand Up @@ -251,6 +278,69 @@ export const parseTx = (tx: RawTx): FullTx => {
return parsedTx;
};

/**
* Syncs the latest mempool
*
* @generator
* @yields {MempoolEvent}
*/
export async function* syncLatestMempool(): AsyncGenerator<MempoolEvent> {

logger.info(`Downloading mempool...`);
let mempoolResp;
try {
mempoolResp = await downloadMempool();
} catch (e) {
yield {
success: false,
type: 'error',
message: 'Could not download from mempool api',
};
return;
}

for (let i = 0; i < mempoolResp.transactions.length; i++) {
Comment thread
r4mmer marked this conversation as resolved.
const tx = await downloadTxFromId(mempoolResp.transactions[i]);

if (tx === null) {
yield {
type: 'error',
success: false,
message: `Failure on transaction ${mempoolResp.transactions[i]} in mempool`,
};
return;
}

const preparedTx: PreparedTx = prepareTx(tx);

try {
const sendTxResponse: ApiResponse = await sendTx(preparedTx);
if (!sendTxResponse.success) {
logger.error(sendTxResponse);
throw new Error(sendTxResponse.message);
}
} catch (e) {
yield {
type: 'error',
success: false,
message: `Failure on transaction ${preparedTx.tx_id} in mempool`,
};
return;
}

yield {
type: 'tx_success',
success: true,
txId: preparedTx.tx_id,
};
}

yield {
success: true,
type: 'finished',
};
}

/**
* Syncs to the latest block
*
Comment thread
r4mmer marked this conversation as resolved.
Expand Down
Loading