Skip to content

Commit

Permalink
fixes #108; uses altruist system as backup for sync check
Browse files Browse the repository at this point in the history
  • Loading branch information
nymd committed Jul 7, 2021
1 parent 4dd8faa commit 7dcc52f
Show file tree
Hide file tree
Showing 2 changed files with 66 additions and 14 deletions.
10 changes: 7 additions & 3 deletions src/services/pocket-relayer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ export class PocketRelayer {
}

// Send this relay attempt
const relayResponse = await this._sendRelay(data, relayPath, httpMethod, requestID, application, requestTimeOut, blockchain, blockchainEnforceResult, blockchainSyncCheck);
const relayResponse = await this._sendRelay(data, relayPath, httpMethod, requestID, application, requestTimeOut, blockchain, blockchainEnforceResult, blockchainSyncCheck, String(this.altruists[blockchain]));

if (!(relayResponse instanceof Error)) {
// Record success metric
Expand Down Expand Up @@ -303,6 +303,7 @@ export class PocketRelayer {
blockchain: string,
blockchainEnforceResult: string,
blockchainSyncCheck: string,
blockchainSyncBackup: string,
): Promise<RelayResponse | Error> {
logger.log('info', 'RELAYING ' + blockchain + ' req: ' + data, {requestID: requestID, relayType: 'APP', typeID: application.id, serviceNode: ''});

Expand Down Expand Up @@ -366,8 +367,11 @@ export class PocketRelayer {
if (pocketSession instanceof Session) {
let nodes: Node[] = pocketSession.sessionNodes;
if (blockchainSyncCheck) {
nodes = await this.syncChecker.consensusFilter(pocketSession.sessionNodes, requestID, blockchainSyncCheck, 3, blockchain, application.id, application.gatewayAAT.applicationPublicKey, this.pocket, pocketAAT, this.pocketConfiguration);
}
nodes = await this.syncChecker.consensusFilter(pocketSession.sessionNodes, requestID, blockchainSyncCheck, 3, blockchain, blockchainSyncBackup, application.id, application.gatewayAAT.applicationPublicKey, this.pocket, pocketAAT, this.pocketConfiguration);
if (nodes.length === 0) {
return new Error('Sync check failure; using fallbacks');
}
}
node = await this.cherryPicker.cherryPickNode(application, nodes, blockchain, requestID);
}

Expand Down
70 changes: 59 additions & 11 deletions src/services/sync-checker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import {Redis} from 'ioredis';
var crypto = require('crypto');

const logger = require('../services/logger');
import axios from 'axios';

export class SyncChecker {
redis: Redis;
Expand All @@ -14,13 +15,13 @@ export class SyncChecker {
this.metricsRecorder = metricsRecorder;
}

async consensusFilter(nodes: Node[], requestID: string, syncCheck: string, syncAllowance: number = 1, blockchain: string, applicationID: string, applicationPublicKey: string, pocket: Pocket, pocketAAT: PocketAAT, pocketConfiguration: Configuration): Promise<Node[]> {
async consensusFilter(nodes: Node[], requestID: string, syncCheck: string, syncAllowance: number = 1, blockchain: string, blockchainSyncBackup: string, applicationID: string, applicationPublicKey: string, pocket: Pocket, pocketAAT: PocketAAT, pocketConfiguration: Configuration): Promise<Node[]> {
let syncedNodes: Node[] = [];
let syncedNodesList: String[] = [];

// Key is "blockchain - a hash of the all the nodes in this session, sorted by public key"
// Value is an array of node public keys that have passed sync checks for this session in the past 5 minutes
const syncedNodesKey = blockchain + '-' + crypto.createHash('sha256').update(JSON.stringify(nodes.sort((a,b) => (a.publicKey > b.publicKey) ? 1 : ((b.publicKey > a.publicKey) ? -1 : 0)), (k, v) => k != 'publicKey' ? v : undefined)).digest('hex');
const syncedNodesKey = blockchain + String(Math.random());'-' + crypto.createHash('sha256').update(JSON.stringify(nodes.sort((a,b) => (a.publicKey > b.publicKey) ? 1 : ((b.publicKey > a.publicKey) ? -1 : 0)), (k, v) => k != 'publicKey' ? v : undefined)).digest('hex');
const syncedNodesCached = await this.redis.get(syncedNodesKey);

if (syncedNodesCached) {
Expand Down Expand Up @@ -48,34 +49,53 @@ export class SyncChecker {

// Fires all 5 sync checks synchronously then assembles the results
const nodeSyncLogs = await this.getNodeSyncLogs(nodes, requestID, syncCheck, blockchain, applicationID, applicationPublicKey, pocket, pocketAAT, pocketConfiguration);

let errorState = false;

// This should never happen
if (nodeSyncLogs.length <= 2) {
logger.log('error', 'SYNC CHECK ERROR: fewer than 3 nodes returned sync', {requestID: requestID, relayType: '', typeID: '', serviceNode: '', error: '', elapsedTime: ''});
return nodes;
errorState = true;
}

let currentBlockHeight = 0;

// Sort NodeSyncLogs by blockHeight
nodeSyncLogs.sort((a, b) => b.blockHeight - a.blockHeight);

// If top node is still 0, or not a number, return all nodes due to check failure
if (
nodeSyncLogs.length === 0 ||
nodeSyncLogs[0].blockHeight === 0 ||
typeof nodeSyncLogs[0].blockHeight !== 'number' ||
(nodeSyncLogs[0].blockHeight %1 ) !== 0
)
{
logger.log('error', 'SYNC CHECK ERROR: top synced node result is invalid ' + nodeSyncLogs[0].blockHeight, {requestID: requestID, relayType: '', typeID: '', serviceNode: '', error: '', elapsedTime: ''});
return nodes;
logger.log('error', 'SYNC CHECK ERROR: top synced node result is invalid ' + JSON.stringify(nodeSyncLogs), {requestID: requestID, relayType: '', typeID: '', serviceNode: '', error: '', elapsedTime: ''});
errorState = true;
} else {
currentBlockHeight = nodeSyncLogs[0].blockHeight;
}

// Make sure at least 2 nodes agree on current highest block to prevent one node from being wildly off
if (nodeSyncLogs[0].blockHeight > (nodeSyncLogs[1].blockHeight + syncAllowance)) {
if (
!errorState &&
nodeSyncLogs[0].blockHeight > (nodeSyncLogs[1].blockHeight + syncAllowance)
) {
logger.log('error', 'SYNC CHECK ERROR: two highest nodes could not agree on sync', {requestID: requestID, relayType: '', typeID: '', serviceNode: '', error: '', elapsedTime: ''});
return nodes;
errorState = true;
}

const currentBlockHeight = nodeSyncLogs[0].blockHeight;
if (errorState) {
// Consult Altruist for sync source of truth
currentBlockHeight = await this.getSyncFromAltruist(syncCheck, blockchainSyncBackup);
if (currentBlockHeight === 0) {
// Failure to find sync from consensus and altruist
logger.log('info', 'SYNC CHECK ALTRUIST FAILURE: ' + currentBlockHeight, {requestID: requestID, relayType: '', typeID: '', serviceNode: 'ALTRUIST', error: '', elapsedTime: ''});
return nodes;
} else {
logger.log('info', 'SYNC CHECK ALTRUIST CHECK: ' + currentBlockHeight, {requestID: requestID, relayType: '', typeID: '', serviceNode: 'ALTRUIST', error: '', elapsedTime: ''});
}
}

// Go through nodes and add all nodes that are current or within 1 block -- this allows for block processing times
for (const nodeSyncLog of nodeSyncLogs) {
Expand Down Expand Up @@ -133,10 +153,38 @@ export class SyncChecker {
);
logger.log('info', 'SYNC CHECK CHALLENGE: ' + JSON.stringify(consensusResponse), {requestID: requestID, relayType: '', typeID: '', serviceNode: '', error: '', elapsedTime: ''});
}

return syncedNodes;
}

async getSyncFromAltruist(syncCheck: string, blockchainSyncBackup: string): Promise<number> {
// Remove user/pass from the altruist URL
const redactedAltruistURL = blockchainSyncBackup.replace(/[\w]*:\/\/[^\/]*@/g, '');

try {
const syncResponse = await axios({
method: 'POST',
url: blockchainSyncBackup,
data: syncCheck,
headers: {'Content-Type': 'application/json'}
});

if (!(syncResponse instanceof Error)) {
// Return decimal version of hex result as blockHeight
return parseInt(syncResponse.data.result, 16);
}
return 0;
}
catch (e) {
logger.log('error', e.message, {
requestID: '',
relayType: 'FALLBACK',
typeID: '',
serviceNode: 'fallback:' + redactedAltruistURL,
});
}
return 0;
}

async getNodeSyncLogs(nodes: Node[], requestID: string, syncCheck: string, blockchain: string, applicationID: string, applicationPublicKey: string, pocket: Pocket, pocketAAT: PocketAAT, pocketConfiguration: Configuration): Promise<NodeSyncLog[]> {
const nodeSyncLogs: NodeSyncLog[] = [];
const promiseStack: Promise<NodeSyncLog>[] = [];
Expand Down

0 comments on commit 7dcc52f

Please sign in to comment.