diff --git a/docs/release-notes/eclair-vnext.md b/docs/release-notes/eclair-vnext.md index 6decb73849..ddd4e53bda 100644 --- a/docs/release-notes/eclair-vnext.md +++ b/docs/release-notes/eclair-vnext.md @@ -11,6 +11,15 @@ We remove the code used to deserialize channel data from versions of eclair prior to v0.13. Node operators running a version of `eclair` older than v0.13 must first upgrade to v0.13 to migrate their channel data, and then upgrade to the latest version. +### Move closed channels to dedicated database table + +We previously kept closed channels in the same database table as active channels, with a flag indicating that they were closed. +This creates performance issues for nodes with a large history of channels, and creates backwards-compatibility issues when changing the channel data format. + +We now store closed channels in a dedicated table, where we only keep relevant information regarding the channel. +When restarting your node, the channels table will automatically be cleaned up and closed channels will move to the new table. +This may take some time depending on your channel history, but will only happen once. + ### Update minimal version of Bitcoin Core With this release, eclair requires using Bitcoin Core 29.1. 
@@ -22,7 +31,7 @@ Newer versions of Bitcoin Core may be used, but have not been extensively tested ### API changes - +- the `closedchannels` API now returns human-readable channel data ### Miscellaneous improvements and bug fixes diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/Eclair.scala b/eclair-core/src/main/scala/fr/acinq/eclair/Eclair.scala index 005d44e404..1c7ec9a859 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/Eclair.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/Eclair.scala @@ -47,7 +47,6 @@ import fr.acinq.eclair.payment.send.PaymentInitiator._ import fr.acinq.eclair.payment.send.{ClearRecipient, OfferPayment, PaymentIdentifier} import fr.acinq.eclair.router.Router import fr.acinq.eclair.router.Router._ -import fr.acinq.eclair.transactions.Transactions.CommitmentFormat import fr.acinq.eclair.wire.protocol.OfferTypes.Offer import fr.acinq.eclair.wire.protocol._ import grizzled.slf4j.Logging @@ -117,7 +116,7 @@ trait Eclair { def channelInfo(channel: ApiTypes.ChannelIdentifier)(implicit timeout: Timeout): Future[CommandResponse[CMD_GET_CHANNEL_INFO]] - def closedChannels(nodeId_opt: Option[PublicKey], paginated_opt: Option[Paginated])(implicit timeout: Timeout): Future[Iterable[RES_GET_CHANNEL_INFO]] + def closedChannels(nodeId_opt: Option[PublicKey], paginated_opt: Option[Paginated])(implicit timeout: Timeout): Future[Iterable[DATA_CLOSED]] def peers()(implicit timeout: Timeout): Future[Iterable[PeerInfo]] @@ -348,11 +347,9 @@ class EclairImpl(val appKit: Kit) extends Eclair with Logging with SpendFromChan sendToChannelTyped(channel = channel, cmdBuilder = CMD_GET_CHANNEL_INFO(_)) } - override def closedChannels(nodeId_opt: Option[PublicKey], paginated_opt: Option[Paginated])(implicit timeout: Timeout): Future[Iterable[RES_GET_CHANNEL_INFO]] = { + override def closedChannels(nodeId_opt: Option[PublicKey], paginated_opt: Option[Paginated])(implicit timeout: Timeout): Future[Iterable[DATA_CLOSED]] = { Future { - 
appKit.nodeParams.db.channels.listClosedChannels(nodeId_opt, paginated_opt).map { data => - RES_GET_CHANNEL_INFO(nodeId = data.remoteNodeId, channelId = data.channelId, channel = ActorRef.noSender, state = CLOSED, data = data) - } + appKit.nodeParams.db.channels.listClosedChannels(nodeId_opt, paginated_opt) } } diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/channel/ChannelData.scala b/eclair-core/src/main/scala/fr/acinq/eclair/channel/ChannelData.scala index c12d7c953d..1b0b28637d 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/channel/ChannelData.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/channel/ChannelData.scala @@ -20,6 +20,7 @@ import akka.actor.{ActorRef, PossiblyHarmful, typed} import fr.acinq.bitcoin.scalacompat.Crypto.PublicKey import fr.acinq.bitcoin.scalacompat.{ByteVector32, DeterministicWallet, OutPoint, Satoshi, SatoshiLong, Transaction, TxId, TxOut} import fr.acinq.eclair.blockchain.fee.{ConfirmationTarget, FeeratePerKw} +import fr.acinq.eclair.channel.Helpers.Closing import fr.acinq.eclair.channel.fund.InteractiveTxBuilder._ import fr.acinq.eclair.channel.fund.{InteractiveTxBuilder, InteractiveTxSigningSession} import fr.acinq.eclair.io.Peer @@ -560,6 +561,8 @@ sealed trait ChannelDataWithCommitments extends PersistentChannelData { def commitments: Commitments } +sealed trait ClosedData extends ChannelData + final case class DATA_WAIT_FOR_OPEN_CHANNEL(initFundee: INPUT_INIT_CHANNEL_NON_INITIATOR) extends TransientChannelData { val channelId: ByteVector32 = initFundee.temporaryChannelId } @@ -696,6 +699,102 @@ final case class DATA_CLOSING(commitments: Commitments, final case class DATA_WAIT_FOR_REMOTE_PUBLISH_FUTURE_COMMITMENT(commitments: Commitments, remoteChannelReestablish: ChannelReestablish) extends ChannelDataWithCommitments +/** We use this class when a channel shouldn't be stored in the DB (e.g. because it never confirmed). 
*/ +case class IgnoreClosedData(previousData: ChannelData) extends ClosedData { + val channelId: ByteVector32 = previousData.channelId +} + +/** + * This class contains the data we will keep in our DB for every closed channel. + * It shouldn't contain data we may wish to remove in the future, otherwise we'll have backwards-compatibility issues. + * This is why for example the commitmentFormat is a string instead of using the [[CommitmentFormat]] trait, to allow + * storing legacy cases that we don't support anymore for active channels. + * + * Note that we only store channels that have been fully opened and for which we had something at stake. Channels that + * are cancelled before having a confirmed funding transaction are ignored, which protects against spam. + */ +final case class DATA_CLOSED(channelId: ByteVector32, +                             remoteNodeId: PublicKey, +                             fundingTxId: TxId, +                             fundingOutputIndex: Long, +                             fundingTxIndex: Long, +                             fundingKeyPath: String, +                             channelFeatures: String, +                             isChannelOpener: Boolean, +                             commitmentFormat: String, +                             announced: Boolean, +                             capacity: Satoshi, +                             closingTxId: TxId, +                             closingType: String, +                             closingScript: ByteVector, +                             localBalance: MilliSatoshi, +                             remoteBalance: MilliSatoshi, +                             closingAmount: Satoshi) extends ClosedData + +object DATA_CLOSED { + def apply(d: DATA_NEGOTIATING_SIMPLE, closingTx: ClosingTx): DATA_CLOSED = DATA_CLOSED( + channelId = d.channelId, + remoteNodeId = d.remoteNodeId, + fundingTxId = d.commitments.latest.fundingTxId, + fundingOutputIndex = d.commitments.latest.fundingInput.index, + fundingTxIndex = d.commitments.latest.fundingTxIndex, + fundingKeyPath = d.commitments.channelParams.localParams.fundingKeyPath.toString(), + channelFeatures = d.commitments.channelParams.channelFeatures.toString, + isChannelOpener = d.commitments.latest.channelParams.localParams.isChannelOpener, + commitmentFormat = d.commitments.latest.commitmentFormat.toString, + announced = d.commitments.latest.channelParams.announceChannel, + capacity = 
d.commitments.latest.capacity, + closingTxId = closingTx.tx.txid, + closingType = Helpers.Closing.MutualClose(closingTx).toString, + closingScript = d.localScriptPubKey, + localBalance = d.commitments.latest.localCommit.spec.toLocal, + remoteBalance = d.commitments.latest.localCommit.spec.toRemote, + closingAmount = closingTx.toLocalOutput_opt.map(_.amount).getOrElse(0 sat) + ) + + def apply(d: DATA_CLOSING, closingType: Helpers.Closing.ClosingType): DATA_CLOSED = DATA_CLOSED( + channelId = d.channelId, + remoteNodeId = d.remoteNodeId, + fundingTxId = d.commitments.latest.fundingTxId, + fundingOutputIndex = d.commitments.latest.fundingInput.index, + fundingTxIndex = d.commitments.latest.fundingTxIndex, + fundingKeyPath = d.commitments.channelParams.localParams.fundingKeyPath.toString(), + channelFeatures = d.commitments.channelParams.channelFeatures.toString, + isChannelOpener = d.commitments.latest.channelParams.localParams.isChannelOpener, + commitmentFormat = d.commitments.latest.commitmentFormat.toString, + announced = d.commitments.latest.channelParams.announceChannel, + capacity = d.commitments.latest.capacity, + closingTxId = closingType match { + case Closing.MutualClose(closingTx) => closingTx.tx.txid + case Closing.LocalClose(_, localCommitPublished) => localCommitPublished.commitTx.txid + case Closing.CurrentRemoteClose(_, remoteCommitPublished) => remoteCommitPublished.commitTx.txid + case Closing.NextRemoteClose(_, remoteCommitPublished) => remoteCommitPublished.commitTx.txid + case Closing.RecoveryClose(remoteCommitPublished) => remoteCommitPublished.commitTx.txid + case Closing.RevokedClose(revokedCommitPublished) => revokedCommitPublished.commitTx.txid + }, + closingType = closingType.toString, + closingScript = d.finalScriptPubKey, + localBalance = closingType match { + case _: Closing.CurrentRemoteClose => d.commitments.latest.remoteCommit.spec.toRemote + case _: Closing.NextRemoteClose => 
d.commitments.latest.nextRemoteCommit_opt.getOrElse(d.commitments.latest.remoteCommit).spec.toRemote + case _ => d.commitments.latest.localCommit.spec.toLocal + }, + remoteBalance = closingType match { + case _: Closing.CurrentRemoteClose => d.commitments.latest.remoteCommit.spec.toLocal + case _: Closing.NextRemoteClose => d.commitments.latest.nextRemoteCommit_opt.getOrElse(d.commitments.latest.remoteCommit).spec.toLocal + case _ => d.commitments.latest.localCommit.spec.toRemote + }, + closingAmount = closingType match { + case Closing.MutualClose(closingTx) => closingTx.toLocalOutput_opt.map(_.amount).getOrElse(0 sat) + case Closing.LocalClose(_, localCommitPublished) => Closing.closingBalance(d.channelParams, d.commitments.latest.commitmentFormat, d.finalScriptPubKey, localCommitPublished) + case Closing.CurrentRemoteClose(_, remoteCommitPublished) => Closing.closingBalance(d.channelParams, d.commitments.latest.commitmentFormat, d.finalScriptPubKey, remoteCommitPublished) + case Closing.NextRemoteClose(_, remoteCommitPublished) => Closing.closingBalance(d.channelParams, d.commitments.latest.commitmentFormat, d.finalScriptPubKey, remoteCommitPublished) + case Closing.RecoveryClose(remoteCommitPublished) => Closing.closingBalance(d.channelParams, d.commitments.latest.commitmentFormat, d.finalScriptPubKey, remoteCommitPublished) + case Closing.RevokedClose(revokedCommitPublished) => Closing.closingBalance(d.channelParams, d.commitments.latest.commitmentFormat, d.finalScriptPubKey, revokedCommitPublished) + } + ) +} + /** Local params that apply for the channel's lifetime. 
*/ case class LocalChannelParams(nodeId: PublicKey, fundingKeyPath: DeterministicWallet.KeyPath, diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/channel/Helpers.scala b/eclair-core/src/main/scala/fr/acinq/eclair/channel/Helpers.scala index 98e7317f9f..fef8e2affd 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/channel/Helpers.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/channel/Helpers.scala @@ -608,13 +608,13 @@ object Helpers { // @formatter:off sealed trait ClosingType - case class MutualClose(tx: ClosingTx) extends ClosingType - case class LocalClose(localCommit: LocalCommit, localCommitPublished: LocalCommitPublished) extends ClosingType + case class MutualClose(tx: ClosingTx) extends ClosingType { override def toString: String = "mutual-close" } + case class LocalClose(localCommit: LocalCommit, localCommitPublished: LocalCommitPublished) extends ClosingType { override def toString: String = "local-close" } sealed trait RemoteClose extends ClosingType { def remoteCommit: RemoteCommit; def remoteCommitPublished: RemoteCommitPublished } - case class CurrentRemoteClose(remoteCommit: RemoteCommit, remoteCommitPublished: RemoteCommitPublished) extends RemoteClose - case class NextRemoteClose(remoteCommit: RemoteCommit, remoteCommitPublished: RemoteCommitPublished) extends RemoteClose - case class RecoveryClose(remoteCommitPublished: RemoteCommitPublished) extends ClosingType - case class RevokedClose(revokedCommitPublished: RevokedCommitPublished) extends ClosingType + case class CurrentRemoteClose(remoteCommit: RemoteCommit, remoteCommitPublished: RemoteCommitPublished) extends RemoteClose { override def toString: String = "remote-close" } + case class NextRemoteClose(remoteCommit: RemoteCommit, remoteCommitPublished: RemoteCommitPublished) extends RemoteClose { override def toString: String = "next-remote-close" } + case class RecoveryClose(remoteCommitPublished: RemoteCommitPublished) extends ClosingType { override def toString: String = 
"recovery-close" } + case class RevokedClose(revokedCommitPublished: RevokedCommitPublished) extends ClosingType { override def toString: String = "revoked-close" } // @formatter:on /** @@ -1707,6 +1707,23 @@ object Helpers { revokedCommitPublished.copy(irrevocablySpent = revokedCommitPublished.irrevocablySpent ++ relevantOutpoints.map(o => o -> tx).toMap) } + /** Returns the amount we've successfully claimed from a force-closed channel. */ + def closingBalance(channelParams: ChannelParams, commitmentFormat: CommitmentFormat, closingScript: ByteVector, commit: CommitPublished): Satoshi = { + val toLocal = commit.localOutput_opt match { + case Some(o) if o.index < commit.commitTx.txOut.size => commit.commitTx.txOut(o.index.toInt).amount + case _ => 0 sat + } + val toClosingScript = commit.irrevocablySpent.values.flatMap(_.txOut) + .filter(_.publicKeyScript == closingScript) + .map(_.amount) + .sum + commitmentFormat match { + case DefaultCommitmentFormat if channelParams.localParams.walletStaticPaymentBasepoint.nonEmpty => toLocal + toClosingScript + case DefaultCommitmentFormat => toClosingScript + case _: AnchorOutputsCommitmentFormat | _: SimpleTaprootChannelCommitmentFormat => toClosingScript + } + } + } } diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/channel/fsm/Channel.scala b/eclair-core/src/main/scala/fr/acinq/eclair/channel/fsm/Channel.scala index fbad954c02..01d0629c64 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/channel/fsm/Channel.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/channel/fsm/Channel.scala @@ -385,7 +385,7 @@ class Channel(val nodeParams: NodeParams, val channelKeys: ChannelKeys, val wall case closing: DATA_CLOSING if Closing.nothingAtStake(closing) => log.info("we have nothing at stake, going straight to CLOSED") context.system.eventStream.publish(ChannelAborted(self, remoteNodeId, closing.channelId)) - goto(CLOSED) using closing + goto(CLOSED) using IgnoreClosedData(closing) case closing: DATA_CLOSING => val 
localPaysClosingFees = closing.commitments.localChannelParams.paysClosingFees val closingType_opt = Closing.isClosingTypeAlreadyKnown(closing) @@ -2289,7 +2289,7 @@ class Channel(val nodeParams: NodeParams, val channelKeys: ChannelKeys, val wall case Some(closingType) => log.info("channel closed (type={})", EventType.Closed(closingType).label) context.system.eventStream.publish(ChannelClosed(self, d.channelId, closingType, d.commitments)) - goto(CLOSED) using d1 storing() + goto(CLOSED) using DATA_CLOSED(d1, closingType) case None => stay() using d1 storing() } @@ -2366,9 +2366,12 @@ class Channel(val nodeParams: NodeParams, val channelKeys: ChannelKeys, val wall when(CLOSED)(handleExceptions { case Event(Symbol("shutdown"), _) => stateData match { - case d: PersistentChannelData => - log.info(s"deleting database record for channelId=${d.channelId}") - nodeParams.db.channels.removeChannel(d.channelId) + case d: DATA_CLOSED => + log.info(s"moving channelId=${d.channelId} to the closed channels DB") + nodeParams.db.channels.removeChannel(d.channelId, Some(d)) + case _: PersistentChannelData | _: IgnoreClosedData => + log.info("deleting database record for channelId={}", stateData.channelId) + nodeParams.db.channels.removeChannel(stateData.channelId, None) case _: TransientChannelData => // nothing was stored in the DB } log.info("shutting down") @@ -3029,10 +3032,11 @@ class Channel(val nodeParams: NodeParams, val channelKeys: ChannelKeys, val wall } case Event(WatchTxConfirmedTriggered(_, _, tx), d: DATA_NEGOTIATING_SIMPLE) if d.findClosingTx(tx).nonEmpty => - val closingType = MutualClose(d.findClosingTx(tx).get) + val closingTx = d.findClosingTx(tx).get + val closingType = MutualClose(closingTx) log.info("channel closed (type={})", EventType.Closed(closingType).label) context.system.eventStream.publish(ChannelClosed(self, d.channelId, closingType, d.commitments)) - goto(CLOSED) using d storing() + goto(CLOSED) using DATA_CLOSED(d, closingTx) case 
Event(WatchFundingSpentTriggered(tx), d: ChannelDataWithCommitments) => if (d.commitments.all.map(_.fundingTxId).contains(tx.txid)) { @@ -3070,6 +3074,7 @@ class Channel(val nodeParams: NodeParams, val channelKeys: ChannelKeys, val wall case d: ChannelDataWithCommitments => Some(d.commitments) case _: ChannelDataWithoutCommitments => None case _: TransientChannelData => None + case _: ClosedData => None } context.system.eventStream.publish(ChannelStateChanged(self, nextStateData.channelId, peer, remoteNodeId, state, nextState, commitments_opt)) } diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/channel/fsm/ChannelOpenDualFunded.scala b/eclair-core/src/main/scala/fr/acinq/eclair/channel/fsm/ChannelOpenDualFunded.scala index 0877492ea3..783bc626b0 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/channel/fsm/ChannelOpenDualFunded.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/channel/fsm/ChannelOpenDualFunded.scala @@ -285,11 +285,11 @@ trait ChannelOpenDualFunded extends DualFundingHandlers with ErrorHandlers { case Event(INPUT_DISCONNECTED, d: DATA_WAIT_FOR_ACCEPT_DUAL_FUNDED_CHANNEL) => d.init.replyTo ! OpenChannelResponse.Disconnected - goto(CLOSED) + goto(CLOSED) using IgnoreClosedData(d) case Event(TickChannelOpenTimeout, d: DATA_WAIT_FOR_ACCEPT_DUAL_FUNDED_CHANNEL) => d.init.replyTo ! OpenChannelResponse.TimedOut - goto(CLOSED) + goto(CLOSED) using IgnoreClosedData(d) }) when(WAIT_FOR_DUAL_FUNDING_CREATED)(handleExceptions { @@ -302,12 +302,12 @@ trait ChannelOpenDualFunded extends DualFundingHandlers with ErrorHandlers { log.info("our peer aborted the dual funding flow: ascii='{}' bin={}", msg.toAscii, msg.data) d.txBuilder ! InteractiveTxBuilder.Abort d.replyTo_opt.foreach(_ ! 
OpenChannelResponse.RemoteError(msg.toAscii)) - goto(CLOSED) sending TxAbort(d.channelId, DualFundingAborted(d.channelId).getMessage) + goto(CLOSED) using IgnoreClosedData(d) sending TxAbort(d.channelId, DualFundingAborted(d.channelId).getMessage) case _: TxSignatures => log.info("received unexpected tx_signatures") d.txBuilder ! InteractiveTxBuilder.Abort d.replyTo_opt.foreach(_ ! OpenChannelResponse.Rejected(UnexpectedFundingSignatures(d.channelId).getMessage)) - goto(CLOSED) sending TxAbort(d.channelId, UnexpectedFundingSignatures(d.channelId).getMessage) + goto(CLOSED) using IgnoreClosedData(d) sending TxAbort(d.channelId, UnexpectedFundingSignatures(d.channelId).getMessage) case _: TxInitRbf => log.info("ignoring unexpected tx_init_rbf message") stay() sending Warning(d.channelId, InvalidRbfAttempt(d.channelId).getMessage) @@ -333,7 +333,7 @@ trait ChannelOpenDualFunded extends DualFundingHandlers with ErrorHandlers { goto(WAIT_FOR_DUAL_FUNDING_SIGNED) using d1 storing() sending commitSig case f: InteractiveTxBuilder.Failed => d.replyTo_opt.foreach(_ ! OpenChannelResponse.Rejected(f.cause.getMessage)) - goto(CLOSED) sending TxAbort(d.channelId, f.cause.getMessage) + goto(CLOSED) using IgnoreClosedData(d) sending TxAbort(d.channelId, f.cause.getMessage) } case Event(c: CloseCommand, d: DATA_WAIT_FOR_DUAL_FUNDING_CREATED) => @@ -349,12 +349,12 @@ trait ChannelOpenDualFunded extends DualFundingHandlers with ErrorHandlers { case Event(INPUT_DISCONNECTED, d: DATA_WAIT_FOR_DUAL_FUNDING_CREATED) => d.txBuilder ! InteractiveTxBuilder.Abort d.replyTo_opt.foreach(_ ! OpenChannelResponse.Disconnected) - goto(CLOSED) + goto(CLOSED) using IgnoreClosedData(d) case Event(TickChannelOpenTimeout, d: DATA_WAIT_FOR_DUAL_FUNDING_CREATED) => d.txBuilder ! InteractiveTxBuilder.Abort d.replyTo_opt.foreach(_ ! 
OpenChannelResponse.TimedOut) - goto(CLOSED) + goto(CLOSED) using IgnoreClosedData(d) }) when(WAIT_FOR_DUAL_FUNDING_SIGNED)(handleExceptions { @@ -362,7 +362,7 @@ trait ChannelOpenDualFunded extends DualFundingHandlers with ErrorHandlers { d.signingSession.receiveCommitSig(d.channelParams, channelKeys, commitSig, nodeParams.currentBlockHeight) match { case Left(f) => rollbackFundingAttempt(d.signingSession.fundingTx.tx, Nil) - goto(CLOSED) sending Error(d.channelId, f.getMessage) + goto(CLOSED) using IgnoreClosedData(d) sending Error(d.channelId, f.getMessage) case Right(signingSession1) => signingSession1 match { case signingSession1: InteractiveTxSigningSession.WaitingForSigs => // In theory we don't have to store their commit_sig here, as they would re-send it if we disconnect, but @@ -394,7 +394,7 @@ trait ChannelOpenDualFunded extends DualFundingHandlers with ErrorHandlers { d.signingSession.receiveTxSigs(channelKeys, txSigs, nodeParams.currentBlockHeight) match { case Left(f) => rollbackFundingAttempt(d.signingSession.fundingTx.tx, Nil) - goto(CLOSED) sending Error(d.channelId, f.getMessage) + goto(CLOSED) using IgnoreClosedData(d) sending Error(d.channelId, f.getMessage) case Right(signingSession) => val minDepth_opt = d.channelParams.minDepth(nodeParams.channelConf.minDepth) watchFundingConfirmed(d.signingSession.fundingTx.txId, minDepth_opt, delay_opt = None) @@ -412,7 +412,7 @@ trait ChannelOpenDualFunded extends DualFundingHandlers with ErrorHandlers { case msg: TxAbort => log.info("our peer aborted the dual funding flow: ascii='{}' bin={}", msg.toAscii, msg.data) rollbackFundingAttempt(d.signingSession.fundingTx.tx, Nil) - goto(CLOSED) sending TxAbort(d.channelId, DualFundingAborted(d.channelId).getMessage) + goto(CLOSED) using IgnoreClosedData(d) sending TxAbort(d.channelId, DualFundingAborted(d.channelId).getMessage) case msg: InteractiveTxConstructionMessage => log.info("received unexpected interactive-tx message: {}", msg.getClass.getSimpleName) 
stay() sending Warning(d.channelId, UnexpectedInteractiveTxMessage(d.channelId, msg).getMessage) diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/channel/fsm/ChannelOpenSingleFunded.scala b/eclair-core/src/main/scala/fr/acinq/eclair/channel/fsm/ChannelOpenSingleFunded.scala index 23651e6a3d..f0510ce8a2 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/channel/fsm/ChannelOpenSingleFunded.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/channel/fsm/ChannelOpenSingleFunded.scala @@ -163,7 +163,7 @@ trait ChannelOpenSingleFunded extends SingleFundingHandlers with ErrorHandlers { case Event(e: Error, d: DATA_WAIT_FOR_OPEN_CHANNEL) => handleRemoteError(e, d) - case Event(INPUT_DISCONNECTED, _) => goto(CLOSED) + case Event(INPUT_DISCONNECTED, d: DATA_WAIT_FOR_OPEN_CHANNEL) => goto(CLOSED) using IgnoreClosedData(d) }) when(WAIT_FOR_ACCEPT_CHANNEL)(handleExceptions { @@ -203,11 +203,11 @@ trait ChannelOpenSingleFunded extends SingleFundingHandlers with ErrorHandlers { case Event(INPUT_DISCONNECTED, d: DATA_WAIT_FOR_ACCEPT_CHANNEL) => d.initFunder.replyTo ! OpenChannelResponse.Disconnected - goto(CLOSED) + goto(CLOSED) using IgnoreClosedData(d) case Event(TickChannelOpenTimeout, d: DATA_WAIT_FOR_ACCEPT_CHANNEL) => d.initFunder.replyTo ! OpenChannelResponse.TimedOut - goto(CLOSED) + goto(CLOSED) using IgnoreClosedData(d) }) when(WAIT_FOR_FUNDING_INTERNAL)(handleExceptions { @@ -264,11 +264,11 @@ trait ChannelOpenSingleFunded extends SingleFundingHandlers with ErrorHandlers { case Event(INPUT_DISCONNECTED, d: DATA_WAIT_FOR_FUNDING_INTERNAL) => d.replyTo ! OpenChannelResponse.Disconnected - goto(CLOSED) + goto(CLOSED) using IgnoreClosedData(d) case Event(TickChannelOpenTimeout, d: DATA_WAIT_FOR_FUNDING_INTERNAL) => d.replyTo ! 
OpenChannelResponse.TimedOut - goto(CLOSED) + goto(CLOSED) using IgnoreClosedData(d) }) when(WAIT_FOR_FUNDING_CREATED)(handleExceptions { @@ -346,7 +346,7 @@ trait ChannelOpenSingleFunded extends SingleFundingHandlers with ErrorHandlers { case Event(e: Error, d: DATA_WAIT_FOR_FUNDING_CREATED) => handleRemoteError(e, d) - case Event(INPUT_DISCONNECTED, _) => goto(CLOSED) + case Event(INPUT_DISCONNECTED, d: DATA_WAIT_FOR_FUNDING_CREATED) => goto(CLOSED) using IgnoreClosedData(d) }) when(WAIT_FOR_FUNDING_SIGNED)(handleExceptions { @@ -414,13 +414,13 @@ trait ChannelOpenSingleFunded extends SingleFundingHandlers with ErrorHandlers { // we rollback the funding tx, it will never be published wallet.rollback(d.fundingTx) d.replyTo ! OpenChannelResponse.Disconnected - goto(CLOSED) + goto(CLOSED) using IgnoreClosedData(d) case Event(TickChannelOpenTimeout, d: DATA_WAIT_FOR_FUNDING_SIGNED) => // we rollback the funding tx, it will never be published wallet.rollback(d.fundingTx) d.replyTo ! OpenChannelResponse.TimedOut - goto(CLOSED) + goto(CLOSED) using IgnoreClosedData(d) }) when(WAIT_FOR_FUNDING_CONFIRMED)(handleExceptions { diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/channel/fsm/CommonHandlers.scala b/eclair-core/src/main/scala/fr/acinq/eclair/channel/fsm/CommonHandlers.scala index 3ed0eea080..56ff91ab41 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/channel/fsm/CommonHandlers.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/channel/fsm/CommonHandlers.scala @@ -59,7 +59,7 @@ trait CommonHandlers { nodeParams.db.channels.addOrUpdateChannel(d) context.system.eventStream.publish(ChannelPersisted(self, remoteNodeId, d.channelId, d)) state - case _: TransientChannelData => + case _: TransientChannelData | _: ClosedData => log.error(s"can't store data=${state.stateData} in state=${state.stateName}") state } diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/channel/fsm/DualFundingHandlers.scala 
b/eclair-core/src/main/scala/fr/acinq/eclair/channel/fsm/DualFundingHandlers.scala index 3ead505694..0f891bea1a 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/channel/fsm/DualFundingHandlers.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/channel/fsm/DualFundingHandlers.scala @@ -110,7 +110,7 @@ trait DualFundingHandlers extends CommonFundingHandlers { if (fundingTxIds.subsetOf(e.fundingTxIds)) { log.warning("{} funding attempts have been double-spent, forgetting channel", fundingTxIds.size) d.allFundingTxs.map(_.sharedTx.tx.buildUnsignedTx()).foreach(tx => wallet.rollback(tx)) - goto(CLOSED) sending Error(d.channelId, FundingTxDoubleSpent(d.channelId).getMessage) + goto(CLOSED) using IgnoreClosedData(d) sending Error(d.channelId, FundingTxDoubleSpent(d.channelId).getMessage) } else { // Not all funding attempts have been double-spent, the channel may still confirm. // For example, we may have published an RBF attempt while we were checking if funding attempts were double-spent. diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/channel/fsm/ErrorHandlers.scala b/eclair-core/src/main/scala/fr/acinq/eclair/channel/fsm/ErrorHandlers.scala index 2cd62627d9..a2ac4b4016 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/channel/fsm/ErrorHandlers.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/channel/fsm/ErrorHandlers.scala @@ -46,7 +46,7 @@ trait ErrorHandlers extends CommonHandlers { def handleFastClose(c: CloseCommand, channelId: ByteVector32) = { val replyTo = if (c.replyTo == ActorRef.noSender) sender() else c.replyTo replyTo ! 
RES_SUCCESS(c, channelId) - goto(CLOSED) + goto(CLOSED) using IgnoreClosedData(stateData) } def handleMutualClose(closingTx: ClosingTx, d: Either[DATA_NEGOTIATING, DATA_CLOSING]) = { @@ -100,7 +100,7 @@ trait ErrorHandlers extends CommonHandlers { cause match { case _: InvalidFundingTx => // invalid funding tx in the single-funding case: we just close the channel - goto(CLOSED) + goto(CLOSED) using IgnoreClosedData(d) case _: ChannelException => // known channel exception: we force close using our current commitment spendLocalCurrent(dd, maxClosingFeerate_opt) sending error @@ -125,8 +125,9 @@ trait ErrorHandlers extends CommonHandlers { } } // When there is no commitment yet, we just send an error to our peer and go to CLOSED state. - case _: ChannelDataWithoutCommitments => goto(CLOSED) sending error - case _: TransientChannelData => goto(CLOSED) sending error + case _: ChannelDataWithoutCommitments => goto(CLOSED) using IgnoreClosedData(d) sending error + case _: TransientChannelData => goto(CLOSED) using IgnoreClosedData(d) sending error + case _: ClosedData => stay() } } @@ -165,8 +166,9 @@ trait ErrorHandlers extends CommonHandlers { // When there is no commitment yet, we just go to CLOSED state in case an error occurs. 
case waitForDualFundingSigned: DATA_WAIT_FOR_DUAL_FUNDING_SIGNED => rollbackFundingAttempt(waitForDualFundingSigned.signingSession.fundingTx.tx, Nil) - goto(CLOSED) - case _: TransientChannelData => goto(CLOSED) + goto(CLOSED) using IgnoreClosedData(d) + case _: TransientChannelData => goto(CLOSED) using IgnoreClosedData(d) + case _: ClosedData => stay() } } diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/channel/fsm/SingleFundingHandlers.scala b/eclair-core/src/main/scala/fr/acinq/eclair/channel/fsm/SingleFundingHandlers.scala index 67b3a5668c..95a1192493 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/channel/fsm/SingleFundingHandlers.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/channel/fsm/SingleFundingHandlers.scala @@ -97,21 +97,26 @@ trait SingleFundingHandlers extends CommonFundingHandlers { } def handleFundingPublishFailed(d: PersistentChannelData) = { - log.error(s"failed to publish funding tx") + log.error("failed to publish funding tx") val exc = ChannelFundingError(d.channelId) val error = Error(d.channelId, exc.getMessage) // NB: we don't use the handleLocalError handler because it would result in the commit tx being published, which we don't want: // implementation *guarantees* that in case of BITCOIN_FUNDING_PUBLISH_FAILED, the funding tx hasn't and will never be published, so we can close the channel right away context.system.eventStream.publish(ChannelErrorOccurred(self, stateData.channelId, remoteNodeId, LocalError(exc), isFatal = true)) - goto(CLOSED) sending error + goto(CLOSED) using IgnoreClosedData(d) sending error } def handleFundingTimeout(d: PersistentChannelData) = { - log.warning(s"funding tx hasn't been confirmed in time, cancelling channel delay=$FUNDING_TIMEOUT_FUNDEE") + // We log the commit tx: if our peer loses their channel backup, they will need that commit tx to recover their funds. 
+ val commitTx_opt = d match { + case _: ChannelDataWithoutCommitments => None + case d: ChannelDataWithCommitments => Some(d.commitments.latest.fullySignedLocalCommitTx(channelKeys)) + } + log.warning("funding tx hasn't been confirmed after {} blocks, ignoring channel (commitTx={})", FUNDING_TIMEOUT_FUNDEE, commitTx_opt.getOrElse("n/a")) val exc = FundingTxTimedout(d.channelId) val error = Error(d.channelId, exc.getMessage) context.system.eventStream.publish(ChannelErrorOccurred(self, stateData.channelId, remoteNodeId, LocalError(exc), isFatal = true)) - goto(CLOSED) sending error + goto(CLOSED) using IgnoreClosedData(d) sending error } } diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/db/ChannelsDb.scala b/eclair-core/src/main/scala/fr/acinq/eclair/db/ChannelsDb.scala index 6f1f34ecce..71c8ed8281 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/db/ChannelsDb.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/db/ChannelsDb.scala @@ -18,7 +18,7 @@ package fr.acinq.eclair.db import fr.acinq.bitcoin.scalacompat.ByteVector32 import fr.acinq.bitcoin.scalacompat.Crypto.PublicKey -import fr.acinq.eclair.channel.PersistentChannelData +import fr.acinq.eclair.channel.{DATA_CLOSED, PersistentChannelData} import fr.acinq.eclair.db.DbEventHandler.ChannelEvent import fr.acinq.eclair.{CltvExpiry, Paginated} @@ -30,8 +30,13 @@ trait ChannelsDb { def updateChannelMeta(channelId: ByteVector32, event: ChannelEvent.EventType): Unit - /** Mark a channel as closed, but keep it in the DB. */ - def removeChannel(channelId: ByteVector32): Unit + /** + * Remove a channel from our DB. + * + * @param channelId ID of the channel that should be removed. + * @param data_opt if provided, closing data will be stored in a dedicated table. + */ + def removeChannel(channelId: ByteVector32, data_opt: Option[DATA_CLOSED]): Unit /** Mark revoked HTLC information as obsolete. It will be removed from the DB once [[removeHtlcInfos]] is called. 
*/ def markHtlcInfosForRemoval(channelId: ByteVector32, beforeCommitIndex: Long): Unit @@ -41,9 +46,10 @@ trait ChannelsDb { def listLocalChannels(): Seq[PersistentChannelData] - def listClosedChannels(remoteNodeId_opt: Option[PublicKey], paginated_opt: Option[Paginated]): Seq[PersistentChannelData] + def listClosedChannels(remoteNodeId_opt: Option[PublicKey], paginated_opt: Option[Paginated]): Seq[DATA_CLOSED] def addHtlcInfo(channelId: ByteVector32, commitmentNumber: Long, paymentHash: ByteVector32, cltvExpiry: CltvExpiry): Unit def listHtlcInfos(channelId: ByteVector32, commitmentNumber: Long): Seq[(ByteVector32, CltvExpiry)] + } diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/db/Databases.scala b/eclair-core/src/main/scala/fr/acinq/eclair/db/Databases.scala index 50045491a3..8d67fc700e 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/db/Databases.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/db/Databases.scala @@ -214,9 +214,8 @@ object Databases extends Logging { maxAge = initChecks.localChannelsMaxAge, sqlQuery = """ - |SELECT MAX(GREATEST(created_timestamp, last_payment_sent_timestamp, last_payment_received_timestamp, last_connected_timestamp, closed_timestamp)) - |FROM local.channels - |WHERE NOT is_closed""".stripMargin) + |SELECT MAX(GREATEST(created_timestamp, last_payment_sent_timestamp, last_payment_received_timestamp, last_connected_timestamp)) + |FROM local.channels""".stripMargin) checkMaxAge(name = "network node", maxAge = initChecks.networkNodesMaxAge, diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/db/pg/PgChannelsDb.scala b/eclair-core/src/main/scala/fr/acinq/eclair/db/pg/PgChannelsDb.scala index 50d533eede..fe2beae4e4 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/db/pg/PgChannelsDb.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/db/pg/PgChannelsDb.scala @@ -17,26 +17,26 @@ package fr.acinq.eclair.db.pg import com.zaxxer.hikari.util.IsolationLevel -import fr.acinq.bitcoin.scalacompat.ByteVector32 
import fr.acinq.bitcoin.scalacompat.Crypto.PublicKey -import fr.acinq.eclair.channel.PersistentChannelData +import fr.acinq.bitcoin.scalacompat.{ByteVector32, Satoshi, TxId} +import fr.acinq.eclair.channel._ import fr.acinq.eclair.db.ChannelsDb import fr.acinq.eclair.db.DbEventHandler.ChannelEvent import fr.acinq.eclair.db.Monitoring.Metrics.withMetrics import fr.acinq.eclair.db.Monitoring.Tags.DbBackends import fr.acinq.eclair.db.pg.PgUtils.PgLock import fr.acinq.eclair.wire.internal.channel.ChannelCodecs.channelDataCodec -import fr.acinq.eclair.{CltvExpiry, Paginated} +import fr.acinq.eclair.{CltvExpiry, MilliSatoshi, Paginated} import grizzled.slf4j.Logging import scodec.bits.BitVector -import java.sql.{Connection, Timestamp} +import java.sql.{Connection, Statement, Timestamp} import java.time.Instant import javax.sql.DataSource object PgChannelsDb { val DB_NAME = "channels" - val CURRENT_VERSION = 11 + val CURRENT_VERSION = 12 } class PgChannelsDb(implicit ds: DataSource, lock: PgLock) extends ChannelsDb with Logging { @@ -49,12 +49,95 @@ class PgChannelsDb(implicit ds: DataSource, lock: PgLock) extends ChannelsDb wit inTransaction { pg => using(pg.createStatement()) { statement => + /** + * Before version 12, closed channels were directly kept in the local_channels table with an is_closed flag set to true. + * We move them to a dedicated table, where we keep minimal channel information. + */ + def migration1112(statement: Statement): Unit = { + // We start by dropping the foreign key constraint on htlc_infos, otherwise we won't be able to move recently + // closed channels to a different table. 
+ statement.executeQuery("SELECT conname FROM pg_catalog.pg_constraint WHERE contype = 'f'").map(rs => rs.getString("conname")).headOption match { + case Some(foreignKeyConstraint) => statement.executeUpdate(s"ALTER TABLE local.htlc_infos DROP CONSTRAINT $foreignKeyConstraint") + case None => logger.warn("couldn't find foreign key constraint for htlc_infos table: DB migration may fail") + } + // We can now move closed channels to a dedicated table. + statement.executeUpdate("CREATE TABLE local.channels_closed (channel_id TEXT NOT NULL PRIMARY KEY, remote_node_id TEXT NOT NULL, funding_txid TEXT NOT NULL, funding_output_index BIGINT NOT NULL, funding_tx_index BIGINT NOT NULL, funding_key_path TEXT NOT NULL, channel_features TEXT NOT NULL, is_channel_opener BOOLEAN NOT NULL, commitment_format TEXT NOT NULL, announced BOOLEAN NOT NULL, capacity_satoshis BIGINT NOT NULL, closing_txid TEXT NOT NULL, closing_type TEXT NOT NULL, closing_script TEXT NOT NULL, local_balance_msat BIGINT NOT NULL, remote_balance_msat BIGINT NOT NULL, closing_amount_satoshis BIGINT NOT NULL, created_at TIMESTAMP WITH TIME ZONE NOT NULL, closed_at TIMESTAMP WITH TIME ZONE NOT NULL)") + statement.executeUpdate("CREATE INDEX channels_closed_remote_node_id_idx ON local.channels_closed(remote_node_id)") + // We migrate closed channels from the local_channels table to the new channels_closed table, whenever possible. 
+ val insertStatement = pg.prepareStatement("INSERT INTO local.channels_closed VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)") + val batchSize = 50 + using(pg.prepareStatement("SELECT channel_id, data, is_closed, created_timestamp, closed_timestamp FROM local.channels WHERE is_closed=TRUE")) { queryStatement => + val rs = queryStatement.executeQuery() + var inserted = 0 + var batchCount = 0 + while (rs.next()) { + val channelId = rs.getByteVector32FromHex("channel_id") + val data_opt = channelDataCodec.decode(BitVector(rs.getBytes("data"))).require.value match { + case d: DATA_NEGOTIATING_SIMPLE => + // We didn't store which closing transaction actually confirmed, so we select the most likely one. + // The simple_close feature wasn't widely supported before this migration, so this shouldn't affect a lot of channels. + val closingTx = d.publishedClosingTxs.lastOption.getOrElse(d.proposedClosingTxs.last.preferred_opt.get) + Some(DATA_CLOSED(d, closingTx)) + case d: DATA_CLOSING => + Helpers.Closing.isClosingTypeAlreadyKnown(d) match { + case Some(closingType) => Some(DATA_CLOSED(d, closingType)) + // If the closing type cannot be inferred from the stored data, it must be a mutual close. + // In that case, we didn't store which closing transaction actually confirmed, so we select the most likely one. 
+ case None if d.mutualClosePublished.nonEmpty => Some(DATA_CLOSED(d, Helpers.Closing.MutualClose(d.mutualClosePublished.last))) + case None => + logger.warn(s"cannot move channel_id=$channelId to the channels_closed table, unknown closing_type") + None + } + case d => + logger.warn(s"cannot move channel_id=$channelId to the channels_closed table (state=${d.getClass.getSimpleName})") + None + } + data_opt match { + case Some(data) => + insertStatement.setString(1, channelId.toHex) + insertStatement.setString(2, data.remoteNodeId.toHex) + insertStatement.setString(3, data.fundingTxId.value.toHex) + insertStatement.setLong(4, data.fundingOutputIndex) + insertStatement.setLong(5, data.fundingTxIndex) + insertStatement.setString(6, data.fundingKeyPath) + insertStatement.setString(7, data.channelFeatures) + insertStatement.setBoolean(8, data.isChannelOpener) + insertStatement.setString(9, data.commitmentFormat) + insertStatement.setBoolean(10, data.announced) + insertStatement.setLong(11, data.capacity.toLong) + insertStatement.setString(12, data.closingTxId.value.toHex) + insertStatement.setString(13, data.closingType) + insertStatement.setString(14, data.closingScript.toHex) + insertStatement.setLong(15, data.localBalance.toLong) + insertStatement.setLong(16, data.remoteBalance.toLong) + insertStatement.setLong(17, data.closingAmount.toLong) + insertStatement.setTimestamp(18, rs.getTimestampNullable("created_timestamp").getOrElse(Timestamp.from(Instant.ofEpochMilli(0)))) + insertStatement.setTimestamp(19, rs.getTimestampNullable("closed_timestamp").getOrElse(Timestamp.from(Instant.ofEpochMilli(0)))) + insertStatement.addBatch() + batchCount = batchCount + 1 + if (batchCount % batchSize == 0) { + inserted = inserted + insertStatement.executeBatch().sum + batchCount = 0 + } + case None => () + } + } + inserted = inserted + insertStatement.executeBatch().sum + logger.info(s"moved $inserted channels to the channels_closed table") + } + // We can now clean-up the active 
channels table. + statement.executeUpdate("DELETE FROM local.channels WHERE is_closed=TRUE") + statement.executeUpdate("ALTER TABLE local.channels DROP COLUMN is_closed") + statement.executeUpdate("ALTER TABLE local.channels DROP COLUMN closed_timestamp") + } + getVersion(statement, DB_NAME) match { case None => statement.executeUpdate("CREATE SCHEMA IF NOT EXISTS local") - statement.executeUpdate("CREATE TABLE local.channels (channel_id TEXT NOT NULL PRIMARY KEY, remote_node_id TEXT NOT NULL, data BYTEA NOT NULL, json JSONB NOT NULL, is_closed BOOLEAN NOT NULL DEFAULT FALSE, created_timestamp TIMESTAMP WITH TIME ZONE, last_payment_sent_timestamp TIMESTAMP WITH TIME ZONE, last_payment_received_timestamp TIMESTAMP WITH TIME ZONE, last_connected_timestamp TIMESTAMP WITH TIME ZONE, closed_timestamp TIMESTAMP WITH TIME ZONE)") - statement.executeUpdate("CREATE TABLE local.htlc_infos (channel_id TEXT NOT NULL, commitment_number BIGINT NOT NULL, payment_hash TEXT NOT NULL, cltv_expiry BIGINT NOT NULL, FOREIGN KEY(channel_id) REFERENCES local.channels(channel_id))") + statement.executeUpdate("CREATE TABLE local.channels (channel_id TEXT NOT NULL PRIMARY KEY, remote_node_id TEXT NOT NULL, data BYTEA NOT NULL, json JSONB NOT NULL, created_timestamp TIMESTAMP WITH TIME ZONE, last_payment_sent_timestamp TIMESTAMP WITH TIME ZONE, last_payment_received_timestamp TIMESTAMP WITH TIME ZONE, last_connected_timestamp TIMESTAMP WITH TIME ZONE)") + statement.executeUpdate("CREATE TABLE local.channels_closed (channel_id TEXT NOT NULL PRIMARY KEY, remote_node_id TEXT NOT NULL, funding_txid TEXT NOT NULL, funding_output_index BIGINT NOT NULL, funding_tx_index BIGINT NOT NULL, funding_key_path TEXT NOT NULL, channel_features TEXT NOT NULL, is_channel_opener BOOLEAN NOT NULL, commitment_format TEXT NOT NULL, announced BOOLEAN NOT NULL, capacity_satoshis BIGINT NOT NULL, closing_txid TEXT NOT NULL, closing_type TEXT NOT NULL, closing_script TEXT NOT NULL, local_balance_msat BIGINT NOT NULL, 
remote_balance_msat BIGINT NOT NULL, closing_amount_satoshis BIGINT NOT NULL, created_at TIMESTAMP WITH TIME ZONE NOT NULL, closed_at TIMESTAMP WITH TIME ZONE NOT NULL)") + statement.executeUpdate("CREATE TABLE local.htlc_infos (channel_id TEXT NOT NULL, commitment_number BIGINT NOT NULL, payment_hash TEXT NOT NULL, cltv_expiry BIGINT NOT NULL)") statement.executeUpdate("CREATE TABLE local.htlc_infos_to_remove (channel_id TEXT NOT NULL PRIMARY KEY, before_commitment_number BIGINT NOT NULL)") statement.executeUpdate("CREATE INDEX local_channels_type_idx ON local.channels ((json->>'type'))") @@ -63,7 +146,11 @@ class PgChannelsDb(implicit ds: DataSource, lock: PgLock) extends ChannelsDb wit // This is more efficient because we're writing a lot to this table but only reading when a channel is force-closed. statement.executeUpdate("CREATE INDEX htlc_infos_channel_id_idx ON local.htlc_infos(channel_id)") statement.executeUpdate("CREATE INDEX htlc_infos_commitment_number_idx ON local.htlc_infos(commitment_number)") + statement.executeUpdate("CREATE INDEX channels_closed_remote_node_id_idx ON local.channels_closed(remote_node_id)") case Some(v) if v < 11 => throw new RuntimeException("You are updating from a version of eclair older than v0.13: please update to the v0.13 release first to migrate your channel data, and afterwards you'll be able to update to the latest version.") + case Some(v@11) => + logger.warn(s"migrating db $DB_NAME, found version=$v current=$CURRENT_VERSION") + if (v < 12) migration1112(statement) case Some(CURRENT_VERSION) => () // table is up-to-date, nothing to do case Some(unknownVersion) => throw new RuntimeException(s"Unknown version of DB $DB_NAME found, version=$unknownVersion") } @@ -91,8 +178,8 @@ class PgChannelsDb(implicit ds: DataSource, lock: PgLock) extends ChannelsDb wit val encoded = channelDataCodec.encode(data).require.toByteArray using(pg.prepareStatement( """ - | INSERT INTO local.channels (channel_id, remote_node_id, data, json, 
created_timestamp, last_connected_timestamp, is_closed) - | VALUES (?, ?, ?, ?::JSONB, ?, ?, FALSE) + | INSERT INTO local.channels (channel_id, remote_node_id, data, json, created_timestamp, last_connected_timestamp) + | VALUES (?, ?, ?, ?::JSONB, ?, ?) | ON CONFLICT (channel_id) | DO UPDATE SET data = EXCLUDED.data, json = EXCLUDED.json ; | """.stripMargin)) { statement => @@ -109,7 +196,7 @@ class PgChannelsDb(implicit ds: DataSource, lock: PgLock) extends ChannelsDb wit override def getChannel(channelId: ByteVector32): Option[PersistentChannelData] = withMetrics("channels/get-channel", DbBackends.Postgres) { withLock { pg => - using(pg.prepareStatement("SELECT data FROM local.channels WHERE channel_id=? AND is_closed=FALSE")) { statement => + using(pg.prepareStatement("SELECT data FROM local.channels WHERE channel_id=?")) { statement => statement.setString(1, channelId.toHex) statement.executeQuery.mapCodec(channelDataCodec).lastOption } @@ -137,7 +224,7 @@ class PgChannelsDb(implicit ds: DataSource, lock: PgLock) extends ChannelsDb wit timestampColumn_opt.foreach(updateChannelMetaTimestampColumn(channelId, _)) } - override def removeChannel(channelId: ByteVector32): Unit = withMetrics("channels/remove-channel", DbBackends.Postgres) { + override def removeChannel(channelId: ByteVector32, data_opt: Option[DATA_CLOSED]): Unit = withMetrics("channels/remove-channel", DbBackends.Postgres) { withLock { pg => using(pg.prepareStatement("DELETE FROM local.pending_settlement_commands WHERE channel_id=?")) { statement => statement.setString(1, channelId.toHex) @@ -148,9 +235,39 @@ class PgChannelsDb(implicit ds: DataSource, lock: PgLock) extends ChannelsDb wit // We instead run an asynchronous job to clean up that data in small batches. markHtlcInfosForRemoval(channelId, Long.MaxValue) - using(pg.prepareStatement("UPDATE local.channels SET is_closed=TRUE, closed_timestamp=? 
WHERE channel_id=?")) { statement => - statement.setTimestamp(1, Timestamp.from(Instant.now())) - statement.setString(2, channelId.toHex) + // If we have useful closing data for this channel, we keep it in a dedicated table. + data_opt.foreach(data => { + val createdAt_opt = using(pg.prepareStatement("SELECT created_timestamp FROM local.channels WHERE channel_id=?")) { statement => + statement.setString(1, channelId.toHex) + statement.executeQuery().flatMap(rs => rs.getTimestampNullable("created_timestamp")).headOption + } + using(pg.prepareStatement("INSERT INTO local.channels_closed VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) ON CONFLICT DO NOTHING")) { statement => + statement.setString(1, channelId.toHex) + statement.setString(2, data.remoteNodeId.toHex) + statement.setString(3, data.fundingTxId.value.toHex) + statement.setLong(4, data.fundingOutputIndex) + statement.setLong(5, data.fundingTxIndex) + statement.setString(6, data.fundingKeyPath) + statement.setString(7, data.channelFeatures) + statement.setBoolean(8, data.isChannelOpener) + statement.setString(9, data.commitmentFormat) + statement.setBoolean(10, data.announced) + statement.setLong(11, data.capacity.toLong) + statement.setString(12, data.closingTxId.value.toHex) + statement.setString(13, data.closingType) + statement.setString(14, data.closingScript.toHex) + statement.setLong(15, data.localBalance.toLong) + statement.setLong(16, data.remoteBalance.toLong) + statement.setLong(17, data.closingAmount.toLong) + statement.setTimestamp(18, createdAt_opt.getOrElse(Timestamp.from(Instant.ofEpochMilli(0)))) + statement.setTimestamp(19, Timestamp.from(Instant.now())) + statement.executeUpdate() + } + }) + + // We can now remove this channel from the active channels table. 
+ using(pg.prepareStatement("DELETE FROM local.channels WHERE channel_id=?")) { statement => + statement.setString(1, channelId.toHex) statement.executeUpdate() } } @@ -199,21 +316,40 @@ class PgChannelsDb(implicit ds: DataSource, lock: PgLock) extends ChannelsDb wit override def listLocalChannels(): Seq[PersistentChannelData] = withMetrics("channels/list-local-channels", DbBackends.Postgres) { withLock { pg => using(pg.createStatement) { statement => - statement.executeQuery("SELECT data FROM local.channels WHERE is_closed=FALSE") + statement.executeQuery("SELECT data FROM local.channels") .mapCodec(channelDataCodec).toSeq } } } - override def listClosedChannels(remoteNodeId_opt: Option[PublicKey], paginated_opt: Option[Paginated]): Seq[PersistentChannelData] = withMetrics("channels/list-closed-channels", DbBackends.Postgres) { + override def listClosedChannels(remoteNodeId_opt: Option[PublicKey], paginated_opt: Option[Paginated]): Seq[DATA_CLOSED] = withMetrics("channels/list-closed-channels", DbBackends.Postgres) { val sql = remoteNodeId_opt match { - case None => "SELECT data FROM local.channels WHERE is_closed=TRUE ORDER BY closed_timestamp DESC" - case Some(remoteNodeId) => s"SELECT data FROM local.channels WHERE is_closed=TRUE AND remote_node_id = '${remoteNodeId.toHex}' ORDER BY closed_timestamp DESC" + case Some(remoteNodeId) => s"SELECT * FROM local.channels_closed WHERE remote_node_id = '${remoteNodeId.toHex}' ORDER BY closed_at DESC" + case None => "SELECT * FROM local.channels_closed ORDER BY closed_at DESC" } withLock { pg => using(pg.prepareStatement(limited(sql, paginated_opt))) { statement => - statement.executeQuery() - .mapCodec(channelDataCodec).toSeq + statement.executeQuery().map { rs => + DATA_CLOSED( + channelId = rs.getByteVector32FromHex("channel_id"), + remoteNodeId = PublicKey(rs.getByteVectorFromHex("remote_node_id")), + fundingTxId = TxId(rs.getByteVector32FromHex("funding_txid")), + fundingOutputIndex = 
rs.getLong("funding_output_index"), + fundingTxIndex = rs.getLong("funding_tx_index"), + fundingKeyPath = rs.getString("funding_key_path"), + channelFeatures = rs.getString("channel_features"), + isChannelOpener = rs.getBoolean("is_channel_opener"), + commitmentFormat = rs.getString("commitment_format"), + announced = rs.getBoolean("announced"), + capacity = Satoshi(rs.getLong("capacity_satoshis")), + closingTxId = TxId(rs.getByteVector32FromHex("closing_txid")), + closingType = rs.getString("closing_type"), + closingScript = rs.getByteVectorFromHex("closing_script"), + localBalance = MilliSatoshi(rs.getLong("local_balance_msat")), + remoteBalance = MilliSatoshi(rs.getLong("remote_balance_msat")), + closingAmount = Satoshi(rs.getLong("closing_amount_satoshis")) + ) + }.toSeq } } } diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/db/pg/PgPendingCommandsDb.scala b/eclair-core/src/main/scala/fr/acinq/eclair/db/pg/PgPendingCommandsDb.scala index 2ebd7d57b3..eed31da9c5 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/db/pg/PgPendingCommandsDb.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/db/pg/PgPendingCommandsDb.scala @@ -56,7 +56,6 @@ class PgPendingCommandsDb(implicit ds: DataSource, lock: PgLock) extends Pending getVersion(statement, DB_NAME) match { case None => statement.executeUpdate("CREATE SCHEMA IF NOT EXISTS local") - // note: should we use a foreign key to local_channels table here? 
statement.executeUpdate("CREATE TABLE local.pending_settlement_commands (channel_id TEXT NOT NULL, htlc_id BIGINT NOT NULL, data BYTEA NOT NULL, PRIMARY KEY(channel_id, htlc_id))") case Some(v@(1 | 2)) => logger.warn(s"migrating db $DB_NAME, found version=$v current=$CURRENT_VERSION") diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/db/sqlite/SqliteChannelsDb.scala b/eclair-core/src/main/scala/fr/acinq/eclair/db/sqlite/SqliteChannelsDb.scala index bf6ead03e5..b2ef666624 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/db/sqlite/SqliteChannelsDb.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/db/sqlite/SqliteChannelsDb.scala @@ -16,22 +16,23 @@ package fr.acinq.eclair.db.sqlite -import fr.acinq.bitcoin.scalacompat.ByteVector32 import fr.acinq.bitcoin.scalacompat.Crypto.PublicKey -import fr.acinq.eclair.channel.PersistentChannelData +import fr.acinq.bitcoin.scalacompat.{ByteVector32, Satoshi, TxId} +import fr.acinq.eclair.channel._ import fr.acinq.eclair.db.ChannelsDb import fr.acinq.eclair.db.DbEventHandler.ChannelEvent import fr.acinq.eclair.db.Monitoring.Metrics.withMetrics import fr.acinq.eclair.db.Monitoring.Tags.DbBackends import fr.acinq.eclair.wire.internal.channel.ChannelCodecs.channelDataCodec -import fr.acinq.eclair.{CltvExpiry, Paginated, TimestampMilli} +import fr.acinq.eclair.{CltvExpiry, MilliSatoshi, Paginated, TimestampMilli} import grizzled.slf4j.Logging +import scodec.bits.BitVector -import java.sql.Connection +import java.sql.{Connection, Statement} object SqliteChannelsDb { val DB_NAME = "channels" - val CURRENT_VERSION = 7 + val CURRENT_VERSION = 8 } class SqliteChannelsDb(val sqlite: Connection) extends ChannelsDb with Logging { @@ -49,17 +50,106 @@ class SqliteChannelsDb(val sqlite: Connection) extends ChannelsDb with Logging { statement.execute("PRAGMA foreign_keys = ON") } + /** + * Before version 8, closed channels were directly kept in the local_channels table with an is_closed flag set to true. 
+ * We move them to a dedicated table, where we keep minimal channel information. + */ + def migration78(statement: Statement): Unit = { + // We start by dropping the foreign key constraint on htlc_infos, otherwise we won't be able to move recently + // closed channels to a different table. The only option for that in sqlite is to re-create the table. + statement.executeUpdate("ALTER TABLE htlc_infos RENAME TO htlc_infos_old") + statement.executeUpdate("CREATE TABLE htlc_infos (channel_id BLOB NOT NULL, commitment_number INTEGER NOT NULL, payment_hash BLOB NOT NULL, cltv_expiry INTEGER NOT NULL)") + statement.executeUpdate("INSERT INTO htlc_infos(channel_id, commitment_number, payment_hash, cltv_expiry) SELECT channel_id, commitment_number, payment_hash, cltv_expiry FROM htlc_infos_old") + statement.executeUpdate("DROP TABLE htlc_infos_old") + statement.executeUpdate("CREATE INDEX htlc_infos_channel_id_idx ON htlc_infos(channel_id)") + statement.executeUpdate("CREATE INDEX htlc_infos_commitment_number_idx ON htlc_infos(commitment_number)") + // We can now move closed channels to a dedicated table. 
+ statement.executeUpdate("CREATE TABLE local_channels_closed (channel_id TEXT NOT NULL PRIMARY KEY, remote_node_id TEXT NOT NULL, funding_txid TEXT NOT NULL, funding_output_index INTEGER NOT NULL, funding_tx_index INTEGER NOT NULL, funding_key_path TEXT NOT NULL, channel_features TEXT NOT NULL, is_channel_opener BOOLEAN NOT NULL, commitment_format TEXT NOT NULL, announced BOOLEAN NOT NULL, capacity_satoshis INTEGER NOT NULL, closing_txid TEXT NOT NULL, closing_type TEXT NOT NULL, closing_script TEXT NOT NULL, local_balance_msat INTEGER NOT NULL, remote_balance_msat INTEGER NOT NULL, closing_amount_satoshis INTEGER NOT NULL, created_at INTEGER NOT NULL, closed_at INTEGER NOT NULL)") + statement.executeUpdate("CREATE INDEX local_channels_closed_remote_node_id_idx ON local_channels_closed(remote_node_id)") + // We migrate closed channels from the local_channels table to the new local_channels_closed table, whenever possible. + val insertStatement = sqlite.prepareStatement("INSERT INTO local_channels_closed VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)") + val batchSize = 50 + using(sqlite.prepareStatement("SELECT channel_id, data, is_closed, created_timestamp, closed_timestamp FROM local_channels WHERE is_closed=1")) { queryStatement => + val rs = queryStatement.executeQuery() + var inserted = 0 + var batchCount = 0 + while (rs.next()) { + val channelId = rs.getByteVector32("channel_id") + val data_opt = channelDataCodec.decode(BitVector(rs.getBytes("data"))).require.value match { + case d: DATA_NEGOTIATING_SIMPLE => + // We didn't store which closing transaction actually confirmed, so we select the most likely one. + // The simple_close feature wasn't widely supported before this migration, so this shouldn't affect a lot of channels. 
+ val closingTx = d.publishedClosingTxs.lastOption.getOrElse(d.proposedClosingTxs.last.preferred_opt.get) + Some(DATA_CLOSED(d, closingTx)) + case d: DATA_CLOSING => + Helpers.Closing.isClosingTypeAlreadyKnown(d) match { + case Some(closingType) => Some(DATA_CLOSED(d, closingType)) + // If the closing type cannot be inferred from the stored data, it must be a mutual close. + // In that case, we didn't store which closing transaction actually confirmed, so we select the most likely one. + case None if d.mutualClosePublished.nonEmpty => Some(DATA_CLOSED(d, Helpers.Closing.MutualClose(d.mutualClosePublished.last))) + case None => + logger.warn(s"cannot move channel_id=$channelId to the local_channels_closed table, unknown closing_type") + None + } + case d => + logger.warn(s"cannot move channel_id=$channelId to the local_channels_closed table (state=${d.getClass.getSimpleName})") + None + } + data_opt match { + case Some(data) => + insertStatement.setString(1, channelId.toHex) + insertStatement.setString(2, data.remoteNodeId.toHex) + insertStatement.setString(3, data.fundingTxId.value.toHex) + insertStatement.setLong(4, data.fundingOutputIndex) + insertStatement.setLong(5, data.fundingTxIndex) + insertStatement.setString(6, data.fundingKeyPath) + insertStatement.setString(7, data.channelFeatures) + insertStatement.setBoolean(8, data.isChannelOpener) + insertStatement.setString(9, data.commitmentFormat) + insertStatement.setBoolean(10, data.announced) + insertStatement.setLong(11, data.capacity.toLong) + insertStatement.setString(12, data.closingTxId.value.toHex) + insertStatement.setString(13, data.closingType) + insertStatement.setString(14, data.closingScript.toHex) + insertStatement.setLong(15, data.localBalance.toLong) + insertStatement.setLong(16, data.remoteBalance.toLong) + insertStatement.setLong(17, data.closingAmount.toLong) + insertStatement.setLong(18, rs.getLongNullable("created_timestamp").getOrElse(0)) + insertStatement.setLong(19, 
rs.getLongNullable("closed_timestamp").getOrElse(0)) + insertStatement.addBatch() + batchCount = batchCount + 1 + if (batchCount % batchSize == 0) { + inserted = inserted + insertStatement.executeBatch().sum + batchCount = 0 + } + case None => () + } + } + inserted = inserted + insertStatement.executeBatch().sum + logger.info(s"moved $inserted channels to the local_channels_closed table") + } + // We can now clean-up the active channels table. + statement.executeUpdate("DELETE FROM local_channels WHERE is_closed=1") + statement.executeUpdate("ALTER TABLE local_channels DROP COLUMN is_closed") + statement.executeUpdate("ALTER TABLE local_channels DROP COLUMN closed_timestamp") + } + using(sqlite.createStatement(), inTransaction = true) { statement => getVersion(statement, DB_NAME) match { case None => - statement.executeUpdate("CREATE TABLE local_channels (channel_id BLOB NOT NULL PRIMARY KEY, data BLOB NOT NULL, is_closed BOOLEAN NOT NULL DEFAULT 0, created_timestamp INTEGER, last_payment_sent_timestamp INTEGER, last_payment_received_timestamp INTEGER, last_connected_timestamp INTEGER, closed_timestamp INTEGER)") - statement.executeUpdate("CREATE TABLE htlc_infos (channel_id BLOB NOT NULL, commitment_number INTEGER NOT NULL, payment_hash BLOB NOT NULL, cltv_expiry INTEGER NOT NULL, FOREIGN KEY(channel_id) REFERENCES local_channels(channel_id))") + statement.executeUpdate("CREATE TABLE local_channels (channel_id BLOB NOT NULL PRIMARY KEY, data BLOB NOT NULL, created_timestamp INTEGER, last_payment_sent_timestamp INTEGER, last_payment_received_timestamp INTEGER, last_connected_timestamp INTEGER)") + statement.executeUpdate("CREATE TABLE local_channels_closed (channel_id TEXT NOT NULL PRIMARY KEY, remote_node_id TEXT NOT NULL, funding_txid TEXT NOT NULL, funding_output_index INTEGER NOT NULL, funding_tx_index INTEGER NOT NULL, funding_key_path TEXT NOT NULL, channel_features TEXT NOT NULL, is_channel_opener BOOLEAN NOT NULL, commitment_format TEXT NOT NULL, announced 
BOOLEAN NOT NULL, capacity_satoshis INTEGER NOT NULL, closing_txid TEXT NOT NULL, closing_type TEXT NOT NULL, closing_script TEXT NOT NULL, local_balance_msat INTEGER NOT NULL, remote_balance_msat INTEGER NOT NULL, closing_amount_satoshis INTEGER NOT NULL, created_at INTEGER NOT NULL, closed_at INTEGER NOT NULL)") + statement.executeUpdate("CREATE TABLE htlc_infos (channel_id BLOB NOT NULL, commitment_number INTEGER NOT NULL, payment_hash BLOB NOT NULL, cltv_expiry INTEGER NOT NULL)") statement.executeUpdate("CREATE TABLE htlc_infos_to_remove (channel_id BLOB NOT NULL PRIMARY KEY, before_commitment_number INTEGER NOT NULL)") // Note that we use two distinct indices instead of a composite index on (channel_id, commitment_number). // This is more efficient because we're writing a lot to this table but only reading when a channel is force-closed. statement.executeUpdate("CREATE INDEX htlc_infos_channel_id_idx ON htlc_infos(channel_id)") statement.executeUpdate("CREATE INDEX htlc_infos_commitment_number_idx ON htlc_infos(commitment_number)") + statement.executeUpdate("CREATE INDEX local_channels_closed_remote_node_id_idx ON local_channels_closed(remote_node_id)") case Some(v) if v < 7 => throw new RuntimeException("You are updating from a version of eclair older than v0.13: please update to the v0.13 release first to migrate your channel data, and afterwards you'll be able to update to the latest version.") + case Some(v@7) => + logger.warn(s"migrating db $DB_NAME, found version=$v current=$CURRENT_VERSION") + if (v < 8) migration78(statement) case Some(CURRENT_VERSION) => () // table is up-to-date, nothing to do case Some(unknownVersion) => throw new RuntimeException(s"Unknown version of DB $DB_NAME found, version=$unknownVersion") } @@ -72,7 +162,7 @@ class SqliteChannelsDb(val sqlite: Connection) extends ChannelsDb with Logging { update.setBytes(1, encoded) update.setBytes(2, data.channelId.toArray) if (update.executeUpdate() == 0) { - 
using(sqlite.prepareStatement("INSERT INTO local_channels (channel_id, data, created_timestamp, last_connected_timestamp, is_closed) VALUES (?, ?, ?, ?, 0)")) { statement => + using(sqlite.prepareStatement("INSERT INTO local_channels (channel_id, data, created_timestamp, last_connected_timestamp) VALUES (?, ?, ?, ?)")) { statement => statement.setBytes(1, data.channelId.toArray) statement.setBytes(2, encoded) statement.setLong(3, TimestampMilli.now().toLong) @@ -84,7 +174,7 @@ class SqliteChannelsDb(val sqlite: Connection) extends ChannelsDb with Logging { } override def getChannel(channelId: ByteVector32): Option[PersistentChannelData] = withMetrics("channels/get-channel", DbBackends.Sqlite) { - using(sqlite.prepareStatement("SELECT data FROM local_channels WHERE channel_id=? AND is_closed=0")) { statement => + using(sqlite.prepareStatement("SELECT data FROM local_channels WHERE channel_id=?")) { statement => statement.setBytes(1, channelId.toArray) statement.executeQuery.mapCodec(channelDataCodec).lastOption } @@ -111,7 +201,7 @@ class SqliteChannelsDb(val sqlite: Connection) extends ChannelsDb with Logging { timestampColumn_opt.foreach(updateChannelMetaTimestampColumn(channelId, _)) } - override def removeChannel(channelId: ByteVector32): Unit = withMetrics("channels/remove-channel", DbBackends.Sqlite) { + override def removeChannel(channelId: ByteVector32, data_opt: Option[DATA_CLOSED]): Unit = withMetrics("channels/remove-channel", DbBackends.Sqlite) { using(sqlite.prepareStatement("DELETE FROM pending_settlement_commands WHERE channel_id=?")) { statement => statement.setBytes(1, channelId.toArray) statement.executeUpdate() @@ -121,9 +211,39 @@ class SqliteChannelsDb(val sqlite: Connection) extends ChannelsDb with Logging { // We instead run an asynchronous job to clean up that data in small batches. markHtlcInfosForRemoval(channelId, Long.MaxValue) - using(sqlite.prepareStatement("UPDATE local_channels SET is_closed=1, closed_timestamp=? 
WHERE channel_id=?")) { statement => - statement.setLong(1, TimestampMilli.now().toLong) - statement.setBytes(2, channelId.toArray) + // If we have useful closing data for this channel, we keep it in a dedicated table. + data_opt.foreach(data => { + val createdAt_opt = using(sqlite.prepareStatement("SELECT created_timestamp FROM local_channels WHERE channel_id=?")) { statement => + statement.setBytes(1, channelId.toArray) + statement.executeQuery().flatMap(rs => rs.getLongNullable("created_timestamp")).headOption + } + using(sqlite.prepareStatement("INSERT OR IGNORE INTO local_channels_closed VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)")) { statement => + statement.setString(1, channelId.toHex) + statement.setString(2, data.remoteNodeId.toHex) + statement.setString(3, data.fundingTxId.value.toHex) + statement.setLong(4, data.fundingOutputIndex) + statement.setLong(5, data.fundingTxIndex) + statement.setString(6, data.fundingKeyPath) + statement.setString(7, data.channelFeatures) + statement.setBoolean(8, data.isChannelOpener) + statement.setString(9, data.commitmentFormat) + statement.setBoolean(10, data.announced) + statement.setLong(11, data.capacity.toLong) + statement.setString(12, data.closingTxId.value.toHex) + statement.setString(13, data.closingType) + statement.setString(14, data.closingScript.toHex) + statement.setLong(15, data.localBalance.toLong) + statement.setLong(16, data.remoteBalance.toLong) + statement.setLong(17, data.closingAmount.toLong) + statement.setLong(18, createdAt_opt.getOrElse(0)) + statement.setLong(19, TimestampMilli.now().toLong) + statement.executeUpdate() + } + }) + + // We can now remove this channel from the active channels table. 
+ using(sqlite.prepareStatement("DELETE FROM local_channels WHERE channel_id=?")) { statement => + statement.setBytes(1, channelId.toArray) statement.executeUpdate() } } @@ -172,29 +292,39 @@ class SqliteChannelsDb(val sqlite: Connection) extends ChannelsDb with Logging { override def listLocalChannels(): Seq[PersistentChannelData] = withMetrics("channels/list-local-channels", DbBackends.Sqlite) { using(sqlite.createStatement) { statement => - statement.executeQuery("SELECT data FROM local_channels WHERE is_closed=0") + statement.executeQuery("SELECT data FROM local_channels") .mapCodec(channelDataCodec).toSeq } } - - override def listClosedChannels(remoteNodeId_opt: Option[PublicKey], paginated_opt: Option[Paginated]): Seq[PersistentChannelData] = withMetrics("channels/list-closed-channels", DbBackends.Sqlite) { - val sql = "SELECT data FROM local_channels WHERE is_closed=1 ORDER BY closed_timestamp DESC" - remoteNodeId_opt match { - case None => - using(sqlite.prepareStatement(limited(sql, paginated_opt))) { statement => - statement.executeQuery().mapCodec(channelDataCodec).toSeq - } - case Some(nodeId) => - using(sqlite.prepareStatement(sql)) { statement => - val filtered = statement.executeQuery() - .mapCodec(channelDataCodec).filter(_.remoteNodeId == nodeId) - val limited = paginated_opt match { - case None => filtered - case Some(p) => filtered.slice(p.skip, p.skip + p.count) - } - limited.toSeq - } + override def listClosedChannels(remoteNodeId_opt: Option[PublicKey], paginated_opt: Option[Paginated]): Seq[DATA_CLOSED] = withMetrics("channels/list-closed-channels", DbBackends.Sqlite) { + val sql = remoteNodeId_opt match { + case Some(_) => "SELECT * FROM local_channels_closed WHERE remote_node_id=? 
ORDER BY closed_at DESC" + case None => "SELECT * FROM local_channels_closed ORDER BY closed_at DESC" + } + using(sqlite.prepareStatement(limited(sql, paginated_opt))) { statement => + remoteNodeId_opt.foreach(remoteNodeId => statement.setString(1, remoteNodeId.toHex)) + statement.executeQuery().map { rs => + DATA_CLOSED( + channelId = rs.getByteVector32FromHex("channel_id"), + remoteNodeId = PublicKey(rs.getByteVectorFromHex("remote_node_id")), + fundingTxId = TxId(rs.getByteVector32FromHex("funding_txid")), + fundingOutputIndex = rs.getLong("funding_output_index"), + fundingTxIndex = rs.getLong("funding_tx_index"), + fundingKeyPath = rs.getString("funding_key_path"), + channelFeatures = rs.getString("channel_features"), + isChannelOpener = rs.getBoolean("is_channel_opener"), + commitmentFormat = rs.getString("commitment_format"), + announced = rs.getBoolean("announced"), + capacity = Satoshi(rs.getLong("capacity_satoshis")), + closingTxId = TxId(rs.getByteVector32FromHex("closing_txid")), + closingType = rs.getString("closing_type"), + closingScript = rs.getByteVectorFromHex("closing_script"), + localBalance = MilliSatoshi(rs.getLong("local_balance_msat")), + remoteBalance = MilliSatoshi(rs.getLong("remote_balance_msat")), + closingAmount = Satoshi(rs.getLong("closing_amount_satoshis")) + ) + }.toSeq } } diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/db/sqlite/SqlitePendingCommandsDb.scala b/eclair-core/src/main/scala/fr/acinq/eclair/db/sqlite/SqlitePendingCommandsDb.scala index 351a229e74..64a14e5104 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/db/sqlite/SqlitePendingCommandsDb.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/db/sqlite/SqlitePendingCommandsDb.scala @@ -45,7 +45,6 @@ class SqlitePendingCommandsDb(val sqlite: Connection) extends PendingCommandsDb getVersion(statement, DB_NAME) match { case None => - // note: should we use a foreign key to local_channels table here? 
statement.executeUpdate("CREATE TABLE pending_settlement_commands (channel_id BLOB NOT NULL, htlc_id INTEGER NOT NULL, data BLOB NOT NULL, PRIMARY KEY(channel_id, htlc_id))") case Some(v@1) => logger.warn(s"migrating db $DB_NAME, found version=$v current=$CURRENT_VERSION") diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/io/OpenChannelInterceptor.scala b/eclair-core/src/main/scala/fr/acinq/eclair/io/OpenChannelInterceptor.scala index 7a481db36c..ba4d2048de 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/io/OpenChannelInterceptor.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/io/OpenChannelInterceptor.scala @@ -255,6 +255,7 @@ private class OpenChannelInterceptor(peer: ActorRef[Any], case _: DATA_NEGOTIATING_SIMPLE => true case _: DATA_CLOSING => true case _: DATA_WAIT_FOR_REMOTE_PUBLISH_FUTURE_COMMITMENT => true + case _: ClosedData => true } } diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/io/Switchboard.scala b/eclair-core/src/main/scala/fr/acinq/eclair/io/Switchboard.scala index cfede0ffa5..6b4adfcc16 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/io/Switchboard.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/io/Switchboard.scala @@ -20,7 +20,7 @@ import akka.actor.typed.receptionist.{Receptionist, ServiceKey} import akka.actor.typed.scaladsl.Behaviors import akka.actor.typed.scaladsl.adapter.{ClassicActorContextOps, ClassicActorRefOps, ClassicActorSystemOps, TypedActorRefOps} import akka.actor.{Actor, ActorContext, ActorLogging, ActorRef, OneForOneStrategy, Props, Stash, Status, SupervisorStrategy, typed} -import fr.acinq.bitcoin.scalacompat.{ByteVector32, Satoshi} +import fr.acinq.bitcoin.scalacompat.ByteVector32 import fr.acinq.bitcoin.scalacompat.Crypto.PublicKey import fr.acinq.eclair.blockchain.OnChainPubkeyCache import fr.acinq.eclair.channel.Helpers.Closing @@ -56,8 +56,12 @@ class Switchboard(nodeParams: NodeParams, peerFactory: Switchboard.PeerFactory) // Closed channels will be removed, other channels 
will be restored. val (channels, closedChannels) = init.channels.partition(c => Closing.isClosed(c, None).isEmpty) closedChannels.foreach(c => { - log.info(s"closing channel ${c.channelId}") - nodeParams.db.channels.removeChannel(c.channelId) + log.info("channel {} was closed before restarting, updating the DB", c.channelId) + val closingData_opt = (c, Closing.isClosed(c, None)) match { + case (c: DATA_CLOSING, Some(closingType)) => Some(DATA_CLOSED(c, closingType)) + case _ => None + } + nodeParams.db.channels.removeChannel(c.channelId, closingData_opt) }) val peersWithChannels = channels.groupBy(_.remoteNodeId) val peersWithOnTheFlyFunding = nodeParams.db.liquidity.listPendingOnTheFlyFunding() diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/TestDatabases.scala b/eclair-core/src/test/scala/fr/acinq/eclair/TestDatabases.scala index dbb4775563..99b1c7def8 100644 --- a/eclair-core/src/test/scala/fr/acinq/eclair/TestDatabases.scala +++ b/eclair-core/src/test/scala/fr/acinq/eclair/TestDatabases.scala @@ -110,7 +110,6 @@ object TestDatabases { } case class TestPgDatabases() extends TestDatabases { - val datasource: DataSource = getNewDatabase() val hikariConfig = new HikariConfig hikariConfig.setDataSource(datasource) @@ -165,8 +164,7 @@ object TestDatabases { initializeTables: Connection => Unit, dbName: String, targetVersion: Int, - postCheck: Connection => Unit - ): Unit = { + postCheck: Connection => Unit): Unit = { val connection = dbs.connection // initialize the database to a previous version and populate data initializeTables(connection) diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/channel/states/b/WaitForFundingSignedStateSpec.scala b/eclair-core/src/test/scala/fr/acinq/eclair/channel/states/b/WaitForFundingSignedStateSpec.scala index 2b53fbefcd..20f7ff0505 100644 --- a/eclair-core/src/test/scala/fr/acinq/eclair/channel/states/b/WaitForFundingSignedStateSpec.scala +++ 
b/eclair-core/src/test/scala/fr/acinq/eclair/channel/states/b/WaitForFundingSignedStateSpec.scala @@ -153,9 +153,10 @@ class WaitForFundingSignedStateSpec extends TestKitBaseClass with FixtureAnyFunS test("recv CMD_CLOSE") { f => import f._ val sender = TestProbe() + val channelId = alice.stateData.asInstanceOf[DATA_WAIT_FOR_FUNDING_SIGNED].channelId val c = CMD_CLOSE(sender.ref, None, None) alice ! c - sender.expectMsg(RES_SUCCESS(c, alice.stateData.asInstanceOf[DATA_WAIT_FOR_FUNDING_SIGNED].channelId)) + sender.expectMsg(RES_SUCCESS(c, channelId)) awaitCond(alice.stateName == CLOSED) aliceOpenReplyTo.expectMsg(OpenChannelResponse.Cancelled) listener.expectMsgType[ChannelAborted] diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/channel/states/e/NormalSplicesStateSpec.scala b/eclair-core/src/test/scala/fr/acinq/eclair/channel/states/e/NormalSplicesStateSpec.scala index ff1645aa5e..2f85025771 100644 --- a/eclair-core/src/test/scala/fr/acinq/eclair/channel/states/e/NormalSplicesStateSpec.scala +++ b/eclair-core/src/test/scala/fr/acinq/eclair/channel/states/e/NormalSplicesStateSpec.scala @@ -27,7 +27,6 @@ import fr.acinq.eclair._ import fr.acinq.eclair.blockchain.bitcoind.ZmqWatcher._ import fr.acinq.eclair.blockchain.fee.FeeratePerKw import fr.acinq.eclair.blockchain.{NewTransaction, SingleKeyOnChainWallet} -import fr.acinq.eclair.channel.Helpers.Closing.{LocalClose, RemoteClose, RevokedClose} import fr.acinq.eclair.channel.LocalFundingStatus.DualFundedUnconfirmedFundingTx import fr.acinq.eclair.channel._ import fr.acinq.eclair.channel.fsm.Channel @@ -3340,6 +3339,8 @@ class NormalSplicesStateSpec extends TestKitBaseClass with FixtureAnyFunSuiteLik }) alice2blockchain.expectNoMessage(100 millis) awaitCond(alice.stateName == CLOSED) + assert(alice.stateData.asInstanceOf[DATA_CLOSED].closingTxId == commitTx2.txid) + assert(alice.stateData.asInstanceOf[DATA_CLOSED].fundingTxId == fundingTx2.txid) // Bob also detects that the commit confirms, along with 2nd-stage 
transactions. bob ! WatchTxConfirmedTriggered(BlockHeight(400000), 42, commitTx2) @@ -3355,10 +3356,8 @@ class NormalSplicesStateSpec extends TestKitBaseClass with FixtureAnyFunSuiteLik }) bob2blockchain.expectNoMessage(100 millis) awaitCond(bob.stateName == CLOSED) - - checkPostSpliceState(f, spliceOutFee(f, capacity = 1_900_000.sat, signedTx_opt = Some(fundingTx2))) - assert(Helpers.Closing.isClosed(alice.stateData.asInstanceOf[DATA_CLOSING], None).exists(_.isInstanceOf[LocalClose])) - assert(Helpers.Closing.isClosed(bob.stateData.asInstanceOf[DATA_CLOSING], None).exists(_.isInstanceOf[RemoteClose])) + assert(bob.stateData.asInstanceOf[DATA_CLOSED].closingTxId == commitTx2.txid) + assert(bob.stateData.asInstanceOf[DATA_CLOSED].fundingTxId == fundingTx2.txid) } test("force-close with multiple splices (previous active remote)", Tag(ChannelStateTestsTags.OptionSimpleTaprootPhoenix)) { f => @@ -3435,9 +3434,9 @@ class NormalSplicesStateSpec extends TestKitBaseClass with FixtureAnyFunSuiteLik bobHtlcTxs.foreach(htlcTx => alice ! WatchTxConfirmedTriggered(BlockHeight(400000), 42, htlcTx.tx)) alice2blockchain.expectNoMessage(100 millis) awaitCond(alice.stateName == CLOSED) - - checkPostSpliceState(f, spliceOutFee = 0.sat) - assert(Helpers.Closing.isClosed(alice.stateData.asInstanceOf[DATA_CLOSING], None).exists(_.isInstanceOf[RemoteClose])) + assert(alice.stateData.asInstanceOf[DATA_CLOSED].closingType == "remote-close") + assert(alice.stateData.asInstanceOf[DATA_CLOSED].closingTxId == bobCommitTx1.txid) + assert(alice.stateData.asInstanceOf[DATA_CLOSED].fundingTxId == fundingTx1.txid) } test("force-close with multiple splices (previous active revoked)", Tag(ChannelStateTestsTags.StaticRemoteKey), Tag(ChannelStateTestsTags.AnchorOutputsZeroFeeHtlcTxs)) { f => @@ -3513,7 +3512,9 @@ class NormalSplicesStateSpec extends TestKitBaseClass with FixtureAnyFunSuiteLik alice ! 
WatchTxConfirmedTriggered(BlockHeight(400000), 42, mainPenalty.tx) htlcPenalty.foreach { penalty => alice ! WatchTxConfirmedTriggered(BlockHeight(400000), 42, penalty.tx) } awaitCond(alice.stateName == CLOSED) - assert(Helpers.Closing.isClosed(alice.stateData.asInstanceOf[DATA_CLOSING], None).exists(_.isInstanceOf[RevokedClose])) + assert(alice.stateData.asInstanceOf[DATA_CLOSED].closingType == "revoked-close") + assert(alice.stateData.asInstanceOf[DATA_CLOSED].closingTxId == bobRevokedCommitTx.txid) + assert(alice.stateData.asInstanceOf[DATA_CLOSED].closingAmount == (Seq(remoteMain.tx, mainPenalty.tx) ++ htlcPenalty.map(_.tx)).map(_.txOut.head.amount).sum) } test("force-close with multiple splices (inactive remote)", Tag(ChannelStateTestsTags.ZeroConf), Tag(ChannelStateTestsTags.AnchorOutputsZeroFeeHtlcTxs)) { f => @@ -3611,9 +3612,7 @@ class NormalSplicesStateSpec extends TestKitBaseClass with FixtureAnyFunSuiteLik bobHtlcTxs.foreach(htlcTx => alice ! WatchTxConfirmedTriggered(BlockHeight(400000), 42, htlcTx.tx)) alice2blockchain.expectNoMessage(100 millis) awaitCond(alice.stateName == CLOSED) - - checkPostSpliceState(f, spliceOutFee = 0.sat) - assert(Helpers.Closing.isClosed(alice.stateData.asInstanceOf[DATA_CLOSING], None).exists(_.isInstanceOf[RemoteClose])) + assert(alice.stateData.asInstanceOf[DATA_CLOSED].closingTxId == bobCommitTx1.txid) } test("force-close with multiple splices (inactive revoked)", Tag(ChannelStateTestsTags.ZeroConf), Tag(ChannelStateTestsTags.OptionSimpleTaproot)) { f => @@ -3715,7 +3714,7 @@ class NormalSplicesStateSpec extends TestKitBaseClass with FixtureAnyFunSuiteLik alice ! WatchTxConfirmedTriggered(BlockHeight(400000), 42, mainPenalty.tx) htlcPenalty.foreach { penalty => alice ! 
WatchTxConfirmedTriggered(BlockHeight(400000), 42, penalty.tx) } awaitCond(alice.stateName == CLOSED) - assert(Helpers.Closing.isClosed(alice.stateData.asInstanceOf[DATA_CLOSING], None).exists(_.isInstanceOf[RevokedClose])) + assert(alice.stateData.asInstanceOf[DATA_CLOSED].closingTxId == bobRevokedCommitTx.txid) } test("force-close after channel type upgrade (latest active)", Tag(ChannelStateTestsTags.AnchorOutputs)) { f => @@ -3791,6 +3790,9 @@ class NormalSplicesStateSpec extends TestKitBaseClass with FixtureAnyFunSuiteLik }) alice2blockchain.expectNoMessage(100 millis) awaitCond(alice.stateName == CLOSED) + assert(alice.stateData.asInstanceOf[DATA_CLOSED].closingType == "local-close") + assert(alice.stateData.asInstanceOf[DATA_CLOSED].closingTxId == commitTx2.txid) + assert(alice.stateData.asInstanceOf[DATA_CLOSED].fundingTxId == fundingTx2.txid) // Bob also detects that the commit confirms, along with 2nd-stage transactions. bob ! WatchTxConfirmedTriggered(BlockHeight(400000), 42, commitTx2) @@ -3807,10 +3809,9 @@ class NormalSplicesStateSpec extends TestKitBaseClass with FixtureAnyFunSuiteLik }) bob2blockchain.expectNoMessage(100 millis) awaitCond(bob.stateName == CLOSED) - - checkPostSpliceState(f, spliceOutFee(f, capacity = 1_900_000.sat, signedTx_opt = Some(fundingTx2))) - assert(Helpers.Closing.isClosed(alice.stateData.asInstanceOf[DATA_CLOSING], None).exists(_.isInstanceOf[LocalClose])) - assert(Helpers.Closing.isClosed(bob.stateData.asInstanceOf[DATA_CLOSING], None).exists(_.isInstanceOf[RemoteClose])) + assert(bob.stateData.asInstanceOf[DATA_CLOSED].closingType == "remote-close") + assert(bob.stateData.asInstanceOf[DATA_CLOSED].closingTxId == commitTx2.txid) + assert(bob.stateData.asInstanceOf[DATA_CLOSED].fundingTxId == fundingTx2.txid) } test("force-close after channel type upgrade (previous active)", Tag(ChannelStateTestsTags.AnchorOutputs)) { f => @@ -3902,7 +3903,8 @@ class NormalSplicesStateSpec extends TestKitBaseClass with FixtureAnyFunSuiteLik 
bob ! WatchTxConfirmedTriggered(BlockHeight(400000), 42, penaltyTx.tx) htlcPenalty.foreach { penalty => bob ! WatchTxConfirmedTriggered(BlockHeight(400000), 42, penalty.tx) } awaitCond(bob.stateName == CLOSED) - assert(Helpers.Closing.isClosed(bob.stateData.asInstanceOf[DATA_CLOSING], None).exists(_.isInstanceOf[RevokedClose])) + assert(bob.stateData.asInstanceOf[DATA_CLOSED].closingType == "revoked-close") + assert(bob.stateData.asInstanceOf[DATA_CLOSED].closingTxId == aliceCommitTx.txid) } test("force-close after channel type upgrade (revoked latest active)", Tag(ChannelStateTestsTags.AnchorOutputs)) { f => @@ -3938,7 +3940,9 @@ class NormalSplicesStateSpec extends TestKitBaseClass with FixtureAnyFunSuiteLik bob ! WatchTxConfirmedTriggered(BlockHeight(400000), 42, penaltyTx.tx) htlcPenalty.foreach { penalty => bob ! WatchTxConfirmedTriggered(BlockHeight(400000), 42, penalty.tx) } awaitCond(bob.stateName == CLOSED) - assert(Helpers.Closing.isClosed(bob.stateData.asInstanceOf[DATA_CLOSING], None).exists(_.isInstanceOf[RevokedClose])) + assert(bob.stateData.asInstanceOf[DATA_CLOSED].closingType == "revoked-close") + assert(bob.stateData.asInstanceOf[DATA_CLOSED].closingTxId == aliceCommitTx.txid) + assert(bob.stateData.asInstanceOf[DATA_CLOSED].fundingTxId == spliceTx.txid) } test("force-close after channel type upgrade (revoked previous inactive)", Tag(ChannelStateTestsTags.AnchorOutputs), Tag(ChannelStateTestsTags.ZeroConf)) { f => @@ -4005,7 +4009,8 @@ class NormalSplicesStateSpec extends TestKitBaseClass with FixtureAnyFunSuiteLik bob ! WatchTxConfirmedTriggered(BlockHeight(400000), 42, penaltyTx.tx) htlcPenalty.foreach { penalty => bob ! 
WatchTxConfirmedTriggered(BlockHeight(400000), 42, penalty.tx) } awaitCond(bob.stateName == CLOSED) - assert(Helpers.Closing.isClosed(bob.stateData.asInstanceOf[DATA_CLOSING], None).exists(_.isInstanceOf[RevokedClose])) + assert(bob.stateData.asInstanceOf[DATA_CLOSED].closingType == "revoked-close") + assert(bob.stateData.asInstanceOf[DATA_CLOSED].closingTxId == aliceCommitTx.txid) } test("put back watches after restart") { f => diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/channel/states/g/NegotiatingStateSpec.scala b/eclair-core/src/test/scala/fr/acinq/eclair/channel/states/g/NegotiatingStateSpec.scala index 0c3c2280ff..adc947c2f2 100644 --- a/eclair-core/src/test/scala/fr/acinq/eclair/channel/states/g/NegotiatingStateSpec.scala +++ b/eclair-core/src/test/scala/fr/acinq/eclair/channel/states/g/NegotiatingStateSpec.scala @@ -36,6 +36,7 @@ import fr.acinq.eclair.{BlockHeight, CltvExpiry, Features, MilliSatoshiLong, Tes import org.scalatest.Inside.inside import org.scalatest.funsuite.FixtureAnyFunSuiteLike import org.scalatest.{Outcome, Tag} +import scodec.bits.ByteVector import scala.concurrent.duration._ @@ -57,10 +58,10 @@ class NegotiatingStateSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike implicit val log: akka.event.LoggingAdapter = akka.event.NoLogging - def aliceClose(f: FixtureParam, feerates: Option[ClosingFeerates] = None): Unit = { + def aliceClose(f: FixtureParam, feerates: Option[ClosingFeerates] = None, script_opt: Option[ByteVector] = None): Unit = { import f._ val sender = TestProbe() - alice ! CMD_CLOSE(sender.ref, None, feerates) + alice ! 
CMD_CLOSE(sender.ref, script_opt, feerates) sender.expectMsgType[RES_SUCCESS[CMD_CLOSE]] val aliceShutdown = alice2bob.expectMsgType[Shutdown] if (alice.commitments.latest.commitmentFormat.isInstanceOf[TaprootCommitmentFormat]) assert(aliceShutdown.closeeNonce_opt.nonEmpty) @@ -79,10 +80,10 @@ class NegotiatingStateSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike } } - def bobClose(f: FixtureParam, feerates: Option[ClosingFeerates] = None): Unit = { + def bobClose(f: FixtureParam, feerates: Option[ClosingFeerates] = None, script_opt: Option[ByteVector] = None): Unit = { import f._ val sender = TestProbe() - bob ! CMD_CLOSE(sender.ref, None, feerates) + bob ! CMD_CLOSE(sender.ref, script_opt, feerates) sender.expectMsgType[RES_SUCCESS[CMD_CLOSE]] val bobShutdown = bob2alice.expectMsgType[Shutdown] if (bob.commitments.latest.commitmentFormat.isInstanceOf[TaprootCommitmentFormat]) assert(bobShutdown.closeeNonce_opt.nonEmpty) @@ -999,7 +1000,7 @@ class NegotiatingStateSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike test("receive INPUT_RESTORED", Tag(ChannelStateTestsTags.SimpleClose)) { f => import f._ - aliceClose(f) + aliceClose(f, script_opt = Some(Script.write(Script.pay2wpkh(randomKey().publicKey)))) alice2bob.expectMsgType[ClosingComplete] alice2bob.forward(bob) val aliceTx = bob2blockchain.expectMsgType[PublishFinalTx].tx @@ -1026,13 +1027,21 @@ class NegotiatingStateSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike // Alice's transaction (published by Bob) confirms. alice ! WatchFundingSpentTriggered(aliceTx) - inside(alice2blockchain.expectMsgType[PublishFinalTx]) { p => + val fee = inside(alice2blockchain.expectMsgType[PublishFinalTx]) { p => assert(p.tx.txid == aliceTx.txid) assert(p.fee > 0.sat) + p.fee } assert(alice2blockchain.expectMsgType[WatchTxConfirmed].txId == aliceTx.txid) alice ! 
WatchTxConfirmedTriggered(BlockHeight(100), 3, aliceTx) awaitCond(alice.stateName == CLOSED) + assert(alice.stateData.isInstanceOf[DATA_CLOSED]) + assert(alice.stateData.asInstanceOf[DATA_CLOSED].isChannelOpener) + assert(alice.stateData.asInstanceOf[DATA_CLOSED].closingType == "mutual-close") + assert(alice.stateData.asInstanceOf[DATA_CLOSED].closingTxId == aliceTx.txid) + assert(alice.stateData.asInstanceOf[DATA_CLOSED].remoteNodeId == alice.underlyingActor.remoteNodeId) + assert(alice.stateData.asInstanceOf[DATA_CLOSED].capacity == 1_000_000.sat) + assert(alice.stateData.asInstanceOf[DATA_CLOSED].closingAmount == 800_000.sat - fee) // Bob restarts and detects that Alice's closing transaction is confirmed. bob.setState(WAIT_FOR_INIT_INTERNAL, Nothing) @@ -1044,6 +1053,13 @@ class NegotiatingStateSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike assert(bob2blockchain.expectMsgType[WatchTxConfirmed].txId == aliceTx.txid) bob ! WatchTxConfirmedTriggered(BlockHeight(100), 3, aliceTx) awaitCond(bob.stateName == CLOSED) + assert(bob.stateData.isInstanceOf[DATA_CLOSED]) + assert(!bob.stateData.asInstanceOf[DATA_CLOSED].isChannelOpener) + assert(bob.stateData.asInstanceOf[DATA_CLOSED].closingType == "mutual-close") + assert(bob.stateData.asInstanceOf[DATA_CLOSED].closingTxId == aliceTx.txid) + assert(bob.stateData.asInstanceOf[DATA_CLOSED].remoteNodeId == bob.underlyingActor.remoteNodeId) + assert(bob.stateData.asInstanceOf[DATA_CLOSED].capacity == TestConstants.fundingSatoshis) + assert(bob.stateData.asInstanceOf[DATA_CLOSED].closingAmount == 200_000.sat) } test("recv Error") { f => diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/channel/states/h/ClosingStateSpec.scala b/eclair-core/src/test/scala/fr/acinq/eclair/channel/states/h/ClosingStateSpec.scala index 3b30411ff3..66ff368e0c 100644 --- a/eclair-core/src/test/scala/fr/acinq/eclair/channel/states/h/ClosingStateSpec.scala +++ 
b/eclair-core/src/test/scala/fr/acinq/eclair/channel/states/h/ClosingStateSpec.scala @@ -548,10 +548,11 @@ class ClosingStateSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike with // Alice's Claim-HTLC-timeout transaction confirms: we relay the failure upstream. val claimHtlcTimeout = claimHtlcTimeoutTxs.find(_.htlcId == htlc3.id).get + val origin = alice.stateData.asInstanceOf[DATA_CLOSING].commitments.originChannels(htlc3.id) alice ! WatchTxConfirmedTriggered(alice.nodeParams.currentBlockHeight, 13, claimHtlcTimeout.tx) inside(alice2relayer.expectMsgType[RES_ADD_SETTLED[Origin, HtlcResult.OnChainFail]]) { fail => assert(fail.htlc == htlc3) - assert(fail.origin == alice.stateData.asInstanceOf[DATA_CLOSING].commitments.originChannels(htlc3.id)) + assert(fail.origin == origin) } } @@ -644,10 +645,11 @@ class ClosingStateSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike with // Alice's Claim-HTLC-timeout transaction confirms: we relay the failure upstream. val claimHtlcTimeout = claimHtlcTimeoutTxs.find(_.htlcId == htlc3.id).get + val origin = alice.stateData.asInstanceOf[DATA_CLOSING].commitments.originChannels(htlc3.id) alice ! 
WatchTxConfirmedTriggered(alice.nodeParams.currentBlockHeight, 13, claimHtlcTimeout.tx) inside(alice2relayer.expectMsgType[RES_ADD_SETTLED[Origin, HtlcResult.OnChainFail]]) { fail => assert(fail.htlc == htlc3) - assert(fail.origin == alice.stateData.asInstanceOf[DATA_CLOSING].commitments.originChannels(htlc3.id)) + assert(fail.origin == origin) } alice2relayer.expectNoMessage(100 millis) } @@ -828,9 +830,9 @@ class ClosingStateSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike with assert(closingState.htlcOutputs.isEmpty && closingState.htlcDelayedOutputs.isEmpty) assert(closingTxs.htlcTxs.isEmpty) // when the commit tx is confirmed, alice knows that the htlc she sent right before the unilateral close will never reach the chain + val origin = alice.stateData.asInstanceOf[DATA_CLOSING].commitments.originChannels(htlc.id) alice ! WatchTxConfirmedTriggered(BlockHeight(0), 0, aliceCommitTx) // so she fails it - val origin = alice.stateData.asInstanceOf[DATA_CLOSING].commitments.originChannels(htlc.id) alice2relayer.expectMsg(RES_ADD_SETTLED(origin, htlc, HtlcResult.OnChainFail(HtlcOverriddenByLocalCommit(channelId(alice), htlc)))) // the htlc will not settle on chain listener.expectNoMessage(100 millis) @@ -942,9 +944,9 @@ class ClosingStateSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike with assert(closingState.htlcOutputs.isEmpty && closingState.htlcDelayedOutputs.isEmpty) assert(closingTxs.htlcTxs.isEmpty) // when the commit tx is confirmed, alice knows that the htlc will never reach the chain + val origin = alice.stateData.asInstanceOf[DATA_CLOSING].commitments.originChannels(htlc.id) alice ! 
WatchTxConfirmedTriggered(BlockHeight(0), 0, closingState.commitTx) // so she fails it - val origin = alice.stateData.asInstanceOf[DATA_CLOSING].commitments.originChannels(htlc.id) alice2relayer.expectMsg(RES_ADD_SETTLED(origin, htlc, HtlcResult.OnChainFail(HtlcOverriddenByLocalCommit(channelId(alice), htlc)))) // the htlc will not settle on chain listener.expectNoMessage(100 millis) @@ -1271,11 +1273,19 @@ class ClosingStateSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike with alice2blockchain.expectWatchOutputSpent(htlcDelayed3.input) assert(alice.stateName == CLOSING) awaitCond(bob.stateName == CLOSED) + val closedBob = bob.stateData.asInstanceOf[DATA_CLOSED] + assert(closedBob.closingType == "remote-close") + assert(closedBob.closingTxId == closingStateAlice.commitTx.txid) + assert(closedBob.closingAmount == closingTxsBob.mainTx_opt.get.txOut.head.amount + Seq(htlcTimeoutTxBob1, htlcTimeoutTxBob2, htlcSuccessBob).map(_.txOut.head.amount).sum) // Alice's 3rd-stage transactions confirm. Seq(htlcDelayed1, htlcDelayed2, htlcDelayed3).foreach(p => alice ! 
WatchTxConfirmedTriggered(BlockHeight(750_100), 0, p.tx)) alice2blockchain.expectNoMessage(100 millis) awaitCond(alice.stateName == CLOSED) + val closedAlice = alice.stateData.asInstanceOf[DATA_CLOSED] + assert(closedAlice.closingType == "local-close") + assert(closedAlice.closingTxId == closingStateAlice.commitTx.txid) + assert(closedAlice.closingAmount == closingTxsAlice.mainTx_opt.get.txOut.head.amount + Seq(htlcDelayed1, htlcDelayed2, htlcDelayed3).map(_.tx.txOut.head.amount).sum) } test("recv WatchTxConfirmedTriggered (remote commit with htlcs only signed by local in next remote commit)") { f => @@ -1295,9 +1305,9 @@ class ClosingStateSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike with assert(closingState.localOutput_opt.isEmpty) assert(closingState.htlcOutputs.isEmpty) // when the commit tx is signed, alice knows that the htlc she sent right before the unilateral close will never reach the chain + val origin = alice.stateData.asInstanceOf[DATA_CLOSING].commitments.originChannels(htlc.id) alice ! 
WatchTxConfirmedTriggered(BlockHeight(0), 0, bobCommitTx) // so she fails it - val origin = alice.stateData.asInstanceOf[DATA_CLOSING].commitments.originChannels(htlc.id) alice2relayer.expectMsg(RES_ADD_SETTLED(origin, htlc, HtlcResult.OnChainFail(HtlcOverriddenByLocalCommit(channelId(alice), htlc)))) // the htlc will not settle on chain listener.expectNoMessage(100 millis) @@ -1704,6 +1714,10 @@ class ClosingStateSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike with assert(Set(forwardedFail1, forwardedFail2, forwardedFail3) == htlcs) alice2relayer.expectNoMessage(100 millis) awaitCond(alice.stateName == CLOSED) + val closedAlice = alice.stateData.asInstanceOf[DATA_CLOSED] + assert(closedAlice.closingType == "next-remote-close") + assert(closedAlice.closingTxId == bobCommitTx.txid) + assert(closedAlice.closingAmount == closingTxs.mainTx_opt.map(_.txOut.head.amount).getOrElse(0 sat) + closingTxs.htlcTimeoutTxs.map(_.txOut.head.amount).sum) } test("recv WatchTxConfirmedTriggered (next remote commit, taproot)", Tag(ChannelStateTestsTags.OptionSimpleTaproot)) { f => @@ -2062,10 +2076,10 @@ class ClosingStateSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike with assert(alice.stateName == CLOSING) alice ! 
WatchTxConfirmedTriggered(BlockHeight(115), 2, closingTxs.htlcPenaltyTxs.last) awaitCond(alice.stateName == CLOSED) - } - - test("recv WatchFundingSpentTriggered (one revoked tx, option_static_remotekey)", Tag(ChannelStateTestsTags.StaticRemoteKey)) { f => - testFundingSpentRevokedTx(f, DefaultCommitmentFormat) + val closedAlice = alice.stateData.asInstanceOf[DATA_CLOSED] + assert(closedAlice.closingType == "revoked-close") + assert(closedAlice.closingTxId == bobRevokedTx.txid) + assert(closedAlice.closingAmount == closingTxs.mainTx_opt.map(_.txOut.head.amount).getOrElse(0 sat) + closingTxs.mainPenaltyTx.txOut.head.amount + closingTxs.htlcPenaltyTxs.map(_.txOut.head.amount).sum) } test("recv WatchFundingSpentTriggered (one revoked tx, anchor outputs)", Tag(ChannelStateTestsTags.AnchorOutputs)) { f => diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/db/ChannelsDbSpec.scala b/eclair-core/src/test/scala/fr/acinq/eclair/db/ChannelsDbSpec.scala index ac380b6146..cd44ecfb43 100644 --- a/eclair-core/src/test/scala/fr/acinq/eclair/db/ChannelsDbSpec.scala +++ b/eclair-core/src/test/scala/fr/acinq/eclair/db/ChannelsDbSpec.scala @@ -17,26 +17,32 @@ package fr.acinq.eclair.db import com.softwaremill.quicklens._ -import fr.acinq.bitcoin.scalacompat.ByteVector32 -import fr.acinq.bitcoin.scalacompat.Crypto.{PrivateKey, PublicKey} -import fr.acinq.eclair.TestDatabases.{TestPgDatabases, TestSqliteDatabases} +import fr.acinq.bitcoin.scalacompat.Crypto.PrivateKey +import fr.acinq.bitcoin.scalacompat.{ByteVector32, OutPoint, SatoshiLong, Script, Transaction, TxIn, TxOut} +import fr.acinq.eclair.TestDatabases.{TestPgDatabases, TestSqliteDatabases, migrationCheck} +import fr.acinq.eclair.TestUtils.randomTxId +import fr.acinq.eclair.blockchain.fee.FeeratePerKw +import fr.acinq.eclair.channel._ import fr.acinq.eclair.db.ChannelsDbSpec.getTimestamp import fr.acinq.eclair.db.DbEventHandler.ChannelEvent import fr.acinq.eclair.db.jdbc.JdbcUtils.using import 
fr.acinq.eclair.db.pg.PgChannelsDb +import fr.acinq.eclair.db.pg.PgUtils.setVersion import fr.acinq.eclair.db.sqlite.SqliteChannelsDb import fr.acinq.eclair.db.sqlite.SqliteUtils.ExtendedResultSet._ +import fr.acinq.eclair.json.JsonSerializers +import fr.acinq.eclair.transactions.Transactions.{ClosingTx, InputInfo} import fr.acinq.eclair.wire.internal.channel.ChannelCodecs.channelDataCodec import fr.acinq.eclair.wire.internal.channel.ChannelCodecsSpec -import fr.acinq.eclair.{Alias, CltvExpiry, TestDatabases, randomBytes32, randomKey, randomLong} +import fr.acinq.eclair.wire.protocol.Shutdown +import fr.acinq.eclair.{Alias, BlockHeight, CltvExpiry, MilliSatoshiLong, TestDatabases, randomBytes32, randomKey, randomLong} import org.scalatest.funsuite.AnyFunSuite -import scodec.bits.ByteVector +import scodec.bits.{ByteVector, HexStringSyntax} -import java.sql.{Connection, SQLException} +import java.sql.Connection import java.util.concurrent.Executors import scala.concurrent.duration._ import scala.concurrent.{Await, ExecutionContext, ExecutionContextExecutor, Future} -import scala.util.Random class ChannelsDbSpec extends AnyFunSuite { @@ -67,8 +73,6 @@ class ChannelsDbSpec extends AnyFunSuite { val paymentHash2 = ByteVector32(ByteVector.fill(32)(1)) val cltvExpiry2 = CltvExpiry(656) - intercept[SQLException](db.addHtlcInfo(channel1.channelId, commitNumber, paymentHash1, cltvExpiry1)) // no related channel - assert(db.listLocalChannels().isEmpty) db.addOrUpdateChannel(channel1) db.addOrUpdateChannel(channel1) @@ -88,16 +92,19 @@ class ChannelsDbSpec extends AnyFunSuite { assert(db.listHtlcInfos(channel1.channelId, commitNumber + 1).isEmpty) assert(db.listClosedChannels(None, None).isEmpty) - db.removeChannel(channel1.channelId) + val closed1 = DATA_CLOSED(channel1.channelId, channel1.remoteNodeId, randomTxId(), 3, 2, channel1.channelParams.localParams.fundingKeyPath.toString(), channel1.channelParams.channelFeatures.toString, isChannelOpener = true, "anchor_outputs", 
announced = true, 100_000 sat, randomTxId(), "local-close", hex"deadbeef", 61_000_500 msat, 40_000_000 msat, 60_000 sat) + db.removeChannel(channel1.channelId, Some(closed1)) assert(db.getChannel(channel1.channelId).isEmpty) assert(db.listLocalChannels() == List(channel2b)) - assert(db.listClosedChannels(None, None) == List(channel1)) - assert(db.listClosedChannels(Some(channel1.remoteNodeId), None) == List(channel1)) + assert(db.listClosedChannels(None, None) == List(closed1)) + assert(db.listClosedChannels(Some(channel1.remoteNodeId), None) == List(closed1)) assert(db.listClosedChannels(Some(PrivateKey(randomBytes32()).publicKey), None).isEmpty) - db.removeChannel(channel2b.channelId) + // If no closing data is provided, the channel won't be backed-up in the closed_channels table. + db.removeChannel(channel2b.channelId, None) assert(db.getChannel(channel2b.channelId).isEmpty) assert(db.listLocalChannels().isEmpty) + assert(db.listClosedChannels(None, None) == Seq(closed1)) } } @@ -120,7 +127,7 @@ class ChannelsDbSpec extends AnyFunSuite { db.markHtlcInfosForRemoval(channel1.channelId, commitNumberSplice1) db.addHtlcInfo(channel1.channelId, 51, randomBytes32(), CltvExpiry(561)) db.addHtlcInfo(channel1.channelId, 52, randomBytes32(), CltvExpiry(561)) - db.removeChannel(channel1.channelId) + db.removeChannel(channel1.channelId, None) // The second channel has two splice transactions. 
db.addHtlcInfo(channel2.channelId, 48, randomBytes32(), CltvExpiry(561)) @@ -191,7 +198,6 @@ class ChannelsDbSpec extends AnyFunSuite { assert(getTimestamp(dbs, channel1.channelId, "last_payment_sent_timestamp").isEmpty) assert(getTimestamp(dbs, channel1.channelId, "last_payment_received_timestamp").isEmpty) assert(getTimestamp(dbs, channel1.channelId, "last_connected_timestamp").nonEmpty) - assert(getTimestamp(dbs, channel1.channelId, "closed_timestamp").isEmpty) db.updateChannelMeta(channel1.channelId, ChannelEvent.EventType.Created) assert(getTimestamp(dbs, channel1.channelId, "created_timestamp").nonEmpty) @@ -205,14 +211,156 @@ class ChannelsDbSpec extends AnyFunSuite { db.updateChannelMeta(channel1.channelId, ChannelEvent.EventType.Connected) assert(getTimestamp(dbs, channel1.channelId, "last_connected_timestamp").nonEmpty) - db.removeChannel(channel1.channelId) - assert(getTimestamp(dbs, channel1.channelId, "closed_timestamp").nonEmpty) + db.removeChannel(channel1.channelId, None) assert(getTimestamp(dbs, channel2.channelId, "created_timestamp").nonEmpty) assert(getTimestamp(dbs, channel2.channelId, "last_payment_sent_timestamp").isEmpty) assert(getTimestamp(dbs, channel2.channelId, "last_payment_received_timestamp").isEmpty) assert(getTimestamp(dbs, channel2.channelId, "last_connected_timestamp").nonEmpty) - assert(getTimestamp(dbs, channel2.channelId, "closed_timestamp").isEmpty) + } + } + + test("migrate closed channels to dedicated table") { + def createCommitments(): Commitments = { + ChannelCodecsSpec.normal.commitments + .modify(_.channelParams.channelId).setTo(randomBytes32()) + .modify(_.channelParams.remoteParams.nodeId).setTo(randomKey().publicKey) + } + + def closingTx(): ClosingTx = { + val input = InputInfo(OutPoint(randomTxId(), 3), TxOut(300_000 sat, Script.pay2wpkh(randomKey().publicKey))) + val tx = Transaction(2, Seq(TxIn(input.outPoint, Nil, 0)), Seq(TxOut(120_000 sat, Script.pay2wpkh(randomKey().publicKey)), TxOut(175_000 sat, 
Script.pay2tr(randomKey().xOnlyPublicKey()))), 0) + ClosingTx(input, tx, Some(1)) + } + + val paymentHash1 = randomBytes32() + val paymentHash2 = randomBytes32() + // The next two channels are closed and should be migrated to the closed_channels table. + // We haven't yet removed their corresponding htlc_infos, because it is done asynchronously for performance reasons. + val closed1 = DATA_CLOSING(createCommitments(), BlockHeight(750_000), hex"deadbeef", closingTx() :: Nil, closingTx() :: Nil) + val closed2 = DATA_NEGOTIATING_SIMPLE(createCommitments(), FeeratePerKw(2500 sat), hex"deadbeef", hex"beefdead", Nil, closingTx() :: Nil) + val htlcInfos = Map( + closed1.channelId -> Seq( + (7, paymentHash1, CltvExpiry(800_000)), + (7, paymentHash2, CltvExpiry(795_000)), + (8, paymentHash1, CltvExpiry(800_000)), + ), + closed2.channelId -> Seq( + (13, paymentHash1, CltvExpiry(801_000)), + (14, paymentHash2, CltvExpiry(801_000)), + ) + ) + // The following channel is closed, but was never confirmed and thus doesn't need to be migrated to the closed_channels table. + val closed3 = DATA_WAIT_FOR_DUAL_FUNDING_CONFIRMED(createCommitments(), 0 msat, 0 msat, BlockHeight(775_000), BlockHeight(780_000), DualFundingStatus.WaitingForConfirmations, None) + // The following channels aren't closed, and must stay in the channels table after the migration. + val notClosed1 = DATA_CLOSING(createCommitments(), BlockHeight(800_000), hex"deadbeef", closingTx() :: Nil, closingTx() :: Nil) + val notClosed2 = DATA_SHUTDOWN(createCommitments(), Shutdown(randomBytes32(), hex"deadbeef"), Shutdown(randomBytes32(), hex"beefdead"), CloseStatus.Initiator(Some(ClosingFeerates(FeeratePerKw(1500 sat), FeeratePerKw(1000 sat), FeeratePerKw(2500 sat))))) + + def postCheck(db: ChannelsDb): Unit = { + // The closed channels have been migrated to a dedicated DB. 
+ assert(db.listClosedChannels(None, None).map(_.channelId).toSet == Set(closed1.channelId, closed2.channelId)) + // The remaining channels are still active. + assert(db.listLocalChannels().toSet == Set(notClosed1, notClosed2)) + // The corresponding htlc_infos hasn't been removed. + assert(db.listHtlcInfos(closed1.channelId, 7).toSet == Set((paymentHash1, CltvExpiry(800_000)), (paymentHash2, CltvExpiry(795_000)))) + assert(db.listHtlcInfos(closed1.channelId, 8).toSet == Set((paymentHash1, CltvExpiry(800_000)))) + assert(db.listHtlcInfos(closed2.channelId, 13).toSet == Set((paymentHash1, CltvExpiry(801_000)))) + assert(db.listHtlcInfos(closed2.channelId, 14).toSet == Set((paymentHash2, CltvExpiry(801_000)))) + } + + forAllDbs { + case dbs: TestPgDatabases => + migrationCheck( + dbs = dbs, + initializeTables = connection => { + // We initialize a v11 database, where closed channels were kept inside the channels table with an is_closed flag. + using(connection.createStatement()) { statement => + statement.executeUpdate("CREATE SCHEMA IF NOT EXISTS local") + statement.executeUpdate("CREATE TABLE local.channels (channel_id TEXT NOT NULL PRIMARY KEY, remote_node_id TEXT NOT NULL, data BYTEA NOT NULL, json JSONB NOT NULL, is_closed BOOLEAN NOT NULL DEFAULT FALSE, created_timestamp TIMESTAMP WITH TIME ZONE, last_payment_sent_timestamp TIMESTAMP WITH TIME ZONE, last_payment_received_timestamp TIMESTAMP WITH TIME ZONE, last_connected_timestamp TIMESTAMP WITH TIME ZONE, closed_timestamp TIMESTAMP WITH TIME ZONE)") + statement.executeUpdate("CREATE TABLE local.htlc_infos (channel_id TEXT NOT NULL, commitment_number BIGINT NOT NULL, payment_hash TEXT NOT NULL, cltv_expiry BIGINT NOT NULL, FOREIGN KEY(channel_id) REFERENCES local.channels(channel_id))") + statement.executeUpdate("CREATE INDEX htlc_infos_channel_id_idx ON local.htlc_infos(channel_id)") + statement.executeUpdate("CREATE INDEX htlc_infos_commitment_number_idx ON local.htlc_infos(commitment_number)") + 
setVersion(statement, PgChannelsDb.DB_NAME, 11) + } + // We insert some channels in our DB and htlc info related to those channels. + Seq(closed1, closed2, closed3).foreach { c => + using(connection.prepareStatement("INSERT INTO local.channels (channel_id, remote_node_id, data, json, is_closed) VALUES (?, ?, ?, ?::JSONB, TRUE)")) { statement => + statement.setString(1, c.channelId.toHex) + statement.setString(2, c.remoteNodeId.toHex) + statement.setBytes(3, channelDataCodec.encode(c).require.toByteArray) + statement.setString(4, JsonSerializers.serialization.write(c)(JsonSerializers.formats)) + statement.executeUpdate() + } + } + Seq(notClosed1, notClosed2).foreach { c => + using(connection.prepareStatement("INSERT INTO local.channels (channel_id, remote_node_id, data, json, is_closed) VALUES (?, ?, ?, ?::JSONB, FALSE)")) { statement => + statement.setString(1, c.channelId.toHex) + statement.setString(2, c.remoteNodeId.toHex) + statement.setBytes(3, channelDataCodec.encode(c).require.toByteArray) + statement.setString(4, JsonSerializers.serialization.write(c)(JsonSerializers.formats)) + statement.executeUpdate() + } + } + htlcInfos.foreach { case (channelId, infos) => + infos.foreach { case (commitmentNumber, paymentHash, expiry) => + using(connection.prepareStatement("INSERT INTO local.htlc_infos VALUES (?, ?, ?, ?)")) { statement => + statement.setString(1, channelId.toHex) + statement.setLong(2, commitmentNumber) + statement.setString(3, paymentHash.toHex) + statement.setLong(4, expiry.toLong) + statement.executeUpdate() + } + } + } + }, + dbName = PgChannelsDb.DB_NAME, + targetVersion = PgChannelsDb.CURRENT_VERSION, + postCheck = _ => postCheck(dbs.channels) + ) + case dbs: TestSqliteDatabases => + migrationCheck( + dbs = dbs, + initializeTables = connection => { + // We initialize a v7 database, where closed channels were kept inside the channels table with an is_closed flag. 
+ using(connection.createStatement()) { statement => + statement.execute("PRAGMA foreign_keys = ON") + statement.executeUpdate("CREATE TABLE local_channels (channel_id BLOB NOT NULL PRIMARY KEY, data BLOB NOT NULL, is_closed BOOLEAN NOT NULL DEFAULT 0, created_timestamp INTEGER, last_payment_sent_timestamp INTEGER, last_payment_received_timestamp INTEGER, last_connected_timestamp INTEGER, closed_timestamp INTEGER)") + statement.executeUpdate("CREATE TABLE htlc_infos (channel_id BLOB NOT NULL, commitment_number INTEGER NOT NULL, payment_hash BLOB NOT NULL, cltv_expiry INTEGER NOT NULL, FOREIGN KEY(channel_id) REFERENCES local_channels(channel_id))") + statement.executeUpdate("CREATE INDEX htlc_infos_channel_id_idx ON htlc_infos(channel_id)") + statement.executeUpdate("CREATE INDEX htlc_infos_commitment_number_idx ON htlc_infos(commitment_number)") + setVersion(statement, SqliteChannelsDb.DB_NAME, 7) + } + // We insert some channels in our DB and htlc info related to those channels. + Seq(closed1, closed2, closed3).foreach { c => + using(connection.prepareStatement("INSERT INTO local_channels (channel_id, data, is_closed) VALUES (?, ?, 1)")) { statement => + statement.setBytes(1, c.channelId.toArray) + statement.setBytes(2, channelDataCodec.encode(c).require.toByteArray) + statement.executeUpdate() + } + } + Seq(notClosed1, notClosed2).foreach { c => + using(connection.prepareStatement("INSERT INTO local_channels (channel_id, data, is_closed) VALUES (?, ?, 0)")) { statement => + statement.setBytes(1, c.channelId.toArray) + statement.setBytes(2, channelDataCodec.encode(c).require.toByteArray) + statement.executeUpdate() + } + } + htlcInfos.foreach { case (channelId, infos) => + infos.foreach { case (commitmentNumber, paymentHash, expiry) => + using(connection.prepareStatement("INSERT INTO htlc_infos VALUES (?, ?, ?, ?)")) { statement => + statement.setBytes(1, channelId.toArray) + statement.setLong(2, commitmentNumber) + statement.setBytes(3, paymentHash.toArray) + 
statement.setLong(4, expiry.toLong) + statement.executeUpdate() + } + } + } + }, + dbName = SqliteChannelsDb.DB_NAME, + targetVersion = SqliteChannelsDb.CURRENT_VERSION, + postCheck = _ => postCheck(dbs.channels) + ) } } @@ -233,38 +381,6 @@ class ChannelsDbSpec extends AnyFunSuite { object ChannelsDbSpec { - case class TestCase(channelId: ByteVector32, - remoteNodeId: PublicKey, - data: ByteVector, - isClosed: Boolean, - createdTimestamp: Option[Long], - lastPaymentSentTimestamp: Option[Long], - lastPaymentReceivedTimestamp: Option[Long], - lastConnectedTimestamp: Option[Long], - closedTimestamp: Option[Long], - commitmentNumbers: Seq[Int]) - - val testCases: Seq[TestCase] = for (_ <- 0 until 10) yield { - val channelId = randomBytes32() - val remoteNodeId = randomKey().publicKey - val channel = ChannelCodecsSpec.normal - .modify(_.commitments.channelParams.channelId).setTo(channelId) - .modify(_.commitments.channelParams.remoteParams.nodeId).setTo(remoteNodeId) - val data = channelDataCodec.encode(channel).require.bytes - TestCase( - channelId = channelId, - remoteNodeId = remoteNodeId, - data = data, - isClosed = Random.nextBoolean(), - createdTimestamp = if (Random.nextBoolean()) Some(Random.nextInt(Int.MaxValue)) else None, - lastPaymentSentTimestamp = if (Random.nextBoolean()) Some(Random.nextInt(Int.MaxValue)) else None, - lastPaymentReceivedTimestamp = if (Random.nextBoolean()) Some(Random.nextInt(Int.MaxValue)) else None, - lastConnectedTimestamp = if (Random.nextBoolean()) Some(Random.nextInt(Int.MaxValue)) else None, - closedTimestamp = if (Random.nextBoolean()) Some(Random.nextInt(Int.MaxValue)) else None, - commitmentNumbers = for (_ <- 0 until Random.nextInt(10)) yield Random.nextInt(5) // there will be repetitions, on purpose - ) - } - def getTimestamp(dbs: TestDatabases, channelId: ByteVector32, columnName: String): Option[Long] = { dbs match { case _: TestPgDatabases => getPgTimestamp(dbs.connection, channelId, columnName) diff --git 
a/eclair-core/src/test/scala/fr/acinq/eclair/db/RevokedHtlcInfoCleanerSpec.scala b/eclair-core/src/test/scala/fr/acinq/eclair/db/RevokedHtlcInfoCleanerSpec.scala index fd3cbaa4f0..a120fea36d 100644 --- a/eclair-core/src/test/scala/fr/acinq/eclair/db/RevokedHtlcInfoCleanerSpec.scala +++ b/eclair-core/src/test/scala/fr/acinq/eclair/db/RevokedHtlcInfoCleanerSpec.scala @@ -38,7 +38,7 @@ class RevokedHtlcInfoCleanerSpec extends ScalaTestWithActorTestKit(ConfigFactory channelsDb.addHtlcInfo(channelId, 17, randomBytes32(), CltvExpiry(561)) channelsDb.addHtlcInfo(channelId, 19, randomBytes32(), CltvExpiry(1105)) channelsDb.addHtlcInfo(channelId, 23, randomBytes32(), CltvExpiry(1729)) - channelsDb.removeChannel(channelId) + channelsDb.removeChannel(channelId, None) assert(channelsDb.listHtlcInfos(channelId, 17).nonEmpty) assert(channelsDb.listHtlcInfos(channelId, 19).nonEmpty) assert(channelsDb.listHtlcInfos(channelId, 23).nonEmpty)