Skip to content

Commit

Permalink
fix: ensure monitors work
Browse files Browse the repository at this point in the history
  • Loading branch information
iPromKnight committed Jan 6, 2025
1 parent 8b2b433 commit b05170c
Show file tree
Hide file tree
Showing 8 changed files with 80 additions and 7 deletions.
3 changes: 1 addition & 2 deletions compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,7 @@ services:
security_opt:
- apparmor:unconfined
volumes:
- ./config.yaml:/data/config.yaml
- ./rclone.conf:/data/rclone.conf
- ./data:/data
- ./local-dev/mnt/rclone:/mnt/rclone:shared
- ./local-dev/caches/rclone:/caches/rclone
environment:
Expand Down
3 changes: 1 addition & 2 deletions config.yaml → data/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,7 @@ mounts:
- backendName: "AllDebrid"
# This is the path to the mountpoint on the host
mountPoint: "/mnt/rclone/alldebrid"
# These will be fully converted to vfsOpt and mountOpt payloads for the rclone mount command through RC. You can use your regular rclone env vars.
# They override the shared options in the environment section of the compose / running container for this specific mount.
# These override the shared options in the environment section of the compose / running container for this specific mount.
environment:
RCLONE_BWLIMIT: off
RCLONE_READ_ONLY: true
Expand Down
File renamed without changes.
14 changes: 14 additions & 0 deletions src/internal/mount_manager/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,3 +76,17 @@ func removeStaleMounts(conf *config.Config, logger zerolog.Logger) {
}
}
}

func ensureExists(mountPoint string, logger zerolog.Logger) {
if _, err := os.Stat(mountPoint); os.IsNotExist(err) {
logger.Info().Str(constants.LogMountPoint, mountPoint).Msg("Creating mount point...")
err := os.MkdirAll(mountPoint, 0777)
if err != nil {
logger.Error().Err(err).Str(constants.LogMountPoint, mountPoint).
Msg("Failed to create mount point")
} else {
logger.Info().Str(constants.LogMountPoint, mountPoint).
Msg("Mount point created successfully.")
}
}
}
23 changes: 21 additions & 2 deletions src/internal/mount_manager/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,16 @@ func MonitorMountProcesses(logger zerolog.Logger) {
shouldMonitorProcesses = true

for {
logger.Debug().Msg("Checking mount processes...")
if !shouldMonitorProcesses {
logger.Info().Msg("Stopping rclone mount process monitor")
break
}
tracker.Range(func(key, value interface{}) bool {
if !shouldMonitorProcesses {
return false
}

mountProcess := value.(*MountProcess)

if time.Since(mountProcess.StartedAt) < mountProcess.GracePeriod {
Expand All @@ -38,14 +43,26 @@ func MonitorMountProcesses(logger zerolog.Logger) {
logger.Warn().Str(constants.LogBackend, mountProcess.BackendName).
Msgf("Process (PID: %d) died. Restarting...", mountProcess.PID)

if !shouldMonitorProcesses {
return false
}

UnmountEndpoint(mountProcess, logger)

newServe := &MountProcess{
if !shouldMonitorProcesses {
return false
}

newMount := &MountProcess{
MountPoint: mountProcess.MountPoint,
RcloneProcess: mountProcess.RcloneProcess,
}

newProcess := StartMountWithRetries(newServe, logger)
newProcess := StartMountWithRetries(newMount, logger)

if !shouldMonitorProcesses {
return false
}

if newProcess != nil {
tracker.Track(newProcess.BackendName, newProcess)
Expand All @@ -56,6 +73,8 @@ func MonitorMountProcesses(logger zerolog.Logger) {
Msg("Failed to restart mount process")
}
}

logger.Debug().Str("MountPoint", mountProcess.MountPoint).Msg("Mount is mounted fine. Nothing to do.")
return true
})
time.Sleep(10 * time.Second)
Expand Down
17 changes: 16 additions & 1 deletion src/internal/mount_manager/mount.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ func InitializeMountEndpoints(conf *config.Config, logger zerolog.Logger, proces
func StartMountWithRetries(instance *MountProcess, logger zerolog.Logger) *MountProcess {
retries := 0
for retries < 3 {
ensureExists(instance.MountPoint, logger)
cmd := createMountCommand(instance)
instance.Command = cmd
err := cmd.Start()
Expand All @@ -46,6 +47,18 @@ func StartMountWithRetries(instance *MountProcess, logger zerolog.Logger) *Mount
instance.StartedAt = time.Now()
instance.GracePeriod = 10 * time.Second
tracker.Track(instance.BackendName, instance)
go func() {
err := cmd.Wait()
if err != nil {
logger.Warn().AnErr(constants.LogError, err).
Str(constants.LogBackend, instance.BackendName).
Msg("Mount process exited with error.")
} else {
logger.Info().
Str(constants.LogBackend, instance.BackendName).
Msg("Mount process exited normally.")
}
}()
return instance
}
logger.Warn().AnErr(constants.LogError, err).Msgf("Mount failed. Retrying %d/3...", retries+1)
Expand All @@ -58,6 +71,7 @@ func StartMountWithRetries(instance *MountProcess, logger zerolog.Logger) *Mount

func StopMount(instance *MountProcess, logger zerolog.Logger) {
logger.Info().Str(constants.LogBackend, instance.BackendName).Msg("Stopping mount process...")
UnmountEndpoint(instance, logger)
if err := instance.Command.Process.Kill(); err == nil {
tracker.Untrack(instance.BackendName)
logger.Info().Int(constants.LogPid, instance.PID).Str(constants.LogBackend, instance.BackendName).Msg("Mount process stopped")
Expand All @@ -67,6 +81,7 @@ func StopMount(instance *MountProcess, logger zerolog.Logger) {
}

func Cleanup(config *config.Config, logger zerolog.Logger) {
shouldMonitorProcesses = false
logger.Info().Msg("Cleaning up all rclone mount processes")
tracker.Range(func(key, value interface{}) bool {
instance := value.(*MountProcess)
Expand All @@ -82,8 +97,8 @@ func ReconcileMounts(conf *config.Config, logger zerolog.Logger, processLock *sy

logger.Info().Msg("Reconciling mounts...")

setupMountsFromConfig(conf, logger)
removeStaleMounts(conf, logger)
setupMountsFromConfig(conf, logger)
}

func UnmountEndpoint(mount *MountProcess, logger zerolog.Logger) {
Expand Down
14 changes: 14 additions & 0 deletions src/internal/serve_manager/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,18 @@ func MonitorServeProcesses(logger zerolog.Logger) {
shouldMonitorProcesses = true

for {
logger.Debug().Msg("Checking serve processes...")
if !shouldMonitorProcesses {
logger.Info().Msg("Stopping rclone serve process monitor")
break
}
tracker.Range(func(key, value interface{}) bool {
serveProcess := value.(*ServeProcess)

if !shouldMonitorProcesses {
return false
}

if time.Since(serveProcess.StartedAt) < serveProcess.GracePeriod {
logger.Debug().Int(constants.LogPid, serveProcess.PID).Msg("Skipping process check (within grace period)")
return true
Expand All @@ -38,6 +43,10 @@ func MonitorServeProcesses(logger zerolog.Logger) {
logger.Warn().Str(constants.LogBackend, serveProcess.BackendName).
Msgf("Process (PID: %d) died. Restarting...", serveProcess.PID)

if !shouldMonitorProcesses {
return false
}

newServe := &ServeProcess{
Protocol: serveProcess.Protocol,
Addr: serveProcess.Addr,
Expand All @@ -46,6 +55,10 @@ func MonitorServeProcesses(logger zerolog.Logger) {

newProcess := StartServeWithRetries(newServe, logger)

if !shouldMonitorProcesses {
return false
}

if newProcess != nil {
tracker.Track(newProcess.BackendName, newProcess)
logger.Info().Str(constants.LogBackend, serveProcess.BackendName).
Expand All @@ -55,6 +68,7 @@ func MonitorServeProcesses(logger zerolog.Logger) {
Msg("Failed to restart serve process")
}
}
logger.Debug().Str("Backend", serveProcess.BackendName).Msg("Serve is fine. Nothing to do.")
return true
})
time.Sleep(10 * time.Second)
Expand Down
13 changes: 13 additions & 0 deletions src/internal/serve_manager/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,18 @@ func StartServeWithRetries(instance *ServeProcess, logger zerolog.Logger) *Serve
instance.StartedAt = time.Now()
instance.GracePeriod = 10 * time.Second
tracker.Track(instance.BackendName, instance)
go func() {
err := cmd.Wait()
if err != nil {
logger.Warn().AnErr(constants.LogError, err).
Str(constants.LogBackend, instance.BackendName).
Msg("Serve process exited with error.")
} else {
logger.Info().
Str(constants.LogBackend, instance.BackendName).
Msg("Serve process exited normally.")
}
}()
return instance
}
logger.Warn().AnErr(constants.LogError, err).Msgf("Serve failed. Retrying %d/3...", retries+1)
Expand All @@ -69,6 +81,7 @@ func StopServe(instance *ServeProcess, logger zerolog.Logger) {
}

func Cleanup(logger zerolog.Logger) {
shouldMonitorProcesses = false
logger.Info().Msg("Cleaning up all rclone serve processes")
tracker.Range(func(key, value interface{}) bool {
instance := value.(*ServeProcess)
Expand Down

0 comments on commit b05170c

Please sign in to comment.