Skip to content

Commit fe5f922

Browse files
committed
Add replicate flags to pump Litestream manually
For some special setups it is sometimes useful to run Litestream manually instead of letting it replicate in the background. This commit implements the following flags for replicate: * -once for doing synchronous replication and then exit * -force-snapshot to force a snapshot during -once * -enforce-retention to enforce retention rules during -once Because running once does not respect the snapshot interval the caller is expected to use -force-snapshot and -enforce-retention regularly to ensure the replication targets stay clean. For this to work correctly with a live database it needs to be opened with auto checkpointing disabled and SQLITE_FCNTL_PERSIST_WAL. Other uses include only using -force-snapshot to create regular backups of the database instead of live replication. Fixes benbjohnson#486
1 parent 749bc0d commit fe5f922

File tree

3 files changed

+104
-14
lines changed

3 files changed

+104
-14
lines changed

cmd/litestream/main.go

+4-4
Original file line numberDiff line numberDiff line change
@@ -94,19 +94,19 @@ func (m *Main) Run(ctx context.Context, args []string) (err error) {
9494

9595
// Wait for signal to stop program.
9696
select {
97-
case err = <-c.execCh:
97+
case err = <-c.runCh:
9898
fmt.Println("subprocess exited, litestream shutting down")
9999
case sig := <-signalCh:
100100
fmt.Println("signal received, litestream shutting down")
101101

102-
if c.cmd != nil {
102+
if c.runSignal != nil {
103103
fmt.Println("sending signal to exec process")
104-
if err := c.cmd.Process.Signal(sig); err != nil {
104+
if err := c.runSignal(sig); err != nil {
105105
return fmt.Errorf("cannot signal exec process: %w", err)
106106
}
107107

108108
fmt.Println("waiting for exec process to close")
109-
if err := <-c.execCh; err != nil && !strings.HasPrefix(err.Error(), "signal:") {
109+
if err := <-c.runCh; err != nil && !strings.HasPrefix(err.Error(), "signal:") {
110110
return fmt.Errorf("cannot wait for exec process: %w", err)
111111
}
112112
}

cmd/litestream/replicate.go

+99-9
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,12 @@ import (
2323

2424
// ReplicateCommand represents a command that continuously replicates SQLite databases.
2525
type ReplicateCommand struct {
26-
cmd *exec.Cmd // subcommand
27-
execCh chan error // subcommand error channel
26+
runSignal func(os.Signal) error // run cancel signaler
27+
runCh chan error // run error channel
28+
29+
once bool // replicate once and exit
30+
forceSnapshot bool // force snapshot to all replicas
31+
enforceRetention bool // enforce retention of old snapshots
2832

2933
Config Config
3034

@@ -34,14 +38,17 @@ type ReplicateCommand struct {
3438

3539
func NewReplicateCommand() *ReplicateCommand {
3640
return &ReplicateCommand{
37-
execCh: make(chan error),
41+
runCh: make(chan error),
3842
}
3943
}
4044

4145
// ParseFlags parses the CLI flags and loads the configuration file.
4246
func (c *ReplicateCommand) ParseFlags(ctx context.Context, args []string) (err error) {
4347
fs := flag.NewFlagSet("litestream-replicate", flag.ContinueOnError)
4448
execFlag := fs.String("exec", "", "execute subcommand")
49+
onceFlag := fs.Bool("once", false, "replicate once and exit")
50+
forceSnapshotFlag := fs.Bool("force-snapshot", false, "force snapshot when replicating once")
51+
enforceRetentionFlag := fs.Bool("enforce-retention", false, "enforce retention of old snapshots")
4552
tracePath := fs.String("trace", "", "trace path")
4653
configPath, noExpandEnv := registerConfigFlag(fs)
4754
fs.Usage = c.Usage
@@ -90,6 +97,22 @@ func (c *ReplicateCommand) ParseFlags(ctx context.Context, args []string) (err e
9097
litestream.Tracef = log.New(f, "", log.LstdFlags|log.Lmicroseconds|log.LUTC|log.Lshortfile).Printf
9198
}
9299

100+
// Once is mutually exclusive with exec
101+
c.once = *onceFlag
102+
if c.once && c.Config.Exec != "" {
103+
return fmt.Errorf("cannot specify -once flag with exec")
104+
}
105+
106+
c.forceSnapshot = *forceSnapshotFlag
107+
if !c.once && c.forceSnapshot {
108+
return fmt.Errorf("cannot specify -force-snapshot flag without -once")
109+
}
110+
111+
c.enforceRetention = *enforceRetentionFlag
112+
if !c.once && c.enforceRetention {
113+
return fmt.Errorf("cannot specify -enforce-retention flag without -once")
114+
}
115+
93116
return nil
94117
}
95118

@@ -109,6 +132,14 @@ func (c *ReplicateCommand) Run() (err error) {
109132
return err
110133
}
111134

135+
// Disable monitors if we're running once.
136+
if c.once {
137+
db.MonitorInterval = 0
138+
for _, r := range db.Replicas {
139+
r.MonitorEnabled = false
140+
}
141+
}
142+
112143
// Open database & attach to program.
113144
if err := db.Open(); err != nil {
114145
return err
@@ -162,14 +193,64 @@ func (c *ReplicateCommand) Run() (err error) {
162193
return fmt.Errorf("cannot parse exec command: %w", err)
163194
}
164195

165-
c.cmd = exec.Command(execArgs[0], execArgs[1:]...)
166-
c.cmd.Env = os.Environ()
167-
c.cmd.Stdout = os.Stdout
168-
c.cmd.Stderr = os.Stderr
169-
if err := c.cmd.Start(); err != nil {
196+
cmd := exec.Command(execArgs[0], execArgs[1:]...)
197+
cmd.Env = os.Environ()
198+
cmd.Stdout = os.Stdout
199+
cmd.Stderr = os.Stderr
200+
if err := cmd.Start(); err != nil {
170201
return fmt.Errorf("cannot start exec command: %w", err)
171202
}
172-
go func() { c.execCh <- c.cmd.Wait() }()
203+
c.runSignal = cmd.Process.Signal
204+
go func() { c.runCh <- cmd.Wait() }()
205+
} else if c.once {
206+
// Run replication once for each replica with cancel.
207+
ctx, cancel := context.WithCancel(context.Background())
208+
c.runSignal = func(s os.Signal) error {
209+
cancel()
210+
return nil
211+
}
212+
213+
go func() {
214+
var err error
215+
216+
defer func() {
217+
cancel()
218+
c.runCh <- err
219+
}()
220+
221+
for _, db := range c.DBs {
222+
if c.forceSnapshot {
223+
// Force next index with RESTART checkpoint.
224+
db.MaxCheckpointPageN = 1
225+
}
226+
227+
if err = db.Sync(ctx); err != nil {
228+
return
229+
}
230+
231+
// Prevent checkpointing on Close()
232+
db.MinCheckpointPageN = 0
233+
db.MaxCheckpointPageN = 0
234+
db.CheckpointInterval = 0
235+
236+
for _, r := range db.Replicas {
237+
if c.forceSnapshot {
238+
_, err = r.Snapshot(ctx)
239+
} else {
240+
err = r.Sync(ctx)
241+
}
242+
if err != nil {
243+
return
244+
}
245+
246+
if c.enforceRetention {
247+
if err = r.EnforceRetention(ctx); err != nil {
248+
return
249+
}
250+
}
251+
}
252+
}
253+
}()
173254
}
174255

175256
return nil
@@ -212,6 +293,15 @@ Arguments:
212293
Executes a subcommand. Litestream will exit when the child
213294
process exits. Useful for simple process management.
214295
296+
-once
297+
Execute replication once and exit.
298+
299+
-force-snapshot
300+
When replicating once, force taking a snapshot to all replicas.
301+
302+
-enforce-retention
303+
When replicating once, enforce rentention of old snapshots.
304+
215305
-no-expand-env
216306
Disables environment variable expansion in configuration file.
217307

db.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -761,7 +761,7 @@ func (db *DB) Sync(ctx context.Context) (err error) {
761761
checkpointMode := CheckpointModePassive
762762
if db.MaxCheckpointPageN > 0 && newWALSize >= calcWALSize(db.pageSize, db.MaxCheckpointPageN) {
763763
checkpoint, checkpointMode = true, CheckpointModeRestart
764-
} else if newWALSize >= calcWALSize(db.pageSize, db.MinCheckpointPageN) {
764+
} else if db.MinCheckpointPageN > 0 && newWALSize >= calcWALSize(db.pageSize, db.MinCheckpointPageN) {
765765
checkpoint = true
766766
} else if db.CheckpointInterval > 0 && !info.dbModTime.IsZero() && time.Since(info.dbModTime) > db.CheckpointInterval && newWALSize > calcWALSize(db.pageSize, 1) {
767767
checkpoint = true

0 commit comments

Comments
 (0)