Skip to content

Commit af8394a

Browse files
authored
Add support for dual db backend (#1746)
We still use sqlite as the primary db, but all calls are replicated asynchronously on postgres. The goal is to prepare a smooth transition from sqlite to postgres on a production server. This is a very specific use case and most users shouldn't use it, which is why the new config `eclair.db.driver=dual` is not documented.
1 parent d9a03a5 commit af8394a

File tree

2 files changed

+382
-0
lines changed

2 files changed

+382
-0
lines changed

eclair-core/src/main/scala/fr/acinq/eclair/db/Databases.scala

+4
Original file line numberDiff line numberDiff line change
@@ -157,6 +157,10 @@ object Databases extends Logging {
157157
dbConfig.getString("driver") match {
158158
case "sqlite" => Databases.sqlite(chaindir)
159159
case "postgres" => Databases.postgres(dbConfig, instanceId, chaindir)
160+
case "dual" =>
161+
val sqlite = Databases.sqlite(chaindir)
162+
val postgres = Databases.postgres(dbConfig, instanceId, chaindir)
163+
DualDatabases(sqlite, postgres)
160164
case driver => throw new RuntimeException(s"unknown database driver `$driver`")
161165
}
162166
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,378 @@
1+
package fr.acinq.eclair.db
2+
3+
import com.google.common.util.concurrent.ThreadFactoryBuilder
4+
import fr.acinq.bitcoin.{ByteVector32, Crypto, Satoshi}
5+
import fr.acinq.eclair.channel._
6+
import fr.acinq.eclair.db.Databases.{FileBackup, PostgresDatabases, SqliteDatabases}
7+
import fr.acinq.eclair.db.DbEventHandler.ChannelEvent
8+
import fr.acinq.eclair.db.DualDatabases.runAsync
9+
import fr.acinq.eclair.db.pg._
10+
import fr.acinq.eclair.db.sqlite._
11+
import fr.acinq.eclair.io.Peer
12+
import fr.acinq.eclair.payment._
13+
import fr.acinq.eclair.router.Router
14+
import fr.acinq.eclair.wire.protocol.{ChannelAnnouncement, ChannelUpdate, NodeAddress, NodeAnnouncement}
15+
import fr.acinq.eclair.{CltvExpiry, MilliSatoshi, ShortChannelId}
16+
import grizzled.slf4j.Logging
17+
18+
import java.io.File
19+
import java.util.UUID
20+
import java.util.concurrent.Executors
21+
import scala.collection.immutable.SortedMap
22+
import scala.concurrent.{ExecutionContext, Future}
23+
import scala.util.{Failure, Success, Try}
24+
25+
/**
26+
* An implementation of [[Databases]] where there are two separate underlying db, one sqlite and one postgres.
27+
* Sqlite is the main database, but we also replicate all calls to postgres.
28+
* Calls to postgres are made asynchronously in a dedicated thread pool, so that it doesn't have any performance impact.
29+
*/
30+
case class DualDatabases(sqlite: SqliteDatabases, postgres: PostgresDatabases) extends Databases with FileBackup {
31+
32+
override val network: NetworkDb = DualNetworkDb(sqlite.network, postgres.network)
33+
34+
override val audit: AuditDb = DualAuditDb(sqlite.audit, postgres.audit)
35+
36+
override val channels: ChannelsDb = DualChannelsDb(sqlite.channels, postgres.channels)
37+
38+
override val peers: PeersDb = DualPeersDb(sqlite.peers, postgres.peers)
39+
40+
override val payments: PaymentsDb = DualPaymentsDb(sqlite.payments, postgres.payments)
41+
42+
override val pendingCommands: PendingCommandsDb = DualPendingCommandsDb(sqlite.pendingCommands, postgres.pendingCommands)
43+
44+
override def backup(backupFile: File): Unit = sqlite.backup(backupFile)
45+
}
46+
47+
object DualDatabases extends Logging {
48+
49+
/** Run asynchronously and print errors */
50+
def runAsync[T](f: => T)(implicit ec: ExecutionContext): Future[T] = Future {
51+
Try(f) match {
52+
case Success(res) => res
53+
case Failure(t) =>
54+
logger.error("postgres error:\n", t)
55+
throw t
56+
}
57+
}
58+
}
59+
60+
case class DualNetworkDb(sqlite: SqliteNetworkDb, postgres: PgNetworkDb) extends NetworkDb {
61+
62+
private implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setNameFormat("db-network").build()))
63+
64+
override def addNode(n: NodeAnnouncement): Unit = {
65+
runAsync(postgres.addNode(n))
66+
sqlite.addNode(n)
67+
}
68+
69+
override def updateNode(n: NodeAnnouncement): Unit = {
70+
runAsync(postgres.updateNode(n))
71+
sqlite.updateNode(n)
72+
}
73+
74+
override def getNode(nodeId: Crypto.PublicKey): Option[NodeAnnouncement] = {
75+
runAsync(postgres.getNode(nodeId))
76+
sqlite.getNode(nodeId)
77+
}
78+
79+
override def removeNode(nodeId: Crypto.PublicKey): Unit = {
80+
runAsync(postgres.removeNode(nodeId))
81+
sqlite.removeNode(nodeId)
82+
}
83+
84+
override def listNodes(): Seq[NodeAnnouncement] = {
85+
runAsync(postgres.listNodes())
86+
sqlite.listNodes()
87+
}
88+
89+
override def addChannel(c: ChannelAnnouncement, txid: ByteVector32, capacity: Satoshi): Unit = {
90+
runAsync(postgres.addChannel(c, txid, capacity))
91+
sqlite.addChannel(c, txid, capacity)
92+
}
93+
94+
override def updateChannel(u: ChannelUpdate): Unit = {
95+
runAsync(postgres.updateChannel(u))
96+
sqlite.updateChannel(u)
97+
}
98+
99+
override def removeChannels(shortChannelIds: Iterable[ShortChannelId]): Unit = {
100+
runAsync(postgres.removeChannels(shortChannelIds))
101+
sqlite.removeChannels(shortChannelIds)
102+
}
103+
104+
override def listChannels(): SortedMap[ShortChannelId, Router.PublicChannel] = {
105+
runAsync(postgres.listChannels())
106+
sqlite.listChannels()
107+
}
108+
109+
override def addToPruned(shortChannelIds: Iterable[ShortChannelId]): Unit = {
110+
runAsync(postgres.addToPruned(shortChannelIds))
111+
sqlite.addToPruned(shortChannelIds)
112+
}
113+
114+
override def removeFromPruned(shortChannelId: ShortChannelId): Unit = {
115+
runAsync(postgres.removeFromPruned(shortChannelId))
116+
sqlite.removeFromPruned(shortChannelId)
117+
}
118+
119+
override def isPruned(shortChannelId: ShortChannelId): Boolean = {
120+
runAsync(postgres.isPruned(shortChannelId))
121+
sqlite.isPruned(shortChannelId)
122+
}
123+
124+
override def close(): Unit = {
125+
runAsync(postgres.close())
126+
sqlite.close()
127+
}
128+
}
129+
130+
case class DualAuditDb(sqlite: SqliteAuditDb, postgres: PgAuditDb) extends AuditDb {
131+
132+
private implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setNameFormat("db-audit").build()))
133+
134+
override def add(channelLifecycle: DbEventHandler.ChannelEvent): Unit = {
135+
runAsync(postgres.add(channelLifecycle))
136+
sqlite.add(channelLifecycle)
137+
}
138+
139+
override def add(paymentSent: PaymentSent): Unit = {
140+
runAsync(postgres.add(paymentSent))
141+
sqlite.add(paymentSent)
142+
}
143+
144+
override def add(paymentReceived: PaymentReceived): Unit = {
145+
runAsync(postgres.add(paymentReceived))
146+
sqlite.add(paymentReceived)
147+
}
148+
149+
override def add(paymentRelayed: PaymentRelayed): Unit = {
150+
runAsync(postgres.add(paymentRelayed))
151+
sqlite.add(paymentRelayed)
152+
}
153+
154+
override def add(networkFeePaid: NetworkFeePaid): Unit = {
155+
runAsync(postgres.add(networkFeePaid))
156+
sqlite.add(networkFeePaid)
157+
}
158+
159+
override def add(channelErrorOccurred: ChannelErrorOccurred): Unit = {
160+
runAsync(postgres.add(channelErrorOccurred))
161+
sqlite.add(channelErrorOccurred)
162+
}
163+
164+
override def listSent(from: Long, to: Long): Seq[PaymentSent] = {
165+
runAsync(postgres.listSent(from, to))
166+
sqlite.listSent(from, to)
167+
}
168+
169+
override def listReceived(from: Long, to: Long): Seq[PaymentReceived] = {
170+
runAsync(postgres.listReceived(from, to))
171+
sqlite.listReceived(from, to)
172+
}
173+
174+
override def listRelayed(from: Long, to: Long): Seq[PaymentRelayed] = {
175+
runAsync(postgres.listRelayed(from, to))
176+
sqlite.listRelayed(from, to)
177+
}
178+
179+
override def listNetworkFees(from: Long, to: Long): Seq[AuditDb.NetworkFee] = {
180+
runAsync(postgres.listNetworkFees(from, to))
181+
sqlite.listNetworkFees(from, to)
182+
}
183+
184+
override def stats(from: Long, to: Long): Seq[AuditDb.Stats] = {
185+
runAsync(postgres.stats(from, to))
186+
sqlite.stats(from, to)
187+
}
188+
189+
override def close(): Unit = {
190+
runAsync(postgres.close())
191+
sqlite.close()
192+
}
193+
}
194+
195+
case class DualChannelsDb(sqlite: SqliteChannelsDb, postgres: PgChannelsDb) extends ChannelsDb {
196+
197+
private implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setNameFormat("db-channels").build()))
198+
199+
override def addOrUpdateChannel(state: HasCommitments): Unit = {
200+
runAsync(postgres.addOrUpdateChannel(state))
201+
sqlite.addOrUpdateChannel(state)
202+
}
203+
204+
override def updateChannelMeta(channelId: ByteVector32, event: ChannelEvent.EventType): Unit = {
205+
runAsync(postgres.updateChannelMeta(channelId, event))
206+
sqlite.updateChannelMeta(channelId, event)
207+
}
208+
209+
override def removeChannel(channelId: ByteVector32): Unit = {
210+
runAsync(postgres.removeChannel(channelId))
211+
sqlite.removeChannel(channelId)
212+
}
213+
214+
override def listLocalChannels(): Seq[HasCommitments] = {
215+
runAsync(postgres.listLocalChannels())
216+
sqlite.listLocalChannels()
217+
}
218+
219+
override def addHtlcInfo(channelId: ByteVector32, commitmentNumber: Long, paymentHash: ByteVector32, cltvExpiry: CltvExpiry): Unit = {
220+
runAsync(postgres.addHtlcInfo(channelId, commitmentNumber, paymentHash, cltvExpiry))
221+
sqlite.addHtlcInfo(channelId, commitmentNumber, paymentHash, cltvExpiry)
222+
}
223+
224+
override def listHtlcInfos(channelId: ByteVector32, commitmentNumber: Long): Seq[(ByteVector32, CltvExpiry)] = {
225+
runAsync(postgres.listHtlcInfos(channelId, commitmentNumber))
226+
sqlite.listHtlcInfos(channelId, commitmentNumber)
227+
}
228+
229+
override def close(): Unit = {
230+
runAsync(postgres.close())
231+
sqlite.close()
232+
}
233+
}
234+
235+
case class DualPeersDb(sqlite: SqlitePeersDb, postgres: PgPeersDb) extends PeersDb {
236+
237+
private implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setNameFormat("db-peers").build()))
238+
239+
override def addOrUpdatePeer(nodeId: Crypto.PublicKey, address: NodeAddress): Unit = {
240+
runAsync(postgres.addOrUpdatePeer(nodeId, address))
241+
sqlite.addOrUpdatePeer(nodeId, address)
242+
}
243+
244+
override def removePeer(nodeId: Crypto.PublicKey): Unit = {
245+
runAsync(postgres.removePeer(nodeId))
246+
sqlite.removePeer(nodeId)
247+
}
248+
249+
override def getPeer(nodeId: Crypto.PublicKey): Option[NodeAddress] = {
250+
runAsync(postgres.getPeer(nodeId))
251+
sqlite.getPeer(nodeId)
252+
}
253+
254+
override def listPeers(): Map[Crypto.PublicKey, NodeAddress] = {
255+
runAsync(postgres.listPeers())
256+
sqlite.listPeers()
257+
}
258+
259+
override def close(): Unit = {
260+
runAsync(postgres.close())
261+
sqlite.close()
262+
}
263+
}
264+
265+
case class DualPaymentsDb(sqlite: SqlitePaymentsDb, postgres: PgPaymentsDb) extends PaymentsDb {
266+
267+
private implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setNameFormat("db-payments").build()))
268+
269+
override def listPaymentsOverview(limit: Int): Seq[PlainPayment] = {
270+
runAsync(postgres.listPaymentsOverview(limit))
271+
sqlite.listPaymentsOverview(limit)
272+
}
273+
274+
override def close(): Unit = {
275+
runAsync(postgres.close())
276+
sqlite.close()
277+
}
278+
279+
override def addIncomingPayment(pr: PaymentRequest, preimage: ByteVector32, paymentType: String): Unit = {
280+
runAsync(postgres.addIncomingPayment(pr, preimage, paymentType))
281+
sqlite.addIncomingPayment(pr, preimage, paymentType)
282+
}
283+
284+
override def receiveIncomingPayment(paymentHash: ByteVector32, amount: MilliSatoshi, receivedAt: Long): Unit = {
285+
runAsync(postgres.receiveIncomingPayment(paymentHash, amount, receivedAt))
286+
sqlite.receiveIncomingPayment(paymentHash, amount, receivedAt)
287+
}
288+
289+
override def getIncomingPayment(paymentHash: ByteVector32): Option[IncomingPayment] = {
290+
runAsync(postgres.getIncomingPayment(paymentHash))
291+
sqlite.getIncomingPayment(paymentHash)
292+
}
293+
294+
override def listIncomingPayments(from: Long, to: Long): Seq[IncomingPayment] = {
295+
runAsync(postgres.listIncomingPayments(from, to))
296+
sqlite.listIncomingPayments(from, to)
297+
}
298+
299+
override def listPendingIncomingPayments(from: Long, to: Long): Seq[IncomingPayment] = {
300+
runAsync(postgres.listPendingIncomingPayments(from, to))
301+
sqlite.listPendingIncomingPayments(from, to)
302+
}
303+
304+
override def listExpiredIncomingPayments(from: Long, to: Long): Seq[IncomingPayment] = {
305+
runAsync(postgres.listExpiredIncomingPayments(from, to))
306+
sqlite.listExpiredIncomingPayments(from, to)
307+
}
308+
309+
override def listReceivedIncomingPayments(from: Long, to: Long): Seq[IncomingPayment] = {
310+
runAsync(postgres.listReceivedIncomingPayments(from, to))
311+
sqlite.listReceivedIncomingPayments(from, to)
312+
}
313+
314+
override def addOutgoingPayment(outgoingPayment: OutgoingPayment): Unit = {
315+
runAsync(postgres.addOutgoingPayment(outgoingPayment))
316+
sqlite.addOutgoingPayment(outgoingPayment)
317+
}
318+
319+
override def updateOutgoingPayment(paymentResult: PaymentSent): Unit = {
320+
runAsync(postgres.updateOutgoingPayment(paymentResult))
321+
sqlite.updateOutgoingPayment(paymentResult)
322+
}
323+
324+
override def updateOutgoingPayment(paymentResult: PaymentFailed): Unit = {
325+
runAsync(postgres.updateOutgoingPayment(paymentResult))
326+
sqlite.updateOutgoingPayment(paymentResult)
327+
}
328+
329+
override def getOutgoingPayment(id: UUID): Option[OutgoingPayment] = {
330+
runAsync(postgres.getOutgoingPayment(id))
331+
sqlite.getOutgoingPayment(id)
332+
}
333+
334+
override def listOutgoingPayments(parentId: UUID): Seq[OutgoingPayment] = {
335+
runAsync(postgres.listOutgoingPayments(parentId))
336+
sqlite.listOutgoingPayments(parentId)
337+
}
338+
339+
override def listOutgoingPayments(paymentHash: ByteVector32): Seq[OutgoingPayment] = {
340+
runAsync(postgres.listOutgoingPayments(paymentHash))
341+
sqlite.listOutgoingPayments(paymentHash)
342+
}
343+
344+
override def listOutgoingPayments(from: Long, to: Long): Seq[OutgoingPayment] = {
345+
runAsync(postgres.listOutgoingPayments(from, to))
346+
sqlite.listOutgoingPayments(from, to)
347+
}
348+
}
349+
350+
case class DualPendingCommandsDb(sqlite: SqlitePendingCommandsDb, postgres: PgPendingCommandsDb) extends PendingCommandsDb {
351+
352+
private implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setNameFormat("db-pending-commands").build()))
353+
354+
override def addSettlementCommand(channelId: ByteVector32, cmd: HtlcSettlementCommand): Unit = {
355+
runAsync(postgres.addSettlementCommand(channelId, cmd))
356+
sqlite.addSettlementCommand(channelId, cmd)
357+
}
358+
359+
override def removeSettlementCommand(channelId: ByteVector32, htlcId: Long): Unit = {
360+
runAsync(postgres.removeSettlementCommand(channelId, htlcId))
361+
sqlite.removeSettlementCommand(channelId, htlcId)
362+
}
363+
364+
override def listSettlementCommands(channelId: ByteVector32): Seq[HtlcSettlementCommand] = {
365+
runAsync(postgres.listSettlementCommands(channelId))
366+
sqlite.listSettlementCommands(channelId)
367+
}
368+
369+
override def listSettlementCommands(): Seq[(ByteVector32, HtlcSettlementCommand)] = {
370+
runAsync(postgres.listSettlementCommands())
371+
sqlite.listSettlementCommands()
372+
}
373+
374+
override def close(): Unit = {
375+
runAsync(postgres.close())
376+
sqlite.close()
377+
}
378+
}

0 commit comments

Comments
 (0)