Skip to content

Commit d79d24a

Browse files
committed
split constructor New and NewWithCleanup, fix race conditions
1 parent f855d64 commit d79d24a

File tree

3 files changed

+81
-24
lines changed

3 files changed

+81
-24
lines changed

goproc.go

+37-22
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ type Cmd struct {
6969
OnStderr StringCallback // one line fetched from stderr
7070
OnRestart IntReturningCallback // this overwrites RestartDelayMs
7171
OnExit ParameterlessCallback // when max restart reached, or manually killed
72-
OnProcessCompleted IntCallback // when 1x process done, can be restarting depends on RestartCount and MaxCount
72+
OnProcessCompleted IntCallback // called each time one process run completes, receives durationMs; the command may still restart afterwards depending on RestartCount and MaxCount
7373
OnStateChanged CmdStateCallback // triggered when state changed
7474

7575
state CmdState
@@ -131,7 +131,7 @@ type Process struct {
131131
type Goproc struct {
132132
cmds []*Cmd
133133
procs []*Process
134-
lock sync.Mutex
134+
lock sync.RWMutex
135135
HasErrFunc func(err error, fmt string, args ...any) bool
136136
}
137137

@@ -158,7 +158,8 @@ func DiscardHasErr(err error, _ string, _ ...any) bool {
158158
return err != nil
159159
}
160160

161-
func New() *Goproc {
161+
// NewWithCleanup creates a Goproc that also listens for os.Interrupt and SIGTERM; it might leave stray goroutines if called too many times
162+
func NewWithCleanup() *Goproc {
162163
c := make(chan os.Signal)
163164
signal.Notify(c, os.Interrupt, syscall.SIGTERM)
164165
res := &Goproc{
@@ -175,6 +176,15 @@ func New() *Goproc {
175176
return res
176177
}
177178

179+
// New creates a Goproc without automatic cleanup; the caller must call Cleanup before exit or stray processes may be left behind
180+
func New() *Goproc {
181+
res := &Goproc{
182+
cmds: []*Cmd{},
183+
HasErrFunc: L.IsError,
184+
}
185+
return res
186+
}
187+
178188
// AddCommand adds a new command to run; it is not started until Start is called
179189
// returns command id
180190
func (g *Goproc) AddCommand(cmd *Cmd) CommandId {
@@ -220,13 +230,13 @@ func (g *Goproc) Signal(cmdId CommandId, signal os.Signal) error {
220230
// * stop them; signal=os.Kill
221231
err := proc.exe.Process.Kill()
222232
cmd.setState(Killed)
223-
if g.HasErrFunc(err, `error proc.exe.Process.Kill`) {
233+
if g.HasErrFunc(err, `error globalRunner.exe.Process.Kill`) {
224234
return err
225235
}
226236
} else {
227237
// * relay termination signals;
228238
err := proc.exe.Process.Signal(signal)
229-
if g.HasErrFunc(err, `error proc.exe.Process.Signal %d`, signal) {
239+
if g.HasErrFunc(err, `error globalRunner.exe.Process.Signal %d`, signal) {
230240
return err
231241
}
232242
}
@@ -236,11 +246,14 @@ func (g *Goproc) Signal(cmdId CommandId, signal os.Signal) error {
236246
// Start starts the command identified by cmdId
237247
func (g *Goproc) Start(cmdId CommandId) error {
238248
idx := int(cmdId)
249+
g.lock.RLock()
239250
if idx >= len(g.cmds) || idx < 0 {
251+
g.lock.RUnlock()
240252
return fmt.Errorf(`invalid command index, should be zero to %d`, len(g.cmds)-1)
241253
}
242254

243255
cmd := g.cmds[idx]
256+
g.lock.RUnlock()
244257
cmd.strCache = `` // reset cache
245258

246259
prefix := S.IfEmpty(cmd.PrefixLabel, `CMD:`+I.ToStr(idx)) + `: `
@@ -253,7 +266,9 @@ func (g *Goproc) Start(cmdId CommandId) error {
253266

254267
for {
255268
// refill process
269+
g.lock.RLock()
256270
proc := g.procs[idx]
271+
g.lock.RUnlock()
257272
proc.exe = exec.Command(cmd.Program, cmd.Parameters...)
258273
proc.exe.Dir = cmd.WorkDir
259274
if cmd.InheritEnv {
@@ -264,17 +279,17 @@ func (g *Goproc) Start(cmdId CommandId) error {
264279

265280
// get output buffer and start
266281
stderr, err := proc.exe.StderrPipe()
267-
if g.HasErrFunc(err, prefix+`error proc.exe.StderrPipe %s`, cmd) {
282+
if g.HasErrFunc(err, prefix+`error globalRunner.exe.StderrPipe %s`, cmd) {
268283
return err
269284
}
270285
stdout, err := proc.exe.StdoutPipe()
271-
if g.HasErrFunc(err, prefix+`error proc.exe.StdoutPipe %s`, cmd) {
286+
if g.HasErrFunc(err, prefix+`error globalRunner.exe.StdoutPipe %s`, cmd) {
272287
return err
273288
}
274-
log.Printf(prefix + `starting: ` + cmd.String())
289+
log.Println(prefix + `starting: ` + cmd.String())
275290
start := time.Now()
276291
err = proc.exe.Start()
277-
if g.HasErrFunc(err, prefix+`error proc.exe.Start %s`, cmd) {
292+
if g.HasErrFunc(err, prefix+`error globalRunner.exe.Start %s`, cmd) {
278293
cmd.LastExecutionError = err
279294
if cmd.OnProcessCompleted != nil {
280295
durationMs := time.Since(start).Milliseconds()
@@ -346,12 +361,12 @@ func (g *Goproc) Start(cmdId CommandId) error {
346361

347362
// wait for exit
348363
err = proc.exe.Wait()
349-
if g.HasErrFunc(err, prefix+`error proc.exe.Wait %s`, cmd) {
364+
if g.HasErrFunc(err, prefix+`error globalRunner.exe.Wait %s`, cmd) {
350365
if cmd.state != Killed {
351366
cmd.setState(Crashed)
352367
}
353368
} else {
354-
log.Println("exited")
369+
log.Println(prefix + "exited")
355370
if cmd.state != Killed {
356371
cmd.setState(Exited)
357372
}
@@ -410,8 +425,8 @@ func (g *Goproc) Start(cmdId CommandId) error {
410425

411426
// StartAll starts all commands that have not yet been started
412427
func (g *Goproc) StartAll() {
413-
g.lock.Lock()
414-
defer g.lock.Unlock()
428+
g.lock.RLock()
429+
defer g.lock.RUnlock()
415430
for idx, cmd := range g.cmds {
416431
if cmd.state == NotStarted {
417432
g.Start(CommandId(idx))
@@ -421,8 +436,8 @@ func (g *Goproc) StartAll() {
421436

422437
// StartAllParallel starts all commands that have not yet been started, in parallel
423438
func (g *Goproc) StartAllParallel() *sync.WaitGroup {
424-
g.lock.Lock()
425-
defer g.lock.Unlock()
439+
g.lock.RLock()
440+
defer g.lock.RUnlock()
426441
wg := &sync.WaitGroup{}
427442
for idx, cmd := range g.cmds {
428443
if cmd.state == NotStarted {
@@ -463,7 +478,6 @@ func (g *Goproc) CommandString(cmdId CommandId) string {
463478

464479
// Run1 executes one command and returns its stdout and stderr output, last execution error, and exit code
465480
func Run1(cmd *Cmd) (string, string, error, int) {
466-
proc := New()
467481
onStdout := cmd.OnStdout
468482
onStderr := cmd.OnStderr
469483
stdoutBuff := bytes.Buffer{}
@@ -490,14 +504,15 @@ func Run1(cmd *Cmd) (string, string, error, int) {
490504
}
491505
return nil
492506
}
493-
proc.AddCommand(cmd)
494-
proc.StartAll()
507+
cmdId := globalRunner.AddCommand(cmd)
508+
globalRunner.Start(cmdId)
495509
return stdoutBuff.String(), stderrBuff.String(), cmd.LastExecutionError, cmd.LastExitCode
496510
}
497511

498-
// Run1 execute one command and get stdout stderr output
512+
var globalRunner = NewWithCleanup()
513+
514+
// RunLines executes one command and returns its stdout and stderr output as string slices, plus last execution error and exit code
499515
func RunLines(cmd *Cmd) ([]string, []string, error, int) {
500-
proc := New()
501516
onStdout := cmd.OnStdout
502517
onStderr := cmd.OnStderr
503518
stdoutBuff := []string{}
@@ -522,7 +537,7 @@ func RunLines(cmd *Cmd) ([]string, []string, error, int) {
522537
}
523538
return nil
524539
}
525-
proc.AddCommand(cmd)
526-
proc.StartAll()
540+
cmdId := globalRunner.AddCommand(cmd)
541+
globalRunner.Start(cmdId)
527542
return stdoutBuff, stderrBuff, cmd.LastExecutionError, cmd.LastExitCode
528543
}

parallel_test.go

+43-1
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,16 @@
11
package goproc
22

33
import (
4+
"strconv"
5+
"sync/atomic"
46
"testing"
7+
"time"
8+
9+
"github.com/zeebo/assert"
510
)
611

712
func TestParallel(t *testing.T) {
8-
proc := New()
13+
proc := NewWithCleanup()
914
proc.AddCommand(&Cmd{
1015
Program: `echo`,
1116
Parameters: []string{`1`},
@@ -18,3 +23,40 @@ func TestParallel(t *testing.T) {
1823
})
1924
proc.StartAllParallel().Wait()
2025
}
26+
27+
func TestParallelMultiple(t *testing.T) {
28+
proc := NewWithCleanup()
29+
var total uint32
30+
firstCmdId := proc.AddCommand(&Cmd{
31+
Program: `echo`,
32+
Parameters: []string{`-1`},
33+
MaxRestart: 4,
34+
OnProcessCompleted: func(cmd *Cmd, durationMs int64) {
35+
atomic.AddUint32(&total, 1)
36+
time.Sleep(100 * time.Millisecond)
37+
},
38+
})
39+
wg := proc.StartAllParallel()
40+
const n = 10
41+
for z := range n {
42+
go func() {
43+
cmdId := proc.AddCommand(&Cmd{
44+
Program: `echo`,
45+
Parameters: []string{strconv.Itoa(z)},
46+
MaxRestart: 4,
47+
OnProcessCompleted: func(cmd *Cmd, durationMs int64) {
48+
atomic.AddUint32(&total, 1)
49+
},
50+
})
51+
proc.Start(cmdId)
52+
}()
53+
}
54+
time.Sleep(500 * time.Millisecond)
55+
wg.Wait()
56+
assert.Equal(t, uint32(5+n*5), total)
57+
58+
// call again once
59+
proc.Start(firstCmdId)
60+
assert.Equal(t, uint32(5+n*5+5), total)
61+
62+
}

single_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ import (
88
)
99

1010
func TestNotExists(t *testing.T) {
11-
proc := New()
11+
proc := NewWithCleanup()
1212
proc.AddCommand(&Cmd{
1313
Program: `not_exists`,
1414
})

0 commit comments

Comments
 (0)