Skip to content

Commit

Permalink
Merge pull request #68 from defi-wonderland/develop
Browse files Browse the repository at this point in the history
chore: v1.1.0
  • Loading branch information
0xGorilla authored Mar 28, 2023
2 parents 09f2ba8 + 7fcb464 commit 1eca068
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 58 deletions.
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@keep3r-network/keeper-scripting-utils",
"version": "1.0.0",
"version": "1.1.0",
"description": "A library containing helper functions that facilitate scripting for keepers of the Keep3r Network",
"keywords": [
"ethereum",
Expand Down
99 changes: 42 additions & 57 deletions src/subscriptions/blocks.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
import { UnsubscribeFunction } from '../types/Blocks';
import { Block } from '@ethersproject/abstract-provider';
import chalk from 'chalk';
import { providers } from 'ethers';
import { fromEvent, mergeMap, Observable, Subject, Subscription } from 'rxjs';

type CallbackFunction = (block: Block) => Promise<void>;

Expand All @@ -11,15 +9,6 @@ type CallbackFunction = (block: Block) => Promise<void>;
*
*/
export class BlockListener {
// Amount of live subscriptions to block$ observable.
private count = 0;

// Observable in charge of emitting and providing new Blocks.
private block$ = new Subject<Block>();

// Array of generated internal subscriptions. Used mainly to be able to unsubscribe from them when needed.
private getBlockSubscription: Subscription | undefined;

/**
* @param provider - JsonRpc provider that has the methods needed to fetch and listen for new blocks.
*/
Expand All @@ -36,57 +25,53 @@ export class BlockListener {
* - One that hooks to the 'block' event of the provider that returns just the number of the new block, and then use
* provider.getBlock(blockNumber) method to fetch all the data of that block and push it to block$ observable.
*
* @returns A callback that will be called in every new block
* @param cb Callback function which will receive each new block
* @param intervalDelay Get next block after X seconds of sleep. This number must be bigger than the time between 2 blocks.
* @param blockDelay After getting a new block, wait X seconds before calling the callback function.
* This is useful for descentralised node infrastructures which may need some time to sync, as Ankr.
*/
stream(cb: CallbackFunction): UnsubscribeFunction {
// initialize block subscription if necessary, and increase subscribers count
if (this.count++ === 0) {
this.initBlockSubscription();
}

// log subscribers count
this.logSubscribersCount();

// create a new block subscription that will call the callback for every new block
const observable = this.block$.subscribe((block) => cb(block));

// return an unsubscription function
return () => {
observable.unsubscribe();

this.count--;
this.logSubscribersCount();

// uninitialize state if there are no current subscribers
if (this.count === 0) {
console.info(chalk.redBright('\n------ STOP BLOCK LISTENING -----'));

this.getBlockSubscription?.unsubscribe();
this.provider.removeAllListeners('block');
}
stream(cb: CallbackFunction, intervalDelay = 0, blockDelay = 0): void {
const start = async () => {
// save latest block number, in order to avoid old block dumps
let latestBlockNumber = await this.provider.getBlockNumber();

console.info(chalk.redBright(`\nWaiting for next block, latest block: ${latestBlockNumber}`));

// listen for next block
this.provider.on('block', async (blockNumber) => {
// avoid having old dump of blocks
if (blockNumber <= latestBlockNumber) return;
latestBlockNumber = blockNumber;

if (intervalDelay > 0) {
// stop listening to new blocks
this.stop();
}

// delay the block arrival a bit, for ankr to have time to sync
setTimeout(async () => {
// double check that the block to process is actually the latest
if (blockNumber < latestBlockNumber) return;

console.info(`${chalk.bgGray('block arrived:', blockNumber)}\n`);
// get block data
const block = await this.provider.getBlock(blockNumber);
// call the given callback with the block data
await cb(block);
}, blockDelay);
});
};
}

private initBlockSubscription(): void {
// push latest block to the subject asap
this.provider.getBlock('latest').then((block) => {
console.info(`${chalk.bgGray('\nblock arrived:', block.number)}\n`);
this.block$.next(block);
});

// listen to new blocks from the provider
console.info(chalk.redBright('\n------ START BLOCK LISTENING -----'));
const onBlockNumber$ = fromEvent(this.provider, 'block') as Observable<number>;
const onBlock$ = onBlockNumber$.pipe(mergeMap((blockNumber) => this.provider.getBlock(blockNumber)));
// get next block immediately
start();

// push them to the subject as they arrive
this.getBlockSubscription = onBlock$.subscribe((block) => {
console.info(`${chalk.bgGray('\nblock arrived:', block.number)}\n`);
this.block$.next(block);
});
if (intervalDelay > 0) {
// get next block every {intervalDelay} of sleep
setInterval(start, intervalDelay);
}
}

private logSubscribersCount(): void {
console.debug('\nOpen BlockListener subscriptions count:', chalk.redBright(this.count));
stop(): void {
this.provider.removeAllListeners('block');
}
}

0 comments on commit 1eca068

Please sign in to comment.