diff --git a/docker-compose.example.yml b/docker-compose.example.yml index ca6d7ca8559e0..39978b8f2415a 100644 --- a/docker-compose.example.yml +++ b/docker-compose.example.yml @@ -32,7 +32,7 @@ services: - POSTGRES_CONNECTION_POOL_SIZE # The connection pool size for postgres (defaults to 20) - POSTGRES_USE_SSL # Set to anything to indicate that SSL should be used in the connection # L1 Node - - L1_NODE_INFURA_NETWORK=rinkeby # The Infura network for the connection to the L1 node + - L1_NODE_INFURA_NETWORK=goerli # The Infura network for the connection to the L1 node - L1_NODE_INFURA_PROJECT_ID=91981cfffb524ceca0c2e5b18905f9f5 # The Infura project ID for the connection to the L1 node - L1_NODE_WEB3_URL # The URL of the L1 node - FINALITY_DELAY_IN_BLOCKS=1 # The number of block confirmations required to consider a transaction final on L1 diff --git a/packages/core-db/src/app/ethereum/ethereum-block-processor.ts b/packages/core-db/src/app/ethereum/ethereum-block-processor.ts index 2232c98046b6b..86d81774b16af 100644 --- a/packages/core-db/src/app/ethereum/ethereum-block-processor.ts +++ b/packages/core-db/src/app/ethereum/ethereum-block-processor.ts @@ -16,7 +16,7 @@ const blockKey: Buffer = Buffer.from('latestBlock') */ export class EthereumBlockProcessor { private readonly subscriptions: Set> - private currentBlockNumber: number + private currentFinalizedBlockNumber: number private syncInProgress: boolean private syncCompleted: boolean @@ -27,10 +27,13 @@ export class EthereumBlockProcessor { private readonly confirmsUntilFinal: number = 1 ) { this.subscriptions = new Set>() - this.currentBlockNumber = 0 + this.currentFinalizedBlockNumber = 0 this.syncInProgress = false this.syncCompleted = false + if (earliestBlock < 0) { + throw Error('Earliest block must be >= 0') + } } /** @@ -51,25 +54,33 @@ export class EthereumBlockProcessor { provider.on('block', async (blockNumber) => { try { - if (blockNumber < this.earliestBlock) { + const finalizedBlockNumber = this.getBlockFinalizedBy(blockNumber) + + if (finalizedBlockNumber < this.earliestBlock) { log.debug( - `Received block [${blockNumber}] which is before earliest block [${this.earliestBlock}]. Ignoring...` + `Received block [${blockNumber}] which finalizes a block ${finalizedBlockNumber}, before earliest block [${this.earliestBlock}]. Ignoring...` ) return } - log.debug(`Block [${blockNumber}] was mined!`) + log.debug( + `Block [${blockNumber}] was mined! Finalizing block ${finalizedBlockNumber}` + ) - await this.fetchAndDisseminateBlock(provider, blockNumber) - this.currentBlockNumber = blockNumber + await this.fetchAndDisseminateBlock(provider, finalizedBlockNumber) + this.currentFinalizedBlockNumber = finalizedBlockNumber if (!syncPastBlocks || this.syncCompleted) { - await this.storeLastProcessedBlockNumber(this.currentBlockNumber) + await this.storeLastProcessedBlockNumber( + this.currentFinalizedBlockNumber + ) } } catch (e) { logError( log, - `Error thrown processing block ${blockNumber}. Exiting since throwing will not be caught.`, + `Error thrown processing block ${blockNumber}, finalizing block ${this.getBlockFinalizedBy( + blockNumber + )}. Exiting since throwing will not be caught.`, e ) process.exit(1) @@ -100,62 +111,9 @@ export class EthereumBlockProcessor { blockNumber: number ): Promise { log.debug(`Fetching block [${blockNumber}].`) - let block: Block = await provider.getBlock(blockNumber, true) + const block: Block = await provider.getBlock(blockNumber, true) log.debug(`Received block: ${block.number}.`) - if ( - this.confirmsUntilFinal > 1 && - !!block.transactions && - !!block.transactions.length - ) { - log.debug( - `Waiting for ${this.confirmsUntilFinal} confirms before disseminating block ${blockNumber}` - ) - try { - let refetchedHash: string - if (block.transactions.length > 0) { - const receipt: TransactionReceipt = await provider.waitForTransaction( - (block.transactions[0] as any).hash, - this.confirmsUntilFinal - ) - refetchedHash = receipt.blockHash - } else { - while ( - (await provider.getBlockNumber()) < - blockNumber + this.confirmsUntilFinal - ) { - log.debug( - `Waiting for empty block ${blockNumber} to be final. Sleeping...` - ) - await sleep(15_000) - } - const refetched = await provider.getBlock(blockNumber) - refetchedHash = refetched.hash - } - - if (refetchedHash !== block.hash) { - log.info( - `Re-org processing block number ${blockNumber}. Re-fetching block.` - ) - return this.fetchAndDisseminateBlock(provider, blockNumber) - } - } catch (e) { - logError( - log, - `Error waiting for ${this.confirmsUntilFinal} confirms on block ${blockNumber}`, - e - ) - // Cannot silently fail here because syncing will move on as if this block was processed. - throw e - } - - log.debug( - `Received ${this.confirmsUntilFinal} confirms for block ${blockNumber}. Refetching block` - ) - - block = await provider.getBlock(blockNumber, true) - } - this.subscriptions.forEach((h) => { try { // purposefully ignore promise @@ -181,21 +139,22 @@ export class EthereumBlockProcessor { log.debug(`Syncing blocks.`) const lastSynced = await this.getLastSyncedBlockNumber() const syncStart = Math.max(lastSynced + 1, this.earliestBlock) + const mostRecentBlock = await this.getBlockNumber(provider) + const mostRecentFinalBlock = this.getBlockFinalizedBy(mostRecentBlock) log.debug( - `Starting sync with block ${syncStart}. Last synced: ${lastSynced}, earliest block: ${this.earliestBlock}.` + `Starting sync with block ${syncStart}. Last synced: ${lastSynced}, earliest block: ${this.earliestBlock}, most recent un-finalized block: ${mostRecentBlock}, most recent finalized block: ${mostRecentFinalBlock}.` ) - const blockNumber = await this.getBlockNumber(provider) - - if (blockNumber === syncStart) { + if (mostRecentFinalBlock < syncStart) { log.debug(`Up to date, not syncing.`) - this.finishSync(blockNumber, blockNumber) + this.finishSync(mostRecentFinalBlock, mostRecentFinalBlock) return } - for (let i = syncStart; i <= blockNumber; i++) { + for (let i = syncStart; i <= mostRecentFinalBlock; i++) { try { + log.debug(`Syncing past blocks: Fetching and disseminating block ${i}`) await this.fetchAndDisseminateBlock(provider, i) } catch (e) { logError(log, `Error fetching and disseminating block. Retrying...`, e) @@ -205,7 +164,7 @@ export class EthereumBlockProcessor { await this.storeLastProcessedBlockNumber(i) } - this.finishSync(syncStart, blockNumber) + this.finishSync(syncStart, mostRecentFinalBlock) } private finishSync(syncStart: number, currentBlock: number): void { @@ -234,12 +193,25 @@ export class EthereumBlockProcessor { * @returns The current block number */ private async getBlockNumber(provider: Provider): Promise { - if (this.currentBlockNumber === 0) { - this.currentBlockNumber = await provider.getBlockNumber() + if (this.currentFinalizedBlockNumber === 0) { + this.currentFinalizedBlockNumber = await provider.getBlockNumber() } - log.debug(`Current block number: ${this.currentBlockNumber}`) - return this.currentBlockNumber + log.debug(`Current block number: ${this.currentFinalizedBlockNumber}`) + return this.currentFinalizedBlockNumber + } + + /** + * Gets the block number finalized by the block with the provided number. + * + * @param finalizingBlock The block number that finalizes the returned block number + * @returns The block number finalized by the provided block number. + */ + private getBlockFinalizedBy(finalizingBlock: number): number { + if (this.confirmsUntilFinal <= 1) { + return finalizingBlock + } + return finalizingBlock - (this.confirmsUntilFinal - 1) } /** diff --git a/packages/core-db/test/app/ethereum/ethereum-block-processor.spec.ts b/packages/core-db/test/app/ethereum/ethereum-block-processor.spec.ts index 8c624e2f3a274..b413ca58b6d82 100644 --- a/packages/core-db/test/app/ethereum/ethereum-block-processor.spec.ts +++ b/packages/core-db/test/app/ethereum/ethereum-block-processor.spec.ts @@ -1,7 +1,7 @@ import '../../setup' /* External Imports */ -import { getLogger, sleep } from '@eth-optimism/core-utils' +import { getLogger } from '@eth-optimism/core-utils' import { Block } from 'ethers/providers' import { createMockProvider, getWallets } from 'ethereum-waffle' @@ -27,100 +27,280 @@ describe('Block Subscription', () => { let blockListener: TestListener let tokenContract - beforeEach(async () => { - provider = createMockProvider() - wallets = getWallets(provider) - ownerWallet = wallets[0] - recipientWallet = wallets[1] + describe('Instant finalization', () => { + beforeEach(async () => { + provider = createMockProvider() + wallets = getWallets(provider) + ownerWallet = wallets[0] + recipientWallet = wallets[1] - log.debug(`Connection info: ${JSON.stringify(provider.connection)}`) + log.debug(`Connection info: ${JSON.stringify(provider.connection)}`) - tokenContract = await deployTokenContract(ownerWallet, initialSupply) + tokenContract = await deployTokenContract(ownerWallet, initialSupply) - db = newInMemoryDB() - blockProcessor = new EthereumBlockProcessor(db) - blockListener = new TestListener() + db = newInMemoryDB() + blockProcessor = new EthereumBlockProcessor(db) + blockListener = new TestListener() + }) + + it('processes new blocks', async () => { + await blockProcessor.subscribe(provider, blockListener, false) + + await tokenContract.transfer( + ownerWallet.address, + recipientWallet.address, + sendAmount + ) + + const blocks: Block[] = await blockListener.waitForReceive(1) + + blocks.length.should.equal(1) + blocks[0].transactions.length.should.equal(1) + }).timeout(timeout) + + it('processes old blocks', async () => { + await blockProcessor.subscribe(provider, blockListener) + + const blocks: Block[] = await blockListener.waitForSyncToComplete() + + blocks + .map((x) => x.number) + .sort() + .should.deep.equal([0, 1], `Incorrect blocks received!`) + }).timeout(timeout) + + it('honors earliest block', async () => { + blockProcessor = new EthereumBlockProcessor(db, 1) + await blockProcessor.subscribe(provider, blockListener) + + const blocks: Block[] = await blockListener.waitForSyncToComplete() + + blocks.length.should.equal(1, 'There should only be one block synced!') + blocks[0].number.should.equal(1, 'Block 1 should have been finalized!') + blocks[0].transactions.length.should.equal( + 1, + 'There should be 1 transactions in block 1' + ) + const deployToAddressEmpty = !(blocks[0].transactions[0] as any).to + deployToAddressEmpty.should.equal( + true, + 'The "to" address for the deploy tx should be null' + ) + }).timeout(timeout) + + it('processes blocks starting at 1', async () => { + blockProcessor = new EthereumBlockProcessor(db, 1) + await blockProcessor.subscribe(provider, blockListener) + + let blocks: Block[] = await blockListener.waitForSyncToComplete() + blocks.length.should.equal(1, 'Block 1 should have arrived') + + await tokenContract.transfer( + ownerWallet.address, + recipientWallet.address, + sendAmount * 2 + ) + + blocks = await blockListener.waitForReceive(2) + blocks.length.should.equal(2, `Incorrect number of blocks received!`) + + blocks + .map((x) => x.number) + .sort() + .should.deep.equal([1, 2], `Incorrect blocks received!`) + blocks + .filter((x) => x.number === 2)[0] + .transactions.length.should.equal(1, `Tx Length incorrect!`) + }).timeout(timeout) + + it('processes old and new blocks', async () => { + await blockProcessor.subscribe(provider, blockListener) + + await tokenContract.transfer( + ownerWallet.address, + recipientWallet.address, + sendAmount * 2 + ) + + const blocks: Block[] = await blockListener.waitForReceive(3) + + blocks.length.should.equal(3, `Incorrect number of blocks received!`) + + blocks + .map((x) => x.number) + .sort() + .should.deep.equal([0, 1, 2], `Incorrect blocks received!`) + blocks + .filter((x) => x.number === 2)[0] + .transactions.length.should.equal(1, `Tx Length incorrect!`) + }).timeout(timeout) }) - it('processes new blocks', async () => { - await blockProcessor.subscribe(provider, blockListener, false) + describe('Delayed finalization', () => { + const confirmsUntilFinal = 2 + beforeEach(async () => { + provider = createMockProvider() + wallets = getWallets(provider) + ownerWallet = wallets[0] + recipientWallet = wallets[1] + + log.debug(`Connection info: ${JSON.stringify(provider.connection)}`) + + db = newInMemoryDB() + blockProcessor = new EthereumBlockProcessor(db, 0, confirmsUntilFinal) + blockListener = new TestListener() + }) + + it('does not process un-finalized block', async () => { + await blockProcessor.subscribe(provider, blockListener, false) + + const blocks: Block[] = await blockListener.waitForReceive(1, 5_000) + + blocks.length.should.equal(0) + }).timeout(timeout) + + it('finalizes blocks after enough confirms', async () => { + await blockProcessor.subscribe(provider, blockListener, false) + + tokenContract = await deployTokenContract(ownerWallet, initialSupply) - await tokenContract.transfer( - ownerWallet.address, - recipientWallet.address, - sendAmount - ) + const blocks: Block[] = await blockListener.waitForReceive(1, 5_000) - const blocks: Block[] = await blockListener.waitForReceive(1) + blocks.length.should.equal(1, 'Should have received 1 finalized block!') + blocks[0].number.should.equal(0, 'Block 0 should have been finalized!') + blocks[0].transactions.length.should.equal( + 0, + 'There should be 0 transactions in block 0' + ) + }).timeout(timeout) - blocks.length.should.equal(1) - blocks[0].transactions.length.should.equal(1) - }).timeout(timeout) + it('finalizes multiple blocks after enough confirms', async () => { + await blockProcessor.subscribe(provider, blockListener, false) - it('processes old blocks', async () => { - await blockProcessor.subscribe(provider, blockListener) + tokenContract = await deployTokenContract(ownerWallet, initialSupply) + await tokenContract.transfer( + ownerWallet.address, + recipientWallet.address, + sendAmount * 2 + ) - const blocks: Block[] = await blockListener.waitForSyncToComplete() + const blocks: Block[] = await blockListener.waitForReceive(2, 5_000) - blocks - .map((x) => x.number) - .sort() - .should.deep.equal([0, 1], `Incorrect blocks received!`) - }).timeout(timeout) + blocks.length.should.equal(2, 'Should have received 2 finalized block!') + blocks[0].number.should.equal(0, 'Block 0 should have been finalized!') + blocks[0].transactions.length.should.equal( + 0, + 'There should be 0 transactions in block 0' + ) - it('honors earliest block', async () => { - blockProcessor = new EthereumBlockProcessor(db, 1) - await blockProcessor.subscribe(provider, blockListener) + blocks[1].number.should.equal(1, 'Block 1 should have been finalized!') + blocks[1].transactions.length.should.equal( + 1, + 'There should be 1 transactions in block 1' + ) + const deployToAddressEmpty = !(blocks[1].transactions[0] as any).to + deployToAddressEmpty.should.equal( + true, + 'The "to" address for the deploy tx should be null' + ) + }).timeout(timeout) - const blocks: Block[] = await blockListener.waitForSyncToComplete() + describe('Syncing past blocks', () => { + it('does not finalize past blocks if not final', async () => { + await blockProcessor.subscribe(provider, blockListener, true) - blocks.length.should.equal(0) - }).timeout(timeout) + const blocks: Block[] = await blockListener.waitForReceive(1, 5_000) - it('processes blocks starting at 1', async () => { - blockProcessor = new EthereumBlockProcessor(db, 1) - await blockProcessor.subscribe(provider, blockListener) + blocks.length.should.equal(0, 'Should not have finalized a block!') + }).timeout(timeout) - let blocks: Block[] = await blockListener.waitForSyncToComplete() - blocks.length.should.equal(0) + it('finalizes past block', async () => { + tokenContract = await deployTokenContract(ownerWallet, initialSupply) - await tokenContract.transfer( - ownerWallet.address, - recipientWallet.address, - sendAmount * 2 - ) + await blockProcessor.subscribe(provider, blockListener, true) - blocks = await blockListener.waitForReceive(2) - blocks.length.should.equal(2, `Incorrect number of blocks received!`) + const blocks: Block[] = await blockListener.waitForReceive(1, 5_000) - blocks - .map((x) => x.number) - .sort() - .should.deep.equal([1, 2], `Incorrect blocks received!`) - blocks - .filter((x) => x.number === 2)[0] - .transactions.length.should.equal(1, `Tx Length incorrect!`) - }).timeout(timeout) + blocks.length.should.equal(1, 'Should have received 1 finalized block!') + blocks[0].number.should.equal(0, 'Block 0 should have been finalized!') + blocks[0].transactions.length.should.equal( + 0, + 'There should be 0 transactions in block 0' + ) + }).timeout(timeout) - it('processes old and new blocks', async () => { - await blockProcessor.subscribe(provider, blockListener) + it('finalizes past and future blocks', async () => { + tokenContract = await deployTokenContract(ownerWallet, initialSupply) - await tokenContract.transfer( - ownerWallet.address, - recipientWallet.address, - sendAmount * 2 - ) + await blockProcessor.subscribe(provider, blockListener, true) + await tokenContract.transfer( + ownerWallet.address, + recipientWallet.address, + sendAmount * 2 + ) - const blocks: Block[] = await blockListener.waitForReceive(3) + const blocks: Block[] = await blockListener.waitForReceive(2, 5_000) - blocks.length.should.equal(3, `Incorrect number of blocks received!`) + blocks.length.should.equal(2, 'Should have received 2 finalized block!') + blocks[0].number.should.equal(0, 'Block 0 should have been finalized!') + blocks[0].transactions.length.should.equal( + 0, + 'There should be 0 transactions in block 0' + ) - blocks - .map((x) => x.number) - .sort() - .should.deep.equal([0, 1, 2], `Incorrect blocks received!`) - blocks - .filter((x) => x.number === 2)[0] - .transactions.length.should.equal(1, `Tx Length incorrect!`) - }).timeout(timeout) + blocks[1].number.should.equal(1, 'Block 1 should have been finalized!') + blocks[1].transactions.length.should.equal( + 1, + 'There should be 1 transactions in block 1' + ) + const deployToAddressEmpty = !(blocks[1].transactions[0] as any).to + deployToAddressEmpty.should.equal( + true, + 'The "to" address for the deploy tx should be null' + ) + }).timeout(timeout) + + describe('Future earliest block', () => { + it('does not finalize blocks before the earliest block', async () => { + tokenContract = await deployTokenContract(ownerWallet, initialSupply) + + blockProcessor = new EthereumBlockProcessor(db, 1, confirmsUntilFinal) + await blockProcessor.subscribe(provider, blockListener) + + const blocks: Block[] = await blockListener.waitForSyncToComplete() + blocks.length.should.equal(0) + }).timeout(timeout) + + it('finalizes blocks after the earliest block', async () => { + tokenContract = await deployTokenContract(ownerWallet, initialSupply) + + await tokenContract.transfer( + ownerWallet.address, + recipientWallet.address, + sendAmount * 2 + ) + + blockProcessor = new EthereumBlockProcessor(db, 1, confirmsUntilFinal) + await blockProcessor.subscribe(provider, blockListener) + + const blocks: Block[] = await blockListener.waitForSyncToComplete() + blocks.length.should.equal(1, `Incorrect number of blocks received!`) + + blocks[0].number.should.equal( + 1, + 'Block 1 should have been finalized!' + ) + blocks[0].transactions.length.should.equal( + 1, + 'There should be 1 transactions in block 1' + ) + const deployToAddressEmpty = !(blocks[0].transactions[0] as any).to + deployToAddressEmpty.should.equal( + true, + 'The "to" address for the deploy tx should be null' + ) + }) + }) + }) + }) }) diff --git a/packages/core-db/test/app/ethereum/utils.ts b/packages/core-db/test/app/ethereum/utils.ts index f3dfbf2a690b7..aa73b4682a467 100644 --- a/packages/core-db/test/app/ethereum/utils.ts +++ b/packages/core-db/test/app/ethereum/utils.ts @@ -36,12 +36,12 @@ export class TestListener implements EthereumListener { } public async waitForReceive( - num: number = 1, + numberToReceive: number = 1, timeoutMillis: number = -1 ): Promise { const startTime = new Date().getTime() while ( - this.received.length < num && + this.received.length < numberToReceive && (timeoutMillis < 0 || new Date().getTime() - startTime < timeoutMillis) ) { await sleep(this.sleepMillis)