Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -103,8 +103,16 @@ class PaymentLifecycle(nodeParams: NodeParams, cfg: SendPaymentConfig, router: A
Metrics.PaymentAttempt.withTag(Tags.MultiPart, value = false).record(d.failures.size + 1)
val p = PartialPayment(id, d.request.amount, d.cmd.amount - d.request.amount, htlc.channelId, Some(d.route.fullRoute))
val remainingAttribution_opt = fulfill match {
case HtlcResult.RemoteFulfill(fulfill) =>
fulfill.attribution_opt.flatMap(Sphinx.Attribution.fulfillHoldTimes(_, d.sharedSecrets).remaining_opt)
case HtlcResult.RemoteFulfill(updateFulfill) =>
updateFulfill.attribution_opt match {
case Some(attribution) =>
val Sphinx.Attribution.UnwrappedAttribution(holdTimes, remaining_opt) = Sphinx.Attribution.fulfillHoldTimes(attribution, d.sharedSecrets)
if (holdTimes.nonEmpty) {
context.system.eventStream.publish(Router.ReportedHoldTimes(holdTimes))
}
remaining_opt
case None => None
}
case _: HtlcResult.OnChainFulfill => None
}
myStop(d.request, Right(cfg.createPaymentSent(d.recipient, fulfill.paymentPreimage, p :: Nil, remainingAttribution_opt)))
Expand Down Expand Up @@ -170,6 +178,9 @@ class PaymentLifecycle(nodeParams: NodeParams, cfg: SendPaymentConfig, router: A
private def handleRemoteFail(d: WaitingForComplete, fail: UpdateFailHtlc) = {
import d._
val htlcFailure = Sphinx.FailurePacket.decrypt(fail.reason, fail.attribution_opt, sharedSecrets)
if (htlcFailure.holdTimes.nonEmpty) {
context.system.eventStream.publish(Router.ReportedHoldTimes(htlcFailure.holdTimes))
}
((htlcFailure.failure match {
case success@Right(e) =>
Metrics.PaymentError.withTag(Tags.Failure, Tags.FailureType(RemoteFailure(request.amount, Nil, e))).increment()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package fr.acinq.eclair.payment.send

import akka.actor.typed.eventstream.EventStream
import akka.actor.typed.scaladsl.adapter._
import akka.actor.typed.scaladsl.{ActorContext, Behaviors}
import akka.actor.typed.{ActorRef, Behavior}
Expand All @@ -27,6 +28,8 @@ import fr.acinq.eclair.io.Peer
import fr.acinq.eclair.payment.OutgoingPaymentPacket.{NodePayload, buildOnion}
import fr.acinq.eclair.payment.PaymentSent.PartialPayment
import fr.acinq.eclair.payment._
import fr.acinq.eclair.payment.send.TrampolinePayment.{buildOutgoingPayment, computeFees}
import fr.acinq.eclair.router.Router
import fr.acinq.eclair.router.Router.RouteParams
import fr.acinq.eclair.wire.protocol.{PaymentOnion, PaymentOnionCodecs}
import fr.acinq.eclair.{CltvExpiry, CltvExpiryDelta, Features, Logs, MilliSatoshi, NodeParams, randomBytes32}
Expand Down Expand Up @@ -54,9 +57,9 @@ object TrampolinePaymentLifecycle {
require(invoice.amount_opt.nonEmpty, "amount-less invoices are not supported in trampoline tests")
}
private case class TrampolinePeerNotFound(trampolineNodeId: PublicKey) extends Command
private case class CouldntAddHtlc(failure: Throwable) extends Command
private case class HtlcSettled(result: HtlcResult, part: PartialPayment, holdTimes: Seq[Sphinx.HoldTime]) extends Command
private case class WrappedPeerChannels(channels: Seq[Peer.ChannelInfo]) extends Command
private case class WrappedAddHtlcResponse(response: CommandResponse[CMD_ADD_HTLC]) extends Command
private case class WrappedHtlcSettled(result: RES_ADD_SETTLED[Origin.Hot, HtlcResult]) extends Command
// @formatter:on

def apply(nodeParams: NodeParams, register: ActorRef[Register.ForwardNodeId[Peer.GetPeerChannels]]): Behavior[Command] =
Expand All @@ -75,6 +78,79 @@ object TrampolinePaymentLifecycle {
}
}

object PartHandler {
sealed trait Command

private case class WrappedAddHtlcResponse(response: CommandResponse[CMD_ADD_HTLC]) extends Command

private case class WrappedHtlcSettled(result: RES_ADD_SETTLED[Origin.Hot, HtlcResult]) extends Command

def apply(parent: ActorRef[TrampolinePaymentLifecycle.Command],
cmd: TrampolinePaymentLifecycle.SendPayment,
amount: MilliSatoshi,
channelInfo: Peer.ChannelInfo,
expiry: CltvExpiry,
trampolinePaymentSecret: ByteVector32,
attemptNumber: Int): Behavior[Command] =
Behaviors.setup { context =>
new PartHandler(context, parent, cmd).start(amount, channelInfo, expiry, trampolinePaymentSecret, attemptNumber)
}
}

class PartHandler(context: ActorContext[PartHandler.Command], parent: ActorRef[Command], cmd: TrampolinePaymentLifecycle.SendPayment) {

import PartHandler._

private val paymentHash = cmd.invoice.paymentHash

private val addHtlcAdapter = context.messageAdapter[CommandResponse[CMD_ADD_HTLC]](WrappedAddHtlcResponse)
private val htlcSettledAdapter = context.messageAdapter[RES_ADD_SETTLED[Origin.Hot, HtlcResult]](WrappedHtlcSettled)

def start(amount: MilliSatoshi, channelInfo: Peer.ChannelInfo, expiry: CltvExpiry, trampolinePaymentSecret: ByteVector32, attemptNumber: Int): Behavior[PartHandler.Command] = {
val origin = Origin.Hot(htlcSettledAdapter.toClassic, Upstream.Local(cmd.paymentId))
val outgoing = buildOutgoingPayment(cmd.trampolineNodeId, cmd.invoice, amount, expiry, Some(trampolinePaymentSecret), attemptNumber)
val add = CMD_ADD_HTLC(addHtlcAdapter.toClassic, outgoing.trampolineAmount, paymentHash, outgoing.trampolineExpiry, outgoing.onion.packet, None, 1.0, None, origin, commit = true)
channelInfo.channel ! add
val channelId = channelInfo.data.asInstanceOf[DATA_NORMAL].channelId
val part = PartialPayment(cmd.paymentId, amount, computeFees(amount, attemptNumber), channelId, None)
waitForSettlement(part, outgoing.onion.sharedSecrets, outgoing.trampolineOnion.sharedSecrets)
}

def waitForSettlement(part: PartialPayment, outerOnionSecrets: Seq[Sphinx.SharedSecret], trampolineOnionSecrets: Seq[Sphinx.SharedSecret]): Behavior[PartHandler.Command] = {
Behaviors.receiveMessagePartial {
case WrappedAddHtlcResponse(response) => response match {
case _: CommandSuccess[_] =>
// HTLC was correctly sent out.
Behaviors.same
case failure: CommandFailure[_, Throwable] =>
parent ! CouldntAddHtlc(failure.t)
Behaviors.stopped
}
case WrappedHtlcSettled(result) => result.result match {
case fulfill: HtlcResult.Fulfill =>
val holdTimes = fulfill match {
case HtlcResult.RemoteFulfill(updateFulfill) =>
updateFulfill.attribution_opt match {
case Some(attribution) =>
Sphinx.Attribution.fulfillHoldTimes(attribution, outerOnionSecrets).holdTimes
case None => Nil
}
case _: HtlcResult.OnChainFulfill => Nil
}
parent ! HtlcSettled(fulfill, part, holdTimes)
Behaviors.stopped
case fail: HtlcResult.Fail =>
val holdTimes = fail match {
case HtlcResult.RemoteFail(updateFail) =>
Sphinx.FailurePacket.decrypt(updateFail.reason, updateFail.attribution_opt, outerOnionSecrets).holdTimes
case _ => Nil
}
parent ! HtlcSettled(fail, part, holdTimes)
Behaviors.stopped
}
}
}
}
}

class TrampolinePaymentLifecycle private(nodeParams: NodeParams,
Expand All @@ -90,8 +166,6 @@ class TrampolinePaymentLifecycle private(nodeParams: NodeParams,

private val forwardNodeIdFailureAdapter = context.messageAdapter[Register.ForwardNodeIdFailure[Peer.GetPeerChannels]](_ => TrampolinePeerNotFound(cmd.trampolineNodeId))
private val peerChannelsResponseAdapter = context.messageAdapter[Peer.PeerChannels](c => WrappedPeerChannels(c.channels))
private val addHtlcAdapter = context.messageAdapter[CommandResponse[CMD_ADD_HTLC]](WrappedAddHtlcResponse)
private val htlcSettledAdapter = context.messageAdapter[RES_ADD_SETTLED[Origin.Hot, HtlcResult]](WrappedHtlcSettled)

def start(): Behavior[Command] = listChannels(attemptNumber = 0)

Expand All @@ -117,7 +191,6 @@ class TrampolinePaymentLifecycle private(nodeParams: NodeParams,
case _ => None
}
})
val origin = Origin.Hot(htlcSettledAdapter.toClassic, Upstream.Local(cmd.paymentId))
val expiry = CltvExpiry(nodeParams.currentBlockHeight) + CltvExpiryDelta(36)
if (filtered.isEmpty) {
context.log.warn("no usable channel with trampoline node {}", cmd.trampolineNodeId)
Expand All @@ -131,54 +204,49 @@ class TrampolinePaymentLifecycle private(nodeParams: NodeParams,
// We generate a random secret to avoid leaking the invoice secret to the trampoline node.
val trampolinePaymentSecret = randomBytes32()
context.log.info("sending trampoline payment parts: {}->{}, {}->{}", channel1.data.channelId, amount1, channel2.data.channelId, amount2)
val parts = Seq((amount1, channel1), (amount2, channel2)).map { case (amount, channelInfo) =>
val outgoing = buildOutgoingPayment(cmd.trampolineNodeId, cmd.invoice, amount, expiry, Some(trampolinePaymentSecret), attemptNumber)
val add = CMD_ADD_HTLC(addHtlcAdapter.toClassic, outgoing.trampolineAmount, paymentHash, outgoing.trampolineExpiry, outgoing.onion.packet, None, 1.0, None, origin, commit = true)
channelInfo.channel ! add
val channelId = channelInfo.data.asInstanceOf[DATA_NORMAL].channelId
PartialPayment(cmd.paymentId, amount, computeFees(amount, attemptNumber), channelId, None)
Seq((amount1, channel1), (amount2, channel2)).foreach { case (amount, channelInfo) =>
context.spawnAnonymous(PartHandler(context.self, cmd, amount, channelInfo, expiry, trampolinePaymentSecret, attemptNumber))
}
waitForSettlement(remaining = 2, attemptNumber, parts)
waitForSettlement(remaining = 2, attemptNumber, Nil)
}
}

private def waitForSettlement(remaining: Int, attemptNumber: Int, parts: Seq[PartialPayment]): Behavior[Command] = {
private def waitForSettlement(remaining: Int, attemptNumber: Int, fulfilledParts: Seq[PartialPayment]): Behavior[Command] = {
Behaviors.receiveMessagePartial {
case WrappedAddHtlcResponse(response) => response match {
case _: CommandSuccess[_] =>
// HTLC was correctly sent out.
Behaviors.same
case failure: CommandFailure[_, Throwable] =>
context.log.warn("HTLC could not be sent: {}", failure.t.getMessage)
if (remaining > 1) {
context.log.info("waiting for remaining HTLCs to complete")
waitForSettlement(remaining - 1, attemptNumber, parts)
} else {
context.log.warn("trampoline payment failed")
cmd.replyTo ! PaymentFailed(cmd.paymentId, paymentHash, LocalFailure(totalAmount, Nil, failure.t) :: Nil)
Behaviors.stopped
}
}
case WrappedHtlcSettled(result) => result.result match {
case fulfill: HtlcResult.Fulfill =>
context.log.info("HTLC was fulfilled")
if (remaining > 1) {
context.log.info("waiting for remaining HTLCs to be fulfilled")
waitForSettlement(remaining - 1, attemptNumber, parts)
} else {
context.log.info("trampoline payment succeeded")
cmd.replyTo ! PaymentSent(cmd.paymentId, paymentHash, fulfill.paymentPreimage, totalAmount, cmd.invoice.nodeId, parts, None)
Behaviors.stopped
}
case fail: HtlcResult.Fail =>
context.log.warn("received HTLC failure: {}", fail)
if (remaining > 1) {
context.log.info("waiting for remaining HTLCs to be failed")
waitForSettlement(remaining - 1, attemptNumber, parts)
} else {
retryOrStop(attemptNumber + 1)
}
}
case CouldntAddHtlc(failure) =>
context.log.warn("HTLC could not be sent: {}", failure.getMessage)
if (remaining > 1) {
context.log.info("waiting for remaining HTLCs to complete")
waitForSettlement(remaining - 1, attemptNumber, fulfilledParts)
} else {
context.log.warn("trampoline payment failed")
cmd.replyTo ! PaymentFailed(cmd.paymentId, paymentHash, LocalFailure(totalAmount, Nil, failure) :: Nil)
Behaviors.stopped
}
case HtlcSettled(result: HtlcResult, part, holdTimes) =>
if (holdTimes.nonEmpty) {
context.system.eventStream ! EventStream.Publish(Router.ReportedHoldTimes(holdTimes))
}
result match {
case fulfill: HtlcResult.Fulfill =>
context.log.info("HTLC was fulfilled")
if (remaining > 1) {
context.log.info("waiting for remaining HTLCs to be fulfilled")
waitForSettlement(remaining - 1, attemptNumber, part +: fulfilledParts)
} else {
context.log.info("trampoline payment succeeded")
cmd.replyTo ! PaymentSent(cmd.paymentId, paymentHash, fulfill.paymentPreimage, totalAmount, cmd.invoice.nodeId, part +: fulfilledParts, None)
Behaviors.stopped
}
case fail: HtlcResult.Fail =>
context.log.warn("received HTLC failure: {}", fail)
if (remaining > 1) {
context.log.info("waiting for remaining HTLCs to be failed")
waitForSettlement(remaining - 1, attemptNumber, fulfilledParts)
} else {
retryOrStop(attemptNumber + 1)
}
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import fr.acinq.eclair._
import fr.acinq.eclair.blockchain.bitcoind.ZmqWatcher
import fr.acinq.eclair.blockchain.bitcoind.ZmqWatcher._
import fr.acinq.eclair.channel._
import fr.acinq.eclair.crypto.TransportHandler
import fr.acinq.eclair.crypto.{Sphinx, TransportHandler}
import fr.acinq.eclair.db.NetworkDb
import fr.acinq.eclair.io.Peer.PeerRoutingMessage
import fr.acinq.eclair.payment.Invoice.ExtraEdge
Expand Down Expand Up @@ -830,4 +830,6 @@ object Router {

/** We have tried to relay this amount from this channel and it failed. */
case class ChannelCouldNotRelay(amount: MilliSatoshi, hop: ChannelHop)

case class ReportedHoldTimes(holdTimes: Seq[Sphinx.HoldTime])
}
Loading