Skip to content

Commit 1e2abae

Browse files
authored
Index database metrics by backend (#1758)
1 parent 5f68bf9 commit 1e2abae

14 files changed

+121
-106
lines changed

eclair-core/src/main/scala/fr/acinq/eclair/db/Databases.scala

+2-2
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ object Databases extends Logging {
6969
}
7070

7171
object SqliteDatabases {
72-
def apply(auditJdbc: Connection, networkJdbc: Connection, eclairJdbc: Connection): Databases = SqliteDatabases(
72+
def apply(auditJdbc: Connection, networkJdbc: Connection, eclairJdbc: Connection): SqliteDatabases = SqliteDatabases(
7373
network = new SqliteNetworkDb(networkJdbc),
7474
audit = new SqliteAuditDb(auditJdbc),
7575
channels = new SqliteChannelsDb(eclairJdbc),
@@ -155,7 +155,7 @@ object Databases extends Logging {
155155
/**
156156
* Given a parent folder it creates or loads all the databases from a JDBC connection
157157
*/
158-
def sqlite(dbdir: File): Databases = {
158+
def sqlite(dbdir: File): SqliteDatabases = {
159159
dbdir.mkdir()
160160
var sqliteEclair: Connection = null
161161
var sqliteNetwork: Connection = null

eclair-core/src/main/scala/fr/acinq/eclair/db/Monitoring.scala

+13-6
Original file line numberDiff line numberDiff line change
@@ -18,24 +18,31 @@ package fr.acinq.eclair.db
1818

1919
import fr.acinq.eclair.KamonExt
2020
import kamon.Kamon
21+
import kamon.metric.Metric
2122

2223
object Monitoring {
2324

2425
object Metrics {
25-
val FileBackupCompleted = Kamon.counter("db.file-backup.completed")
26-
val FileBackupDuration = Kamon.timer("db.file-backup.duration")
26+
val FileBackupCompleted: Metric.Counter = Kamon.counter("db.file-backup.completed")
27+
val FileBackupDuration: Metric.Timer = Kamon.timer("db.file-backup.duration")
2728

28-
val DbOperation = Kamon.counter("db.operation.execute")
29-
val DbOperationDuration = Kamon.timer("db.operation.duration")
29+
private val DbOperation: Metric.Counter = Kamon.counter("db.operation.execute")
30+
private val DbOperationDuration: Metric.Timer = Kamon.timer("db.operation.duration")
3031

31-
def withMetrics[T](name: String)(operation: => T): T = KamonExt.time(DbOperationDuration.withTag(Tags.DbOperation, name)) {
32-
DbOperation.withTag(Tags.DbOperation, name).increment()
32+
def withMetrics[T](name: String, backend: String)(operation: => T): T = KamonExt.time(DbOperationDuration.withTag(Tags.DbOperation, name).withTag(Tags.DbBackend, backend)) {
33+
DbOperation.withTag(Tags.DbOperation, name).withTag(Tags.DbBackend, backend).increment()
3334
operation
3435
}
3536
}
3637

3738
object Tags {
3839
val DbOperation = "operation"
40+
val DbBackend = "backend"
41+
42+
object DbBackends {
43+
val Sqlite = "sqlite"
44+
val Postgres = "postgres"
45+
}
3946
}
4047

4148
}

eclair-core/src/main/scala/fr/acinq/eclair/db/pg/PgAuditDb.scala

+7-6
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import fr.acinq.eclair.channel.{ChannelErrorOccurred, LocalError, NetworkFeePaid
2222
import fr.acinq.eclair.db.AuditDb.{NetworkFee, Stats}
2323
import fr.acinq.eclair.db.DbEventHandler.ChannelEvent
2424
import fr.acinq.eclair.db.Monitoring.Metrics.withMetrics
25+
import fr.acinq.eclair.db.Monitoring.Tags.DbBackends
2526
import fr.acinq.eclair.db._
2627
import fr.acinq.eclair.payment._
2728
import fr.acinq.eclair.{MilliSatoshi, MilliSatoshiLong}
@@ -66,7 +67,7 @@ class PgAuditDb(implicit ds: DataSource) extends AuditDb with Logging {
6667
}
6768
}
6869

69-
override def add(e: ChannelEvent): Unit = withMetrics("audit/add-channel-lifecycle") {
70+
override def add(e: ChannelEvent): Unit = withMetrics("audit/add-channel-lifecycle", DbBackends.Postgres) {
7071
inTransaction { pg =>
7172
using(pg.prepareStatement("INSERT INTO channel_events VALUES (?, ?, ?, ?, ?, ?, ?)")) { statement =>
7273
statement.setString(1, e.channelId.toHex)
@@ -81,7 +82,7 @@ class PgAuditDb(implicit ds: DataSource) extends AuditDb with Logging {
8182
}
8283
}
8384

84-
override def add(e: PaymentSent): Unit = withMetrics("audit/add-payment-sent") {
85+
override def add(e: PaymentSent): Unit = withMetrics("audit/add-payment-sent", DbBackends.Postgres) {
8586
inTransaction { pg =>
8687
using(pg.prepareStatement("INSERT INTO sent VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)")) { statement =>
8788
e.parts.foreach(p => {
@@ -102,7 +103,7 @@ class PgAuditDb(implicit ds: DataSource) extends AuditDb with Logging {
102103
}
103104
}
104105

105-
override def add(e: PaymentReceived): Unit = withMetrics("audit/add-payment-received") {
106+
override def add(e: PaymentReceived): Unit = withMetrics("audit/add-payment-received", DbBackends.Postgres) {
106107
inTransaction { pg =>
107108
using(pg.prepareStatement("INSERT INTO received VALUES (?, ?, ?, ?)")) { statement =>
108109
e.parts.foreach(p => {
@@ -117,7 +118,7 @@ class PgAuditDb(implicit ds: DataSource) extends AuditDb with Logging {
117118
}
118119
}
119120

120-
override def add(e: PaymentRelayed): Unit = withMetrics("audit/add-payment-relayed") {
121+
override def add(e: PaymentRelayed): Unit = withMetrics("audit/add-payment-relayed", DbBackends.Postgres) {
121122
inTransaction { pg =>
122123
val payments = e match {
123124
case ChannelPaymentRelayed(amountIn, amountOut, _, fromChannelId, toChannelId, ts) =>
@@ -141,7 +142,7 @@ class PgAuditDb(implicit ds: DataSource) extends AuditDb with Logging {
141142
}
142143
}
143144

144-
override def add(e: NetworkFeePaid): Unit = withMetrics("audit/add-network-fee") {
145+
override def add(e: NetworkFeePaid): Unit = withMetrics("audit/add-network-fee", DbBackends.Postgres) {
145146
inTransaction { pg =>
146147
using(pg.prepareStatement("INSERT INTO network_fees VALUES (?, ?, ?, ?, ?, ?)")) { statement =>
147148
statement.setString(1, e.channelId.toHex)
@@ -155,7 +156,7 @@ class PgAuditDb(implicit ds: DataSource) extends AuditDb with Logging {
155156
}
156157
}
157158

158-
override def add(e: ChannelErrorOccurred): Unit = withMetrics("audit/add-channel-error") {
159+
override def add(e: ChannelErrorOccurred): Unit = withMetrics("audit/add-channel-error", DbBackends.Postgres) {
159160
inTransaction { pg =>
160161
using(pg.prepareStatement("INSERT INTO channel_errors VALUES (?, ?, ?, ?, ?, ?)")) { statement =>
161162
val (errorName, errorMessage) = e.error match {

eclair-core/src/main/scala/fr/acinq/eclair/db/pg/PgChannelsDb.scala

+6-5
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import fr.acinq.eclair.channel.HasCommitments
2222
import fr.acinq.eclair.db.ChannelsDb
2323
import fr.acinq.eclair.db.DbEventHandler.ChannelEvent
2424
import fr.acinq.eclair.db.Monitoring.Metrics.withMetrics
25+
import fr.acinq.eclair.db.Monitoring.Tags.DbBackends
2526
import fr.acinq.eclair.db.pg.PgUtils.PgLock
2627
import fr.acinq.eclair.wire.internal.channel.ChannelCodecs.stateDataCodec
2728
import grizzled.slf4j.Logging
@@ -63,7 +64,7 @@ class PgChannelsDb(implicit ds: DataSource, lock: PgLock) extends ChannelsDb wit
6364
}
6465
}
6566

66-
override def addOrUpdateChannel(state: HasCommitments): Unit = withMetrics("channels/add-or-update-channel") {
67+
override def addOrUpdateChannel(state: HasCommitments): Unit = withMetrics("channels/add-or-update-channel", DbBackends.Postgres) {
6768
withLock { pg =>
6869
val data = stateDataCodec.encode(state).require.toByteArray
6970
using(pg.prepareStatement("UPDATE local_channels SET data=? WHERE channel_id=?")) { update =>
@@ -105,7 +106,7 @@ class PgChannelsDb(implicit ds: DataSource, lock: PgLock) extends ChannelsDb wit
105106
timestampColumn_opt.foreach(updateChannelMetaTimestampColumn(channelId, _))
106107
}
107108

108-
override def removeChannel(channelId: ByteVector32): Unit = withMetrics("channels/remove-channel") {
109+
override def removeChannel(channelId: ByteVector32): Unit = withMetrics("channels/remove-channel", DbBackends.Postgres) {
109110
withLock { pg =>
110111
using(pg.prepareStatement("DELETE FROM pending_relay WHERE channel_id=?")) { statement =>
111112
statement.setString(1, channelId.toHex)
@@ -124,7 +125,7 @@ class PgChannelsDb(implicit ds: DataSource, lock: PgLock) extends ChannelsDb wit
124125
}
125126
}
126127

127-
override def listLocalChannels(): Seq[HasCommitments] = withMetrics("channels/list-local-channels") {
128+
override def listLocalChannels(): Seq[HasCommitments] = withMetrics("channels/list-local-channels", DbBackends.Postgres) {
128129
withLock { pg =>
129130
using(pg.createStatement) { statement =>
130131
val rs = statement.executeQuery("SELECT data FROM local_channels WHERE is_closed=FALSE")
@@ -133,7 +134,7 @@ class PgChannelsDb(implicit ds: DataSource, lock: PgLock) extends ChannelsDb wit
133134
}
134135
}
135136

136-
override def addHtlcInfo(channelId: ByteVector32, commitmentNumber: Long, paymentHash: ByteVector32, cltvExpiry: CltvExpiry): Unit = withMetrics("channels/add-htlc-info") {
137+
override def addHtlcInfo(channelId: ByteVector32, commitmentNumber: Long, paymentHash: ByteVector32, cltvExpiry: CltvExpiry): Unit = withMetrics("channels/add-htlc-info", DbBackends.Postgres) {
137138
withLock { pg =>
138139
using(pg.prepareStatement("INSERT INTO htlc_infos VALUES (?, ?, ?, ?)")) { statement =>
139140
statement.setString(1, channelId.toHex)
@@ -145,7 +146,7 @@ class PgChannelsDb(implicit ds: DataSource, lock: PgLock) extends ChannelsDb wit
145146
}
146147
}
147148

148-
override def listHtlcInfos(channelId: ByteVector32, commitmentNumber: Long): Seq[(ByteVector32, CltvExpiry)] = withMetrics("channels/list-htlc-infos") {
149+
override def listHtlcInfos(channelId: ByteVector32, commitmentNumber: Long): Seq[(ByteVector32, CltvExpiry)] = withMetrics("channels/list-htlc-infos", DbBackends.Postgres) {
149150
withLock { pg =>
150151
using(pg.prepareStatement("SELECT payment_hash, cltv_expiry FROM htlc_infos WHERE channel_id=? AND commitment_number=?")) { statement =>
151152
statement.setString(1, channelId.toHex)

eclair-core/src/main/scala/fr/acinq/eclair/db/pg/PgNetworkDb.scala

+14-13
Original file line numberDiff line numberDiff line change
@@ -19,13 +19,14 @@ package fr.acinq.eclair.db.pg
1919
import fr.acinq.bitcoin.{ByteVector32, Crypto, Satoshi}
2020
import fr.acinq.eclair.ShortChannelId
2121
import fr.acinq.eclair.db.Monitoring.Metrics.withMetrics
22+
import fr.acinq.eclair.db.Monitoring.Tags.DbBackends
2223
import fr.acinq.eclair.db.NetworkDb
2324
import fr.acinq.eclair.router.Router.PublicChannel
2425
import fr.acinq.eclair.wire.protocol.LightningMessageCodecs.{channelAnnouncementCodec, channelUpdateCodec, nodeAnnouncementCodec}
2526
import fr.acinq.eclair.wire.protocol.{ChannelAnnouncement, ChannelUpdate, NodeAnnouncement}
2627
import grizzled.slf4j.Logging
27-
import javax.sql.DataSource
2828

29+
import javax.sql.DataSource
2930
import scala.collection.immutable.SortedMap
3031

3132
class PgNetworkDb(implicit ds: DataSource) extends NetworkDb with Logging {
@@ -48,7 +49,7 @@ class PgNetworkDb(implicit ds: DataSource) extends NetworkDb with Logging {
4849
}
4950
}
5051

51-
override def addNode(n: NodeAnnouncement): Unit = withMetrics("network/add-node") {
52+
override def addNode(n: NodeAnnouncement): Unit = withMetrics("network/add-node", DbBackends.Postgres) {
5253
inTransaction { pg =>
5354
using(pg.prepareStatement("INSERT INTO nodes VALUES (?, ?) ON CONFLICT DO NOTHING")) { statement =>
5455
statement.setString(1, n.nodeId.value.toHex)
@@ -58,7 +59,7 @@ class PgNetworkDb(implicit ds: DataSource) extends NetworkDb with Logging {
5859
}
5960
}
6061

61-
override def updateNode(n: NodeAnnouncement): Unit = withMetrics("network/update-node") {
62+
override def updateNode(n: NodeAnnouncement): Unit = withMetrics("network/update-node", DbBackends.Postgres) {
6263
inTransaction { pg =>
6364
using(pg.prepareStatement("UPDATE nodes SET data=? WHERE node_id=?")) { statement =>
6465
statement.setBytes(1, nodeAnnouncementCodec.encode(n).require.toByteArray)
@@ -68,7 +69,7 @@ class PgNetworkDb(implicit ds: DataSource) extends NetworkDb with Logging {
6869
}
6970
}
7071

71-
override def getNode(nodeId: Crypto.PublicKey): Option[NodeAnnouncement] = withMetrics("network/get-node") {
72+
override def getNode(nodeId: Crypto.PublicKey): Option[NodeAnnouncement] = withMetrics("network/get-node", DbBackends.Postgres) {
7273
inTransaction { pg =>
7374
using(pg.prepareStatement("SELECT data FROM nodes WHERE node_id=?")) { statement =>
7475
statement.setString(1, nodeId.value.toHex)
@@ -78,7 +79,7 @@ class PgNetworkDb(implicit ds: DataSource) extends NetworkDb with Logging {
7879
}
7980
}
8081

81-
override def removeNode(nodeId: Crypto.PublicKey): Unit = withMetrics("network/remove-node") {
82+
override def removeNode(nodeId: Crypto.PublicKey): Unit = withMetrics("network/remove-node", DbBackends.Postgres) {
8283
inTransaction { pg =>
8384
using(pg.prepareStatement("DELETE FROM nodes WHERE node_id=?")) { statement =>
8485
statement.setString(1, nodeId.value.toHex)
@@ -87,7 +88,7 @@ class PgNetworkDb(implicit ds: DataSource) extends NetworkDb with Logging {
8788
}
8889
}
8990

90-
override def listNodes(): Seq[NodeAnnouncement] = withMetrics("network/list-nodes") {
91+
override def listNodes(): Seq[NodeAnnouncement] = withMetrics("network/list-nodes", DbBackends.Postgres) {
9192
inTransaction { pg =>
9293
using(pg.createStatement()) { statement =>
9394
val rs = statement.executeQuery("SELECT data FROM nodes")
@@ -96,7 +97,7 @@ class PgNetworkDb(implicit ds: DataSource) extends NetworkDb with Logging {
9697
}
9798
}
9899

99-
override def addChannel(c: ChannelAnnouncement, txid: ByteVector32, capacity: Satoshi): Unit = withMetrics("network/add-channel") {
100+
override def addChannel(c: ChannelAnnouncement, txid: ByteVector32, capacity: Satoshi): Unit = withMetrics("network/add-channel", DbBackends.Postgres) {
100101
inTransaction { pg =>
101102
using(pg.prepareStatement("INSERT INTO channels VALUES (?, ?, ?, ?) ON CONFLICT DO NOTHING")) { statement =>
102103
statement.setLong(1, c.shortChannelId.toLong)
@@ -108,7 +109,7 @@ class PgNetworkDb(implicit ds: DataSource) extends NetworkDb with Logging {
108109
}
109110
}
110111

111-
override def updateChannel(u: ChannelUpdate): Unit = withMetrics("network/update-channel") {
112+
override def updateChannel(u: ChannelUpdate): Unit = withMetrics("network/update-channel", DbBackends.Postgres) {
112113
val column = if (u.isNode1) "channel_update_1" else "channel_update_2"
113114
inTransaction { pg =>
114115
using(pg.prepareStatement(s"UPDATE channels SET $column=? WHERE short_channel_id=?")) { statement =>
@@ -119,7 +120,7 @@ class PgNetworkDb(implicit ds: DataSource) extends NetworkDb with Logging {
119120
}
120121
}
121122

122-
override def listChannels(): SortedMap[ShortChannelId, PublicChannel] = withMetrics("network/list-channels") {
123+
override def listChannels(): SortedMap[ShortChannelId, PublicChannel] = withMetrics("network/list-channels", DbBackends.Postgres) {
123124
inTransaction { pg =>
124125
using(pg.createStatement()) { statement =>
125126
val rs = statement.executeQuery("SELECT channel_announcement, txid, capacity_sat, channel_update_1, channel_update_2 FROM channels")
@@ -137,7 +138,7 @@ class PgNetworkDb(implicit ds: DataSource) extends NetworkDb with Logging {
137138
}
138139
}
139140

140-
override def removeChannels(shortChannelIds: Iterable[ShortChannelId]): Unit = withMetrics("network/remove-channels") {
141+
override def removeChannels(shortChannelIds: Iterable[ShortChannelId]): Unit = withMetrics("network/remove-channels", DbBackends.Postgres) {
141142
inTransaction { pg =>
142143
using(pg.createStatement) { statement =>
143144
shortChannelIds
@@ -150,7 +151,7 @@ class PgNetworkDb(implicit ds: DataSource) extends NetworkDb with Logging {
150151
}
151152
}
152153

153-
override def addToPruned(shortChannelIds: Iterable[ShortChannelId]): Unit = withMetrics("network/add-to-pruned") {
154+
override def addToPruned(shortChannelIds: Iterable[ShortChannelId]): Unit = withMetrics("network/add-to-pruned", DbBackends.Postgres) {
154155
inTransaction { pg =>
155156
using(pg.prepareStatement("INSERT INTO pruned VALUES (?) ON CONFLICT DO NOTHING")) { statement =>
156157
shortChannelIds.foreach(shortChannelId => {
@@ -162,15 +163,15 @@ class PgNetworkDb(implicit ds: DataSource) extends NetworkDb with Logging {
162163
}
163164
}
164165

165-
override def removeFromPruned(shortChannelId: ShortChannelId): Unit = withMetrics("network/remove-from-pruned") {
166+
override def removeFromPruned(shortChannelId: ShortChannelId): Unit = withMetrics("network/remove-from-pruned", DbBackends.Postgres) {
166167
inTransaction { pg =>
167168
using(pg.createStatement) { statement =>
168169
statement.executeUpdate(s"DELETE FROM pruned WHERE short_channel_id=${shortChannelId.toLong}")
169170
}
170171
}
171172
}
172173

173-
override def isPruned(shortChannelId: ShortChannelId): Boolean = withMetrics("network/is-pruned") {
174+
override def isPruned(shortChannelId: ShortChannelId): Boolean = withMetrics("network/is-pruned", DbBackends.Postgres) {
174175
inTransaction { pg =>
175176
using(pg.prepareStatement("SELECT short_channel_id from pruned WHERE short_channel_id=?")) { statement =>
176177
statement.setLong(1, shortChannelId.toLong)

0 commit comments

Comments
 (0)