Skip to content

Commit

Permalink
feat: refresh dynamic metadata tokens periodically (#64)
Browse files Browse the repository at this point in the history
* feat: first draft

* test: dynamic token refresh

* fix: test merge

* feat: specify from and to block heights for import

* feat: refresh dynamic tokens with ttl

* chore: remove last refresh column
  • Loading branch information
rafaelcr committed Jan 20, 2023
1 parent 9dba66c commit e1c0882
Show file tree
Hide file tree
Showing 11 changed files with 1,561 additions and 945 deletions.
3 changes: 3 additions & 0 deletions migrations/1670265062169_tokens.ts
Original file line number Diff line number Diff line change
Expand Up @@ -66,4 +66,7 @@ export function up(pgm: MigrationBuilder): void {
'UNIQUE(smart_contract_id, token_number)'
);
pgm.createIndex('tokens', ['smart_contract_id']);
pgm.createIndex('tokens', 'COALESCE(updated_at, created_at)', {
where: "update_mode = 'dynamic'",
});
}
27 changes: 27 additions & 0 deletions migrations/1671125881755_chain-tip.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/* eslint-disable @typescript-eslint/naming-convention */
import { MigrationBuilder, ColumnDefinitions } from 'node-pg-migrate';

export const shorthands: ColumnDefinitions | undefined = undefined;

/**
 * Creates the single-row `chain_tip` table used to persist the last block height
 * this service has observed.
 */
export function up(pgm: MigrationBuilder): void {
  pgm.createTable('chain_tip', {
    // Boolean primary key defaulting to TRUE. Together with the CHECK constraint
    // added below, this guarantees the table can never contain more than one row.
    id: {
      type: 'bool',
      primaryKey: true,
      default: true,
    },
    // Last observed block height; starts at 0 before any block is seen.
    block_height: {
      type: 'int',
      notNull: true,
      default: 0,
    },
  });
  // Ensure only a single row can exist
  pgm.addConstraint('chain_tip', 'chain_tip_one_row', 'CHECK(id)');
  // Create the single row
  pgm.sql('INSERT INTO chain_tip VALUES(DEFAULT)');
}

/** Reverts the migration by dropping the `chain_tip` table (and its single row). */
export function down(pgm: MigrationBuilder): void {
  pgm.dropTable('chain_tip');
}
11 changes: 10 additions & 1 deletion src/env.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ interface Env {
* before declaring the failure as a non-retryable error.
*/
METADATA_MAX_IMMEDIATE_URI_RETRIES: number;
/** Timeout period for a token metadata URL fetch (miliseconds) */
/** Timeout period for a token metadata URL fetch (milliseconds) */
METADATA_FETCH_TIMEOUT_MS: number;
/**
* The maximum number of bytes of metadata to fetch. If the fetch encounters more bytes than this
Expand All @@ -57,6 +57,11 @@ interface Env {
* Example: ./config/image-cache.js
*/
METADATA_IMAGE_CACHE_PROCESSOR: string;
/**
 * How often token metadata that is marked `dynamic` will be refreshed (seconds). See SIP-019
* for more information. Defaults to 86400 seconds (24 hours).
*/
METADATA_DYNAMIC_TOKEN_REFRESH_INTERVAL: number;

/** Whether or not the `JobQueue` will continue to try retryable failed jobs indefinitely. */
JOB_QUEUE_STRICT_MODE: boolean;
Expand Down Expand Up @@ -207,6 +212,10 @@ export function getEnvVars(): Env {
METADATA_IMAGE_CACHE_PROCESSOR: {
type: 'string',
},
METADATA_DYNAMIC_TOKEN_REFRESH_INTERVAL: {
type: 'number',
default: 86_400, // 24 hours
},
JOB_QUEUE_CONCURRENCY_LIMIT: {
type: 'number',
default: 5,
Expand Down
14 changes: 10 additions & 4 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,17 @@ async function initApp() {
const db = await PgStore.connect({ skipMigrations: false });
const apiDb = await PgBlockchainApiStore.connect();

if (process.env['NODE_ENV'] === 'production') {
if (process.env.NODE_ENV === 'production') {
new TokenProcessorMetrics({ db });
}

const contractImporter = new BlockchainImporter({ db, apiDb });
const lastObservedBlockHeight = (await db.getChainTipBlockHeight()) ?? 1;
const contractImporter = new BlockchainImporter({
db,
apiDb,
// Start importing from the last block height seen by this service.
startingBlockHeight: lastObservedBlockHeight,
});
registerShutdownConfig({
name: 'Contract Importer',
forceKillable: false,
Expand Down Expand Up @@ -71,10 +77,10 @@ async function initApp() {
},
});

// Start services in order.
// Start services.
await contractMonitor.start();
await contractImporter.import();
jobQueue.start();
await contractMonitor.start();
await apiServer.listen({ host: ENV.API_HOST, port: ENV.API_PORT });
}

Expand Down
33 changes: 31 additions & 2 deletions src/pg/blockchain-api/pg-blockchain-api-store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,12 @@ export interface BlockchainDbContractLog {
value: string;
}

export interface BlockchainDbBlock {
block_height: number;
block_hash: string;
index_block_hash: string;
}

/**
* Connects and queries the Stacks Blockchain API postgres DB.
*/
Expand All @@ -39,7 +45,8 @@ export class PgBlockchainApiStore extends BasePgStore {
}

getSmartContractsCursor(args: {
afterBlockHeight: number;
fromBlockHeight: number;
toBlockHeight: number;
}): AsyncIterable<BlockchainDbSmartContract[]> {
return this.sql<BlockchainDbSmartContract[]>`
SELECT * FROM (
Expand All @@ -48,7 +55,8 @@ export class PgBlockchainApiStore extends BasePgStore {
WHERE
canonical = TRUE
AND microblock_canonical = TRUE
AND block_height >= ${args.afterBlockHeight}
AND block_height >= ${args.fromBlockHeight}
AND block_height <= ${args.toBlockHeight}
AND abi <> '"null"'
ORDER BY contract_id, block_height DESC, microblock_sequence DESC
) AS contract_list
Expand Down Expand Up @@ -90,6 +98,27 @@ export class PgBlockchainApiStore extends BasePgStore {
}
}

/**
 * Fetches a canonical block from the API database by its block hash.
 * @param args.blockHash - hash of the block to look up
 * @returns the block row, or `undefined` when no canonical block matches
 */
async getBlock(args: { blockHash: string }): Promise<BlockchainDbBlock | undefined> {
  const rows = await this.sql<BlockchainDbBlock[]>`
    SELECT block_height, block_hash, index_block_hash
    FROM blocks
    WHERE canonical = TRUE AND block_hash = ${args.blockHash}
    LIMIT 1
  `;
  return rows.count ? rows[0] : undefined;
}

/**
 * Reads the current chain tip block height from the API database's `chain_tip`
 * relation.
 * @returns the block height, or `undefined` when no row is present
 */
async getCurrentBlockHeight(): Promise<number | undefined> {
  const rows = await this.sql<{ block_height: number }[]>`
    SELECT block_height FROM chain_tip LIMIT 1
  `;
  return rows.count ? rows[0].block_height : undefined;
}

getSmartContractLogsByContractCursor(args: {
contractId: string;
}): AsyncIterable<BlockchainDbContractLog[]> {
Expand Down
42 changes: 27 additions & 15 deletions src/pg/pg-store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -84,21 +84,6 @@ export class PgStore extends BasePgStore {
return result[0];
}

/**
* Retrieves the latest block height of imported contracts. Useful for when we want to only import
* remaining contracts from the Stacks chain.
* @returns Max block height
*/
async getSmartContractsMaxBlockHeight(): Promise<number | undefined> {
const result = await this.sql<{ max: number }[]>`
SELECT MAX(block_height) FROM smart_contracts;
`;
if (result.count === 0) {
return undefined;
}
return result[0].max;
}

async updateSmartContractTokenCount(args: { id: number; count: bigint }): Promise<void> {
await this.sql`
UPDATE smart_contracts SET token_count = ${args.count.toString()} WHERE id = ${args.id}
Expand Down Expand Up @@ -323,6 +308,33 @@ export class PgStore extends BasePgStore {
});
}

/**
 * Persists the most recently observed block height.
 * @param args.blockHeight - new chain tip block height
 */
async updateChainTipBlockHeight(args: { blockHeight: number }): Promise<void> {
  // No WHERE clause needed: `chain_tip` is constrained to exactly one row by its migration.
  await this.sql`UPDATE chain_tip SET block_height = ${args.blockHeight}`;
}

/**
 * Returns the last block height observed by this service.
 * `chain_tip` is guaranteed to hold exactly one row (see the chain-tip migration's
 * CHECK constraint), so reading the first result directly is safe.
 */
async getChainTipBlockHeight(): Promise<number> {
  const [row] = await this.sql<{ block_height: number }[]>`SELECT block_height FROM chain_tip`;
  return row.block_height;
}

/**
 * Re-enqueues metadata jobs for every `dynamic` token whose metadata is stale
 * (SIP-019). A token is due for refresh when the time since its last update exceeds
 * its own `ttl` (seconds) or, when it declares no `ttl`, the service-wide
 * `METADATA_DYNAMIC_TOKEN_REFRESH_INTERVAL` default.
 */
async enqueueDynamicTokensDueForRefresh(): Promise<void> {
  // Default refresh interval in seconds, applied when a token has no ttl of its own.
  // Passed as a query parameter (the previous this.sql(...) call was postgres.js
  // *identifier* escaping, which produces invalid SQL inside an INTERVAL literal).
  const defaultIntervalSeconds = ENV.METADATA_DYNAMIC_TOKEN_REFRESH_INTERVAL;
  await this.sql`
    UPDATE jobs
    SET status = 'pending', updated_at = NOW()
    WHERE status IN ('done', 'failed') AND token_id IN (
      -- IN (not =): several tokens may be due at once; a scalar subquery returning
      -- multiple rows would raise a runtime error.
      SELECT id FROM tokens
      WHERE update_mode = 'dynamic'
        AND COALESCE(updated_at, created_at) <
          (NOW() - INTERVAL '1 seconds' * COALESCE(ttl, ${defaultIntervalSeconds}))
    )
  `;
}

/**
* Returns a token ETag based on its last updated date.
* @param contractPrincipal - smart contract principal
Expand Down
50 changes: 25 additions & 25 deletions src/token-processor/blockchain-api/blockchain-importer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,15 @@ export class SmartContractImportInterruptedError extends Error {
export class BlockchainImporter {
private readonly db: PgStore;
private readonly apiDb: PgBlockchainApiStore;
private readonly startingBlockHeight: number;
private importInterruptWaiter: Waiter<void>;
private importInterrupted = false;
private importFinished = false;

constructor(args: { db: PgStore; apiDb: PgBlockchainApiStore }) {
constructor(args: { db: PgStore; apiDb: PgBlockchainApiStore; startingBlockHeight: number }) {
  this.db = args.db;
  this.apiDb = args.apiDb;
  // Block height at which `import()` starts scanning for deployed contracts —
  // typically the last chain tip this service observed before shutting down.
  this.startingBlockHeight = args.startingBlockHeight;
  // Resolved when a shutdown interrupt is requested, so an in-flight import can stop.
  this.importInterruptWaiter = waiter();
}

Expand All @@ -46,8 +48,9 @@ export class BlockchainImporter {
async import() {
while (!this.importFinished) {
try {
const afterBlockHeight = (await this.db.getSmartContractsMaxBlockHeight()) ?? 1;
await this.importSmartContracts(afterBlockHeight);
const currentBlockHeight = (await this.apiDb.getCurrentBlockHeight()) ?? 1;
await this.importSmartContracts(this.startingBlockHeight, currentBlockHeight);
// TODO: Import SIP-019 notifications.
this.importFinished = true;
} catch (error) {
if (isPgConnectionError(error)) {
Expand All @@ -70,40 +73,37 @@ export class BlockchainImporter {
* Scans the `smart_contracts` table in the Stacks Blockchain API postgres DB for every smart
* contract that exists in the blockchain. It then takes all of them which declare tokens and
* enqueues them for processing.
* @param afterBlockHeight - Minimum block height
* @param fromBlockHeight - Minimum block height
* @param toBlockHeight - Maximum block height
*/
private async importSmartContracts(afterBlockHeight: number) {
private async importSmartContracts(fromBlockHeight: number, toBlockHeight: number) {
logger.info(
`BlockchainImporter smart contract import starting at block height ${afterBlockHeight}`
`BlockchainImporter smart contract import at block heights ${fromBlockHeight} to ${toBlockHeight}`
);
const cursor = this.apiDb.getSmartContractsCursor({ afterBlockHeight });
const cursor = this.apiDb.getSmartContractsCursor({ fromBlockHeight, toBlockHeight });
for await (const rows of cursor) {
for (const row of rows) {
if (this.importInterrupted) {
// We've received a SIGINT, so stop the import and throw an error so we don't proceed with
// booting the rest of the service.
throw new SmartContractImportInterruptedError();
}
await this.doImportSmartContract(row);
const sip = getSmartContractSip(row.abi as ClarityAbi);
if (!sip) {
continue; // Not a token contract.
}
await this.db.insertAndEnqueueSmartContract({
values: {
principal: row.contract_id,
sip: sip,
abi: JSON.stringify(row.abi),
tx_id: row.tx_id,
block_height: row.block_height,
},
});
logger.info(`BlockchainImporter detected token contract (${sip}): ${row.contract_id}`);
}
}
logger.info(`BlockchainImporter smart contract import finished`);
}

/**
 * Inspects a single smart contract row from the API database and, when its ABI
 * declares a supported token SIP, stores it and enqueues it for metadata processing.
 * @param contract - smart contract row from the Stacks Blockchain API DB
 */
protected async doImportSmartContract(contract: BlockchainDbSmartContract): Promise<void> {
  const sip = getSmartContractSip(contract.abi as ClarityAbi);
  if (!sip) {
    // ABI declares no token trait we support; nothing to index.
    return;
  }
  const values = {
    principal: contract.contract_id,
    sip: sip,
    abi: JSON.stringify(contract.abi),
    tx_id: contract.tx_id,
    block_height: contract.block_height,
  };
  await this.db.insertAndEnqueueSmartContract({ values });
  logger.info(`BlockchainImporter detected token contract (${sip}): ${contract.contract_id}`);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,18 @@ const PgSmartContractLogPayload = Type.Object({ txId: Type.String(), eventIndex:
const PgSmartContractPayloadLogCType = TypeCompiler.Compile(PgSmartContractLogPayload);
type PgSmartContractPayloadLogType = Static<typeof PgSmartContractLogPayload>;

const PgBlockPayload = Type.Object({ blockHash: Type.String() });
const PgBlockPayloadCType = TypeCompiler.Compile(PgBlockPayload);
type PgBlockPayloadType = Static<typeof PgBlockPayload>;

/**
* Listens for postgres notifications emitted from the API database when new contracts are deployed
* or contract logs are registered. It will analyze each of them to determine if:
* Listens for postgres notifications emitted from the API database when new contracts are deployed,
* contract logs are registered, or new blocks are produced. It will analyze each of them to
* determine if:
* - A new token contract needs indexing
* - A SIP-019 notifications calls for a token metadata refresh
* - A SIP-013 mint event declared a new SFT that needs metadata processing
* - `dynamic` token metadata needs to be refreshed.
*/
export class BlockchainSmartContractMonitor {
private readonly db: PgStore;
Expand Down Expand Up @@ -84,6 +90,15 @@ export class BlockchainSmartContractMonitor {
}
}
break;
case 'blockUpdate':
if (PgBlockPayloadCType.Check(messageJson.payload)) {
try {
await this.handleBlock(messageJson.payload);
} catch (error) {
logger.error(`BlockchainSmartContractMonitor error handling block`, error);
}
}
break;
default:
break;
}
Expand Down Expand Up @@ -145,4 +160,15 @@ export class BlockchainSmartContractMonitor {
}
}
}

/**
 * Handles a `blockUpdate` notification from the API database: records the newest
 * observed block height as this service's synchronization point, then re-enqueues
 * any `dynamic` tokens whose metadata is due for a refresh.
 */
private async handleBlock(payload: PgBlockPayloadType) {
  const block = await this.apiDb.getBlock({ blockHash: payload.blockHash });
  if (block) {
    await this.db.updateChainTipBlockHeight({ blockHeight: block.block_height });
    await this.db.enqueueDynamicTokensDueForRefresh();
  }
}
}
Loading

0 comments on commit e1c0882

Please sign in to comment.