Skip to content

Commit

Permalink
Merge pull request #790 from nervosnetwork/refactor-q
Browse files Browse the repository at this point in the history
refactor: clean and refactor queue
  • Loading branch information
classicalliu authored Jul 31, 2019
2 parents c185835 + 711bef5 commit 8ba07f4
Show file tree
Hide file tree
Showing 6 changed files with 54 additions and 88 deletions.
28 changes: 10 additions & 18 deletions packages/neuron-wallet/src/services/sync/block-listener.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,13 @@ import Utils from './utils'

export default class BlockListener {
private lockHashes: string[]
private tipBlockNumber: number = -1
private tipBlockNumber: bigint = BigInt(-1)
private queue: Queue | undefined | null = undefined
private rangeForCheck: RangeForCheck
private currentBlockNumber: BlockNumber
private interval: number = 5000
private tipNumberListener: Subscription
private stopped = false

constructor(
lockHashes: string[],
Expand All @@ -24,7 +25,7 @@ export default class BlockListener {

this.tipNumberListener = tipNumberSubject.subscribe(async num => {
if (num) {
this.tipBlockNumber = parseInt(num, 10)
this.tipBlockNumber = BigInt(num)
}
})
}
Expand All @@ -44,46 +45,37 @@ export default class BlockListener {
if (restart) {
await this.currentBlockNumber.updateCurrent(BigInt(0))
}
while (this.queue !== null) {
while (!this.stopped) {
await this.regenerate()
await Utils.sleep(this.interval)
}
}

public stop = async () => {
this.stopped = true
this.tipNumberListener.unsubscribe()
if (!this.queue) {
return
}
await this.queue.kill()
this.queue = null
}

public pause = () => {
if (!this.queue) {
return
}
this.queue.pause()
}

public resume = () => {
if (!this.queue) {
return
public drain = async () => {
if (this.queue) {
return this.queue.drain()
}

this.queue.resume()
return undefined
}

public regenerate = async (): Promise<void> => {
if (this.queue && this.queue.get().length() > 0) {
if (this.queue && this.queue.length() > 0) {
return
}

const current = await this.currentBlockNumber.getCurrent()
const startBlockNumber: string = (current + BigInt(1)).toString()
const endBlockNumber: string = this.tipBlockNumber.toString()

// TODO: check this queue stopped
this.generateQueue(startBlockNumber, endBlockNumber)
}

Expand Down
27 changes: 7 additions & 20 deletions packages/neuron-wallet/src/services/sync/queue-adapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import async from 'async'
export default class QueueAdapter {
private q: any

constructor(worker: async.AsyncWorker<any, Error>, concurrency?: number | undefined) {
constructor(worker: async.AsyncWorker<any, Error>, concurrency: number) {
this.q = async.queue(worker, concurrency)
}

Expand All @@ -16,33 +16,20 @@ export default class QueueAdapter {
}

public kill = () => {
this.push = () => {}
this.pause()
this.remove(() => true)
this.push = (value: any) => value
this.clear()
this.q.kill()
}

public remove = (callback: any) => {
this.q.remove(callback)
}

public removeAll = () => {
this.remove(() => true)
}

public pause = () => {
this.q.pause()
}

public resume = () => {
this.q.resume()
public clear = () => {
this.q.remove(() => true)
}

public length = (): number => {
return this.q.length()
}

public drain = (callback: any) => {
this.q.drain(callback)
public drain = async () => {
return this.q.drain()
}
}
65 changes: 23 additions & 42 deletions packages/neuron-wallet/src/services/sync/queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import QueueAdapter from './queue-adapter'
import { TransactionPersistor } from '../tx'

export default class Queue {
private q: any
private q: QueueAdapter
private concurrent: number = 1
private lockHashes: string[]
private getBlocksService: GetBlocks
Expand All @@ -26,7 +26,7 @@ export default class Queue {
currentBlockNumber: BlockNumber = new BlockNumber(),
rangeForCheck: RangeForCheck = new RangeForCheck()
) {
this.generateQueue()
this.q = new QueueAdapter(this.getWorker(), this.concurrent)
this.lockHashes = lockHashes
this.getBlocksService = new GetBlocks()
this.startBlockNumber = BigInt(startBlockNumber)
Expand All @@ -35,14 +35,6 @@ export default class Queue {
this.currentBlockNumber = currentBlockNumber
}

private generateQueue = () => {
this.q = new QueueAdapter(this.getWorker(), this.concurrent)
}

public cleanQueue = () => {
this.q.removeAll()
}

public setLockHashes = (lockHashes: string[]): void => {
this.lockHashes = lockHashes
}
Expand All @@ -53,30 +45,32 @@ export default class Queue {
await Utils.retry(this.retryTime, 0, async () => {
await this.pipeline(task.blockNumbers)
})
await callback()
} catch {
this.q.kill()
this.q.remove(() => true)
this.clear()
}
await callback()
}
return worker
}

public get = () => {
return this.q
public clear = () => {
this.q.clear()
}

public pause = () => {
this.q.pause()
public get = () => {
return this.q
}

public resume = () => {
this.q.resume()
public length = (): number => {
return this.q.length()
}

public kill = () => {
this.q.kill()
this.q.removeAll()
}

public drain = async () => {
return this.q.drain()
}

public pipeline = async (blockNumbers: string[]) => {
Expand Down Expand Up @@ -105,9 +99,9 @@ export default class Queue {
const range = await this.rangeForCheck.getRange()
const rangeFirstBlockHeader: BlockHeader = range[0]
await this.currentBlockNumber.updateCurrent(BigInt(rangeFirstBlockHeader.number))
await this.rangeForCheck.setRange([])
await this.rangeForCheck.clearRange()
await TransactionPersistor.deleteWhenFork(rangeFirstBlockHeader.number)
await this.cleanQueue()
await this.clear()
this.startBlockNumber = await this.currentBlockNumber.getCurrent()
this.batchPush()
} else if (checkResult.type === 'block-headers-not-match') {
Expand All @@ -117,17 +111,17 @@ export default class Queue {
}
}

public push = async (blockNumbers: string[]): Promise<void> => {
await this.q.push({ blockNumbers })
public push = (blockNumbers: string[]): void => {
this.q.push({ blockNumbers })
}

public batchPush = (): void => {
const rangeArr = this.range(this.startBlockNumber, this.endBlockNumber)
const rangeArr = Utils.rangeForBigInt(this.startBlockNumber, this.endBlockNumber)

const slice = this.eachSlice(rangeArr, this.fetchSize)
const slice = Utils.eachSlice(rangeArr, this.fetchSize)

slice.forEach(async arr => {
await this.push(arr)
slice.forEach(arr => {
this.push(arr)
})
}

Expand All @@ -139,7 +133,7 @@ export default class Queue {
return
}

this.q.removeAll()
this.clear()
this.batchPush()
}

Expand All @@ -149,17 +143,4 @@ export default class Queue {
}
return this.batchPush()
}

private eachSlice = (array: any[], size: number) => {
const arr = []
for (let i = 0, l = array.length; i < l; i += size) {
arr.push(array.slice(i, i + size))
}
return arr
}

private range = (startNumber: bigint, endNumber: bigint): bigint[] => {
const size = +(endNumber - startNumber + BigInt(1)).toString()
return [...Array(size).keys()].map(i => BigInt(i) + startNumber)
}
}
4 changes: 4 additions & 0 deletions packages/neuron-wallet/src/services/sync/range-for-check.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@ export default class RangeForCheck {
this.range = range
}

public clearRange = () => {
this.range = []
}

public pushRange = (range: BlockHeader[]) => {
if (range.length <= 0) {
return
Expand Down
11 changes: 7 additions & 4 deletions packages/neuron-wallet/src/services/sync/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,13 @@ export default class Utils {
}

public static range = (startNumber: string, endNumber: string): string[] => {
const startNumberInt = BigInt(startNumber)
const endNumberInt = BigInt(endNumber)
const size = +(endNumberInt - startNumberInt + BigInt(1)).toString()
return [...Array(size).keys()].map(i => (BigInt(i) + startNumber).toString())
const result = Utils.rangeForBigInt(BigInt(startNumber), BigInt(endNumber))
return result.map(num => num.toString())
}

public static rangeForBigInt = (startNumber: bigint, endNumber: bigint): bigint[] => {
const size = +(endNumber - startNumber + BigInt(1)).toString()
return [...Array(size).keys()].map(i => BigInt(i) + startNumber)
}

public static sleep = (ms: number) => {
Expand Down
7 changes: 3 additions & 4 deletions packages/neuron-wallet/src/startup/sync-block-task/task.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import BlockListener from '../../services/sync/block-listener'
import { NetworkWithID } from '../../services/networks'
import { initDatabase } from './init-database'
import { register as registerTxStatusListener } from '../../listeners/tx-status'
import Utils from '../../services/sync/utils'

import { register as registerAddressListener } from '../../listeners/address'

Expand Down Expand Up @@ -57,16 +56,16 @@ export const switchNetwork = async () => {
})

const regenerateListener = async () => {
await blockListener.stop()
// wait former queue to be drained
await blockListener.drain()
const hashes: string[] = await loadAddressesAndConvert()
blockListener = new BlockListener(hashes, nodeService.tipNumberSubject)
await blockListener.start(true)
}

walletCreatedSubject.subscribe(async (type: string) => {
if (type === 'import') {
await blockListener.stop()
// wait former queue to be drained
await Utils.sleep(3000)
await regenerateListener()
}
})
Expand Down

0 comments on commit 8ba07f4

Please sign in to comment.