Skip to content

Commit

Permalink
fix(lambda): fix race condition in lambda server spin up
Browse files Browse the repository at this point in the history
  • Loading branch information
NamanJain8 committed Sep 3, 2021
1 parent a515d0d commit 8f17dda
Showing 1 changed file with 20 additions and 6 deletions.
26 changes: 20 additions & 6 deletions dgraph/cmd/alpha/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"path/filepath"
"strconv"
"strings"
"sync"
"sync/atomic"
"syscall"
"time"
Expand Down Expand Up @@ -487,11 +488,13 @@ func setupLambdaServer(closer *z.Closer) {
}

type lambda struct {
sync.Mutex
cmd *exec.Cmd
active bool
lastActive int64
health string
port int

health string
port int
}

lambdas := make([]*lambda, 0, num)
Expand Down Expand Up @@ -519,9 +522,11 @@ func setupLambdaServer(closer *z.Closer) {
cmd.Env = append(cmd.Env, fmt.Sprintf("DGRAPH_URL="+dgraphUrl))
cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr
lambdas[i].Lock()
lambdas[i].cmd = cmd
lambdas[i].lastActive = time.Now().UnixNano()
lambdas[i].active = true
lambdas[i].Unlock()
glog.Infof("Running node command: %+v\n", cmd)
if err := cmd.Run(); err != nil {
glog.Errorf("Lambda server at port: %d stopped with error: %v",
Expand All @@ -544,23 +549,32 @@ func setupLambdaServer(closer *z.Closer) {
case <-closer.HasBeenClosed():
return
case <-ticker.C:
timestamp := time.Now().UnixNano()
for _, l := range lambdas {
healthCheck := func(l *lambda) {
l.Lock()
defer l.Unlock()

if !l.active {
continue
return
}

timestamp := time.Now().UnixNano()
resp, err := client.Get(l.health)
if err != nil || resp.StatusCode != 200 {
if time.Duration(timestamp-l.lastActive) > x.Config.Lambda.RestartAfter {
glog.Warningf("Lambda Server at port: %d not responding."+
" Killed it with err: %v", l.port, l.cmd.Process.Kill())
l.active = false
}
continue
return
}

resp.Body.Close()
l.lastActive = timestamp
}

for _, l := range lambdas {
healthCheck(l)
}
}
}
}()
Expand Down

0 comments on commit 8f17dda

Please sign in to comment.