diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/payment/send/PaymentLifecycle.scala b/eclair-core/src/main/scala/fr/acinq/eclair/payment/send/PaymentLifecycle.scala index 207e8a7bd0..d43326280e 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/payment/send/PaymentLifecycle.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/payment/send/PaymentLifecycle.scala @@ -103,8 +103,16 @@ class PaymentLifecycle(nodeParams: NodeParams, cfg: SendPaymentConfig, router: A Metrics.PaymentAttempt.withTag(Tags.MultiPart, value = false).record(d.failures.size + 1) val p = PartialPayment(id, d.request.amount, d.cmd.amount - d.request.amount, htlc.channelId, Some(d.route.fullRoute)) val remainingAttribution_opt = fulfill match { - case HtlcResult.RemoteFulfill(fulfill) => - fulfill.attribution_opt.flatMap(Sphinx.Attribution.fulfillHoldTimes(_, d.sharedSecrets).remaining_opt) + case HtlcResult.RemoteFulfill(updateFulfill) => + updateFulfill.attribution_opt match { + case Some(attribution) => + val Sphinx.Attribution.UnwrappedAttribution(holdTimes, remaining_opt) = Sphinx.Attribution.fulfillHoldTimes(attribution, d.sharedSecrets) + if (holdTimes.nonEmpty) { + context.system.eventStream.publish(Router.ReportedHoldTimes(holdTimes)) + } + remaining_opt + case None => None + } case _: HtlcResult.OnChainFulfill => None } myStop(d.request, Right(cfg.createPaymentSent(d.recipient, fulfill.paymentPreimage, p :: Nil, remainingAttribution_opt))) @@ -170,6 +178,9 @@ class PaymentLifecycle(nodeParams: NodeParams, cfg: SendPaymentConfig, router: A private def handleRemoteFail(d: WaitingForComplete, fail: UpdateFailHtlc) = { import d._ val htlcFailure = Sphinx.FailurePacket.decrypt(fail.reason, fail.attribution_opt, sharedSecrets) + if (htlcFailure.holdTimes.nonEmpty) { + context.system.eventStream.publish(Router.ReportedHoldTimes(htlcFailure.holdTimes)) + } ((htlcFailure.failure match { case success@Right(e) => Metrics.PaymentError.withTag(Tags.Failure, Tags.FailureType(RemoteFailure(request.amount, Nil, e))).increment() diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/payment/send/TrampolinePaymentLifecycle.scala b/eclair-core/src/main/scala/fr/acinq/eclair/payment/send/TrampolinePaymentLifecycle.scala index c48fc328ec..d5a0167aaf 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/payment/send/TrampolinePaymentLifecycle.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/payment/send/TrampolinePaymentLifecycle.scala @@ -16,6 +16,7 @@ package fr.acinq.eclair.payment.send +import akka.actor.typed.eventstream.EventStream import akka.actor.typed.scaladsl.adapter._ import akka.actor.typed.scaladsl.{ActorContext, Behaviors} import akka.actor.typed.{ActorRef, Behavior} @@ -27,6 +28,8 @@ import fr.acinq.eclair.io.Peer import fr.acinq.eclair.payment.OutgoingPaymentPacket.{NodePayload, buildOnion} import fr.acinq.eclair.payment.PaymentSent.PartialPayment import fr.acinq.eclair.payment._ +import fr.acinq.eclair.payment.send.TrampolinePayment.{buildOutgoingPayment, computeFees} +import fr.acinq.eclair.router.Router import fr.acinq.eclair.router.Router.RouteParams import fr.acinq.eclair.wire.protocol.{PaymentOnion, PaymentOnionCodecs} import fr.acinq.eclair.{CltvExpiry, CltvExpiryDelta, Features, Logs, MilliSatoshi, NodeParams, randomBytes32} @@ -54,9 +57,9 @@ object TrampolinePaymentLifecycle { require(invoice.amount_opt.nonEmpty, "amount-less invoices are not supported in trampoline tests") } private case class TrampolinePeerNotFound(trampolineNodeId: PublicKey) extends Command + private case class CouldntAddHtlc(failure: Throwable) extends Command + private case class HtlcSettled(result: HtlcResult, part: PartialPayment, holdTimes: Seq[Sphinx.HoldTime]) extends Command private case class WrappedPeerChannels(channels: Seq[Peer.ChannelInfo]) extends Command - private case class WrappedAddHtlcResponse(response: CommandResponse[CMD_ADD_HTLC]) extends Command - private case class WrappedHtlcSettled(result: RES_ADD_SETTLED[Origin.Hot, HtlcResult]) extends Command // @formatter:on def apply(nodeParams: NodeParams, register: ActorRef[Register.ForwardNodeId[Peer.GetPeerChannels]]): Behavior[Command] = @@ -75,6 +78,79 @@ object TrampolinePaymentLifecycle { } } + object PartHandler { + sealed trait Command + + private case class WrappedAddHtlcResponse(response: CommandResponse[CMD_ADD_HTLC]) extends Command + + private case class WrappedHtlcSettled(result: RES_ADD_SETTLED[Origin.Hot, HtlcResult]) extends Command + + def apply(parent: ActorRef[TrampolinePaymentLifecycle.Command], + cmd: TrampolinePaymentLifecycle.SendPayment, + amount: MilliSatoshi, + channelInfo: Peer.ChannelInfo, + expiry: CltvExpiry, + trampolinePaymentSecret: ByteVector32, + attemptNumber: Int): Behavior[Command] = + Behaviors.setup { context => + new PartHandler(context, parent, cmd).start(amount, channelInfo, expiry, trampolinePaymentSecret, attemptNumber) + } + } + + class PartHandler(context: ActorContext[PartHandler.Command], parent: ActorRef[Command], cmd: TrampolinePaymentLifecycle.SendPayment) { + + import PartHandler._ + + private val paymentHash = cmd.invoice.paymentHash + + private val addHtlcAdapter = context.messageAdapter[CommandResponse[CMD_ADD_HTLC]](WrappedAddHtlcResponse) + private val htlcSettledAdapter = context.messageAdapter[RES_ADD_SETTLED[Origin.Hot, HtlcResult]](WrappedHtlcSettled) + + def start(amount: MilliSatoshi, channelInfo: Peer.ChannelInfo, expiry: CltvExpiry, trampolinePaymentSecret: ByteVector32, attemptNumber: Int): Behavior[PartHandler.Command] = { + val origin = Origin.Hot(htlcSettledAdapter.toClassic, Upstream.Local(cmd.paymentId)) + val outgoing = buildOutgoingPayment(cmd.trampolineNodeId, cmd.invoice, amount, expiry, Some(trampolinePaymentSecret), attemptNumber) + val add = CMD_ADD_HTLC(addHtlcAdapter.toClassic, outgoing.trampolineAmount, paymentHash, outgoing.trampolineExpiry, outgoing.onion.packet, None, 1.0, None, origin, commit = true) + channelInfo.channel ! add + val channelId = channelInfo.data.asInstanceOf[DATA_NORMAL].channelId + val part = PartialPayment(cmd.paymentId, amount, computeFees(amount, attemptNumber), channelId, None) + waitForSettlement(part, outgoing.onion.sharedSecrets, outgoing.trampolineOnion.sharedSecrets) + } + + def waitForSettlement(part: PartialPayment, outerOnionSecrets: Seq[Sphinx.SharedSecret], trampolineOnionSecrets: Seq[Sphinx.SharedSecret]): Behavior[PartHandler.Command] = { + Behaviors.receiveMessagePartial { + case WrappedAddHtlcResponse(response) => response match { + case _: CommandSuccess[_] => + // HTLC was correctly sent out. + Behaviors.same + case failure: CommandFailure[_, Throwable] => + parent ! CouldntAddHtlc(failure.t) + Behaviors.stopped + } + case WrappedHtlcSettled(result) => result.result match { + case fulfill: HtlcResult.Fulfill => + val holdTimes = fulfill match { + case HtlcResult.RemoteFulfill(updateFulfill) => + updateFulfill.attribution_opt match { + case Some(attribution) => + Sphinx.Attribution.fulfillHoldTimes(attribution, outerOnionSecrets).holdTimes + case None => Nil + } + case _: HtlcResult.OnChainFulfill => Nil + } + parent ! HtlcSettled(fulfill, part, holdTimes) + Behaviors.stopped + case fail: HtlcResult.Fail => + val holdTimes = fail match { + case HtlcResult.RemoteFail(updateFail) => + Sphinx.FailurePacket.decrypt(updateFail.reason, updateFail.attribution_opt, outerOnionSecrets).holdTimes + case _ => Nil + } + parent ! HtlcSettled(fail, part, holdTimes) + Behaviors.stopped + } + } + } + } } class TrampolinePaymentLifecycle private(nodeParams: NodeParams, @@ -90,8 +166,6 @@ class TrampolinePaymentLifecycle private(nodeParams: NodeParams, private val forwardNodeIdFailureAdapter = context.messageAdapter[Register.ForwardNodeIdFailure[Peer.GetPeerChannels]](_ => TrampolinePeerNotFound(cmd.trampolineNodeId)) private val peerChannelsResponseAdapter = context.messageAdapter[Peer.PeerChannels](c => WrappedPeerChannels(c.channels)) - private val addHtlcAdapter = context.messageAdapter[CommandResponse[CMD_ADD_HTLC]](WrappedAddHtlcResponse) - private val htlcSettledAdapter = context.messageAdapter[RES_ADD_SETTLED[Origin.Hot, HtlcResult]](WrappedHtlcSettled) def start(): Behavior[Command] = listChannels(attemptNumber = 0) @@ -117,7 +191,6 @@ class TrampolinePaymentLifecycle private(nodeParams: NodeParams, case _ => None } }) - val origin = Origin.Hot(htlcSettledAdapter.toClassic, Upstream.Local(cmd.paymentId)) val expiry = CltvExpiry(nodeParams.currentBlockHeight) + CltvExpiryDelta(36) if (filtered.isEmpty) { context.log.warn("no usable channel with trampoline node {}", cmd.trampolineNodeId) @@ -131,54 +204,49 @@ class TrampolinePaymentLifecycle private(nodeParams: NodeParams, // We generate a random secret to avoid leaking the invoice secret to the trampoline node. val trampolinePaymentSecret = randomBytes32() context.log.info("sending trampoline payment parts: {}->{}, {}->{}", channel1.data.channelId, amount1, channel2.data.channelId, amount2) - val parts = Seq((amount1, channel1), (amount2, channel2)).map { case (amount, channelInfo) => - val outgoing = buildOutgoingPayment(cmd.trampolineNodeId, cmd.invoice, amount, expiry, Some(trampolinePaymentSecret), attemptNumber) - val add = CMD_ADD_HTLC(addHtlcAdapter.toClassic, outgoing.trampolineAmount, paymentHash, outgoing.trampolineExpiry, outgoing.onion.packet, None, 1.0, None, origin, commit = true) - channelInfo.channel ! add - val channelId = channelInfo.data.asInstanceOf[DATA_NORMAL].channelId - PartialPayment(cmd.paymentId, amount, computeFees(amount, attemptNumber), channelId, None) + Seq((amount1, channel1), (amount2, channel2)).foreach { case (amount, channelInfo) => + context.spawnAnonymous(PartHandler(context.self, cmd, amount, channelInfo, expiry, trampolinePaymentSecret, attemptNumber)) } - waitForSettlement(remaining = 2, attemptNumber, parts) + waitForSettlement(remaining = 2, attemptNumber, Nil) } } - private def waitForSettlement(remaining: Int, attemptNumber: Int, parts: Seq[PartialPayment]): Behavior[Command] = { + private def waitForSettlement(remaining: Int, attemptNumber: Int, fulfilledParts: Seq[PartialPayment]): Behavior[Command] = { Behaviors.receiveMessagePartial { - case WrappedAddHtlcResponse(response) => response match { - case _: CommandSuccess[_] => - // HTLC was correctly sent out. - Behaviors.same - case failure: CommandFailure[_, Throwable] => - context.log.warn("HTLC could not be sent: {}", failure.t.getMessage) - if (remaining > 1) { - context.log.info("waiting for remaining HTLCs to complete") - waitForSettlement(remaining - 1, attemptNumber, parts) - } else { - context.log.warn("trampoline payment failed") - cmd.replyTo ! PaymentFailed(cmd.paymentId, paymentHash, LocalFailure(totalAmount, Nil, failure.t) :: Nil) - Behaviors.stopped - } - } - case WrappedHtlcSettled(result) => result.result match { - case fulfill: HtlcResult.Fulfill => - context.log.info("HTLC was fulfilled") - if (remaining > 1) { - context.log.info("waiting for remaining HTLCs to be fulfilled") - waitForSettlement(remaining - 1, attemptNumber, parts) - } else { - context.log.info("trampoline payment succeeded") - cmd.replyTo ! PaymentSent(cmd.paymentId, paymentHash, fulfill.paymentPreimage, totalAmount, cmd.invoice.nodeId, parts, None) - Behaviors.stopped - } - case fail: HtlcResult.Fail => - context.log.warn("received HTLC failure: {}", fail) - if (remaining > 1) { - context.log.info("waiting for remaining HTLCs to be failed") - waitForSettlement(remaining - 1, attemptNumber, parts) - } else { - retryOrStop(attemptNumber + 1) - } - } + case CouldntAddHtlc(failure) => + context.log.warn("HTLC could not be sent: {}", failure.getMessage) + if (remaining > 1) { + context.log.info("waiting for remaining HTLCs to complete") + waitForSettlement(remaining - 1, attemptNumber, fulfilledParts) + } else { + context.log.warn("trampoline payment failed") + cmd.replyTo ! PaymentFailed(cmd.paymentId, paymentHash, LocalFailure(totalAmount, Nil, failure) :: Nil) + Behaviors.stopped + } + case HtlcSettled(result: HtlcResult, part, holdTimes) => + if (holdTimes.nonEmpty) { + context.system.eventStream ! EventStream.Publish(Router.ReportedHoldTimes(holdTimes)) + } + result match { + case fulfill: HtlcResult.Fulfill => + context.log.info("HTLC was fulfilled") + if (remaining > 1) { + context.log.info("waiting for remaining HTLCs to be fulfilled") + waitForSettlement(remaining - 1, attemptNumber, part +: fulfilledParts) + } else { + context.log.info("trampoline payment succeeded") + cmd.replyTo ! PaymentSent(cmd.paymentId, paymentHash, fulfill.paymentPreimage, totalAmount, cmd.invoice.nodeId, part +: fulfilledParts, None) + Behaviors.stopped + } + case fail: HtlcResult.Fail => + context.log.warn("received HTLC failure: {}", fail) + if (remaining > 1) { + context.log.info("waiting for remaining HTLCs to be failed") + waitForSettlement(remaining - 1, attemptNumber, fulfilledParts) + } else { + retryOrStop(attemptNumber + 1) + } + } } } diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/router/Router.scala b/eclair-core/src/main/scala/fr/acinq/eclair/router/Router.scala index 3cf10e4aea..479dd5a052 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/router/Router.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/router/Router.scala @@ -28,7 +28,7 @@ import fr.acinq.eclair._ import fr.acinq.eclair.blockchain.bitcoind.ZmqWatcher import fr.acinq.eclair.blockchain.bitcoind.ZmqWatcher._ import fr.acinq.eclair.channel._ -import fr.acinq.eclair.crypto.TransportHandler +import fr.acinq.eclair.crypto.{Sphinx, TransportHandler} import fr.acinq.eclair.db.NetworkDb import fr.acinq.eclair.io.Peer.PeerRoutingMessage import fr.acinq.eclair.payment.Invoice.ExtraEdge @@ -830,4 +830,6 @@ object Router { /** We have tried to relay this amount from this channel and it failed. */ case class ChannelCouldNotRelay(amount: MilliSatoshi, hop: ChannelHop) + + case class ReportedHoldTimes(holdTimes: Seq[Sphinx.HoldTime]) } diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/integration/PaymentIntegrationSpec.scala b/eclair-core/src/test/scala/fr/acinq/eclair/integration/PaymentIntegrationSpec.scala index 380d31d5ba..162f0f78f1 100644 --- a/eclair-core/src/test/scala/fr/acinq/eclair/integration/PaymentIntegrationSpec.scala +++ b/eclair-core/src/test/scala/fr/acinq/eclair/integration/PaymentIntegrationSpec.scala @@ -158,8 +158,9 @@ class PaymentIntegrationSpec extends IntegrationSpec { } test("send an HTLC A->D") { - val (sender, eventListener) = (TestProbe(), TestProbe()) + val (sender, eventListener, holdTimesRecorder) = (TestProbe(), TestProbe(), TestProbe()) nodes("D").system.eventStream.subscribe(eventListener.ref, classOf[PaymentMetadataReceived]) + nodes("A").system.eventStream.subscribe(holdTimesRecorder.ref, classOf[Router.ReportedHoldTimes]) // first we retrieve a payment hash from D val amountMsat = 4200000.msat @@ -174,6 +175,8 @@ class PaymentIntegrationSpec extends IntegrationSpec { assert(ps.id == paymentId) assert(Crypto.sha256(ps.paymentPreimage) == invoice.paymentHash) eventListener.expectMsg(PaymentMetadataReceived(invoice.paymentHash, invoice.paymentMetadata.get)) + + assert(holdTimesRecorder.expectMsgType[Router.ReportedHoldTimes].holdTimes.map(_.remoteNodeId) == Seq("B", "C", "D").map(nodes(_).nodeParams.nodeId)) } test("send an HTLC A->D with an invalid expiry delta for B") { @@ -243,7 +246,9 @@ class PaymentIntegrationSpec extends IntegrationSpec { } test("send an HTLC A->D with an unknown payment hash") { - val sender = TestProbe() + val (sender, holdTimesRecorder) = (TestProbe(), TestProbe()) + nodes("A").system.eventStream.subscribe(holdTimesRecorder.ref, classOf[Router.ReportedHoldTimes]) + val amount = 100000000 msat val unknownInvoice = Bolt11Invoice(Block.RegtestGenesisBlock.hash, Some(amount), randomBytes32(), nodes("D").nodeParams.privateKey, Left("test"), finalCltvExpiryDelta) val invoice = SendPaymentToNode(sender.ref, amount, unknownInvoice, Nil, routeParams = integrationTestRouteParams, maxAttempts = 5) @@ -256,6 +261,8 @@ class PaymentIntegrationSpec extends IntegrationSpec { assert(failed.paymentHash == invoice.paymentHash) assert(failed.failures.size == 1) assert(failed.failures.head.asInstanceOf[RemoteFailure].e == DecryptedFailurePacket(nodes("D").nodeParams.nodeId, IncorrectOrUnknownPaymentDetails(amount, getBlockHeight()))) + + assert(holdTimesRecorder.expectMsgType[Router.ReportedHoldTimes].holdTimes.map(_.remoteNodeId) == Seq("B", "C", "D").map(nodes(_).nodeParams.nodeId)) } test("send an HTLC A->D with a lower amount than requested") { @@ -327,7 +334,8 @@ class PaymentIntegrationSpec extends IntegrationSpec { } test("send an HTLC A->B->G->C using heuristics to select the route") { - val sender = TestProbe() + val (sender, holdTimesRecorder) = (TestProbe(), TestProbe()) + nodes("A").system.eventStream.subscribe(holdTimesRecorder.ref, classOf[Router.ReportedHoldTimes]) // first we retrieve a payment hash from C val amountMsat = 2000.msat sender.send(nodes("C").paymentHandler, ReceiveStandardPayment(sender.ref, Some(amountMsat), Left("Change from coffee"))) @@ -338,6 +346,8 @@ class PaymentIntegrationSpec extends IntegrationSpec { sender.expectMsgType[UUID] val ps = sender.expectMsgType[PaymentSent] ps.parts.foreach(part => assert(part.route.getOrElse(Nil).exists(_.nodeId == nodes("G").nodeParams.nodeId))) + + assert(holdTimesRecorder.expectMsgType[Router.ReportedHoldTimes].holdTimes.map(_.remoteNodeId) == Seq("B", "G", "C").map(nodes(_).nodeParams.nodeId)) } test("send a multi-part payment B->D") { @@ -463,7 +473,9 @@ class PaymentIntegrationSpec extends IntegrationSpec { test("send a trampoline payment B->F1 (via trampoline G)") { val start = TimestampMilli.now() - val sender = TestProbe() + val (sender, holdTimesRecorderB, holdTimesRecorderG) = (TestProbe(), TestProbe(), TestProbe()) + nodes("B").system.eventStream.subscribe(holdTimesRecorderB.ref, classOf[Router.ReportedHoldTimes]) + nodes("G").system.eventStream.subscribe(holdTimesRecorderG.ref, classOf[Router.ReportedHoldTimes]) val amount = 4_000_000_000L.msat sender.send(nodes("F").paymentHandler, ReceiveStandardPayment(sender.ref, Some(amount), Left("like trampoline much?"))) val invoice = sender.expectMsgType[Bolt11Invoice] @@ -492,6 +504,9 @@ class PaymentIntegrationSpec extends IntegrationSpec { val relayed = nodes("G").nodeParams.db.audit.listRelayed(start, TimestampMilli.now()).filter(_.paymentHash == invoice.paymentHash).head assert(relayed.amountIn - relayed.amountOut > 0.msat, relayed) assert(relayed.amountIn - relayed.amountOut < paymentSent.feesPaid, relayed) + + assert(holdTimesRecorderG.expectMsgType[Router.ReportedHoldTimes].holdTimes.map(_.remoteNodeId) == Seq("C", "F").map(nodes(_).nodeParams.nodeId)) + assert(holdTimesRecorderB.expectMsgType[Router.ReportedHoldTimes].holdTimes.map(_.remoteNodeId) == Seq("G").map(nodes(_).nodeParams.nodeId)) } test("send a trampoline payment D->B (via trampoline C)") { @@ -567,7 +582,7 @@ class PaymentIntegrationSpec extends IntegrationSpec { } test("send a trampoline payment B->D (temporary local failure at trampoline)") { - val sender = TestProbe() + val (sender, holdTimesRecorder) = (TestProbe(), TestProbe()) // We put most of the capacity C <-> D on D's side. sender.send(nodes("D").paymentHandler, ReceiveStandardPayment(sender.ref, Some(8_000_000_000L msat), Left("plz send everything"))) @@ -583,16 +598,22 @@ class PaymentIntegrationSpec extends IntegrationSpec { assert(invoice.features.hasFeature(Features.BasicMultiPartPayment)) assert(invoice.features.hasFeature(Features.TrampolinePaymentPrototype)) + nodes("B").system.eventStream.subscribe(holdTimesRecorder.ref, classOf[Router.ReportedHoldTimes]) val payment = SendTrampolinePayment(sender.ref, invoice, nodes("C").nodeParams.nodeId, routeParams = integrationTestRouteParams) sender.send(nodes("B").paymentInitiator, payment) val paymentId = sender.expectMsgType[UUID] val paymentFailed = sender.expectMsgType[PaymentFailed](max = 30 seconds) assert(paymentFailed.id == paymentId, paymentFailed) assert(paymentFailed.paymentHash == invoice.paymentHash, paymentFailed) + + assert(holdTimesRecorder.expectMsgType[Router.ReportedHoldTimes].holdTimes.map(_.remoteNodeId) == Seq("C").map(nodes(_).nodeParams.nodeId)) } test("send a trampoline payment A->D (temporary remote failure at trampoline)") { - val sender = TestProbe() + val (sender, holdTimesRecorderA, holdTimesRecorderB) = (TestProbe(), TestProbe(), TestProbe()) + nodes("A").system.eventStream.subscribe(holdTimesRecorderA.ref, classOf[Router.ReportedHoldTimes]) + nodes("B").system.eventStream.subscribe(holdTimesRecorderB.ref, classOf[Router.ReportedHoldTimes]) + val amount = 1_800_000_000L.msat // B can forward to C, but C doesn't have that much outgoing capacity to D sender.send(nodes("D").paymentHandler, ReceiveStandardPayment(sender.ref, Some(amount), Left("I iz not Satoshi"))) val invoice = sender.expectMsgType[Bolt11Invoice] @@ -605,6 +626,9 @@ class PaymentIntegrationSpec extends IntegrationSpec { val paymentFailed = sender.expectMsgType[PaymentFailed](max = 30 seconds) assert(paymentFailed.id == paymentId, paymentFailed) assert(paymentFailed.paymentHash == invoice.paymentHash, paymentFailed) + + assert(holdTimesRecorderB.expectMsgType[Router.ReportedHoldTimes].holdTimes.map(_.remoteNodeId) == Seq("C").map(nodes(_).nodeParams.nodeId)) + assert(holdTimesRecorderA.expectMsgType[Router.ReportedHoldTimes].holdTimes.map(_.remoteNodeId) == Seq("B").map(nodes(_).nodeParams.nodeId)) } test("send a blinded payment B->D with many blinded routes") { @@ -765,7 +789,9 @@ class PaymentIntegrationSpec extends IntegrationSpec { val offerHandler = TypedProbe[HandlerCommand]()(nodes("D").system.toTyped) nodes("D").offerManager ! RegisterOffer(offer, Some(nodes("D").nodeParams.privateKey), None, offerHandler.ref) - val sender = TestProbe() + val (sender, holdTimesRecorderA, holdTimesRecorderB) = (TestProbe(), TestProbe(), TestProbe()) + nodes("A").system.eventStream.subscribe(holdTimesRecorderA.ref, classOf[Router.ReportedHoldTimes]) + nodes("B").system.eventStream.subscribe(holdTimesRecorderB.ref, classOf[Router.ReportedHoldTimes]) val alice = new EclairImpl(nodes("A")) alice.payOfferTrampoline(offer, amount, 1, nodes("B").nodeParams.nodeId, maxAttempts_opt = Some(1))(30 seconds).pipeTo(sender.ref) @@ -788,6 +814,9 @@ class PaymentIntegrationSpec extends IntegrationSpec { awaitCond(nodes("D").nodeParams.db.payments.getIncomingPayment(paymentSent.paymentHash).exists(_.status.isInstanceOf[IncomingPaymentStatus.Received])) val Some(IncomingBlindedPayment(_, _, _, _, IncomingPaymentStatus.Received(receivedAmount, _))) = nodes("D").nodeParams.db.payments.getIncomingPayment(paymentSent.paymentHash) assert(receivedAmount >= amount) + + assert(holdTimesRecorderB.expectMsgType[Router.ReportedHoldTimes].holdTimes.map(_.remoteNodeId) == Seq("C").map(nodes(_).nodeParams.nodeId)) + assert(holdTimesRecorderA.expectMsgType[Router.ReportedHoldTimes].holdTimes.map(_.remoteNodeId) == Seq("B").map(nodes(_).nodeParams.nodeId)) } test("send to compact route") {