Skip to content

Commit

Permalink
Optimize pnl queries (dydxprotocol#1812)
Browse files Browse the repository at this point in the history
  • Loading branch information
dydxwill authored Jun 28, 2024
1 parent 8e59037 commit 3c07400
Show file tree
Hide file tree
Showing 7 changed files with 99 additions and 136 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ describe('PnlTicks store', () => {
} = await PnlTicksTable.findLatestProcessedBlocktimeAndCount();

expect(maxBlockTime).toEqual(blockTime);
expect(count).toEqual(2);
expect(count).toEqual(1);
});

it('Successfully finds latest block time without any pnl ticks', async () => {
Expand Down Expand Up @@ -283,87 +283,4 @@ describe('PnlTicks store', () => {
expect(mostRecent[defaultSubaccountId].equity).toEqual('1014');
expect(mostRecent[defaultSubaccountId2].equity).toEqual('200');
});

it('createMany PnlTicks, find most recent pnl tick times for each account', async () => {
const now = DateTime.utc();
const nowMinusOneHour = now.minus({ hours: 1 });
const nowMinusThreeHours = now.minus({ hours: 3 });
const nowMinusNineHours = now.minus({ hours: 9 });
const nowMinusElevenHours = now.minus({ hours: 11 });

await Promise.all([
BlockTable.create({
blockHeight: '3',
time: defaultBlock.time,
}),
BlockTable.create({
blockHeight: '5',
time: defaultBlock.time,
}),
]);

await PnlTicksTable.createMany([
{
subaccountId: defaultSubaccountId,
equity: '1092',
createdAt: nowMinusOneHour.toISO(),
totalPnl: '1000',
netTransfers: '50',
blockHeight: defaultBlock.blockHeight,
blockTime: defaultBlock.time,
},
{
subaccountId: defaultSubaccountId,
equity: '1097',
createdAt: nowMinusThreeHours.toISO(),
totalPnl: '1000',
netTransfers: '50',
blockHeight: '3',
blockTime: defaultBlock.time,
},
{
subaccountId: defaultSubaccountId,
equity: '1011',
createdAt: nowMinusElevenHours.toISO(),
totalPnl: '1000',
netTransfers: '50',
blockHeight: '5',
blockTime: defaultBlock.time,
},
{
subaccountId: defaultSubaccountId,
equity: '1014',
createdAt: nowMinusNineHours.toISO(),
totalPnl: '1000',
netTransfers: '50',
blockHeight: '5',
blockTime: defaultBlock.time,
},
{
subaccountId: defaultSubaccountId2,
equity: '100',
createdAt: now.toISO(),
totalPnl: '1000',
netTransfers: '50',
blockHeight: '2',
blockTime: defaultBlock2.time,
},
{
subaccountId: defaultSubaccountId2,
equity: '200',
createdAt: nowMinusNineHours.toISO(),
totalPnl: '1000',
netTransfers: '50',
blockHeight: '5',
blockTime: defaultBlock.time,
},
]);

const mostRecentTimes: {
[accountId: string]: string
} = await PnlTicksTable.findMostRecentPnlTickTimeForEachAccount('3');

expect(mostRecentTimes[defaultSubaccountId]).toEqual(nowMinusThreeHours.toISO());
expect(mostRecentTimes[defaultSubaccountId2]).toEqual(nowMinusNineHours.toISO());
});
});
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
import * as Knex from 'knex';

export async function up(knex: Knex): Promise<void> {
await knex.raw(`
CREATE INDEX CONCURRENTLY IF NOT EXISTS "pnl_ticks_blocktime_index" ON "pnl_ticks" ("blockTime");
`);
}

export async function down(knex: Knex): Promise<void> {
await knex.raw(`
DROP INDEX CONCURRENTLY IF EXISTS "pnl_ticks_blocktime_index";
`);
}

export const config = {
transaction: false,
};
49 changes: 19 additions & 30 deletions indexer/packages/postgres/src/stores/pnl-ticks-table.ts
Original file line number Diff line number Diff line change
Expand Up @@ -220,15 +220,28 @@ export async function findLatestProcessedBlocktimeAndCount(): Promise<{
rows: [{ max: string, count: number }]
} = await knexReadReplica.getConnection().raw(
`
SELECT MAX("blockTime") as max, COUNT(*) as count
FROM "pnl_ticks"
`
,
WITH maxBlockTime AS (
SELECT MAX("blockTime") as "maxBlockTime"
FROM "pnl_ticks"
)
SELECT
maxBlockTime."maxBlockTime" as max,
COUNT(*) as count
FROM
"pnl_ticks",
maxBlockTime
WHERE
"pnl_ticks"."blockTime" = maxBlockTime."maxBlockTime"
GROUP BY 1
`,
) as unknown as { rows: [{ max: string, count: number }] };

const maxBlockTime = result.rows[0]?.max || ZERO_TIME_ISO_8601;
const count = Number(result.rows[0]?.count) || 0;

return {
maxBlockTime: result.rows[0].max || ZERO_TIME_ISO_8601,
count: Number(result.rows[0].count) || 0,
maxBlockTime,
count,
};
}

Expand All @@ -255,27 +268,3 @@ export async function findMostRecentPnlTickForEachAccount(
'subaccountId',
);
}

export async function findMostRecentPnlTickTimeForEachAccount(
createdOnOrAfterHeight: string,
): Promise<{
[subaccountId: string]: string
}> {
verifyAllInjectableVariables([createdOnOrAfterHeight]);

const result: {
rows: { subaccountId: string, createdAt: string }[]
} = await knexReadReplica.getConnection().raw(
`
SELECT DISTINCT ON ("subaccountId") "subaccountId", "createdAt"
FROM "pnl_ticks"
WHERE "blockHeight" >= '${createdOnOrAfterHeight}'
ORDER BY "subaccountId" ASC, "createdAt" DESC;
`,
) as unknown as { rows: { subaccountId: string, createdAt: string }[] };

return result.rows.reduce((acc, row) => {
acc[row.subaccountId] = row.createdAt;
return acc;
}, {} as { [subaccountId: string]: string });
}
9 changes: 9 additions & 0 deletions indexer/services/auxo/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,15 @@ export async function handler(
// boolean flag used to determine if new kafka topics should be created
await runDbAndKafkaMigration(event.addNewKafkaTopics, lambda);

if (event.onlyRunDbMigrationAndCreateKafkaTopics) {
return {
statusCode: 200,
body: JSON.stringify({
message: 'success',
}),
};
}

// 3. Create new ECS Task Definition for ECS Services with new image
const taskDefinitionArnMap: TaskDefinitionArnMap = await createNewEcsTaskDefinitions(
ecs,
Expand Down
1 change: 1 addition & 0 deletions indexer/services/auxo/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ export interface AuxoEventJson {
// In our naming we often times use the appreviated region name
regionAbbrev: string;
addNewKafkaTopics: boolean;
onlyRunDbMigrationAndCreateKafkaTopics: boolean;
}

// EcsServiceName to task definition arn mapping
Expand Down
Original file line number Diff line number Diff line change
@@ -1,15 +1,19 @@
import { logger, stats } from '@dydxprotocol-indexer/base';
import {
BlockTable,
PnlTicksTable,
SubaccountTable,
TransferTable,
testMocks,
dbHelpers,
testConstants,
} from '@dydxprotocol-indexer/postgres';
import runTask from '../../src/tasks/pnl-instrumentation';
import { getMostRecentPnlTicksForEachAccount } from '../../src/helpers/pnl-ticks-helper';
import { DateTime } from 'luxon';
import config from '../../src/config';
import { asMock } from '@dydxprotocol-indexer/dev';

jest.mock('../../src/helpers/pnl-ticks-helper');

describe('pnl-instrumentation', () => {
beforeAll(async () => {
Expand Down Expand Up @@ -43,10 +47,18 @@ describe('pnl-instrumentation', () => {
{ id: 'subaccount2' },
] as any);

jest.spyOn(PnlTicksTable, 'findMostRecentPnlTickTimeForEachAccount').mockResolvedValue({
subaccount1: DateTime.utc().minus({ hours: 1 }).toISO(),
subaccount2: DateTime.utc().minus({ hours: 1 }).toISO(),
});
asMock(getMostRecentPnlTicksForEachAccount).mockImplementation(
async () => Promise.resolve({
subaccount1: {
...testConstants.defaultPnlTick,
blockTime: DateTime.utc().minus({ hours: 1 }).toISO(),
},
subaccount2: {
...testConstants.defaultPnlTick,
blockTime: DateTime.utc().minus({ hours: 1 }).toISO(),
},
}),
);

await runTask();

Expand All @@ -66,10 +78,18 @@ describe('pnl-instrumentation', () => {
{ id: 'subaccount2' },
] as any);

jest.spyOn(PnlTicksTable, 'findMostRecentPnlTickTimeForEachAccount').mockResolvedValue({
subaccount1: DateTime.utc().minus({ hours: 3 }).toISO(),
subaccount2: DateTime.utc().minus({ hours: 3 }).toISO(),
});
asMock(getMostRecentPnlTicksForEachAccount).mockImplementation(
async () => Promise.resolve({
subaccount1: {
...testConstants.defaultPnlTick,
blockTime: DateTime.utc().minus({ hours: 3 }).toISO(),
},
subaccount2: {
...testConstants.defaultPnlTick,
blockTime: DateTime.utc().minus({ hours: 3 }).toISO(),
},
}),
);

await runTask();

Expand All @@ -94,9 +114,14 @@ describe('pnl-instrumentation', () => {
{ id: 'subaccount2' },
] as any);

jest.spyOn(PnlTicksTable, 'findMostRecentPnlTickTimeForEachAccount').mockResolvedValue({
subaccount1: DateTime.utc().minus({ hours: 1 }).toISO(),
});
asMock(getMostRecentPnlTicksForEachAccount).mockImplementation(
async () => Promise.resolve({
subaccount1: {
...testConstants.defaultPnlTick,
blockTime: DateTime.utc().minus({ hours: 1 }).toISO(),
},
}),
);

jest.spyOn(TransferTable, 'getLastTransferTimeForSubaccounts').mockResolvedValue({
subaccount2: DateTime.utc().minus({ hours: 3 }).toISO(),
Expand Down
25 changes: 15 additions & 10 deletions indexer/services/roundtable/src/tasks/pnl-instrumentation.ts
Original file line number Diff line number Diff line change
@@ -1,15 +1,17 @@
import { logger, stats } from '@dydxprotocol-indexer/base';
import {
BlockFromDatabase,
BlockTable,
PnlTicksTable,
BlockTable, PnlTicksCreateObject,
SubaccountFromDatabase,
SubaccountTable,
TransferTable,
} from '@dydxprotocol-indexer/postgres';
import { PnlTickForSubaccounts } from '@dydxprotocol-indexer/redis';
import _ from 'lodash';
import { DateTime } from 'luxon';

import config from '../config';
import { getMostRecentPnlTicksForEachAccount } from '../helpers/pnl-ticks-helper';

/**
* Instrument data on PNL to be used for analytics.
Expand All @@ -34,20 +36,23 @@ export default async function runTask(): Promise<void> {
(subaccount: SubaccountFromDatabase) => subaccount.id,
);

// Get the most recent PNL ticks for each subaccount in DB
const mostRecentPnlTicks: {
// Get the most recent PNL ticks for each subaccount from Redis
const mostRecentPnlTicks: PnlTickForSubaccounts = await getMostRecentPnlTicksForEachAccount();
const mostRecentPnlTickTimes:
{
[subaccountId: string]: string
} = await PnlTicksTable.findMostRecentPnlTickTimeForEachAccount(
'1',
} = _.mapValues(
mostRecentPnlTicks,
(pnlTick: PnlTicksCreateObject) => pnlTick.blockTime,
);

// Check last PNL computation for each subaccount
const stalePnlSubaccounts: string[] = [];
const subaccountsWithPnl: string[] = Object.keys(mostRecentPnlTicks);
const subaccountsWithPnl: string[] = Object.keys(mostRecentPnlTickTimes);
subaccountIds.forEach((id: string) => {
const lastPnlTick: string = mostRecentPnlTicks[id];
if (lastPnlTick) {
const lastPnlTime: DateTime = DateTime.fromISO(lastPnlTick);
const lastPnlTickTime: string = mostRecentPnlTickTimes[id];
if (lastPnlTickTime) {
const lastPnlTime: DateTime = DateTime.fromISO(lastPnlTickTime);
const hoursSinceLastPnl = startTaskTime.diff(lastPnlTime, 'hours').hours;

if (hoursSinceLastPnl >= 2) {
Expand Down

0 comments on commit 3c07400

Please sign in to comment.