Skip to content

Commit f8feb19

Browse files
authored
Use schemas in Postgres (#1866)
Instead of having a flat organization under the default `public` schema, we classify tables in schemas. There is roughly one schema per database type. The new hierarchy is: - `local` - `channels` - `htlc_infos` - `pending_settlement_commands` - `peers` - `network` - `nodes` - `public_channels` - `pruned_channels` - `payments` - `received` - `sent` - `audit` - (all the audit db tables) - `public` - `lease` - `versions` Note in particular, the change in naming for local channels vs external channels: - `local_channels` -> `local.channels` - `channels` -> `network.public_channels` The two internal tables `lease` and `versions` stay in the `public` schema, because we have no meta way of migrating them.
1 parent 08faf3b commit f8feb19

13 files changed

+259
-152
lines changed

eclair-core/src/main/scala/fr/acinq/eclair/db/Databases.scala

+6-2
Original file line numberDiff line numberDiff line change
@@ -127,8 +127,12 @@ object Databases extends Logging {
127127
readOnlyUser_opt.foreach { readOnlyUser =>
128128
PgUtils.inTransaction { connection =>
129129
using(connection.createStatement()) { statement =>
130-
logger.info(s"granting read-only access to user=$readOnlyUser")
131-
statement.executeUpdate(s"GRANT SELECT ON ALL TABLES IN SCHEMA public TO $readOnlyUser")
130+
val schemas = "public" :: "audit" :: "local" :: "network" :: "payments" :: Nil
131+
schemas.foreach { schema =>
132+
logger.info(s"granting read-only access to user=$readOnlyUser schema=$schema")
133+
statement.executeUpdate(s"GRANT USAGE ON SCHEMA $schema TO $readOnlyUser")
134+
statement.executeUpdate(s"GRANT SELECT ON ALL TABLES IN SCHEMA $schema TO $readOnlyUser")
135+
}
132136
}
133137
}
134138
}

eclair-core/src/main/scala/fr/acinq/eclair/db/pg/PgAuditDb.scala

+47-29
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ class PgAuditDb(implicit ds: DataSource) extends AuditDb with Logging {
4040
import ExtendedResultSet._
4141

4242
val DB_NAME = "audit"
43-
val CURRENT_VERSION = 6
43+
val CURRENT_VERSION = 7
4444

4545
case class RelayedPart(channelId: ByteVector32, amount: MilliSatoshi, direction: String, relayType: String, timestamp: Long)
4646

@@ -62,32 +62,50 @@ class PgAuditDb(implicit ds: DataSource) extends AuditDb with Logging {
6262
statement.executeUpdate("ALTER TABLE channel_errors ALTER COLUMN timestamp SET DATA TYPE TIMESTAMP WITH TIME ZONE USING timestamp with time zone 'epoch' + timestamp * interval '1 millisecond'")
6363
}
6464

65+
def migration67(statement: Statement): Unit = {
66+
statement.executeUpdate("CREATE SCHEMA audit")
67+
statement.executeUpdate("ALTER TABLE sent SET SCHEMA audit")
68+
statement.executeUpdate("ALTER TABLE received SET SCHEMA audit")
69+
statement.executeUpdate("ALTER TABLE relayed SET SCHEMA audit")
70+
statement.executeUpdate("ALTER TABLE relayed_trampoline SET SCHEMA audit")
71+
statement.executeUpdate("ALTER TABLE network_fees SET SCHEMA audit")
72+
statement.executeUpdate("ALTER TABLE channel_events SET SCHEMA audit")
73+
statement.executeUpdate("ALTER TABLE channel_errors SET SCHEMA audit")
74+
}
75+
6576
getVersion(statement, DB_NAME) match {
6677
case None =>
67-
statement.executeUpdate("CREATE TABLE sent (amount_msat BIGINT NOT NULL, fees_msat BIGINT NOT NULL, recipient_amount_msat BIGINT NOT NULL, payment_id TEXT NOT NULL, parent_payment_id TEXT NOT NULL, payment_hash TEXT NOT NULL, payment_preimage TEXT NOT NULL, recipient_node_id TEXT NOT NULL, to_channel_id TEXT NOT NULL, timestamp TIMESTAMP WITH TIME ZONE NOT NULL)")
68-
statement.executeUpdate("CREATE TABLE received (amount_msat BIGINT NOT NULL, payment_hash TEXT NOT NULL, from_channel_id TEXT NOT NULL, timestamp TIMESTAMP WITH TIME ZONE NOT NULL)")
69-
statement.executeUpdate("CREATE TABLE relayed (payment_hash TEXT NOT NULL, amount_msat BIGINT NOT NULL, channel_id TEXT NOT NULL, direction TEXT NOT NULL, relay_type TEXT NOT NULL, timestamp TIMESTAMP WITH TIME ZONE NOT NULL)")
70-
statement.executeUpdate("CREATE TABLE relayed_trampoline (payment_hash TEXT NOT NULL, amount_msat BIGINT NOT NULL, next_node_id TEXT NOT NULL, timestamp TIMESTAMP WITH TIME ZONE NOT NULL)")
71-
statement.executeUpdate("CREATE TABLE network_fees (channel_id TEXT NOT NULL, node_id TEXT NOT NULL, tx_id TEXT NOT NULL, fee_sat BIGINT NOT NULL, tx_type TEXT NOT NULL, timestamp TIMESTAMP WITH TIME ZONE NOT NULL)")
72-
statement.executeUpdate("CREATE TABLE channel_events (channel_id TEXT NOT NULL, node_id TEXT NOT NULL, capacity_sat BIGINT NOT NULL, is_funder BOOLEAN NOT NULL, is_private BOOLEAN NOT NULL, event TEXT NOT NULL, timestamp TIMESTAMP WITH TIME ZONE NOT NULL)")
73-
statement.executeUpdate("CREATE TABLE channel_errors (channel_id TEXT NOT NULL, node_id TEXT NOT NULL, error_name TEXT NOT NULL, error_message TEXT NOT NULL, is_fatal BOOLEAN NOT NULL, timestamp TIMESTAMP WITH TIME ZONE NOT NULL)")
78+
statement.executeUpdate("CREATE SCHEMA audit")
7479

75-
statement.executeUpdate("CREATE INDEX sent_timestamp_idx ON sent(timestamp)")
76-
statement.executeUpdate("CREATE INDEX received_timestamp_idx ON received(timestamp)")
77-
statement.executeUpdate("CREATE INDEX relayed_timestamp_idx ON relayed(timestamp)")
78-
statement.executeUpdate("CREATE INDEX relayed_payment_hash_idx ON relayed(payment_hash)")
79-
statement.executeUpdate("CREATE INDEX relayed_trampoline_timestamp_idx ON relayed_trampoline(timestamp)")
80-
statement.executeUpdate("CREATE INDEX relayed_trampoline_payment_hash_idx ON relayed_trampoline(payment_hash)")
81-
statement.executeUpdate("CREATE INDEX network_fees_timestamp_idx ON network_fees(timestamp)")
82-
statement.executeUpdate("CREATE INDEX channel_events_timestamp_idx ON channel_events(timestamp)")
83-
statement.executeUpdate("CREATE INDEX channel_errors_timestamp_idx ON channel_errors(timestamp)")
80+
statement.executeUpdate("CREATE TABLE audit.sent (amount_msat BIGINT NOT NULL, fees_msat BIGINT NOT NULL, recipient_amount_msat BIGINT NOT NULL, payment_id TEXT NOT NULL, parent_payment_id TEXT NOT NULL, payment_hash TEXT NOT NULL, payment_preimage TEXT NOT NULL, recipient_node_id TEXT NOT NULL, to_channel_id TEXT NOT NULL, timestamp TIMESTAMP WITH TIME ZONE NOT NULL)")
81+
statement.executeUpdate("CREATE TABLE audit.received (amount_msat BIGINT NOT NULL, payment_hash TEXT NOT NULL, from_channel_id TEXT NOT NULL, timestamp TIMESTAMP WITH TIME ZONE NOT NULL)")
82+
statement.executeUpdate("CREATE TABLE audit.relayed (payment_hash TEXT NOT NULL, amount_msat BIGINT NOT NULL, channel_id TEXT NOT NULL, direction TEXT NOT NULL, relay_type TEXT NOT NULL, timestamp TIMESTAMP WITH TIME ZONE NOT NULL)")
83+
statement.executeUpdate("CREATE TABLE audit.relayed_trampoline (payment_hash TEXT NOT NULL, amount_msat BIGINT NOT NULL, next_node_id TEXT NOT NULL, timestamp TIMESTAMP WITH TIME ZONE NOT NULL)")
84+
statement.executeUpdate("CREATE TABLE audit.network_fees (channel_id TEXT NOT NULL, node_id TEXT NOT NULL, tx_id TEXT NOT NULL, fee_sat BIGINT NOT NULL, tx_type TEXT NOT NULL, timestamp TIMESTAMP WITH TIME ZONE NOT NULL)")
85+
statement.executeUpdate("CREATE TABLE audit.channel_events (channel_id TEXT NOT NULL, node_id TEXT NOT NULL, capacity_sat BIGINT NOT NULL, is_funder BOOLEAN NOT NULL, is_private BOOLEAN NOT NULL, event TEXT NOT NULL, timestamp TIMESTAMP WITH TIME ZONE NOT NULL)")
86+
87+
statement.executeUpdate("CREATE TABLE audit.channel_errors (channel_id TEXT NOT NULL, node_id TEXT NOT NULL, error_name TEXT NOT NULL, error_message TEXT NOT NULL, is_fatal BOOLEAN NOT NULL, timestamp TIMESTAMP WITH TIME ZONE NOT NULL)")
88+
statement.executeUpdate("CREATE INDEX sent_timestamp_idx ON audit.sent(timestamp)")
89+
statement.executeUpdate("CREATE INDEX received_timestamp_idx ON audit.received(timestamp)")
90+
statement.executeUpdate("CREATE INDEX relayed_timestamp_idx ON audit.relayed(timestamp)")
91+
statement.executeUpdate("CREATE INDEX relayed_payment_hash_idx ON audit.relayed(payment_hash)")
92+
statement.executeUpdate("CREATE INDEX relayed_trampoline_timestamp_idx ON audit.relayed_trampoline(timestamp)")
93+
statement.executeUpdate("CREATE INDEX relayed_trampoline_payment_hash_idx ON audit.relayed_trampoline(payment_hash)")
94+
statement.executeUpdate("CREATE INDEX network_fees_timestamp_idx ON audit.network_fees(timestamp)")
95+
statement.executeUpdate("CREATE INDEX channel_events_timestamp_idx ON audit.channel_events(timestamp)")
96+
statement.executeUpdate("CREATE INDEX channel_errors_timestamp_idx ON audit.channel_errors(timestamp)")
8497
case Some(v@4) =>
8598
logger.warn(s"migrating db $DB_NAME, found version=$v current=$CURRENT_VERSION")
8699
migration45(statement)
87100
migration56(statement)
101+
migration67(statement)
88102
case Some(v@5) =>
89103
logger.warn(s"migrating db $DB_NAME, found version=$v current=$CURRENT_VERSION")
90104
migration56(statement)
105+
migration67(statement)
106+
case Some(v@6) =>
107+
logger.warn(s"migrating db $DB_NAME, found version=$v current=$CURRENT_VERSION")
108+
migration67(statement)
91109
case Some(CURRENT_VERSION) => () // table is up-to-date, nothing to do
92110
case Some(unknownVersion) => throw new RuntimeException(s"Unknown version of DB $DB_NAME found, version=$unknownVersion")
93111
}
@@ -97,7 +115,7 @@ class PgAuditDb(implicit ds: DataSource) extends AuditDb with Logging {
97115

98116
override def add(e: ChannelEvent): Unit = withMetrics("audit/add-channel-lifecycle", DbBackends.Postgres) {
99117
inTransaction { pg =>
100-
using(pg.prepareStatement("INSERT INTO channel_events VALUES (?, ?, ?, ?, ?, ?, ?)")) { statement =>
118+
using(pg.prepareStatement("INSERT INTO audit.channel_events VALUES (?, ?, ?, ?, ?, ?, ?)")) { statement =>
101119
statement.setString(1, e.channelId.toHex)
102120
statement.setString(2, e.remoteNodeId.value.toHex)
103121
statement.setLong(3, e.capacity.toLong)
@@ -112,7 +130,7 @@ class PgAuditDb(implicit ds: DataSource) extends AuditDb with Logging {
112130

113131
override def add(e: PaymentSent): Unit = withMetrics("audit/add-payment-sent", DbBackends.Postgres) {
114132
inTransaction { pg =>
115-
using(pg.prepareStatement("INSERT INTO sent VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)")) { statement =>
133+
using(pg.prepareStatement("INSERT INTO audit.sent VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)")) { statement =>
116134
e.parts.foreach(p => {
117135
statement.setLong(1, p.amount.toLong)
118136
statement.setLong(2, p.feesPaid.toLong)
@@ -133,7 +151,7 @@ class PgAuditDb(implicit ds: DataSource) extends AuditDb with Logging {
133151

134152
override def add(e: PaymentReceived): Unit = withMetrics("audit/add-payment-received", DbBackends.Postgres) {
135153
inTransaction { pg =>
136-
using(pg.prepareStatement("INSERT INTO received VALUES (?, ?, ?, ?)")) { statement =>
154+
using(pg.prepareStatement("INSERT INTO audit.received VALUES (?, ?, ?, ?)")) { statement =>
137155
e.parts.foreach(p => {
138156
statement.setLong(1, p.amount.toLong)
139157
statement.setString(2, e.paymentHash.toHex)
@@ -153,7 +171,7 @@ class PgAuditDb(implicit ds: DataSource) extends AuditDb with Logging {
153171
// non-trampoline relayed payments have one input and one output
154172
Seq(RelayedPart(fromChannelId, amountIn, "IN", "channel", ts), RelayedPart(toChannelId, amountOut, "OUT", "channel", ts))
155173
case TrampolinePaymentRelayed(_, incoming, outgoing, nextTrampolineNodeId, nextTrampolineAmount, ts) =>
156-
using(pg.prepareStatement("INSERT INTO relayed_trampoline VALUES (?, ?, ?, ?)")) { statement =>
174+
using(pg.prepareStatement("INSERT INTO audit.relayed_trampoline VALUES (?, ?, ?, ?)")) { statement =>
157175
statement.setString(1, e.paymentHash.toHex)
158176
statement.setLong(2, nextTrampolineAmount.toLong)
159177
statement.setString(3, nextTrampolineNodeId.value.toHex)
@@ -164,7 +182,7 @@ class PgAuditDb(implicit ds: DataSource) extends AuditDb with Logging {
164182
incoming.map(i => RelayedPart(i.channelId, i.amount, "IN", "trampoline", ts)) ++ outgoing.map(o => RelayedPart(o.channelId, o.amount, "OUT", "trampoline", ts))
165183
}
166184
for (p <- payments) {
167-
using(pg.prepareStatement("INSERT INTO relayed VALUES (?, ?, ?, ?, ?, ?)")) { statement =>
185+
using(pg.prepareStatement("INSERT INTO audit.relayed VALUES (?, ?, ?, ?, ?, ?)")) { statement =>
168186
statement.setString(1, e.paymentHash.toHex)
169187
statement.setLong(2, p.amount.toLong)
170188
statement.setString(3, p.channelId.toHex)
@@ -179,7 +197,7 @@ class PgAuditDb(implicit ds: DataSource) extends AuditDb with Logging {
179197

180198
override def add(e: NetworkFeePaid): Unit = withMetrics("audit/add-network-fee", DbBackends.Postgres) {
181199
inTransaction { pg =>
182-
using(pg.prepareStatement("INSERT INTO network_fees VALUES (?, ?, ?, ?, ?, ?)")) { statement =>
200+
using(pg.prepareStatement("INSERT INTO audit.network_fees VALUES (?, ?, ?, ?, ?, ?)")) { statement =>
183201
statement.setString(1, e.channelId.toHex)
184202
statement.setString(2, e.remoteNodeId.value.toHex)
185203
statement.setString(3, e.tx.txid.toHex)
@@ -193,7 +211,7 @@ class PgAuditDb(implicit ds: DataSource) extends AuditDb with Logging {
193211

194212
override def add(e: ChannelErrorOccurred): Unit = withMetrics("audit/add-channel-error", DbBackends.Postgres) {
195213
inTransaction { pg =>
196-
using(pg.prepareStatement("INSERT INTO channel_errors VALUES (?, ?, ?, ?, ?, ?)")) { statement =>
214+
using(pg.prepareStatement("INSERT INTO audit.channel_errors VALUES (?, ?, ?, ?, ?, ?)")) { statement =>
197215
val (errorName, errorMessage) = e.error match {
198216
case LocalError(t) => (t.getClass.getSimpleName, t.getMessage)
199217
case RemoteError(error) => ("remote", error.toAscii)
@@ -211,7 +229,7 @@ class PgAuditDb(implicit ds: DataSource) extends AuditDb with Logging {
211229

212230
override def listSent(from: Long, to: Long): Seq[PaymentSent] =
213231
inTransaction { pg =>
214-
using(pg.prepareStatement("SELECT * FROM sent WHERE timestamp BETWEEN ? AND ?")) { statement =>
232+
using(pg.prepareStatement("SELECT * FROM audit.sent WHERE timestamp BETWEEN ? AND ?")) { statement =>
215233
statement.setTimestamp(1, Timestamp.from(Instant.ofEpochMilli(from)))
216234
statement.setTimestamp(2, Timestamp.from(Instant.ofEpochMilli(to)))
217235
statement.executeQuery()
@@ -241,7 +259,7 @@ class PgAuditDb(implicit ds: DataSource) extends AuditDb with Logging {
241259

242260
override def listReceived(from: Long, to: Long): Seq[PaymentReceived] =
243261
inTransaction { pg =>
244-
using(pg.prepareStatement("SELECT * FROM received WHERE timestamp BETWEEN ? AND ?")) { statement =>
262+
using(pg.prepareStatement("SELECT * FROM audit.received WHERE timestamp BETWEEN ? AND ?")) { statement =>
245263
statement.setTimestamp(1, Timestamp.from(Instant.ofEpochMilli(from)))
246264
statement.setTimestamp(2, Timestamp.from(Instant.ofEpochMilli(to)))
247265
statement.executeQuery()
@@ -262,7 +280,7 @@ class PgAuditDb(implicit ds: DataSource) extends AuditDb with Logging {
262280

263281
override def listRelayed(from: Long, to: Long): Seq[PaymentRelayed] =
264282
inTransaction { pg =>
265-
val trampolineByHash = using(pg.prepareStatement("SELECT * FROM relayed_trampoline WHERE timestamp BETWEEN ? and ?")) { statement =>
283+
val trampolineByHash = using(pg.prepareStatement("SELECT * FROM audit.relayed_trampoline WHERE timestamp BETWEEN ? and ?")) { statement =>
266284
statement.setTimestamp(1, Timestamp.from(Instant.ofEpochMilli(from)))
267285
statement.setTimestamp(2, Timestamp.from(Instant.ofEpochMilli(to)))
268286
statement.executeQuery()
@@ -273,7 +291,7 @@ class PgAuditDb(implicit ds: DataSource) extends AuditDb with Logging {
273291
trampolineByHash + (paymentHash -> (amount, nodeId))
274292
}
275293
}
276-
val relayedByHash = using(pg.prepareStatement("SELECT * FROM relayed WHERE timestamp BETWEEN ? and ?")) { statement =>
294+
val relayedByHash = using(pg.prepareStatement("SELECT * FROM audit.relayed WHERE timestamp BETWEEN ? and ?")) { statement =>
277295
statement.setTimestamp(1, Timestamp.from(Instant.ofEpochMilli(from)))
278296
statement.setTimestamp(2, Timestamp.from(Instant.ofEpochMilli(to)))
279297
statement.executeQuery()
@@ -308,7 +326,7 @@ class PgAuditDb(implicit ds: DataSource) extends AuditDb with Logging {
308326

309327
override def listNetworkFees(from: Long, to: Long): Seq[NetworkFee] =
310328
inTransaction { pg =>
311-
using(pg.prepareStatement("SELECT * FROM network_fees WHERE timestamp BETWEEN ? and ? ORDER BY timestamp")) { statement =>
329+
using(pg.prepareStatement("SELECT * FROM audit.network_fees WHERE timestamp BETWEEN ? and ? ORDER BY timestamp")) { statement =>
312330
statement.setTimestamp(1, Timestamp.from(Instant.ofEpochMilli(from)))
313331
statement.setTimestamp(2, Timestamp.from(Instant.ofEpochMilli(to)))
314332
statement.executeQuery().map { rs =>

0 commit comments

Comments
 (0)