@@ -3,7 +3,10 @@ package controller
3
3
import (
4
4
"fmt"
5
5
"github.com/refs/pman/pkg/config"
6
- "io/ioutil"
6
+ "github.com/refs/pman/pkg/process"
7
+ "github.com/refs/pman/pkg/storage"
8
+ "github.com/refs/pman/pkg/watcher"
9
+ "github.com/rs/zerolog"
7
10
"os"
8
11
"os/exec"
9
12
"sort"
@@ -12,25 +15,25 @@ import (
12
15
"sync"
13
16
"time"
14
17
15
- "github.com/refs/pman/pkg/log"
16
- "github.com/refs/pman/pkg/process"
17
- "github.com/refs/pman/pkg/watcher"
18
- "github.com/rs/zerolog"
19
-
20
18
"github.com/olekukonko/tablewriter"
21
19
)
22
20
23
- // Controller writes the current managed processes onto a file, or any ReadWrite .
21
+ // Controller supervises processes.
24
22
type Controller struct {
25
23
m * sync.RWMutex
26
24
options Options
27
25
log zerolog.Logger
28
26
Config * config.Config
27
+
28
+ Store * storage.Map
29
+
29
30
// Bin is the OCIS single binary name.
30
31
Bin string
32
+
31
33
// BinPath is the OCIS single binary path withing the host machine.
32
34
// The Controller needs to know the binary location in order to spawn new extensions.
33
35
BinPath string
36
+
34
37
// Terminated facilitates communication from Watcher <-> Controller. Writes to this
35
38
// channel WILL always attempt to restart the crashed process.
36
39
Terminated chan process.ProcEntry
@@ -49,14 +52,14 @@ func NewController(o ...Option) Controller {
49
52
}
50
53
51
54
c := Controller {
52
- m : & sync.RWMutex {},
53
- options : * opts ,
54
- log : log .NewLogger (
55
- log .WithPretty (true ),
56
- ),
57
- Config : opts .Config ,
55
+ m : & sync.RWMutex {},
56
+ options : * opts ,
57
+ log : * opts .Log ,
58
58
Bin : "ocis" ,
59
59
Terminated : make (chan process.ProcEntry ),
60
+ Store : storage .NewMapStorage (),
61
+
62
+ Config : opts .Config ,
60
63
}
61
64
62
65
if opts .Bin != "" {
@@ -69,45 +72,33 @@ func NewController(o ...Option) Controller {
69
72
c .log .Debug ().Msg ("OCIS binary not present in PATH, using Args[0]" )
70
73
path = os .Args [0 ]
71
74
}
72
-
73
75
c .BinPath = path
74
-
75
- if _ , err := os .Stat (opts .Config .File ); err != nil {
76
- c .log .Debug ().Str ("package" , "controller" ).Msgf ("setting up db" )
77
- ioutil .WriteFile (opts .Config .File , []byte ("{}" ), 0644 )
78
- }
79
-
80
76
return c
81
77
}
82
78
83
79
// Start and watches a process.
84
80
func (c * Controller ) Start (pe process.ProcEntry ) error {
85
- var err error
86
- var pid int
87
-
88
- if pid , err = c .storedPID (pe .Extension ); pid != 0 {
81
+ if pid := c .Store .Load (pe .Extension ); pid != 0 {
89
82
c .log .Debug ().Msg (fmt .Sprintf ("extension already running: %s" , pe .Extension ))
90
83
return nil
91
84
}
92
- if err != nil {
93
- return err
94
- }
95
85
96
86
w := watcher .NewWatcher ()
97
87
if err := pe .Start (c .BinPath ); err != nil {
98
88
return err
99
89
}
100
90
101
- if err := c .write (pe ); err != nil {
91
+ // store the spawned child process PID.
92
+ if err := c .Store .Store (pe ); err != nil {
102
93
return err
103
94
}
95
+
104
96
w .Follow (pe , c .Terminated , c .options .Config .KeepAlive )
105
97
106
98
once .Do (func () {
107
99
j := janitor {
108
- c .m ,
109
- c .Config .File ,
110
100
time .Second ,
101
+ c .Store ,
111
102
}
112
103
113
104
go j .run ()
@@ -117,40 +108,29 @@ func (c *Controller) Start(pe process.ProcEntry) error {
117
108
}
118
109
119
110
// Kill a managed process.
120
- // TODO(refs) this interface MUST also work with PIDs.
121
111
// Should a process managed by the runtime be allowed to be killed if the runtime is configured not to?
122
- func (c * Controller ) Kill (ext * string ) error {
123
- pid , err := c . storedPID ( * ext )
124
- if err != nil {
125
- return err
126
- }
112
+ func (c * Controller ) Kill (pe process. ProcEntry ) error {
113
+ // load stored PID
114
+ pid := c . Store . Load ( pe . Extension )
115
+
116
+ // find process in host by PID
127
117
p , err := os .FindProcess (pid )
128
118
if err != nil {
129
119
return err
130
120
}
131
121
132
- if err := c .delete ( * ext ); err != nil {
122
+ if err := c .Store . Delete ( pe ); err != nil {
133
123
return err
134
124
}
135
- c .log .Info ().Str ("package" , "watcher" ).Msgf ("terminating %v" , * ext )
125
+ c .log .Info ().Str ("package" , "watcher" ).Msgf ("terminating %v" , pe .Extension )
126
+
127
+ // terminate child process
136
128
return p .Kill ()
137
129
}
138
130
139
131
// Shutdown a running runtime.
140
132
func (c * Controller ) Shutdown (ch chan struct {}) error {
141
- // We cannot be sure when was the last write before a shutdown routine begins without a plan. "Plan" in the sense of
142
- // an argument with every extension that must be started. This way we could block access to the io.Writer the "db"
143
- // uses, and either halt and prevent main from forking more children, or let them all run and once the reader gets
144
- // freed, start shutting them all down, a.k.a reverse the process. For the time being a simple Sleep would ensure
145
- // that all children are spawned and the last writer has been executed. Alternatively proper synchronization can
146
- // be ensured with the combination of a set of extensions that must run and a wait group.
147
- time .Sleep (1 * time .Second )
148
-
149
- entries , err := loadDB (c .Config .File )
150
- if err != nil {
151
- return err
152
- }
153
-
133
+ entries := c .Store .LoadAll ()
154
134
for cmd , pid := range entries {
155
135
c .log .Info ().Str ("package" , "watcher" ).Msgf ("gracefully terminating %v" , cmd )
156
136
p , _ := os .FindProcess (pid )
@@ -159,10 +139,6 @@ func (c *Controller) Shutdown(ch chan struct{}) error {
159
139
}
160
140
}
161
141
162
- if err := c .Reset (); err != nil {
163
- return err
164
- }
165
-
166
142
ch <- struct {}{}
167
143
return nil
168
144
}
@@ -173,12 +149,7 @@ func (c *Controller) List() string {
173
149
table := tablewriter .NewWriter (tableString )
174
150
table .SetHeader ([]string {"Extension" , "PID" })
175
151
176
- c .m .Lock ()
177
- entries , err := loadDB (c .Config .File )
178
- if err != nil {
179
- c .log .Err (err ).Msg (fmt .Sprintf ("error loading file: %s" , c .Config .File ))
180
- }
181
- c .m .Unlock ()
152
+ entries := c .Store .LoadAll ()
182
153
183
154
keys := make ([]string , 0 , len (entries ))
184
155
for k := range entries {
@@ -194,10 +165,3 @@ func (c *Controller) List() string {
194
165
table .Render ()
195
166
return tableString .String ()
196
167
}
197
-
198
- // Reset clears the db file.
199
- func (c * Controller ) Reset () error {
200
- c .m .RLock ()
201
- defer c .m .RUnlock ()
202
- return os .Remove (c .Config .File )
203
- }
0 commit comments