Skip to content

Commit

Permalink
feat: impl sync by indexer RPCs
Browse files Browse the repository at this point in the history
  • Loading branch information
classicalliu committed Aug 12, 2019
1 parent 71a4d61 commit 16fa2a4
Show file tree
Hide file tree
Showing 3 changed files with 199 additions and 1 deletion.
45 changes: 45 additions & 0 deletions packages/neuron-wallet/src/services/indexer/indexer-rpc.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
import Core from '@nervosnetwork/ckb-sdk-core'

import { NetworkWithID } from 'services/networks'
import { networkSwitchSubject } from 'services/sync/renderer-params'

let core: Core
networkSwitchSubject.subscribe((network: NetworkWithID | undefined) => {
if (network) {
core = new Core(network.remote)
}
})

export default class IndexerRPC {
public deindexLockHash = async (lockHash: string) => {
return core.rpc.deindexLockHash(lockHash)
}

public indexLockHash = async (lockHash: string, indexFrom = '0') => {
return core.rpc.indexLockHash(lockHash, indexFrom)
}

public getTransactionByLockHash = async (
lockHash: string,
page: string,
per: string,
reverseOrder: boolean = false
) => {
const result = await core.rpc.getTransactionsByLockHash(lockHash, page, per, reverseOrder)
return result
}

public getLockHashIndexStates = async () => {
return core.rpc.getLockHashIndexStates()
}

public getLiveCellsByLockHash = async (
lockHash: string,
page: string,
per: string,
reverseOrder: boolean = false
) => {
const result = await core.rpc.getLiveCellsByLockHash(lockHash, page, per, reverseOrder)
return result
}
}
143 changes: 143 additions & 0 deletions packages/neuron-wallet/src/services/indexer/queue.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
import Utils from 'services/sync/utils'
import logger from 'utils/logger'
import GetBlocks from 'services/sync/get-blocks'
import { Transaction } from 'types/cell-types'
import TypeConvert from 'types/type-convert'
import BlockNumber from 'services/sync/block-number'

import IndexerRPC from './indexer-rpc'
import TransactionPersistor from '../tx/transaction-persistor'

export default class Queue {
private lockHashes: string[]
private indexerRPC: IndexerRPC
private getBlocksService: GetBlocks
private per = 50
private interval = 5000
private blockNumberService: BlockNumber

private stopped = false
private indexed = false

private inProcess = false

constructor(lockHashes: string[]) {
this.lockHashes = lockHashes
this.indexerRPC = new IndexerRPC()
this.getBlocksService = new GetBlocks()
this.blockNumberService = new BlockNumber()
}

public setLockHashes = (lockHashes: string[]): void => {
this.lockHashes = lockHashes
this.indexed = false
}

/* eslint no-await-in-loop: "off" */
/* eslint no-restricted-syntax: "off" */
public start = async () => {
while (!this.stopped) {
try {
this.inProcess = true
const { lockHashes } = this
if (!this.indexed) {
await this.indexLockHashes(lockHashes)
this.indexed = true
}
const currentBlockNumber = await this.getCurrentBlockNumber(lockHashes)
for (const lockHash of lockHashes) {
await this.pipeline(lockHash, 'createdBy')
}
for (const lockHash of lockHashes) {
await this.pipeline(lockHash, 'consumedBy')
}
if (currentBlockNumber) {
await this.blockNumberService.updateCurrent(currentBlockNumber)
}
await this.yield(this.interval)
} catch (err) {
logger.error('sync indexer error:', err)
} finally {
await this.yield()
this.inProcess = false
}
}
}

public getCurrentBlockNumber = async (lockHashes: string[]) => {
// get lock hash indexer status
const lockHashIndexStates = await this.indexerRPC.getLockHashIndexStates()
const blockNumbers = lockHashIndexStates
.filter(state => lockHashes.includes(state.lockHash))
.map(state => state.blockNumber)
const uniqueBlockNumbers = [...new Set(blockNumbers)]
const blockNumbersBigInt = uniqueBlockNumbers.map(num => BigInt(num))
const minBlockNumber = blockNumbersBigInt.sort()[0]
return minBlockNumber
}

public indexLockHashes = async (lockHashes: string[]) => {
await Utils.mapSeries(lockHashes, async (lockHash: string) => {
await this.indexerRPC.indexLockHash(lockHash)
})
}

// type: 'createdBy' | 'consumedBy'
public pipeline = async (lockHash: string, type: string) => {
let page = 0
let stopped = false
while (!stopped) {
const txs = await this.indexerRPC.getTransactionByLockHash(lockHash, page.toString(), this.per.toString())
if (txs.length < this.per) {
stopped = true
}
for (const tx of txs) {
let txPoint: CKBComponents.TransactionPoint | null = null
if (type === 'createdBy') {
txPoint = tx.createdBy
} else if (type === 'consumedBy') {
txPoint = tx.consumedBy
}
if (txPoint) {
const transactionWithStatus = await this.getBlocksService.getTransaction(txPoint.txHash)
const ckbTransaction: CKBComponents.Transaction = transactionWithStatus.transaction
const transaction: Transaction = TypeConvert.toTransaction(ckbTransaction)
// tx timestamp / blockNumber / blockHash
const { blockHash } = transactionWithStatus.txStatus
if (blockHash) {
const blockHeader = await this.getBlocksService.getHeader(blockHash)
transaction.blockHash = blockHash
transaction.blockNumber = blockHeader.number
transaction.timestamp = blockHeader.timestamp
}
await TransactionPersistor.saveFetchTx(transaction)
}
}
page += 1
}
}

public stop = () => {
this.stopped = true
}

public waitForDrained = async (timeout: number = 5000) => {
const startAt: number = +new Date()
while (this.inProcess) {
const now: number = +new Date()
if (now - startAt > timeout) {
return
}
await this.yield(50)
}
}

public stopAndWait = async () => {
this.stop()
await this.waitForDrained()
}

private yield = async (millisecond: number = 1) => {
await Utils.sleep(millisecond)
}
}
12 changes: 11 additions & 1 deletion packages/neuron-wallet/src/services/sync/get-blocks.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import Core from '@nervosnetwork/ckb-sdk-core'

import { Block } from 'types/cell-types'
import { Block, BlockHeader } from 'types/cell-types'
import TypeConvert from 'types/type-convert'
import { NetworkWithID } from 'services/networks'
import CheckAndSave from './check-and-save'
Expand Down Expand Up @@ -55,6 +55,16 @@ export default class GetBlocks {
return block
}

public getTransaction = async (hash: string): Promise<CKBComponents.TransactionWithStatus> => {
const tx = await core.rpc.getTransaction(hash)
return tx
}

public getHeader = async (hash: string): Promise<BlockHeader> => {
const result = await core.rpc.getHeader(hash)
return TypeConvert.toBlockHeader(result)
}

public static getBlockByNumber = async (num: string): Promise<Block> => {
const block = await core.rpc.getBlockByNumber(num)
return TypeConvert.toBlock(block)
Expand Down

0 comments on commit 16fa2a4

Please sign in to comment.