Skip to content

Commit

Permalink
fix: reuse q in Queue
Browse files Browse the repository at this point in the history
  • Loading branch information
classicalliu committed Jul 29, 2019
1 parent 881b845 commit 87bbf73
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 30 deletions.
41 changes: 18 additions & 23 deletions packages/neuron-wallet/src/services/sync/block-listener.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { BehaviorSubject } from 'rxjs'
import { BehaviorSubject, Subscription } from 'rxjs'
import NodeService from '../node'
import Queue from './queue'
import RangeForCheck from './range-for-check'
Expand All @@ -12,6 +12,7 @@ export default class BlockListener {
private rangeForCheck: RangeForCheck
private currentBlockNumber: BlockNumber
private interval: number = 5000
private tipNumberListener: Subscription

constructor(
lockHashes: string[],
Expand All @@ -21,7 +22,7 @@ export default class BlockListener {
this.currentBlockNumber = new BlockNumber()
this.rangeForCheck = new RangeForCheck()

tipNumberSubject.subscribe(async num => {
this.tipNumberListener = tipNumberSubject.subscribe(async num => {
if (num) {
this.tipBlockNumber = parseInt(num, 10)
}
Expand Down Expand Up @@ -50,11 +51,11 @@ export default class BlockListener {
}

public stop = async () => {
this.tipNumberListener.unsubscribe()
if (!this.queue) {
return
}
await this.queue.kill()
await this.queue.get().remove(() => true)
this.queue = null
}

Expand All @@ -74,10 +75,7 @@ export default class BlockListener {
}

public regenerate = async (): Promise<void> => {
if (this.queue) {
if (this.queue.get().length() <= 0) {
this.queue = undefined
}
if (this.queue && this.queue.get().length() > 0) {
return
}

Expand All @@ -86,29 +84,26 @@ export default class BlockListener {
const endBlockNumber: string = this.tipBlockNumber.toString()

// TODO: check this queue stopped
const queue: Queue | undefined = this.generateQueue(startBlockNumber, endBlockNumber)

if (queue) {
queue.process()
}
this.generateQueue(startBlockNumber, endBlockNumber)
}

public generateQueue = (startBlockNumber: string, endBlockNumber: string): Queue | undefined => {
if (BigInt(startBlockNumber) > BigInt(endBlockNumber)) {
return undefined
}

this.queue = new Queue(
this.lockHashes,
startBlockNumber,
endBlockNumber,
this.currentBlockNumber,
this.rangeForCheck
)

this.queue.get().drain(() => {
this.queue = undefined
})
if (!this.queue) {
this.queue = new Queue(
this.lockHashes,
startBlockNumber,
endBlockNumber,
this.currentBlockNumber,
this.rangeForCheck
)
this.queue.process()
} else {
this.queue.reset(startBlockNumber, endBlockNumber)
}

return this.queue
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@ export default class QueueAdapter {
this.q.remove(callback)
}

public removeAll = () => {
this.remove(() => true)
}

public pause = () => {
this.q.pause()
}
Expand Down
24 changes: 17 additions & 7 deletions packages/neuron-wallet/src/services/sync/queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import RangeForCheck from './range-for-check'
import BlockNumber from './block-number'
import Utils from './utils'
import TransactionsService from '../transactions'
import QueueAdapter from '../queue-adapter'
import QueueAdapter from './queue-adapter'

export default class Queue {
private q: any
Expand Down Expand Up @@ -39,10 +39,8 @@ export default class Queue {
this.q = new QueueAdapter(this.getWorker(), this.concurrent)
}

private regenerateQueue = async () => {
this.kill()
await Utils.sleep(3000)
this.generateQueue()
public cleanQueue = () => {
this.q.removeAll()
}

public setLockHashes = (lockHashes: string[]): void => {
Expand Down Expand Up @@ -78,7 +76,7 @@ export default class Queue {

public kill = () => {
this.q.kill()
this.q.remove(() => true)
this.q.removeAll()
}

public pipeline = async (blockNumbers: string[]) => {
Expand Down Expand Up @@ -109,7 +107,7 @@ export default class Queue {
await this.currentBlockNumber.updateCurrent(BigInt(rangeFirstBlockHeader.number))
await this.rangeForCheck.setRange([])
await TransactionsService.deleteWhenFork(rangeFirstBlockHeader.number)
await this.regenerateQueue()
await this.cleanQueue()
this.startBlockNumber = await this.currentBlockNumber.getCurrent()
this.batchPush()
} else if (checkResult.type === 'block-headers-not-match') {
Expand All @@ -133,6 +131,18 @@ export default class Queue {
})
}

public reset = (startBlockNumber: string, endBlockNumber: string) => {
this.startBlockNumber = BigInt(startBlockNumber)
this.endBlockNumber = BigInt(endBlockNumber)

if (this.startBlockNumber > this.endBlockNumber) {
return
}

this.q.removeAll()
this.batchPush()
}

public process = () => {
if (this.startBlockNumber > this.endBlockNumber) {
return undefined
Expand Down

0 comments on commit 87bbf73

Please sign in to comment.