16
16
17
17
package fr .acinq .eclair .channel
18
18
19
- import akka .actor .{ActorRef , FSM , OneForOneStrategy , Props , Status , SupervisorStrategy }
19
+ import akka .actor .typed .scaladsl .Behaviors
20
+ import akka .actor .typed .scaladsl .adapter .ClassicActorContextOps
21
+ import akka .actor .{ActorContext , ActorRef , FSM , OneForOneStrategy , Props , Status , SupervisorStrategy }
20
22
import akka .event .Logging .MDC
21
23
import akka .pattern .pipe
22
24
import fr .acinq .bitcoin .Crypto .{PrivateKey , PublicKey }
23
25
import fr .acinq .bitcoin .{ByteVector32 , OutPoint , Satoshi , SatoshiLong , Script , ScriptFlags , Transaction }
24
26
import fr .acinq .eclair .Logs .LogCategory
25
27
import fr .acinq .eclair ._
26
28
import fr .acinq .eclair .blockchain ._
29
+ import fr .acinq .eclair .blockchain .bitcoind .rpc .ExtendedBitcoinClient
27
30
import fr .acinq .eclair .channel .Helpers .{Closing , Funding }
28
31
import fr .acinq .eclair .channel .Monitoring .{Metrics , Tags }
32
+ import fr .acinq .eclair .channel .TxPublisher .{PublishRawTx , PublishTx , SetChannelId , SignAndPublishTx }
29
33
import fr .acinq .eclair .crypto .ShaChain
30
34
import fr .acinq .eclair .crypto .keymanager .ChannelKeyManager
31
35
import fr .acinq .eclair .db .PendingRelayDb
@@ -47,7 +51,19 @@ import scala.util.{Failure, Success, Try}
47
51
*/
48
52
49
53
object Channel {
50
- def props (nodeParams : NodeParams , wallet : EclairWallet , remoteNodeId : PublicKey , blockchain : ActorRef , relayer : ActorRef , origin_opt : Option [ActorRef ]): Props = Props (new Channel (nodeParams, wallet, remoteNodeId, blockchain, relayer, origin_opt))
54
+
55
+ trait TxPublisherFactory {
56
+ def spawnTxPublisher (context : ActorContext , remoteNodeId : PublicKey ): akka.actor.typed.ActorRef [TxPublisher .Command ]
57
+ }
58
+
59
+ case class SimpleTxPublisherFactory (nodeParams : NodeParams , watcher : ActorRef , bitcoinClient : ExtendedBitcoinClient ) extends TxPublisherFactory {
60
+ override def spawnTxPublisher (context : ActorContext , remoteNodeId : PublicKey ): akka.actor.typed.ActorRef [TxPublisher .Command ] = {
61
+ context.spawn(Behaviors .supervise(TxPublisher (nodeParams, remoteNodeId, watcher, bitcoinClient)).onFailure(akka.actor.typed.SupervisorStrategy .restart), " tx-publisher" )
62
+ }
63
+ }
64
+
65
+ def props (nodeParams : NodeParams , wallet : EclairWallet , remoteNodeId : PublicKey , blockchain : ActorRef , relayer : ActorRef , txPublisherFactory : TxPublisherFactory , origin_opt : Option [ActorRef ]): Props =
66
+ Props (new Channel (nodeParams, wallet, remoteNodeId, blockchain, relayer, txPublisherFactory, origin_opt))
51
67
52
68
// see https://github.com/lightningnetwork/lightning-rfc/blob/master/07-routing-gossip.md#requirements
53
69
val ANNOUNCEMENTS_MINCONF = 6
@@ -100,7 +116,7 @@ object Channel {
100
116
101
117
}
102
118
103
- class Channel (val nodeParams : NodeParams , val wallet : EclairWallet , remoteNodeId : PublicKey , blockchain : ActorRef , relayer : ActorRef , origin_opt : Option [ActorRef ] = None )(implicit ec : ExecutionContext = ExecutionContext .Implicits .global) extends FSM [State , Data ] with FSMDiagnosticActorLogging [State , Data ] {
119
+ class Channel (val nodeParams : NodeParams , val wallet : EclairWallet , remoteNodeId : PublicKey , blockchain : ActorRef , relayer : ActorRef , txPublisherFactory : Channel . TxPublisherFactory , origin_opt : Option [ActorRef ] = None )(implicit ec : ExecutionContext = ExecutionContext .Implicits .global) extends FSM [State , Data ] with FSMDiagnosticActorLogging [State , Data ] {
104
120
105
121
import Channel ._
106
122
@@ -111,14 +127,16 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId
111
127
112
128
// we assume that the peer is the channel's parent
113
129
private val peer = context.parent
114
- // noinspection ActorMutableStateInspection
130
+ // noinspection ActorMutableStateInspection
115
131
// the last active connection we are aware of; note that the peer manages connections and asynchronously notifies
116
132
// the channel, which means that if we get disconnected, the previous active connection will die and some messages will
117
133
// be sent to dead letters, before the channel gets notified of the disconnection; knowing that this will happen, we
118
134
// choose to not make this an Option (that would be None before the first connection), and instead embrace the fact
119
135
// that the active connection may point to dead letters at all time
120
136
private var activeConnection = context.system.deadLetters
121
137
138
+ private val txPublisher = txPublisherFactory.spawnTxPublisher(context, remoteNodeId)
139
+
122
140
// this will be used to detect htlc timeouts
123
141
context.system.eventStream.subscribe(self, classOf [CurrentBlockCount ])
124
142
// this will be used to make sure the current commitment fee is up-to-date
@@ -165,6 +183,7 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId
165
183
case Event (initFunder@ INPUT_INIT_FUNDER (temporaryChannelId, fundingSatoshis, pushMsat, initialFeeratePerKw, fundingTxFeeratePerKw, _, localParams, remote, _, channelFlags, channelVersion), Nothing ) =>
166
184
context.system.eventStream.publish(ChannelCreated (self, peer, remoteNodeId, isFunder = true , temporaryChannelId, initialFeeratePerKw, Some (fundingTxFeeratePerKw)))
167
185
activeConnection = remote
186
+ txPublisher ! SetChannelId (remoteNodeId, temporaryChannelId)
168
187
val fundingPubKey = keyManager.fundingPublicKey(localParams.fundingKeyPath).publicKey
169
188
val channelKeyPath = keyManager.keyPath(localParams, channelVersion)
170
189
val open = OpenChannel (nodeParams.chainHash,
@@ -192,11 +211,13 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId
192
211
193
212
case Event (inputFundee@ INPUT_INIT_FUNDEE (_, localParams, remote, _, _), Nothing ) if ! localParams.isFunder =>
194
213
activeConnection = remote
214
+ txPublisher ! SetChannelId (remoteNodeId, inputFundee.temporaryChannelId)
195
215
goto(WAIT_FOR_OPEN_CHANNEL ) using DATA_WAIT_FOR_OPEN_CHANNEL (inputFundee)
196
216
197
217
case Event (INPUT_RESTORED (data), _) =>
198
218
log.info(" restoring channel" )
199
219
context.system.eventStream.publish(ChannelRestored (self, data.channelId, peer, remoteNodeId, data.commitments.localParams.isFunder, data.commitments))
220
+ txPublisher ! SetChannelId (remoteNodeId, data.channelId)
200
221
data match {
201
222
// NB: order matters!
202
223
case closing : DATA_CLOSING if Closing .nothingAtStake(closing) =>
@@ -413,6 +434,7 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId
413
434
)
414
435
val channelId = toLongId(fundingTx.hash, fundingTxOutputIndex)
415
436
peer ! ChannelIdAssigned (self, remoteNodeId, temporaryChannelId, channelId) // we notify the peer asap so it knows how to route messages
437
+ txPublisher ! SetChannelId (remoteNodeId, channelId)
416
438
context.system.eventStream.publish(ChannelIdAssigned (self, remoteNodeId, temporaryChannelId, channelId))
417
439
// NB: we don't send a ChannelSignatureSent for the first commit
418
440
goto(WAIT_FOR_FUNDING_SIGNED ) using DATA_WAIT_FOR_FUNDING_SIGNED (channelId, localParams, remoteParams, fundingTx, fundingTxFee, initialRelayFees_opt, localSpec, localCommitTx, RemoteCommit (0 , remoteSpec, remoteCommitTx.tx.txid, remoteFirstPerCommitmentPoint), open.channelFlags, channelVersion, fundingCreated) sending fundingCreated
@@ -469,6 +491,7 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId
469
491
remoteNextCommitInfo = Right (randomKey.publicKey), // we will receive their next per-commitment point in the next message, so we temporarily put a random byte array,
470
492
commitInput, ShaChain .init, channelId = channelId)
471
493
peer ! ChannelIdAssigned (self, remoteNodeId, temporaryChannelId, channelId) // we notify the peer asap so it knows how to route messages
494
+ txPublisher ! SetChannelId (remoteNodeId, channelId)
472
495
context.system.eventStream.publish(ChannelIdAssigned (self, remoteNodeId, temporaryChannelId, channelId))
473
496
context.system.eventStream.publish(ChannelSignatureReceived (self, commitments))
474
497
// NB: we don't send a ChannelSignatureSent for the first commit
@@ -1341,7 +1364,7 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId
1341
1364
}
1342
1365
val revokedCommitPublished1 = d.revokedCommitPublished.map { rev =>
1343
1366
val (rev1, penaltyTxs) = Closing .claimRevokedHtlcTxOutputs(keyManager, d.commitments, rev, tx, nodeParams.onChainFeeConf.feeEstimator)
1344
- penaltyTxs.foreach(claimTx => blockchain ! PublishAsap (claimTx.tx, PublishStrategy . JustPublish ))
1367
+ penaltyTxs.foreach(claimTx => txPublisher ! PublishRawTx (claimTx))
1345
1368
penaltyTxs.foreach(claimTx => blockchain ! WatchSpent (self, tx.txid, claimTx.input.outPoint.index.toInt, BITCOIN_OUTPUT_SPENT , hints = Set (claimTx.tx.txid)))
1346
1369
rev1
1347
1370
}
@@ -1355,7 +1378,7 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId
1355
1378
// If the tx is one of our HTLC txs, we now publish a 3rd-stage claim-htlc-tx that claims its output.
1356
1379
val (localCommitPublished1, claimHtlcTx_opt) = Closing .claimLocalCommitHtlcTxOutput(localCommitPublished, keyManager, d.commitments, tx, nodeParams.onChainFeeConf.feeEstimator, nodeParams.onChainFeeConf.feeTargets)
1357
1380
claimHtlcTx_opt.foreach(claimHtlcTx => {
1358
- blockchain ! PublishAsap (claimHtlcTx.tx, PublishStrategy . JustPublish )
1381
+ txPublisher ! PublishRawTx (claimHtlcTx)
1359
1382
blockchain ! WatchConfirmed (self, claimHtlcTx.tx.txid, nodeParams.minDepthBlocks, BITCOIN_TX_CONFIRMED (claimHtlcTx.tx))
1360
1383
})
1361
1384
Closing .updateLocalCommitPublished(localCommitPublished1, tx)
@@ -1990,7 +2013,7 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId
1990
2013
case Some (fundingTx) =>
1991
2014
// if we are funder, we never give up
1992
2015
log.info(s " republishing the funding tx... " )
1993
- blockchain ! PublishAsap (fundingTx, PublishStrategy . JustPublish )
2016
+ txPublisher ! PublishRawTx (fundingTx, " funding-tx " )
1994
2017
// we also check if the funding tx has been double-spent
1995
2018
checkDoubleSpent(fundingTx)
1996
2019
context.system.scheduler.scheduleOnce(1 day, blockchain, GetTxWithMeta (txid))
@@ -2142,7 +2165,7 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId
2142
2165
}
2143
2166
2144
2167
private def doPublish (closingTx : ClosingTx ): Unit = {
2145
- blockchain ! PublishAsap (closingTx.tx, PublishStrategy . JustPublish )
2168
+ txPublisher ! PublishRawTx (closingTx)
2146
2169
blockchain ! WatchConfirmed (self, closingTx.tx.txid, nodeParams.minDepthBlocks, BITCOIN_TX_CONFIRMED (closingTx.tx))
2147
2170
}
2148
2171
@@ -2171,11 +2194,11 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId
2171
2194
/**
2172
2195
* This helper method will publish txs only if they haven't yet reached minDepth
2173
2196
*/
2174
- private def publishIfNeeded (txs : Iterable [PublishAsap ], irrevocablySpent : Map [OutPoint , Transaction ]): Unit = {
2197
+ private def publishIfNeeded (txs : Iterable [PublishTx ], irrevocablySpent : Map [OutPoint , Transaction ]): Unit = {
2175
2198
val (skip, process) = txs.partition(publishTx => Closing .inputsAlreadySpent(publishTx.tx, irrevocablySpent))
2176
2199
process.foreach { publishTx =>
2177
2200
log.info(s " publishing txid= ${publishTx.tx.txid}" )
2178
- blockchain ! publishTx
2201
+ txPublisher ! publishTx
2179
2202
}
2180
2203
skip.foreach(publishTx => log.info(s " no need to republish txid= ${publishTx.tx.txid}, it has already been confirmed " ))
2181
2204
}
@@ -2209,11 +2232,11 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId
2209
2232
2210
2233
val publishQueue = commitments.commitmentFormat match {
2211
2234
case Transactions .DefaultCommitmentFormat =>
2212
- val txs = List (commitTx) ++ claimMainDelayedOutputTx.map(_.tx) ++ htlcTxs.values.flatten.map(_.tx) ++ claimHtlcDelayedTxs.map(_.tx)
2213
- txs.map(tx => PublishAsap (tx, PublishStrategy .JustPublish ))
2235
+ List (PublishRawTx (commitTx, " commit-tx" )) ++ (claimMainDelayedOutputTx ++ htlcTxs.values.flatten ++ claimHtlcDelayedTxs).map(tx => PublishRawTx (tx))
2214
2236
case Transactions .AnchorOutputsCommitmentFormat =>
2215
- val (publishCommitTx, htlcTxs) = Helpers .Closing .createLocalCommitAnchorPublishStrategy(keyManager, commitments, nodeParams.onChainFeeConf.feeEstimator, nodeParams.onChainFeeConf.feeTargets)
2216
- List (publishCommitTx) ++ claimMainDelayedOutputTx.map(tx => PublishAsap (tx.tx, PublishStrategy .JustPublish )) ++ htlcTxs ++ claimHtlcDelayedTxs.map(tx => PublishAsap (tx.tx, PublishStrategy .JustPublish ))
2237
+ val claimLocalAnchor = claimAnchorTxs.collect { case tx : Transactions .ClaimLocalAnchorOutputTx => SignAndPublishTx (tx, commitments) }
2238
+ val redeemableHtlcTxs = htlcTxs.values.collect { case Some (tx) => SignAndPublishTx (tx, commitments) }
2239
+ List (PublishRawTx (commitTx, " commit-tx" )) ++ claimLocalAnchor ++ claimMainDelayedOutputTx.map(tx => PublishRawTx (tx)) ++ redeemableHtlcTxs ++ claimHtlcDelayedTxs.map(tx => PublishRawTx (tx))
2217
2240
}
2218
2241
publishIfNeeded(publishQueue, irrevocablySpent)
2219
2242
@@ -2276,7 +2299,7 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId
2276
2299
private def doPublish (remoteCommitPublished : RemoteCommitPublished ): Unit = {
2277
2300
import remoteCommitPublished ._
2278
2301
2279
- val publishQueue = (claimMainOutputTx ++ claimHtlcTxs.values.flatten).map(tx => PublishAsap (tx.tx, PublishStrategy . JustPublish ))
2302
+ val publishQueue = (claimMainOutputTx ++ claimHtlcTxs.values.flatten).map(tx => PublishRawTx (tx))
2280
2303
publishIfNeeded(publishQueue, irrevocablySpent)
2281
2304
2282
2305
// we watch:
@@ -2315,7 +2338,7 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId
2315
2338
private def doPublish (revokedCommitPublished : RevokedCommitPublished ): Unit = {
2316
2339
import revokedCommitPublished ._
2317
2340
2318
- val publishQueue = (claimMainOutputTx ++ mainPenaltyTx ++ htlcPenaltyTxs ++ claimHtlcDelayedPenaltyTxs).map(tx => PublishAsap (tx.tx, PublishStrategy . JustPublish ))
2341
+ val publishQueue = (claimMainOutputTx ++ mainPenaltyTx ++ htlcPenaltyTxs ++ claimHtlcDelayedPenaltyTxs).map(tx => PublishRawTx (tx))
2319
2342
publishIfNeeded(publishQueue, irrevocablySpent)
2320
2343
2321
2344
// we watch:
0 commit comments