Skip to content

Commit ded5ce0

Browse files
authored
Add metadata to local_channels table (#1724)
* rename `Auditor` to `DbEventHandler` Move it to the `db` package (it was in the `payments` package for historical reasons but doesn't deal only with payment anymore). Better typing for channel lifecycle event. * add meta info to local_channels table Here is the rationale for implementing channel metadata as additional columns in the `local_channels` table of the `channels` db, as opposed to a dedicated `channel_metadata` table of a `audit` db: 1) There is a migration to do (in the `local_channels` table, no less!), but it's just a table migration, as opposed to a data migration, if we had to populate a new table in a separate database. 2) We don't need to worry about creating a new metadata line when a new channel is created (compared to doing add-or-update stuff). It's only _updating_ optional columns in a best-effort manner. 3) We don't need to worry about inconsistencies between two tables located in two separated databases (that's a big one). 4) We may want to use the metadata during operations, not just for audit purposes. For example to close channels that have stayed unused for a long time. 5) The audit db is an append-only log of events and shouldn't be used for anything else. There is no `UPDATE` sql statement in `*AuditDb.scala`. The `channel_metadata` would break that heuristic.
1 parent 2772138 commit ded5ce0

File tree

11 files changed

+310
-63
lines changed

11 files changed

+310
-63
lines changed

eclair-core/src/main/scala/fr/acinq/eclair/Eclair.scala

+4-4
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,6 @@
1616

1717
package fr.acinq.eclair
1818

19-
import java.nio.charset.StandardCharsets
20-
import java.util.UUID
21-
2219
import akka.actor.ActorRef
2320
import akka.pattern._
2421
import akka.util.Timeout
@@ -30,7 +27,8 @@ import fr.acinq.eclair.blockchain.bitcoind.BitcoinCoreWallet
3027
import fr.acinq.eclair.blockchain.bitcoind.BitcoinCoreWallet.WalletTransaction
3128
import fr.acinq.eclair.blockchain.fee.{FeeratePerByte, FeeratePerKw}
3229
import fr.acinq.eclair.channel._
33-
import fr.acinq.eclair.db.{IncomingPayment, NetworkFee, OutgoingPayment, Stats}
30+
import fr.acinq.eclair.db.AuditDb.{NetworkFee, Stats}
31+
import fr.acinq.eclair.db.{IncomingPayment, OutgoingPayment}
3432
import fr.acinq.eclair.io.Peer.{GetPeerInfo, PeerInfo}
3533
import fr.acinq.eclair.io.{NodeURI, Peer, PeerConnection}
3634
import fr.acinq.eclair.payment._
@@ -42,6 +40,8 @@ import fr.acinq.eclair.router.{NetworkStats, RouteCalculation, Router}
4240
import fr.acinq.eclair.wire._
4341
import scodec.bits.ByteVector
4442

43+
import java.nio.charset.StandardCharsets
44+
import java.util.UUID
4545
import scala.concurrent.duration._
4646
import scala.concurrent.{ExecutionContext, Future}
4747
import scala.reflect.ClassTag

eclair-core/src/main/scala/fr/acinq/eclair/Setup.scala

+2-3
Original file line numberDiff line numberDiff line change
@@ -36,9 +36,8 @@ import fr.acinq.eclair.blockchain.{EclairWallet, _}
3636
import fr.acinq.eclair.channel.Register
3737
import fr.acinq.eclair.crypto.keymanager.{LocalChannelKeyManager, LocalNodeKeyManager}
3838
import fr.acinq.eclair.db.Databases.FileBackup
39-
import fr.acinq.eclair.db.{Databases, FileBackupHandler}
39+
import fr.acinq.eclair.db.{DbEventHandler, Databases, FileBackupHandler}
4040
import fr.acinq.eclair.io.{ClientSpawner, Server, Switchboard}
41-
import fr.acinq.eclair.payment.Auditor
4241
import fr.acinq.eclair.payment.receive.PaymentHandler
4342
import fr.acinq.eclair.payment.relay.Relayer
4443
import fr.acinq.eclair.payment.send.{Autoprobe, PaymentInitiator}
@@ -308,7 +307,7 @@ class Setup(datadir: File,
308307
logger.warn("database backup is disabled")
309308
system.deadLetters
310309
}
311-
audit = system.actorOf(SimpleSupervisor.props(Auditor.props(nodeParams), "auditor", SupervisorStrategy.Resume))
310+
dbEventHandler = system.actorOf(SimpleSupervisor.props(DbEventHandler.props(nodeParams), "db-event-handler", SupervisorStrategy.Resume))
312311
register = system.actorOf(SimpleSupervisor.props(Props(new Register), "register", SupervisorStrategy.Resume))
313312
paymentHandler = system.actorOf(SimpleSupervisor.props(PaymentHandler.props(nodeParams, register), "payment-handler", SupervisorStrategy.Resume))
314313
relayer = system.actorOf(SimpleSupervisor.props(Relayer.props(nodeParams, router, register, paymentHandler, Some(postRestartCleanUpInitialized)), "relayer", SupervisorStrategy.Resume))

eclair-core/src/main/scala/fr/acinq/eclair/db/AuditDb.scala

+10-6
Original file line numberDiff line numberDiff line change
@@ -16,16 +16,18 @@
1616

1717
package fr.acinq.eclair.db
1818

19-
import java.io.Closeable
20-
2119
import fr.acinq.bitcoin.Crypto.PublicKey
2220
import fr.acinq.bitcoin.{ByteVector32, Satoshi}
2321
import fr.acinq.eclair.channel._
22+
import fr.acinq.eclair.db.AuditDb.{NetworkFee, Stats}
23+
import fr.acinq.eclair.db.DbEventHandler.ChannelEvent
2424
import fr.acinq.eclair.payment.{PaymentReceived, PaymentRelayed, PaymentSent}
2525

26+
import java.io.Closeable
27+
2628
trait AuditDb extends Closeable {
2729

28-
def add(channelLifecycle: ChannelLifecycleEvent): Unit
30+
def add(channelLifecycle: ChannelEvent): Unit
2931

3032
def add(paymentSent: PaymentSent): Unit
3133

@@ -49,8 +51,10 @@ trait AuditDb extends Closeable {
4951

5052
}
5153

52-
case class ChannelLifecycleEvent(channelId: ByteVector32, remoteNodeId: PublicKey, capacity: Satoshi, isFunder: Boolean, isPrivate: Boolean, event: String)
54+
object AuditDb {
55+
56+
case class NetworkFee(remoteNodeId: PublicKey, channelId: ByteVector32, txId: ByteVector32, fee: Satoshi, txType: String, timestamp: Long)
5357

54-
case class NetworkFee(remoteNodeId: PublicKey, channelId: ByteVector32, txId: ByteVector32, fee: Satoshi, txType: String, timestamp: Long)
58+
case class Stats(channelId: ByteVector32, direction: String, avgPaymentAmount: Satoshi, paymentCount: Int, relayFee: Satoshi, networkFee: Satoshi)
5559

56-
case class Stats(channelId: ByteVector32, direction: String, avgPaymentAmount: Satoshi, paymentCount: Int, relayFee: Satoshi, networkFee: Satoshi)
60+
}

eclair-core/src/main/scala/fr/acinq/eclair/db/ChannelsDb.scala

+5-2
Original file line numberDiff line numberDiff line change
@@ -16,16 +16,19 @@
1616

1717
package fr.acinq.eclair.db
1818

19-
import java.io.Closeable
20-
2119
import fr.acinq.bitcoin.ByteVector32
2220
import fr.acinq.eclair.CltvExpiry
2321
import fr.acinq.eclair.channel.HasCommitments
22+
import fr.acinq.eclair.db.DbEventHandler.ChannelEvent
23+
24+
import java.io.Closeable
2425

2526
trait ChannelsDb extends Closeable {
2627

2728
def addOrUpdateChannel(state: HasCommitments): Unit
2829

30+
def updateChannelMeta(channelId: ByteVector32, event: ChannelEvent.EventType)
31+
2932
def removeChannel(channelId: ByteVector32): Unit
3033

3134
def listLocalChannels(): Seq[HasCommitments]
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2019 ACINQ SAS
2+
* Copyright 2021 ACINQ SAS
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -14,19 +14,26 @@
1414
* limitations under the License.
1515
*/
1616

17-
package fr.acinq.eclair.payment
17+
package fr.acinq.eclair.db
1818

1919
import akka.actor.{Actor, ActorLogging, Props}
20+
import fr.acinq.bitcoin.Crypto.PublicKey
21+
import fr.acinq.bitcoin.{ByteVector32, Satoshi}
2022
import fr.acinq.eclair.NodeParams
21-
import fr.acinq.eclair.channel.Helpers.Closing._
23+
import fr.acinq.eclair.channel.Helpers.Closing.{ClosingType, CurrentRemoteClose, LocalClose, MutualClose, NextRemoteClose, RecoveryClose, RevokedClose}
2224
import fr.acinq.eclair.channel.Monitoring.{Metrics => ChannelMetrics, Tags => ChannelTags}
2325
import fr.acinq.eclair.channel._
24-
import fr.acinq.eclair.db.ChannelLifecycleEvent
26+
import fr.acinq.eclair.db.DbEventHandler.ChannelEvent
2527
import fr.acinq.eclair.payment.Monitoring.{Metrics => PaymentMetrics, Tags => PaymentTags}
28+
import fr.acinq.eclair.payment._
2629

27-
class Auditor(nodeParams: NodeParams) extends Actor with ActorLogging {
30+
/**
31+
* This actor sits at the interface between our event stream and the database.
32+
*/
33+
class DbEventHandler(nodeParams: NodeParams) extends Actor with ActorLogging {
2834

29-
val db = nodeParams.db.audit
35+
val auditDb: AuditDb = nodeParams.db.audit
36+
val channelsDb: ChannelsDb = nodeParams.db.channels
3037

3138
context.system.eventStream.subscribe(self, classOf[PaymentEvent])
3239
context.system.eventStream.subscribe(self, classOf[NetworkFeePaid])
@@ -40,15 +47,17 @@ class Auditor(nodeParams: NodeParams) extends Actor with ActorLogging {
4047
PaymentMetrics.PaymentAmount.withTag(PaymentTags.Direction, PaymentTags.Directions.Sent).record(e.recipientAmount.truncateToSatoshi.toLong)
4148
PaymentMetrics.PaymentFees.withTag(PaymentTags.Direction, PaymentTags.Directions.Sent).record(e.feesPaid.truncateToSatoshi.toLong)
4249
PaymentMetrics.PaymentParts.withTag(PaymentTags.Direction, PaymentTags.Directions.Sent).record(e.parts.length)
43-
db.add(e)
50+
auditDb.add(e)
51+
e.parts.foreach(p => channelsDb.updateChannelMeta(p.toChannelId, ChannelEvent.EventType.PaymentSent))
4452

4553
case _: PaymentFailed =>
4654
PaymentMetrics.PaymentFailed.withTag(PaymentTags.Direction, PaymentTags.Directions.Sent).increment()
4755

4856
case e: PaymentReceived =>
4957
PaymentMetrics.PaymentAmount.withTag(PaymentTags.Direction, PaymentTags.Directions.Received).record(e.amount.truncateToSatoshi.toLong)
5058
PaymentMetrics.PaymentParts.withTag(PaymentTags.Direction, PaymentTags.Directions.Received).record(e.parts.length)
51-
db.add(e)
59+
auditDb.add(e)
60+
e.parts.foreach(p => channelsDb.updateChannelMeta(p.fromChannelId, ChannelEvent.EventType.PaymentReceived))
5261

5362
case e: PaymentRelayed =>
5463
PaymentMetrics.PaymentAmount
@@ -63,51 +72,76 @@ class Auditor(nodeParams: NodeParams) extends Actor with ActorLogging {
6372
case TrampolinePaymentRelayed(_, incoming, outgoing, _) =>
6473
PaymentMetrics.PaymentParts.withTag(PaymentTags.Direction, PaymentTags.Directions.Received).record(incoming.length)
6574
PaymentMetrics.PaymentParts.withTag(PaymentTags.Direction, PaymentTags.Directions.Sent).record(outgoing.length)
66-
case _: ChannelPaymentRelayed =>
75+
incoming.foreach(p => channelsDb.updateChannelMeta(p.channelId, ChannelEvent.EventType.PaymentReceived))
76+
outgoing.foreach(p => channelsDb.updateChannelMeta(p.channelId, ChannelEvent.EventType.PaymentSent))
77+
case ChannelPaymentRelayed(_, _, _, fromChannelId, toChannelId, _) =>
78+
channelsDb.updateChannelMeta(fromChannelId, ChannelEvent.EventType.PaymentReceived)
79+
channelsDb.updateChannelMeta(toChannelId, ChannelEvent.EventType.PaymentSent)
6780
}
68-
db.add(e)
81+
auditDb.add(e)
6982

70-
case e: NetworkFeePaid => db.add(e)
83+
case e: NetworkFeePaid => auditDb.add(e)
7184

7285
case e: ChannelErrorOccurred =>
7386
e.error match {
7487
case LocalError(_) if e.isFatal => ChannelMetrics.ChannelErrors.withTag(ChannelTags.Origin, ChannelTags.Origins.Local).withTag(ChannelTags.Fatal, value = true).increment()
7588
case LocalError(_) if !e.isFatal => ChannelMetrics.ChannelErrors.withTag(ChannelTags.Origin, ChannelTags.Origins.Local).withTag(ChannelTags.Fatal, value = false).increment()
7689
case RemoteError(_) => ChannelMetrics.ChannelErrors.withTag(ChannelTags.Origin, ChannelTags.Origins.Remote).increment()
7790
}
78-
db.add(e)
91+
auditDb.add(e)
7992

8093
case e: ChannelStateChanged =>
8194
// NB: order matters!
8295
e match {
8396
case ChannelStateChanged(_, channelId, _, remoteNodeId, WAIT_FOR_FUNDING_LOCKED, NORMAL, Some(commitments: Commitments)) =>
8497
ChannelMetrics.ChannelLifecycleEvents.withTag(ChannelTags.Event, ChannelTags.Events.Created).increment()
85-
db.add(ChannelLifecycleEvent(channelId, remoteNodeId, commitments.capacity, commitments.localParams.isFunder, !commitments.announceChannel, "created"))
98+
val event = ChannelEvent.EventType.Created
99+
auditDb.add(ChannelEvent(channelId, remoteNodeId, commitments.capacity, commitments.localParams.isFunder, !commitments.announceChannel, event))
100+
channelsDb.updateChannelMeta(channelId, event)
86101
case ChannelStateChanged(_, _, _, _, WAIT_FOR_INIT_INTERNAL, _, _) =>
102+
case ChannelStateChanged(_, channelId, _, _, OFFLINE, SYNCING, _) =>
103+
channelsDb.updateChannelMeta(channelId, ChannelEvent.EventType.Connected)
87104
case ChannelStateChanged(_, _, _, _, _, CLOSING, _) =>
88105
ChannelMetrics.ChannelLifecycleEvents.withTag(ChannelTags.Event, ChannelTags.Events.Closing).increment()
89106
case _ => ()
90107
}
91108

92109
case e: ChannelClosed =>
93110
ChannelMetrics.ChannelLifecycleEvents.withTag(ChannelTags.Event, ChannelTags.Events.Closed).increment()
94-
val event = e.closingType match {
95-
case _: MutualClose => "mutual"
96-
case _: LocalClose => "local"
97-
case _: RemoteClose => "remote" // can be current or next
98-
case _: RecoveryClose => "recovery"
99-
case _: RevokedClose => "revoked"
100-
}
101-
db.add(ChannelLifecycleEvent(e.channelId, e.commitments.remoteParams.nodeId, e.commitments.commitInput.txOut.amount, e.commitments.localParams.isFunder, !e.commitments.announceChannel, event))
111+
val event = ChannelEvent.EventType.Closed(e.closingType)
112+
auditDb.add(ChannelEvent(e.channelId, e.commitments.remoteParams.nodeId, e.commitments.commitInput.txOut.amount, e.commitments.localParams.isFunder, !e.commitments.announceChannel, event))
113+
channelsDb.updateChannelMeta(e.channelId, event)
102114

103115
}
104116

105117
override def unhandled(message: Any): Unit = log.warning(s"unhandled msg=$message")
106118

107119
}
108120

109-
object Auditor {
110-
111-
def props(nodeParams: NodeParams) = Props(classOf[Auditor], nodeParams)
112-
121+
object DbEventHandler {
122+
123+
def props(nodeParams: NodeParams): Props = Props(new DbEventHandler(nodeParams))
124+
125+
// @formatter:off
126+
case class ChannelEvent(channelId: ByteVector32, remoteNodeId: PublicKey, capacity: Satoshi, isFunder: Boolean, isPrivate: Boolean, event: ChannelEvent.EventType)
127+
object ChannelEvent {
128+
sealed trait EventType { def label: String }
129+
object EventType {
130+
object Created extends EventType { override def label: String = "created" }
131+
object Connected extends EventType { override def label: String = "connected" }
132+
object PaymentSent extends EventType { override def label: String = "sent" }
133+
object PaymentReceived extends EventType { override def label: String = "received" }
134+
case class Closed(closingType: ClosingType) extends EventType {
135+
override def label: String = closingType match {
136+
case _: MutualClose => "mutual"
137+
case _: LocalClose => "local"
138+
case _: CurrentRemoteClose => "remote"
139+
case _: NextRemoteClose => "remote"
140+
case _: RecoveryClose => "recovery"
141+
case _: RevokedClose => "revoked"
142+
}
143+
}
144+
}
145+
}
146+
// @formatter:on
113147
}

eclair-core/src/main/scala/fr/acinq/eclair/db/pg/PgAuditDb.scala

+4-2
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@ package fr.acinq.eclair.db.pg
1919
import fr.acinq.bitcoin.Crypto.PublicKey
2020
import fr.acinq.bitcoin.{ByteVector32, Satoshi, SatoshiLong}
2121
import fr.acinq.eclair.channel.{ChannelErrorOccurred, LocalError, NetworkFeePaid, RemoteError}
22+
import fr.acinq.eclair.db.AuditDb.{NetworkFee, Stats}
23+
import fr.acinq.eclair.db.DbEventHandler.ChannelEvent
2224
import fr.acinq.eclair.db.Monitoring.Metrics.withMetrics
2325
import fr.acinq.eclair.db._
2426
import fr.acinq.eclair.payment._
@@ -64,15 +66,15 @@ class PgAuditDb(implicit ds: DataSource) extends AuditDb with Logging {
6466
}
6567
}
6668

67-
override def add(e: ChannelLifecycleEvent): Unit = withMetrics("audit/add-channel-lifecycle") {
69+
override def add(e: ChannelEvent): Unit = withMetrics("audit/add-channel-lifecycle") {
6870
inTransaction { pg =>
6971
using(pg.prepareStatement("INSERT INTO channel_events VALUES (?, ?, ?, ?, ?, ?, ?)")) { statement =>
7072
statement.setString(1, e.channelId.toHex)
7173
statement.setString(2, e.remoteNodeId.value.toHex)
7274
statement.setLong(3, e.capacity.toLong)
7375
statement.setBoolean(4, e.isFunder)
7476
statement.setBoolean(5, e.isPrivate)
75-
statement.setString(6, e.event)
77+
statement.setString(6, e.event.label)
7678
statement.setLong(7, System.currentTimeMillis)
7779
statement.executeUpdate()
7880
}

eclair-core/src/main/scala/fr/acinq/eclair/db/pg/PgChannelsDb.scala

+43-4
Original file line numberDiff line numberDiff line change
@@ -20,12 +20,14 @@ import fr.acinq.bitcoin.ByteVector32
2020
import fr.acinq.eclair.CltvExpiry
2121
import fr.acinq.eclair.channel.HasCommitments
2222
import fr.acinq.eclair.db.ChannelsDb
23+
import fr.acinq.eclair.db.DbEventHandler.ChannelEvent
2324
import fr.acinq.eclair.db.Monitoring.Metrics.withMetrics
2425
import fr.acinq.eclair.db.pg.PgUtils.DatabaseLock
2526
import fr.acinq.eclair.wire.ChannelCodecs.stateDataCodec
2627
import grizzled.slf4j.Logging
27-
import javax.sql.DataSource
2828

29+
import java.sql.Statement
30+
import javax.sql.DataSource
2931
import scala.collection.immutable.Queue
3032

3133
class PgChannelsDb(implicit ds: DataSource, lock: DatabaseLock) extends ChannelsDb with Logging {
@@ -35,13 +37,25 @@ class PgChannelsDb(implicit ds: DataSource, lock: DatabaseLock) extends Channels
3537
import lock._
3638

3739
val DB_NAME = "channels"
38-
val CURRENT_VERSION = 2
40+
val CURRENT_VERSION = 3
41+
42+
def migration23(statement: Statement): Unit = {
43+
statement.executeUpdate("ALTER TABLE local_channels ADD COLUMN created_timestamp BIGINT")
44+
statement.executeUpdate("ALTER TABLE local_channels ADD COLUMN last_payment_sent_timestamp BIGINT")
45+
statement.executeUpdate("ALTER TABLE local_channels ADD COLUMN last_payment_received_timestamp BIGINT")
46+
statement.executeUpdate("ALTER TABLE local_channels ADD COLUMN last_connected_timestamp BIGINT")
47+
statement.executeUpdate("ALTER TABLE local_channels ADD COLUMN closed_timestamp BIGINT")
48+
}
3949

4050
inTransaction { pg =>
4151
using(pg.createStatement()) { statement =>
4252
getVersion(statement, DB_NAME, CURRENT_VERSION) match {
53+
case 2 =>
54+
logger.warn(s"migrating db $DB_NAME, found version=2 current=$CURRENT_VERSION")
55+
migration23(statement)
56+
setVersion(statement, DB_NAME, CURRENT_VERSION)
4357
case CURRENT_VERSION =>
44-
statement.executeUpdate("CREATE TABLE IF NOT EXISTS local_channels (channel_id TEXT NOT NULL PRIMARY KEY, data BYTEA NOT NULL, is_closed BOOLEAN NOT NULL DEFAULT FALSE)")
58+
statement.executeUpdate("CREATE TABLE IF NOT EXISTS local_channels (channel_id TEXT NOT NULL PRIMARY KEY, data BYTEA NOT NULL, is_closed BOOLEAN NOT NULL DEFAULT FALSE, created_timestamp BIGINT, last_payment_sent_timestamp BIGINT, last_payment_received_timestamp BIGINT, last_connected_timestamp BIGINT, closed_timestamp BIGINT)")
4559
statement.executeUpdate("CREATE TABLE IF NOT EXISTS htlc_infos (channel_id TEXT NOT NULL, commitment_number TEXT NOT NULL, payment_hash TEXT NOT NULL, cltv_expiry BIGINT NOT NULL, FOREIGN KEY(channel_id) REFERENCES local_channels(channel_id))")
4660
statement.executeUpdate("CREATE INDEX IF NOT EXISTS htlc_infos_idx ON htlc_infos(channel_id, commitment_number)")
4761
case unknownVersion => throw new RuntimeException(s"Unknown version of DB $DB_NAME found, version=$unknownVersion")
@@ -56,7 +70,7 @@ class PgChannelsDb(implicit ds: DataSource, lock: DatabaseLock) extends Channels
5670
update.setBytes(1, data)
5771
update.setString(2, state.channelId.toHex)
5872
if (update.executeUpdate() == 0) {
59-
using(pg.prepareStatement("INSERT INTO local_channels VALUES (?, ?, FALSE)")) { statement =>
73+
using(pg.prepareStatement("INSERT INTO local_channels (channel_id, data, is_closed) VALUES (?, ?, FALSE)")) { statement =>
6074
statement.setString(1, state.channelId.toHex)
6175
statement.setBytes(2, data)
6276
statement.executeUpdate()
@@ -66,6 +80,31 @@ class PgChannelsDb(implicit ds: DataSource, lock: DatabaseLock) extends Channels
6680
}
6781
}
6882

83+
/**
84+
* Helper method to factor updating timestamp columns
85+
*/
86+
private def updateChannelMetaTimestampColumn(channelId: ByteVector32, columnName: String): Unit = {
87+
inTransaction { pg =>
88+
using(pg.prepareStatement(s"UPDATE local_channels SET $columnName=? WHERE channel_id=?")) { statement =>
89+
statement.setLong(1, System.currentTimeMillis)
90+
statement.setString(2, channelId.toHex)
91+
statement.executeUpdate()
92+
}
93+
}
94+
}
95+
96+
override def updateChannelMeta(channelId: ByteVector32, event: ChannelEvent.EventType): Unit = {
97+
val timestampColumn_opt = event match {
98+
case ChannelEvent.EventType.Created => Some("created_timestamp")
99+
case ChannelEvent.EventType.Connected => Some("last_connected_timestamp")
100+
case ChannelEvent.EventType.PaymentReceived => Some("last_payment_received_timestamp")
101+
case ChannelEvent.EventType.PaymentSent => Some("last_payment_sent_timestamp")
102+
case _: ChannelEvent.EventType.Closed => Some("closed_timestamp")
103+
case _ => None
104+
}
105+
timestampColumn_opt.foreach(updateChannelMetaTimestampColumn(channelId, _))
106+
}
107+
69108
override def removeChannel(channelId: ByteVector32): Unit = withMetrics("channels/remove-channel") {
70109
withLock { pg =>
71110
using(pg.prepareStatement("DELETE FROM pending_relay WHERE channel_id=?")) { statement =>

eclair-core/src/main/scala/fr/acinq/eclair/db/sqlite/SqliteAuditDb.scala

+4-2
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@ package fr.acinq.eclair.db.sqlite
1919
import fr.acinq.bitcoin.Crypto.{PrivateKey, PublicKey}
2020
import fr.acinq.bitcoin.{ByteVector32, Satoshi, SatoshiLong}
2121
import fr.acinq.eclair.channel.{ChannelErrorOccurred, LocalError, NetworkFeePaid, RemoteError}
22+
import fr.acinq.eclair.db.AuditDb.{NetworkFee, Stats}
23+
import fr.acinq.eclair.db.DbEventHandler.ChannelEvent
2224
import fr.acinq.eclair.db.Monitoring.Metrics.withMetrics
2325
import fr.acinq.eclair.db._
2426
import fr.acinq.eclair.payment._
@@ -107,14 +109,14 @@ class SqliteAuditDb(sqlite: Connection) extends AuditDb with Logging {
107109
}
108110
}
109111

110-
override def add(e: ChannelLifecycleEvent): Unit = withMetrics("audit/add-channel-lifecycle") {
112+
override def add(e: ChannelEvent): Unit = withMetrics("audit/add-channel-lifecycle") {
111113
using(sqlite.prepareStatement("INSERT INTO channel_events VALUES (?, ?, ?, ?, ?, ?, ?)")) { statement =>
112114
statement.setBytes(1, e.channelId.toArray)
113115
statement.setBytes(2, e.remoteNodeId.value.toArray)
114116
statement.setLong(3, e.capacity.toLong)
115117
statement.setBoolean(4, e.isFunder)
116118
statement.setBoolean(5, e.isPrivate)
117-
statement.setString(6, e.event)
119+
statement.setString(6, e.event.label)
118120
statement.setLong(7, System.currentTimeMillis)
119121
statement.executeUpdate()
120122
}

0 commit comments

Comments
 (0)