Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: implement speedup v3 events #341

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/modules/configuration/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ export const configValues = () => ({
[ChainIds.sepolia]: [
{
address: "0x622d59F3dbD28fcFE746E0d2f83ebdC286E89A3c",
startBlockNumber: 14819486,
startBlockNumber: 5146129,
abi: JSON.stringify(SpokePoolV3Abi),
acrossVersion: AcrossContractsVersion.V3,
},
Expand Down
13 changes: 11 additions & 2 deletions src/modules/deposit/model/deposit.entity.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,13 @@ export type RequestedSpeedUpDepositTx = {
updatedRecipient?: string;
updatedMessage?: string;
};
export type RequestedSpeedUpDepositTxV3 = {
hash: string;
blockNumber: number;
updatedOutputAmount: string;
updatedRecipient: string;
updatedMessage: string;
};
export type FeeBreakdown = {
// lp fee
lpFeeUsd: string;
Expand Down Expand Up @@ -155,7 +162,9 @@ export class Deposit {
outputTokenId?: number;

@ManyToOne(() => Token)
@JoinColumn([{ name: "tokenId", referencedColumnName: "id", foreignKeyConstraintName: "FK_deposit_outputTokenId" }])
@JoinColumn([
{ name: "outputTokenId", referencedColumnName: "id", foreignKeyConstraintName: "FK_deposit_outputTokenId" },
])
outputToken?: Token;

@Column({ nullable: true })
Expand All @@ -178,7 +187,7 @@ export class Deposit {
fillTxs: (DepositFillTx | DepositFillTx2 | DepositFillTxV3)[];

@Column({ type: "jsonb", default: [] })
speedUps: RequestedSpeedUpDepositTx[];
speedUps: RequestedSpeedUpDepositTx[] | RequestedSpeedUpDepositTxV3[];

@Column({ type: "jsonb", default: {} })
feeBreakdown: FeeBreakdown;
Expand Down
75 changes: 61 additions & 14 deletions src/modules/scraper/adapter/messaging/BlocksEventsConsumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@
FillEventsQueueMessage,
FillEventsQueueMessage2,
FillEventsV3QueueMessage,
FillEventsV3QueueMessage,
ScraperQueue,
SpeedUpEventsQueueMessage,
SpeedUpEventsV3QueueMessage,
} from ".";
import { Deposit } from "../../../deposit/model/deposit.entity";
import { ScraperQueuesService } from "../../service/ScraperQueuesService";
Expand All @@ -30,6 +30,7 @@
RequestedSpeedUpDepositEvent2_5,
FundsDepositedV3Event,
FilledV3RelayEvent,
RequestedSpeedUpV3DepositEvent,
} from "../../../web3/model";
import { AppConfig } from "../../../configuration/configuration.service";
import { splitBlockRanges } from "../../utils";
Expand Down Expand Up @@ -62,16 +63,15 @@
// Split the block range in case multiple SpokePool contracts need to be queried
const blocksToQuery = splitBlockRanges(ascSpokePoolConfigs, from, to);
// Get the events from the SpokePool contracts
const { depositEvents, depositV3Events, fillEvents, fillV3Events, speedUpEvents } = await this.getEvents(
blocksToQuery,
chainId,
);
const { depositEvents, depositV3Events, fillEvents, fillV3Events, speedUpEvents, speedUpV3Events } =
await this.getEvents(blocksToQuery, chainId);
const eventsCount = {
depositEvents: depositEvents.length,
depositV3Events: depositV3Events.length,
fillEvents: fillEvents.length,
fillV3Events: fillV3Events.length,
speedUpEvents: speedUpEvents.length,
speedUpV3Events: speedUpV3Events.length,
};
this.logger.log(`${from}-${to} - chainId ${chainId} - ${JSON.stringify(eventsCount)}`);

Expand All @@ -80,6 +80,7 @@
await this.processFillEvents(chainId, fillEvents);
await this.processFillEvents(chainId, fillV3Events);
await this.processSpeedUpEvents(chainId, speedUpEvents);
await this.processSpeedUpV3Events(chainId, speedUpV3Events);
}

private async getEvents(
Expand All @@ -92,11 +93,13 @@
const fillEvents: Event[] = [];
const fillV3Events: Event[] = [];
const speedUpEvents: Event[] = [];
const speedUpV3Events: Event[] = [];

for (const blocks of blocksToQuery) {
const spokePoolEventQuerier = this.providers.getSpokePoolEventQuerier(chainId, blocks.address);
let depositEventsPromises = [];
let fillEventsPromises = [];
let speedUpEventsPromises = [];

if (blocks.acrossVersion === AcrossContractsVersion.V2) {
depositEventsPromises = [
Expand All @@ -111,6 +114,12 @@
res([]);
}),
];
speedUpEventsPromises = [
spokePoolEventQuerier.getRequestedSpeedUpDepositEvents(blocks.from, blocks.to),
new Promise((res) => {
res([]);
}),
];
} else if (blocks.acrossVersion === AcrossContractsVersion.V2_5) {
depositEventsPromises = [
spokePoolEventQuerier.getFundsDepositEvents(blocks.from, blocks.to),
Expand All @@ -124,6 +133,12 @@
res([]);
}),
];
speedUpEventsPromises = [
spokePoolEventQuerier.getRequestedSpeedUpDepositEvents(blocks.from, blocks.to),
new Promise((res) => {
res([]);
}),
];
} else if (blocks.acrossVersion === AcrossContractsVersion.V3) {
depositEventsPromises = [
new Promise((res) => {
Expand All @@ -135,20 +150,29 @@
spokePoolEventQuerier.getFilledRelayEvents(blocks.from, blocks.to),
spokePoolEventQuerier.getFilledV3RelayEvents(blocks.from, blocks.to),
];
speedUpEventsPromises = [
new Promise((res) => {
res([]);
}),
spokePoolEventQuerier.getRequestedSpeedUpV3DepositEvents(blocks.from, blocks.to),
];
}
const promises = [
...depositEventsPromises,
...fillEventsPromises,
spokePoolEventQuerier.getRequestedSpeedUpDepositEvents(blocks.from, blocks.to),
];
const [depositEventsChunk, depositV3EventsChunk, fillEventsChunk, fillV3EventsChunk, speedUpEventsChunk] =
await Promise.all(promises);
const promises = [...depositEventsPromises, ...fillEventsPromises, ...speedUpEventsPromises];
const [
depositEventsChunk,
depositV3EventsChunk,
fillEventsChunk,
fillV3EventsChunk,
speedUpEventsChunk,
speedUpV3EventsChunk,
] = await Promise.all(promises);

depositEvents.push(...depositEventsChunk);
depositV3Events.push(...depositV3EventsChunk);
fillEvents.push(...fillEventsChunk);
fillV3Events.push(...fillV3EventsChunk);
speedUpEvents.push(...speedUpEventsChunk);
speedUpV3Events.push(...speedUpV3EventsChunk);
}

return {
Expand All @@ -157,6 +181,7 @@
fillEvents,
fillV3Events,
speedUpEvents,
speedUpV3Events,
};
}

Expand Down Expand Up @@ -205,7 +230,7 @@

for (const event of events) {
const { address } = event;
const { acrossVersion } = this.appConfig.values.web3.spokePoolContracts[chainId].filter(

Check warning on line 233 in src/modules/scraper/adapter/messaging/BlocksEventsConsumer.ts

View workflow job for this annotation

GitHub Actions / lint-build

'acrossVersion' is assigned a value but never used
(contract) => contract.address === address,
)[0];

Expand Down Expand Up @@ -296,6 +321,28 @@
}
}

private async processSpeedUpV3Events(chainId: number, events: Event[]) {
for (const event of events) {
const { transactionHash, blockNumber, args } = event as RequestedSpeedUpV3DepositEvent;
const message: SpeedUpEventsV3QueueMessage = {
depositSourceChainId: chainId,
depositId: args.depositId,
transactionHash,
blockNumber,
depositor: args.depositor,
depositorSignature: args.depositorSignature,
updatedOutputAmount: args.updatedOutputAmount.toString(),
updatedRecipient: args.updatedRecipient,
updatedMessage: args.updatedMessage,
};

await this.scraperQueuesService.publishMessage<SpeedUpEventsV3QueueMessage>(
ScraperQueue.SpeedUpEventsV3,
message,
);
}
}

private async fromFundsDepositedEventToDeposit(chainId: number, event: Event) {
const typedEvent = event as FundsDepositedEvent2 | FundsDepositedEvent2_5;
const { transactionHash, blockNumber } = typedEvent;
Expand Down Expand Up @@ -366,8 +413,8 @@
// v3 properties
outputAmount: outputAmount.toString(),
outputTokenAddress: outputToken,
fillDeadline: fillDeadline,
exclusivityDeadline: exclusivityDeadline,
fillDeadline: new Date(fillDeadline * 1000),
exclusivityDeadline: new Date(exclusivityDeadline * 1000),
relayer,
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import { ScraperQueuesService } from "../../service/ScraperQueuesService";
import { FeeBreakdownQueueMessage, OpRebateRewardMessage, ScraperQueue } from ".";
import { Deposit, DepositFillTx, DepositFillTx2, DepositFillTxV3 } from "../../../deposit/model/deposit.entity";
import { deriveRelayerFeeComponents, makePctValuesCalculator, toWeiPct } from "../../utils";
import { AcrossContractsVersion } from "src/modules/web3/model/across-version";
import { AcrossContractsVersion } from "../../../web3/model/across-version";

@Processor(ScraperQueue.FeeBreakdown)
export class FeeBreakdownConsumer {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
import { OnQueueFailed, Process, Processor } from "@nestjs/bull";
import { Logger } from "@nestjs/common";
import { Job } from "bull";
import { DepositFilledDateQueueMessage, ScraperQueue, SpeedUpEventsQueueMessage } from ".";

Check warning on line 4 in src/modules/scraper/adapter/messaging/SpeedUpEventsConsumer.ts

View workflow job for this annotation

GitHub Actions / lint-build

'DepositFilledDateQueueMessage' is defined but never used
import { InjectRepository } from "@nestjs/typeorm";
import { Deposit } from "../../../deposit/model/deposit.entity";
import { Deposit, RequestedSpeedUpDepositTx } from "../../../deposit/model/deposit.entity";
import { Repository } from "typeorm";
import { ScraperQueuesService } from "../../service/ScraperQueuesService";

Expand Down Expand Up @@ -33,10 +33,6 @@
}

await this.processSpeedUpEventQueueMessage(deposit, job.data);

this.scraperQueuesService.publishMessage<DepositFilledDateQueueMessage>(ScraperQueue.DepositFilledDate, {
depositId: deposit.id,
});
}

public async processSpeedUpEventQueueMessage(deposit: Deposit, data: SpeedUpEventsQueueMessage) {
Expand All @@ -46,7 +42,7 @@
const sortedSpeedUps = [
...deposit.speedUps,
{ hash: transactionHash, newRelayerFeePct, blockNumber, depositSourceChainId, updatedMessage, updatedRecipient },
].sort((a, b) => b.blockNumber - a.blockNumber);
].sort((a, b) => b.blockNumber - a.blockNumber) as RequestedSpeedUpDepositTx[];

return this.depositRepository.update(
{ id: deposit.id },
Expand Down
76 changes: 76 additions & 0 deletions src/modules/scraper/adapter/messaging/SpeedUpEventsV3Consumer.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
import { OnQueueFailed, Process, Processor } from "@nestjs/bull";
import { Logger } from "@nestjs/common";
import { Job } from "bull";
import { DepositFilledDateQueueMessage, SpeedUpEventsV3QueueMessage, ScraperQueue } from ".";

Check warning on line 4 in src/modules/scraper/adapter/messaging/SpeedUpEventsV3Consumer.ts

View workflow job for this annotation

GitHub Actions / lint-build

'DepositFilledDateQueueMessage' is defined but never used
import { InjectRepository } from "@nestjs/typeorm";
import { Deposit, RequestedSpeedUpDepositTxV3 } from "../../../deposit/model/deposit.entity";
import { Repository } from "typeorm";
import { ScraperQueuesService } from "../../service/ScraperQueuesService";
import BigNumber from "bignumber.js";

@Processor(ScraperQueue.SpeedUpEventsV3)
export class SpeedUpEventsV3Consumer {
private logger = new Logger(SpeedUpEventsV3Consumer.name);

constructor(
@InjectRepository(Deposit) private depositRepository: Repository<Deposit>,
private scraperQueuesService: ScraperQueuesService,
) {}

@Process()
private async process(job: Job<SpeedUpEventsV3QueueMessage>) {
const { depositId, depositSourceChainId } = job.data;
const deposit = await this.depositRepository.findOne({ where: { sourceChainId: depositSourceChainId, depositId } });

if (!deposit) throw new Error("Deposit not found");
if (this.isSpeedUpAlreadyProcessed(deposit, job.data)) return;

await this.processSpeedUpEventQueueMessage(deposit, job.data);
}

public async processSpeedUpEventQueueMessage(deposit: Deposit, data: SpeedUpEventsV3QueueMessage) {
const {
transactionHash,
blockNumber,
depositSourceChainId,
updatedOutputAmount,
updatedMessage,
updatedRecipient,
} = data;

const sortedSpeedUps = [
...deposit.speedUps,
{
hash: transactionHash,
updatedOutputAmount,
blockNumber,
depositSourceChainId,
updatedMessage,
updatedRecipient,
},
].sort((a, b) => b.blockNumber - a.blockNumber) as RequestedSpeedUpDepositTxV3[];
const wei = new BigNumber(10).pow(18);
const feePct = new BigNumber(updatedOutputAmount).multipliedBy(wei).div(deposit.amount);
return this.depositRepository.update(
{ id: deposit.id },
{
speedUps: sortedSpeedUps,
message: sortedSpeedUps[0].updatedMessage,
recipientAddr: sortedSpeedUps[0].updatedRecipient,
outputAmount: sortedSpeedUps[0].updatedOutputAmount,
depositRelayerFeePct: feePct.toFixed(0),
},
);
}

public isSpeedUpAlreadyProcessed(deposit: Deposit, speedUp: SpeedUpEventsV3QueueMessage) {
const { transactionHash } = speedUp;
const speedUpTxIndex = deposit.speedUps.findIndex((speedUpTx) => speedUpTx.hash === transactionHash);
return speedUpTxIndex !== -1;
}

@OnQueueFailed()
private onQueueFailed(job: Job, error: Error) {
this.logger.error(`${ScraperQueue.SpeedUpEvents} ${JSON.stringify(job.data)} failed: ${error}`);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ export class TokenDetailsConsumer {
const inputToken = await this.ethProvidersService.getCachedToken(sourceChainId, tokenAddr);
let outputToken: Token | undefined = undefined;

if (outputTokenAddress === "0x0000000000000000000000000000000000000000") return;
if (outputTokenAddress) {
outputToken = await this.ethProvidersService.getCachedToken(destinationChainId, outputTokenAddress);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ export class TrackFillEventConsumer {
throw new Error("Can not track fill event without token, price or deposit date");
}

const fillTx = deposit.fillTxs.find((tx) => tx.hash === fillTxHash);
const fillTx = deposit.fillTxs.find((tx) => tx.hash === fillTxHash) as DepositFillTx | DepositFillTx2;

if (!fillTx) {
throw new Error("Fill tx does not exist on deposit");
Expand All @@ -53,6 +53,8 @@ export class TrackFillEventConsumer {
if (!fillTx.date) {
throw new Error("Fill tx does not have a date");
}
// Temporary disabled for v3 fills
if ((fillTx as any).updatedOutputAmount) return;

const destinationChainInfo = chainIdToInfo[deposit.destinationChainId] || {
name: "unknown",
Expand Down
13 changes: 13 additions & 0 deletions src/modules/scraper/adapter/messaging/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ export enum ScraperQueue {
FillEvents2 = "FillEvents2",
FillEventsV3 = "FillEventsV3",
SpeedUpEvents = "SpeedUpEvents",
SpeedUpEventsV3 = "SpeedUpEventsV3",
BlockNumber = "BlockNumber",
TokenDetails = "TokenDetails",
DepositReferral = "DepositReferral",
Expand Down Expand Up @@ -76,6 +77,18 @@ export type SpeedUpEventsQueueMessage = {
updatedMessage?: string;
};

export type SpeedUpEventsV3QueueMessage = {
depositSourceChainId: number;
depositId: number;
transactionHash: string;
blockNumber: number;
depositor: string;
depositorSignature: string;
updatedOutputAmount: string;
updatedRecipient: string;
updatedMessage: string;
};

export type BlockNumberQueueMessage = {
depositId: number;
};
Expand Down
Loading
Loading