Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 2 additions & 5 deletions docs/release-notes/eclair-vnext.md
Original file line number Diff line number Diff line change
Expand Up @@ -43,12 +43,9 @@ eclair.relay.peer-reputation {
// value, as described by https://github.com/lightning/blips/blob/master/blip-0004.md,
enabled = true
// Reputation decays with the following half life to emphasize recent behavior.
half-life = 15 days
half-life = 30 days
// Payments that stay pending for longer than this get penalized
max-relay-duration = 12 seconds
// Pending payments are counted as failed, and because they could potentially stay pending for a very long time,
// the following multiplier is applied.
pending-multiplier = 200 // A pending payment counts as two hundred failed ones.
max-relay-duration = 5 minutes
}
```

Expand Down
9 changes: 2 additions & 7 deletions eclair-core/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -259,14 +259,9 @@ eclair {
// value, as described by https://github.com/lightning/blips/blob/master/blip-0004.md,
enabled = true
// Reputation decays with the following half life to emphasize recent behavior.
half-life = 15 days
half-life = 30 days
// Payments that stay pending for longer than this get penalized.
max-relay-duration = 12 seconds
// Pending payments are counted as failed, and because they could potentially stay pending for a very long time,
// the following multiplier is applied. We want it to be as close as possible to the true cost of a worst case
// HTLC (max-cltv-delta / max-relay-duration, around 100000 with default parameters) while still being comparable
// to the number of HTLCs received per peer during twice the half life.
pending-multiplier = 200 // A pending payment counts as two hundred failed ones.
max-relay-duration = 5 minutes
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -647,7 +647,6 @@ object NodeParams extends Logging {
enabled = config.getBoolean("relay.peer-reputation.enabled"),
halfLife = FiniteDuration(config.getDuration("relay.peer-reputation.half-life").getSeconds, TimeUnit.SECONDS),
maxRelayDuration = FiniteDuration(config.getDuration("relay.peer-reputation.max-relay-duration").getSeconds, TimeUnit.SECONDS),
pendingMultiplier = config.getDouble("relay.peer-reputation.pending-multiplier"),
),
),
db = database,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -517,7 +517,7 @@ class Channel(val nodeParams: NodeParams, val channelKeys: ChannelKeys, val wall
context.system.eventStream.publish(AvailableBalanceChanged(self, d.channelId, d.aliases, commitments1, d.lastAnnouncement_opt))
val relayFee = nodeFee(d.channelUpdate.relayFees, add.amountMsat)
context.system.eventStream.publish(OutgoingHtlcAdded(add, remoteNodeId, c.origin.upstream, relayFee))
log.info("OutgoingHtlcAdded: channelId={}, id={}, endorsement={}, remoteNodeId={}, upstream={}, fee={}", Array(add.channelId.toHex, add.id, add.endorsement, remoteNodeId.toHex, c.origin.upstream.toString, relayFee))
log.info("OutgoingHtlcAdded: channelId={}, id={}, endorsement={}, remoteNodeId={}, upstream={}, fee={}, now={}, blockHeight={}, expiry={}", Array(add.channelId.toHex, add.id, add.endorsement, remoteNodeId.toHex, c.origin.upstream.toString, relayFee, TimestampMilli.now().toLong, nodeParams.currentBlockHeight.toLong, add.cltvExpiry))
handleCommandSuccess(c, d.copy(commitments = commitments1)) sending add
case Left(cause) => handleAddHtlcCommandError(c, cause, Some(d.channelUpdate))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ object ChannelRelay {
val upstream = Upstream.Hot.Channel(r.add.removeUnknownTlvs(), r.receivedAt, originNode, incomingChannelOccupancy)
reputationRecorder_opt match {
case Some(reputationRecorder) =>
reputationRecorder ! GetConfidence(context.messageAdapter(WrappedReputationScore(_)), upstream, channels.values.headOption.map(_.nextNodeId), r.relayFeeMsat)
reputationRecorder ! GetConfidence(context.messageAdapter(WrappedReputationScore(_)), upstream, channels.values.headOption.map(_.nextNodeId), r.relayFeeMsat, nodeParams.currentBlockHeight, r.outgoingCltv)
case None =>
context.self ! WrappedReputationScore(Reputation.Score.fromEndorsement(r.add.endorsement))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,9 @@ class PaymentLifecycle(nodeParams: NodeParams, cfg: SendPaymentConfig, router: A
case Event(RouteResponse(route +: _), WaitingForRoute(request, failures, ignore)) =>
log.info(s"route found: attempt=${failures.size + 1}/${request.maxAttempts} route=${route.printNodes()} channels=${route.printChannels()}")
reputationRecorder_opt match {
case Some(reputationRecorder) => reputationRecorder ! GetConfidence(self, cfg.upstream, Some(route.hops.head.nextNodeId), route.hops.head.fee(request.amount))
case Some(reputationRecorder) =>
val cltvExpiry = route.fullRoute.map(_.cltvExpiryDelta).foldLeft(request.recipient.expiry)(_ + _)
reputationRecorder ! GetConfidence(self, cfg.upstream, Some(route.hops.head.nextNodeId), route.hops.head.fee(request.amount), nodeParams.currentBlockHeight, cltvExpiry)
case None =>
val endorsement = cfg.upstream match {
case Hot.Channel(add, _, _, _) => add.endorsement
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@ import fr.acinq.bitcoin.scalacompat.ByteVector32
import fr.acinq.eclair.channel.{ChannelJammingException, ChannelParams, Commitments, IncomingConfidenceTooLow, OutgoingConfidenceTooLow, TooManySmallHtlcs}
import fr.acinq.eclair.transactions.DirectedHtlc
import fr.acinq.eclair.wire.protocol.UpdateAddHtlc
import fr.acinq.eclair.{MilliSatoshi, TimestampMilli}
import fr.acinq.eclair.{BlockHeight, CltvExpiry, MilliSatoshi, TimestampMilli}

import scala.concurrent.duration.FiniteDuration
import scala.concurrent.duration.{DurationInt, FiniteDuration}

/**
* Reputation score per endorsement level.
Expand All @@ -34,31 +34,36 @@ import scala.concurrent.duration.FiniteDuration
case class PastScore(weight: Double, score: Double, lastSettlementAt: TimestampMilli)

/** We're relaying that HTLC and are waiting for it to settle. */
case class PendingHtlc(fee: MilliSatoshi, endorsement: Int, startedAt: TimestampMilli) {
def weight(now: TimestampMilli, minDuration: FiniteDuration, multiplier: Double): Double = {
val duration = now - startedAt
fee.toLong.toDouble * (duration / minDuration).max(multiplier)
case class PendingHtlc(fee: MilliSatoshi, endorsement: Int, startedAt: TimestampMilli, expiry: CltvExpiry) {
def weight(now: TimestampMilli, minDuration: FiniteDuration, currentBlockHeight: BlockHeight): Double = {
val alreadyPending = now - startedAt
val untilExpiry = (expiry.toLong - currentBlockHeight.toLong) * 10.minutes
val duration = alreadyPending + untilExpiry
fee.toLong.toDouble * (duration / minDuration)
}
}

case class HtlcId(channelId: ByteVector32, id: Long)

case object HtlcId {
def apply(add: UpdateAddHtlc): HtlcId = HtlcId(add.channelId, add.id)
}

/**
* Local reputation for a given node.
*
* @param pastScores Scores from past HTLCs for each endorsement level.
* @param pending Set of pending HTLCs.
* @param halfLife Half life for the exponential moving average.
* @param maxRelayDuration Duration after which HTLCs are penalized for staying pending too long.
* @param pendingMultiplier How much to penalize pending HTLCs.
*/
case class Reputation(pastScores: Map[Int, PastScore], pending: Map[HtlcId, PendingHtlc], halfLife: FiniteDuration, maxRelayDuration: FiniteDuration, pendingMultiplier: Double) {
case class Reputation(pastScores: Map[Int, PastScore], pending: Map[HtlcId, PendingHtlc], halfLife: FiniteDuration, maxRelayDuration: FiniteDuration) {
private def decay(now: TimestampMilli, lastSettlementAt: TimestampMilli): Double = scala.math.pow(0.5, (now - lastSettlementAt) / halfLife)

/**
* Estimate the confidence that a payment will succeed.
*/
def getConfidence(fee: MilliSatoshi, endorsement: Int, now: TimestampMilli = TimestampMilli.now()): Double = {
def getConfidence(fee: MilliSatoshi, endorsement: Int, currentBlockHeight: BlockHeight, expiry: CltvExpiry, now: TimestampMilli = TimestampMilli.now()): Double = {
val weights = Array.fill(Reputation.endorsementLevels)(0.0)
val scores = Array.fill(Reputation.endorsementLevels)(0.0)
for (e <- 0 until Reputation.endorsementLevels) {
Expand All @@ -67,9 +72,9 @@ case class Reputation(pastScores: Map[Int, PastScore], pending: Map[HtlcId, Pend
scores(e) += d * pastScores(e).score
}
for (p <- pending.values) {
weights(p.endorsement) += p.weight(now, maxRelayDuration, pendingMultiplier)
weights(p.endorsement) += p.weight(now, maxRelayDuration, currentBlockHeight)
}
weights(endorsement) += fee.toLong.toDouble * pendingMultiplier
weights(endorsement) += PendingHtlc(fee, endorsement, now, expiry).weight(now, maxRelayDuration, currentBlockHeight)
/*
Higher endorsement buckets may have fewer payments which makes the weight of pending payments disproportionately
important. To counter this effect, we try adding payments from the lower buckets to see if it gives us a higher
Expand All @@ -91,17 +96,23 @@ case class Reputation(pastScores: Map[Int, PastScore], pending: Map[HtlcId, Pend
/**
* Register a pending relay.
*/
def addPendingHtlc(htlcId: HtlcId, fee: MilliSatoshi, endorsement: Int, now: TimestampMilli = TimestampMilli.now()): Reputation =
copy(pending = pending + (htlcId -> PendingHtlc(fee, endorsement, now)))
def addPendingHtlc(add: UpdateAddHtlc, fee: MilliSatoshi, endorsement: Int, now: TimestampMilli = TimestampMilli.now()): Reputation =
copy(pending = pending + (HtlcId(add) -> PendingHtlc(fee, endorsement, now, add.cltvExpiry)))

/**
* When a HTLC is settled, we record whether it succeeded and how long it took.
*/
def settlePendingHtlc(htlcId: HtlcId, isSuccess: Boolean, now: TimestampMilli = TimestampMilli.now()): Reputation = {
val newScores = pending.get(htlcId).map(p => {
val d = decay(now, pastScores(p.endorsement).lastSettlementAt)
val newWeight = d * pastScores(p.endorsement).weight + p.weight(now, maxRelayDuration, if (isSuccess) 1.0 else 0.0)
val newScore = d * pastScores(p.endorsement).score + (if (isSuccess) p.fee.toLong.toDouble else 0)
val duration = now - p.startedAt
val (weight, score) = if (isSuccess) {
(p.fee.toLong.toDouble * (duration / maxRelayDuration).max(1.0), p.fee.toLong.toDouble)
} else {
(p.fee.toLong.toDouble * (duration / maxRelayDuration), 0.0)
}
val newWeight = d * pastScores(p.endorsement).weight + weight
val newScore = d * pastScores(p.endorsement).score + score
pastScores + (p.endorsement -> PastScore(newWeight, newScore, now))
}).getOrElse(pastScores)
copy(pending = pending - htlcId, pastScores = newScores)
Expand All @@ -112,9 +123,9 @@ object Reputation {
val endorsementLevels = 8
val maxEndorsement = endorsementLevels - 1

case class Config(enabled: Boolean, halfLife: FiniteDuration, maxRelayDuration: FiniteDuration, pendingMultiplier: Double)
case class Config(enabled: Boolean, halfLife: FiniteDuration, maxRelayDuration: FiniteDuration)

def init(config: Config): Reputation = Reputation(Map.empty.withDefaultValue(PastScore(0.0, 0.0, TimestampMilli.min)), Map.empty, config.halfLife, config.maxRelayDuration, config.pendingMultiplier)
def init(config: Config): Reputation = Reputation(Map.empty.withDefaultValue(PastScore(0.0, 0.0, TimestampMilli.min)), Map.empty, config.halfLife, config.maxRelayDuration)

/**
* @param incomingConfidence Confidence that the outgoing HTLC will succeed given the reputation of the incoming peer
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import akka.actor.typed.eventstream.EventStream
import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.{ActorRef, Behavior}
import fr.acinq.bitcoin.scalacompat.Crypto.PublicKey
import fr.acinq.eclair.MilliSatoshi
import fr.acinq.eclair.{BlockHeight, CltvExpiry, MilliSatoshi}
import fr.acinq.eclair.channel.Upstream.Hot
import fr.acinq.eclair.channel.{OutgoingHtlcAdded, OutgoingHtlcFailed, OutgoingHtlcFulfilled, OutgoingHtlcSettled, Upstream}
import fr.acinq.eclair.reputation.ReputationRecorder._
Expand All @@ -31,7 +31,7 @@ import scala.collection.mutable
object ReputationRecorder {
// @formatter:off
sealed trait Command
case class GetConfidence(replyTo: ActorRef[Reputation.Score], upstream: Upstream.Hot, downstream_opt: Option[PublicKey], fee: MilliSatoshi) extends Command
case class GetConfidence(replyTo: ActorRef[Reputation.Score], upstream: Upstream.Hot, downstream_opt: Option[PublicKey], fee: MilliSatoshi, currentBlockHeight: BlockHeight, expiry: CltvExpiry) extends Command
private case class WrappedOutgoingHtlcAdded(added: OutgoingHtlcAdded) extends Command
private case class WrappedOutgoingHtlcSettled(settled: OutgoingHtlcSettled) extends Command
// @formatter:on
Expand Down Expand Up @@ -60,17 +60,17 @@ class ReputationRecorder(config: Reputation.Config) {

def run(): Behavior[Command] =
Behaviors.receiveMessage {
case GetConfidence(replyTo, _: Upstream.Local, _, _) =>
case GetConfidence(replyTo, _: Upstream.Local, _, _, _, _) =>
replyTo ! Reputation.Score.max
Behaviors.same

case GetConfidence(replyTo, upstream: Upstream.Hot.Channel, downstream_opt, fee) =>
val incomingConfidence = incomingReputations.get(upstream.receivedFrom).map(_.getConfidence(fee, upstream.add.endorsement)).getOrElse(0.0)
val outgoingConfidence = downstream_opt.flatMap(outgoingReputations.get).map(_.getConfidence(fee, Reputation.toEndorsement(incomingConfidence))).getOrElse(0.0)
case GetConfidence(replyTo, upstream: Upstream.Hot.Channel, downstream_opt, fee, currentBlockHeight, expiry) =>
val incomingConfidence = incomingReputations.get(upstream.receivedFrom).map(_.getConfidence(fee, upstream.add.endorsement, currentBlockHeight, expiry)).getOrElse(0.0)
val outgoingConfidence = downstream_opt.flatMap(outgoingReputations.get).map(_.getConfidence(fee, Reputation.toEndorsement(incomingConfidence), currentBlockHeight, expiry)).getOrElse(0.0)
replyTo ! Reputation.Score(incomingConfidence, outgoingConfidence)
Behaviors.same

case GetConfidence(replyTo, upstream: Upstream.Hot.Trampoline, downstream_opt, totalFee) =>
case GetConfidence(replyTo, upstream: Upstream.Hot.Trampoline, downstream_opt, totalFee, currentBlockHeight, expiry) =>
val incomingConfidence =
upstream.received
.groupMapReduce(_.receivedFrom)(r => (r.add.amountMsat, r.add.endorsement)) {
Expand All @@ -79,29 +79,29 @@ class ReputationRecorder(config: Reputation.Config) {
.map {
case (nodeId, (amount, endorsement)) =>
val fee = amount * totalFee.toLong / upstream.amountIn.toLong
incomingReputations.get(nodeId).map(_.getConfidence(fee, endorsement)).getOrElse(0.0)
incomingReputations.get(nodeId).map(_.getConfidence(fee, endorsement, currentBlockHeight, expiry)).getOrElse(0.0)
}
.min
val outgoingConfidence = downstream_opt.flatMap(outgoingReputations.get).map(_.getConfidence(totalFee, Reputation.toEndorsement(incomingConfidence))).getOrElse(0.0)
val outgoingConfidence = downstream_opt.flatMap(outgoingReputations.get).map(_.getConfidence(totalFee, Reputation.toEndorsement(incomingConfidence), currentBlockHeight, expiry)).getOrElse(0.0)
replyTo ! Reputation.Score(incomingConfidence, outgoingConfidence)
Behaviors.same

case WrappedOutgoingHtlcAdded(OutgoingHtlcAdded(add, remoteNodeId, upstream, fee)) =>
val htlcId = HtlcId(add.channelId, add.id)
val htlcId = HtlcId(add)
upstream match {
case channel: Hot.Channel =>
incomingReputations(channel.receivedFrom) = incomingReputations(channel.receivedFrom).addPendingHtlc(htlcId, fee, channel.add.endorsement)
incomingReputations(channel.receivedFrom) = incomingReputations(channel.receivedFrom).addPendingHtlc(add, fee, channel.add.endorsement)
case trampoline: Hot.Trampoline =>
trampoline.received
.groupMapReduce(_.receivedFrom)(r => (r.add.amountMsat, r.add.endorsement)) {
case ((amount1, endorsement1), (amount2, endorsement2)) => (amount1 + amount2, endorsement1 min endorsement2)
}
.foreach { case (nodeId, (amount, endorsement)) =>
incomingReputations(nodeId) = incomingReputations(nodeId).addPendingHtlc(htlcId, fee * amount.toLong / trampoline.amountIn.toLong, endorsement)
incomingReputations(nodeId) = incomingReputations(nodeId).addPendingHtlc(add, fee * amount.toLong / trampoline.amountIn.toLong, endorsement)
}
case _: Upstream.Local => ()
}
outgoingReputations(remoteNodeId) = outgoingReputations(remoteNodeId).addPendingHtlc(htlcId, fee, add.endorsement)
outgoingReputations(remoteNodeId) = outgoingReputations(remoteNodeId).addPendingHtlc(add, fee, add.endorsement)
pending(htlcId) = PendingHtlc(add, upstream, remoteNodeId)
Behaviors.same

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ object TestConstants {
feeProportionalMillionths = 30),
enforcementDelay = 10 minutes,
asyncPaymentsParams = AsyncPaymentsParams(1008, CltvExpiryDelta(144)),
peerReputationConfig = Reputation.Config(enabled = true, 1 day, 10 seconds, 100),
peerReputationConfig = Reputation.Config(enabled = true, 1 day, 10 minutes),
),
db = TestDatabases.inMemoryDb(),
autoReconnect = false,
Expand Down Expand Up @@ -372,7 +372,7 @@ object TestConstants {
feeProportionalMillionths = 30),
enforcementDelay = 10 minutes,
asyncPaymentsParams = AsyncPaymentsParams(1008, CltvExpiryDelta(144)),
peerReputationConfig = Reputation.Config(enabled = true, 2 day, 20 seconds, 200),
peerReputationConfig = Reputation.Config(enabled = true, 2 day, 20 minutes),
),
db = TestDatabases.inMemoryDb(),
autoReconnect = false,
Expand Down
Loading