Skip to content

Commit

Permalink
Allow only 1 job queue slot for acquiring inbound ledger.
Browse files Browse the repository at this point in the history
* Log when duplicate concurrent inbound ledger are filtered.
* RAII for containers that track concurrent inbound ledger.
* Comment on when to asynchronously acquire inbound ledgers, which
   is possible to be always OK, but should have further review.
* Other small logging changes

Co-authored-by: Ed Hennis <[email protected]>
  • Loading branch information
mtrippled and ximinez committed Aug 31, 2024
1 parent 00ed7c9 commit 7741483
Show file tree
Hide file tree
Showing 7 changed files with 68 additions and 8 deletions.
8 changes: 6 additions & 2 deletions src/ripple/app/consensus/RCLConsensus.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -135,8 +135,12 @@ RCLConsensus::Adaptor::acquireLedger(LedgerHash const& hash)
acquiringLedger_ = hash;

app_.getJobQueue().addJob(
jtADVANCE, "getConsensusLedger", [id = hash, &app = app_]() {
app.getInboundLedgers().acquire(
jtADVANCE,
"getConsensusLedger1",
[id = hash, &app = app_, this]() {
JLOG(j_.debug())
<< "JOB advanceLedger getConsensusLedger1 started";
app.getInboundLedgers().acquireAsync(
id, 0, InboundLedger::Reason::CONSENSUS);
});
}
Expand Down
6 changes: 4 additions & 2 deletions src/ripple/app/consensus/RCLValidations.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -142,8 +142,10 @@ RCLValidationsAdaptor::acquire(LedgerHash const& hash)
Application* pApp = &app_;

app_.getJobQueue().addJob(
jtADVANCE, "getConsensusLedger", [pApp, hash]() {
pApp->getInboundLedgers().acquire(
jtADVANCE, "getConsensusLedger2", [pApp, hash, this]() {
JLOG(j_.debug())
<< "JOB advanceLedger getConsensusLedger2 started";
pApp->getInboundLedgers().acquireAsync(
hash, 0, InboundLedger::Reason::CONSENSUS);
});
return std::nullopt;
Expand Down
14 changes: 12 additions & 2 deletions src/ripple/app/ledger/InboundLedgers.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include <ripple/app/ledger/InboundLedger.h>
#include <ripple/protocol/RippleLedgerHash.h>
#include <memory>
#include <set>

namespace ripple {

Expand All @@ -37,11 +38,20 @@ class InboundLedgers

virtual ~InboundLedgers() = default;

// VFALCO TODO Should this be called findOrAdd ?
//
// Callers should use this if they possibly need an authoritative
// response immediately.
virtual std::shared_ptr<Ledger const>
acquire(uint256 const& hash, std::uint32_t seq, InboundLedger::Reason) = 0;

// Callers should use this if they are known to be executing on the Job
// Queue. TODO review whether all callers of acquire() can use this
// instead. Inbound ledger acquisition is asynchronous anyway.
virtual void
acquireAsync(
uint256 const& hash,
std::uint32_t seq,
InboundLedger::Reason reason) = 0;

virtual std::shared_ptr<InboundLedger>
find(LedgerHash const& hash) = 0;

Expand Down
2 changes: 1 addition & 1 deletion src/ripple/app/ledger/impl/InboundLedger.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -536,7 +536,7 @@ InboundLedger::trigger(std::shared_ptr<Peer> const& peer, TriggerReason reason)
return;
}

if (auto stream = journal_.trace())
if (auto stream = journal_.debug())
{
stream << "Trigger acquiring ledger " << hash_;
if (peer)
Expand Down
35 changes: 35 additions & 0 deletions src/ripple/app/ledger/impl/InboundLedgers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
#include <ripple/core/JobQueue.h>
#include <ripple/nodestore/DatabaseShard.h>
#include <ripple/protocol/jss.h>
#include <exception>
#include <memory>
#include <mutex>
#include <vector>
Expand Down Expand Up @@ -149,6 +150,37 @@ class InboundLedgersImp : public InboundLedgers
return ledger;
}

void
acquireAsync(
uint256 const& hash,
std::uint32_t seq,
InboundLedger::Reason reason) override
{
std::unique_lock lock(acquiresMutex_);
try
{
if (pendingAcquires_.contains(hash))
return;
pendingAcquires_.insert(hash);
lock.unlock();
acquire(hash, seq, reason);
}
catch (std::exception const& e)
{
JLOG(j_.warn())
<< "Exception thrown for acquiring new inbound ledger " << hash
<< ": " << e.what();
}
catch (...)
{
JLOG(j_.warn())
<< "Unknown exception thrown for acquiring new inbound ledger "
<< hash;
}
lock.lock();
pendingAcquires_.erase(hash);
}

std::shared_ptr<InboundLedger>
find(uint256 const& hash) override
{
Expand Down Expand Up @@ -441,6 +473,9 @@ class InboundLedgersImp : public InboundLedgers
beast::insight::Counter mCounter;

std::unique_ptr<PeerSetBuilder> mPeerSetBuilder;

std::set<uint256> pendingAcquires_;
std::mutex acquiresMutex_;
};

//------------------------------------------------------------------------------
Expand Down
3 changes: 2 additions & 1 deletion src/ripple/app/misc/NetworkOPs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1730,7 +1730,8 @@ NetworkOPsImp::checkLastClosedLedger(
}

JLOG(m_journal.warn()) << "We are not running on the consensus ledger";
JLOG(m_journal.info()) << "Our LCL: " << getJson({*ourClosed, {}});
JLOG(m_journal.info()) << "Our LCL: " << ourClosed->info().hash
<< getJson({*ourClosed, {}});
JLOG(m_journal.info()) << "Net LCL " << closedLedger;

if ((mMode == OperatingMode::TRACKING) || (mMode == OperatingMode::FULL))
Expand Down
8 changes: 8 additions & 0 deletions src/test/app/LedgerReplay_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,14 @@ class MagicInboundLedgers : public InboundLedgers
return {};
}

virtual void
acquireAsync(
uint256 const& hash,
std::uint32_t seq,
InboundLedger::Reason reason) override
{
}

virtual std::shared_ptr<InboundLedger>
find(LedgerHash const& hash) override
{
Expand Down

0 comments on commit 7741483

Please sign in to comment.