From f259cc1ab6ed4e59a35b153a4225bc953050bf5b Mon Sep 17 00:00:00 2001 From: Mark Travis Date: Mon, 11 Sep 2023 15:48:32 -0700 Subject: [PATCH] Several changes to improve Consensus stability: (#4505) * Verify accepted ledger becomes validated, and retry with a new consensus transaction set if not. * Always store proposals. * Track proposals by ledger sequence. This helps slow peers catch up with the rest of the network. * Acquire transaction sets for proposals with future ledger sequences. This also helps slow peers catch up. * Optimize timer delay for establish phase to wait based on how long validators have been sending proposals. This also helps slow peers to catch up. * Fix impasse achieving close time consensus. * Don't wait between open and establish phases. --- Builds/levelization/results/ordering.txt | 3 + src/ripple/app/consensus/RCLConsensus.cpp | 118 +++-- src/ripple/app/consensus/RCLConsensus.h | 178 ++++++- src/ripple/app/consensus/RCLCxPeerPos.h | 3 +- src/ripple/app/ledger/LedgerMaster.h | 27 +- src/ripple/app/ledger/impl/LedgerMaster.cpp | 2 + src/ripple/app/misc/NetworkOPs.cpp | 17 +- src/ripple/app/misc/impl/ValidatorList.cpp | 5 +- .../detail/aged_unordered_container.h | 5 + src/ripple/consensus/Consensus.cpp | 16 +- src/ripple/consensus/Consensus.h | 478 +++++++++++++++--- src/ripple/consensus/ConsensusParms.h | 10 +- src/ripple/consensus/ConsensusProposal.h | 41 +- src/ripple/consensus/ConsensusTypes.h | 6 +- src/ripple/consensus/DisputedTx.h | 20 +- src/ripple/overlay/impl/PeerImp.cpp | 9 +- src/ripple/proto/ripple.proto | 2 + src/test/consensus/Consensus_test.cpp | 31 +- src/test/csf/Peer.h | 124 ++++- src/test/csf/Proposal.h | 2 +- 20 files changed, 910 insertions(+), 187 deletions(-) diff --git a/Builds/levelization/results/ordering.txt b/Builds/levelization/results/ordering.txt index 79dcdd3cc0f..7a4f4404321 100644 --- a/Builds/levelization/results/ordering.txt +++ b/Builds/levelization/results/ordering.txt @@ -14,6 +14,7 @@ ripple.consensus > ripple.basics ripple.consensus > ripple.beast ripple.consensus > ripple.json ripple.consensus > ripple.protocol +ripple.consensus > ripple.shamap ripple.core > ripple.beast ripple.core > ripple.json ripple.core > ripple.protocol @@ -125,11 +126,13 @@ test.core > ripple.server test.core > test.jtx test.core > test.toplevel test.core > test.unit_test +test.csf > ripple.app test.csf > ripple.basics test.csf > ripple.beast test.csf > ripple.consensus test.csf > ripple.json test.csf > ripple.protocol +test.csf > test.jtx test.json > ripple.beast test.json > ripple.json test.json > test.jtx diff --git a/src/ripple/app/consensus/RCLConsensus.cpp b/src/ripple/app/consensus/RCLConsensus.cpp index e60c8cf37d3..4e973ef2bdf 100644 --- a/src/ripple/app/consensus/RCLConsensus.cpp +++ b/src/ripple/app/consensus/RCLConsensus.cpp @@ -55,7 +55,7 @@ RCLConsensus::RCLConsensus( LedgerMaster& ledgerMaster, LocalTxs& localTxs, InboundTransactions& inboundTransactions, - Consensus::clock_type const& clock, + Consensus::clock_type& clock, ValidatorKeys const& validatorKeys, beast::Journal journal) : adaptor_( @@ -171,6 +171,9 @@ RCLConsensus::Adaptor::share(RCLCxPeerPos const& peerPos) auto const sig = peerPos.signature(); prop.set_signature(sig.data(), sig.size()); + if (proposal.ledgerSeq().has_value()) + prop.set_ledgerseq(*proposal.ledgerSeq()); + app_.overlay().relay(prop, peerPos.suppressionID(), peerPos.publicKey()); } @@ -180,7 +183,7 @@ RCLConsensus::Adaptor::share(RCLCxTx const& tx) // If we didn't relay this transaction recently, relay it to all peers if (app_.getHashRouter().shouldRelay(tx.id())) { - JLOG(j_.debug()) << "Relaying disputed tx " << tx.id(); + JLOG(j_.trace()) << "Relaying disputed tx " << tx.id(); auto const slice = tx.tx_->slice(); protocol::TMTransaction msg; msg.set_rawtransaction(slice.data(), slice.size()); @@ -192,13 +195,13 @@ RCLConsensus::Adaptor::share(RCLCxTx const& tx) } else { - JLOG(j_.debug()) << "Not relaying disputed tx " << tx.id(); + JLOG(j_.trace()) << "Not relaying disputed tx " << tx.id(); } } void RCLConsensus::Adaptor::propose(RCLCxPeerPos::Proposal const& proposal) { - JLOG(j_.trace()) << (proposal.isBowOut() ? "We bow out: " : "We propose: ") + JLOG(j_.debug()) << (proposal.isBowOut() ? "We bow out: " : "We propose: ") << ripple::to_string(proposal.prevLedger()) << " -> " << ripple::to_string(proposal.position()); @@ -212,6 +215,7 @@ RCLConsensus::Adaptor::propose(RCLCxPeerPos::Proposal const& proposal) prop.set_closetime(proposal.closeTime().time_since_epoch().count()); prop.set_nodepubkey( validatorKeys_.publicKey.data(), validatorKeys_.publicKey.size()); + prop.set_ledgerseq(*proposal.ledgerSeq()); auto sig = signDigest( validatorKeys_.publicKey, @@ -297,7 +301,8 @@ auto RCLConsensus::Adaptor::onClose( RCLCxLedger const& ledger, NetClock::time_point const& closeTime, - ConsensusMode mode) -> Result + ConsensusMode mode, + clock_type& clock) -> Result { const bool wrongLCL = mode == ConsensusMode::wrongLedger; const bool proposing = mode == ConsensusMode::proposing; @@ -379,7 +384,6 @@ RCLConsensus::Adaptor::onClose( // Needed because of the move below. auto const setHash = initialSet->getHash().as_uint256(); - return Result{ std::move(initialSet), RCLCxPeerPos::Proposal{ @@ -388,7 +392,9 @@ RCLConsensus::Adaptor::onClose( setHash, closeTime, app_.timeKeeper().closeTime(), - validatorKeys_.nodeID}}; + validatorKeys_.nodeID, + initialLedger->info().seq, + clock}}; } void @@ -400,50 +406,43 @@ RCLConsensus::Adaptor::onForceAccept( ConsensusMode const& mode, Json::Value&& consensusJson) { - doAccept( - result, - prevLedger, - closeResolution, - rawCloseTimes, - mode, - std::move(consensusJson)); + auto txsBuilt = buildAndValidate( + result, prevLedger, closeResolution, mode, std::move(consensusJson)); + prepareOpenLedger(std::move(txsBuilt), result, rawCloseTimes, mode); } void RCLConsensus::Adaptor::onAccept( Result const& result, - RCLCxLedger const& prevLedger, - NetClock::duration const& closeResolution, ConsensusCloseTimes const& rawCloseTimes, ConsensusMode const& mode, - Json::Value&& consensusJson) + Json::Value&& consensusJson, + std::pair&& tb) { app_.getJobQueue().addJob( jtACCEPT, "acceptLedger", - [=, this, cj = std::move(consensusJson)]() mutable { + [=, + this, + cj = std::move(consensusJson), + txsBuilt = std::move(tb)]() mutable { // Note that no lock is held or acquired during this job. // This is because generic Consensus guarantees that once a ledger // is accepted, the consensus results and capture by reference state // will not change until startRound is called (which happens via // endConsensus). - this->doAccept( - result, - prevLedger, - closeResolution, - rawCloseTimes, - mode, - std::move(cj)); + prepareOpenLedger(std::move(txsBuilt), result, rawCloseTimes, mode); this->app_.getOPs().endConsensus(); }); } -void -RCLConsensus::Adaptor::doAccept( +std::pair< + RCLConsensus::Adaptor::CanonicalTxSet_t, + RCLConsensus::Adaptor::Ledger_t> +RCLConsensus::Adaptor::buildAndValidate( Result const& result, - RCLCxLedger const& prevLedger, - NetClock::duration closeResolution, - ConsensusCloseTimes const& rawCloseTimes, + Ledger_t const& prevLedger, + NetClock::duration const& closeResolution, ConsensusMode const& mode, Json::Value&& consensusJson) { @@ -497,12 +496,12 @@ RCLConsensus::Adaptor::doAccept( { retriableTxs.insert( std::make_shared(SerialIter{item.slice()})); - JLOG(j_.debug()) << " Tx: " << item.key(); + JLOG(j_.trace()) << " Tx: " << item.key(); } catch (std::exception const& ex) { failed.insert(item.key()); - JLOG(j_.warn()) + JLOG(j_.trace()) << " Tx: " << item.key() << " throws: " << ex.what(); } } @@ -579,6 +578,19 @@ RCLConsensus::Adaptor::doAccept( ledgerMaster_.consensusBuilt( built.ledger_, result.txns.id(), std::move(consensusJson)); + return {retriableTxs, built}; +} + +void +RCLConsensus::Adaptor::prepareOpenLedger( + std::pair&& txsBuilt, + Result const& result, + ConsensusCloseTimes const& rawCloseTimes, + ConsensusMode const& mode) +{ + auto& retriableTxs = txsBuilt.first; + auto const& built = txsBuilt.second; + //------------------------------------------------------------------------- { // Apply disputed transactions that didn't get in @@ -601,7 +613,7 @@ RCLConsensus::Adaptor::doAccept( // we voted NO try { - JLOG(j_.debug()) + JLOG(j_.trace()) << "Test applying disputed transaction that did" << " not get in " << dispute.tx().id(); @@ -619,7 +631,7 @@ RCLConsensus::Adaptor::doAccept( } catch (std::exception const& ex) { - JLOG(j_.debug()) << "Failed to apply transaction we voted " + JLOG(j_.trace()) << "Failed to apply transaction we voted " "NO on. Exception: " << ex.what(); } @@ -669,6 +681,7 @@ RCLConsensus::Adaptor::doAccept( // we entered the round with the network, // see how close our close time is to other node's // close time reports, and update our clock. + bool const consensusFail = result.state == ConsensusState::MovedOn; if ((mode == ConsensusMode::proposing || mode == ConsensusMode::observing) && !consensusFail) @@ -889,12 +902,30 @@ RCLConsensus::Adaptor::onModeChange(ConsensusMode before, ConsensusMode after) mode_ = after; } +bool +RCLConsensus::Adaptor::retryAccept( + Ledger_t const& newLedger, + std::optional>& start) + const +{ + static bool const standalone = ledgerMaster_.standalone(); + auto const& validLedger = ledgerMaster_.getValidatedLedger(); + + return (app_.getOPs().isFull() && !standalone && + (validLedger && (newLedger.id() != validLedger->info().hash) && + (newLedger.seq() >= validLedger->info().seq))) && + (!start || + std::chrono::steady_clock::now() - *start < std::chrono::seconds{5}); +} + +//----------------------------------------------------------------------------- + Json::Value RCLConsensus::getJson(bool full) const { Json::Value ret; { - std::lock_guard _{mutex_}; + std::lock_guard _{adaptor_.peekMutex()}; ret = consensus_.getJson(full); } ret["validating"] = adaptor_.validating(); @@ -906,7 +937,7 @@ RCLConsensus::timerEntry(NetClock::time_point const& now) { try { - std::lock_guard _{mutex_}; + std::lock_guard _{adaptor_.peekMutex()}; consensus_.timerEntry(now); } catch (SHAMapMissingNode const& mn) @@ -922,7 +953,7 @@ RCLConsensus::gotTxSet(NetClock::time_point const& now, RCLTxSet const& txSet) { try { - std::lock_guard _{mutex_}; + std::lock_guard _{adaptor_.peekMutex()}; consensus_.gotTxSet(now, txSet); } catch (SHAMapMissingNode const& mn) @@ -940,7 +971,7 @@ RCLConsensus::simulate( NetClock::time_point const& now, std::optional consensusDelay) { - std::lock_guard _{mutex_}; + std::lock_guard _{adaptor_.peekMutex()}; consensus_.simulate(now, consensusDelay); } @@ -949,7 +980,7 @@ RCLConsensus::peerProposal( NetClock::time_point const& now, RCLCxPeerPos const& newProposal) { - std::lock_guard _{mutex_}; + std::lock_guard _{adaptor_.peekMutex()}; return consensus_.peerProposal(now, newProposal); } @@ -1022,6 +1053,12 @@ RCLConsensus::Adaptor::getQuorumKeys() const return app_.validators().getQuorumKeys(); } +std::size_t +RCLConsensus::Adaptor::quorum() const +{ + return app_.validators().quorum(); +} + std::size_t RCLConsensus::Adaptor::laggards( Ledger_t::Seq const seq, @@ -1051,7 +1088,7 @@ RCLConsensus::startRound( hash_set const& nowUntrusted, hash_set const& nowTrusted) { - std::lock_guard _{mutex_}; + std::lock_guard _{adaptor_.peekMutex()}; consensus_.startRound( now, prevLgrId, @@ -1059,4 +1096,5 @@ RCLConsensus::startRound( nowUntrusted, adaptor_.preStartRound(prevLgr, nowTrusted)); } + } // namespace ripple diff --git a/src/ripple/app/consensus/RCLConsensus.h b/src/ripple/app/consensus/RCLConsensus.h index f8c01e93caa..4e6dce1efd3 100644 --- a/src/ripple/app/consensus/RCLConsensus.h +++ b/src/ripple/app/consensus/RCLConsensus.h @@ -28,6 +28,7 @@ #include #include #include +#include #include #include #include @@ -36,8 +37,11 @@ #include #include #include +#include #include +#include #include + namespace ripple { class InboundTransactions; @@ -59,6 +63,7 @@ class RCLConsensus Application& app_; std::unique_ptr feeVote_; LedgerMaster& ledgerMaster_; + LocalTxs& localTxs_; InboundTransactions& inboundTransactions_; beast::Journal const j_; @@ -78,7 +83,6 @@ class RCLConsensus // These members are queried via public accesors and are atomic for // thread safety. - std::atomic validating_{false}; std::atomic prevProposers_{0}; std::atomic prevRoundTime_{ std::chrono::milliseconds{0}}; @@ -87,14 +91,25 @@ class RCLConsensus RCLCensorshipDetector censorshipDetector_; NegativeUNLVote nUnlVote_; + // Since Consensus does not provide intrinsic thread-safety, this mutex + // needs to guard all calls to consensus_. One reason it is recursive + // is because logic in phaseEstablish() around buildAndValidate() + // needs to lock and unlock to protect Consensus data members. + mutable std::recursive_mutex mutex_; + std::optional validationDelay_; + std::optional timerDelay_; + std::atomic validating_{false}; + public: using Ledger_t = RCLCxLedger; using NodeID_t = NodeID; using NodeKey_t = PublicKey; using TxSet_t = RCLTxSet; + using CanonicalTxSet_t = CanonicalTXSet; using PeerPosition_t = RCLCxPeerPos; using Result = ConsensusResult; + using clock_type = Stopwatch; Adaptor( Application& app, @@ -149,6 +164,9 @@ class RCLConsensus std::pair> getQuorumKeys() const; + std::size_t + quorum() const; + std::size_t laggards(Ledger_t::Seq const seq, hash_set& trustedKeys) const; @@ -178,6 +196,93 @@ class RCLConsensus return parms_; } + std::recursive_mutex& + peekMutex() const + { + return mutex_; + } + + LedgerMaster& + getLedgerMaster() const + { + return ledgerMaster_; + } + + void + clearValidating() + { + validating_ = false; + } + + /** Whether to try building another ledger to validate. + * + * This should be called when a newly-created ledger hasn't been + * validated to avoid us forking to an invalid ledger. + * + * Retry only if all of the below are true: + * * We are synced to the network. + * * Not in standalone mode. + * * We have validated a ledger. + * * The latest validated ledger and the new ledger are different. + * * The new ledger sequence is >= the validated ledger. + * * Less than 5 seconds have elapsed retrying. + * + * @param newLedger The new ledger which we have created. + * @param start When we started possibly retrying ledgers. + * @return Whether to retry. + */ + bool + retryAccept( + Ledger_t const& newLedger, + std::optional>& + start) const; + + /** Amount of time delayed waiting to confirm validation. + * + * @return Time in milliseconds. + */ + std::optional + getValidationDelay() const + { + return validationDelay_; + } + + /** Set amount of time that has been delayed waiting for validation. + * + * Clear if nothing passed. + * + * @param vd Amount of time in milliseconds. + */ + void + setValidationDelay( + std::optional vd = std::nullopt) + { + validationDelay_ = vd; + } + + /** Amount of time to wait for heartbeat. + * + * @return Time in milliseconds. + */ + std::optional + getTimerDelay() const + { + return timerDelay_; + } + + /** Set amount of time to wait for next heartbeat. + * + * Clear if nothing passed. + * + * @param td Amount of time in milliseconds. + */ + void + setTimerDelay( + std::optional td = std::nullopt) + { + timerDelay_ = td; + } + private: //--------------------------------------------------------------------- // The following members implement the generic Consensus requirements @@ -297,34 +402,34 @@ class RCLConsensus @param ledger the ledger we are changing to @param closeTime When consensus closed the ledger @param mode Current consensus mode + @param clock Clock used for Consensus and testing. @return Tentative consensus result */ Result onClose( RCLCxLedger const& ledger, NetClock::time_point const& closeTime, - ConsensusMode mode); + ConsensusMode mode, + clock_type& clock); /** Process the accepted ledger. @param result The result of consensus - @param prevLedger The closed ledger consensus worked from - @param closeResolution The resolution used in agreeing on an - effective closeTime @param rawCloseTimes The unrounded closetimes of ourself and our peers @param mode Our participating mode at the time consensus was declared @param consensusJson Json representation of consensus state + @param txsBuilt The consensus transaction set and new ledger built + around it */ void onAccept( Result const& result, - RCLCxLedger const& prevLedger, - NetClock::duration const& closeResolution, ConsensusCloseTimes const& rawCloseTimes, ConsensusMode const& mode, - Json::Value&& consensusJson); + Json::Value&& consensusJson, + std::pair&& txsBuilt); /** Process the accepted ledger that was a result of simulation/force accept. @@ -352,18 +457,40 @@ class RCLConsensus RCLCxLedger const& ledger, bool haveCorrectLCL); - /** Accept a new ledger based on the given transactions. + /** Build and attempt to validate a new ledger. + * + * @param result The result of consensus. + * @param prevLedger The closed ledger from which this is to be based. + * @param closeResolution The resolution used in agreeing on an + * effective closeTime. + * @param mode Our participating mode at the time consensus was + * declared. + * @param consensusJson Json representation of consensus state. + * @return The consensus transaction set and resulting ledger. + */ + std::pair + buildAndValidate( + Result const& result, + Ledger_t const& prevLedger, + NetClock::duration const& closeResolution, + ConsensusMode const& mode, + Json::Value&& consensusJson); - @ref onAccept + /** Prepare the next open ledger. + * + * @param txsBuilt The consensus transaction set and resulting ledger. + * @param result The result of consensus. + * @param rawCloseTimes The unrounded closetimes of our peers and + * ourself. + * @param mode Our participating mode at the time consensus was + declared. */ void - doAccept( + prepareOpenLedger( + std::pair&& txsBuilt, Result const& result, - RCLCxLedger const& prevLedger, - NetClock::duration closeResolution, ConsensusCloseTimes const& rawCloseTimes, - ConsensusMode const& mode, - Json::Value&& consensusJson); + ConsensusMode const& mode); /** Build the new last closed ledger. @@ -421,7 +548,7 @@ class RCLConsensus LedgerMaster& ledgerMaster, LocalTxs& localTxs, InboundTransactions& inboundTransactions, - Consensus::clock_type const& clock, + Consensus::clock_type& clock, ValidatorKeys const& validatorKeys, beast::Journal journal); @@ -498,7 +625,7 @@ class RCLConsensus RCLCxLedger::ID prevLedgerID() const { - std::lock_guard _{mutex_}; + std::lock_guard _{adaptor_.peekMutex()}; return consensus_.prevLedgerID(); } @@ -520,12 +647,19 @@ class RCLConsensus return adaptor_.parms(); } -private: - // Since Consensus does not provide intrinsic thread-safety, this mutex - // guards all calls to consensus_. adaptor_ uses atomics internally - // to allow concurrent access of its data members that have getters. - mutable std::recursive_mutex mutex_; + std::optional + getTimerDelay() const + { + return adaptor_.getTimerDelay(); + } + + void + setTimerDelay(std::optional td = std::nullopt) + { + adaptor_.setTimerDelay(td); + } +private: Adaptor adaptor_; Consensus consensus_; beast::Journal const j_; diff --git a/src/ripple/app/consensus/RCLCxPeerPos.h b/src/ripple/app/consensus/RCLCxPeerPos.h index e82a85d422b..f104299b770 100644 --- a/src/ripple/app/consensus/RCLCxPeerPos.h +++ b/src/ripple/app/consensus/RCLCxPeerPos.h @@ -26,6 +26,7 @@ #include #include #include +#include #include #include #include @@ -44,7 +45,7 @@ class RCLCxPeerPos { public: //< The type of the proposed position - using Proposal = ConsensusProposal; + using Proposal = ConsensusProposal; /** Constructor diff --git a/src/ripple/app/ledger/LedgerMaster.h b/src/ripple/app/ledger/LedgerMaster.h index 3d7adc86223..26738844536 100644 --- a/src/ripple/app/ledger/LedgerMaster.h +++ b/src/ripple/app/ledger/LedgerMaster.h @@ -292,6 +292,27 @@ class LedgerMaster : public AbstractFetchPackContainer std::optional minSqlSeq(); + //! Whether we are in standalone mode. + bool + standalone() const + { + return standalone_; + } + + /** Wait up to a specified duration for the next validated ledger. + * + * @tparam Rep std::chrono duration Rep. + * @tparam Period std::chrono duration Period. + * @param dur Duration to wait. + */ + template + void + waitForValidated(std::chrono::duration const& dur) + { + std::unique_lock lock(validMutex_); + validCond_.wait_for(lock, dur); + } + // Iff a txn exists at the specified ledger and offset then return its txnid std::optional txnIdFromIndex(uint32_t ledgerSeq, uint32_t txnIndex); @@ -412,7 +433,10 @@ class LedgerMaster : public AbstractFetchPackContainer // Time that the previous upgrade warning was issued. TimeKeeper::time_point upgradeWarningPrevTime_{}; -private: + // mutex and condition variable for waiting for next validated ledger + std::mutex validMutex_; + std::condition_variable validCond_; + struct Stats { template @@ -434,7 +458,6 @@ class LedgerMaster : public AbstractFetchPackContainer Stats m_stats; -private: void collect_metrics() { diff --git a/src/ripple/app/ledger/impl/LedgerMaster.cpp b/src/ripple/app/ledger/impl/LedgerMaster.cpp index 5c084e25874..050e2f3ef3d 100644 --- a/src/ripple/app/ledger/impl/LedgerMaster.cpp +++ b/src/ripple/app/ledger/impl/LedgerMaster.cpp @@ -367,6 +367,8 @@ LedgerMaster::setValidLedger(std::shared_ptr const& l) } mValidLedger.set(l); + // In case we're waiting for a valid before proceeding with Consensus. + validCond_.notify_one(); mValidLedgerSign = signTime.time_since_epoch().count(); assert( mValidLedgerSeq || !app_.getMaxDisallowedLedger() || diff --git a/src/ripple/app/misc/NetworkOPs.cpp b/src/ripple/app/misc/NetworkOPs.cpp index e59dd1128ff..95ca09e0cbd 100644 --- a/src/ripple/app/misc/NetworkOPs.cpp +++ b/src/ripple/app/misc/NetworkOPs.cpp @@ -947,9 +947,24 @@ NetworkOPsImp::setTimer( void NetworkOPsImp::setHeartbeatTimer() { + // timerDelay is to optimize the timer interval such as for phase establish. + // Setting a max of ledgerGRANULARITY allows currently in-flight proposals + // to be accounted for at the very beginning of the phase. + std::chrono::milliseconds timerDelay; + auto td = mConsensus.getTimerDelay(); + if (td) + { + timerDelay = std::min(*td, mConsensus.parms().ledgerGRANULARITY); + mConsensus.setTimerDelay(); + } + else + { + timerDelay = mConsensus.parms().ledgerGRANULARITY; + } + setTimer( heartbeatTimer_, - mConsensus.parms().ledgerGRANULARITY, + timerDelay, [this]() { m_job_queue.addJob(jtNETOP_TIMER, "NetOPs.heartbeat", [this]() { processHeartbeatTimer(); diff --git a/src/ripple/app/misc/impl/ValidatorList.cpp b/src/ripple/app/misc/impl/ValidatorList.cpp index d17b85c4840..832628dce3e 100644 --- a/src/ripple/app/misc/impl/ValidatorList.cpp +++ b/src/ripple/app/misc/impl/ValidatorList.cpp @@ -24,6 +24,7 @@ #include #include #include +#include #include #include #include @@ -1761,8 +1762,10 @@ ValidatorList::calculateQuorum( // Note that the negative UNL protocol introduced the // AbsoluteMinimumQuorum which is 60% of the original UNL size. The // effective quorum should not be lower than it. + static ConsensusParms const parms; return static_cast(std::max( - std::ceil(effectiveUnlSize * 0.8f), std::ceil(unlSize * 0.6f))); + std::ceil(effectiveUnlSize * parms.minCONSENSUS_FACTOR), + std::ceil(unlSize * parms.negUNL_MIN_CONSENSUS_FACTOR))); } TrustChanges diff --git a/src/ripple/beast/container/detail/aged_unordered_container.h b/src/ripple/beast/container/detail/aged_unordered_container.h index fcdccd2a637..fbd2315794b 100644 --- a/src/ripple/beast/container/detail/aged_unordered_container.h +++ b/src/ripple/beast/container/detail/aged_unordered_container.h @@ -1184,9 +1184,12 @@ class aged_unordered_container beast::detail::aged_container_iterator first, beast::detail::aged_container_iterator last); + /* + * This is broken as of at least gcc 11.3.0 template auto erase(K const& k) -> size_type; + */ void swap(aged_unordered_container& other) noexcept; @@ -3062,6 +3065,7 @@ aged_unordered_container< first.iterator()); } +/* template < bool IsMulti, bool IsMap, @@ -3101,6 +3105,7 @@ aged_unordered_container< } return n; } +*/ template < bool IsMulti, diff --git a/src/ripple/consensus/Consensus.cpp b/src/ripple/consensus/Consensus.cpp index 1b08859c889..c40bd1294e7 100644 --- a/src/ripple/consensus/Consensus.cpp +++ b/src/ripple/consensus/Consensus.cpp @@ -32,17 +32,18 @@ shouldCloseLedger( std::chrono::milliseconds timeSincePrevClose, // Time since last ledger's close time std::chrono::milliseconds openTime, // Time waiting to close this ledger + std::optional validationDelay, std::chrono::milliseconds idleInterval, ConsensusParms const& parms, beast::Journal j) { using namespace std::chrono_literals; + if ((prevRoundTime < -1s) || (prevRoundTime > 10min) || (timeSincePrevClose > 10min)) { // These are unexpected cases, we just close the ledger - JLOG(j.warn()) << "shouldCloseLedger Trans=" - << (anyTransactions ? "yes" : "no") + JLOG(j.warn()) << "Trans=" << (anyTransactions ? "yes" : "no") << " Prop: " << prevProposers << "/" << proposersClosed << " Secs: " << timeSincePrevClose.count() << " (last: " << prevRoundTime.count() << ")"; @@ -56,6 +57,12 @@ shouldCloseLedger( return true; } + // The openTime is the time spent so far waiting to close the ledger. + // Any time spent retrying ledger validation in the previous round is + // also counted. + if (validationDelay) + openTime += *validationDelay; + if (!anyTransactions) { // Only close at the end of the idle interval @@ -122,9 +129,6 @@ checkConsensus( << " time=" << currentAgreeTime.count() << "/" << previousAgreeTime.count(); - if (currentAgreeTime <= parms.ledgerMIN_CONSENSUS) - return ConsensusState::No; - if (currentProposers < (prevProposers * 3 / 4)) { // Less than 3/4 of the last ledger's proposers are present; don't @@ -155,7 +159,7 @@ checkConsensus( } // no consensus yet - JLOG(j.trace()) << "no consensus"; + JLOG(j.trace()) << "checkConsensus no consensus"; return ConsensusState::No; } diff --git a/src/ripple/consensus/Consensus.h b/src/ripple/consensus/Consensus.h index ea88e3232ee..8115e88c9e1 100644 --- a/src/ripple/consensus/Consensus.h +++ b/src/ripple/consensus/Consensus.h @@ -22,6 +22,7 @@ #include #include +#include #include #include #include @@ -29,10 +30,12 @@ #include #include #include +#include #include #include #include +#include #include #include @@ -52,6 +55,7 @@ namespace ripple { @param timeSincePrevClose time since the previous ledger's (possibly rounded) close time @param openTime duration this ledger has been open + @param validationDelay duration retrying ledger validation @param idleInterval the network's desired idle interval @param parms Consensus constant parameters @param j journal for logging @@ -65,6 +69,7 @@ shouldCloseLedger( std::chrono::milliseconds prevRoundTime, std::chrono::milliseconds timeSincePrevClose, std::chrono::milliseconds openTime, + std::optional validationDelay, std::chrono::milliseconds idleInterval, ConsensusParms const& parms, beast::Journal j); @@ -117,9 +122,20 @@ checkConsensus( reached consensus with its peers on which transactions to include. It transitions to the `Accept` phase. In this phase, the node works on applying the transactions to the prior ledger to generate a new closed - ledger. Once the new ledger is completed, the node shares the validated - ledger with the network, does some book-keeping, then makes a call to - `startRound` to start the cycle again. + ledger. + + Try to avoid advancing to a new ledger that hasn't been validated. + One scenario that causes this is if we came to consensus on a + transaction set as other peers were updating their proposals, but + we haven't received the updated proposals. This could cause the rest + of the network to settle on a different transaction set. + As a validator, it is necessary to first build a new ledger and + send a validation for it. Otherwise it's impossible to know for sure + whether or not the ledger would be validated--we can't otherwise + know the ledger hash. If this ledger does become validated, then + proceed with book-keeping and make a call to `startRound` to start + the cycle again. If it doesn't become validated, pause, check + if there is a better transaction set, and try again. This class uses a generic interface to allow adapting Consensus for specific applications. The Adaptor template implements a set of helper functions that @@ -247,20 +263,31 @@ checkConsensus( // Called when ledger closes Result onClose(Ledger const &, Ledger const & prev, Mode mode); - // Called when ledger is accepted by consensus - void onAccept(Result const & result, - RCLCxLedger const & prevLedger, - NetClock::duration closeResolution, - CloseTimes const & rawCloseTimes, - Mode const & mode); + // Called after a transaction set is agreed upon to create the new + // ledger and attempt to validate it. + std::pair + buildAndValidate( + Result const& result, + Ledger_t const& prevLedger, + NetClock::duration const& closeResolution, + ConsensusMode const& mode, + Json::Value&& consensusJson); + + // Called when the built ledger is accepted by consensus + void onAccept(Result const& result, + ConsensusCloseTimes const& rawCloseTimes, + ConsensusMode const& mode, + Json::Value&& consensusJson, + std::pair&& txsBuilt); // Called when ledger was forcibly accepted by consensus via the simulate // function. - void onForceAccept(Result const & result, - RCLCxLedger const & prevLedger, - NetClock::duration closeResolution, - CloseTimes const & rawCloseTimes, - Mode const & mode); + void onForceAccept(Result const& result, + RCLCxLedger const& prevLedger, + NetClock::duration const& closeResolution, + ConsensusCloseTimes const& rawCloseTimes, + ConsensusMode const& mode, + Json::Value&& consensusJson); // Propose the position to peers. void propose(ConsensusProposal<...> const & pos); @@ -294,7 +321,8 @@ class Consensus using Proposal_t = ConsensusProposal< NodeID_t, typename Ledger_t::ID, - typename TxSet_t::ID>; + typename TxSet_t::ID, + typename Ledger_t::Seq>; using Result = ConsensusResult; @@ -334,7 +362,7 @@ class Consensus @param adaptor The instance of the adaptor class @param j The journal to log debug output */ - Consensus(clock_type const& clock, Adaptor& adaptor, beast::Journal j); + Consensus(clock_type& clock, Adaptor& adaptor, beast::Journal j); /** Kick-off the next round of consensus. @@ -516,8 +544,15 @@ class Consensus closeLedger(); // Adjust our positions to try to agree with other validators. + /** Adjust our positions to try to agree with other validators. + * + * Share them with the network unless we've already accepted a + * consensus position. + * + * @param share Whether to share with the network. + */ void - updateOurPositions(); + updateOurPositions(bool const share); bool haveConsensus(); @@ -540,7 +575,6 @@ class Consensus NetClock::time_point asCloseTime(NetClock::time_point raw) const; -private: Adaptor& adaptor_; ConsensusPhase phase_{ConsensusPhase::accepted}; @@ -548,7 +582,7 @@ class Consensus bool firstRound_ = true; bool haveCloseTimeConsensus_ = false; - clock_type const& clock_; + clock_type& clock_; // How long the consensus convergence has taken, expressed as // a percentage of the time that we expected it to take. @@ -578,8 +612,16 @@ class Consensus // Last validated ledger seen by consensus Ledger_t previousLedger_; - // Transaction Sets, indexed by hash of transaction tree - hash_map acquired_; + // Transaction Sets, indexed by hash of transaction tree. + using AcquiredType = beast::aged_unordered_map< + typename TxSet_t::ID, + const TxSet_t, + clock_type::clock_type, + beast::uhash<>>; + AcquiredType acquired_; + + // Tx sets that can be purged only once there is a new consensus round. + std::stack acquiredPurge_; std::optional result_; ConsensusCloseTimes rawCloseTimes_; @@ -591,8 +633,18 @@ class Consensus hash_map currPeerPositions_; // Recently received peer positions, available when transitioning between - // ledgers or rounds - hash_map> recentPeerPositions_; + // ledgers or rounds. Collected by ledger sequence. This allows us to + // know which positions are likely relevant to the ledger on which we are + // currently working. Also allows us to catch up faster if we fall behind + // the rest of the network since we won't need to re-aquire proposals + // and related transaction sets. + std::map> + recentPeerPositions_; + + // These are for peers not using code that adds a ledger sequence + // to the proposal message. TODO This should be removed eventually when + // the network fully upgrades. + hash_map> recentPeerPositionsLegacy_; // The number of proposers who participated in the last consensus round std::size_t prevProposers_ = 0; @@ -606,10 +658,10 @@ class Consensus template Consensus::Consensus( - clock_type const& clock, + clock_type& clock, Adaptor& adaptor, beast::Journal journal) - : adaptor_(adaptor), clock_(clock), j_{journal} + : adaptor_(adaptor), clock_(clock), acquired_(clock), j_{journal} { JLOG(j_.debug()) << "Creating consensus object"; } @@ -635,8 +687,21 @@ Consensus::startRound( prevCloseTime_ = rawCloseTimes_.self; } + // Clear positions that we know will not ever be necessary again. + auto it = recentPeerPositions_.begin(); + while (it != recentPeerPositions_.end() && it->first <= prevLedger.seq()) + it = recentPeerPositions_.erase(it); + // Get rid of untrusted positions for the current working ledger. + auto currentPositions = + recentPeerPositions_.find(prevLedger.seq() + typename Ledger_t::Seq{1}); + if (currentPositions != recentPeerPositions_.end()) + { + for (NodeID_t const& n : nowUntrusted) + currentPositions->second.erase(n); + } + for (NodeID_t const& n : nowUntrusted) - recentPeerPositions_.erase(n); + recentPeerPositionsLegacy_.erase(n); ConsensusMode startMode = proposing ? ConsensusMode::proposing : ConsensusMode::observing; @@ -678,8 +743,29 @@ Consensus::startRoundInternal( convergePercent_ = 0; haveCloseTimeConsensus_ = false; openTime_.reset(clock_.now()); - currPeerPositions_.clear(); - acquired_.clear(); + + // beast::aged_unordered_map::erase by key is broken and + // is not used anywhere in the existing codebase. + while (!acquiredPurge_.empty()) + { + auto found = acquired_.find(acquiredPurge_.top()); + if (found != acquired_.end()) + acquired_.erase(found); + acquiredPurge_.pop(); + } + for (auto it = currPeerPositions_.begin(); it != currPeerPositions_.end();) + { + if (auto found = acquired_.find(it->second.proposal().position()); + found != acquired_.end()) + { + acquired_.erase(found); + } + it = currPeerPositions_.erase(it); + } + + // Hold up to 30 minutes worth of acquired tx sets. This to help + // catch up quickly from extended de-sync periods. + beast::expire(acquired_, std::chrono::minutes(30)); rawCloseTimes_.peers.clear(); rawCloseTimes_.self = {}; deadNodes_.clear(); @@ -707,14 +793,45 @@ Consensus::peerProposal( auto const& peerID = newPeerPos.proposal().nodeID(); // Always need to store recent positions + if (newPeerPos.proposal().ledgerSeq().has_value()) { - auto& props = recentPeerPositions_[peerID]; + // Ignore proposals from prior ledgers. + typename Ledger_t::Seq const& propLedgerSeq = + *newPeerPos.proposal().ledgerSeq(); + if (propLedgerSeq <= previousLedger_.seq()) + return false; + + auto& bySeq = recentPeerPositions_[propLedgerSeq]; + { + auto peerProp = bySeq.find(peerID); + if (peerProp == bySeq.end()) + { + bySeq.emplace(peerID, newPeerPos); + } + else + { + // Only store if it's the latest proposal from this peer for the + // consensus round in the proposal. + if (newPeerPos.proposal().proposeSeq() <= + peerProp->second.proposal().proposeSeq()) + { + return false; + } + peerProp->second = newPeerPos; + } + } + } + else + { + // legacy proposal with no ledger sequence + auto& props = recentPeerPositionsLegacy_[peerID]; if (props.size() >= 10) props.pop_front(); props.push_back(newPeerPos); } + return peerProposalInternal(now, newPeerPos); } @@ -724,10 +841,6 @@ Consensus::peerProposalInternal( NetClock::time_point const& now, PeerPosition_t const& newPeerPos) { - // Nothing to do for now if we are currently working on a ledger - if (phase_ == ConsensusPhase::accepted) - return false; - now_ = now; auto const& newPeerProp = newPeerPos.proposal(); @@ -736,6 +849,20 @@ Consensus::peerProposalInternal( { JLOG(j_.debug()) << "Got proposal for " << newPeerProp.prevLedger() << " but we are on " << prevLedgerID_; + + if (!acquired_.count(newPeerProp.position())) + { + // acquireTxSet will return the set if it is available, or + // spawn a request for it and return nullopt/nullptr. It will call + // gotTxSet once it arrives. If we're behind, this should save + // time when we catch up. + if (auto set = adaptor_.acquireTxSet(newPeerProp.position())) + gotTxSet(now_, *set); + else + JLOG(j_.debug()) << "Do not have tx set for peer"; + } + + // There's nothing else to do with this proposal currently. return false; } @@ -769,16 +896,45 @@ Consensus::peerProposalInternal( it.second.unVote(peerID); } if (peerPosIt != currPeerPositions_.end()) + { + // Remove from acquired_ or else it will consume space for + // awhile. beast::aged_unordered_map::erase by key is broken and + // is not used anywhere in the existing codebase. + if (auto found = + acquired_.find(peerPosIt->second.proposal().position()); + found != acquired_.end()) + { + acquiredPurge_.push( + peerPosIt->second.proposal().position()); + } currPeerPositions_.erase(peerID); + } deadNodes_.insert(peerID); return true; } if (peerPosIt != currPeerPositions_.end()) + { + // Remove from acquired_ or else it will consume space for awhile. + // beast::aged_unordered_container::erase by key is broken and + // is not used anywhere in the existing codebase. + if (auto found = acquired_.find(newPeerPos.proposal().position()); + found != acquired_.end()) + { + acquiredPurge_.push(newPeerPos.proposal().position()); + } + // The proposal's arrival time determines how long the network + // has been proposing, so new proposals from the same peer + // should reflect the original's arrival time. + newPeerPos.proposal().arrivalTime() = + peerPosIt->second.proposal().arrivalTime(); peerPosIt->second = newPeerPos; + } else + { currPeerPositions_.emplace(peerID, newPeerPos); + } } if (newPeerProp.isInitial()) @@ -827,13 +983,9 @@ Consensus::timerEntry(NetClock::time_point const& now) checkLedger(); if (phase_ == ConsensusPhase::open) - { phaseOpen(); - } else if (phase_ == ConsensusPhase::establish) - { phaseEstablish(); - } } template @@ -842,10 +994,6 @@ Consensus::gotTxSet( NetClock::time_point const& now, TxSet_t const& txSet) { - // Nothing to do if we've finished work on a ledger - if (phase_ == ConsensusPhase::accepted) - return; - now_ = now; auto id = txSet.id(); @@ -1025,7 +1173,18 @@ Consensus::handleWrongLedger(typename Ledger_t::ID const& lgrId) result_->compares.clear(); } - currPeerPositions_.clear(); + for (auto it = currPeerPositions_.begin(); + it != currPeerPositions_.end();) + { + // beast::aged_unordered_map::erase by key is broken and + // is not used anywhere in the existing codebase. + if (auto found = acquired_.find(it->second.proposal().position()); + found != acquired_.end()) + { + acquiredPurge_.push(it->second.proposal().position()); + } + it = currPeerPositions_.erase(it); + } rawCloseTimes_.peers.clear(); deadNodes_.clear(); @@ -1076,7 +1235,30 @@ template void Consensus::playbackProposals() { - for (auto const& it : recentPeerPositions_) + // Only use proposals for the ledger sequence we're currently working on. + auto const currentPositions = recentPeerPositions_.find( + previousLedger_.seq() + typename Ledger_t::Seq{1}); + if (currentPositions != recentPeerPositions_.end()) + { + for (auto const& [peerID, pos] : currentPositions->second) + { + if (pos.proposal().prevLedger() == prevLedgerID_ && + peerProposalInternal(now_, pos)) + { + adaptor_.share(pos); + } + } + } + + // It's safe to do this--if a proposal is based on the wrong ledger, + // then peerProposalInternal() will not replace it in currPeerPositions_. + // TODO Eventually, remove code to check for non-existent ledger sequence + // in peer proposal messages and make that parameter required in + // the protobuf definition. Do this only after the network is running on + // rippled versions with that parameter set in peer proposals. This + // can be done once an amendment for another feature forces that kind + // of upgrade, but this particular feature does not require an amendment. + for (auto const& it : recentPeerPositionsLegacy_) { for (auto const& pos : it.second) { @@ -1134,11 +1316,13 @@ Consensus::phaseOpen() prevRoundTime_, sinceClose, openTime_.read(), + adaptor_.getValidationDelay(), idleInterval, adaptor_.parms(), j_)) { closeLedger(); + adaptor_.setValidationDelay(); } } @@ -1272,11 +1456,52 @@ Consensus::phaseEstablish() convergePercent_ = result_->roundTime.read() * 100 / std::max(prevRoundTime_, parms.avMIN_CONSENSUS_TIME); - // Give everyone a chance to take an initial position - if (result_->roundTime.read() < parms.ledgerMIN_CONSENSUS) - return; + { + // Give everyone a chance to take an initial position unless enough + // have already submitted theirs a long enough time ago + // --because that means we're already + // behind. Optimize pause duration if pausing. Pause until exactly + // the number of ms after roundTime.read(), or the time since + // receiving the earliest qualifying peer proposal. To protect + // from faulty peers on the UNL, discard the earliest proposals + // beyond the quorum threshold. For example, with a UNL of 20, + // 80% quorum is 16. Assume the remaining 4 are Byzantine actors. + // We therefore ignore the first 4 proposals received + // for this calculation. We then take the earliest of either the + // 5th proposal or our own proposal to determine whether enough + // time has passed to possibly close. If not, then use that to + // precisely determine how long to pause until checking again. + std::size_t const q = adaptor_.quorum(); + std::size_t const discard = + static_cast(q / parms.minCONSENSUS_FACTOR) - q; + + std::chrono::milliseconds beginning; + if (currPeerPositions_.size() > discard) + { + std::multiset arrivals; + for (auto& pos : currPeerPositions_) + { + pos.second.proposal().arrivalTime().tick(clock_.now()); + arrivals.insert(pos.second.proposal().arrivalTime().read()); + } + auto it = arrivals.rbegin(); + std::advance(it, discard); + beginning = *it; + } + else + { + beginning = result_->roundTime.read(); + } - updateOurPositions(); + // Give everyone a chance to take an initial position + if (beginning < parms.ledgerMIN_CONSENSUS) + { + adaptor_.setTimerDelay(parms.ledgerMIN_CONSENSUS - beginning); + return; + } + } + + updateOurPositions(true); // Nothing to do if too many laggards or we don't have consensus. if (shouldPause() || !haveConsensus()) @@ -1295,13 +1520,96 @@ Consensus::phaseEstablish() prevRoundTime_ = result_->roundTime.read(); phase_ = ConsensusPhase::accepted; JLOG(j_.debug()) << "transitioned to ConsensusPhase::accepted"; + + std::optional> + txsBuilt; + // Track time spent retrying new ledger validation. + std::optional> + startDelay; + // Amount of time to pause checking for ledger to become validated. + static auto const validationWait = std::chrono::milliseconds(100); + + // Make a copy of the result_ because it may be reset during the accept + // phase if ledgers are switched and a new round is started. + assert(result_.has_value()); + std::optional result{result_}; + // Building the new ledger is time-consuming and safe to not lock, but + // the rest of the logic below needs to be locked, until + // finishing (onAccept). + std::unique_lock lock(adaptor_.peekMutex()); + do + { + if (!result_.has_value() || + result_->position.prevLedger() != result->position.prevLedger()) + { + JLOG(j_.debug()) << "A new consensus round has started based on " + "a different ledger."; + return; + } + if (txsBuilt) + { + if (!startDelay) + startDelay = std::chrono::steady_clock::now(); + + // Only send a single validation per round. + adaptor_.clearValidating(); + // Check if a better proposal has been shared by the network. + auto prevProposal = result_->position; + updateOurPositions(false); + + if (prevProposal == result_->position) + { + JLOG(j_.debug()) + << "old and new positions " + "match: " + << prevProposal.position() << " delay so far " + << std::chrono::duration_cast( + std::chrono::steady_clock::now() - *startDelay) + .count() + << "ms. pausing"; + adaptor_.getLedgerMaster().waitForValidated(validationWait); + continue; + } + JLOG(j_.debug()) << "retrying buildAndValidate with " + "new position: " + << result_->position.position(); + // Update the result used for the remainder of this Consensus round. + assert(result_.has_value()); + result.emplace(*result_); + } + lock.unlock(); + + // This is time-consuming and safe to not have under mutex. + assert(result.has_value()); + txsBuilt = adaptor_.buildAndValidate( + *result, + previousLedger_, + closeResolution_, + mode_.get(), + getJson(true)); + lock.lock(); + } while (adaptor_.retryAccept(txsBuilt->second, startDelay)); + + if (startDelay) + { + auto const delay = + std::chrono::duration_cast( + std::chrono::steady_clock::now() - *startDelay); + JLOG(j_.debug()) << "validationDelay will be " << delay.count() << "ms"; + adaptor_.setValidationDelay(delay); + } + + lock.unlock(); + + assert(result.has_value()); adaptor_.onAccept( - *result_, - previousLedger_, - closeResolution_, + *result, rawCloseTimes_, mode_.get(), - getJson(true)); + getJson(true), + std::move(*txsBuilt)); } template @@ -1315,7 +1623,8 @@ Consensus::closeLedger() JLOG(j_.debug()) << "transitioned to ConsensusPhase::establish"; rawCloseTimes_.self = now_; - result_.emplace(adaptor_.onClose(previousLedger_, now_, mode_.get())); + result_.emplace( + adaptor_.onClose(previousLedger_, now_, mode_.get(), clock_)); result_->roundTime.reset(clock_.now()); // Share the newly created transaction set if we haven't already // received it from a peer @@ -1331,10 +1640,11 @@ Consensus::closeLedger() auto const& pos = pit.second.proposal().position(); auto const it = acquired_.find(pos); if (it != acquired_.end()) - { createDisputes(it->second); - } } + // There's no reason to pause, especially if we have fallen behind and + // can possible agree to a consensus proposal already. + timerEntry(now_); } /** How many of the participants must agree to reach a given threshold? @@ -1359,7 +1669,7 @@ participantsNeeded(int participants, int percent) template void -Consensus::updateOurPositions() +Consensus::updateOurPositions(bool const share) { // We must have a position if we are updating it assert(result_); @@ -1383,6 +1693,14 @@ Consensus::updateOurPositions() JLOG(j_.warn()) << "Removing stale proposal from " << peerID; for (auto& dt : result_->disputes) dt.second.unVote(peerID); + // Remove from acquired_ or else it will consume space for + // awhile. beast::aged_unordered_map::erase by key is broken and + // is not used anywhere in the existing codebase. + if (auto found = acquired_.find(peerProp.position()); + found != acquired_.end()) + { + acquiredPurge_.push(peerProp.position()); + } it = currPeerPositions_.erase(it); } else @@ -1469,8 +1787,26 @@ Consensus::updateOurPositions() << " nw:" << neededWeight << " thrV:" << threshVote << " thrC:" << threshConsensus; - for (auto const& [t, v] : closeTimeVotes) + // An impasse is possible unless a validator pretends to change + // its close time vote. Imagine 5 validators. 3 have positions + // for close time t1, and 2 with t2. That's an impasse because + // 75% will never be met. However, if one of the validators voting + // for t2 switches to t1, then that will be 80% and sufficient + // to break the impasse. It's also OK for those agreeing + // with the 3 to pretend to vote for the one with 2, because + // that will never exceed the threshold of 75%, even with as + // few as 3 validators. The most it can achieve is 2/3. + for (auto& [t, v] : closeTimeVotes) { + if (adaptor_.validating() && + t != asCloseTime(result_->position.closeTime())) + { + JLOG(j_.debug()) << "Others have voted for a close time " + "different than ours. Adding our vote " + "to this one in case it is necessary " + "to break an impasse."; + ++v; + } JLOG(j_.debug()) << "CCTime: seq " << static_cast(previousLedger_.seq()) + 1 << ": " @@ -1484,7 +1820,12 @@ Consensus::updateOurPositions() threshVote = v; if (threshVote >= threshConsensus) + { haveCloseTimeConsensus_ = true; + // Make sure that the winning close time is the one + // that propagates to the rest of the function. + break; + } } } @@ -1520,8 +1861,10 @@ Consensus::updateOurPositions() result_->position.changePosition(newID, consensusCloseTime, now_); // Share our new transaction set and update disputes - // if we haven't already received it - if (acquired_.emplace(newID, result_->txns).second) + // if we haven't already received it. Unless we have already + // accepted a position, but are recalculating because it didn't + // validate. + if (acquired_.emplace(newID, result_->txns).second && share) { if (!result_->position.isBowOut()) adaptor_.share(result_->txns); @@ -1534,9 +1877,11 @@ Consensus::updateOurPositions() } } - // Share our new position if we are still participating this round + // Share our new position if we are still participating this round, + // unless we have already accepted a position but are recalculating + // because it didn't validate. if (!result_->position.isBowOut() && - (mode_.get() == ConsensusMode::proposing)) + (mode_.get() == ConsensusMode::proposing) && share) adaptor_.propose(result_->position); } } @@ -1558,14 +1903,9 @@ Consensus::haveConsensus() { Proposal_t const& peerProp = peerPos.proposal(); if (peerProp.position() == ourPosition) - { ++agree; - } else - { - JLOG(j_.debug()) << nodeId << " has " << peerProp.position(); ++disagree; - } } auto currentFinished = adaptor_.proposersFinished(previousLedger_, prevLedgerID_); @@ -1592,8 +1932,8 @@ Consensus::haveConsensus() // without us. if (result_->state == ConsensusState::MovedOn) { - JLOG(j_.error()) << "Unable to reach consensus"; - JLOG(j_.error()) << Json::Compact{getJson(true)}; + JLOG(j_.error()) << "Unable to reach consensus MovedOn: " + << Json::Compact{getJson(true)}; } return true; @@ -1652,7 +1992,7 @@ Consensus::createDisputes(TxSet_t const& o) if (result_->disputes.find(txID) != result_->disputes.end()) continue; - JLOG(j_.debug()) << "Transaction " << txID << " is disputed"; + JLOG(j_.trace()) << "Transaction " << txID << " is disputed"; typename Result::Dispute_t dtx{ tx, @@ -1672,7 +2012,7 @@ Consensus::createDisputes(TxSet_t const& o) result_->disputes.emplace(txID, std::move(dtx)); } - JLOG(j_.debug()) << dc << " differences found"; + JLOG(j_.trace()) << dc << " differences found"; } template diff --git a/src/ripple/consensus/ConsensusParms.h b/src/ripple/consensus/ConsensusParms.h index 542b3644b42..61722e2c439 100644 --- a/src/ripple/consensus/ConsensusParms.h +++ b/src/ripple/consensus/ConsensusParms.h @@ -70,8 +70,16 @@ struct ConsensusParms // Consensus durations are relative to the internal Consensus clock and use // millisecond resolution. - //! The percentage threshold above which we can declare consensus. + //! The percentage threshold and floating point factor above which we can + //! declare consensus. std::size_t minCONSENSUS_PCT = 80; + float minCONSENSUS_FACTOR = static_cast(minCONSENSUS_PCT / 100.0f); + + //! The percentage threshold and floating point factor above which we can + //! declare consensus based on nodes having fallen off of the UNL. + std::size_t negUNL_MIN_CONSENSUS_PCT = 60; + float negUNL_MIN_CONSENSUS_FACTOR = + static_cast(negUNL_MIN_CONSENSUS_PCT / 100.0f); //! The duration a ledger may remain idle before closing std::chrono::milliseconds ledgerIDLE_INTERVAL = std::chrono::seconds{15}; diff --git a/src/ripple/consensus/ConsensusProposal.h b/src/ripple/consensus/ConsensusProposal.h index c5103cfe0d5..95acb3014a3 100644 --- a/src/ripple/consensus/ConsensusProposal.h +++ b/src/ripple/consensus/ConsensusProposal.h @@ -21,9 +21,13 @@ #include #include +#include +#include #include #include +#include #include +#include #include #include @@ -51,12 +55,15 @@ namespace ripple { @tparam Position_t Type used to represent the position taken on transactions under consideration during this round of consensus */ -template +template class ConsensusProposal { public: using NodeID = NodeID_t; + //! Clock type for measuring time within the consensus code + using clock_type = beast::abstract_clock; + //< Sequence value when a peer initially joins consensus static std::uint32_t const seqJoin = 0; @@ -71,6 +78,8 @@ class ConsensusProposal @param closeTime Position of when this ledger closed. @param now Time when the proposal was taken. @param nodeID ID of node/peer taking this position. + @param ledgerSeq Ledger sequence of proposal. + @param clock Clock that works with real and test time. */ ConsensusProposal( LedgerID_t const& prevLedger, @@ -78,14 +87,20 @@ class ConsensusProposal Position_t const& position, NetClock::time_point closeTime, NetClock::time_point now, - NodeID_t const& nodeID) + NodeID_t const& nodeID, + std::optional const& ledgerSeq, + clock_type const& clock) : previousLedger_(prevLedger) , position_(position) , closeTime_(closeTime) , time_(now) , proposeSeq_(seq) , nodeID_(nodeID) + , ledgerSeq_(ledgerSeq) { + // Track the arrive time to know how long our peers have been + // sending proposals. + arrivalTime_.reset(clock.now()); } //! Identifying which peer took this position. @@ -232,6 +247,18 @@ class ConsensusProposal return signingHash_.value(); } + std::optional const& + ledgerSeq() const + { + return ledgerSeq_; + } + + ConsensusTimer& + arrivalTime() const + { + return arrivalTime_; + } + private: //! Unique identifier of prior ledger this proposal is based on LedgerID_t previousLedger_; @@ -251,15 +278,19 @@ class ConsensusProposal //! The identifier of the node taking this position NodeID_t nodeID_; + std::optional ledgerSeq_; + //! The signing hash for this proposal mutable std::optional signingHash_; + + mutable ConsensusTimer arrivalTime_; }; -template +template bool operator==( - ConsensusProposal const& a, - ConsensusProposal const& b) + ConsensusProposal const& a, + ConsensusProposal const& b) { return a.nodeID() == b.nodeID() && a.proposeSeq() == b.proposeSeq() && a.prevLedger() == b.prevLedger() && a.position() == b.position() && diff --git a/src/ripple/consensus/ConsensusTypes.h b/src/ripple/consensus/ConsensusTypes.h index 05d03c8a9c6..42c0b9561a5 100644 --- a/src/ripple/consensus/ConsensusTypes.h +++ b/src/ripple/consensus/ConsensusTypes.h @@ -21,7 +21,6 @@ #define RIPPLE_CONSENSUS_CONSENSUS_TYPES_H_INCLUDED #include -#include #include #include #include @@ -189,6 +188,8 @@ enum class ConsensusState { Yes //!< We have consensus along with the network }; +template +class ConsensusProposal; /** Encapsulates the result of consensus. Stores all relevant data for the outcome of consensus on a single @@ -208,7 +209,8 @@ struct ConsensusResult using Proposal_t = ConsensusProposal< NodeID_t, typename Ledger_t::ID, - typename TxSet_t::ID>; + typename TxSet_t::ID, + typename Ledger_t::Seq>; using Dispute_t = DisputedTx; ConsensusResult(TxSet_t&& s, Proposal_t&& p) diff --git a/src/ripple/consensus/DisputedTx.h b/src/ripple/consensus/DisputedTx.h index ae127197eec..92d9917145d 100644 --- a/src/ripple/consensus/DisputedTx.h +++ b/src/ripple/consensus/DisputedTx.h @@ -152,19 +152,19 @@ DisputedTx::setVote(NodeID_t const& peer, bool votesYes) { if (votesYes) { - JLOG(j_.debug()) << "Peer " << peer << " votes YES on " << tx_.id(); + JLOG(j_.trace()) << "Peer " << peer << " votes YES on " << tx_.id(); ++yays_; } else { - JLOG(j_.debug()) << "Peer " << peer << " votes NO on " << tx_.id(); + JLOG(j_.trace()) << "Peer " << peer << " votes NO on " << tx_.id(); ++nays_; } } // changes vote to yes else if (votesYes && !it->second) { - JLOG(j_.debug()) << "Peer " << peer << " now votes YES on " << tx_.id(); + JLOG(j_.trace()) << "Peer " << peer << " now votes YES on " << tx_.id(); --nays_; ++yays_; it->second = true; @@ -172,7 +172,7 @@ DisputedTx::setVote(NodeID_t const& peer, bool votesYes) // changes vote to no else if (!votesYes && it->second) { - JLOG(j_.debug()) << "Peer " << peer << " now votes NO on " << tx_.id(); + JLOG(j_.trace()) << "Peer " << peer << " now votes NO on " << tx_.id(); ++nays_; --yays_; it->second = false; @@ -238,17 +238,17 @@ DisputedTx::updateVote( if (newPosition == ourVote_) { - JLOG(j_.info()) << "No change (" << (ourVote_ ? "YES" : "NO") - << ") : weight " << weight << ", percent " - << percentTime; - JLOG(j_.debug()) << Json::Compact{getJson()}; + JLOG(j_.trace()) << "No change (" << (ourVote_ ? "YES" : "NO") + << ") : weight " << weight << ", percent " + << percentTime; + JLOG(j_.trace()) << Json::Compact{getJson()}; return false; } ourVote_ = newPosition; - JLOG(j_.debug()) << "We now vote " << (ourVote_ ? "YES" : "NO") << " on " + JLOG(j_.trace()) << "We now vote " << (ourVote_ ? "YES" : "NO") << " on " << tx_.id(); - JLOG(j_.debug()) << Json::Compact{getJson()}; + JLOG(j_.trace()) << Json::Compact{getJson()}; return true; } diff --git a/src/ripple/overlay/impl/PeerImp.cpp b/src/ripple/overlay/impl/PeerImp.cpp index 3afec605cfa..0d9b9fc549b 100644 --- a/src/ripple/overlay/impl/PeerImp.cpp +++ b/src/ripple/overlay/impl/PeerImp.cpp @@ -33,6 +33,7 @@ #include #include #include +#include #include #include #include @@ -1994,6 +1995,10 @@ PeerImp::onMessage(std::shared_ptr const& m) JLOG(p_journal_.trace()) << "Proposal: " << (isTrusted ? "trusted" : "untrusted"); + std::optional ledgerSeq; + if (set.has_ledgerseq()) + ledgerSeq = set.ledgerseq(); + auto proposal = RCLCxPeerPos( publicKey, sig, @@ -2004,7 +2009,9 @@ PeerImp::onMessage(std::shared_ptr const& m) proposeHash, closeTime, app_.timeKeeper().closeTime(), - calcNodeID(app_.validatorManifests().getMasterKey(publicKey))}); + calcNodeID(app_.validatorManifests().getMasterKey(publicKey)), + ledgerSeq, + beast::get_abstract_clock()}); std::weak_ptr weak = shared_from_this(); app_.getJobQueue().addJob( diff --git a/src/ripple/proto/ripple.proto b/src/ripple/proto/ripple.proto index 74cbfe8f6cb..d116b992a90 100644 --- a/src/ripple/proto/ripple.proto +++ b/src/ripple/proto/ripple.proto @@ -231,6 +231,8 @@ message TMProposeSet // Number of hops traveled optional uint32 hops = 12 [deprecated=true]; + + optional uint32 ledgerSeq = 14; // sequence of the ledger we are proposing } enum TxSetStatus diff --git a/src/test/consensus/Consensus_test.cpp b/src/test/consensus/Consensus_test.cpp index 1c19ff0708d..6b0b4817875 100644 --- a/src/test/consensus/Consensus_test.cpp +++ b/src/test/consensus/Consensus_test.cpp @@ -44,34 +44,35 @@ class Consensus_test : public beast::unit_test::suite // Use default parameters ConsensusParms const p{}; + std::optional delay; // Bizarre times forcibly close BEAST_EXPECT(shouldCloseLedger( - true, 10, 10, 10, -10s, 10s, 1s, 1s, p, journal_)); + true, 10, 10, 10, -10s, 10s, 1s, delay, 1s, p, journal_)); BEAST_EXPECT(shouldCloseLedger( - true, 10, 10, 10, 100h, 10s, 1s, 1s, p, journal_)); + true, 10, 10, 10, 100h, 10s, 1s, delay, 1s, p, journal_)); BEAST_EXPECT(shouldCloseLedger( - true, 10, 10, 10, 10s, 100h, 1s, 1s, p, journal_)); + true, 10, 10, 10, 10s, 100h, 1s, delay, 1s, p, journal_)); // Rest of network has closed - BEAST_EXPECT( - shouldCloseLedger(true, 10, 3, 5, 10s, 10s, 10s, 10s, p, journal_)); + BEAST_EXPECT(shouldCloseLedger( + true, 10, 3, 5, 10s, 10s, 10s, delay, 10s, p, journal_)); // No transactions means wait until end of internval - BEAST_EXPECT( - !shouldCloseLedger(false, 10, 0, 0, 1s, 1s, 1s, 10s, p, journal_)); - BEAST_EXPECT( - shouldCloseLedger(false, 10, 0, 0, 1s, 10s, 1s, 10s, p, journal_)); + BEAST_EXPECT(!shouldCloseLedger( + false, 10, 0, 0, 1s, 1s, 1s, delay, 10s, p, journal_)); + BEAST_EXPECT(shouldCloseLedger( + false, 10, 0, 0, 1s, 10s, 1s, delay, 10s, p, journal_)); // Enforce minimum ledger open time - BEAST_EXPECT( - !shouldCloseLedger(true, 10, 0, 0, 10s, 10s, 1s, 10s, p, journal_)); + BEAST_EXPECT(!shouldCloseLedger( + true, 10, 0, 0, 10s, 10s, 1s, delay, 10s, p, journal_)); // Don't go too much faster than last time - BEAST_EXPECT( - !shouldCloseLedger(true, 10, 0, 0, 10s, 10s, 3s, 10s, p, journal_)); + BEAST_EXPECT(!shouldCloseLedger( + true, 10, 0, 0, 10s, 10s, 3s, delay, 10s, p, journal_)); - BEAST_EXPECT( - shouldCloseLedger(true, 10, 0, 0, 10s, 10s, 10s, 10s, p, journal_)); + BEAST_EXPECT(shouldCloseLedger( + true, 10, 0, 0, 10s, 10s, 10s, delay, 10s, p, journal_)); } void diff --git a/src/test/csf/Peer.h b/src/test/csf/Peer.h index 6d3008f7348..29c0d0ba75e 100644 --- a/src/test/csf/Peer.h +++ b/src/test/csf/Peer.h @@ -19,6 +19,9 @@ #ifndef RIPPLE_TEST_CSF_PEER_H_INCLUDED #define RIPPLE_TEST_CSF_PEER_H_INCLUDED +#include +#include +#include #include #include #include @@ -26,6 +29,10 @@ #include #include #include +#include +#include +#include +#include #include #include #include @@ -33,6 +40,7 @@ #include #include #include +#include namespace ripple { namespace test { @@ -158,10 +166,13 @@ struct Peer using NodeID_t = PeerID; using NodeKey_t = PeerKey; using TxSet_t = TxSet; + using CanonicalTxSet_t = TxSet; using PeerPosition_t = Position; using Result = ConsensusResult; using NodeKey = Validation::NodeKey; + using clock_type = Stopwatch; + //! Logging support that prefixes messages with the peer ID beast::WrappedSink sink; beast::Journal j; @@ -240,7 +251,7 @@ struct Peer // Quorum of validations needed for a ledger to be fully validated // TODO: Use the logic in ValidatorList to set this dynamically - std::size_t quorum = 0; + std::size_t q = 0; hash_set trustedKeys; @@ -250,6 +261,16 @@ struct Peer //! The collectors to report events to CollectorRefs& collectors; + mutable std::recursive_mutex mtx; + + std::optional delay; + + struct Null_test : public beast::unit_test::suite + { + void + run() override{}; + }; + /** Constructor @param i Unique PeerID @@ -496,7 +517,8 @@ struct Peer onClose( Ledger const& prevLedger, NetClock::time_point closeTime, - ConsensusMode mode) + ConsensusMode mode, + clock_type& clock) { issue(CloseLedger{prevLedger, openTxs}); @@ -508,7 +530,9 @@ struct Peer TxSet::calcID(openTxs), closeTime, now(), - id)); + id, + prevLedger.seq() + typename Ledger_t::Seq{1}, + scheduler.clock())); } void @@ -520,11 +544,10 @@ struct Peer ConsensusMode const& mode, Json::Value&& consensusJson) { - onAccept( + buildAndValidate( result, prevLedger, closeResolution, - rawCloseTimes, mode, std::move(consensusJson)); } @@ -532,10 +555,19 @@ struct Peer void onAccept( Result const& result, - Ledger const& prevLedger, - NetClock::duration const& closeResolution, ConsensusCloseTimes const& rawCloseTimes, ConsensusMode const& mode, + Json::Value&& consensusJson, + std::pair&& txsBuilt) + { + } + + std::pair + buildAndValidate( + Result const& result, + Ledger_t const& prevLedger, + NetClock::duration const& closeResolution, + ConsensusMode const& mode, Json::Value&& consensusJson) { schedule(delays.ledgerAccept, [=, this]() { @@ -599,6 +631,8 @@ struct Peer startRound(); } }); + + return {}; } // Earliest allowed sequence number when checking for ledgers with more @@ -694,8 +728,8 @@ struct Peer std::size_t const count = validations.numTrustedForLedger(ledger.id()); std::size_t const numTrustedPeers = trustGraph.graph().outDegree(this); - quorum = static_cast(std::ceil(numTrustedPeers * 0.8)); - if (count >= quorum && ledger.isAncestor(fullyValidatedLedger)) + q = static_cast(std::ceil(numTrustedPeers * 0.8)); + if (count >= q && ledger.isAncestor(fullyValidatedLedger)) { issue(FullyValidateLedger{ledger, fullyValidatedLedger}); fullyValidatedLedger = ledger; @@ -850,7 +884,13 @@ struct Peer hash_set keys; for (auto const p : trustGraph.trustedPeers(this)) keys.insert(p->key); - return {quorum, keys}; + return {q, keys}; + } + + std::size_t + quorum() const + { + return q; } std::size_t @@ -973,6 +1013,70 @@ struct Peer return TxSet{res}; } + + LedgerMaster& + getLedgerMaster() const + { + Null_test test; + jtx::Env env(test); + + return env.app().getLedgerMaster(); + } + + void + clearValidating() + { + } + + bool + retryAccept( + Ledger_t const& newLedger, + std::optional>& + start) const + { + return false; + } + + std::recursive_mutex& + peekMutex() const + { + return mtx; + } + + void + endConsensus() const + { + } + + bool + validating() const + { + return false; + } + + std::optional + getValidationDelay() const + { + return delay; + } + + void + setValidationDelay( + std::optional vd = std::nullopt) const + { + } + + std::optional + getTimerDelay() const + { + return delay; + } + + void + setTimerDelay( + std::optional vd = std::nullopt) const + { + } }; } // namespace csf diff --git a/src/test/csf/Proposal.h b/src/test/csf/Proposal.h index d1cee16c1a7..76f36877c81 100644 --- a/src/test/csf/Proposal.h +++ b/src/test/csf/Proposal.h @@ -30,7 +30,7 @@ namespace csf { /** Proposal is a position taken in the consensus process and is represented directly from the generic types. */ -using Proposal = ConsensusProposal; +using Proposal = ConsensusProposal; } // namespace csf } // namespace test