Skip to content

Commit 98cae45

Browse files
authored
Rename pending_relay to pending_commands (#1822)
Naming was confusing because it led to believe messages were related to htlcs that have not yet been relayed, whereas those are settlement messages, meaning that htlcs have relayed and are pending resolution upstream. The database has been renamed to a more generic `PendingCommandsDb` because we may store other types of commands for which we need reliable delivery.
1 parent d437ea1 commit 98cae45

22 files changed

+272
-180
lines changed

eclair-core/src/main/scala/fr/acinq/eclair/channel/Channel.scala

+6-6
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ import fr.acinq.eclair.channel.TxPublisher.{PublishRawTx, PublishTx, SetChannelI
3535
import fr.acinq.eclair.crypto.ShaChain
3636
import fr.acinq.eclair.crypto.keymanager.ChannelKeyManager
3737
import fr.acinq.eclair.db.DbEventHandler.ChannelEvent.EventType
38-
import fr.acinq.eclair.db.PendingRelayDb
38+
import fr.acinq.eclair.db.PendingCommandsDb
3939
import fr.acinq.eclair.db.pg.PgUtils.PgLock.logger
4040
import fr.acinq.eclair.io.Peer
4141
import fr.acinq.eclair.payment.PaymentSettlingOnChain
@@ -1875,15 +1875,15 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId
18751875

18761876
onTransition {
18771877
case _ -> CLOSING =>
1878-
PendingRelayDb.getPendingFailsAndFulfills(nodeParams.db.pendingRelay, nextStateData.asInstanceOf[HasCommitments].channelId) match {
1878+
PendingCommandsDb.getSettlementCommands(nodeParams.db.pendingCommands, nextStateData.asInstanceOf[HasCommitments].channelId) match {
18791879
case Nil =>
18801880
log.debug("nothing to replay")
18811881
case cmds =>
18821882
log.info("replaying {} unacked fulfills/fails", cmds.size)
18831883
cmds.foreach(self ! _) // they all have commit = false
18841884
}
18851885
case SYNCING -> (NORMAL | SHUTDOWN) =>
1886-
PendingRelayDb.getPendingFailsAndFulfills(nodeParams.db.pendingRelay, nextStateData.asInstanceOf[HasCommitments].channelId) match {
1886+
PendingCommandsDb.getSettlementCommands(nodeParams.db.pendingCommands, nextStateData.asInstanceOf[HasCommitments].channelId) match {
18871887
case Nil =>
18881888
log.debug("nothing to replay")
18891889
case cmds =>
@@ -2109,7 +2109,7 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId
21092109
} else {
21102110
// There might be pending fulfill commands that we haven't relayed yet.
21112111
// Since this involves a DB call, we only want to check it if all the previous checks failed (this is the slow path).
2112-
val pendingRelayFulfills = nodeParams.db.pendingRelay.listPendingRelay(d.channelId).collect { case c: CMD_FULFILL_HTLC => c.id }
2112+
val pendingRelayFulfills = nodeParams.db.pendingCommands.listSettlementCommands(d.channelId).collect { case c: CMD_FULFILL_HTLC => c.id }
21132113
val offendingPendingRelayFulfills = almostTimedOutIncoming.filter(htlc => pendingRelayFulfills.contains(htlc.id))
21142114
if (offendingPendingRelayFulfills.nonEmpty) {
21152115
handleLocalError(HtlcsWillTimeoutUpstream(d.channelId, offendingPendingRelayFulfills), d, Some(c))
@@ -2520,13 +2520,13 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId
25202520
*/
25212521
def acking(channelId: ByteVector32, cmd: HtlcSettlementCommand): FSM.State[fr.acinq.eclair.channel.State, Data] = {
25222522
log.debug("scheduling acknowledgement of cmd id={}", cmd.id)
2523-
context.system.scheduler.scheduleOnce(10 seconds)(PendingRelayDb.ackCommand(nodeParams.db.pendingRelay, channelId, cmd))(context.system.dispatcher)
2523+
context.system.scheduler.scheduleOnce(10 seconds)(PendingCommandsDb.ackSettlementCommand(nodeParams.db.pendingCommands, channelId, cmd))(context.system.dispatcher)
25242524
state
25252525
}
25262526

25272527
def acking(updates: List[UpdateMessage]): FSM.State[fr.acinq.eclair.channel.State, Data] = {
25282528
log.debug("scheduling acknowledgement of cmds ids={}", updates.collect { case s: HtlcSettlementMessage => s.id }.mkString(","))
2529-
context.system.scheduler.scheduleOnce(10 seconds)(PendingRelayDb.ackPendingFailsAndFulfills(nodeParams.db.pendingRelay, updates))(context.system.dispatcher)
2529+
context.system.scheduler.scheduleOnce(10 seconds)(PendingCommandsDb.ackSettlementCommands(nodeParams.db.pendingCommands, updates))(context.system.dispatcher)
25302530
state
25312531
}
25322532

eclair-core/src/main/scala/fr/acinq/eclair/db/Databases.scala

+5-5
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ trait Databases {
3838
def channels: ChannelsDb
3939
def peers: PeersDb
4040
def payments: PaymentsDb
41-
def pendingRelay: PendingRelayDb
41+
def pendingCommands: PendingCommandsDb
4242
//@formatter:on
4343
}
4444

@@ -59,7 +59,7 @@ object Databases extends Logging {
5959
channels: SqliteChannelsDb,
6060
peers: SqlitePeersDb,
6161
payments: SqlitePaymentsDb,
62-
pendingRelay: SqlitePendingRelayDb,
62+
pendingCommands: SqlitePendingCommandsDb,
6363
private val backupConnection: Connection) extends Databases with FileBackup {
6464
override def backup(backupFile: File): Unit = SqliteUtils.using(backupConnection.createStatement()) {
6565
statement => {
@@ -75,7 +75,7 @@ object Databases extends Logging {
7575
channels = new SqliteChannelsDb(eclairJdbc),
7676
peers = new SqlitePeersDb(eclairJdbc),
7777
payments = new SqlitePaymentsDb(eclairJdbc),
78-
pendingRelay = new SqlitePendingRelayDb(eclairJdbc),
78+
pendingCommands = new SqlitePendingCommandsDb(eclairJdbc),
7979
backupConnection = eclairJdbc
8080
)
8181
}
@@ -85,7 +85,7 @@ object Databases extends Logging {
8585
channels: PgChannelsDb,
8686
peers: PgPeersDb,
8787
payments: PgPaymentsDb,
88-
pendingRelay: PgPendingRelayDb,
88+
pendingCommands: PgPendingCommandsDb,
8989
dataSource: HikariDataSource,
9090
lock: PgLock) extends Databases with ExclusiveLock {
9191
override def obtainExclusiveLock(): Unit = lock.obtainExclusiveLock(dataSource)
@@ -119,7 +119,7 @@ object Databases extends Logging {
119119
channels = new PgChannelsDb,
120120
peers = new PgPeersDb,
121121
payments = new PgPaymentsDb,
122-
pendingRelay = new PgPendingRelayDb,
122+
pendingCommands = new PgPendingCommandsDb,
123123
dataSource = ds,
124124
lock = lock)
125125

eclair-core/src/main/scala/fr/acinq/eclair/db/PendingRelayDb.scala eclair-core/src/main/scala/fr/acinq/eclair/db/PendingCommandsDb.scala

+19-19
Original file line numberDiff line numberDiff line change
@@ -16,14 +16,14 @@
1616

1717
package fr.acinq.eclair.db
1818

19-
import java.io.Closeable
20-
21-
import akka.actor.{ActorContext, ActorRef}
19+
import akka.actor.ActorRef
2220
import akka.event.LoggingAdapter
2321
import fr.acinq.bitcoin.ByteVector32
2422
import fr.acinq.eclair.channel._
2523
import fr.acinq.eclair.wire.protocol.{UpdateFailHtlc, UpdateFailMalformedHtlc, UpdateFulfillHtlc, UpdateMessage}
2624

25+
import java.io.Closeable
26+
2727
/**
2828
* This database stores CMD_FULFILL_HTLC and CMD_FAIL_HTLC that we have received from downstream
2929
* (either directly via UpdateFulfillHtlc or by extracting the value from the
@@ -36,48 +36,48 @@ import fr.acinq.eclair.wire.protocol.{UpdateFailHtlc, UpdateFailMalformedHtlc, U
3636
* to handle all corner cases.
3737
*
3838
*/
39-
trait PendingRelayDb extends Closeable {
39+
trait PendingCommandsDb extends Closeable {
4040

41-
def addPendingRelay(channelId: ByteVector32, cmd: HtlcSettlementCommand): Unit
41+
def addSettlementCommand(channelId: ByteVector32, cmd: HtlcSettlementCommand): Unit
4242

43-
def removePendingRelay(channelId: ByteVector32, htlcId: Long): Unit
43+
def removeSettlementCommand(channelId: ByteVector32, htlcId: Long): Unit
4444

45-
def listPendingRelay(channelId: ByteVector32): Seq[HtlcSettlementCommand]
45+
def listSettlementCommands(channelId: ByteVector32): Seq[HtlcSettlementCommand]
4646

47-
def listPendingRelay(): Set[(ByteVector32, Long)]
47+
def listSettlementCommands(): Seq[(ByteVector32, HtlcSettlementCommand)]
4848

4949
}
5050

51-
object PendingRelayDb {
51+
object PendingCommandsDb {
5252
/**
5353
* We store [[CMD_FULFILL_HTLC]]/[[CMD_FAIL_HTLC]]/[[CMD_FAIL_MALFORMED_HTLC]]
5454
* in a database because we don't want to lose preimages, or to forget to fail
5555
* incoming htlcs, which would lead to unwanted channel closings.
5656
*/
57-
def safeSend(register: ActorRef, db: PendingRelayDb, channelId: ByteVector32, cmd: HtlcSettlementCommand): Unit = {
57+
def safeSend(register: ActorRef, db: PendingCommandsDb, channelId: ByteVector32, cmd: HtlcSettlementCommand): Unit = {
5858
// htlc settlement commands don't have replyTo
5959
register ! Register.Forward(ActorRef.noSender, channelId, cmd)
6060
// we store the command in a db (note that this happens *after* forwarding the command to the channel, so we don't add latency)
61-
db.addPendingRelay(channelId, cmd)
61+
db.addSettlementCommand(channelId, cmd)
6262
}
6363

64-
def ackCommand(db: PendingRelayDb, channelId: ByteVector32, cmd: HtlcSettlementCommand): Unit = {
65-
db.removePendingRelay(channelId, cmd.id)
64+
def ackSettlementCommand(db: PendingCommandsDb, channelId: ByteVector32, cmd: HtlcSettlementCommand): Unit = {
65+
db.removeSettlementCommand(channelId, cmd.id)
6666
}
6767

68-
def ackPendingFailsAndFulfills(db: PendingRelayDb, updates: List[UpdateMessage])(implicit log: LoggingAdapter): Unit = updates.collect {
68+
def ackSettlementCommands(db: PendingCommandsDb, updates: List[UpdateMessage])(implicit log: LoggingAdapter): Unit = updates.collect {
6969
case u: UpdateFulfillHtlc =>
7070
log.debug(s"fulfill acked for htlcId=${u.id}")
71-
db.removePendingRelay(u.channelId, u.id)
71+
db.removeSettlementCommand(u.channelId, u.id)
7272
case u: UpdateFailHtlc =>
7373
log.debug(s"fail acked for htlcId=${u.id}")
74-
db.removePendingRelay(u.channelId, u.id)
74+
db.removeSettlementCommand(u.channelId, u.id)
7575
case u: UpdateFailMalformedHtlc =>
7676
log.debug(s"fail-malformed acked for htlcId=${u.id}")
77-
db.removePendingRelay(u.channelId, u.id)
77+
db.removeSettlementCommand(u.channelId, u.id)
7878
}
7979

80-
def getPendingFailsAndFulfills(db: PendingRelayDb, channelId: ByteVector32)(implicit log: LoggingAdapter): Seq[HtlcSettlementCommand] = {
81-
db.listPendingRelay(channelId)
80+
def getSettlementCommands(db: PendingCommandsDb, channelId: ByteVector32)(implicit log: LoggingAdapter): Seq[HtlcSettlementCommand] = {
81+
db.listSettlementCommands(channelId)
8282
}
8383
}

eclair-core/src/main/scala/fr/acinq/eclair/db/pg/PgChannelsDb.scala

+1-1
Original file line numberDiff line numberDiff line change
@@ -126,7 +126,7 @@ class PgChannelsDb(implicit ds: DataSource, lock: PgLock) extends ChannelsDb wit
126126

127127
override def removeChannel(channelId: ByteVector32): Unit = withMetrics("channels/remove-channel", DbBackends.Postgres) {
128128
withLock { pg =>
129-
using(pg.prepareStatement("DELETE FROM pending_relay WHERE channel_id=?")) { statement =>
129+
using(pg.prepareStatement("DELETE FROM pending_settlement_commands WHERE channel_id=?")) { statement =>
130130
statement.setString(1, channelId.toHex)
131131
statement.executeUpdate()
132132
}

eclair-core/src/main/scala/fr/acinq/eclair/db/pg/PgPendingRelayDb.scala eclair-core/src/main/scala/fr/acinq/eclair/db/pg/PgPendingCommandsDb.scala

+25-15
Original file line numberDiff line numberDiff line change
@@ -21,38 +21,48 @@ import fr.acinq.bitcoin.ByteVector32
2121
import fr.acinq.eclair.channel.{Command, HtlcSettlementCommand}
2222
import fr.acinq.eclair.db.Monitoring.Metrics.withMetrics
2323
import fr.acinq.eclair.db.Monitoring.Tags.DbBackends
24-
import fr.acinq.eclair.db.PendingRelayDb
24+
import fr.acinq.eclair.db.PendingCommandsDb
2525
import fr.acinq.eclair.db.pg.PgUtils._
2626
import fr.acinq.eclair.wire.internal.CommandCodecs.cmdCodec
27+
import grizzled.slf4j.Logging
2728

29+
import java.sql.Statement
2830
import javax.sql.DataSource
2931
import scala.collection.immutable.Queue
3032

31-
class PgPendingRelayDb(implicit ds: DataSource, lock: PgLock) extends PendingRelayDb {
33+
class PgPendingCommandsDb(implicit ds: DataSource, lock: PgLock) extends PendingCommandsDb with Logging {
3234

3335
import PgUtils.ExtendedResultSet._
3436
import PgUtils._
3537
import lock._
3638

3739
val DB_NAME = "pending_relay"
38-
val CURRENT_VERSION = 1
40+
val CURRENT_VERSION = 2
3941

4042
inTransaction { pg =>
4143
using(pg.createStatement()) { statement =>
44+
45+
def migration12(statement: Statement): Unit = {
46+
statement.executeUpdate("ALTER TABLE pending_relay RENAME TO pending_settlement_commands")
47+
}
48+
4249
getVersion(statement, DB_NAME) match {
4350
case None =>
4451
// note: should we use a foreign key to local_channels table here?
45-
statement.executeUpdate("CREATE TABLE pending_relay (channel_id TEXT NOT NULL, htlc_id BIGINT NOT NULL, data BYTEA NOT NULL, PRIMARY KEY(channel_id, htlc_id))")
52+
statement.executeUpdate("CREATE TABLE pending_settlement_commands (channel_id TEXT NOT NULL, htlc_id BIGINT NOT NULL, data BYTEA NOT NULL, PRIMARY KEY(channel_id, htlc_id))")
53+
case Some(v@1) =>
54+
logger.warn(s"migrating db $DB_NAME, found version=$v current=$CURRENT_VERSION")
55+
migration12(statement)
4656
case Some(CURRENT_VERSION) => () // table is up-to-date, nothing to do
4757
case Some(unknownVersion) => throw new RuntimeException(s"Unknown version of DB $DB_NAME found, version=$unknownVersion")
4858
}
4959
setVersion(statement, DB_NAME, CURRENT_VERSION)
5060
}
5161
}
5262

53-
override def addPendingRelay(channelId: ByteVector32, cmd: HtlcSettlementCommand): Unit = withMetrics("pending-relay/add", DbBackends.Postgres) {
63+
override def addSettlementCommand(channelId: ByteVector32, cmd: HtlcSettlementCommand): Unit = withMetrics("pending-relay/add", DbBackends.Postgres) {
5464
withLock { pg =>
55-
using(pg.prepareStatement("INSERT INTO pending_relay VALUES (?, ?, ?) ON CONFLICT DO NOTHING")) { statement =>
65+
using(pg.prepareStatement("INSERT INTO pending_settlement_commands VALUES (?, ?, ?) ON CONFLICT DO NOTHING")) { statement =>
5666
statement.setString(1, channelId.toHex)
5767
statement.setLong(2, cmd.id)
5868
statement.setBytes(3, cmdCodec.encode(cmd).require.toByteArray)
@@ -61,35 +71,35 @@ class PgPendingRelayDb(implicit ds: DataSource, lock: PgLock) extends PendingRel
6171
}
6272
}
6373

64-
override def removePendingRelay(channelId: ByteVector32, htlcId: Long): Unit = withMetrics("pending-relay/remove", DbBackends.Postgres) {
74+
override def removeSettlementCommand(channelId: ByteVector32, htlcId: Long): Unit = withMetrics("pending-relay/remove", DbBackends.Postgres) {
6575
withLock { pg =>
66-
using(pg.prepareStatement("DELETE FROM pending_relay WHERE channel_id=? AND htlc_id=?")) { statement =>
76+
using(pg.prepareStatement("DELETE FROM pending_settlement_commands WHERE channel_id=? AND htlc_id=?")) { statement =>
6777
statement.setString(1, channelId.toHex)
6878
statement.setLong(2, htlcId)
6979
statement.executeUpdate()
7080
}
7181
}
7282
}
7383

74-
override def listPendingRelay(channelId: ByteVector32): Seq[HtlcSettlementCommand] = withMetrics("pending-relay/list-channel", DbBackends.Postgres) {
84+
override def listSettlementCommands(channelId: ByteVector32): Seq[HtlcSettlementCommand] = withMetrics("pending-relay/list-channel", DbBackends.Postgres) {
7585
withLock { pg =>
76-
using(pg.prepareStatement("SELECT htlc_id, data FROM pending_relay WHERE channel_id=?")) { statement =>
86+
using(pg.prepareStatement("SELECT htlc_id, data FROM pending_settlement_commands WHERE channel_id=?")) { statement =>
7787
statement.setString(1, channelId.toHex)
7888
val rs = statement.executeQuery()
7989
codecSequence(rs, cmdCodec)
8090
}
8191
}
8292
}
8393

84-
override def listPendingRelay(): Set[(ByteVector32, Long)] = withMetrics("pending-relay/list", DbBackends.Postgres) {
94+
override def listSettlementCommands(): Seq[(ByteVector32, HtlcSettlementCommand)] = withMetrics("pending-relay/list", DbBackends.Postgres) {
8595
withLock { pg =>
86-
using(pg.prepareStatement("SELECT channel_id, htlc_id FROM pending_relay")) { statement =>
96+
using(pg.prepareStatement("SELECT channel_id, data FROM pending_settlement_commands")) { statement =>
8797
val rs = statement.executeQuery()
88-
var q: Queue[(ByteVector32, Long)] = Queue()
98+
var q: Queue[(ByteVector32, HtlcSettlementCommand)] = Queue()
8999
while (rs.next()) {
90-
q = q :+ (rs.getByteVector32FromHex("channel_id"), rs.getLong("htlc_id"))
100+
q = q :+ (rs.getByteVector32FromHex("channel_id"), cmdCodec.decode(rs.getByteVector("data").bits).require.value)
91101
}
92-
q.toSet
102+
q
93103
}
94104
}
95105
}

eclair-core/src/main/scala/fr/acinq/eclair/db/sqlite/SqliteChannelsDb.scala

+1-1
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,7 @@ class SqliteChannelsDb(sqlite: Connection) extends ChannelsDb with Logging {
117117
}
118118

119119
override def removeChannel(channelId: ByteVector32): Unit = withMetrics("channels/remove-channel", DbBackends.Sqlite) {
120-
using(sqlite.prepareStatement("DELETE FROM pending_relay WHERE channel_id=?")) { statement =>
120+
using(sqlite.prepareStatement("DELETE FROM pending_settlement_commands WHERE channel_id=?")) { statement =>
121121
statement.setBytes(1, channelId.toArray)
122122
statement.executeUpdate()
123123
}

0 commit comments

Comments
 (0)