Skip to content

Commit

Permalink
Merge the develop branch to the master branch, preparation to v3.7.0
Browse files Browse the repository at this point in the history
This merge contains the following set of changes:
  * [Oracle, Improvement] Periodic check for RPC sync state (#656)
  • Loading branch information
akolotov authored May 25, 2022
2 parents bcf1614 + 297cb67 commit ff9f3fb
Show file tree
Hide file tree
Showing 10 changed files with 80 additions and 59 deletions.
2 changes: 2 additions & 0 deletions CONFIGURATION.md
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,11 @@ ORACLE_JSONRPC_ERROR_CODES | Override default JSON rpc error codes that can trig
ORACLE_HOME_EVENTS_REPROCESSING | If set to `true`, home events happened in the past will be refetched and processed once again, to ensure that nothing was missed on the first pass. | `bool`
ORACLE_HOME_EVENTS_REPROCESSING_BATCH_SIZE | Batch size for one `eth_getLogs` request when reprocessing old logs in the home chain. Defaults to `1000` | `integer`
ORACLE_HOME_EVENTS_REPROCESSING_BLOCK_DELAY | Block confirmations number, after which old logs are being reprocessed in the home chain. Defaults to `500` | `integer`
ORACLE_HOME_RPC_SYNC_STATE_CHECK_INTERVAL | Interval for checking JSON RPC sync state, by requesting the latest block number. Oracle will switch to the fallback JSON RPC in case sync process is stuck | `integer`
ORACLE_FOREIGN_EVENTS_REPROCESSING | If set to `true`, foreign events happened in the past will be refetched and processed once again, to ensure that nothing was missed on the first pass. | `bool`
ORACLE_FOREIGN_EVENTS_REPROCESSING_BATCH_SIZE | Batch size for one `eth_getLogs` request when reprocessing old logs in the foreign chain. Defaults to `500` | `integer`
ORACLE_FOREIGN_EVENTS_REPROCESSING_BLOCK_DELAY | Block confirmations number, after which old logs are being reprocessed in the foreign chain. Defaults to `250` | `integer`
ORACLE_FOREIGN_RPC_SYNC_STATE_CHECK_INTERVAL | Interval for checking JSON RPC sync state, by requesting the latest block number. Oracle will switch to the fallback JSON RPC in case sync process is stuck | `integer`


## Monitor configuration
Expand Down
21 changes: 19 additions & 2 deletions oracle/config/base.config.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,15 @@ const {
HOME_AMB_ABI,
FOREIGN_AMB_ABI
} = require('../../commons')
const { web3Home, web3Foreign } = require('../src/services/web3')
const {
web3Home,
web3Foreign,
web3HomeRedundant,
web3HomeFallback,
web3ForeignRedundant,
web3ForeignFallback,
web3ForeignArchive
} = require('../src/services/web3')
const { add0xPrefix, privateKeyToAddress } = require('../src/utils/utils')
const { EXIT_CODES } = require('../src/utils/constants')

Expand All @@ -27,9 +35,11 @@ const {
ORACLE_HOME_EVENTS_REPROCESSING,
ORACLE_HOME_EVENTS_REPROCESSING_BATCH_SIZE,
ORACLE_HOME_EVENTS_REPROCESSING_BLOCK_DELAY,
ORACLE_HOME_RPC_SYNC_STATE_CHECK_INTERVAL,
ORACLE_FOREIGN_EVENTS_REPROCESSING,
ORACLE_FOREIGN_EVENTS_REPROCESSING_BATCH_SIZE,
ORACLE_FOREIGN_EVENTS_REPROCESSING_BLOCK_DELAY
ORACLE_FOREIGN_EVENTS_REPROCESSING_BLOCK_DELAY,
ORACLE_FOREIGN_RPC_SYNC_STATE_CHECK_INTERVAL
} = process.env

let homeAbi
Expand Down Expand Up @@ -63,9 +73,12 @@ const homeConfig = {
bridgeAddress: COMMON_HOME_BRIDGE_ADDRESS,
bridgeABI: homeAbi,
pollingInterval: parseInt(ORACLE_HOME_RPC_POLLING_INTERVAL, 10),
syncCheckInterval: parseInt(ORACLE_HOME_RPC_SYNC_STATE_CHECK_INTERVAL, 10) || 60000,
startBlock: parseInt(ORACLE_HOME_START_BLOCK, 10) || 0,
blockPollingLimit: parseInt(ORACLE_HOME_RPC_BLOCK_POLLING_LIMIT, 10),
web3: web3Home,
web3Redundant: web3HomeRedundant,
web3Fallback: web3HomeFallback,
bridgeContract: homeContract,
eventContract: homeContract,
reprocessingOptions: {
Expand All @@ -81,9 +94,13 @@ const foreignConfig = {
bridgeAddress: COMMON_FOREIGN_BRIDGE_ADDRESS,
bridgeABI: foreignAbi,
pollingInterval: parseInt(ORACLE_FOREIGN_RPC_POLLING_INTERVAL, 10),
syncCheckInterval: parseInt(ORACLE_FOREIGN_RPC_SYNC_STATE_CHECK_INTERVAL, 10) || 60000,
startBlock: parseInt(ORACLE_FOREIGN_START_BLOCK, 10) || 0,
blockPollingLimit: parseInt(ORACLE_FOREIGN_RPC_BLOCK_POLLING_LIMIT, 10),
web3: web3Foreign,
web3Redundant: web3ForeignRedundant,
web3Fallback: web3ForeignFallback,
web3Archive: web3ForeignArchive || web3Foreign,
bridgeContract: foreignContract,
eventContract: foreignContract,
reprocessingOptions: {
Expand Down
5 changes: 1 addition & 4 deletions oracle/config/foreign-sender.config.js
Original file line number Diff line number Diff line change
@@ -1,17 +1,14 @@
const baseConfig = require('./base.config')

const { DEFAULT_TRANSACTION_RESEND_INTERVAL } = require('../src/utils/constants')
const { web3Foreign, web3ForeignRedundant, web3ForeignFallback } = require('../src/services/web3')

const { ORACLE_FOREIGN_TX_RESEND_INTERVAL } = process.env

module.exports = {
...baseConfig,
main: baseConfig.foreign,
queue: 'foreign-prioritized',
id: 'foreign',
name: 'sender-foreign',
web3: web3Foreign,
web3Redundant: web3ForeignRedundant,
web3Fallback: web3ForeignFallback,
resendInterval: parseInt(ORACLE_FOREIGN_TX_RESEND_INTERVAL, 10) || DEFAULT_TRANSACTION_RESEND_INTERVAL
}
5 changes: 1 addition & 4 deletions oracle/config/home-sender.config.js
Original file line number Diff line number Diff line change
@@ -1,17 +1,14 @@
const baseConfig = require('./base.config')

const { DEFAULT_TRANSACTION_RESEND_INTERVAL } = require('../src/utils/constants')
const { web3Home, web3HomeRedundant, web3HomeFallback } = require('../src/services/web3')

const { ORACLE_HOME_TX_RESEND_INTERVAL } = process.env

module.exports = {
...baseConfig,
main: baseConfig.home,
queue: 'home-prioritized',
id: 'home',
name: 'sender-home',
web3: web3Home,
web3Redundant: web3HomeRedundant,
web3Fallback: web3HomeFallback,
resendInterval: parseInt(ORACLE_HOME_TX_RESEND_INTERVAL, 10) || DEFAULT_TRANSACTION_RESEND_INTERVAL
}
2 changes: 0 additions & 2 deletions oracle/config/information-request-watcher.config.js
Original file line number Diff line number Diff line change
@@ -1,11 +1,9 @@
const baseConfig = require('./base.config')
const { web3ForeignArchive } = require('../src/services/web3')

const id = `${baseConfig.id}-information-request`

module.exports = {
...baseConfig,
web3ForeignArchive: web3ForeignArchive || baseConfig.foreign.web3,
main: baseConfig.home,
event: 'UserRequestForInformation',
sender: 'home',
Expand Down
14 changes: 9 additions & 5 deletions oracle/src/events/processAMBInformationRequests/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,13 @@ Object.keys(asyncCalls).forEach(method => {
})

function processInformationRequestsBuilder(config) {
const { home, foreign, web3ForeignArchive } = config
const { home, foreign } = config

let validatorContract = null
let blockFinder = null

foreign.web3Archive.currentProvider.startSyncStateChecker(foreign.syncCheckInterval)

return async function processInformationRequests(informationRequests) {
const txToSend = []

Expand All @@ -49,13 +51,15 @@ function processInformationRequestsBuilder(config) {

if (blockFinder === null) {
rootLogger.debug('Initializing block finder')
blockFinder = await makeBlockFinder('foreign', foreign.web3)
blockFinder = await makeBlockFinder('foreign', foreign.web3Archive)
}

// latest foreign block is requested from an archive RPC, to ensure that it is synced with the network
// block confirmations can be requested from the regular JSON RPC
const foreignBlockNumber =
(await getBlockNumber(foreign.web3)) - (await getRequiredBlockConfirmations(foreign.bridgeContract))
(await getBlockNumber(foreign.web3Archive)) - (await getRequiredBlockConfirmations(foreign.bridgeContract))
const homeBlock = await getBlock(home.web3, informationRequests[0].blockNumber)
const lastForeignBlock = await getBlock(foreign.web3, foreignBlockNumber)
const lastForeignBlock = await getBlock(foreign.web3Archive, foreignBlockNumber)

if (homeBlock.timestamp > lastForeignBlock.timestamp) {
rootLogger.debug(
Expand Down Expand Up @@ -85,7 +89,7 @@ function processInformationRequestsBuilder(config) {
logger.info({ requestSelector, method: asyncCallMethod, data }, 'Processing async request')

const call = asyncCalls[asyncCallMethod]
let [status, result] = await call(web3ForeignArchive, data, foreignClosestBlock).catch(e => {
let [status, result] = await call(foreign.web3Archive, data, foreignClosestBlock).catch(e => {
if (e instanceof HttpListProviderError) {
throw e
}
Expand Down
5 changes: 3 additions & 2 deletions oracle/src/sender.js
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@ if (process.argv.length < 3) {

const config = require(path.join('../config/', process.argv[2]))

const { web3, web3Fallback } = config
const web3Redundant = ORACLE_TX_REDUNDANCY === 'true' ? config.web3Redundant : web3
const { web3, web3Fallback, syncCheckInterval } = config.main
const web3Redundant = ORACLE_TX_REDUNDANCY === 'true' ? config.main.web3Redundant : web3

const nonceKey = `${config.id}:nonce`
let chainId = 0
Expand All @@ -43,6 +43,7 @@ async function initialize() {
const checkHttps = checkHTTPS(process.env.ORACLE_ALLOW_HTTP_FOR_RPC, logger)

web3.currentProvider.urls.forEach(checkHttps(config.id))
web3.currentProvider.startSyncStateChecker(syncCheckInterval)

GasPrice.start(config.id, web3)

Expand Down
50 changes: 41 additions & 9 deletions oracle/src/services/HttpListProvider.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
const fetch = require('node-fetch')
const promiseRetry = require('promise-retry')
const { utils } = require('web3')
const { FALLBACK_RPC_URL_SWITCH_TIMEOUT } = require('../utils/constants')

const { onInjected } = require('./injectedLogger')
Expand Down Expand Up @@ -39,19 +40,54 @@ function HttpListProvider(urls, options = {}) {
this.options = { ...defaultOptions, ...options }
this.currentIndex = 0
this.lastTimeUsedPrimary = 0
this.latestBlock = 0
this.syncStateCheckerIntervalId = 0

onInjected(logger => {
this.logger = logger.child({ module: `HttpListProvider:${this.options.name}` })
})
}

HttpListProvider.prototype.switchToFallbackRPC = function() {
if (this.urls.length < 2) {
return
HttpListProvider.prototype.startSyncStateChecker = function(syncCheckInterval) {
if (this.urls.length > 1 && syncCheckInterval > 0 && this.syncStateCheckerIntervalId === 0) {
this.syncStateCheckerIntervalId = setInterval(this.checkLatestBlock.bind(this), syncCheckInterval)
}
}

HttpListProvider.prototype.checkLatestBlock = function() {
const payload = { jsonrpc: '2.0', id: 1, method: 'eth_blockNumber', params: [] }
this.send(payload, (error, result) => {
if (error) {
this.logger.warn({ oldBlock: this.latestBlock }, 'Failed to request latest block from all RPC urls')
} else if (result.error) {
this.logger.warn(
{ oldBlock: this.latestBlock, error: result.error.message },
'Failed to make eth_blockNumber request due to unknown error, switching to fallback RPC'
)
this.switchToFallbackRPC()
} else {
const blockNumber = utils.hexToNumber(result.result)
if (blockNumber > this.latestBlock) {
this.logger.debug({ oldBlock: this.latestBlock, newBlock: blockNumber }, 'Updating latest block number')
this.latestBlock = blockNumber
} else {
this.logger.warn(
{ oldBlock: this.latestBlock, newBlock: blockNumber },
'Latest block on the node was not updated since last request, switching to fallback RPC'
)
this.switchToFallbackRPC()
}
}
})
}

HttpListProvider.prototype.switchToFallbackRPC = function(index) {
const prevIndex = this.currentIndex
const newIndex = (prevIndex + 1) % this.urls.length
const newIndex = index || (prevIndex + 1) % this.urls.length
if (this.urls.length < 2 || prevIndex === newIndex) {
return
}

this.logger.info(
{ index: newIndex, oldURL: this.urls[prevIndex], newURL: this.urls[newIndex] },
'Switching to fallback JSON-RPC URL'
Expand Down Expand Up @@ -80,11 +116,7 @@ HttpListProvider.prototype.send = async function send(payload, callback) {

// if some of URLs failed to respond, current URL index is updated to the first URL that responded
if (currentIndex !== index) {
this.logger.info(
{ index, oldURL: this.urls[currentIndex], newURL: this.urls[index] },
'Switching to fallback JSON-RPC URL'
)
this.currentIndex = index
this.switchToFallbackRPC(index)
}
callback(null, result)
} catch (e) {
Expand Down
1 change: 0 additions & 1 deletion oracle/src/utils/constants.js
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ module.exports = {
MIN_GAS_PRICE_BUMP_FACTOR: 0.1,
DEFAULT_TRANSACTION_RESEND_INTERVAL: 20 * 60 * 1000,
FALLBACK_RPC_URL_SWITCH_TIMEOUT: 60 * 60 * 1000,
BLOCK_NUMBER_PROGRESS_ITERATIONS_LIMIT: 10,
SENDER_QUEUE_MAX_PRIORITY: 10,
SENDER_QUEUE_SEND_PRIORITY: 5,
SENDER_QUEUE_CHECK_STATUS_PRIORITY: 1,
Expand Down
34 changes: 4 additions & 30 deletions oracle/src/watcher.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,7 @@ const logger = require('./services/logger')
const { getShutdownFlag } = require('./services/shutdownState')
const { getBlockNumber, getRequiredBlockConfirmations, getEvents } = require('./tx/web3')
const { checkHTTPS, watchdog } = require('./utils/utils')
const {
EXIT_CODES,
BLOCK_NUMBER_PROGRESS_ITERATIONS_LIMIT,
MAX_HISTORY_BLOCK_TO_REPROCESS
} = require('./utils/constants')
const { EXIT_CODES, MAX_HISTORY_BLOCK_TO_REPROCESS } = require('./utils/constants')

if (process.argv.length < 3) {
logger.error('Please check the number of arguments, config file was not provided')
Expand Down Expand Up @@ -38,21 +34,21 @@ const {
pollingInterval,
chain,
reprocessingOptions,
blockPollingLimit
blockPollingLimit,
syncCheckInterval
} = config.main
const lastBlockRedisKey = `${config.id}:lastProcessedBlock`
const lastReprocessedBlockRedisKey = `${config.id}:lastReprocessedBlock`
const seenEventsRedisKey = `${config.id}:seenEvents`
let lastProcessedBlock = Math.max(startBlock - 1, 0)
let lastReprocessedBlock
let lastSeenBlockNumber = 0
let sameBlockNumberCounter = 0

async function initialize() {
try {
const checkHttps = checkHTTPS(process.env.ORACLE_ALLOW_HTTP_FOR_RPC, logger)

web3.currentProvider.urls.forEach(checkHttps(chain))
web3.currentProvider.startSyncStateChecker(syncCheckInterval)

await getLastProcessedBlock()
await getLastReprocessedBlock()
Expand Down Expand Up @@ -225,28 +221,6 @@ async function getLastBlockToProcess(web3, bridgeContract) {
getBlockNumber(web3),
getRequiredBlockConfirmations(bridgeContract)
])

if (lastBlockNumber < lastSeenBlockNumber) {
sameBlockNumberCounter = 0
logger.warn({ lastBlockNumber, lastSeenBlockNumber }, 'Received block number less than already seen block')
web3.currentProvider.switchToFallbackRPC()
} else if (lastBlockNumber === lastSeenBlockNumber) {
sameBlockNumberCounter++
if (sameBlockNumberCounter > 1) {
logger.info({ lastBlockNumber, sameBlockNumberCounter }, 'Received the same block number more than twice')
if (sameBlockNumberCounter >= BLOCK_NUMBER_PROGRESS_ITERATIONS_LIMIT) {
sameBlockNumberCounter = 0
logger.warn(
{ lastBlockNumber, n: BLOCK_NUMBER_PROGRESS_ITERATIONS_LIMIT },
'Received the same block number for too many times. Probably node is not synced anymore'
)
web3.currentProvider.switchToFallbackRPC()
}
}
} else {
sameBlockNumberCounter = 0
lastSeenBlockNumber = lastBlockNumber
}
return lastBlockNumber - requiredBlockConfirmations
}

Expand Down

0 comments on commit ff9f3fb

Please sign in to comment.