Skip to content

Commit

Permalink
improve reliability and performance of wallet rpc notifications
Browse files Browse the repository at this point in the history
  • Loading branch information
woodser committed Aug 17, 2021
1 parent de7e28d commit 14b7e4b
Show file tree
Hide file tree
Showing 4 changed files with 120 additions and 82 deletions.
2 changes: 1 addition & 1 deletion external/monero-cpp
141 changes: 74 additions & 67 deletions src/main/java/monero/wallet/MoneroWalletRpc.java
Original file line number Diff line number Diff line change
Expand Up @@ -926,8 +926,10 @@ public MoneroSubaddress createSubaddress(int accountIdx, String label) {
@Override
public List<MoneroTxWallet> getTxs(MoneroTxQuery query, Collection<String> missingTxHashes) {

// copy query
// copy and normalize query
query = query == null ? new MoneroTxQuery() : query.copy();
if (query.getInputQuery() != null) query.getInputQuery().setTxQuery(query);
if (query.getOutputQuery() != null) query.getOutputQuery().setTxQuery(query);

// temporarily disable transfer and output queries in order to collect all tx information
MoneroTransferQuery transferQuery = query.getTransferQuery();
Expand Down Expand Up @@ -959,7 +961,10 @@ public List<MoneroTxWallet> getTxs(MoneroTxQuery query, Collection<String> missi

// fetch and merge outputs if queried
if (Boolean.TRUE.equals(query.getIncludeOutputs()) || outputQuery != null) {
List<MoneroOutputWallet> outputs = getOutputsAux(new MoneroOutputQuery().setTxQuery(decontextualize(query.copy())));

// fetch outputs
MoneroOutputQuery outputQueryAux = (outputQuery != null ? outputQuery.copy() : new MoneroOutputQuery()).setTxQuery(decontextualize(query.copy()));
List<MoneroOutputWallet> outputs = getOutputsAux(outputQueryAux);

// merge output txs one time while retaining order
Set<MoneroTxWallet> outputTxs = new HashSet<MoneroTxWallet>();
Expand Down Expand Up @@ -1197,7 +1202,7 @@ public List<MoneroTxWallet> createTxs(MoneroTxConfig config) {
}

// notify of changes
poll();
if (Boolean.TRUE.equals(config.getRelay())) poll();

// initialize tx set from rpc response with pre-initialized txs
if (config.getCanSplit()) return convertRpcSentTxsToTxSet(result, txs).getTxs();
Expand Down Expand Up @@ -1233,7 +1238,7 @@ public MoneroTxWallet sweepOutput(MoneroTxConfig config) {
Map<String, Object> result = (Map<String, Object>) resp.get("result");

// notify of changes
poll();
if (Boolean.TRUE.equals(config.getRelay())) poll();

// build and return tx
MoneroTxWallet tx = initSentTxWallet(config, null);
Expand Down Expand Up @@ -1304,7 +1309,7 @@ public List<MoneroTxWallet> sweepUnlocked(MoneroTxConfig config) {
}

// notify of changes
poll();
if (Boolean.TRUE.equals(config.getRelay())) poll();
return txs;
}

Expand All @@ -1314,8 +1319,8 @@ public List<MoneroTxWallet> sweepDust(boolean relay) {
Map<String, Object> params = new HashMap<String, Object>();
params.put("do_not_relay", !relay);
Map<String, Object> resp = rpc.sendJsonRequest("sweep_dust", params);
if (relay) poll();
Map<String, Object> result = (Map<String, Object>) resp.get("result");
poll();
MoneroTxSet txSet = convertRpcSentTxsToTxSet(result, null);
if (txSet.getTxs() == null) return new ArrayList<MoneroTxWallet>();
for (MoneroTxWallet tx : txSet.getTxs()) {
Expand Down Expand Up @@ -1991,7 +1996,7 @@ private List<MoneroTransfer> getTransfersAux(MoneroTransferQuery query) {
// build params for get_transfers rpc call
Map<String, Object> params = new HashMap<String, Object>();
boolean canBeConfirmed = !Boolean.FALSE.equals(txQuery.isConfirmed()) && !Boolean.TRUE.equals(txQuery.inTxPool()) && !Boolean.TRUE.equals(txQuery.isFailed()) && !Boolean.FALSE.equals(txQuery.isRelayed());
boolean canBeInTxPool = isConnected() && !Boolean.TRUE.equals(txQuery.isConfirmed()) && !Boolean.FALSE.equals(txQuery.inTxPool()) && !Boolean.TRUE.equals(txQuery.isFailed()) && !Boolean.FALSE.equals(txQuery.isRelayed()) && txQuery.getHeight() == null && txQuery.getMinHeight() == null && txQuery.getMaxHeight() == null && !Boolean.FALSE.equals(txQuery.isLocked());
boolean canBeInTxPool = isConnected() && !Boolean.TRUE.equals(txQuery.isConfirmed()) && !Boolean.FALSE.equals(txQuery.inTxPool()) && !Boolean.TRUE.equals(txQuery.isFailed()) && !Boolean.FALSE.equals(txQuery.isRelayed()) && txQuery.getHeight() == null && txQuery.getMaxHeight() == null && !Boolean.FALSE.equals(txQuery.isLocked());
boolean canBeIncoming = !Boolean.FALSE.equals(query.isIncoming()) && !Boolean.TRUE.equals(query.isOutgoing()) && !Boolean.TRUE.equals(query.hasDestinations());
boolean canBeOutgoing = !Boolean.FALSE.equals(query.isOutgoing()) && !Boolean.TRUE.equals(query.isIncoming());
params.put("in", canBeIncoming && canBeConfirmed);
Expand Down Expand Up @@ -2258,10 +2263,9 @@ private class WalletRpcPollListener {

private boolean isEnabled;
private boolean isPollLoopRunning; // poll loop runs until disabled
private boolean isPolling;
private int numPolling = 0;
private Long prevHeight;
private BigInteger prevBalance;
private BigInteger prevUnlockedBalance;
private BigInteger[] prevBalances;
private List<MoneroTxWallet> prevLockedTxs = new ArrayList<MoneroTxWallet>();
private Set<String> prevUnconfirmedNotifications = new HashSet<String>(); // tx hashes of previous notifications
private Set<String> prevConfirmedNotifications = new HashSet<String>(); // tx hashes of previously confirmed but not yet unlocked notifications
Expand Down Expand Up @@ -2310,59 +2314,64 @@ public void run() {

public void poll() {

// skip if already polling
if (isPolling) return;
isPolling = true;

// take initial snapshot
if (prevHeight == null) {
prevHeight = getHeight();
prevLockedTxs = getTxs(new MoneroTxQuery().setIsLocked(true));
isPolling = false;
return;
}

// announce height changes
long height = getHeight();
if (prevHeight != height) {
for (long i = prevHeight; i < height; i++) onNewBlock(i);
prevHeight = height;
}

// get locked txs for comparison to previous
List<MoneroTxWallet> lockedTxs = getTxs(new MoneroTxQuery().setIsLocked(true).setIncludeOutputs(true));
// skip if next poll is already queued
if (numPolling > 1) return;

// collect hashes of txs no longer locked
List<String> noLongerLockedHashes = new ArrayList<String>();
for (MoneroTxWallet prevLockedTx : prevLockedTxs) {
if (getTx(lockedTxs, prevLockedTx.getHash()) == null) {
noLongerLockedHashes.add(prevLockedTx.getHash());
// synchronize polls
synchronized(this) {
numPolling++;

// take initial snapshot
if (prevHeight == null) {
prevHeight = getHeight();
prevLockedTxs = getTxs(new MoneroTxQuery().setIsLocked(true));
prevBalances = getBalances(null, null);
numPolling--;
return;
}

// announce height changes
long height = getHeight();
if (prevHeight != height) {
for (long i = prevHeight; i < height; i++) onNewBlock(i);
prevHeight = height;
}

// get locked txs for comparison to previous
long minHeight = height - 70; // only monitor recent txs
List<MoneroTxWallet> lockedTxs = getTxs(new MoneroTxQuery().setIsLocked(true).setMinHeight(minHeight).setIncludeOutputs(true));

// collect hashes of txs no longer locked
List<String> noLongerLockedHashes = new ArrayList<String>();
for (MoneroTxWallet prevLockedTx : prevLockedTxs) {
if (getTx(lockedTxs, prevLockedTx.getHash()) == null) {
noLongerLockedHashes.add(prevLockedTx.getHash());
}
}

// save locked txs for next comparison
prevLockedTxs = lockedTxs;

// fetch txs which are no longer locked
List<MoneroTxWallet> unlockedTxs = noLongerLockedHashes.isEmpty() ? new ArrayList<MoneroTxWallet>() : getTxs(new MoneroTxQuery().setIsLocked(false).setMinHeight(minHeight).setHashes(noLongerLockedHashes).setIncludeOutputs(true), new ArrayList<String>()); // ignore missing tx hashes which could be removed due to re-org

// announce new unconfirmed and confirmed txs
for (MoneroTxWallet lockedTx : lockedTxs) {
boolean unannounced = lockedTx.isConfirmed() ? prevConfirmedNotifications.add(lockedTx.getHash()) : prevUnconfirmedNotifications.add(lockedTx.getHash());
if (unannounced) notifyOutputs(lockedTx);
}

// announce new unlocked outputs
for (MoneroTxWallet unlockedTx : unlockedTxs) {
prevUnconfirmedNotifications.remove(unlockedTx.getHash()); // stop tracking tx notifications
prevConfirmedNotifications.remove(unlockedTx.getHash());
notifyOutputs(unlockedTx);
}

// announce balance changes
checkForChangedBalances();
numPolling--;
}

// fetch txs that are no longer locked
List<MoneroTxWallet> unlockedTxs = getTxs(new MoneroTxQuery().setHashes(noLongerLockedHashes).setIncludeOutputs(true), new ArrayList<String>()); // ignore missing tx hashes which could be removed due to re-org

// save locked txs for next comparison
prevLockedTxs = lockedTxs;

// announce new unconfirmed and confirmed txs
for (MoneroTxWallet lockedTx : lockedTxs) {
boolean unannounced = lockedTx.isConfirmed() ? prevConfirmedNotifications.add(lockedTx.getHash()) : prevUnconfirmedNotifications.add(lockedTx.getHash());
if (unannounced) notifyOutputs(lockedTx);
}

// announce new unlocked outputs
for (MoneroTxWallet unlockedTx : unlockedTxs) {
prevUnconfirmedNotifications.remove(unlockedTx.getHash()); // stop tracking tx notifications
prevConfirmedNotifications.remove(unlockedTx.getHash());
notifyOutputs(unlockedTx);
}

// announce balance changes
checkForChangedBalances();

isPolling = false;
}

private void notifyOutputs(MoneroTxWallet tx) {
Expand Down Expand Up @@ -2413,12 +2422,10 @@ private MoneroTxWallet getTx(List<MoneroTxWallet> txs, String txHash) {

// TODO: factor to common wallet rpc listener
private boolean checkForChangedBalances() {
BigInteger balance = getBalance();
BigInteger unlockedBalance = getUnlockedBalance();
if (!balance.equals(prevBalance) || !unlockedBalance.equals(prevUnlockedBalance)) {
prevBalance = balance;
prevUnlockedBalance = unlockedBalance;
for (MoneroWalletListenerI listener : getListeners()) listener.onBalancesChanged(balance, unlockedBalance);
BigInteger[] balances = getBalances(null, null);
if (!balances[0].equals(prevBalances[0]) || !balances[1].equals(prevBalances[1])) {
prevBalances = balances;
for (MoneroWalletListenerI listener : getListeners()) listener.onBalancesChanged(balances[0], balances[1]);
return true;
}
return false;
Expand Down Expand Up @@ -2982,7 +2989,7 @@ else if (key.equals("spent_key_images")) {
GenUtils.assertTrue(tx.getInputs() == null);
tx.setInputs(new ArrayList<MoneroOutput>());
for (String inputKeyImage : inputKeyImages) {
tx.getInputs().add(new MoneroOutput().setKeyImage(new MoneroKeyImage().setHex(inputKeyImage)).setTx(tx));
tx.getInputs().add(new MoneroOutputWallet().setKeyImage(new MoneroKeyImage().setHex(inputKeyImage)).setTx(tx));
}
}
else LOGGER.warning("ignoring unexpected transaction field with transfer: " + key + ": " + val);
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/monero/wallet/model/MoneroTxQuery.java
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ protected boolean meetsCriteria(MoneroTxWallet tx, boolean queryChildren) {
if (this.getHashes() != null && !this.getHashes().contains(tx.getHash())) return false;
if (this.getPaymentIds() != null && !this.getPaymentIds().contains(tx.getPaymentId())) return false;
if (this.getHeight() != null && !this.getHeight().equals(txHeight)) return false;
if (this.getMinHeight() != null && (txHeight == null || txHeight < this.getMinHeight())) return false;
if (this.getMinHeight() != null && txHeight != null && txHeight < this.getMinHeight()) return false; // do not filter unconfirmed
if (this.getMaxHeight() != null && (txHeight == null || txHeight > this.getMaxHeight())) return false;

// done if not querying transfers or outputs
Expand Down
Loading

0 comments on commit 14b7e4b

Please sign in to comment.