Skip to content

Commit

Permalink
fixes #72; sync check for avax and other nodes that need relay paths
Browse files Browse the repository at this point in the history
  • Loading branch information
nymd committed Jul 7, 2021
1 parent 77abc8e commit 08f5e5f
Show file tree
Hide file tree
Showing 4 changed files with 43 additions and 33 deletions.
34 changes: 12 additions & 22 deletions src/services/pocket-relayer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import {BlockchainsRepository} from '../repositories';
import {Applications} from '../models';
import {RelayError} from '../errors/relay-error';
import AatPlans from '../config/aat-plans.json';
import {checkEnforcementJSON} from '../utils';

import { JSONObject } from '@loopback/context';

Expand Down Expand Up @@ -109,7 +110,7 @@ export class PocketRelayer {
if (relayRetries !== undefined && relayRetries >= 0) {
this.relayRetries = relayRetries;
}
const [blockchain, blockchainEnforceResult, blockchainSyncCheck] = await this.loadBlockchain();
const [blockchain, blockchainEnforceResult, blockchainSyncCheck, blockchainSyncCheckPath] = await this.loadBlockchain();
const overallStart = process.hrtime();

// This converts the raw data into formatted JSON then back to a string for relaying.
Expand Down Expand Up @@ -145,7 +146,7 @@ export class PocketRelayer {
}

// Send this relay attempt
const relayResponse = await this._sendRelay(data, relayPath, httpMethod, requestID, application, requestTimeOut, blockchain, blockchainEnforceResult, blockchainSyncCheck, String(this.altruists[blockchain]));
const relayResponse = await this._sendRelay(data, relayPath, httpMethod, requestID, application, requestTimeOut, blockchain, blockchainEnforceResult, blockchainSyncCheck, blockchainSyncCheckPath, String(this.altruists[blockchain]));

if (!(relayResponse instanceof Error)) {
// Record success metric
Expand Down Expand Up @@ -303,6 +304,7 @@ export class PocketRelayer {
blockchain: string,
blockchainEnforceResult: string,
blockchainSyncCheck: string,
blockchainSyncCheckPath: string,
blockchainSyncBackup: string,
): Promise<RelayResponse | Error> {
logger.log('info', 'RELAYING ' + blockchain + ' req: ' + data, {requestID: requestID, relayType: 'APP', typeID: application.id, serviceNode: ''});
Expand Down Expand Up @@ -367,7 +369,7 @@ export class PocketRelayer {
if (pocketSession instanceof Session) {
let nodes: Node[] = pocketSession.sessionNodes;
if (blockchainSyncCheck) {
nodes = await this.syncChecker.consensusFilter(pocketSession.sessionNodes, requestID, blockchainSyncCheck, 3, blockchain, blockchainSyncBackup, application.id, application.gatewayAAT.applicationPublicKey, this.pocket, pocketAAT, this.pocketConfiguration);
nodes = await this.syncChecker.consensusFilter(pocketSession.sessionNodes, requestID, blockchainSyncCheck, blockchainSyncCheckPath, 3, blockchain, blockchainSyncBackup, application.id, application.gatewayAAT.applicationPublicKey, this.pocket, pocketAAT, this.pocketConfiguration);
if (nodes.length === 0) {
return new Error('Sync check failure; using fallbacks');
}
Expand Down Expand Up @@ -430,7 +432,7 @@ export class PocketRelayer {
blockchainEnforceResult && // Is this blockchain marked for result enforcement // and
blockchainEnforceResult.toLowerCase() === 'json' && // the check is for JSON // and
(
!this.checkEnforcementJSON(relayResponse.payload) || // the relay response is not valid JSON // or
!checkEnforcementJSON(relayResponse.payload) || // the relay response is not valid JSON // or
relayResponse.payload.startsWith('{"error"') // the full payload is an error
)
) {
Expand Down Expand Up @@ -525,6 +527,7 @@ export class PocketRelayer {
if (blockchainFilter[0]) {
let blockchainEnforceResult = '';
let blockchainSyncCheck = '';
let blockchainSyncCheckPath = '';
const blockchain = blockchainFilter[0].hash as string;

// Record the necessary format for the result; example: JSON
Expand All @@ -535,29 +538,16 @@ export class PocketRelayer {
if (blockchainFilter[0].syncCheck) {
blockchainSyncCheck = blockchainFilter[0].syncCheck.replace(/\\"/g, '"');
}
return Promise.resolve([blockchain, blockchainEnforceResult, blockchainSyncCheck]);
// Sync Check path necessary for some chains
if (blockchainFilter[0].syncCheckPath) {
blockchainSyncCheckPath = blockchainFilter[0].syncCheckPath;
}
return Promise.resolve([blockchain, blockchainEnforceResult, blockchainSyncCheck, blockchainSyncCheckPath]);
} else {
throw new HttpErrors.BadRequest('Incorrect blockchain: ' + this.host);
}
}

// Check relay result: JSON
checkEnforcementJSON(test: string): boolean {
if (!test || test.length === 0) {
return false;
}
// Code from: https://github.com/prototypejs/prototype/blob/560bb59414fc9343ce85429b91b1e1b82fdc6812/src/prototype/lang/string.js#L699
// Prototype lib
if (/^\s*$/.test(test)) return false;
test = test.replace(/\\(?:["\\\/bfnrt]|u[0-9a-fA-F]{4})/g, '@');
test = test.replace(
/"[^"\\\n\r]*"|true|false|null|-?\d+(?:\.\d*)?(?:[eE][+\-]?\d+)?/g,
']',
);
test = test.replace(/(?:^|:|,)(?:\s*\[)+/g, '');
return /^[\],:{}\s]*$/.test(test);
}

checkSecretKey(application: Applications): boolean {
// Check secretKey; is it required? does it pass? -- temp allowance for unencrypted keys
const decryptor = new Decryptor({key: this.databaseEncryptionKey});
Expand Down
26 changes: 15 additions & 11 deletions src/services/sync-checker.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import {Configuration, HTTPMethod, Node, Pocket, PocketAAT, RelayResponse} from '@pokt-network/pocket-js';
import {MetricsRecorder} from '../services/metrics-recorder';
import {Redis} from 'ioredis';
import {checkEnforcementJSON} from '../utils';
var crypto = require('crypto');

const logger = require('../services/logger');
Expand All @@ -15,13 +16,13 @@ export class SyncChecker {
this.metricsRecorder = metricsRecorder;
}

async consensusFilter(nodes: Node[], requestID: string, syncCheck: string, syncAllowance: number = 1, blockchain: string, blockchainSyncBackup: string, applicationID: string, applicationPublicKey: string, pocket: Pocket, pocketAAT: PocketAAT, pocketConfiguration: Configuration): Promise<Node[]> {
async consensusFilter(nodes: Node[], requestID: string, syncCheck: string, syncCheckPath: string, syncAllowance: number = 1, blockchain: string, blockchainSyncBackup: string, applicationID: string, applicationPublicKey: string, pocket: Pocket, pocketAAT: PocketAAT, pocketConfiguration: Configuration): Promise<Node[]> {
let syncedNodes: Node[] = [];
let syncedNodesList: String[] = [];

// Key is "blockchain - a hash of the all the nodes in this session, sorted by public key"
// Value is an array of node public keys that have passed sync checks for this session in the past 5 minutes
const syncedNodesKey = blockchain + '-' + crypto.createHash('sha256').update(JSON.stringify(nodes.sort((a,b) => (a.publicKey > b.publicKey) ? 1 : ((b.publicKey > a.publicKey) ? -1 : 0)), (k, v) => k != 'publicKey' ? v : undefined)).digest('hex');
const syncedNodesKey = blockchain + String(Math.round()) + '-' + crypto.createHash('sha256').update(JSON.stringify(nodes.sort((a,b) => (a.publicKey > b.publicKey) ? 1 : ((b.publicKey > a.publicKey) ? -1 : 0)), (k, v) => k != 'publicKey' ? v : undefined)).digest('hex');
const syncedNodesCached = await this.redis.get(syncedNodesKey);

if (syncedNodesCached) {
Expand All @@ -48,7 +49,7 @@ export class SyncChecker {
}

// Fires all 5 sync checks synchronously then assembles the results
const nodeSyncLogs = await this.getNodeSyncLogs(nodes, requestID, syncCheck, blockchain, applicationID, applicationPublicKey, pocket, pocketAAT, pocketConfiguration);
const nodeSyncLogs = await this.getNodeSyncLogs(nodes, requestID, syncCheck, syncCheckPath, blockchain, applicationID, applicationPublicKey, pocket, pocketAAT, pocketConfiguration);
let errorState = false;

// This should never happen
Expand Down Expand Up @@ -87,7 +88,7 @@ export class SyncChecker {

if (errorState) {
// Consult Altruist for sync source of truth
currentBlockHeight = await this.getSyncFromAltruist(syncCheck, blockchainSyncBackup);
currentBlockHeight = await this.getSyncFromAltruist(syncCheck, syncCheckPath, blockchainSyncBackup);
if (currentBlockHeight === 0) {
// Failure to find sync from consensus and altruist
logger.log('info', 'SYNC CHECK ALTRUIST FAILURE: ' + currentBlockHeight, {requestID: requestID, relayType: '', typeID: '', serviceNode: 'ALTRUIST', error: '', elapsedTime: ''});
Expand Down Expand Up @@ -156,14 +157,14 @@ export class SyncChecker {
return syncedNodes;
}

async getSyncFromAltruist(syncCheck: string, blockchainSyncBackup: string): Promise<number> {
async getSyncFromAltruist(syncCheck: string, syncCheckPath: string, blockchainSyncBackup: string): Promise<number> {
// Remove user/pass from the altruist URL
const redactedAltruistURL = blockchainSyncBackup.replace(/[\w]*:\/\/[^\/]*@/g, '');

try {
const syncResponse = await axios({
method: 'POST',
url: blockchainSyncBackup,
url: `${blockchainSyncBackup}${syncCheckPath}`,
data: syncCheck,
headers: {'Content-Type': 'application/json'}
});
Expand All @@ -185,7 +186,7 @@ export class SyncChecker {
return 0;
}

async getNodeSyncLogs(nodes: Node[], requestID: string, syncCheck: string, blockchain: string, applicationID: string, applicationPublicKey: string, pocket: Pocket, pocketAAT: PocketAAT, pocketConfiguration: Configuration): Promise<NodeSyncLog[]> {
async getNodeSyncLogs(nodes: Node[], requestID: string, syncCheck: string, syncCheckPath: string, blockchain: string, applicationID: string, applicationPublicKey: string, pocket: Pocket, pocketAAT: PocketAAT, pocketConfiguration: Configuration): Promise<NodeSyncLog[]> {
const nodeSyncLogs: NodeSyncLog[] = [];
const promiseStack: Promise<NodeSyncLog>[] = [];

Expand All @@ -194,7 +195,7 @@ export class SyncChecker {

for (const node of nodes) {
promiseStack.push(
this.getNodeSyncLog(node, requestID, syncCheck, blockchain, applicationID, applicationPublicKey, pocket, pocketAAT, pocketConfiguration)
this.getNodeSyncLog(node, requestID, syncCheck, syncCheckPath, blockchain, applicationID, applicationPublicKey, pocket, pocketAAT, pocketConfiguration)
);
}

Expand All @@ -211,7 +212,7 @@ export class SyncChecker {
return nodeSyncLogs;
}

async getNodeSyncLog(node: Node, requestID: string, syncCheck: string, blockchain: string, applicationID: string, applicationPublicKey: string, pocket: Pocket, pocketAAT: PocketAAT, pocketConfiguration: Configuration): Promise<NodeSyncLog> {
async getNodeSyncLog(node: Node, requestID: string, syncCheck: string, syncCheckPath: string, blockchain: string, applicationID: string, applicationPublicKey: string, pocket: Pocket, pocketAAT: PocketAAT, pocketConfiguration: Configuration): Promise<NodeSyncLog> {

logger.log('info', 'SYNC CHECK START', {requestID: requestID, relayType: '', typeID: '', serviceNode: node.publicKey, error: '', elapsedTime: ''});

Expand All @@ -225,13 +226,16 @@ export class SyncChecker {
this.updateConfigurationTimeout(pocketConfiguration),
undefined,
'POST' as HTTPMethod,
undefined,
syncCheckPath,
node,
false,
'synccheck'
);

if (relayResponse instanceof RelayResponse) {
if (
relayResponse instanceof RelayResponse &&
checkEnforcementJSON(relayResponse.payload)
) {
const payload = JSON.parse(relayResponse.payload);

// Create a NodeSyncLog for each node with current block
Expand Down
1 change: 1 addition & 0 deletions src/utils/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
export * from './json';
15 changes: 15 additions & 0 deletions src/utils/json.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
export function checkEnforcementJSON(test: string): boolean {
if (!test || test.length === 0) {
return false;
}
// Code from: https://github.com/prototypejs/prototype/blob/560bb59414fc9343ce85429b91b1e1b82fdc6812/src/prototype/lang/string.js#L699
// Prototype lib
if (/^\s*$/.test(test)) return false;
test = test.replace(/\\(?:["\\\/bfnrt]|u[0-9a-fA-F]{4})/g, '@');
test = test.replace(
/"[^"\\\n\r]*"|true|false|null|-?\d+(?:\.\d*)?(?:[eE][+\-]?\d+)?/g,
']',
);
test = test.replace(/(?:^|:|,)(?:\s*\[)+/g, '');
return /^[\],:{}\s]*$/.test(test);
}

0 comments on commit 08f5e5f

Please sign in to comment.