Skip to content

Commit

Permalink
Refactor db migration (#1901)
Browse files Browse the repository at this point in the history
* Refactor migration to newer versions of databases
* Put DB name and version in companion object and share it with tests
  • Loading branch information
thomash-acinq authored Aug 12, 2021
1 parent a342717 commit 19f4d1f
Show file tree
Hide file tree
Showing 19 changed files with 215 additions and 198 deletions.
39 changes: 19 additions & 20 deletions eclair-core/src/main/scala/fr/acinq/eclair/db/pg/PgAuditDb.scala
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,16 @@ import java.time.Instant
import java.util.UUID
import javax.sql.DataSource

object PgAuditDb {
val DB_NAME = "audit"
val CURRENT_VERSION = 8
}

class PgAuditDb(implicit ds: DataSource) extends AuditDb with Logging {

import PgUtils._
import ExtendedResultSet._

val DB_NAME = "audit"
val CURRENT_VERSION = 8
import PgAuditDb._

case class RelayedPart(channelId: ByteVector32, amount: MilliSatoshi, direction: String, relayType: String, timestamp: Long)

Expand Down Expand Up @@ -105,24 +108,20 @@ class PgAuditDb(implicit ds: DataSource) extends AuditDb with Logging {
statement.executeUpdate("CREATE INDEX channel_updates_cid_idx ON audit.channel_updates(channel_id)")
statement.executeUpdate("CREATE INDEX channel_updates_nid_idx ON audit.channel_updates(node_id)")
statement.executeUpdate("CREATE INDEX channel_updates_timestamp_idx ON audit.channel_updates(timestamp)")
case Some(v@4) =>
case Some(v@(4 | 5 | 6 | 7)) =>
logger.warn(s"migrating db $DB_NAME, found version=$v current=$CURRENT_VERSION")
migration45(statement)
migration56(statement)
migration67(statement)
migration78(statement)
case Some(v@5) =>
logger.warn(s"migrating db $DB_NAME, found version=$v current=$CURRENT_VERSION")
migration56(statement)
migration67(statement)
migration78(statement)
case Some(v@6) =>
logger.warn(s"migrating db $DB_NAME, found version=$v current=$CURRENT_VERSION")
migration67(statement)
migration78(statement)
case Some(v@7) =>
logger.warn(s"migrating db $DB_NAME, found version=$v current=$CURRENT_VERSION")
migration78(statement)
if (v < 5) {
migration45(statement)
}
if (v < 6) {
migration56(statement)
}
if (v < 7) {
migration67(statement)
}
if (v < 8) {
migration78(statement)
}
case Some(CURRENT_VERSION) => () // table is up-to-date, nothing to do
case Some(unknownVersion) => throw new RuntimeException(s"Unknown version of DB $DB_NAME found, version=$unknownVersion")
}
Expand Down
49 changes: 22 additions & 27 deletions eclair-core/src/main/scala/fr/acinq/eclair/db/pg/PgChannelsDb.scala
Original file line number Diff line number Diff line change
Expand Up @@ -33,16 +33,19 @@ import java.sql.{Connection, Statement, Timestamp}
import java.time.Instant
import javax.sql.DataSource

object PgChannelsDb {
val DB_NAME = "channels"
val CURRENT_VERSION = 7
}

class PgChannelsDb(implicit ds: DataSource, lock: PgLock) extends ChannelsDb with Logging {

import PgChannelsDb._
import PgUtils.ExtendedResultSet._
import PgUtils._
import fr.acinq.eclair.json.JsonSerializers.{formats, serialization}
import lock._

val DB_NAME = "channels"
val CURRENT_VERSION = 7

inTransaction { pg =>
using(pg.createStatement()) { statement =>

Expand Down Expand Up @@ -106,31 +109,23 @@ class PgChannelsDb(implicit ds: DataSource, lock: PgLock) extends ChannelsDb wit
statement.executeUpdate("CREATE INDEX local_channels_type_idx ON local.channels ((json->>'type'))")
statement.executeUpdate("CREATE INDEX local_channels_remote_node_id_idx ON local.channels ((json->'commitments'->'remoteParams'->>'nodeId'))")
statement.executeUpdate("CREATE INDEX htlc_infos_idx ON local.htlc_infos(channel_id, commitment_number)")
case Some(v@2) =>
logger.warn(s"migrating db $DB_NAME, found version=$v current=$CURRENT_VERSION")
migration23(statement)
migration34(statement)
migration45(statement)
migration56(statement)
migration67()
case Some(v@3) =>
case Some(v@(2 | 3 | 4 | 5 | 6)) =>
logger.warn(s"migrating db $DB_NAME, found version=$v current=$CURRENT_VERSION")
migration34(statement)
migration45(statement)
migration56(statement)
migration67()
case Some(v@4) =>
logger.warn(s"migrating db $DB_NAME, found version=$v current=$CURRENT_VERSION")
migration45(statement)
migration56(statement)
migration67()
case Some(v@5) =>
logger.warn(s"migrating db $DB_NAME, found version=$v current=$CURRENT_VERSION")
migration56(statement)
migration67()
case Some(v@6) =>
logger.warn(s"migrating db $DB_NAME, found version=$v current=$CURRENT_VERSION")
migration67()
if (v < 3) {
migration23(statement)
}
if (v < 4) {
migration34(statement)
}
if (v < 5) {
migration45(statement)
}
if (v < 6) {
migration56(statement)
}
if (v < 7) {
migration67()
}
case Some(CURRENT_VERSION) => () // table is up-to-date, nothing to do
case Some(unknownVersion) => throw new RuntimeException(s"Unknown version of DB $DB_NAME found, version=$unknownVersion")
}
Expand Down
22 changes: 13 additions & 9 deletions eclair-core/src/main/scala/fr/acinq/eclair/db/pg/PgNetworkDb.scala
Original file line number Diff line number Diff line change
Expand Up @@ -31,15 +31,18 @@ import java.sql.{Connection, Statement}
import javax.sql.DataSource
import scala.collection.immutable.SortedMap

object PgNetworkDb {
val DB_NAME = "network"
val CURRENT_VERSION = 4
}

class PgNetworkDb(implicit ds: DataSource) extends NetworkDb with Logging {

import PgNetworkDb._
import PgUtils.ExtendedResultSet._
import PgUtils._
import fr.acinq.eclair.json.JsonSerializers.{formats, serialization}

val DB_NAME = "network"
val CURRENT_VERSION = 4

inTransaction { pg =>
using(pg.createStatement()) { statement =>

Expand Down Expand Up @@ -68,13 +71,14 @@ class PgNetworkDb(implicit ds: DataSource) extends NetworkDb with Logging {
statement.executeUpdate("CREATE TABLE network.nodes (node_id TEXT NOT NULL PRIMARY KEY, data BYTEA NOT NULL, json JSONB NOT NULL)")
statement.executeUpdate("CREATE TABLE network.public_channels (short_channel_id BIGINT NOT NULL PRIMARY KEY, txid TEXT NOT NULL, channel_announcement BYTEA NOT NULL, capacity_sat BIGINT NOT NULL, channel_update_1 BYTEA NULL, channel_update_2 BYTEA NULL, channel_announcement_json JSONB NOT NULL, channel_update_1_json JSONB NULL, channel_update_2_json JSONB NULL)")
statement.executeUpdate("CREATE TABLE network.pruned_channels (short_channel_id BIGINT NOT NULL PRIMARY KEY)")
case Some(v@2) =>
logger.warn(s"migrating db $DB_NAME, found version=$v current=$CURRENT_VERSION")
migration23(statement)
migration34(statement)
case Some(v@3) =>
case Some(v@(2 | 3)) =>
logger.warn(s"migrating db $DB_NAME, found version=$v current=$CURRENT_VERSION")
migration34(statement)
if (v < 3) {
migration23(statement)
}
if (v < 4) {
migration34(statement)
}
case Some(CURRENT_VERSION) => () // table is up-to-date, nothing to do
case Some(unknownVersion) => throw new RuntimeException(s"Unknown version of DB $DB_NAME found, version=$unknownVersion")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,15 +35,18 @@ import java.time.Instant
import java.util.UUID
import javax.sql.DataSource

object PgPaymentsDb {
val DB_NAME = "payments"
val CURRENT_VERSION = 6
}

class PgPaymentsDb(implicit ds: DataSource, lock: PgLock) extends PaymentsDb with Logging {

import PgPaymentsDb._
import PgUtils.ExtendedResultSet._
import PgUtils._
import lock._

val DB_NAME = "payments"
val CURRENT_VERSION = 6

private val hopSummaryCodec = (("node_id" | CommonCodecs.publicKey) :: ("next_node_id" | CommonCodecs.publicKey) :: ("short_channel_id" | optional(bool, CommonCodecs.shortchannelid))).as[HopSummary]
private val paymentRouteCodec = discriminated[List[HopSummary]].by(byte)
.typecase(0x01, listOfN(uint8, hopSummaryCodec))
Expand Down Expand Up @@ -82,13 +85,14 @@ class PgPaymentsDb(implicit ds: DataSource, lock: PgLock) extends PaymentsDb wit
statement.executeUpdate("CREATE INDEX sent_payment_hash_idx ON payments.sent(payment_hash)")
statement.executeUpdate("CREATE INDEX sent_created_idx ON payments.sent(created_at)")
statement.executeUpdate("CREATE INDEX received_created_idx ON payments.received(created_at)")
case Some(v@4) =>
logger.warn(s"migrating db $DB_NAME, found version=$v current=$CURRENT_VERSION")
migration45(statement)
migration56(statement)
case Some(v@5) =>
case Some(v@(4 | 5)) =>
logger.warn(s"migrating db $DB_NAME, found version=$v current=$CURRENT_VERSION")
migration56(statement)
if (v < 5) {
migration45(statement)
}
if (v < 6) {
migration56(statement)
}
case Some(CURRENT_VERSION) => () // table is up-to-date, nothing to do
case Some(unknownVersion) => throw new RuntimeException(s"Unknown version of DB $DB_NAME found, version=$unknownVersion")
}
Expand Down
25 changes: 14 additions & 11 deletions eclair-core/src/main/scala/fr/acinq/eclair/db/pg/PgPeersDb.scala
Original file line number Diff line number Diff line change
Expand Up @@ -28,19 +28,21 @@ import fr.acinq.eclair.wire.protocol._
import grizzled.slf4j.Logging
import scodec.bits.BitVector

import java.sql.{Statement, Timestamp}
import java.time.Instant
import java.sql.Statement
import javax.sql.DataSource

object PgPeersDb {
val DB_NAME = "peers"
val CURRENT_VERSION = 3
}

class PgPeersDb(implicit ds: DataSource, lock: PgLock) extends PeersDb with Logging {

import PgPeersDb._
import PgUtils.ExtendedResultSet._
import PgUtils._
import lock._

val DB_NAME = "peers"
val CURRENT_VERSION = 3

inTransaction { pg =>

def migration12(statement: Statement): Unit = {
Expand All @@ -58,13 +60,14 @@ class PgPeersDb(implicit ds: DataSource, lock: PgLock) extends PeersDb with Logg
statement.executeUpdate("CREATE SCHEMA IF NOT EXISTS local")
statement.executeUpdate("CREATE TABLE local.peers (node_id TEXT NOT NULL PRIMARY KEY, data BYTEA NOT NULL)")
statement.executeUpdate("CREATE TABLE local.relay_fees (node_id TEXT NOT NULL PRIMARY KEY, fee_base_msat BIGINT NOT NULL, fee_proportional_millionths BIGINT NOT NULL)")
case Some(v@1) =>
logger.warn(s"migrating db $DB_NAME, found version=$v current=$CURRENT_VERSION")
migration12(statement)
migration23(statement)
case Some(v@2) =>
case Some(v@(1 | 2)) =>
logger.warn(s"migrating db $DB_NAME, found version=$v current=$CURRENT_VERSION")
migration23(statement)
if (v < 2) {
migration12(statement)
}
if (v < 3) {
migration23(statement)
}
case Some(CURRENT_VERSION) => () // table is up-to-date, nothing to do
case Some(unknownVersion) => throw new RuntimeException(s"Unknown version of DB $DB_NAME found, version=$unknownVersion")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,18 @@ import grizzled.slf4j.Logging
import java.sql.Statement
import javax.sql.DataSource

object PgPendingCommandsDb {
val DB_NAME = "pending_relay"
val CURRENT_VERSION = 3
}

class PgPendingCommandsDb(implicit ds: DataSource, lock: PgLock) extends PendingCommandsDb with Logging {

import PgPendingCommandsDb._
import PgUtils.ExtendedResultSet._
import PgUtils._
import lock._

val DB_NAME = "pending_relay"
val CURRENT_VERSION = 3

inTransaction { pg =>
using(pg.createStatement()) { statement =>

Expand All @@ -55,13 +58,14 @@ class PgPendingCommandsDb(implicit ds: DataSource, lock: PgLock) extends Pending
statement.executeUpdate("CREATE SCHEMA IF NOT EXISTS local")
// note: should we use a foreign key to local_channels table here?
statement.executeUpdate("CREATE TABLE local.pending_settlement_commands (channel_id TEXT NOT NULL, htlc_id BIGINT NOT NULL, data BYTEA NOT NULL, PRIMARY KEY(channel_id, htlc_id))")
case Some(v@1) =>
logger.warn(s"migrating db $DB_NAME, found version=$v current=$CURRENT_VERSION")
migration12(statement)
migration23(statement)
case Some(v@2) =>
case Some(v@(1 | 2)) =>
logger.warn(s"migrating db $DB_NAME, found version=$v current=$CURRENT_VERSION")
migration23(statement)
if (v < 2) {
migration12(statement)
}
if (v < 3) {
migration23(statement)
}
case Some(CURRENT_VERSION) => () // table is up-to-date, nothing to do
case Some(unknownVersion) => throw new RuntimeException(s"Unknown version of DB $DB_NAME found, version=$unknownVersion")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,16 @@ import grizzled.slf4j.Logging
import java.sql.{Connection, Statement}
import java.util.UUID

object SqliteAuditDb {
val DB_NAME = "audit"
val CURRENT_VERSION = 6
}

class SqliteAuditDb(sqlite: Connection) extends AuditDb with Logging {

import SqliteUtils._
import ExtendedResultSet._

val DB_NAME = "audit"
val CURRENT_VERSION = 6
import SqliteAuditDb._

case class RelayedPart(channelId: ByteVector32, amount: MilliSatoshi, direction: String, relayType: String, timestamp: Long)

Expand Down Expand Up @@ -111,31 +114,23 @@ class SqliteAuditDb(sqlite: Connection) extends AuditDb with Logging {
statement.executeUpdate("CREATE INDEX channel_updates_cid_idx ON channel_updates(channel_id)")
statement.executeUpdate("CREATE INDEX channel_updates_nid_idx ON channel_updates(node_id)")
statement.executeUpdate("CREATE INDEX channel_updates_timestamp_idx ON channel_updates(timestamp)")
case Some(v@1)=>
logger.warn(s"migrating db $DB_NAME, found version=$v current=$CURRENT_VERSION")
migration12(statement)
migration23(statement)
migration34(statement)
migration45(statement)
migration56(statement)
case Some(v@2) =>
case Some(v@(1 | 2 | 3 | 4 | 5)) =>
logger.warn(s"migrating db $DB_NAME, found version=$v current=$CURRENT_VERSION")
migration23(statement)
migration34(statement)
migration45(statement)
migration56(statement)
case Some(v@3) =>
logger.warn(s"migrating db $DB_NAME, found version=$v current=$CURRENT_VERSION")
migration34(statement)
migration45(statement)
migration56(statement)
case Some(v@4) =>
logger.warn(s"migrating db $DB_NAME, found version=$v current=$CURRENT_VERSION")
migration45(statement)
migration56(statement)
case Some(v@5) =>
logger.warn(s"migrating db $DB_NAME, found version=$v current=$CURRENT_VERSION")
migration56(statement)
if (v < 2) {
migration12(statement)
}
if (v < 3) {
migration23(statement)
}
if (v < 4) {
migration34(statement)
}
if (v < 5) {
migration45(statement)
}
if (v < 6) {
migration56(statement)
}
case Some(CURRENT_VERSION) => () // table is up-to-date, nothing to do
case Some(unknownVersion) => throw new RuntimeException(s"Unknown version of DB $DB_NAME found, version=$unknownVersion")
}
Expand Down
Loading

0 comments on commit 19f4d1f

Please sign in to comment.