Merge branch 'main' into tra417
shrenujb committed Jun 26, 2024
2 parents bbd93c9 + 75829b3 commit 020b8b6
Showing 91 changed files with 3,242 additions and 2,188 deletions.
1 change: 1 addition & 0 deletions indexer/README.md
@@ -230,4 +230,5 @@ Other example subscription events:
{ "type": "subscribe", "channel": "v4_markets" }
{ "type": "subscribe", "channel": "v4_orderbook", "id": "BTC-USD" }
{ "type": "subscribe", "channel": "v4_subaccounts", "id": "address/0" }
{ "type": "subscribe", "channel": "v4_block_height" }
```
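
The new `v4_block_height` channel is subscribed to the same way as the existing channels. Below is a minimal client sketch in TypeScript, assuming the `ws` package and a locally running Indexer websocket endpoint; the URL and the exact response frames are assumptions, not part of this change:

```typescript
import WebSocket from 'ws';

// Assumed local websocket endpoint; adjust host/port to match your deployment.
const socket = new WebSocket('ws://localhost:3003/v4/ws');

socket.on('open', () => {
  // Subscribe to the new block height channel; like v4_markets, it takes no `id`.
  socket.send(JSON.stringify({ type: 'subscribe', channel: 'v4_block_height' }));
});

socket.on('message', (raw) => {
  // Expect a `subscribed` ack followed by update frames carrying the latest block height.
  console.log(JSON.parse(raw.toString()));
});
```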
3 changes: 2 additions & 1 deletion indexer/docker-compose-local-deployment.yml
@@ -13,7 +13,8 @@ services:
to-websockets-subaccounts:1:1,\
to-websockets-trades:1:1,\
to-websockets-markets:1:1,\
to-websockets-candles:1:1"
to-websockets-candles:1:1,\
to-websockets-block-height:1:1"
KAFKA_LISTENERS: INTERNAL://:9092,EXTERNAL_SAME_HOST://:29092
KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka:9092,EXTERNAL_SAME_HOST://localhost:29092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL_SAME_HOST:PLAINTEXT
5 changes: 3 additions & 2 deletions indexer/docker-compose.yml
@@ -7,14 +7,15 @@ services:
environment:
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_ADVERTISED_HOST_NAME: localhost
KAFKA_CREATE_TOPICS:
KAFKA_CREATE_TOPICS:
"to-ender:1:1,\
to-vulcan:1:1,\
to-websockets-orderbooks:1:1,\
to-websockets-subaccounts:1:1,\
to-websockets-trades:1:1,\
to-websockets-markets:1:1,\
to-websockets-candles:1:1"
to-websockets-candles:1:1,\
to-websockets-block-height:1:1"
postgres-test:
build:
context: .
1 change: 1 addition & 0 deletions indexer/packages/kafka/src/constants.ts
@@ -5,3 +5,4 @@ export const SUBACCOUNTS_WEBSOCKET_MESSAGE_VERSION: string = '3.0.0';
export const TRADES_WEBSOCKET_MESSAGE_VERSION: string = '2.1.0';
export const MARKETS_WEBSOCKET_MESSAGE_VERSION: string = '1.0.0';
export const CANDLES_WEBSOCKET_MESSAGE_VERSION: string = '1.0.0';
export const BLOCK_HEIGHT_WEBSOCKET_MESSAGE_VERSION: string = '1.0.0';
2 changes: 2 additions & 0 deletions indexer/packages/kafka/src/types.ts
@@ -4,6 +4,7 @@ export enum WebsocketTopics {
TO_WEBSOCKETS_TRADES = 'to-websockets-trades',
TO_WEBSOCKETS_MARKETS = 'to-websockets-markets',
TO_WEBSOCKETS_CANDLES = 'to-websockets-candles',
TO_WEBSOCKETS_BLOCK_HEIGHT = 'to-websockets-block-height',
}

export enum KafkaTopics {
@@ -14,4 +15,5 @@ export enum KafkaTopics {
TO_WEBSOCKETS_TRADES = 'to-websockets-trades',
TO_WEBSOCKETS_MARKETS = 'to-websockets-markets',
TO_WEBSOCKETS_CANDLES = 'to-websockets-candles',
TO_WEBSOCKETS_BLOCK_HEIGHT = 'to-websockets-block-height',
}
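
With `to-websockets-block-height` added to both enums, downstream services can pick the topic up by enum value rather than a hard-coded string. A hedged sketch using a kafkajs-style consumer; the broker address, consumer group, and package import are assumptions, not code from this commit:

```typescript
import { Kafka } from 'kafkajs';
import { WebsocketTopics } from '@dydxprotocol-indexer/kafka';

// Assumed local broker, matching the EXTERNAL_SAME_HOST listener in docker-compose-local-deployment.yml.
const kafka = new Kafka({ brokers: ['localhost:29092'] });
const consumer = kafka.consumer({ groupId: 'example-websocket-forwarder' });

async function subscribeToWebsocketTopics(): Promise<void> {
  await consumer.connect();
  // Object.values over the string enum yields every topic name,
  // including the newly added 'to-websockets-block-height'.
  await consumer.subscribe({ topics: Object.values(WebsocketTopics), fromBeginning: false });
}
```

The docker-compose changes above pre-create each topic with one partition and one replica (the `:1:1` suffix in `KAFKA_CREATE_TOPICS`), so local consumers can subscribe to the new topic immediately.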
@@ -265,6 +265,30 @@ export interface CandleMessageSDKType {

version: string;
}
/** Message to be sent through the `to-websockets-block-height` kafka topic. */

export interface BlockHeightMessage {
/** Block height where the contents occur. */
blockHeight: string;
/** ISO formatted time of the block height. */

time: string;
/** Version of the websocket message. */

version: string;
}
/** Message to be sent through the `to-websockets-block-height` kafka topic. */

export interface BlockHeightMessageSDKType {
/** Block height where the contents occur. */
block_height: string;
/** ISO formatted time of the block height. */

time: string;
/** Version of the websocket message. */

version: string;
}

function createBaseOrderbookMessage(): OrderbookMessage {
return {
@@ -629,4 +653,69 @@ export const CandleMessage = {
return message;
}

};

function createBaseBlockHeightMessage(): BlockHeightMessage {
return {
blockHeight: "",
time: "",
version: ""
};
}

export const BlockHeightMessage = {
encode(message: BlockHeightMessage, writer: _m0.Writer = _m0.Writer.create()): _m0.Writer {
if (message.blockHeight !== "") {
writer.uint32(10).string(message.blockHeight);
}

if (message.time !== "") {
writer.uint32(18).string(message.time);
}

if (message.version !== "") {
writer.uint32(26).string(message.version);
}

return writer;
},

decode(input: _m0.Reader | Uint8Array, length?: number): BlockHeightMessage {
const reader = input instanceof _m0.Reader ? input : new _m0.Reader(input);
let end = length === undefined ? reader.len : reader.pos + length;
const message = createBaseBlockHeightMessage();

while (reader.pos < end) {
const tag = reader.uint32();

switch (tag >>> 3) {
case 1:
message.blockHeight = reader.string();
break;

case 2:
message.time = reader.string();
break;

case 3:
message.version = reader.string();
break;

default:
reader.skipType(tag & 7);
break;
}
}

return message;
},

fromPartial(object: DeepPartial<BlockHeightMessage>): BlockHeightMessage {
const message = createBaseBlockHeightMessage();
message.blockHeight = object.blockHeight ?? "";
message.time = object.time ?? "";
message.version = object.version ?? "";
return message;
}

};
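
The generated codec round-trips a `BlockHeightMessage` the same way as the other websocket message types. A small usage sketch; the import path is an assumption, since the generated file's path is not shown in this hunk:

```typescript
// Import path assumed; this is the generated module shown in the diff above.
import { BlockHeightMessage } from './codegen/dydxprotocol/indexer/socks/messages';

const msg: BlockHeightMessage = BlockHeightMessage.fromPartial({
  blockHeight: '12345678',
  time: '2024-06-26T00:00:00.000Z',
  version: '1.0.0',
});

// Encode to protobuf bytes, e.g. for publishing on to-websockets-block-height...
const bytes: Uint8Array = BlockHeightMessage.encode(msg).finish();

// ...and decode them back; the round trip preserves all three string fields.
const decoded: BlockHeightMessage = BlockHeightMessage.decode(bytes);
console.log(decoded.blockHeight, decoded.time, decoded.version);
```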
109 changes: 108 additions & 1 deletion indexer/packages/v4-protos/src/codegen/dydxprotocol/vault/genesis.ts
@@ -1,22 +1,55 @@
import { Params, ParamsSDKType } from "./params";
import { VaultId, VaultIdSDKType, NumShares, NumSharesSDKType } from "./vault";
import { OwnerShare, OwnerShareSDKType } from "./query";
import * as _m0 from "protobufjs/minimal";
import { DeepPartial } from "../../helpers";
/** GenesisState defines `x/vault`'s genesis state. */

export interface GenesisState {
/** The parameters of the module. */
params?: Params;
/** The vaults. */

vaults: Vault[];
}
/** GenesisState defines `x/vault`'s genesis state. */

export interface GenesisStateSDKType {
/** The parameters of the module. */
params?: ParamsSDKType;
/** The vaults. */

vaults: VaultSDKType[];
}
/** Vault defines the total shares and owner shares of a vault. */

export interface Vault {
/** The ID of the vault. */
vaultId?: VaultId;
/** The total number of shares in the vault. */

totalShares?: NumShares;
/** The shares of each owner in the vault. */

ownerShares: OwnerShare[];
}
/** Vault defines the total shares and owner shares of a vault. */

export interface VaultSDKType {
/** The ID of the vault. */
vault_id?: VaultIdSDKType;
/** The total number of shares in the vault. */

total_shares?: NumSharesSDKType;
/** The shares of each owner in the vault. */

owner_shares: OwnerShareSDKType[];
}

function createBaseGenesisState(): GenesisState {
return {
params: undefined
params: undefined,
vaults: []
};
}

@@ -26,6 +59,10 @@ export const GenesisState = {
Params.encode(message.params, writer.uint32(10).fork()).ldelim();
}

for (const v of message.vaults) {
Vault.encode(v!, writer.uint32(18).fork()).ldelim();
}

return writer;
},

@@ -42,6 +79,10 @@ export const GenesisState = {
message.params = Params.decode(reader, reader.uint32());
break;

case 2:
message.vaults.push(Vault.decode(reader, reader.uint32()));
break;

default:
reader.skipType(tag & 7);
break;
@@ -54,6 +95,72 @@ export const GenesisState = {
fromPartial(object: DeepPartial<GenesisState>): GenesisState {
const message = createBaseGenesisState();
message.params = object.params !== undefined && object.params !== null ? Params.fromPartial(object.params) : undefined;
message.vaults = object.vaults?.map(e => Vault.fromPartial(e)) || [];
return message;
}

};

function createBaseVault(): Vault {
return {
vaultId: undefined,
totalShares: undefined,
ownerShares: []
};
}

export const Vault = {
encode(message: Vault, writer: _m0.Writer = _m0.Writer.create()): _m0.Writer {
if (message.vaultId !== undefined) {
VaultId.encode(message.vaultId, writer.uint32(10).fork()).ldelim();
}

if (message.totalShares !== undefined) {
NumShares.encode(message.totalShares, writer.uint32(18).fork()).ldelim();
}

for (const v of message.ownerShares) {
OwnerShare.encode(v!, writer.uint32(26).fork()).ldelim();
}

return writer;
},

decode(input: _m0.Reader | Uint8Array, length?: number): Vault {
const reader = input instanceof _m0.Reader ? input : new _m0.Reader(input);
let end = length === undefined ? reader.len : reader.pos + length;
const message = createBaseVault();

while (reader.pos < end) {
const tag = reader.uint32();

switch (tag >>> 3) {
case 1:
message.vaultId = VaultId.decode(reader, reader.uint32());
break;

case 2:
message.totalShares = NumShares.decode(reader, reader.uint32());
break;

case 3:
message.ownerShares.push(OwnerShare.decode(reader, reader.uint32()));
break;

default:
reader.skipType(tag & 7);
break;
}
}

return message;
},

fromPartial(object: DeepPartial<Vault>): Vault {
const message = createBaseVault();
message.vaultId = object.vaultId !== undefined && object.vaultId !== null ? VaultId.fromPartial(object.vaultId) : undefined;
message.totalShares = object.totalShares !== undefined && object.totalShares !== null ? NumShares.fromPartial(object.totalShares) : undefined;
message.ownerShares = object.ownerShares?.map(e => OwnerShare.fromPartial(e)) || [];
return message;
}

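The new `Vault` type composes with `GenesisState` in the usual ts-proto pattern: each vault is nested under field 2 of the genesis message (`writer.uint32(18)` is field number 2, wire type 2). A minimal sketch of building and encoding a genesis state; the nested `VaultId`/`NumShares`/`OwnerShare` values are left empty rather than guessed:

```typescript
import { GenesisState, Vault } from './codegen/dydxprotocol/vault/genesis';

// fromPartial fills defaults through the nested types' own fromPartial calls,
// so empty objects are enough for a structural example.
const vault: Vault = Vault.fromPartial({
  vaultId: {},
  totalShares: {},
  ownerShares: [],
});

const genesis: GenesisState = GenesisState.fromPartial({ vaults: [vault] });

// GenesisState.encode loops over message.vaults and nests each one via Vault.encode,
// mirroring the decoder's `case 2` branch above.
const bytes: Uint8Array = GenesisState.encode(genesis).finish();
console.log(bytes.length);
```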
7 changes: 7 additions & 0 deletions indexer/services/auxo/src/constants.ts
@@ -10,6 +10,13 @@ export const BAZOOKA_DB_MIGRATION_PAYLOAD: Uint8Array = new TextEncoder().encode(
}),
);

export const BAZOOKA_DB_MIGRATION_AND_CREATE_KAFKA_PAYLOAD: Uint8Array = new TextEncoder().encode(
JSON.stringify({
migrate: true,
create_kafka_topics: true,
}),
);

export const ECS_SERVICE_NAMES: EcsServiceNames[] = [
EcsServiceNames.COMLINK,
EcsServiceNames.ENDER,
16 changes: 11 additions & 5 deletions indexer/services/auxo/src/index.ts
@@ -30,6 +30,7 @@ import _ from 'lodash';

import config from './config';
import {
BAZOOKA_DB_MIGRATION_AND_CREATE_KAFKA_PAYLOAD,
BAZOOKA_DB_MIGRATION_PAYLOAD,
BAZOOKA_LAMBDA_FUNCTION_NAME,
ECS_SERVICE_NAMES,
@@ -40,7 +41,7 @@ import { AuxoEventJson, EcsServiceNames, TaskDefinitionArnMap } from './types';
/**
* Upgrades all services and runs migrations
* 1. Upgrade Bazooka
* 2. Run db migration in Bazooka
* 2. Run db migration in Bazooka, and update kafka topics
* 3. Create new ECS Task Definition for ECS Services with new image
* 4. Upgrade all ECS Services (comlink, ender, roundtable, socks, vulcan)
*/
@@ -66,8 +67,9 @@ export async function handler(
// 1. Upgrade Bazooka
await upgradeBazooka(lambda, ecr, event);

// 2. Run db migration in Bazooka
await runDbMigration(lambda);
// 2. Run db migration in Bazooka,
// boolean flag used to determine if new kafka topics should be created
await runDbAndKafkaMigration(event.addNewKafkaTopics, lambda);

// 3. Create new ECS Task Definition for ECS Services with new image
const taskDefinitionArnMap: TaskDefinitionArnMap = await createNewEcsTaskDefinitions(
@@ -192,16 +194,20 @@

}

async function runDbMigration(
async function runDbAndKafkaMigration(
createNewKafkaTopics: boolean,
lambda: ECRClient,
): Promise<void> {
logger.info({
at: 'index#runDbMigration',
message: 'Running db migration',
});
const payload = createNewKafkaTopics
? BAZOOKA_DB_MIGRATION_AND_CREATE_KAFKA_PAYLOAD
: BAZOOKA_DB_MIGRATION_PAYLOAD;
const response: InvokeCommandOutput = await lambda.send(new InvokeCommand({
FunctionName: BAZOOKA_LAMBDA_FUNCTION_NAME,
Payload: BAZOOKA_DB_MIGRATION_PAYLOAD,
Payload: payload,
// RequestResponse means that the lambda is synchronously invoked
InvocationType: 'RequestResponse',
}));
1 change: 1 addition & 0 deletions indexer/services/auxo/src/types.ts
@@ -13,6 +13,7 @@ export interface AuxoEventJson {
region: string;
// In our naming we oftentimes use the abbreviated region name
regionAbbrev: string;
addNewKafkaTopics: boolean;
}

// EcsServiceName to task definition arn mapping
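Tying the pieces together: the new `addNewKafkaTopics` flag on the Auxo event selects which Bazooka payload is sent, so a deploy can create the new block-height topic in the same run as the db migration. A hedged sketch of that selection, mirroring `runDbAndKafkaMigration` above; the event values are illustrative, and only the flag and the payload constants come from this diff:

```typescript
import {
  BAZOOKA_DB_MIGRATION_AND_CREATE_KAFKA_PAYLOAD,
  BAZOOKA_DB_MIGRATION_PAYLOAD,
} from './constants';
import { AuxoEventJson } from './types';

// Illustrative event; other required AuxoEventJson fields are omitted, hence the cast.
const event = {
  region: 'ap-northeast-1',
  regionAbbrev: 'apne1',
  addNewKafkaTopics: true,
} as AuxoEventJson;

// Same ternary as runDbAndKafkaMigration: when the flag is set, Bazooka receives
// { migrate: true, create_kafka_topics: true } and creates the new topics before
// the ECS services are upgraded.
const payload: Uint8Array = event.addNewKafkaTopics
  ? BAZOOKA_DB_MIGRATION_AND_CREATE_KAFKA_PAYLOAD
  : BAZOOKA_DB_MIGRATION_PAYLOAD;

console.log(payload === BAZOOKA_DB_MIGRATION_AND_CREATE_KAFKA_PAYLOAD); // true
```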