@@ -24,6 +24,7 @@ import akka.actor.typed.scaladsl.{ActorContext, Behaviors}
24
24
import fr .acinq .bitcoin .ByteVector32
25
25
import fr .acinq .eclair .channel .{CMD_FAIL_HTLC , CMD_FULFILL_HTLC }
26
26
import fr .acinq .eclair .db .PendingRelayDb
27
+ import fr .acinq .eclair .payment .IncomingPacket .NodeRelayPacket
27
28
import fr .acinq .eclair .payment .Monitoring .{Metrics , Tags }
28
29
import fr .acinq .eclair .payment .OutgoingPacket .Upstream
29
30
import fr .acinq .eclair .payment ._
@@ -75,13 +76,29 @@ object NodeRelay {
75
76
}
76
77
}
77
78
78
- def apply (nodeParams : NodeParams , parent : akka.actor.typed.ActorRef [NodeRelayer .Command ], register : ActorRef , relayId : UUID , paymentHash : ByteVector32 , outgoingPaymentFactory : OutgoingPaymentFactory ): Behavior [Command ] =
79
+ def apply (nodeParams : NodeParams ,
80
+ parent : akka.actor.typed.ActorRef [NodeRelayer .Command ],
81
+ register : ActorRef ,
82
+ relayId : UUID ,
83
+ nodeRelayPacket : NodeRelayPacket ,
84
+ paymentSecret : ByteVector32 ,
85
+ outgoingPaymentFactory : OutgoingPaymentFactory ): Behavior [Command ] =
79
86
Behaviors .setup { context =>
87
+ val paymentHash = nodeRelayPacket.add.paymentHash
88
+ val totalAmountIn = nodeRelayPacket.outerPayload.totalAmount
80
89
Behaviors .withMdc(Logs .mdc(
81
90
category_opt = Some (Logs .LogCategory .PAYMENT ),
82
91
parentPaymentId_opt = Some (relayId), // for a node relay, we use the same identifier for the whole relay itself, and the outgoing payment
83
92
paymentHash_opt = Some (paymentHash))) {
84
- new NodeRelay (nodeParams, parent, register, relayId, paymentHash, context, outgoingPaymentFactory)()
93
+ context.log.info(" relaying payment relayId={}" , relayId)
94
+ val mppFsmAdapters = {
95
+ context.messageAdapter[MultiPartPaymentFSM .ExtraPaymentReceived [HtlcPart ]](WrappedMultiPartExtraPaymentReceived )
96
+ context.messageAdapter[MultiPartPaymentFSM .MultiPartPaymentFailed ](WrappedMultiPartPaymentFailed )
97
+ context.messageAdapter[MultiPartPaymentFSM .MultiPartPaymentSucceeded ](WrappedMultiPartPaymentSucceeded )
98
+ }.toClassic
99
+ val incomingPaymentHandler = context.actorOf(MultiPartPaymentFSM .props(nodeParams, paymentHash, totalAmountIn, mppFsmAdapters))
100
+ new NodeRelay (nodeParams, parent, register, relayId, paymentHash, paymentSecret, context, outgoingPaymentFactory)
101
+ .receiving(Queue .empty, nodeRelayPacket.innerPayload, nodeRelayPacket.nextPacket, incomingPaymentHandler)
85
102
}
86
103
}
87
104
@@ -144,66 +161,37 @@ class NodeRelay private(nodeParams: NodeParams,
144
161
register : ActorRef ,
145
162
relayId : UUID ,
146
163
paymentHash : ByteVector32 ,
164
+ paymentSecret : ByteVector32 ,
147
165
context : ActorContext [NodeRelay .Command ],
148
166
outgoingPaymentFactory : NodeRelay .OutgoingPaymentFactory ) {
149
167
150
168
import NodeRelay ._
151
169
152
- private val mppFsmAdapters = {
153
- context.messageAdapter[MultiPartPaymentFSM .ExtraPaymentReceived [HtlcPart ]](WrappedMultiPartExtraPaymentReceived )
154
- context.messageAdapter[MultiPartPaymentFSM .MultiPartPaymentFailed ](WrappedMultiPartPaymentFailed )
155
- context.messageAdapter[MultiPartPaymentFSM .MultiPartPaymentSucceeded ](WrappedMultiPartPaymentSucceeded )
156
- }.toClassic
157
- private val payFsmAdapters = {
158
- context.messageAdapter[PreimageReceived ](WrappedPreimageReceived )
159
- context.messageAdapter[PaymentSent ](WrappedPaymentSent )
160
- context.messageAdapter[PaymentFailed ](WrappedPaymentFailed )
161
- }.toClassic
162
-
163
- def apply (): Behavior [Command ] =
164
- Behaviors .receiveMessagePartial {
165
- // We make sure we receive all payment parts before forwarding to the next trampoline node.
166
- case Relay (IncomingPacket .NodeRelayPacket (add, outer, inner, next)) => outer.paymentSecret match {
167
- case None =>
168
- // TODO: @pm: maybe those checks should be done later in the flow (by the mpp FSM?)
169
- context.log.warn(" rejecting htlcId={}: missing payment secret" , add.id)
170
- rejectHtlc(add.id, add.channelId, add.amountMsat)
171
- stopping()
172
- case Some (secret) =>
173
- import akka .actor .typed .scaladsl .adapter ._
174
- context.log.info(" relaying payment relayId={}" , relayId)
175
- val mppFsm = context.actorOf(MultiPartPaymentFSM .props(nodeParams, add.paymentHash, outer.totalAmount, mppFsmAdapters))
176
- context.log.debug(" forwarding incoming htlc to the payment FSM" )
177
- mppFsm ! MultiPartPaymentFSM .HtlcPart (outer.totalAmount, add)
178
- receiving(Queue (add), secret, inner, next, mppFsm)
179
- }
180
- }
181
-
182
170
/**
183
171
* We start by aggregating an incoming HTLC set. Once we received the whole set, we will compute a route to the next
184
172
* trampoline node and forward the payment.
185
173
*
186
174
* @param htlcs received incoming HTLCs for this set.
187
- * @param secret all incoming HTLCs in this set must have the same secret to protect against probing / fee theft.
188
175
* @param nextPayload relay instructions (should be identical across HTLCs in this set).
189
176
* @param nextPacket trampoline onion to relay to the next trampoline node.
190
177
* @param handler actor handling the aggregation of the incoming HTLC set.
191
178
*/
192
- private def receiving (htlcs : Queue [UpdateAddHtlc ], secret : ByteVector32 , nextPayload : Onion .NodeRelayPayload , nextPacket : OnionRoutingPacket , handler : ActorRef ): Behavior [Command ] =
179
+ private def receiving (htlcs : Queue [UpdateAddHtlc ], nextPayload : Onion .NodeRelayPayload , nextPacket : OnionRoutingPacket , handler : ActorRef ): Behavior [Command ] =
193
180
Behaviors .receiveMessagePartial {
194
181
case Relay (IncomingPacket .NodeRelayPacket (add, outer, _, _)) => outer.paymentSecret match {
182
+ // TODO: @pm: maybe those checks should be done by the mpp FSM?
195
183
case None =>
196
- context.log.warn(" rejecting htlcId={} : missing payment secret" , add.id)
184
+ context.log.warn(" rejecting htlc #{} from channel {} : missing payment secret" , add.id, add.channelId )
197
185
rejectHtlc(add.id, add.channelId, add.amountMsat)
198
186
Behaviors .same
199
- case Some (incomingSecret) if incomingSecret != secret =>
200
- context.log.warn(" rejecting htlcId={} : payment secret doesn't match other HTLCs in the set" , add.id)
187
+ case Some (incomingSecret) if incomingSecret != paymentSecret =>
188
+ context.log.warn(" rejecting htlc #{} from channel {} : payment secret doesn't match other HTLCs in the set" , add.id, add.channelId )
201
189
rejectHtlc(add.id, add.channelId, add.amountMsat)
202
190
Behaviors .same
203
- case Some (incomingSecret) if incomingSecret == secret =>
204
- context.log.debug(" forwarding incoming htlc to the payment FSM" )
191
+ case Some (incomingSecret) if incomingSecret == paymentSecret =>
192
+ context.log.debug(" forwarding incoming htlc #{} from channel {} to the payment FSM" , add.id, add.channelId )
205
193
handler ! MultiPartPaymentFSM .HtlcPart (outer.totalAmount, add)
206
- receiving(htlcs :+ add, secret, nextPayload, nextPacket, handler)
194
+ receiving(htlcs :+ add, nextPayload, nextPacket, handler)
207
195
}
208
196
case WrappedMultiPartPaymentFailed (MultiPartPaymentFSM .MultiPartPaymentFailed (_, failure, parts)) =>
209
197
context.log.warn(" could not complete incoming multi-part payment (parts={} paidAmount={} failure={})" , parts.size, parts.map(_.amount).sum, failure)
@@ -267,14 +255,20 @@ class NodeRelay private(nodeParams: NodeParams,
267
255
* Once the downstream payment is settled (fulfilled or failed), we reject new upstream payments while we wait for our parent to stop us.
268
256
*/
269
257
private def stopping (): Behavior [Command ] = {
270
- parent ! NodeRelayer .RelayComplete (context.self, paymentHash)
258
+ parent ! NodeRelayer .RelayComplete (context.self, paymentHash, paymentSecret )
271
259
Behaviors .receiveMessagePartial {
272
260
rejectExtraHtlcPartialFunction orElse {
273
261
case Stop => Behaviors .stopped
274
262
}
275
263
}
276
264
}
277
265
266
+ private val payFsmAdapters = {
267
+ context.messageAdapter[PreimageReceived ](WrappedPreimageReceived )
268
+ context.messageAdapter[PaymentSent ](WrappedPaymentSent )
269
+ context.messageAdapter[PaymentFailed ](WrappedPaymentFailed )
270
+ }.toClassic
271
+
278
272
private def relay (upstream : Upstream .Trampoline , payloadOut : Onion .NodeRelayPayload , packetOut : OnionRoutingPacket ): ActorRef = {
279
273
val paymentCfg = SendPaymentConfig (relayId, relayId, None , paymentHash, payloadOut.amountToForward, payloadOut.outgoingNodeId, upstream, None , storeInDb = false , publishEvent = false , Nil )
280
274
val routeParams = computeRouteParams(nodeParams, upstream.amountIn, upstream.expiryIn, payloadOut.amountToForward, payloadOut.outgoingCltv)
@@ -322,7 +316,7 @@ class NodeRelay private(nodeParams: NodeParams,
322
316
}
323
317
324
318
private def rejectExtraHtlc (add : UpdateAddHtlc ): Unit = {
325
- context.log.warn(" rejecting extra htlcId={} " , add.id)
319
+ context.log.warn(" rejecting extra htlc #{} from channel {} " , add.id, add.channelId )
326
320
rejectHtlc(add.id, add.channelId, add.amountMsat)
327
321
}
328
322
0 commit comments