Skip to content

Commit

Permalink
feat: implement speedup v3 events (#341)
Browse files Browse the repository at this point in the history
* feat: implement speedup v3 events

* Fix bugs

* Fix import

* Fix

---------

Co-authored-by: amateima <[email protected]>
  • Loading branch information
amateima and amateima authored Feb 19, 2024
1 parent 61db45a commit cb6c3f5
Show file tree
Hide file tree
Showing 13 changed files with 201 additions and 25 deletions.
2 changes: 1 addition & 1 deletion src/modules/configuration/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ export const configValues = () => ({
[ChainIds.sepolia]: [
{
address: "0x622d59F3dbD28fcFE746E0d2f83ebdC286E89A3c",
startBlockNumber: 14819486,
startBlockNumber: 5146129,
abi: JSON.stringify(SpokePoolV3Abi),
acrossVersion: AcrossContractsVersion.V3,
},
Expand Down
13 changes: 11 additions & 2 deletions src/modules/deposit/model/deposit.entity.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,13 @@ export type RequestedSpeedUpDepositTx = {
updatedRecipient?: string;
updatedMessage?: string;
};
export type RequestedSpeedUpDepositTxV3 = {
hash: string;
blockNumber: number;
updatedOutputAmount: string;
updatedRecipient: string;
updatedMessage: string;
};
export type FeeBreakdown = {
// lp fee
lpFeeUsd: string;
Expand Down Expand Up @@ -155,7 +162,9 @@ export class Deposit {
outputTokenId?: number;

@ManyToOne(() => Token)
@JoinColumn([{ name: "tokenId", referencedColumnName: "id", foreignKeyConstraintName: "FK_deposit_outputTokenId" }])
@JoinColumn([
{ name: "outputTokenId", referencedColumnName: "id", foreignKeyConstraintName: "FK_deposit_outputTokenId" },
])
outputToken?: Token;

@Column({ nullable: true })
Expand All @@ -178,7 +187,7 @@ export class Deposit {
fillTxs: (DepositFillTx | DepositFillTx2 | DepositFillTxV3)[];

@Column({ type: "jsonb", default: [] })
speedUps: RequestedSpeedUpDepositTx[];
speedUps: RequestedSpeedUpDepositTx[] | RequestedSpeedUpDepositTxV3[];

@Column({ type: "jsonb", default: {} })
feeBreakdown: FeeBreakdown;
Expand Down
75 changes: 61 additions & 14 deletions src/modules/scraper/adapter/messaging/BlocksEventsConsumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@ import {
FillEventsQueueMessage,
FillEventsQueueMessage2,
FillEventsV3QueueMessage,
FillEventsV3QueueMessage,
ScraperQueue,
SpeedUpEventsQueueMessage,
SpeedUpEventsV3QueueMessage,
} from ".";
import { Deposit } from "../../../deposit/model/deposit.entity";
import { ScraperQueuesService } from "../../service/ScraperQueuesService";
Expand All @@ -30,6 +30,7 @@ import {
RequestedSpeedUpDepositEvent2_5,
FundsDepositedV3Event,
FilledV3RelayEvent,
RequestedSpeedUpV3DepositEvent,
} from "../../../web3/model";
import { AppConfig } from "../../../configuration/configuration.service";
import { splitBlockRanges } from "../../utils";
Expand Down Expand Up @@ -62,16 +63,15 @@ export class BlocksEventsConsumer {
// Split the block range in case multiple SpokePool contracts need to be queried
const blocksToQuery = splitBlockRanges(ascSpokePoolConfigs, from, to);
// Get the events from the SpokePool contracts
const { depositEvents, depositV3Events, fillEvents, fillV3Events, speedUpEvents } = await this.getEvents(
blocksToQuery,
chainId,
);
const { depositEvents, depositV3Events, fillEvents, fillV3Events, speedUpEvents, speedUpV3Events } =
await this.getEvents(blocksToQuery, chainId);
const eventsCount = {
depositEvents: depositEvents.length,
depositV3Events: depositV3Events.length,
fillEvents: fillEvents.length,
fillV3Events: fillV3Events.length,
speedUpEvents: speedUpEvents.length,
speedUpV3Events: speedUpV3Events.length,
};
this.logger.log(`${from}-${to} - chainId ${chainId} - ${JSON.stringify(eventsCount)}`);

Expand All @@ -80,6 +80,7 @@ export class BlocksEventsConsumer {
await this.processFillEvents(chainId, fillEvents);
await this.processFillEvents(chainId, fillV3Events);
await this.processSpeedUpEvents(chainId, speedUpEvents);
await this.processSpeedUpV3Events(chainId, speedUpV3Events);
}

private async getEvents(
Expand All @@ -92,11 +93,13 @@ export class BlocksEventsConsumer {
const fillEvents: Event[] = [];
const fillV3Events: Event[] = [];
const speedUpEvents: Event[] = [];
const speedUpV3Events: Event[] = [];

for (const blocks of blocksToQuery) {
const spokePoolEventQuerier = this.providers.getSpokePoolEventQuerier(chainId, blocks.address);
let depositEventsPromises = [];
let fillEventsPromises = [];
let speedUpEventsPromises = [];

if (blocks.acrossVersion === AcrossContractsVersion.V2) {
depositEventsPromises = [
Expand All @@ -111,6 +114,12 @@ export class BlocksEventsConsumer {
res([]);
}),
];
speedUpEventsPromises = [
spokePoolEventQuerier.getRequestedSpeedUpDepositEvents(blocks.from, blocks.to),
new Promise((res) => {
res([]);
}),
];
} else if (blocks.acrossVersion === AcrossContractsVersion.V2_5) {
depositEventsPromises = [
spokePoolEventQuerier.getFundsDepositEvents(blocks.from, blocks.to),
Expand All @@ -124,6 +133,12 @@ export class BlocksEventsConsumer {
res([]);
}),
];
speedUpEventsPromises = [
spokePoolEventQuerier.getRequestedSpeedUpDepositEvents(blocks.from, blocks.to),
new Promise((res) => {
res([]);
}),
];
} else if (blocks.acrossVersion === AcrossContractsVersion.V3) {
depositEventsPromises = [
new Promise((res) => {
Expand All @@ -135,20 +150,29 @@ export class BlocksEventsConsumer {
spokePoolEventQuerier.getFilledRelayEvents(blocks.from, blocks.to),
spokePoolEventQuerier.getFilledV3RelayEvents(blocks.from, blocks.to),
];
speedUpEventsPromises = [
new Promise((res) => {
res([]);
}),
spokePoolEventQuerier.getRequestedSpeedUpV3DepositEvents(blocks.from, blocks.to),
];
}
const promises = [
...depositEventsPromises,
...fillEventsPromises,
spokePoolEventQuerier.getRequestedSpeedUpDepositEvents(blocks.from, blocks.to),
];
const [depositEventsChunk, depositV3EventsChunk, fillEventsChunk, fillV3EventsChunk, speedUpEventsChunk] =
await Promise.all(promises);
const promises = [...depositEventsPromises, ...fillEventsPromises, ...speedUpEventsPromises];
const [
depositEventsChunk,
depositV3EventsChunk,
fillEventsChunk,
fillV3EventsChunk,
speedUpEventsChunk,
speedUpV3EventsChunk,
] = await Promise.all(promises);

depositEvents.push(...depositEventsChunk);
depositV3Events.push(...depositV3EventsChunk);
fillEvents.push(...fillEventsChunk);
fillV3Events.push(...fillV3EventsChunk);
speedUpEvents.push(...speedUpEventsChunk);
speedUpV3Events.push(...speedUpV3EventsChunk);
}

return {
Expand All @@ -157,6 +181,7 @@ export class BlocksEventsConsumer {
fillEvents,
fillV3Events,
speedUpEvents,
speedUpV3Events,
};
}

Expand Down Expand Up @@ -296,6 +321,28 @@ export class BlocksEventsConsumer {
}
}

private async processSpeedUpV3Events(chainId: number, events: Event[]) {
for (const event of events) {
const { transactionHash, blockNumber, args } = event as RequestedSpeedUpV3DepositEvent;
const message: SpeedUpEventsV3QueueMessage = {
depositSourceChainId: chainId,
depositId: args.depositId,
transactionHash,
blockNumber,
depositor: args.depositor,
depositorSignature: args.depositorSignature,
updatedOutputAmount: args.updatedOutputAmount.toString(),
updatedRecipient: args.updatedRecipient,
updatedMessage: args.updatedMessage,
};

await this.scraperQueuesService.publishMessage<SpeedUpEventsV3QueueMessage>(
ScraperQueue.SpeedUpEventsV3,
message,
);
}
}

private async fromFundsDepositedEventToDeposit(chainId: number, event: Event) {
const typedEvent = event as FundsDepositedEvent2 | FundsDepositedEvent2_5;
const { transactionHash, blockNumber } = typedEvent;
Expand Down Expand Up @@ -366,8 +413,8 @@ export class BlocksEventsConsumer {
// v3 properties
outputAmount: outputAmount.toString(),
outputTokenAddress: outputToken,
fillDeadline: fillDeadline,
exclusivityDeadline: exclusivityDeadline,
fillDeadline: new Date(fillDeadline * 1000),
exclusivityDeadline: new Date(exclusivityDeadline * 1000),
relayer,
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import { ScraperQueuesService } from "../../service/ScraperQueuesService";
import { FeeBreakdownQueueMessage, OpRebateRewardMessage, ScraperQueue } from ".";
import { Deposit, DepositFillTx, DepositFillTx2, DepositFillTxV3 } from "../../../deposit/model/deposit.entity";
import { deriveRelayerFeeComponents, makePctValuesCalculator, toWeiPct } from "../../utils";
import { AcrossContractsVersion } from "src/modules/web3/model/across-version";
import { AcrossContractsVersion } from "../../../web3/model/across-version";

@Processor(ScraperQueue.FeeBreakdown)
export class FeeBreakdownConsumer {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import { Logger } from "@nestjs/common";
import { Job } from "bull";
import { DepositFilledDateQueueMessage, ScraperQueue, SpeedUpEventsQueueMessage } from ".";

Check warning on line 4 in src/modules/scraper/adapter/messaging/SpeedUpEventsConsumer.ts

View workflow job for this annotation

GitHub Actions / lint-build

'DepositFilledDateQueueMessage' is defined but never used
import { InjectRepository } from "@nestjs/typeorm";
import { Deposit } from "../../../deposit/model/deposit.entity";
import { Deposit, RequestedSpeedUpDepositTx } from "../../../deposit/model/deposit.entity";
import { Repository } from "typeorm";
import { ScraperQueuesService } from "../../service/ScraperQueuesService";

Expand Down Expand Up @@ -33,10 +33,6 @@ export class SpeedUpEventsConsumer {
}

await this.processSpeedUpEventQueueMessage(deposit, job.data);

this.scraperQueuesService.publishMessage<DepositFilledDateQueueMessage>(ScraperQueue.DepositFilledDate, {
depositId: deposit.id,
});
}

public async processSpeedUpEventQueueMessage(deposit: Deposit, data: SpeedUpEventsQueueMessage) {
Expand All @@ -46,7 +42,7 @@ export class SpeedUpEventsConsumer {
const sortedSpeedUps = [
...deposit.speedUps,
{ hash: transactionHash, newRelayerFeePct, blockNumber, depositSourceChainId, updatedMessage, updatedRecipient },
].sort((a, b) => b.blockNumber - a.blockNumber);
].sort((a, b) => b.blockNumber - a.blockNumber) as RequestedSpeedUpDepositTx[];

return this.depositRepository.update(
{ id: deposit.id },
Expand Down
76 changes: 76 additions & 0 deletions src/modules/scraper/adapter/messaging/SpeedUpEventsV3Consumer.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
import { OnQueueFailed, Process, Processor } from "@nestjs/bull";
import { Logger } from "@nestjs/common";
import { Job } from "bull";
import { DepositFilledDateQueueMessage, SpeedUpEventsV3QueueMessage, ScraperQueue } from ".";

Check warning on line 4 in src/modules/scraper/adapter/messaging/SpeedUpEventsV3Consumer.ts

View workflow job for this annotation

GitHub Actions / lint-build

'DepositFilledDateQueueMessage' is defined but never used
import { InjectRepository } from "@nestjs/typeorm";
import { Deposit, RequestedSpeedUpDepositTxV3 } from "../../../deposit/model/deposit.entity";
import { Repository } from "typeorm";
import { ScraperQueuesService } from "../../service/ScraperQueuesService";
import BigNumber from "bignumber.js";

@Processor(ScraperQueue.SpeedUpEventsV3)
export class SpeedUpEventsV3Consumer {
private logger = new Logger(SpeedUpEventsV3Consumer.name);

constructor(
@InjectRepository(Deposit) private depositRepository: Repository<Deposit>,
private scraperQueuesService: ScraperQueuesService,
) {}

@Process()
private async process(job: Job<SpeedUpEventsV3QueueMessage>) {
const { depositId, depositSourceChainId } = job.data;
const deposit = await this.depositRepository.findOne({ where: { sourceChainId: depositSourceChainId, depositId } });

if (!deposit) throw new Error("Deposit not found");
if (this.isSpeedUpAlreadyProcessed(deposit, job.data)) return;

await this.processSpeedUpEventQueueMessage(deposit, job.data);
}

public async processSpeedUpEventQueueMessage(deposit: Deposit, data: SpeedUpEventsV3QueueMessage) {
const {
transactionHash,
blockNumber,
depositSourceChainId,
updatedOutputAmount,
updatedMessage,
updatedRecipient,
} = data;

const sortedSpeedUps = [
...deposit.speedUps,
{
hash: transactionHash,
updatedOutputAmount,
blockNumber,
depositSourceChainId,
updatedMessage,
updatedRecipient,
},
].sort((a, b) => b.blockNumber - a.blockNumber) as RequestedSpeedUpDepositTxV3[];
const wei = new BigNumber(10).pow(18);
const feePct = new BigNumber(updatedOutputAmount).multipliedBy(wei).div(deposit.amount);
return this.depositRepository.update(
{ id: deposit.id },
{
speedUps: sortedSpeedUps,
message: sortedSpeedUps[0].updatedMessage,
recipientAddr: sortedSpeedUps[0].updatedRecipient,
outputAmount: sortedSpeedUps[0].updatedOutputAmount,
depositRelayerFeePct: feePct.toFixed(0),
},
);
}

public isSpeedUpAlreadyProcessed(deposit: Deposit, speedUp: SpeedUpEventsV3QueueMessage) {
const { transactionHash } = speedUp;
const speedUpTxIndex = deposit.speedUps.findIndex((speedUpTx) => speedUpTx.hash === transactionHash);
return speedUpTxIndex !== -1;
}

@OnQueueFailed()
private onQueueFailed(job: Job, error: Error) {
this.logger.error(`${ScraperQueue.SpeedUpEvents} ${JSON.stringify(job.data)} failed: ${error}`);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ export class TokenDetailsConsumer {
const inputToken = await this.ethProvidersService.getCachedToken(sourceChainId, tokenAddr);
let outputToken: Token | undefined = undefined;

if (outputTokenAddress === "0x0000000000000000000000000000000000000000") return;
if (outputTokenAddress) {
outputToken = await this.ethProvidersService.getCachedToken(destinationChainId, outputTokenAddress);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ export class TrackFillEventConsumer {
throw new Error("Can not track fill event without token, price or deposit date");
}

const fillTx = deposit.fillTxs.find((tx) => tx.hash === fillTxHash);
const fillTx = deposit.fillTxs.find((tx) => tx.hash === fillTxHash) as DepositFillTx | DepositFillTx2;

if (!fillTx) {
throw new Error("Fill tx does not exist on deposit");
Expand All @@ -53,6 +53,8 @@ export class TrackFillEventConsumer {
if (!fillTx.date) {
throw new Error("Fill tx does not have a date");
}
// Temporary disabled for v3 fills
if ((fillTx as any).updatedOutputAmount) return;

const destinationChainInfo = chainIdToInfo[deposit.destinationChainId] || {
name: "unknown",
Expand Down
13 changes: 13 additions & 0 deletions src/modules/scraper/adapter/messaging/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ export enum ScraperQueue {
FillEvents2 = "FillEvents2",
FillEventsV3 = "FillEventsV3",
SpeedUpEvents = "SpeedUpEvents",
SpeedUpEventsV3 = "SpeedUpEventsV3",
BlockNumber = "BlockNumber",
TokenDetails = "TokenDetails",
DepositReferral = "DepositReferral",
Expand Down Expand Up @@ -76,6 +77,18 @@ export type SpeedUpEventsQueueMessage = {
updatedMessage?: string;
};

export type SpeedUpEventsV3QueueMessage = {
depositSourceChainId: number;
depositId: number;
transactionHash: string;
blockNumber: number;
depositor: string;
depositorSignature: string;
updatedOutputAmount: string;
updatedRecipient: string;
updatedMessage: string;
};

export type BlockNumberQueueMessage = {
depositId: number;
};
Expand Down
Loading

0 comments on commit cb6c3f5

Please sign in to comment.