Skip to content

Commit a4d9560

Browse files
committed
Polish
1 parent 96b5205 commit a4d9560

File tree

12 files changed

+109
-50
lines changed

12 files changed

+109
-50
lines changed

docs/release-notes/eclair-vnext.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,12 +43,12 @@ eclair.relay.peer-reputation {
4343
// value, as described by https://github.com/lightning/blips/blob/master/blip-0004.md,
4444
enabled = true
4545
// Reputation decays with the following half life to emphasize recent behavior.
46-
half-life = 7 days
46+
half-life = 15 days
4747
// Payments that stay pending for longer than this get penalized
4848
max-relay-duration = 12 seconds
4949
// Pending payments are counted as failed, and because they could potentially stay pending for a very long time,
5050
// the following multiplier is applied.
51-
pending-multiplier = 1000 // A pending payment counts as a thousand failed ones.
51+
pending-multiplier = 200 // A pending payment counts as a thousand failed ones.
5252
}
5353
```
5454

eclair-core/src/main/scala/fr/acinq/eclair/channel/ChannelData.scala

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -147,19 +147,25 @@ case class INPUT_RESTORED(data: PersistentChannelData)
147147
sealed trait Upstream { def amountIn: MilliSatoshi }
148148
object Upstream {
149149
/** We haven't restarted and have full information about the upstream parent(s). */
150-
sealed trait Hot extends Upstream
150+
sealed trait Hot extends Upstream {
151+
def show: String
152+
}
151153
object Hot {
152154
/** Our node is forwarding a single incoming HTLC. */
153155
case class Channel(add: UpdateAddHtlc, receivedAt: TimestampMilli, receivedFrom: PublicKey) extends Hot {
154156
override val amountIn: MilliSatoshi = add.amountMsat
155157
val expiryIn: CltvExpiry = add.cltvExpiry
158+
159+
override def show: String = s"Channel(receivedAt=${receivedAt.toLong}, receivedFrom=${receivedFrom.toHex}, endorsement=${add.endorsement})"
156160
}
157161
/** Our node is forwarding a payment based on a set of HTLCs from potentially multiple upstream channels. */
158162
case class Trampoline(received: List[Channel]) extends Hot {
159163
override val amountIn: MilliSatoshi = received.map(_.add.amountMsat).sum
160164
// We must use the lowest expiry of the incoming HTLC set.
161165
val expiryIn: CltvExpiry = received.map(_.add.cltvExpiry).min
162166
val receivedAt: TimestampMilli = received.map(_.receivedAt).max
167+
168+
override def show: String = s"Trampoline(${received.map(_.show).mkString(",")})"
163169
}
164170
}
165171

@@ -183,7 +189,11 @@ object Upstream {
183189
}
184190

185191
/** Our node is the origin of the payment: there are no matching upstream HTLCs. */
186-
case class Local(id: UUID) extends Hot with Cold { override val amountIn: MilliSatoshi = 0 msat }
192+
case class Local(id: UUID) extends Hot with Cold {
193+
override val amountIn: MilliSatoshi = 0 msat
194+
195+
override def show: String = toString
196+
}
187197
}
188198

189199
/**

eclair-core/src/main/scala/fr/acinq/eclair/channel/Commitments.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -537,7 +537,7 @@ case class Commitment(fundingTxIndex: Long,
537537
}
538538
}
539539
val occupancy = (outgoingHtlcs.size.toDouble / maxAcceptedHtlcs).max(htlcValueInFlight.toLong.toDouble / allowedHtlcValueInFlight.toBigInt.toDouble)
540-
if (confidence + 0.05 < occupancy) {
540+
if (confidence + 0.1 < occupancy) { // We add a 10% tolerance to enable payments from nodes without history and to account for the fact that even at the highest endorsement level we still expect a confidence of less than 93.75%.
541541
return Left(ConfidenceTooLow(params.channelId, confidence, occupancy))
542542
}
543543

eclair-core/src/main/scala/fr/acinq/eclair/channel/fsm/Channel.scala

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -514,7 +514,9 @@ class Channel(val nodeParams: NodeParams, val channelKeys: ChannelKeys, val wall
514514
case Right((commitments1, add)) =>
515515
if (c.commit) self ! CMD_SIGN()
516516
context.system.eventStream.publish(AvailableBalanceChanged(self, d.channelId, d.aliases, commitments1, d.lastAnnouncement_opt))
517-
context.system.eventStream.publish(OutgoingHtlcAdded(add, remoteNodeId, c.origin.upstream, nodeFee(d.channelUpdate.relayFees, add.amountMsat)))
517+
val relayFee = nodeFee(d.channelUpdate.relayFees, add.amountMsat)
518+
context.system.eventStream.publish(OutgoingHtlcAdded(add, remoteNodeId, c.origin.upstream, relayFee))
519+
log.info("OutgoingHtlcAdded: channelId={}, id={}, endorsement={}, remoteNodeId={}, upstream={}, fee={}", Array(add.channelId.toHex, add.id, add.endorsement, remoteNodeId.toHex, c.origin.upstream.show, relayFee))
518520
handleCommandSuccess(c, d.copy(commitments = commitments1)) sending add
519521
case Left(cause) => handleAddHtlcCommandError(c, cause, Some(d.channelUpdate))
520522
}
@@ -542,6 +544,7 @@ class Channel(val nodeParams: NodeParams, val channelKeys: ChannelKeys, val wall
542544
// we forward preimages as soon as possible to the upstream channel because it allows us to pull funds
543545
relayer ! RES_ADD_SETTLED(origin, htlc, HtlcResult.RemoteFulfill(fulfill))
544546
context.system.eventStream.publish(OutgoingHtlcFulfilled(fulfill))
547+
log.info("OutgoingHtlcFulfilled: channelId={}, id={}", fulfill.channelId.toHex, fulfill.id)
545548
stay() using d.copy(commitments = commitments1)
546549
case Left(cause) => handleLocalError(cause, d, Some(fulfill))
547550
}
@@ -576,13 +579,15 @@ class Channel(val nodeParams: NodeParams, val channelKeys: ChannelKeys, val wall
576579

577580
case Event(fail: UpdateFailHtlc, d: DATA_NORMAL) =>
578581
context.system.eventStream.publish(OutgoingHtlcFailed(fail))
582+
log.info("OutgoingHtlcFailed: channelId={}, id={}", fail.channelId.toHex, fail.id)
579583
d.commitments.receiveFail(fail) match {
580584
case Right((commitments1, _, _)) => stay() using d.copy(commitments = commitments1)
581585
case Left(cause) => handleLocalError(cause, d, Some(fail))
582586
}
583587

584588
case Event(fail: UpdateFailMalformedHtlc, d: DATA_NORMAL) =>
585589
context.system.eventStream.publish(OutgoingHtlcFailed(fail))
590+
log.info("OutgoingHtlcFailed: channelId={}, id={}", fail.channelId.toHex, fail.id)
586591
d.commitments.receiveFailMalformed(fail) match {
587592
case Right((commitments1, _, _)) => stay() using d.copy(commitments = commitments1)
588593
case Left(cause) => handleLocalError(cause, d, Some(fail))

eclair-core/src/main/scala/fr/acinq/eclair/reputation/Reputation.scala

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -23,15 +23,16 @@ import scala.collection.mutable
2323
import scala.concurrent.duration.FiniteDuration
2424

2525
/**
26+
* Reputation score per endorsement level.
2627
*
27-
* @param weight How much fees we would have collected in the past if all payments had succeeded (exponential moving average).
28+
* @param weight How much fees we would have collected in the past if all HTLCs had succeeded (exponential moving average).
2829
* @param score How much fees we have collected in the past (exponential moving average).
29-
* @param lastSettlementAt Timestamp of the last recorded payment settlement.
30+
* @param lastSettlementAt Timestamp of the last recorded HTLC settlement.
3031
*/
3132
case class PastScore(weight: Double, score: Double, lastSettlementAt: TimestampMilli)
3233

33-
/** We're relaying that payment and are waiting for it to settle. */
34-
case class PendingPayment(fee: MilliSatoshi, endorsement: Int, startedAt: TimestampMilli) {
34+
/** We're relaying that HTLC and are waiting for it to settle. */
35+
case class PendingHtlc(fee: MilliSatoshi, endorsement: Int, startedAt: TimestampMilli) {
3536
def weight(now: TimestampMilli, minDuration: FiniteDuration, multiplier: Double): Double = {
3637
val duration = now - startedAt
3738
fee.toLong.toDouble * (duration / minDuration).max(multiplier)
@@ -43,12 +44,13 @@ case class HtlcId(channelId: ByteVector32, id: Long)
4344
/**
4445
* Local reputation for a given node.
4546
*
46-
* @param pending Set of pending payments (payments may contain multiple HTLCs when using trampoline).
47+
* @param pastScores Scores from past HTLCs for each endorsement level.
48+
* @param pending Set of pending HTLCs.
4749
* @param halfLife Half life for the exponential moving average.
48-
* @param maxRelayDuration Duration after which payments are penalized for staying pending too long.
49-
* @param pendingMultiplier How much to penalize pending payments.
50+
* @param maxRelayDuration Duration after which HTLCs are penalized for staying pending too long.
51+
* @param pendingMultiplier How much to penalize pending HTLCs.
5052
*/
51-
case class Reputation(pastScores: Array[PastScore], pending: mutable.Map[HtlcId, PendingPayment], halfLife: FiniteDuration, maxRelayDuration: FiniteDuration, pendingMultiplier: Double) {
53+
case class Reputation(pastScores: Array[PastScore], pending: mutable.Map[HtlcId, PendingHtlc], halfLife: FiniteDuration, maxRelayDuration: FiniteDuration, pendingMultiplier: Double) {
5254
private def decay(now: TimestampMilli, lastSettlementAt: TimestampMilli): Double = scala.math.pow(0.5, (now - lastSettlementAt) / halfLife)
5355

5456
/**
@@ -88,10 +90,10 @@ case class Reputation(pastScores: Array[PastScore], pending: mutable.Map[HtlcId,
8890
* Register a pending relay.
8991
*/
9092
def attempt(htlcId: HtlcId, fee: MilliSatoshi, endorsement: Int, now: TimestampMilli = TimestampMilli.now()): Unit =
91-
pending(htlcId) = PendingPayment(fee, endorsement, now)
93+
pending(htlcId) = PendingHtlc(fee, endorsement, now)
9294

9395
/**
94-
* When a payment is settled, we record whether it succeeded and how long it took.
96+
* When a HTLC is settled, we record whether it succeeded and how long it took.
9597
*/
9698
def record(htlcId: HtlcId, isSuccess: Boolean, now: TimestampMilli = TimestampMilli.now()): Unit =
9799
pending.remove(htlcId).foreach(p => {

eclair-core/src/main/scala/fr/acinq/eclair/reputation/ReputationRecorder.scala

Lines changed: 25 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
package fr.acinq.eclair.reputation
1818

1919
import akka.actor.typed.eventstream.EventStream
20-
import akka.actor.typed.scaladsl.{ActorContext, Behaviors}
20+
import akka.actor.typed.scaladsl.Behaviors
2121
import akka.actor.typed.{ActorRef, Behavior}
2222
import fr.acinq.bitcoin.scalacompat.Crypto.PublicKey
2323
import fr.acinq.eclair.MilliSatoshi
@@ -37,21 +37,31 @@ object ReputationRecorder {
3737
private case class WrappedOutgoingHtlcFulfilled(fulfilled: OutgoingHtlcFulfilled) extends Command
3838
// @formatter:on
3939

40-
/** Confidence that the outgoing HTLC will succeed. */
40+
/**
41+
* @param confidence Confidence that the outgoing HTLC will succeed (takes into account both upstream and downstream reputation).
42+
* @param endorsement Endorsement level to set for the outgoing HTLC (takes into account upstream reputation only).
43+
*/
4144
case class Confidence(confidence: Double, endorsement: Int)
4245

4346
def apply(config: Reputation.Config): Behavior[Command] =
4447
Behaviors.setup(context => {
4548
context.system.eventStream ! EventStream.Subscribe(context.messageAdapter(WrappedOutgoingHtlcAdded))
4649
context.system.eventStream ! EventStream.Subscribe(context.messageAdapter(WrappedOutgoingHtlcFailed))
4750
context.system.eventStream ! EventStream.Subscribe(context.messageAdapter(WrappedOutgoingHtlcFulfilled))
48-
new ReputationRecorder(config, context).run()
51+
new ReputationRecorder(config).run()
4952
})
5053

51-
case class PendingHtlc(add: UpdateAddHtlc, remoteNodeId: PublicKey, upstream: Upstream.Hot)
54+
/**
55+
* A pending outgoing HTLC.
56+
*
57+
* @param add UpdateAddHtlc that contains an id for the HTLC and an endorsement value.
58+
* @param upstream The incoming node or nodes.
59+
* @param downstream The outgoing node.
60+
*/
61+
case class PendingHtlc(add: UpdateAddHtlc, upstream: Upstream.Hot, downstream: PublicKey)
5262
}
5363

54-
class ReputationRecorder(config: Reputation.Config, context: ActorContext[ReputationRecorder.Command]) {
64+
class ReputationRecorder(config: Reputation.Config) {
5565
private val incomingReputations: mutable.Map[PublicKey, Reputation] = mutable.HashMap.empty
5666
private val outgoingReputations: mutable.Map[PublicKey, Reputation] = mutable.HashMap.empty
5767
private val pending: mutable.Map[HtlcId, PendingHtlc] = mutable.HashMap.empty
@@ -112,13 +122,17 @@ class ReputationRecorder(config: Reputation.Config, context: ActorContext[Reputa
112122
case channel: Hot.Channel =>
113123
incomingReputation(channel.receivedFrom).attempt(htlcId, fee, channel.add.endorsement)
114124
case trampoline: Hot.Trampoline =>
115-
trampoline.received.foreach(channel =>
116-
incomingReputation(channel.receivedFrom).attempt(htlcId, fee * channel.amountIn.toLong / trampoline.amountIn.toLong, channel.add.endorsement)
117-
)
125+
trampoline.received
126+
.groupMapReduce(_.receivedFrom)(r => (r.add.amountMsat, r.add.endorsement)) {
127+
case ((amount1, endorsement1), (amount2, endorsement2)) => (amount1 + amount2, endorsement1 min endorsement2)
128+
}
129+
.foreach { case (nodeId, (amount, endorsement)) =>
130+
incomingReputation(nodeId).attempt(htlcId, fee * amount.toLong / trampoline.amountIn.toLong, endorsement)
131+
}
118132
case _: Upstream.Local => ()
119133
}
120134
outgoingReputation(remoteNodeId).attempt(htlcId, fee, add.endorsement)
121-
pending(htlcId) = PendingHtlc(add, remoteNodeId, upstream)
135+
pending(htlcId) = PendingHtlc(add, upstream, remoteNodeId)
122136
Behaviors.same
123137

124138
case WrappedOutgoingHtlcFailed(OutgoingHtlcFailed(fail)) =>
@@ -136,7 +150,7 @@ class ReputationRecorder(config: Reputation.Config, context: ActorContext[Reputa
136150
)
137151
case _: Upstream.Local => ()
138152
}
139-
outgoingReputation(p.remoteNodeId).record(htlcId, isSuccess = false)
153+
outgoingReputation(p.downstream).record(htlcId, isSuccess = false)
140154
})
141155
Behaviors.same
142156

@@ -152,7 +166,7 @@ class ReputationRecorder(config: Reputation.Config, context: ActorContext[Reputa
152166
)
153167
case _: Upstream.Local => ()
154168
}
155-
outgoingReputation(p.remoteNodeId).record(htlcId, isSuccess = true)
169+
outgoingReputation(p.downstream).record(htlcId, isSuccess = true)
156170
})
157171
Behaviors.same
158172
}

eclair-core/src/main/scala/fr/acinq/eclair/reputation/package.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,6 @@ package fr.acinq.eclair
1818

1919
package object reputation {
2020
implicit class EndorsementDouble(private val c: Double) extends AnyVal {
21-
def toEndorsement: Int = (c * 7.999).toInt
21+
def toEndorsement: Int = (c * Reputation.endorsementLevels).toInt
2222
}
2323
}

eclair-core/src/main/scala/fr/acinq/eclair/wire/protocol/LightningMessageTypes.scala

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@ import fr.acinq.bitcoin.scalacompat.{BlockHash, ByteVector32, ByteVector64, OutP
2323
import fr.acinq.eclair.blockchain.fee.FeeratePerKw
2424
import fr.acinq.eclair.channel.{ChannelFlags, ChannelType}
2525
import fr.acinq.eclair.payment.relay.Relayer
26-
import fr.acinq.eclair.reputation.EndorsementDouble
2726
import fr.acinq.eclair.wire.protocol.ChannelReadyTlv.ShortChannelIdTlv
2827
import fr.acinq.eclair.{Alias, BlockHeight, CltvExpiry, CltvExpiryDelta, Feature, Features, InitFeature, MilliSatoshi, MilliSatoshiLong, RealShortChannelId, ShortChannelId, TimestampSecond, UInt64, isAsciiPrintable}
2928
import scodec.bits.ByteVector

eclair-core/src/test/scala/fr/acinq/eclair/TestConstants.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -180,7 +180,7 @@ object TestConstants {
180180
feeProportionalMillionths = 30),
181181
enforcementDelay = 10 minutes,
182182
asyncPaymentsParams = AsyncPaymentsParams(1008, CltvExpiryDelta(144)),
183-
peerReputationConfig = Reputation.Config(enabled = false, 1 day, 10 seconds, 100),
183+
peerReputationConfig = Reputation.Config(enabled = true, 1 day, 10 seconds, 100),
184184
),
185185
db = TestDatabases.inMemoryDb(),
186186
autoReconnect = false,
@@ -372,7 +372,7 @@ object TestConstants {
372372
feeProportionalMillionths = 30),
373373
enforcementDelay = 10 minutes,
374374
asyncPaymentsParams = AsyncPaymentsParams(1008, CltvExpiryDelta(144)),
375-
peerReputationConfig = Reputation.Config(enabled = false, 2 day, 20 seconds, 200),
375+
peerReputationConfig = Reputation.Config(enabled = true, 2 day, 20 seconds, 200),
376376
),
377377
db = TestDatabases.inMemoryDb(),
378378
autoReconnect = false,

eclair-core/src/test/scala/fr/acinq/eclair/payment/PaymentLifecycleSpec.scala

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ package fr.acinq.eclair.payment
1818

1919
import akka.actor.ActorRef
2020
import akka.actor.FSM.{CurrentState, SubscribeTransitionCallBack, Transition}
21+
import akka.actor.typed.scaladsl.Behaviors
2122
import akka.actor.typed.scaladsl.adapter._
2223
import akka.testkit.{TestFSMRef, TestProbe}
2324
import fr.acinq.bitcoin.scalacompat.Crypto.PublicKey
@@ -38,7 +39,7 @@ import fr.acinq.eclair.payment.relay.Relayer.RelayFees
3839
import fr.acinq.eclair.payment.send.PaymentInitiator.SendPaymentConfig
3940
import fr.acinq.eclair.payment.send.PaymentLifecycle._
4041
import fr.acinq.eclair.payment.send.{ClearRecipient, PaymentLifecycle, Recipient}
41-
import fr.acinq.eclair.reputation.Reputation
42+
import fr.acinq.eclair.reputation.{Reputation, ReputationRecorder}
4243
import fr.acinq.eclair.router.Announcements.makeChannelUpdate
4344
import fr.acinq.eclair.router.BaseRouterSpec.{blindedRouteFromHops, channelAnnouncement, channelHopFromUpdate}
4445
import fr.acinq.eclair.router.Graph.PaymentWeightRatios
@@ -93,7 +94,13 @@ class PaymentLifecycleSpec extends BaseRouterSpec {
9394
val nodeParams = TestConstants.Alice.nodeParams.copy(nodeKeyManager = testNodeKeyManager, channelKeyManager = testChannelKeyManager)
9495
val cfg = SendPaymentConfig(id, parentId, Some(defaultExternalId), defaultPaymentHash, invoice.nodeId, Upstream.Local(id), Some(invoice), None, storeInDb, publishEvent, recordMetrics)
9596
val (routerForwarder, register, sender, monitor, eventListener, metricsListener) = (TestProbe(), TestProbe(), TestProbe(), TestProbe(), TestProbe(), TestProbe())
96-
val paymentFSM = TestFSMRef(new PaymentLifecycle(nodeParams, cfg, routerForwarder.ref, register.ref, None))
97+
val reputationRecorder = system.spawnAnonymous(Behaviors.receiveMessage[ReputationRecorder.GetConfidence](getConfidence => {
98+
assert(getConfidence.upstream.isInstanceOf[Upstream.Local])
99+
assert(getConfidence.downstream == b)
100+
getConfidence.replyTo ! ReputationRecorder.Confidence(1.0, Reputation.maxEndorsement)
101+
Behaviors.same
102+
}))
103+
val paymentFSM = TestFSMRef(new PaymentLifecycle(nodeParams, cfg, routerForwarder.ref, register.ref, Some(reputationRecorder)))
97104
paymentFSM ! SubscribeTransitionCallBack(monitor.ref)
98105
val CurrentState(_, WAITING_FOR_REQUEST) = monitor.expectMsgClass(classOf[CurrentState[_]])
99106
system.eventStream.subscribe(eventListener.ref, classOf[PaymentEvent])

0 commit comments

Comments
 (0)