16
16
17
17
package fr .acinq .eclair .db
18
18
19
- import akka .actor .{Actor , ActorLogging , Props }
20
- import akka .dispatch .{BoundedMessageQueueSemantics , RequiresMessageQueue }
19
+ import akka .Done
20
+ import akka .actor .typed .Behavior
21
+ import akka .actor .typed .eventstream .EventStream
22
+ import akka .actor .typed .scaladsl .{ActorContext , Behaviors }
21
23
import fr .acinq .eclair .KamonExt
22
24
import fr .acinq .eclair .channel .ChannelPersisted
23
25
import fr .acinq .eclair .db .Databases .FileBackup
26
+ import fr .acinq .eclair .db .FileBackupHandler ._
24
27
import fr .acinq .eclair .db .Monitoring .Metrics
25
28
26
29
import java .io .File
27
30
import java .nio .file .{Files , StandardCopyOption }
31
+ import java .util .concurrent .Executors
32
+ import scala .concurrent .{ExecutionContext , Future }
33
+ import scala .concurrent .duration .FiniteDuration
28
34
import scala .sys .process .Process
29
35
import scala .util .{Failure , Success , Try }
30
36
31
37
32
38
/**
33
- * This actor will synchronously make a backup of the database it was initialized with whenever it receives
34
- * a ChannelPersisted event.
35
- * To avoid piling up messages and entering an endless backup loop, it is supposed to be used with a bounded mailbox
36
- * with a single item:
37
- *
38
- * backup-mailbox {
39
- * mailbox-type = "akka.dispatch.NonBlockingBoundedMailbox"
40
- * mailbox-capacity = 1
41
- * }
42
- *
43
- * Messages that cannot be processed will be sent to dead letters
44
- *
45
- * NB: Constructor is private so users will have to use BackupHandler.props() which always specific a custom mailbox.
46
- *
47
- * @param databases database to backup
48
- * @param backupFile backup file
49
- * @param backupScript_opt (optional) script to execute after the backup completes
39
+ * This actor will make a backup of the database it was initialized with at a scheduled interval. It will only
40
+ * perform a backup if a ChannelPersisted event was received since the previous backup.
50
41
*/
51
- class FileBackupHandler private ( databases : FileBackup , backupFile : File , backupScript_opt : Option [ String ]) extends Actor with RequiresMessageQueue [ BoundedMessageQueueSemantics ] with ActorLogging {
42
+ object FileBackupHandler {
52
43
53
- // we listen to ChannelPersisted events, which will trigger a backup
54
- context.system.eventStream.subscribe(self, classOf [ChannelPersisted ])
44
+ // @formatter:off
55
45
56
- def receive : Receive = {
57
- case persisted : ChannelPersisted =>
58
- KamonExt .time(Metrics .FileBackupDuration .withoutTags()) {
59
- val tmpFile = new File (backupFile.getAbsolutePath.concat(" .tmp" ))
60
- databases.backup(tmpFile)
46
+ /**
47
+ * @param targetFile backup file
48
+ * @param script_opt (optional) script to execute after the backup completes
49
+ * @param interval interval between two backups
50
+ */
51
+ case class FileBackupParams (interval : FiniteDuration ,
52
+ targetFile : File ,
53
+ script_opt : Option [String ])
61
54
62
- // this will throw an exception if it fails, which is possible if the backup file is not on the same filesystem
63
- // as the temporary file
64
- Files .move(tmpFile.toPath, backupFile.toPath, StandardCopyOption .REPLACE_EXISTING , StandardCopyOption .ATOMIC_MOVE )
55
+ sealed trait Command
56
+ case class WrappedChannelPersisted (wrapped : ChannelPersisted ) extends Command
57
+ private case object TickBackup extends Command
58
+ private case class BackupResult (result : Try [Done ]) extends Command
65
59
66
- // publish a notification that we have updated our backup
67
- context.system.eventStream.publish(BackupCompleted )
68
- Metrics .FileBackupCompleted .withoutTags().increment()
60
+ sealed trait BackupEvent
61
+ // this notification is sent when we have completed our backup process (our backup file is ready to be used)
62
+ case object BackupCompleted extends BackupEvent
63
+ // @formatter:on
64
+
65
+ // the backup task will run in this thread pool
66
+ private val ec = ExecutionContext .fromExecutor(Executors .newSingleThreadExecutor())
67
+
68
+ def apply (databases : FileBackup , backupParams : FileBackupParams ): Behavior [Command ] =
69
+ Behaviors .setup { context =>
70
+ // we listen to ChannelPersisted events, which will trigger a backup
71
+ context.system.eventStream ! EventStream .Subscribe (context.messageAdapter[ChannelPersisted ](WrappedChannelPersisted ))
72
+ Behaviors .withTimers { timers =>
73
+ timers.startTimerAtFixedRate(TickBackup , backupParams.interval)
74
+ new FileBackupHandler (databases, backupParams, context).waiting(willBackupAtNextTick = false )
69
75
}
76
+ }
77
+ }
78
+
79
+ class FileBackupHandler private (databases : FileBackup ,
80
+ backupParams : FileBackupParams ,
81
+ context : ActorContext [Command ]) {
70
82
71
- backupScript_opt.foreach(backupScript => {
72
- Try {
73
- // run the script in the current thread and wait until it terminates
74
- Process (backupScript).!
75
- } match {
76
- case Success (exitCode) => log.debug(s " backup notify script $backupScript returned $exitCode" )
77
- case Failure (cause) => log.warning(s " cannot start backup notify script $backupScript: $cause" )
83
+ def waiting (willBackupAtNextTick : Boolean ): Behavior [Command ] =
84
+ Behaviors .receiveMessagePartial {
85
+ case _ : WrappedChannelPersisted =>
86
+ context.log.debug(" will perform backup at next tick" )
87
+ waiting(willBackupAtNextTick = true )
88
+ case TickBackup => if (willBackupAtNextTick) {
89
+ context.log.debug(" performing backup" )
90
+ context.pipeToSelf(doBackup())(BackupResult )
91
+ backuping(willBackupAtNextTick = false )
92
+ } else {
93
+ Behaviors .same
94
+ }
95
+ }
96
+
97
+ def backuping (willBackupAtNextTick : Boolean ): Behavior [Command ] =
98
+ Behaviors .receiveMessagePartial {
99
+ case _ : WrappedChannelPersisted =>
100
+ context.log.debug(" will perform backup at next tick" )
101
+ backuping(willBackupAtNextTick = true )
102
+ case BackupResult (res) =>
103
+ res match {
104
+ case Success (Done ) => context.log.debug(" backup succeeded" )
105
+ case Failure (cause) => context.log.warn(s " backup failed: $cause" )
78
106
}
79
- })
80
- }
81
- }
107
+ waiting(willBackupAtNextTick)
108
+ }
82
109
83
- sealed trait BackupEvent
110
+ private def doBackup (): Future [Done ] = Future {
111
+ KamonExt .time(Metrics .FileBackupDuration .withoutTags()) {
112
+ val tmpFile = new File (backupParams.targetFile.getAbsolutePath.concat(" .tmp" ))
113
+ databases.backup(tmpFile)
84
114
85
- // this notification is sent when we have completed our backup process (our backup file is ready to be used)
86
- case object BackupCompleted extends BackupEvent
115
+ // this will throw an exception if it fails, which is possible if the backup file is not on the same filesystem
116
+ // as the temporary file
117
+ Files .move(tmpFile.toPath, backupParams.targetFile.toPath, StandardCopyOption .REPLACE_EXISTING , StandardCopyOption .ATOMIC_MOVE )
87
118
88
- object FileBackupHandler {
89
- // using this method is the only way to create a BackupHandler actor
90
- // we make sure that it uses a custom bounded mailbox, and a custom pinned dispatcher (i.e our actor will have its own thread pool with 1 single thread)
91
- def props (databases : FileBackup , backupFile : File , backupScript_opt : Option [String ]) =
92
- Props (new FileBackupHandler (databases, backupFile, backupScript_opt))
93
- .withMailbox(" eclair.backup-mailbox" )
94
- .withDispatcher(" eclair.backup-dispatcher" )
95
- }
119
+ // publish a notification that we have updated our backup
120
+ context.system.eventStream ! EventStream .Publish (BackupCompleted )
121
+ Metrics .FileBackupCompleted .withoutTags().increment()
122
+ }
123
+
124
+ // run the script in the current thread and wait until it terminates
125
+ backupParams.script_opt.foreach(backupScript => Process (backupScript).! )
126
+
127
+ Done
128
+ }(ec)
129
+
130
+ }
0 commit comments