Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion eclair-core/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ eclair {
funding {
// Each RBF attempt adds more data that we need to store and process, so we want to limit our peers to a reasonable use of RBF.
remote-rbf-limits {
max-attempts = 5 // maximum number of RBF attempts our peer is allowed to make
max-attempts = 10 // maximum number of RBF attempts our peer is allowed to make
attempt-delta-blocks = 6 // minimum number of blocks between RBF attempts
}
// Duration after which we abort a channel creation. If our peer seems unresponsive and doesn't complete the
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -210,14 +210,14 @@ case class RemoteCommit(index: Long, spec: CommitmentSpec, txId: TxId, remotePer
commitmentFormat match {
case _: SegwitV0CommitmentFormat =>
val sig = remoteCommitTx.sign(fundingKey, remoteFundingPubKey)
Right(CommitSig(channelParams.channelId, sig, htlcSigs.toList, batchSize))
Right(CommitSig(channelParams.channelId, commitInput.outPoint.txid, sig, htlcSigs.toList, batchSize))
case _: SimpleTaprootChannelCommitmentFormat =>
remoteNonce_opt match {
case Some(remoteNonce) =>
val localNonce = NonceGenerator.signingNonce(fundingKey.publicKey, remoteFundingPubKey, commitInput.outPoint.txid)
remoteCommitTx.partialSign(fundingKey, remoteFundingPubKey, localNonce, Seq(localNonce.publicNonce, remoteNonce)) match {
case Left(_) => Left(InvalidCommitNonce(channelParams.channelId, commitInput.outPoint.txid, index))
case Right(psig) => Right(CommitSig(channelParams.channelId, psig, htlcSigs.toList, batchSize))
case Right(psig) => Right(CommitSig(channelParams.channelId, commitInput.outPoint.txid, psig, htlcSigs.toList, batchSize))
}
case None => Left(MissingCommitNonce(channelParams.channelId, commitInput.outPoint.txid, index))
}
Expand Down Expand Up @@ -650,7 +650,7 @@ case class Commitment(fundingTxIndex: Long,
case None => return Left(MissingCommitNonce(params.channelId, fundingTxId, remoteCommit.index + 1))
}
}
val commitSig = CommitSig(params.channelId, sig, htlcSigs.toList, batchSize)
val commitSig = CommitSig(params.channelId, fundingTxId, sig, htlcSigs.toList, batchSize)
val nextRemoteCommit = RemoteCommit(remoteCommit.index + 1, spec, remoteCommitTx.tx.txid, remoteNextPerCommitmentPoint)
Right((copy(nextRemoteCommit_opt = Some(nextRemoteCommit)), commitSig))
}
Expand Down Expand Up @@ -1089,9 +1089,11 @@ case class Commitments(channelParams: ChannelParams,
case _: CommitSig if active.size > 1 => return Left(CommitSigCountMismatch(channelId, active.size, 1))
case commitSig: CommitSig => Seq(commitSig)
}
// Signatures are sent in order (most recent first), calling `zip` will drop trailing sigs that are for deactivated/pruned commitments.
val commitKeys = LocalCommitmentKeys(channelParams, channelKeys, localCommitIndex + 1)
val active1 = active.zip(sigs).map { case (commitment, commit) =>
val active1 = active.zipWithIndex.map { case (commitment, idx) =>
// If the funding_txid isn't provided, we assume that signatures are sent in order (most recent first).
// This matches the behavior of peers who only support the experimental version of splicing.
val commit = sigs.find(_.fundingTxId_opt.contains(commitment.fundingTxId)).getOrElse(sigs(idx))
commitment.receiveCommit(channelParams, channelKeys, commitKeys, changes, commit) match {
case Left(f) => return Left(f)
case Right(commitment1) => commitment1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -949,7 +949,7 @@ private class InteractiveTxBuilder(replyTo: ActorRef[InteractiveTxBuilder.Respon
case Right(localSigOfRemoteTx) =>
val htlcSignatures = sortedHtlcTxs.map(_.localSig(remoteCommitmentKeys)).toList
log.info(s"built remote commit number=${purpose.remoteCommitIndex} toLocalMsat=${remoteSpec.toLocal.toLong} toRemoteMsat=${remoteSpec.toRemote.toLong} htlc_in={} htlc_out={} feeratePerKw=${remoteSpec.commitTxFeerate} txid=${remoteCommitTx.tx.txid} fundingTxId=${fundingTx.txid}", remoteSpec.htlcs.collect(DirectedHtlc.outgoing).map(_.id).mkString(","), remoteSpec.htlcs.collect(DirectedHtlc.incoming).map(_.id).mkString(","))
val localCommitSig = CommitSig(fundingParams.channelId, localSigOfRemoteTx, htlcSignatures, batchSize = 1)
val localCommitSig = CommitSig(fundingParams.channelId, fundingTx.txid, localSigOfRemoteTx, htlcSignatures, batchSize = 1)
val localCommit = UnsignedLocalCommit(purpose.localCommitIndex, localSpec, localCommitTx.tx.txid)
val remoteCommit = RemoteCommit(purpose.remoteCommitIndex, remoteSpec, remoteCommitTx.tx.txid, purpose.remotePerCommitmentPoint)
signFundingTx(completeTx, remoteFundingNonce_opt, remoteCommitNonces_opt.map(_.nextCommitNonce), localCommitSig, localCommit, remoteCommit)
Expand Down
51 changes: 49 additions & 2 deletions eclair-core/src/main/scala/fr/acinq/eclair/io/PeerConnection.scala
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,10 @@ class PeerConnection(keyPair: KeyPair, conf: PeerConnection.Conf, switchboard: A

case Event(msg: LightningMessage, d: ConnectedData) if sender() != d.transport => // if the message doesn't originate from the transport, it is an outgoing message
msg match {
case batch: CommitSigBatch => batch.messages.foreach(msg => d.transport forward msg)
case batch: CommitSigBatch =>
// We insert a start_batch message to let our peer know how many commit_sig they will receive.
d.transport forward StartBatch.commitSigBatch(batch.channelId, batch.batchSize)
batch.messages.foreach(msg => d.transport forward msg)
case msg => d.transport forward msg
}
msg match {
Expand Down Expand Up @@ -349,8 +352,51 @@ class PeerConnection(keyPair: KeyPair, conf: PeerConnection.Conf, switchboard: A
// We immediately forward messages to the peer, unless they are part of a batch, in which case we wait to
// receive the whole batch before forwarding.
msg match {
case msg: StartBatch =>
if (!msg.messageType_opt.contains(132)) {
log.debug("ignoring start_batch: we only support batching commit_sig messages")
d.transport ! Warning(msg.channelId, "invalid start_batch message: we only support batching commit_sig messages")
stay()
} else if (msg.batchSize > 20) {
log.debug("ignoring start_batch with batch_size = {} > 20", msg.batchSize)
d.transport ! Warning(msg.channelId, "invalid start_batch message: batch_size must not be greater than 20")
stay()
} else {
log.debug("starting commit_sig batch of size {} for channel_id={}", msg.batchSize, msg.channelId)
d.commitSigBatch_opt match {
case Some(pending) if pending.received.nonEmpty =>
log.warning("starting batch with incomplete previous batch ({}/{} received)", pending.received.size, pending.batchSize)
// This is a spec violation from our peer: this will likely lead to a force-close.
d.transport ! Warning(msg.channelId, "invalid start_batch message: the previous batch is not done yet")
d.peer ! CommitSigBatch(pending.received)
case _ => ()
}
stay() using d.copy(commitSigBatch_opt = Some(PendingCommitSigBatch(msg.channelId, msg.batchSize, Nil)))
}
case msg: HasChannelId if d.commitSigBatch_opt.nonEmpty =>
// We only support batches of commit_sig messages: other messages will simply be relayed individually.
val pending = d.commitSigBatch_opt.get
msg match {
case msg: CommitSig if msg.channelId == pending.channelId =>
val received1 = pending.received :+ msg
if (received1.size == pending.batchSize) {
log.debug("received last commit_sig in batch for channel_id={}", msg.channelId)
d.peer ! CommitSigBatch(received1)
stay() using d.copy(commitSigBatch_opt = None)
} else {
log.debug("received commit_sig {}/{} in batch for channel_id={}", received1.size, pending.batchSize, msg.channelId)
stay() using d.copy(commitSigBatch_opt = Some(pending.copy(received = received1)))
}
case _ =>
log.warning("received {} as part of a batch: we don't support batching that kind of messages", msg.getClass.getSimpleName)
if (pending.received.nonEmpty) d.peer ! CommitSigBatch(pending.received)
d.peer ! msg
stay() using d.copy(commitSigBatch_opt = None)
}
case msg: CommitSig =>
msg.tlvStream.get[CommitSigTlv.BatchTlv].map(_.size) match {
// We keep supporting the experimental version of splicing that older Phoenix wallets use.
// Once we're confident that enough Phoenix users have upgraded, we should remove this branch.
msg.tlvStream.get[CommitSigTlv.ExperimentalBatchTlv].map(_.size) match {
case Some(batchSize) if batchSize > 25 =>
log.warning("received legacy batch of commit_sig exceeding our threshold ({} > 25), processing messages individually", batchSize)
// We don't want peers to be able to exhaust our memory by sending batches of dummy messages that we keep in RAM.
Expand Down Expand Up @@ -614,6 +660,7 @@ object PeerConnection {
gossipTimestampFilter: Option[GossipTimestampFilter] = None,
behavior: Behavior = Behavior(),
expectedPong_opt: Option[ExpectedPong] = None,
commitSigBatch_opt: Option[PendingCommitSigBatch] = None,
legacyCommitSigBatch_opt: Option[PendingCommitSigBatch] = None,
isPersistent: Boolean) extends Data with HasTransport

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -255,8 +255,16 @@ sealed trait ChannelReestablishTlv extends Tlv

object ChannelReestablishTlv {

/**
* When disconnected in the middle of an interactive-tx session, this field is used to request a retransmission of
* [[TxSignatures]] for the given [[txId]].
*/
case class NextFundingTlv(txId: TxId) extends ChannelReestablishTlv

/** The txid of the last [[ChannelReady]] or [[SpliceLocked]] message received before disconnecting, if any. */
case class YourLastFundingLockedTlv(txId: TxId) extends ChannelReestablishTlv

/** The txid of our latest outgoing [[ChannelReady]] or [[SpliceLocked]] for this channel. */
case class MyCurrentFundingLockedTlv(txId: TxId) extends ChannelReestablishTlv

/**
Expand Down Expand Up @@ -395,3 +403,13 @@ object ClosingSigTlv {
)
}

sealed trait StartBatchTlv extends Tlv

object StartBatchTlv {
  /** Type of [[LightningMessage]] that is included in the batch, when the batch contains a single message type. */
  case class MessageType(tag: Int) extends StartBatchTlv

  // TLV type 1 carries the batched message type as an unsigned 16-bit integer.
  private val messageTypeCodec: Codec[MessageType] = tlvField(uint16.as[MessageType])

  val startBatchTlvCodec: Codec[TlvStream[StartBatchTlv]] = tlvStream(
    discriminated[StartBatchTlv].by(varint).typecase(UInt64(1), messageTypeCodec)
  )
}
Original file line number Diff line number Diff line change
Expand Up @@ -96,12 +96,23 @@ sealed trait CommitSigTlv extends Tlv

object CommitSigTlv {

/** @param size the number of [[CommitSig]] messages in the batch */
case class BatchTlv(size: Int) extends CommitSigTlv
/**
* While a splice is ongoing and not locked, we have multiple valid commitments.
* We send one [[CommitSig]] message for each valid commitment: this field maps it to the corresponding funding transaction.
*
* @param txId the funding transaction spent by this commitment.
*/
case class FundingTx(txId: TxId) extends CommitSigTlv

object BatchTlv {
val codec: Codec[BatchTlv] = tlvField(tu16)
}
private val fundingTxTlv: Codec[FundingTx] = tlvField(txIdAsHash)

/**
* The experimental version of splicing included the number of [[CommitSig]] messages in the batch.
* This TLV can be removed once Phoenix users have upgraded to the official version of splicing and use the [[StartBatch]] message.
*/
case class ExperimentalBatchTlv(size: Int) extends CommitSigTlv

private val experimentalBatchTlv: Codec[ExperimentalBatchTlv] = tlvField(tu16)

/** Partial signature for the current commitment transaction, along with the signing nonce used (when using taproot channels). */
case class PartialSignatureWithNonceTlv(partialSigWithNonce: PartialSignatureWithNonce) extends CommitSigTlv
Expand All @@ -111,8 +122,9 @@ object CommitSigTlv {
}

val commitSigTlvCodec: Codec[TlvStream[CommitSigTlv]] = tlvStream(discriminated[CommitSigTlv].by(varint)
.typecase(UInt64(1), fundingTxTlv)
.typecase(UInt64(2), PartialSignatureWithNonceTlv.codec)
.typecase(UInt64(0x47010005), BatchTlv.codec)
.typecase(UInt64(0x47010005), experimentalBatchTlv)
)

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,11 @@ object LightningMessageCodecs {
("lockTime" | uint32) ::
("tlvStream" | ClosingSigTlv.closingSigTlvCodec)).as[ClosingSig]

// Codec for the start_batch message: channel_id, then the number of messages in the batch,
// then a TLV stream (which may carry the batched message type — see StartBatchTlv.MessageType).
val startBatchCodec: Codec[StartBatch] = (
("channelId" | bytes32) ::
("batchSize" | uint16) ::
("tlvStream" | StartBatchTlv.startBatchTlvCodec)).as[StartBatch]

val updateAddHtlcCodec: Codec[UpdateAddHtlc] = (
("channelId" | bytes32) ::
("id" | uint64overflow) ::
Expand Down Expand Up @@ -532,6 +537,7 @@ object LightningMessageCodecs {
.typecase(72, txInitRbfCodec)
.typecase(73, txAckRbfCodec)
.typecase(74, txAbortCodec)
.typecase(127, startBatchCodec)
.typecase(128, updateAddHtlcCodec)
.typecase(130, updateFulfillHtlcCodec)
.typecase(131, updateFailHtlcCodec)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,10 @@ case class TxAddInput(channelId: ByteVector32,

object TxAddInput {
  /**
   * Create a [[TxAddInput]] that spends a shared input (e.g. the current funding output during a splice).
   * The previous transaction is omitted (None): the shared input is identified by its txid in a TLV instead.
   */
  def apply(channelId: ByteVector32, serialId: UInt64, sharedInput: OutPoint, sequence: Long): TxAddInput = {
    // NOTE(review): the original body contained a stale duplicate construction (diff residue) whose
    // result was discarded; it has been removed, keeping only the TLV-based construction.
    val tlvs = Set[TxAddInputTlv](
      TxAddInputTlv.SharedInputTxId(sharedInput.txid),
    )
    TxAddInput(channelId, serialId, None, sharedInput.index, sequence, TlvStream(tlvs))
  }
}

Expand Down Expand Up @@ -146,12 +149,11 @@ case class TxSignatures(channelId: ByteVector32,

object TxSignatures {
  /**
   * Create a [[TxSignatures]] for the given transaction and input witnesses.
   *
   * @param previousFundingSig_opt when splicing, our signature (individual or partial, depending on the
   *                               channel's commitment format) for the previous funding transaction.
   */
  def apply(channelId: ByteVector32, tx: Transaction, witnesses: Seq[ScriptWitness], previousFundingSig_opt: Option[ChannelSpendSignature]): TxSignatures = {
    // NOTE(review): the original body declared `tlvs` twice (old and new versions of a diff were both
    // present), which is a duplicate-definition compile error; only the match-based version is kept.
    val tlvs: Set[TxSignaturesTlv] = previousFundingSig_opt match {
      case Some(IndividualSignature(sig)) => Set(TxSignaturesTlv.PreviousFundingTxSig(sig))
      case Some(partialSig: PartialSignatureWithNonce) => Set(TxSignaturesTlv.PreviousFundingTxPartialSig(partialSig))
      case None => Set.empty
    }
    TxSignatures(channelId, tx.txid, witnesses, TlvStream(tlvs))
  }
}
Expand Down Expand Up @@ -476,6 +478,15 @@ case class ClosingSig(channelId: ByteVector32, closerScriptPubKey: ByteVector, c
val nextCloseeNonce_opt: Option[IndividualNonce] = tlvStream.get[ClosingSigTlv.NextCloseeNonce].map(_.nonce)
}

/** This message is used to indicate that the next [[batchSize]] messages form a single logical message. */
case class StartBatch(channelId: ByteVector32, batchSize: Int, tlvStream: TlvStream[StartBatchTlv] = TlvStream.empty) extends ChannelMessage with HasChannelId {
  // MessageType.tag is an Int: widen it explicitly, since Option[Int] does not conform to Option[Long].
  val messageType_opt: Option[Long] = tlvStream.get[StartBatchTlv.MessageType].map(_.tag.toLong)
}

object StartBatch {
  /** Create a start_batch announcing a batch of commit_sig messages (132 is the commit_sig message type). */
  def commitSigBatch(channelId: ByteVector32, batchSize: Int): StartBatch = StartBatch(channelId, batchSize, TlvStream(StartBatchTlv.MessageType(132)))
}

case class UpdateAddHtlc(channelId: ByteVector32,
id: Long,
amountMsat: MilliSatoshi,
Expand Down Expand Up @@ -545,19 +556,21 @@ case class CommitSig(channelId: ByteVector32,
signature: IndividualSignature,
htlcSignatures: List[ByteVector64],
tlvStream: TlvStream[CommitSigTlv] = TlvStream.empty) extends CommitSigs {
val fundingTxId_opt: Option[TxId] = tlvStream.get[CommitSigTlv.FundingTx].map(_.txId)
val partialSignature_opt: Option[PartialSignatureWithNonce] = tlvStream.get[CommitSigTlv.PartialSignatureWithNonceTlv].map(_.partialSigWithNonce)
val sigOrPartialSig: ChannelSpendSignature = partialSignature_opt.getOrElse(signature)
}

object CommitSig {
def apply(channelId: ByteVector32, signature: ChannelSpendSignature, htlcSignatures: List[ByteVector64], batchSize: Int): CommitSig = {
def apply(channelId: ByteVector32, fundingTxId: TxId, signature: ChannelSpendSignature, htlcSignatures: List[ByteVector64], batchSize: Int): CommitSig = {
val (individualSig, partialSig_opt) = signature match {
case sig: IndividualSignature => (sig, None)
case psig: PartialSignatureWithNonce => (IndividualSignature(ByteVector64.Zeroes), Some(psig))
}
val tlvs = Set(
if (batchSize > 1) Some(CommitSigTlv.BatchTlv(batchSize)) else None,
partialSig_opt.map(CommitSigTlv.PartialSignatureWithNonceTlv(_))
Some(CommitSigTlv.FundingTx(fundingTxId)),
partialSig_opt.map(CommitSigTlv.PartialSignatureWithNonceTlv(_)),
if (batchSize > 1) Some(CommitSigTlv.ExperimentalBatchTlv(batchSize)) else None,
).flatten[CommitSigTlv]
CommitSig(channelId, individualSig, htlcSignatures, TlvStream(tlvs))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3030,7 +3030,7 @@ class InteractiveTxBuilderSpec extends TestKitBaseClass with AnyFunSuiteLike wit
bob ! ReceiveMessage(alice2bob.expectMsgType[SendMessage].msg.asInstanceOf[TxComplete])
// Alice <-- commit_sig --- Bob
val successA1 = alice2bob.expectMsgType[Succeeded]
val invalidCommitSig = CommitSig(params.channelId, PartialSignatureWithNonce(randomBytes32(), txCompleteBob.commitNonces_opt.get.commitNonce), Nil, batchSize = 1)
val invalidCommitSig = CommitSig(params.channelId, successA1.signingSession.fundingTxId, PartialSignatureWithNonce(randomBytes32(), txCompleteBob.commitNonces_opt.get.commitNonce), Nil, batchSize = 1)
val Left(error) = successA1.signingSession.receiveCommitSig(params.channelParamsA, params.channelKeysA, invalidCommitSig, params.nodeParamsA.currentBlockHeight)(akka.event.NoLogging)
assert(error.isInstanceOf[InvalidCommitmentSignature])
}
Expand Down
Loading