Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docker-compose.example.yml
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ services:
- POSTGRES_CONNECTION_POOL_SIZE # The connection pool size for postgres (defaults to 20)
- POSTGRES_USE_SSL # Set to anything to indicate that SSL should be used in the connection
# L1 Node
- L1_NODE_INFURA_NETWORK=rinkeby # The Infura network for the connection to the L1 node
- L1_NODE_INFURA_NETWORK=goerli # The Infura network for the connection to the L1 node
- L1_NODE_INFURA_PROJECT_ID=91981cfffb524ceca0c2e5b18905f9f5 # The Infura project ID for the connection to the L1 node
- L1_NODE_WEB3_URL # The URL of the L1 node
- FINALITY_DELAY_IN_BLOCKS=1 # The number of block confirmations required to consider a transaction final on L1
Expand Down
120 changes: 46 additions & 74 deletions packages/core-db/src/app/ethereum/ethereum-block-processor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ const blockKey: Buffer = Buffer.from('latestBlock')
*/
export class EthereumBlockProcessor {
private readonly subscriptions: Set<EthereumListener<Block>>
private currentBlockNumber: number
private currentFinalizedBlockNumber: number

private syncInProgress: boolean
private syncCompleted: boolean
Expand All @@ -27,10 +27,13 @@ export class EthereumBlockProcessor {
private readonly confirmsUntilFinal: number = 1
) {
this.subscriptions = new Set<EthereumListener<Block>>()
this.currentBlockNumber = 0
this.currentFinalizedBlockNumber = 0

this.syncInProgress = false
this.syncCompleted = false
if (earliestBlock < 0) {
throw Error('Earliest block must be >= 0')
}
}

/**
Expand All @@ -51,25 +54,33 @@ export class EthereumBlockProcessor {

provider.on('block', async (blockNumber) => {
try {
if (blockNumber < this.earliestBlock) {
const finalizedBlockNumber = this.getBlockFinalizedBy(blockNumber)

if (finalizedBlockNumber < this.earliestBlock) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah ok 👍

log.debug(
`Received block [${blockNumber}] which is before earliest block [${this.earliestBlock}]. Ignoring...`
`Received block [${blockNumber}] which finalizes a block ${finalizedBlockNumber}, before earliest block [${this.earliestBlock}]. Ignoring...`
)
return
}

log.debug(`Block [${blockNumber}] was mined!`)
log.debug(
`Block [${blockNumber}] was mined! Finalizing block ${finalizedBlockNumber}`
)

await this.fetchAndDisseminateBlock(provider, blockNumber)
this.currentBlockNumber = blockNumber
await this.fetchAndDisseminateBlock(provider, finalizedBlockNumber)
this.currentFinalizedBlockNumber = finalizedBlockNumber

if (!syncPastBlocks || this.syncCompleted) {
await this.storeLastProcessedBlockNumber(this.currentBlockNumber)
await this.storeLastProcessedBlockNumber(
this.currentFinalizedBlockNumber
)
}
} catch (e) {
logError(
log,
`Error thrown processing block ${blockNumber}. Exiting since throwing will not be caught.`,
`Error thrown processing block ${blockNumber}, finalizing block ${this.getBlockFinalizedBy(
blockNumber
)}. Exiting since throwing will not be caught.`,
e
)
process.exit(1)
Expand Down Expand Up @@ -100,62 +111,9 @@ export class EthereumBlockProcessor {
blockNumber: number
): Promise<void> {
log.debug(`Fetching block [${blockNumber}].`)
let block: Block = await provider.getBlock(blockNumber, true)
const block: Block = await provider.getBlock(blockNumber, true)
log.debug(`Received block: ${block.number}.`)

if (
this.confirmsUntilFinal > 1 &&
!!block.transactions &&
!!block.transactions.length
) {
log.debug(
`Waiting for ${this.confirmsUntilFinal} confirms before disseminating block ${blockNumber}`
)
try {
let refetchedHash: string
if (block.transactions.length > 0) {
const receipt: TransactionReceipt = await provider.waitForTransaction(
(block.transactions[0] as any).hash,
this.confirmsUntilFinal
)
refetchedHash = receipt.blockHash
} else {
while (
(await provider.getBlockNumber()) <
blockNumber + this.confirmsUntilFinal
) {
log.debug(
`Waiting for empty block ${blockNumber} to be final. Sleeping...`
)
await sleep(15_000)
}
const refetched = await provider.getBlock(blockNumber)
refetchedHash = refetched.hash
}

if (refetchedHash !== block.hash) {
log.info(
`Re-org processing block number ${blockNumber}. Re-fetching block.`
)
return this.fetchAndDisseminateBlock(provider, blockNumber)
}
} catch (e) {
logError(
log,
`Error waiting for ${this.confirmsUntilFinal} confirms on block ${blockNumber}`,
e
)
// Cannot silently fail here because syncing will move on as if this block was processed.
throw e
}

log.debug(
`Received ${this.confirmsUntilFinal} confirms for block ${blockNumber}. Refetching block`
)

block = await provider.getBlock(blockNumber, true)
}

this.subscriptions.forEach((h) => {
try {
// purposefully ignore promise
Expand All @@ -181,21 +139,22 @@ export class EthereumBlockProcessor {
log.debug(`Syncing blocks.`)
const lastSynced = await this.getLastSyncedBlockNumber()
const syncStart = Math.max(lastSynced + 1, this.earliestBlock)
const mostRecentBlock = await this.getBlockNumber(provider)
const mostRecentFinalBlock = this.getBlockFinalizedBy(mostRecentBlock)

log.debug(
`Starting sync with block ${syncStart}. Last synced: ${lastSynced}, earliest block: ${this.earliestBlock}.`
`Starting sync with block ${syncStart}. Last synced: ${lastSynced}, earliest block: ${this.earliestBlock}, most recent un-finalized block: ${mostRecentBlock}, most recent finalized block: ${mostRecentFinalBlock}.`
)

const blockNumber = await this.getBlockNumber(provider)

if (blockNumber === syncStart) {
if (mostRecentFinalBlock < syncStart) {
log.debug(`Up to date, not syncing.`)
this.finishSync(blockNumber, blockNumber)
this.finishSync(mostRecentFinalBlock, mostRecentFinalBlock)
return
}

for (let i = syncStart; i <= blockNumber; i++) {
for (let i = syncStart; i <= mostRecentFinalBlock; i++) {
try {
log.debug(`Syncing past blocks: Fetching and disseminating block ${i}`)
await this.fetchAndDisseminateBlock(provider, i)
} catch (e) {
logError(log, `Error fetching and disseminating block. Retrying...`, e)
Expand All @@ -205,7 +164,7 @@ export class EthereumBlockProcessor {
await this.storeLastProcessedBlockNumber(i)
}

this.finishSync(syncStart, blockNumber)
this.finishSync(syncStart, mostRecentFinalBlock)
}

private finishSync(syncStart: number, currentBlock: number): void {
Expand Down Expand Up @@ -234,12 +193,25 @@ export class EthereumBlockProcessor {
* @returns The current block number
*/
private async getBlockNumber(provider: Provider): Promise<number> {
if (this.currentBlockNumber === 0) {
this.currentBlockNumber = await provider.getBlockNumber()
if (this.currentFinalizedBlockNumber === 0) {
this.currentFinalizedBlockNumber = await provider.getBlockNumber()
}

log.debug(`Current block number: ${this.currentBlockNumber}`)
return this.currentBlockNumber
log.debug(`Current block number: ${this.currentFinalizedBlockNumber}`)
return this.currentFinalizedBlockNumber
}

/**
* Gets the block number finalized by the block with the provided number.
*
* @param finalizingBlock The block number that finalizes the returned block number
* @returns The block number finalized by the provided block number.
*/
private getBlockFinalizedBy(finalizingBlock: number): number {
if (this.confirmsUntilFinal <= 1) {
return finalizingBlock
}
return finalizingBlock - (this.confirmsUntilFinal - 1)
}

/**
Expand Down
Loading