
Commit 08faf3b

Add json columns in Postgres (#1865)
A json column has been added to the few tables that contain an opaque serialized blob:

- `local_channels.data`
- `nodes.data`
- `channels.channel_announcement`, `channels.channel_update_x`

We can now access all the individual data fields from SQL. For the serialization, we use the same serializers as those previously used by the API. They have been moved to the `eclair-core` module and simplified a bit.

There are two json data types in Postgres: `JSON` and `JSONB`. We use the latter, which is more recent and allows indexing. An alternative to this PR would have been to use regular columns, but:

- there would have been a *lot* of columns for the channel data
- every modification of our types would have required a db migration

NB: to handle non-backwards-compatible changes in the json serializers, all the json columns can be recomputed on restart by setting `eclair.db.postgres.reset-json-columns=true`.

Change in ChannelCodecsSpec:

The goal of this test is to make sure that, in addition to successfully decoding data that was encoded with an older codec, we actually read the correct data. Just because there is no error doesn't mean that we interpreted the data properly: for example, we could invert a `payment_hash` and a `payment_preimage`.

We can't compare object to object, because the current version of the class has probably changed too. That's why we compare using the json representation of the data, which we amend to ignore new or modified fields.

After doing a manual comparison, I updated the test to use the current json serializers and replaced the test data with the latest json serialization. This allows us to remove all the tweaks that were added over time to account for new and updated fields.
1 parent bd57d41 commit 08faf3b
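As an illustration of what querying the new JSONB columns looks like, here is a sketch in Scala over plain JDBC. The json path matches the expression index this commit creates on `local_channels`; the helper itself is hypothetical and not part of the commit.

import java.sql.Connection

// Hypothetical helper: count open channels per remote node using the new
// JSONB column. The json path matches the expression index created in
// PgChannelsDb (json->'commitments'->'remoteParams'->>'nodeId').
def channelCountByRemoteNode(pg: Connection): Map[String, Int] = {
  val statement = pg.prepareStatement(
    """
      | SELECT json->'commitments'->'remoteParams'->>'nodeId' AS remote_node_id, COUNT(*) AS channel_count
      | FROM local_channels
      | WHERE NOT is_closed
      | GROUP BY remote_node_id
      |""".stripMargin)
  val rs = statement.executeQuery()
  val counts = Map.newBuilder[String, Int]
  while (rs.next()) {
    counts += rs.getString("remote_node_id") -> rs.getInt("channel_count")
  }
  counts.result()
}

A json-level comparison similar in spirit to the ChannelCodecsSpec change can be sketched with json4s, on which the eclair serializers are built (the function name is illustrative):

import org.json4s.jackson.JsonMethods

// Compare the json rendering of a decoded state against a reference json
// string instead of comparing objects directly: parsing both sides makes
// the comparison insensitive to field ordering and whitespace.
def assertSameJson(decodedJson: String, referenceJson: String): Unit = {
  val actual = JsonMethods.parse(decodedJson)
  val expected = JsonMethods.parse(referenceJson)
  require(actual == expected, "decoded state does not match reference json")
}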

File tree

13 files changed: +651, -529 lines

eclair-core/src/main/resources/reference.conf (+1)

@@ -224,6 +224,7 @@ eclair {
       username = ""
       password = ""
       readonly-user = "" // if defined, this user will be granted read-only access to all tables in the database
+      reset-json-columns = false // in case of a json format change, this allows a full re-serialization of json data
       pool {
         max-size = 10 // recommended value = number_of_cpu_cores * 2
         connection-timeout = 30 seconds
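Usage note: the flag lives in the `postgres` section shown above, so the full path is `eclair.db.postgres.reset-json-columns`. Setting it to `true` triggers a re-serialization of the json columns on every startup until it is set back to `false`, so it is meant to be enabled for a single restart after a json format change.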

eclair-core/src/main/scala/fr/acinq/eclair/db/Databases.scala (+14, -3)

@@ -96,7 +96,8 @@ object Databases extends Logging {
               instanceId: UUID,
               lock: PgLock = PgLock.NoLock,
               jdbcUrlFile_opt: Option[File],
-              readOnlyUser_opt: Option[String])(implicit system: ActorSystem): PostgresDatabases = {
+              readOnlyUser_opt: Option[String],
+              resetJsonColumns: Boolean)(implicit system: ActorSystem): PostgresDatabases = {

     jdbcUrlFile_opt.foreach(jdbcUrlFile => checkIfDatabaseUrlIsUnchanged(hikariConfig.getJdbcUrl, jdbcUrlFile))

@@ -132,6 +133,14 @@ object Databases extends Logging {
       }
     }

+    if (resetJsonColumns) {
+      logger.warn("resetting json columns...")
+      PgUtils.inTransaction { connection =>
+        databases.channels.resetJsonColumns(connection)
+        databases.network.resetJsonColumns(connection)
+      }
+    }
+
     databases
   }

@@ -197,7 +206,8 @@ object Databases extends Logging {
     val port = dbConfig.getInt("postgres.port")
     val username = if (dbConfig.getIsNull("postgres.username") || dbConfig.getString("postgres.username").isEmpty) None else Some(dbConfig.getString("postgres.username"))
     val password = if (dbConfig.getIsNull("postgres.password") || dbConfig.getString("postgres.password").isEmpty) None else Some(dbConfig.getString("postgres.password"))
-    val readOnlyUser_opt = if (dbConfig.getIsNull("postgres.readonly-user") || dbConfig.getString("postgres.readonly-user").isEmpty) None else Some(dbConfig.getString("postgres.readonly-user"))
+    val readOnlyUser_opt = if (dbConfig.getIsNull("postgres.readonly-user") || dbConfig.getString("postgres.readonly-user").isEmpty) None else Some(dbConfig.getString("postgres.readonly-user"))
+    val resetJsonColumns = dbConfig.getBoolean("postgres.reset-json-columns")

     val hikariConfig = new HikariConfig()
     hikariConfig.setJdbcUrl(s"jdbc:postgresql://$host:$port/$database")

@@ -230,7 +240,8 @@ object Databases extends Logging {
      instanceId = instanceId,
      lock = lock,
      jdbcUrlFile_opt = Some(jdbcUrlFile),
-     readOnlyUser_opt = readOnlyUser_opt
+     readOnlyUser_opt = readOnlyUser_opt,
+     resetJsonColumns = resetJsonColumns
    )
  }
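The reset is wrapped in `PgUtils.inTransaction`, so a failure mid-reset rolls back and never leaves partially rewritten json columns visible. A minimal sketch of what such a helper typically looks like (eclair's actual `PgUtils.inTransaction` may differ in its details):

import java.sql.Connection
import javax.sql.DataSource

// Generic JDBC pattern: run `f` on a single connection with auto-commit
// disabled, committing on success and rolling back on any failure.
def inTransaction[T](f: Connection => T)(implicit ds: DataSource): T = {
  val connection = ds.getConnection()
  try {
    connection.setAutoCommit(false)
    val result = f(connection)
    connection.commit()
    result
  } catch {
    case t: Throwable =>
      connection.rollback()
      throw t
  } finally {
    connection.close()
  }
}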

eclair-core/src/main/scala/fr/acinq/eclair/db/jdbc/JdbcUtils.scala (+31, -1)

@@ -18,11 +18,12 @@ package fr.acinq.eclair.db.jdbc

 import fr.acinq.bitcoin.ByteVector32
 import fr.acinq.eclair.MilliSatoshi
+import grizzled.slf4j.Logger
 import org.sqlite.SQLiteConnection
 import scodec.Decoder
 import scodec.bits.{BitVector, ByteVector}

-import java.sql.{Connection, ResultSet, Statement, Timestamp}
+import java.sql.{Connection, PreparedStatement, ResultSet, Statement, Timestamp}
 import java.util.UUID
 import javax.sql.DataSource

@@ -98,6 +99,35 @@ trait JdbcUtils {
     }
   }

+  /**
+   * A utility method that efficiently migrates a table using a provided migration method.
+   */
+  def migrateTable(source: Connection,
+                   destination: Connection,
+                   sourceTable: String,
+                   migrateSql: String,
+                   migrate: (ResultSet, PreparedStatement) => Unit)(implicit logger: Logger): Int = {
+    val insertStatement = destination.prepareStatement(migrateSql)
+    val batchSize = 50
+    JdbcUtils.using(source.prepareStatement(s"SELECT * FROM $sourceTable")) { queryStatement =>
+      val rs = queryStatement.executeQuery()
+      var inserted = 0
+      var batchCount = 0
+      while (rs.next()) {
+        migrate(rs, insertStatement)
+        insertStatement.addBatch()
+        batchCount = batchCount + 1
+        if (batchCount % batchSize == 0) {
+          inserted = inserted + insertStatement.executeBatch().sum
+          batchCount = 0
+        }
+      }
+      inserted = inserted + insertStatement.executeBatch().sum
+      logger.info(s"migrated $inserted rows from table $sourceTable")
+      inserted
+    }
+  }
+
   case class ExtendedResultSet(rs: ResultSet) extends Iterable[ResultSet] {

     /**
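For reference, a hypothetical use of the new `migrateTable` helper outside of the json reset: copying a simple table between two connections, batching inserts 50 at a time. Table and column names here are illustrative, not part of this commit, and this assumes the method is reachable through the `JdbcUtils` companion object, as `using` is. `resetJsonColumns` in the next file uses the same helper with `source` and `destination` pointing at the same connection.

import grizzled.slf4j.Logger
import java.sql.{Connection, PreparedStatement, ResultSet}

// Hypothetical usage: copy every row of a `peers` table from one database
// connection to another, re-encoding nothing along the way.
def copyPeersTable(source: Connection, destination: Connection): Int = {
  implicit val logger: Logger = Logger("db-migration")
  JdbcUtils.migrateTable(source, destination,
    sourceTable = "peers",
    migrateSql = "INSERT INTO peers (node_id, data) VALUES (?, ?)",
    migrate = (rs: ResultSet, insert: PreparedStatement) => {
      insert.setString(1, rs.getString("node_id"))
      insert.setBytes(2, rs.getBytes("data"))
    })
}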

eclair-core/src/main/scala/fr/acinq/eclair/db/pg/PgChannelsDb.scala (+39, -6)

@@ -27,19 +27,21 @@ import fr.acinq.eclair.db.Monitoring.Tags.DbBackends
 import fr.acinq.eclair.db.pg.PgUtils.PgLock
 import fr.acinq.eclair.wire.internal.channel.ChannelCodecs.stateDataCodec
 import grizzled.slf4j.Logging
+import scodec.bits.BitVector

-import java.sql.{Statement, Timestamp}
+import java.sql.{Connection, Statement, Timestamp}
 import java.time.Instant
 import javax.sql.DataSource

 class PgChannelsDb(implicit ds: DataSource, lock: PgLock) extends ChannelsDb with Logging {

   import PgUtils.ExtendedResultSet._
   import PgUtils._
+  import fr.acinq.eclair.json.JsonSerializers.{formats, serialization}
   import lock._

   val DB_NAME = "channels"
-  val CURRENT_VERSION = 4
+  val CURRENT_VERSION = 5

   inTransaction { pg =>
     using(pg.createStatement()) { statement =>

@@ -62,37 +64,68 @@ class PgChannelsDb(implicit ds: DataSource, lock: PgLock) extends ChannelsDb with Logging {
       statement.executeUpdate("ALTER TABLE htlc_infos ALTER COLUMN commitment_number SET DATA TYPE BIGINT USING commitment_number::BIGINT")
     }

+    def migration45(statement: Statement): Unit = {
+      statement.executeUpdate("ALTER TABLE local_channels ADD COLUMN json JSONB")
+      resetJsonColumns(pg)
+      statement.executeUpdate("ALTER TABLE local_channels ALTER COLUMN json SET NOT NULL")
+      statement.executeUpdate("CREATE INDEX local_channels_type_idx ON local_channels ((json->>'type'))")
+      statement.executeUpdate("CREATE INDEX local_channels_remote_node_id_idx ON local_channels ((json->'commitments'->'remoteParams'->>'nodeId'))")
+    }
+
     getVersion(statement, DB_NAME) match {
       case None =>
-        statement.executeUpdate("CREATE TABLE local_channels (channel_id TEXT NOT NULL PRIMARY KEY, data BYTEA NOT NULL, is_closed BOOLEAN NOT NULL DEFAULT FALSE, created_timestamp TIMESTAMP WITH TIME ZONE, last_payment_sent_timestamp TIMESTAMP WITH TIME ZONE, last_payment_received_timestamp TIMESTAMP WITH TIME ZONE, last_connected_timestamp TIMESTAMP WITH TIME ZONE, closed_timestamp TIMESTAMP WITH TIME ZONE)")
+        statement.executeUpdate("CREATE TABLE local_channels (channel_id TEXT NOT NULL PRIMARY KEY, data BYTEA NOT NULL, json JSONB NOT NULL, is_closed BOOLEAN NOT NULL DEFAULT FALSE, created_timestamp TIMESTAMP WITH TIME ZONE, last_payment_sent_timestamp TIMESTAMP WITH TIME ZONE, last_payment_received_timestamp TIMESTAMP WITH TIME ZONE, last_connected_timestamp TIMESTAMP WITH TIME ZONE, closed_timestamp TIMESTAMP WITH TIME ZONE)")
         statement.executeUpdate("CREATE TABLE htlc_infos (channel_id TEXT NOT NULL, commitment_number BIGINT NOT NULL, payment_hash TEXT NOT NULL, cltv_expiry BIGINT NOT NULL, FOREIGN KEY(channel_id) REFERENCES local_channels(channel_id))")
+
+        statement.executeUpdate("CREATE INDEX local_channels_type_idx ON local_channels ((json->>'type'))")
+        statement.executeUpdate("CREATE INDEX local_channels_remote_node_id_idx ON local_channels ((json->'commitments'->'remoteParams'->>'nodeId'))")
         statement.executeUpdate("CREATE INDEX htlc_infos_idx ON htlc_infos(channel_id, commitment_number)")
       case Some(v@2) =>
         logger.warn(s"migrating db $DB_NAME, found version=$v current=$CURRENT_VERSION")
         migration23(statement)
         migration34(statement)
+        migration45(statement)
       case Some(v@3) =>
         logger.warn(s"migrating db $DB_NAME, found version=$v current=$CURRENT_VERSION")
         migration34(statement)
+        migration45(statement)
+      case Some(v@4) =>
+        logger.warn(s"migrating db $DB_NAME, found version=$v current=$CURRENT_VERSION")
+        migration45(statement)
       case Some(CURRENT_VERSION) => () // table is up-to-date, nothing to do
       case Some(unknownVersion) => throw new RuntimeException(s"Unknown version of DB $DB_NAME found, version=$unknownVersion")
     }
     setVersion(statement, DB_NAME, CURRENT_VERSION)
     }
   }

+  /** Sometimes we may want to do a full reset when we update the json format */
+  def resetJsonColumns(connection: Connection): Unit = {
+    migrateTable(connection, connection,
+      "local_channels",
+      "UPDATE local_channels SET json=?::JSONB WHERE channel_id=?",
+      (rs, statement) => {
+        val state = stateDataCodec.decode(BitVector(rs.getBytes("data"))).require.value
+        val json = serialization.writePretty(state)
+        statement.setString(1, json)
+        statement.setString(2, state.channelId.toHex)
+      }
+    )(logger)
+  }
+
   override def addOrUpdateChannel(state: HasCommitments): Unit = withMetrics("channels/add-or-update-channel", DbBackends.Postgres) {
     withLock { pg =>
       val data = stateDataCodec.encode(state).require.toByteArray
       using(pg.prepareStatement(
         """
-          | INSERT INTO local_channels (channel_id, data, is_closed)
-          | VALUES (?, ?, FALSE)
+          | INSERT INTO local_channels (channel_id, data, json, is_closed)
+          | VALUES (?, ?, ?::JSONB, FALSE)
           | ON CONFLICT (channel_id)
-          | DO UPDATE SET data = EXCLUDED.data ;
+          | DO UPDATE SET data = EXCLUDED.data, json = EXCLUDED.json ;
           | """.stripMargin)) { statement =>
         statement.setString(1, state.channelId.toHex)
         statement.setBytes(2, data)
+        statement.setString(3, serialization.writePretty(state))
         statement.executeUpdate()
       }
     }
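To illustrate the new expression indexes, here is a sketch of a lookup that can be served from `local_channels_type_idx`. The `DATA_NORMAL` example value assumes the serializers emit the state data class name as the json type hint; check the actual output of `JsonSerializers` before relying on it.

import java.sql.Connection

// Hypothetical query: list ids of open channels whose serialized state is
// of the given type, using the (json->>'type') expression index.
def channelIdsByType(pg: Connection, stateType: String): Seq[String] = {
  val statement = pg.prepareStatement(
    "SELECT channel_id FROM local_channels WHERE json->>'type' = ? AND NOT is_closed")
  statement.setString(1, stateType)
  val rs = statement.executeQuery()
  val ids = Seq.newBuilder[String]
  while (rs.next()) {
    ids += rs.getString("channel_id")
  }
  ids.result()
}

// e.g. channelIdsByType(pg, "DATA_NORMAL"), if the type hint is the class name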
