Skip to content

Commit ca51a2d

Browse files
pm47t-bast
andauthored
Enable WAL mode on Sqlite (#1871)
[Write-Ahead Logging](https://sqlite.org/wal.html) is both much more performant in general, and more suited to our particular access patterns. With a simple throughput performance test, it improves performance by a factor of 5-20x depending on the sync flag. version | throughput -------------------------------|------------- mode=journal sync=normal (*)| 11 htlc/s mode=journal sync=full| 7 htlc/s mode=wal sync=normal| 248 htlc/s mode=wal sync=full (**)| 62 htlc/s (*) previous setting (**) new setting I went with a conservative new setting of wal+full sync, which is both 5x more performant, and more secure than what we had before. > In WAL mode when synchronous is NORMAL (1), the WAL file is synchronized before each checkpoint and the database file is synchronized after each completed checkpoint and the WAL file header is synchronized when a WAL file begins to be reused after a checkpoint, but no sync operations occur during most transactions. With synchronous=FULL in WAL mode, an additional sync operation of the WAL file happens after each transaction commit. The extra WAL sync following each transaction help ensure that transactions are durable across a power loss. Transactions are consistent with or without the extra syncs provided by synchronous=FULL. If durability is not a concern, then synchronous=NORMAL is normally all one needs in WAL mode. Co-authored-by: Bastien Teinturier <[email protected]>
1 parent 733c6e7 commit ca51a2d

File tree

5 files changed

+171
-29
lines changed

5 files changed

+171
-29
lines changed

eclair-core/pom.xml

+1-1
Original file line numberDiff line numberDiff line change
@@ -224,7 +224,7 @@
224224
<dependency>
225225
<groupId>org.xerial</groupId>
226226
<artifactId>sqlite-jdbc</artifactId>
227-
<version>3.27.2.1</version>
227+
<version>3.34.0</version>
228228
</dependency>
229229
<dependency>
230230
<groupId>org.postgresql</groupId>

eclair-core/src/main/scala/fr/acinq/eclair/db/Databases.scala

+8-20
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ import grizzled.slf4j.Logging
2727

2828
import java.io.File
2929
import java.nio.file._
30-
import java.sql.{Connection, DriverManager}
30+
import java.sql.Connection
3131
import java.util.UUID
3232
import scala.concurrent.duration._
3333

@@ -183,28 +183,16 @@ object Databases extends Logging {
183183
* Given a parent folder it creates or loads all the databases from a JDBC connection
184184
*/
185185
def sqlite(dbdir: File): SqliteDatabases = {
186-
dbdir.mkdir()
187-
var sqliteEclair: Connection = null
188-
var sqliteNetwork: Connection = null
189-
var sqliteAudit: Connection = null
190-
try {
191-
sqliteEclair = DriverManager.getConnection(s"jdbc:sqlite:${new File(dbdir, "eclair.sqlite")}")
192-
sqliteNetwork = DriverManager.getConnection(s"jdbc:sqlite:${new File(dbdir, "network.sqlite")}")
193-
sqliteAudit = DriverManager.getConnection(s"jdbc:sqlite:${new File(dbdir, "audit.sqlite")}")
194-
SqliteUtils.obtainExclusiveLock(sqliteEclair) // there should only be one process writing to this file
195-
logger.info("successful lock on eclair.sqlite")
196-
SqliteDatabases(sqliteAudit, sqliteNetwork, sqliteEclair)
197-
} catch {
198-
case t: Throwable =>
199-
logger.error("could not create connection to sqlite databases: ", t)
200-
if (sqliteEclair != null) sqliteEclair.close()
201-
if (sqliteNetwork != null) sqliteNetwork.close()
202-
if (sqliteAudit != null) sqliteAudit.close()
203-
throw t
204-
}
186+
dbdir.mkdirs()
187+
SqliteDatabases(
188+
eclairJdbc = SqliteUtils.openSqliteFile(dbdir, "eclair.sqlite", exclusiveLock = true, journalMode = "wal", syncFlag = "full"), // there should only be one process writing to this file
189+
networkJdbc = SqliteUtils.openSqliteFile(dbdir, "network.sqlite", exclusiveLock = false, journalMode = "wal", syncFlag = "normal"), // we don't need strong durability guarantees on the network db
190+
auditJdbc = SqliteUtils.openSqliteFile(dbdir, "audit.sqlite", exclusiveLock = false, journalMode = "wal", syncFlag = "full")
191+
)
205192
}
206193

207194
def postgres(dbConfig: Config, instanceId: UUID, dbdir: File, lockExceptionHandler: LockFailureHandler = LockFailureHandler.logAndStop)(implicit system: ActorSystem): PostgresDatabases = {
195+
dbdir.mkdirs()
208196
val database = dbConfig.getString("postgres.database")
209197
val host = dbConfig.getString("postgres.host")
210198
val port = dbConfig.getInt("postgres.port")

eclair-core/src/main/scala/fr/acinq/eclair/db/sqlite/SqliteUtils.scala

+51-7
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,32 @@
1717
package fr.acinq.eclair.db.sqlite
1818

1919
import fr.acinq.eclair.db.jdbc.JdbcUtils
20+
import grizzled.slf4j.Logging
2021

21-
import java.sql.Connection
22+
import java.io.File
23+
import java.sql.{Connection, DriverManager}
2224

23-
object SqliteUtils extends JdbcUtils {
25+
object SqliteUtils extends JdbcUtils with Logging {
26+
27+
def openSqliteFile(directory: File, filename: String, exclusiveLock: Boolean, journalMode: String, syncFlag: String): Connection = {
28+
var sqlite: Connection = null
29+
try {
30+
sqlite = DriverManager.getConnection(s"jdbc:sqlite:${new File(directory, filename)}")
31+
if (exclusiveLock) {
32+
obtainExclusiveLock(sqlite)
33+
}
34+
setJournalMode(sqlite, journalMode)
35+
setSynchronousFlag(sqlite, syncFlag)
36+
sqlite
37+
} catch {
38+
case t: Throwable =>
39+
logger.error("could not create connection to sqlite databases: ", t)
40+
if (sqlite != null) {
41+
sqlite.close()
42+
}
43+
throw t
44+
}
45+
}
2446

2547
/**
2648
* Obtain an exclusive lock on a sqlite database. This is useful when we want to make sure that only one process
@@ -29,11 +51,33 @@ object SqliteUtils extends JdbcUtils {
2951
* The lock will be kept until the database is closed, or if the locking mode is explicitly reset.
3052
*/
3153
def obtainExclusiveLock(sqlite: Connection): Unit = synchronized {
32-
val statement = sqlite.createStatement()
33-
statement.execute("PRAGMA locking_mode = EXCLUSIVE")
34-
// we have to make a write to actually obtain the lock
35-
statement.executeUpdate("CREATE TABLE IF NOT EXISTS dummy_table_for_locking (a INTEGER NOT NULL)")
36-
statement.executeUpdate("INSERT INTO dummy_table_for_locking VALUES (42)")
54+
using(sqlite.createStatement()) { statement =>
55+
statement.execute("PRAGMA locking_mode = EXCLUSIVE")
56+
// we have to make a write to actually obtain the lock
57+
statement.executeUpdate("CREATE TABLE IF NOT EXISTS dummy_table_for_locking (a INTEGER NOT NULL)")
58+
statement.executeUpdate("INSERT INTO dummy_table_for_locking VALUES (42)")
59+
}
60+
}
61+
62+
/**
63+
* See https://www.sqlite.org/pragma.html#pragma_journal_mode
64+
*/
65+
def setJournalMode(sqlite: Connection, mode: String): Unit = {
66+
using(sqlite.createStatement()) { statement =>
67+
val res = statement.executeQuery(s"PRAGMA journal_mode=$mode")
68+
res.next()
69+
val currentMode = res.getString(1)
70+
assert(currentMode == mode, s"couldn't activate mode=$mode")
71+
}
72+
}
73+
74+
/**
75+
* See https://www.sqlite.org/pragma.html#pragma_synchronous
76+
*/
77+
def setSynchronousFlag(sqlite: Connection, flag: String): Unit = {
78+
using(sqlite.createStatement()) { statement =>
79+
statement.executeUpdate(s"PRAGMA synchronous=$flag")
80+
}
3781
}
3882

3983
}

eclair-core/src/test/scala/fr/acinq/eclair/db/ChannelsDbSpec.scala

+1-1
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,7 @@ class ChannelsDbSpec extends AnyFunSuite {
105105
Future(db.updateChannelMeta(channelId, ChannelEvent.EventType.PaymentSent))
106106
}
107107
val res = Future.sequence(futures)
108-
Await.result(res, 60 seconds)
108+
Await.result(res, 5 minutes)
109109
}
110110
}
111111

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
/*
2+
* Copyright 2019 ACINQ SAS
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package fr.acinq.eclair.integration
18+
19+
import akka.testkit.TestProbe
20+
import com.typesafe.config.ConfigFactory
21+
import fr.acinq.bitcoin.SatoshiLong
22+
import fr.acinq.eclair.MilliSatoshiLong
23+
import fr.acinq.eclair.channel._
24+
import fr.acinq.eclair.payment._
25+
import fr.acinq.eclair.payment.receive.MultiPartHandler.ReceivePayment
26+
import fr.acinq.eclair.payment.send.MultiPartPaymentLifecycle.PreimageReceived
27+
import fr.acinq.eclair.payment.send.PaymentInitiator
28+
import fr.acinq.eclair.router.Router
29+
import org.scalatest.Ignore
30+
31+
import java.util.UUID
32+
import java.util.concurrent.Executors
33+
import scala.concurrent.duration._
34+
import scala.concurrent.{Await, ExecutionContext, Future}
35+
import scala.jdk.CollectionConverters._
36+
37+
/**
38+
* Created by PM on 12/07/2021.
39+
*/
40+
41+
@Ignore
42+
class PerformanceIntegrationSpec extends IntegrationSpec {
43+
44+
test("start eclair nodes") {
45+
val commonPerfTestConfig = ConfigFactory.parseMap(Map(
46+
"eclair.max-funding-satoshis" -> 100_000_000,
47+
"eclair.max-accepted-htlcs" -> Channel.MAX_ACCEPTED_HTLCS,
48+
"eclair.file-backup.enabled" -> false,
49+
).asJava)
50+
51+
instantiateEclairNode("A", ConfigFactory.parseMap(Map("eclair.node-alias" -> "A", "eclair.server.port" -> 29730).asJava).withFallback(commonPerfTestConfig).withFallback(commonFeatures).withFallback(commonConfig)) // A's channels are private
52+
instantiateEclairNode("B", ConfigFactory.parseMap(Map("eclair.node-alias" -> "B", "eclair.server.port" -> 29731).asJava).withFallback(commonPerfTestConfig).withFallback(commonFeatures).withFallback(commonConfig))
53+
}
54+
55+
test("connect nodes") {
56+
// A---B
57+
58+
val eventListener = TestProbe()
59+
nodes.values.foreach(_.system.eventStream.subscribe(eventListener.ref, classOf[ChannelStateChanged]))
60+
61+
connect(nodes("A"), nodes("B"), 100_000_000 sat, 0 msat)
62+
63+
// confirming the funding tx
64+
generateBlocks(6)
65+
66+
within(60 seconds) {
67+
eventListener.expectMsgType[ChannelStateChanged](60 seconds).currentState == NORMAL
68+
}
69+
}
70+
71+
test("wait for channels") {
72+
// Channels should now be available in the router
73+
val sender = TestProbe()
74+
awaitCond({
75+
sender.send(nodes("A").router, Router.GetRoutingState)
76+
val routingState = sender.expectMsgType[Router.RoutingState]
77+
routingState.channels.nonEmpty
78+
}, 60 seconds)
79+
}
80+
81+
def sendPayment()(implicit ec: ExecutionContext): Future[PaymentSent] = Future {
82+
val sender = TestProbe()
83+
val amountMsat = 100_000.msat
84+
// first we retrieve a payment hash from B
85+
sender.send(nodes("B").paymentHandler, ReceivePayment(Some(amountMsat), "1 coffee"))
86+
val pr = sender.expectMsgType[PaymentRequest]
87+
// then we make the actual payment
88+
sender.send(nodes("A").paymentInitiator, PaymentInitiator.SendPayment(amountMsat, pr, fallbackFinalExpiryDelta = finalCltvExpiryDelta, routeParams = integrationTestRouteParams, maxAttempts = 1))
89+
val paymentId = sender.expectMsgType[UUID]
90+
sender.expectMsgType[PreimageReceived]
91+
val ps = sender.expectMsgType[PaymentSent]
92+
assert(ps.id == paymentId)
93+
ps
94+
}
95+
96+
test("send a large number of htlcs A->B") {
97+
val SENDERS_COUNT = 16
98+
val PAYMENTS_COUNT = 3_000
99+
val ec = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(SENDERS_COUNT))
100+
val start = System.currentTimeMillis()
101+
val futures = (0 until PAYMENTS_COUNT).map(_ => sendPayment()(ec))
102+
implicit val dummyEc: ExecutionContext = ExecutionContext.Implicits.global
103+
val f = Future.sequence(futures)
104+
Await.result(f, 1 hour)
105+
val end = System.currentTimeMillis()
106+
val duration = end - start
107+
println(s"$PAYMENTS_COUNT payments in ${duration}ms ${PAYMENTS_COUNT * 1000 / duration}htlc/s (senders=$SENDERS_COUNT)")
108+
}
109+
110+
}

0 commit comments

Comments
 (0)