Skip to content

Commit

Permalink
feat: auto switch indexer or common sync
Browse files Browse the repository at this point in the history
  • Loading branch information
classicalliu committed Aug 12, 2019
1 parent 16fa2a4 commit ece382f
Show file tree
Hide file tree
Showing 3 changed files with 129 additions and 61 deletions.
44 changes: 44 additions & 0 deletions packages/neuron-wallet/src/startup/sync-block-task/indexer.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
import { remote } from 'electron'
import AddressService from 'services/addresses'
import LockUtils from 'models/lock-utils'
import IndexerQueue from 'services/indexer/queue'

import { initDatabase } from './init-database'

const { addressDbChangedSubject } = remote.require('./startup/sync-block-task/params')

// maybe should call this every time when new address generated
// load all addresses and convert to lockHashes
export const loadAddressesAndConvert = async (): Promise<string[]> => {
const addresses: string[] = (await AddressService.allAddresses()).map(addr => addr.address)
const lockHashes: string[] = await LockUtils.addressesToAllLockHashes(addresses)
return lockHashes
}

// call this after network switched
let indexerQueue: IndexerQueue | undefined
export const switchNetwork = async () => {
// stop all blocks service
if (indexerQueue) {
await indexerQueue.stopAndWait()
}

// disconnect old connection and connect to new database
await initDatabase()
// load lockHashes
const lockHashes: string[] = await loadAddressesAndConvert()
// start sync blocks service
indexerQueue = new IndexerQueue(lockHashes)

addressDbChangedSubject.subscribe(async (event: string) => {
// ignore update and remove
if (event === 'AfterInsert') {
const hashes: string[] = await loadAddressesAndConvert()
if (indexerQueue) {
indexerQueue.setLockHashes(hashes)
}
}
})

indexerQueue.start()
}
65 changes: 65 additions & 0 deletions packages/neuron-wallet/src/startup/sync-block-task/sync.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
import { remote } from 'electron'
import AddressService from 'services/addresses'
import LockUtils from 'models/lock-utils'
import BlockListener from 'services/sync/block-listener'

import { initDatabase } from './init-database'

const { nodeService, addressDbChangedSubject, walletCreatedSubject } = remote.require(
'./startup/sync-block-task/params'
)

// pass to task a main process subject
// AddressesUsedSubject.setSubject(addressesUsedSubject)

// maybe should call this every time when new address generated
// load all addresses and convert to lockHashes
export const loadAddressesAndConvert = async (): Promise<string[]> => {
const addresses: string[] = (await AddressService.allAddresses()).map(addr => addr.address)
const lockHashes: string[] = await LockUtils.addressesToAllLockHashes(addresses)
return lockHashes
}

// call this after network switched
let blockListener: BlockListener | undefined
export const switchNetwork = async () => {
// stop all blocks service
if (blockListener) {
await blockListener.stopAndWait()
}

// disconnect old connection and connect to new database
await initDatabase()
// load lockHashes
const lockHashes: string[] = await loadAddressesAndConvert()
// start sync blocks service
blockListener = new BlockListener(lockHashes, nodeService.tipNumberSubject)

addressDbChangedSubject.subscribe(async (event: string) => {
// ignore update and remove
if (event === 'AfterInsert') {
const hashes: string[] = await loadAddressesAndConvert()
if (blockListener) {
blockListener.setLockHashes(hashes)
}
}
})

const regenerateListener = async () => {
if (blockListener) {
await blockListener.stopAndWait()
}
// wait former queue to be drained
const hashes: string[] = await loadAddressesAndConvert()
blockListener = new BlockListener(hashes, nodeService.tipNumberSubject)
await blockListener.start(true)
}

walletCreatedSubject.subscribe(async (type: string) => {
if (type === 'import') {
await regenerateListener()
}
})

blockListener.start()
}
81 changes: 20 additions & 61 deletions packages/neuron-wallet/src/startup/sync-block-task/task.ts
Original file line number Diff line number Diff line change
@@ -1,86 +1,45 @@
import { remote } from 'electron'
import { initConnection as initAddressConnection } from 'database/address/ormconfig'
import AddressService from 'services/addresses'
import LockUtils from 'models/lock-utils'
import AddressesUsedSubject from 'models/subjects/addresses-used-subject'
import BlockListener from 'services/sync/block-listener'
import { NetworkWithID } from 'services/networks'
import { register as registerTxStatusListener } from 'listeners/tx-status'
import { register as registerAddressListener } from 'listeners/address'
import IndexerRPC from 'services/indexer/indexer-rpc'
import Utils from 'services/sync/utils'

import { initDatabase } from './init-database'
import { switchNetwork as syncSwitchNetwork } from './sync'
import { switchNetwork as indexerSwitchNetwork } from './indexer'

// register to listen address updates
registerAddressListener()

const {
nodeService,
addressDbChangedSubject,
addressesUsedSubject,
databaseInitSubject,
walletCreatedSubject,
} = remote.require('./startup/sync-block-task/params')
const { addressesUsedSubject, databaseInitSubject } = remote.require('./startup/sync-block-task/params')

// pass to task a main process subject
AddressesUsedSubject.setSubject(addressesUsedSubject)

// maybe should call this every time when new address generated
// load all addresses and convert to lockHashes
export const loadAddressesAndConvert = async (): Promise<string[]> => {
const addresses: string[] = (await AddressService.allAddresses()).map(addr => addr.address)
const lockHashes: string[] = await LockUtils.addressesToAllLockHashes(addresses)
return lockHashes
}

// call this after network switched
let blockListener: BlockListener | undefined
export const switchNetwork = async () => {
// stop all blocks service
if (blockListener) {
await blockListener.stopAndWait()
export const testIndexer = async (): Promise<boolean> => {
const indexerRPC = new IndexerRPC()
try {
await Utils.retry(3, 100, () => {
return indexerRPC.getLockHashIndexStates()
})
return true
} catch {
return false
}

// disconnect old connection and connect to new database
await initDatabase()
// load lockHashes
const lockHashes: string[] = await loadAddressesAndConvert()
// start sync blocks service
blockListener = new BlockListener(lockHashes, nodeService.tipNumberSubject)

addressDbChangedSubject.subscribe(async (event: string) => {
// ignore update and remove
if (event === 'AfterInsert') {
const hashes: string[] = await loadAddressesAndConvert()
if (blockListener) {
blockListener.setLockHashes(hashes)
}
}
})

const regenerateListener = async () => {
if (blockListener) {
await blockListener.stopAndWait()
}
// wait former queue to be drained
const hashes: string[] = await loadAddressesAndConvert()
blockListener = new BlockListener(hashes, nodeService.tipNumberSubject)
await blockListener.start(true)
}

walletCreatedSubject.subscribe(async (type: string) => {
if (type === 'import') {
await regenerateListener()
}
})

blockListener.start()
}

export const run = async () => {
await initAddressConnection()
databaseInitSubject.subscribe(async (network: NetworkWithID | undefined) => {
if (network) {
await switchNetwork()
const indexerEnable = await testIndexer()
if (indexerEnable) {
await indexerSwitchNetwork()
} else {
await syncSwitchNetwork()
}
}
})
registerTxStatusListener()
Expand Down

0 comments on commit ece382f

Please sign in to comment.