Skip to content

Commit 936f36b

Browse files
authored
Refactor Postgres code (#1743)
More symmetry between postgres and sqlite init. * define SqliteDatabases and PostgresDatabase Those classes implement traits `Databases`, `FileBackup` and `ExclusiveLock`. The goal is to have access to backend-specific attributes, particularly in tests. It arguably makes the `Databases` cleaner and simpler, with a nice symmetry between the `apply methods`. * replace 5s lock timeout by NOLOCK * use chaindir instead of datadir for jdbcurl file It is more consistent with sqlite, and makes sense because we don't want to mix up testnet and mainnet databases. * add tests on locks and jdbc url check
1 parent 75cb777 commit 936f36b

22 files changed

+649
-505
lines changed

eclair-core/src/main/scala/fr/acinq/eclair/Setup.scala

+1-1
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,7 @@ class Setup(datadir: File,
9898

9999
logger.info(s"instanceid=$instanceId")
100100

101-
val databases = Databases.init(config.getConfig("db"), instanceId, datadir, chaindir, db)
101+
val databases = Databases.init(config.getConfig("db"), instanceId, chaindir, db)
102102

103103
/**
104104
* This counter holds the current blockchain height.

eclair-core/src/main/scala/fr/acinq/eclair/db/Databases.scala

+141-159
Original file line numberDiff line numberDiff line change
@@ -16,89 +16,146 @@
1616

1717
package fr.acinq.eclair.db
1818

19-
import java.io.File
20-
import java.nio.file._
21-
import java.sql.{Connection, DriverManager}
22-
import java.util.UUID
23-
2419
import akka.actor.ActorSystem
2520
import com.typesafe.config.Config
26-
import fr.acinq.eclair.db.pg.PgUtils.LockType.LockType
21+
import com.zaxxer.hikari.{HikariConfig, HikariDataSource}
22+
import fr.acinq.eclair.db.pg.PgUtils.PgLock.LockFailureHandler
2723
import fr.acinq.eclair.db.pg.PgUtils._
2824
import fr.acinq.eclair.db.pg._
2925
import fr.acinq.eclair.db.sqlite._
3026
import grizzled.slf4j.Logging
31-
import javax.sql.DataSource
3227

28+
import java.io.File
29+
import java.nio.file._
30+
import java.sql.{Connection, DriverManager}
31+
import java.util.UUID
3332
import scala.concurrent.duration._
3433

3534
trait Databases {
35+
//@formatter:off
36+
def network: NetworkDb
37+
def audit: AuditDb
38+
def channels: ChannelsDb
39+
def peers: PeersDb
40+
def payments: PaymentsDb
41+
def pendingRelay: PendingRelayDb
42+
//@formatter:on
43+
}
3644

37-
val network: NetworkDb
45+
object Databases extends Logging {
3846

39-
val audit: AuditDb
47+
trait FileBackup {
48+
this: Databases =>
49+
def backup(backupFile: File): Unit
50+
}
4051

41-
val channels: ChannelsDb
52+
trait ExclusiveLock {
53+
this: Databases =>
54+
def obtainExclusiveLock(): Unit
55+
}
4256

43-
val peers: PeersDb
57+
case class SqliteDatabases private (network: SqliteNetworkDb,
58+
audit: SqliteAuditDb,
59+
channels: SqliteChannelsDb,
60+
peers: SqlitePeersDb,
61+
payments: SqlitePaymentsDb,
62+
pendingRelay: SqlitePendingRelayDb,
63+
private val backupConnection: Connection) extends Databases with FileBackup {
64+
override def backup(backupFile: File): Unit = SqliteUtils.using(backupConnection.createStatement()) {
65+
statement => {
66+
statement.executeUpdate(s"backup to ${backupFile.getAbsolutePath}")
67+
}
68+
}
69+
}
4470

45-
val payments: PaymentsDb
71+
object SqliteDatabases {
72+
def apply(auditJdbc: Connection, networkJdbc: Connection, eclairJdbc: Connection): Databases = SqliteDatabases(
73+
network = new SqliteNetworkDb(networkJdbc),
74+
audit = new SqliteAuditDb(auditJdbc),
75+
channels = new SqliteChannelsDb(eclairJdbc),
76+
peers = new SqlitePeersDb(eclairJdbc),
77+
payments = new SqlitePaymentsDb(eclairJdbc),
78+
pendingRelay = new SqlitePendingRelayDb(eclairJdbc),
79+
backupConnection = eclairJdbc
80+
)
81+
}
4682

47-
val pendingRelay: PendingRelayDb
48-
}
83+
case class PostgresDatabases private (network: PgNetworkDb,
84+
audit: PgAuditDb,
85+
channels: PgChannelsDb,
86+
peers: PgPeersDb,
87+
payments: PgPaymentsDb,
88+
pendingRelay: PgPendingRelayDb,
89+
dataSource: HikariDataSource,
90+
lock: PgLock) extends Databases with ExclusiveLock {
91+
override def obtainExclusiveLock(): Unit = lock.obtainExclusiveLock(dataSource)
92+
}
4993

50-
object Databases extends Logging {
94+
object PostgresDatabases {
95+
def apply(hikariConfig: HikariConfig,
96+
instanceId: UUID,
97+
lock: PgLock = PgLock.NoLock,
98+
jdbcUrlFile_opt: Option[File])(implicit system: ActorSystem): PostgresDatabases = {
99+
100+
jdbcUrlFile_opt.foreach(jdbcUrlFile => checkIfDatabaseUrlIsUnchanged(hikariConfig.getJdbcUrl, jdbcUrlFile))
101+
102+
implicit val ds: HikariDataSource = new HikariDataSource(hikariConfig)
103+
implicit val implicitLock: PgLock = lock
104+
105+
val databases = PostgresDatabases(
106+
network = new PgNetworkDb,
107+
audit = new PgAuditDb,
108+
channels = new PgChannelsDb,
109+
peers = new PgPeersDb,
110+
payments = new PgPaymentsDb,
111+
pendingRelay = new PgPendingRelayDb,
112+
dataSource = ds,
113+
lock = lock)
114+
115+
lock match {
116+
case PgLock.NoLock => ()
117+
case l: PgLock.LeaseLock =>
118+
// we obtain a lock right now...
119+
databases.obtainExclusiveLock()
120+
// ...and renew the lease regularly
121+
import system.dispatcher
122+
system.scheduler.scheduleWithFixedDelay(l.leaseRenewInterval, l.leaseRenewInterval)(() => databases.obtainExclusiveLock())
123+
}
51124

52-
trait FileBackup { this: Databases =>
53-
def backup(backupFile: File): Unit
54-
}
125+
databases
126+
}
55127

56-
trait ExclusiveLock { this: Databases =>
57-
def obtainExclusiveLock(): Unit
128+
private def checkIfDatabaseUrlIsUnchanged(url: String, urlFile: File): Unit = {
129+
def readString(path: Path): String = Files.readAllLines(path).get(0)
130+
131+
def writeString(path: Path, string: String): Unit = Files.write(path, java.util.Arrays.asList(string))
132+
133+
if (urlFile.exists()) {
134+
val oldUrl = readString(urlFile.toPath)
135+
if (oldUrl != url)
136+
throw JdbcUrlChanged(oldUrl, url)
137+
} else {
138+
writeString(urlFile.toPath, url)
139+
}
140+
}
58141
}
59142

60-
def init(dbConfig: Config, instanceId: UUID, datadir: File, chaindir: File, db: Option[Databases] = None)(implicit system: ActorSystem): Databases = {
143+
def init(dbConfig: Config, instanceId: UUID, chaindir: File, db: Option[Databases] = None)(implicit system: ActorSystem): Databases = {
61144
db match {
62145
case Some(d) => d
63146
case None =>
64147
dbConfig.getString("driver") match {
65-
case "sqlite" => Databases.sqliteJDBC(chaindir)
66-
case "postgres" =>
67-
val pg = Databases.setupPgDatabases(dbConfig, instanceId, datadir, { ex =>
68-
logger.error("fatal error: Cannot obtain lock on the database.\n", ex)
69-
sys.exit(-2)
70-
})
71-
if (LockType(dbConfig.getString("postgres.lock-type")) == LockType.LEASE) {
72-
val dbLockLeaseRenewInterval = dbConfig.getDuration("postgres.lease.renew-interval").toSeconds.seconds
73-
val dbLockLeaseInterval = dbConfig.getDuration("postgres.lease.interval").toSeconds.seconds
74-
if (dbLockLeaseInterval <= dbLockLeaseRenewInterval)
75-
throw new RuntimeException("Invalid configuration: `db.postgres.lease.interval` must be greater than `db.postgres.lease.renew-interval`")
76-
import system.dispatcher
77-
system.scheduler.scheduleWithFixedDelay(dbLockLeaseRenewInterval, dbLockLeaseRenewInterval)(new Runnable {
78-
override def run(): Unit = {
79-
try {
80-
pg.obtainExclusiveLock()
81-
} catch {
82-
case e: Throwable =>
83-
logger.error("fatal error: Cannot obtain the database lease.\n", e)
84-
sys.exit(-1)
85-
}
86-
}
87-
})
88-
}
89-
pg
90-
case driver => throw new RuntimeException(s"Unknown database driver `$driver`")
148+
case "sqlite" => Databases.sqlite(chaindir)
149+
case "postgres" => Databases.postgres(dbConfig, instanceId, chaindir)
150+
case driver => throw new RuntimeException(s"unknown database driver `$driver`")
91151
}
92152
}
93153
}
94154

95155
/**
96-
* Given a parent folder it creates or loads all the databases from a JDBC connection
97-
*
98-
* @param dbdir
99-
* @return
100-
*/
101-
def sqliteJDBC(dbdir: File): Databases = {
156+
* Given a parent folder it creates or loads all the databases from a JDBC connection
157+
*/
158+
def sqlite(dbdir: File): Databases = {
102159
dbdir.mkdir()
103160
var sqliteEclair: Connection = null
104161
var sqliteNetwork: Connection = null
@@ -109,127 +166,52 @@ object Databases extends Logging {
109166
sqliteAudit = DriverManager.getConnection(s"jdbc:sqlite:${new File(dbdir, "audit.sqlite")}")
110167
SqliteUtils.obtainExclusiveLock(sqliteEclair) // there should only be one process writing to this file
111168
logger.info("successful lock on eclair.sqlite")
112-
sqliteDatabaseByConnections(sqliteAudit, sqliteNetwork, sqliteEclair)
169+
SqliteDatabases(sqliteAudit, sqliteNetwork, sqliteEclair)
113170
} catch {
114-
case t: Throwable => {
171+
case t: Throwable =>
115172
logger.error("could not create connection to sqlite databases: ", t)
116173
if (sqliteEclair != null) sqliteEclair.close()
117174
if (sqliteNetwork != null) sqliteNetwork.close()
118175
if (sqliteAudit != null) sqliteAudit.close()
119176
throw t
120-
}
121-
}
122-
}
123-
124-
def postgresJDBC(database: String, host: String, port: Int,
125-
username: Option[String], password: Option[String],
126-
poolProperties: Map[String, Long],
127-
instanceId: UUID,
128-
databaseLeaseInterval: FiniteDuration,
129-
lockExceptionHandler: LockExceptionHandler = { _ => () },
130-
lockType: LockType = LockType.NONE, datadir: File): Databases with ExclusiveLock = {
131-
val url = s"jdbc:postgresql://${host}:${port}/${database}"
132-
133-
checkIfDatabaseUrlIsUnchanged(url, datadir)
134-
135-
implicit val lock: DatabaseLock = lockType match {
136-
case LockType.NONE => NoLock
137-
case LockType.LEASE => LeaseLock(instanceId, databaseLeaseInterval, lockExceptionHandler)
138-
case _ => throw new RuntimeException(s"Unknown postgres lock type: `$lockType`")
139-
}
140-
141-
import com.zaxxer.hikari.{HikariConfig, HikariDataSource}
142-
143-
val config = new HikariConfig()
144-
config.setJdbcUrl(url)
145-
username.foreach(config.setUsername)
146-
password.foreach(config.setPassword)
147-
poolProperties.get("max-size").foreach(x => config.setMaximumPoolSize(x.toInt))
148-
poolProperties.get("connection-timeout").foreach(config.setConnectionTimeout)
149-
poolProperties.get("idle-timeout").foreach(config.setIdleTimeout)
150-
poolProperties.get("max-life-time").foreach(config.setMaxLifetime)
151-
152-
implicit val ds: DataSource = new HikariDataSource(config)
153-
154-
val databases = new Databases with ExclusiveLock {
155-
override val network = new PgNetworkDb
156-
override val audit = new PgAuditDb
157-
override val channels = new PgChannelsDb
158-
override val peers = new PgPeersDb
159-
override val payments = new PgPaymentsDb
160-
override val pendingRelay = new PgPendingRelayDb
161-
override def obtainExclusiveLock(): Unit = lock.obtainExclusiveLock
162-
}
163-
databases.obtainExclusiveLock()
164-
databases
165-
}
166-
167-
def sqliteDatabaseByConnections(auditJdbc: Connection, networkJdbc: Connection, eclairJdbc: Connection): Databases = new Databases with FileBackup {
168-
override val network = new SqliteNetworkDb(networkJdbc)
169-
override val audit = new SqliteAuditDb(auditJdbc)
170-
override val channels = new SqliteChannelsDb(eclairJdbc)
171-
override val peers = new SqlitePeersDb(eclairJdbc)
172-
override val payments = new SqlitePaymentsDb(eclairJdbc)
173-
override val pendingRelay = new SqlitePendingRelayDb(eclairJdbc)
174-
override def backup(backupFile: File): Unit = {
175-
176-
SqliteUtils.using(eclairJdbc.createStatement()) {
177-
statement => {
178-
statement.executeUpdate(s"backup to ${backupFile.getAbsolutePath}")
179-
}
180-
}
181-
182177
}
183178
}
184179

185-
def setupPgDatabases(dbConfig: Config, instanceId: UUID, datadir: File, lockExceptionHandler: LockExceptionHandler): Databases with ExclusiveLock = {
180+
def postgres(dbConfig: Config, instanceId: UUID, dbdir: File, lockExceptionHandler: LockFailureHandler = LockFailureHandler.logAndStop)(implicit system: ActorSystem): PostgresDatabases = {
186181
val database = dbConfig.getString("postgres.database")
187182
val host = dbConfig.getString("postgres.host")
188183
val port = dbConfig.getInt("postgres.port")
189-
val username = if (dbConfig.getIsNull("postgres.username") || dbConfig.getString("postgres.username").isEmpty)
190-
None
191-
else
192-
Some(dbConfig.getString("postgres.username"))
193-
val password = if (dbConfig.getIsNull("postgres.password") || dbConfig.getString("postgres.password").isEmpty)
194-
None
195-
else
196-
Some(dbConfig.getString("postgres.password"))
197-
val properties = {
198-
val poolConfig = dbConfig.getConfig("postgres.pool")
199-
Map.empty
200-
.updated("max-size", poolConfig.getInt("max-size").toLong)
201-
.updated("connection-timeout", poolConfig.getDuration("connection-timeout").toMillis)
202-
.updated("idle-timeout", poolConfig.getDuration("idle-timeout").toMillis)
203-
.updated("max-life-time", poolConfig.getDuration("max-life-time").toMillis)
204-
184+
val username = if (dbConfig.getIsNull("postgres.username") || dbConfig.getString("postgres.username").isEmpty) None else Some(dbConfig.getString("postgres.username"))
185+
val password = if (dbConfig.getIsNull("postgres.password") || dbConfig.getString("postgres.password").isEmpty) None else Some(dbConfig.getString("postgres.password"))
186+
187+
val hikariConfig = new HikariConfig()
188+
hikariConfig.setJdbcUrl(s"jdbc:postgresql://$host:$port/$database")
189+
username.foreach(hikariConfig.setUsername)
190+
password.foreach(hikariConfig.setPassword)
191+
val poolConfig = dbConfig.getConfig("postgres.pool")
192+
hikariConfig.setMaximumPoolSize(poolConfig.getInt("max-size"))
193+
hikariConfig.setConnectionTimeout(poolConfig.getDuration("connection-timeout").toMillis)
194+
hikariConfig.setIdleTimeout(poolConfig.getDuration("idle-timeout").toMillis)
195+
hikariConfig.setMaxLifetime(poolConfig.getDuration("max-life-time").toMillis)
196+
197+
val lock = dbConfig.getString("postgres.lock-type") match {
198+
case "none" => PgLock.NoLock
199+
case "lease" =>
200+
val leaseInterval = dbConfig.getDuration("postgres.lease.interval").toSeconds.seconds
201+
val leaseRenewInterval = dbConfig.getDuration("postgres.lease.renew-interval").toSeconds.seconds
202+
require(leaseInterval > leaseRenewInterval, "invalid configuration: `db.postgres.lease.interval` must be greater than `db.postgres.lease.renew-interval`")
203+
PgLock.LeaseLock(instanceId, leaseInterval, leaseRenewInterval, lockExceptionHandler)
204+
case unknownLock => throw new RuntimeException(s"unknown postgres lock type: `$unknownLock`")
205205
}
206-
val lockType = LockType(dbConfig.getString("postgres.lock-type"))
207-
val leaseInterval = dbConfig.getDuration("postgres.lease.interval").toSeconds.seconds
208206

209-
Databases.postgresJDBC(
210-
database = database, host = host, port = port,
211-
username = username, password = password,
212-
poolProperties = properties,
207+
val jdbcUrlFile = new File(dbdir, "last_jdbcurl")
208+
209+
Databases.PostgresDatabases(
210+
hikariConfig = hikariConfig,
213211
instanceId = instanceId,
214-
databaseLeaseInterval = leaseInterval,
215-
lockExceptionHandler = lockExceptionHandler, lockType = lockType, datadir = datadir
212+
lock = lock,
213+
jdbcUrlFile_opt = Some(jdbcUrlFile)
216214
)
217215
}
218216

219-
private def checkIfDatabaseUrlIsUnchanged(url: String, datadir: File ): Unit = {
220-
val urlFile = new File(datadir, "last_jdbcurl")
221-
222-
def readString(path: Path): String = Files.readAllLines(path).get(0)
223-
224-
def writeString(path: Path, string: String): Unit = Files.write(path, java.util.Arrays.asList(string))
225-
226-
if (urlFile.exists()) {
227-
val oldUrl = readString(urlFile.toPath)
228-
if (oldUrl != url)
229-
throw new RuntimeException(s"The database URL has changed since the last start. It was `$oldUrl`, now it's `$url`")
230-
} else {
231-
writeString(urlFile.toPath, url)
232-
}
233-
}
234-
235-
}
217+
}

eclair-core/src/main/scala/fr/acinq/eclair/db/pg/PgChannelsDb.scala

+2-2
Original file line numberDiff line numberDiff line change
@@ -22,15 +22,15 @@ import fr.acinq.eclair.channel.HasCommitments
2222
import fr.acinq.eclair.db.ChannelsDb
2323
import fr.acinq.eclair.db.DbEventHandler.ChannelEvent
2424
import fr.acinq.eclair.db.Monitoring.Metrics.withMetrics
25-
import fr.acinq.eclair.db.pg.PgUtils.DatabaseLock
25+
import fr.acinq.eclair.db.pg.PgUtils.PgLock
2626
import fr.acinq.eclair.wire.internal.channel.ChannelCodecs.stateDataCodec
2727
import grizzled.slf4j.Logging
2828

2929
import java.sql.Statement
3030
import javax.sql.DataSource
3131
import scala.collection.immutable.Queue
3232

33-
class PgChannelsDb(implicit ds: DataSource, lock: DatabaseLock) extends ChannelsDb with Logging {
33+
class PgChannelsDb(implicit ds: DataSource, lock: PgLock) extends ChannelsDb with Logging {
3434

3535
import PgUtils.ExtendedResultSet._
3636
import PgUtils._

0 commit comments

Comments
 (0)