Skip to content

Commit

Permalink
Develop to staging (#117)
Browse files Browse the repository at this point in the history
* updating loopback 4

* pinning mongo version

* removing comment from .env line which was breaking the import

* removing comment that is breaking env example

* making scripts use diff connection strings per instance

* simplifying data import

* making gateway wait for other services

* fixes #83; lb cannot be found due to blockchain subdomain hack

* New altruist implementation (#100)

* new altruist implementation

* adding check for blank relaypath, not just undefined

* Updating example to match realities

* pushing feature branch to staging

* fixing bad subdomain parsing

* removing subdomain hack, localhost can work with subdomains

* db test for staging

* db test for staging

* db test for staging

* db test for staging

* fixing logging of debug and wrapping axios in try catch

* testing on usw2

* fixing memory reservation for fargate

* removing excess logging, vestigal interfaces

* restoring CORS headers

* restoring CORS headers

* restoring CORS headers

* restoring CORS headers

* restoring CORS headers

* restoring push on master to production files

* updating requires to imports, resolving PR comments

* reverting logger import change

* pushing to usw2

* pointing workflows back to correct branches

* Update task for e2

* feat: add local setup of relations in timescaledb

* Adding back missing allowed headers (#112)

* new altruist implementation

* adding check for blank relaypath, not just undefined

* Updating example to match realities

* pushing feature branch to staging

* fixing bad subdomain parsing

* removing subdomain hack, localhost can work with subdomains

* db test for staging

* db test for staging

* db test for staging

* db test for staging

* fixing logging of debug and wrapping axios in try catch

* testing on usw2

* fixing memory reservation for fargate

* removing excess logging, vestigal interfaces

* restoring CORS headers

* restoring CORS headers

* restoring CORS headers

* restoring CORS headers

* restoring CORS headers

* restoring push on master to production files

* updating requires to imports, resolving PR comments

* reverting logger import change

* pushing to usw2

* pointing workflows back to correct branches

* adding back origin,ua headers

* fixes #108; uses altruist system as backup for sync check

* removing leftover cache breaker and changing it to 30 second redis cache for failure sessions

Co-authored-by: Roniel Valdez <[email protected]>
  • Loading branch information
nymd and rem1niscence authored Jul 7, 2021
1 parent 1867e1d commit 7428555
Show file tree
Hide file tree
Showing 5 changed files with 89 additions and 16 deletions.
2 changes: 1 addition & 1 deletion pocket-gateway/ecs-task-us-east-2.json
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
"environment": [],
"command": [],
"linuxParameters": null,
"cpu": 2048,
"cpu": 4096,
"resourceRequirements": null,
"ulimits": [
{
Expand Down
10 changes: 7 additions & 3 deletions src/services/pocket-relayer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ export class PocketRelayer {
}

// Send this relay attempt
const relayResponse = await this._sendRelay(data, relayPath, httpMethod, requestID, application, requestTimeOut, blockchain, blockchainEnforceResult, blockchainSyncCheck);
const relayResponse = await this._sendRelay(data, relayPath, httpMethod, requestID, application, requestTimeOut, blockchain, blockchainEnforceResult, blockchainSyncCheck, String(this.altruists[blockchain]));

if (!(relayResponse instanceof Error)) {
// Record success metric
Expand Down Expand Up @@ -303,6 +303,7 @@ export class PocketRelayer {
blockchain: string,
blockchainEnforceResult: string,
blockchainSyncCheck: string,
blockchainSyncBackup: string,
): Promise<RelayResponse | Error> {
logger.log('info', 'RELAYING ' + blockchain + ' req: ' + data, {requestID: requestID, relayType: 'APP', typeID: application.id, serviceNode: ''});

Expand Down Expand Up @@ -366,8 +367,11 @@ export class PocketRelayer {
if (pocketSession instanceof Session) {
let nodes: Node[] = pocketSession.sessionNodes;
if (blockchainSyncCheck) {
nodes = await this.syncChecker.consensusFilter(pocketSession.sessionNodes, requestID, blockchainSyncCheck, 3, blockchain, application.id, application.gatewayAAT.applicationPublicKey, this.pocket, pocketAAT, this.pocketConfiguration);
}
nodes = await this.syncChecker.consensusFilter(pocketSession.sessionNodes, requestID, blockchainSyncCheck, 3, blockchain, blockchainSyncBackup, application.id, application.gatewayAAT.applicationPublicKey, this.pocket, pocketAAT, this.pocketConfiguration);
if (nodes.length === 0) {
return new Error('Sync check failure; using fallbacks');
}
}
node = await this.cherryPicker.cherryPickNode(application, nodes, blockchain, requestID);
}

Expand Down
70 changes: 59 additions & 11 deletions src/services/sync-checker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import {Redis} from 'ioredis';
var crypto = require('crypto');

const logger = require('../services/logger');
import axios from 'axios';

export class SyncChecker {
redis: Redis;
Expand All @@ -14,7 +15,7 @@ export class SyncChecker {
this.metricsRecorder = metricsRecorder;
}

async consensusFilter(nodes: Node[], requestID: string, syncCheck: string, syncAllowance: number = 1, blockchain: string, applicationID: string, applicationPublicKey: string, pocket: Pocket, pocketAAT: PocketAAT, pocketConfiguration: Configuration): Promise<Node[]> {
async consensusFilter(nodes: Node[], requestID: string, syncCheck: string, syncAllowance: number = 1, blockchain: string, blockchainSyncBackup: string, applicationID: string, applicationPublicKey: string, pocket: Pocket, pocketAAT: PocketAAT, pocketConfiguration: Configuration): Promise<Node[]> {
let syncedNodes: Node[] = [];
let syncedNodesList: String[] = [];

Expand Down Expand Up @@ -48,34 +49,53 @@ export class SyncChecker {

// Fires all 5 sync checks synchronously then assembles the results
const nodeSyncLogs = await this.getNodeSyncLogs(nodes, requestID, syncCheck, blockchain, applicationID, applicationPublicKey, pocket, pocketAAT, pocketConfiguration);

let errorState = false;

// This should never happen
if (nodeSyncLogs.length <= 2) {
logger.log('error', 'SYNC CHECK ERROR: fewer than 3 nodes returned sync', {requestID: requestID, relayType: '', typeID: '', serviceNode: '', error: '', elapsedTime: ''});
return nodes;
errorState = true;
}

let currentBlockHeight = 0;

// Sort NodeSyncLogs by blockHeight
nodeSyncLogs.sort((a, b) => b.blockHeight - a.blockHeight);

// If top node is still 0, or not a number, return all nodes due to check failure
if (
nodeSyncLogs.length === 0 ||
nodeSyncLogs[0].blockHeight === 0 ||
typeof nodeSyncLogs[0].blockHeight !== 'number' ||
(nodeSyncLogs[0].blockHeight %1 ) !== 0
)
{
logger.log('error', 'SYNC CHECK ERROR: top synced node result is invalid ' + nodeSyncLogs[0].blockHeight, {requestID: requestID, relayType: '', typeID: '', serviceNode: '', error: '', elapsedTime: ''});
return nodes;
logger.log('error', 'SYNC CHECK ERROR: top synced node result is invalid ' + JSON.stringify(nodeSyncLogs), {requestID: requestID, relayType: '', typeID: '', serviceNode: '', error: '', elapsedTime: ''});
errorState = true;
} else {
currentBlockHeight = nodeSyncLogs[0].blockHeight;
}

// Make sure at least 2 nodes agree on current highest block to prevent one node from being wildly off
if (nodeSyncLogs[0].blockHeight > (nodeSyncLogs[1].blockHeight + syncAllowance)) {
if (
!errorState &&
nodeSyncLogs[0].blockHeight > (nodeSyncLogs[1].blockHeight + syncAllowance)
) {
logger.log('error', 'SYNC CHECK ERROR: two highest nodes could not agree on sync', {requestID: requestID, relayType: '', typeID: '', serviceNode: '', error: '', elapsedTime: ''});
return nodes;
errorState = true;
}

const currentBlockHeight = nodeSyncLogs[0].blockHeight;
if (errorState) {
// Consult Altruist for sync source of truth
currentBlockHeight = await this.getSyncFromAltruist(syncCheck, blockchainSyncBackup);
if (currentBlockHeight === 0) {
// Failure to find sync from consensus and altruist
logger.log('info', 'SYNC CHECK ALTRUIST FAILURE: ' + currentBlockHeight, {requestID: requestID, relayType: '', typeID: '', serviceNode: 'ALTRUIST', error: '', elapsedTime: ''});
return nodes;
} else {
logger.log('info', 'SYNC CHECK ALTRUIST CHECK: ' + currentBlockHeight, {requestID: requestID, relayType: '', typeID: '', serviceNode: 'ALTRUIST', error: '', elapsedTime: ''});
}
}

// Go through nodes and add all nodes that are current or within 1 block -- this allows for block processing times
for (const nodeSyncLog of nodeSyncLogs) {
Expand Down Expand Up @@ -113,7 +133,7 @@ export class SyncChecker {
syncedNodesKey,
JSON.stringify(syncedNodesList),
'EX',
300,
(syncedNodes.length > 0) ? 300 : 30, // will retry sync check every 30 seconds if no nodes are in sync
);

// If one or more nodes of this session are not in sync, fire a consensus relay with the same check.
Expand All @@ -133,10 +153,38 @@ export class SyncChecker {
);
logger.log('info', 'SYNC CHECK CHALLENGE: ' + JSON.stringify(consensusResponse), {requestID: requestID, relayType: '', typeID: '', serviceNode: '', error: '', elapsedTime: ''});
}

return syncedNodes;
}

async getSyncFromAltruist(syncCheck: string, blockchainSyncBackup: string): Promise<number> {
// Remove user/pass from the altruist URL
const redactedAltruistURL = blockchainSyncBackup.replace(/[\w]*:\/\/[^\/]*@/g, '');

try {
const syncResponse = await axios({
method: 'POST',
url: blockchainSyncBackup,
data: syncCheck,
headers: {'Content-Type': 'application/json'}
});

if (!(syncResponse instanceof Error)) {
// Return decimal version of hex result as blockHeight
return parseInt(syncResponse.data.result, 16);
}
return 0;
}
catch (e) {
logger.log('error', e.message, {
requestID: '',
relayType: 'FALLBACK',
typeID: '',
serviceNode: 'fallback:' + redactedAltruistURL,
});
}
return 0;
}

async getNodeSyncLogs(nodes: Node[], requestID: string, syncCheck: string, blockchain: string, applicationID: string, applicationPublicKey: string, pocket: Pocket, pocketAAT: PocketAAT, pocketConfiguration: Configuration): Promise<NodeSyncLog[]> {
const nodeSyncLogs: NodeSyncLog[] = [];
const promiseStack: Promise<NodeSyncLog>[] = [];
Expand Down
19 changes: 19 additions & 0 deletions stacks/local.init.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
-- Local setup of timescaledb relations

CREATE TABLE relay (
timestamp TIMESTAMPTZ NOT NULL,
app_pub_key TEXT NOT NULL,
blockchain TEXT NOT NULL,
service_node TEXT,
elapsed_time DOUBLE PRECISION NOT NULL,
result NUMERIC,
bytes NUMERIC NOT NULL,
method TEXT
);

CREATE INDEX relay_app_pub_key_method_timestamp_idx ON relay(app_pub_key, method, timestamp);
CREATE INDEX relay_app_pub_key_result_timestamp_idx ON relay(app_pub_key, result, timestamp);
CREATE INDEX relay_app_pub_key_timestamp_idx ON relay(app_pub_key, timestamp DESC);
CREATE INDEX relay_service_node_timestamp_idx ON relay(service_node, timestamp DESC);
CREATE INDEX relay_timestamp_app_pub_key_idx ON relay(timestamp DESC, app_pub_key);
CREATE INDEX relay_timestamp_idx ON relay(timestamp DESC);
4 changes: 3 additions & 1 deletion stacks/local.yml
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,9 @@ services:
- 5432:5432
networks:
- pocket

volumes:
- ./local.init.sql:/docker-entrypoint-initdb.d/init.sql

networks:
pocket:
driver: bridge

0 comments on commit 7428555

Please sign in to comment.