Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ describe('L2BlockStream', () => {

await blockStream.work();
expect(blockSource.getPublishedBlocks).toHaveBeenCalledTimes(5);
expect(handler.callCount).toEqual(5);
expect(handler.events).toEqual([
{ type: 'blocks-added', blocks: times(10, i => makeBlock(i + 1)) },
{ type: 'blocks-added', blocks: times(10, i => makeBlock(i + 11)) },
Expand All @@ -99,6 +100,19 @@ describe('L2BlockStream', () => {
expect(handler.events).toEqual([{ type: 'blocks-added', blocks: times(10, i => makeBlock(i + 1)) }]);
});

it('halts on handler error and retries', async () => {
setRemoteTips(45);

handler.throwing = true;
await blockStream.work();
expect(handler.callCount).toEqual(1);

handler.throwing = false;
await blockStream.work();
expect(handler.callCount).toEqual(6);
expect(handler.events).toHaveLength(5);
});

it('handles a reorg and requests blocks from new tip', async () => {
setRemoteTips(45);
localData.latest.number = 40;
Expand Down Expand Up @@ -133,8 +147,14 @@ describe('L2BlockStream', () => {

class TestL2BlockStreamEventHandler implements L2BlockStreamEventHandler {
public readonly events: L2BlockStreamEvent[] = [];
public throwing: boolean = false;
public callCount: number = 0;

handleBlockStreamEvent(event: L2BlockStreamEvent): Promise<void> {
this.callCount++;
if (this.throwing) {
throw new Error('Handler error');
}
this.events.push(event);
return Promise.resolve();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,8 +93,6 @@ export class L2BlockStream {
}

// Update the proven and finalized tips.
// TODO(palla/reorg): Should we emit this before passing the new blocks? This would allow world-state to skip
// building the data structures for the pending chain if it's unneeded.
if (localTips.proven !== undefined && sourceTips.proven.number !== localTips.proven.number) {
await this.emitEvent({ type: 'chain-proven', blockNumber: sourceTips.proven.number });
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,12 @@ describe('ServerWorldStateSynchronizer', () => {
it('throws if you try to immediate sync when not running', async () => {
await expect(server.syncImmediate(3)).rejects.toThrow(/is not running/i);
});

it('throws if handling blocks fails', async () => {
void server.start();
merkleTreeDb.handleL2BlockAndMessages.mockRejectedValue(new Error('Test error'));
await expect(pushBlocks(1, 5)).rejects.toThrow(/Test error/i);
});
});

class TestWorldStateSynchronizer extends ServerWorldStateSynchronizer {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -207,23 +207,19 @@ export class ServerWorldStateSynchronizer

/** Handles an event emitted by the block stream. */
public async handleBlockStreamEvent(event: L2BlockStreamEvent): Promise<void> {
try {
switch (event.type) {
case 'blocks-added':
await this.handleL2Blocks(event.blocks.map(b => b.block));
break;
case 'chain-pruned':
await this.handleChainPruned(event.blockNumber);
break;
case 'chain-proven':
await this.handleChainProven(event.blockNumber);
break;
case 'chain-finalized':
await this.handleChainFinalized(event.blockNumber);
break;
}
} catch (err) {
this.log.error('Error processing block stream', err);
switch (event.type) {
case 'blocks-added':
await this.handleL2Blocks(event.blocks.map(b => b.block));
break;
case 'chain-pruned':
await this.handleChainPruned(event.blockNumber);
break;
case 'chain-proven':
await this.handleChainProven(event.blockNumber);
break;
case 'chain-finalized':
await this.handleChainFinalized(event.blockNumber);
break;
}
}

Expand Down