Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
204 changes: 89 additions & 115 deletions eclair-core/src/main/scala/fr/acinq/eclair/channel/Helpers.scala

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -2001,23 +2001,29 @@ class Channel(val nodeParams: NodeParams, val wallet: OnChainChannelFunder with
handleRemoteSpentNext(tx, d1)
} else {
// Our counterparty is trying to broadcast a revoked commit tx (cheating attempt).
// We need to fail pending outgoing HTLCs: we must do it here because we're overwriting the commitments data, so we won't be able to do it afterwards.
val remoteCommit = d.commitments.latest.remoteCommit
val nextRemoteCommit_opt = d.commitments.latest.nextRemoteCommit_opt.map(_.commit)
val pendingOutgoingHtlcs = nextRemoteCommit_opt.getOrElse(remoteCommit).spec.htlcs.collect(DirectedHtlc.incoming)
val failedHtlcs = Closing.recentlyFailedHtlcs(remoteCommit, nextRemoteCommit_opt, d.commitments.changes)
(pendingOutgoingHtlcs ++ failedHtlcs).foreach { add =>
// We need to fail pending outgoing HTLCs, otherwise they will timeout upstream.
// We must do it here because since we're overwriting the commitments data, we will lose all information
// about HTLCs that are in the current commitments but were not in the revoked one.
// We fail *all* outgoing HTLCs:
// - those that are not in the revoked commitment will never settle on-chain
// - those that are in the revoked commitment will be claimed on-chain, so it's as if they were failed
// Note that if we already received the preimage for some of these HTLCs, we already relayed it upstream
// so the fail command will be a no-op.
val outgoingHtlcs = d.commitments.latest.localCommit.spec.htlcs.collect(DirectedHtlc.outgoing) ++
d.commitments.latest.remoteCommit.spec.htlcs.collect(DirectedHtlc.incoming) ++
d.commitments.latest.nextRemoteCommit_opt.map(_.commit.spec.htlcs.collect(DirectedHtlc.incoming)).getOrElse(Set.empty)
outgoingHtlcs.foreach { add =>
d.commitments.originChannels.get(add.id) match {
case Some(origin) =>
log.info(s"failing htlc #${add.id} paymentHash=${add.paymentHash} origin=$origin: overridden by revoked remote commit")
log.info("failing htlc #{} paymentHash={} origin={}: overridden by revoked remote commit", add.id, add.paymentHash, origin)
relayer ! RES_ADD_SETTLED(origin, add, HtlcResult.OnChainFail(HtlcOverriddenByLocalCommit(d.channelId, add)))
case None => ()
}
}
handleRemoteSpentOther(tx, d1)
}
case None =>
log.warning(s"ignoring unrecognized alternative commit tx=${tx.txid}")
log.warning("ignoring unrecognized alternative commit tx={}", tx.txid)
stay()
}

Expand All @@ -2028,20 +2034,22 @@ class Channel(val nodeParams: NodeParams, val wallet: OnChainChannelFunder with
// when a remote or local commitment tx containing outgoing htlcs is published on the network,
// we watch it in order to extract payment preimage if funds are pulled by the counterparty
// we can then use these preimages to fulfill origin htlcs
log.debug(s"processing bitcoin output spent by txid=${tx.txid} tx=$tx")
log.debug(s"processing bitcoin output spent by txid={} tx={}", tx.txid, tx)
val extracted = Closing.extractPreimages(d.commitments.latest, tx)
extracted.foreach { case (htlc, preimage) =>
d.commitments.originChannels.get(htlc.id) match {
case Some(origin) =>
log.info(s"fulfilling htlc #${htlc.id} paymentHash=${htlc.paymentHash} origin=$origin")
log.info("fulfilling htlc #{} paymentHash={} origin={}", htlc.id, htlc.paymentHash, origin)
relayer ! RES_ADD_SETTLED(origin, htlc, HtlcResult.OnChainFulfill(preimage))
case None =>
// if we don't have the origin, it means that we already have forwarded the fulfill so that's not a big deal.
// this can happen if they send a signature containing the fulfill, then fail the channel before we have time to sign it
log.info(s"cannot fulfill htlc #${htlc.id} paymentHash=${htlc.paymentHash} (origin not found)")
log.warning("cannot fulfill htlc #{} paymentHash={} (origin not found)", htlc.id, htlc.paymentHash)
}
}
val revokedCommitPublished1 = d.revokedCommitPublished.map { rev =>
// this transaction may be an HTLC transaction spending a revoked commitment
// in that case, we immediately publish an HTLC-penalty transaction spending its output(s)
val (rev1, penaltyTxs) = Closing.RevokedClose.claimHtlcTxOutputs(keyManager, d.commitments.params, d.commitments.remotePerCommitmentSecrets, rev, tx, nodeParams.currentBitcoinCoreFeerates, d.finalScriptPubKey)
penaltyTxs.foreach(claimTx => txPublisher ! PublishFinalTx(claimTx, claimTx.fee, None))
penaltyTxs.foreach(claimTx => blockchain ! WatchOutputSpent(self, tx.txid, claimTx.input.outPoint.index.toInt, claimTx.amountIn, hints = Set(claimTx.tx.txid)))
Expand All @@ -2050,7 +2058,7 @@ class Channel(val nodeParams: NodeParams, val wallet: OnChainChannelFunder with
stay() using d.copy(revokedCommitPublished = revokedCommitPublished1) storing()

case Event(WatchTxConfirmedTriggered(blockHeight, _, tx), d: DATA_CLOSING) =>
log.info(s"txid=${tx.txid} has reached mindepth, updating closing state")
log.info("txid={} has reached mindepth, updating closing state", tx.txid)
context.system.eventStream.publish(TransactionConfirmed(d.channelId, remoteNodeId, tx))
// first we check if this tx belongs to one of the current local/remote commits, update it and update the channel data
val d1 = d.copy(
Expand Down Expand Up @@ -2101,27 +2109,31 @@ class Channel(val nodeParams: NodeParams, val wallet: OnChainChannelFunder with
val timedOutHtlcs = Closing.isClosingTypeAlreadyKnown(d1) match {
case Some(c: Closing.LocalClose) => Closing.trimmedOrTimedOutHtlcs(d.commitments.params.commitmentFormat, c.localCommit, c.localCommitPublished, d.commitments.params.localParams.dustLimit, tx)
case Some(c: Closing.RemoteClose) => Closing.trimmedOrTimedOutHtlcs(d.commitments.params.commitmentFormat, c.remoteCommit, c.remoteCommitPublished, d.commitments.params.remoteParams.dustLimit, tx)
case _ => Set.empty[UpdateAddHtlc] // we lose htlc outputs in dataloss protection scenarios (future remote commit)
case Some(_: Closing.RevokedClose) => Set.empty[UpdateAddHtlc] // revoked commitments are handled using [[overriddenOutgoingHtlcs]] below
case Some(_: Closing.RecoveryClose) => Set.empty[UpdateAddHtlc] // we lose htlc outputs in dataloss protection scenarios (future remote commit)
case Some(_: Closing.MutualClose) => Set.empty[UpdateAddHtlc]
case None => Set.empty[UpdateAddHtlc]
}
timedOutHtlcs.foreach { add =>
d.commitments.originChannels.get(add.id) match {
case Some(origin) =>
log.info(s"failing htlc #${add.id} paymentHash=${add.paymentHash} origin=$origin: htlc timed out")
log.info("failing htlc #{} paymentHash={} origin={}: htlc timed out", add.id, add.paymentHash, origin)
relayer ! RES_ADD_SETTLED(origin, add, HtlcResult.OnChainFail(HtlcsTimedoutDownstream(d.channelId, Set(add))))
case None =>
// same as for fulfilling the htlc (no big deal)
log.info(s"cannot fail timed out htlc #${add.id} paymentHash=${add.paymentHash} (origin not found)")
log.info("cannot fail timed out htlc #{} paymentHash={} (origin not found)", add.id, add.paymentHash)
}
}
// we also need to fail outgoing htlcs that we know will never reach the blockchain
// if we previously received the preimage, we have already relayed it upstream and the command below will be ignored
Closing.overriddenOutgoingHtlcs(d, tx).foreach { add =>
d.commitments.originChannels.get(add.id) match {
case Some(origin) =>
log.info(s"failing htlc #${add.id} paymentHash=${add.paymentHash} origin=$origin: overridden by local commit")
log.info("failing htlc #{} paymentHash={} origin={}: overridden by local commit", add.id, add.paymentHash, origin)
relayer ! RES_ADD_SETTLED(origin, add, HtlcResult.OnChainFail(HtlcOverriddenByLocalCommit(d.channelId, add)))
case None =>
// same as for fulfilling the htlc (no big deal)
log.info(s"cannot fail overridden htlc #${add.id} paymentHash=${add.paymentHash} (origin not found)")
log.info("cannot fail overridden htlc #{} paymentHash={} (origin not found)", add.id, add.paymentHash)
}
}
// for our outgoing payments, let's send events if we know that they will settle on chain
Expand Down Expand Up @@ -2295,8 +2307,8 @@ class Channel(val nodeParams: NodeParams, val wallet: OnChainChannelFunder with
case _ => Set.empty
}
val lastFundingLockedTlvs: Set[ChannelReestablishTlv] = if (d.commitments.params.remoteParams.initFeatures.hasFeature(Features.SplicePrototype)) {
d.commitments.lastLocalLocked_opt.map(c => ChannelReestablishTlv.MyCurrentFundingLockedTlv(c.fundingTxId)).toSet ++
d.commitments.lastRemoteLocked_opt.map(c => ChannelReestablishTlv.YourLastFundingLockedTlv(c.fundingTxId)).toSet
d.commitments.lastLocalLocked_opt.map(c => ChannelReestablishTlv.MyCurrentFundingLockedTlv(c.fundingTxId)).toSet ++
d.commitments.lastRemoteLocked_opt.map(c => ChannelReestablishTlv.YourLastFundingLockedTlv(c.fundingTxId)).toSet
} else Set.empty

val channelReestablish = ChannelReestablish(
Expand Down Expand Up @@ -2996,11 +3008,12 @@ class Channel(val nodeParams: NodeParams, val wallet: OnChainChannelFunder with
/** Fail outgoing unsigned htlcs right away when transitioning from NORMAL to CLOSING */
onTransition {
case NORMAL -> CLOSING =>
(nextStateData: @unchecked) match {
nextStateData match {
case d: DATA_CLOSING =>
d.commitments.changes.localChanges.proposed.collect {
case add: UpdateAddHtlc => relayer ! RES_ADD_SETTLED(d.commitments.originChannels(add.id), add, HtlcResult.ChannelFailureBeforeSigned)
}
case _ => ()
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,35 +23,30 @@ import fr.acinq.eclair.channel._
import fr.acinq.eclair.wire.protocol.{UpdateFailHtlc, UpdateFailMalformedHtlc, UpdateFulfillHtlc, UpdateMessage}

/**
* This database stores CMD_FULFILL_HTLC and CMD_FAIL_HTLC that we have received from downstream
* (either directly via UpdateFulfillHtlc or by extracting the value from the
* blockchain).
* This database stores [[CMD_FULFILL_HTLC]], [[CMD_FAIL_HTLC]] and [[CMD_FAIL_MALFORMED_HTLC]] commands received from
* downstream (either directly via channel settlement like [[UpdateFulfillHtlc]] or [[UpdateFailHtlc]] or by extracting
* the preimage from the blockchain during a force-close).
*
* This means that this database is only used in the context of *relaying* payments.
* We must ensure that if a downstream channel is able to pull funds from us, we can always do the same from upstream,
* otherwise we lose money. Hence the need for persistence to handle all corner cases where we disconnect or restart
* before settling on the upstream channel.
*
* We need to be sure that if downstream is able to pull funds from us, we can always
* do the same from upstream, otherwise we lose money. Hence the need for persistence
* to handle all corner cases.
* Importantly, we must only store the *first* command received for a given upstream HTLC: if we first receive
* [[CMD_FULFILL_HTLC]] and then [[CMD_FAIL_HTLC]], the second command must be ignored. This should be implemented by
* using a primary key based on the (channel_id, htlc_id) pair and ignoring conflicting inserts.
*
* Note: this database is only used in the context of *relaying* payments.
*/
trait PendingCommandsDb {

// @formatter:off
def addSettlementCommand(channelId: ByteVector32, cmd: HtlcSettlementCommand): Unit

def removeSettlementCommand(channelId: ByteVector32, htlcId: Long): Unit

def listSettlementCommands(channelId: ByteVector32): Seq[HtlcSettlementCommand]

def listSettlementCommands(): Seq[(ByteVector32, HtlcSettlementCommand)]

// @formatter:on
}

object PendingCommandsDb {
/**
* We store [[CMD_FULFILL_HTLC]]/[[CMD_FAIL_HTLC]]/[[CMD_FAIL_MALFORMED_HTLC]]
* in a database because we don't want to lose preimages, or to forget to fail
* incoming htlcs, which would lead to unwanted channel closings.
*/
def safeSend(register: ActorRef, db: PendingCommandsDb, channelId: ByteVector32, cmd: HtlcSettlementCommand): Unit = {
// htlc settlement commands don't have replyTo
register ! Register.Forward(null, channelId, cmd)
Expand All @@ -65,17 +60,17 @@ object PendingCommandsDb {

def ackSettlementCommands(db: PendingCommandsDb, updates: List[UpdateMessage])(implicit log: LoggingAdapter): Unit = updates.collect {
case u: UpdateFulfillHtlc =>
log.debug(s"fulfill acked for htlcId=${u.id}")
log.debug("fulfill acked for htlcId={}", u.id)
db.removeSettlementCommand(u.channelId, u.id)
case u: UpdateFailHtlc =>
log.debug(s"fail acked for htlcId=${u.id}")
log.debug("fail acked for htlcId={}", u.id)
db.removeSettlementCommand(u.channelId, u.id)
case u: UpdateFailMalformedHtlc =>
log.debug(s"fail-malformed acked for htlcId=${u.id}")
log.debug("fail-malformed acked for htlcId={}", u.id)
db.removeSettlementCommand(u.channelId, u.id)
}

def getSettlementCommands(db: PendingCommandsDb, channelId: ByteVector32)(implicit log: LoggingAdapter): Seq[HtlcSettlementCommand] = {
def getSettlementCommands(db: PendingCommandsDb, channelId: ByteVector32): Seq[HtlcSettlementCommand] = {
db.listSettlementCommands(channelId)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import fr.acinq.eclair.db._
import fr.acinq.eclair.payment.Monitoring.Tags
import fr.acinq.eclair.payment.{ChannelPaymentRelayed, IncomingPaymentPacket, PaymentFailed, PaymentSent}
import fr.acinq.eclair.transactions.DirectedHtlc.outgoing
import fr.acinq.eclair.wire.protocol.{FailureMessage, FailureReason, InvalidOnionBlinding, TemporaryNodeFailure, UpdateAddHtlc}
import fr.acinq.eclair.wire.protocol._
import fr.acinq.eclair.{CustomCommitmentsPlugin, Feature, Features, Logs, MilliSatoshiLong, NodeParams, TimestampMilli}

import scala.concurrent.Promise
Expand Down Expand Up @@ -407,31 +407,35 @@ object PostRestartHtlcCleaner {
val htlcsOut = channels
.collect { case c: ChannelDataWithCommitments => c }
.flatMap { c =>
// Filter out HTLCs that will never reach the blockchain or have already been timed-out on-chain.
// Filter out HTLCs that will never reach the blockchain or have already been settled on-chain.
val htlcsToIgnore: Set[Long] = c match {
case d: DATA_CLOSING =>
val closingType_opt = Closing.isClosingTypeAlreadyKnown(d)
val overriddenHtlcs: Set[Long] = (closingType_opt match {
case Some(c: Closing.LocalClose) => Closing.overriddenOutgoingHtlcs(d, c.localCommitPublished.commitTx)
case Some(c: Closing.RemoteClose) => Closing.overriddenOutgoingHtlcs(d, c.remoteCommitPublished.commitTx)
case Some(c: Closing.RevokedClose) => Closing.overriddenOutgoingHtlcs(d, c.revokedCommitPublished.commitTx)
case _ => Set.empty[UpdateAddHtlc]
case Some(c: Closing.RecoveryClose) => Closing.overriddenOutgoingHtlcs(d, c.remoteCommitPublished.commitTx)
case Some(_: Closing.MutualClose) => Set.empty[UpdateAddHtlc]
case None => Set.empty[UpdateAddHtlc]
}).map(_.id)
val irrevocablySpent = closingType_opt match {
case Some(c: Closing.LocalClose) => c.localCommitPublished.irrevocablySpent.values.toSeq
case Some(c: Closing.RemoteClose) => c.remoteCommitPublished.irrevocablySpent.values.toSeq
case Some(c: Closing.RevokedClose) => c.revokedCommitPublished.irrevocablySpent.values.toSeq
case _ => Nil
val confirmedTxs = closingType_opt match {
case Some(c: Closing.LocalClose) => c.localCommitPublished.irrevocablySpent.values.toSet
case Some(c: Closing.RemoteClose) => c.remoteCommitPublished.irrevocablySpent.values.toSet
case Some(c: Closing.RevokedClose) => c.revokedCommitPublished.irrevocablySpent.values.toSet
case Some(c: Closing.RecoveryClose) => c.remoteCommitPublished.irrevocablySpent.values.toSet
case Some(_: Closing.MutualClose) => Set.empty
case None => Set.empty
}
val params = d.commitments.params
val timedOutHtlcs: Set[Long] = (closingType_opt match {
case Some(c: Closing.LocalClose) =>
val confirmedTxs = c.localCommitPublished.commitTx +: irrevocablySpent.filter(tx => Closing.isHtlcTimeout(tx, c.localCommitPublished))
confirmedTxs.flatMap(tx => Closing.trimmedOrTimedOutHtlcs(d.commitments.params.commitmentFormat, c.localCommit, c.localCommitPublished, d.commitments.params.localParams.dustLimit, tx))
case Some(c: Closing.RemoteClose) =>
val confirmedTxs = c.remoteCommitPublished.commitTx +: irrevocablySpent.filter(tx => Closing.isClaimHtlcTimeout(tx, c.remoteCommitPublished))
confirmedTxs.flatMap(tx => Closing.trimmedOrTimedOutHtlcs(d.commitments.params.commitmentFormat, c.remoteCommit, c.remoteCommitPublished, d.commitments.params.remoteParams.dustLimit, tx))
case _ => Seq.empty[UpdateAddHtlc]
}).map(_.id).toSet
case Some(c: Closing.LocalClose) => confirmedTxs.flatMap(tx => Closing.trimmedOrTimedOutHtlcs(params.commitmentFormat, c.localCommit, c.localCommitPublished, params.localParams.dustLimit, tx))
case Some(c: Closing.RemoteClose) => confirmedTxs.flatMap(tx => Closing.trimmedOrTimedOutHtlcs(params.commitmentFormat, c.remoteCommit, c.remoteCommitPublished, params.remoteParams.dustLimit, tx))
case Some(_: Closing.RevokedClose) => Set.empty // revoked commitments are handled using [[overriddenOutgoingHtlcs]] above
case Some(_: Closing.RecoveryClose) => Set.empty // we lose htlc outputs in dataloss protection scenarios (future remote commit)
case Some(_: Closing.MutualClose) => Set.empty
case None => Set.empty
}).map(_.id)
overriddenHtlcs ++ timedOutHtlcs
case _ => Set.empty
}
Expand Down
Loading