From f63ddc63eed821542cc02f28cf05f8fb8acfcd70 Mon Sep 17 00:00:00 2001 From: NamanJain8 Date: Fri, 3 Sep 2021 18:16:29 +0530 Subject: [PATCH] fix(lambda): fix race condition in lambda server spin up --- dgraph/cmd/alpha/run.go | 26 ++++++++++++++++++++------ 1 file changed, 20 insertions(+), 6 deletions(-) diff --git a/dgraph/cmd/alpha/run.go b/dgraph/cmd/alpha/run.go index 847f66cd11c..a5ffe0d81db 100644 --- a/dgraph/cmd/alpha/run.go +++ b/dgraph/cmd/alpha/run.go @@ -34,6 +34,7 @@ import ( "path/filepath" "strconv" "strings" + "sync" "sync/atomic" "syscall" "time" @@ -487,11 +488,13 @@ func setupLambdaServer(closer *z.Closer) { } type lambda struct { + sync.RWMutex cmd *exec.Cmd active bool lastActive int64 - health string - port int + + health string + port int } lambdas := make([]*lambda, 0, num) @@ -519,9 +522,11 @@ func setupLambdaServer(closer *z.Closer) { cmd.Env = append(cmd.Env, fmt.Sprintf("DGRAPH_URL="+dgraphUrl)) cmd.Stdout = os.Stdout cmd.Stderr = os.Stderr + lambdas[i].Lock() lambdas[i].cmd = cmd lambdas[i].lastActive = time.Now().UnixNano() lambdas[i].active = true + lambdas[i].Unlock() glog.Infof("Running node command: %+v\n", cmd) if err := cmd.Run(); err != nil { glog.Errorf("Lambda server at port: %d stopped with error: %v", @@ -544,11 +549,15 @@ func setupLambdaServer(closer *z.Closer) { case <-closer.HasBeenClosed(): return case <-ticker.C: - timestamp := time.Now().UnixNano() - for _, l := range lambdas { + healthCheck := func(l *lambda) { + l.Lock() + defer l.Unlock() + if !l.active { - continue + return } + + timestamp := time.Now().UnixNano() resp, err := client.Get(l.health) if err != nil || resp.StatusCode != 200 { if time.Duration(timestamp-l.lastActive) > x.Config.Lambda.RestartAfter { @@ -556,11 +565,16 @@ func setupLambdaServer(closer *z.Closer) { " Killed it with err: %v", l.port, l.cmd.Process.Kill()) l.active = false } - continue + return } + resp.Body.Close() l.lastActive = timestamp } + + for _, l := range lambdas { + healthCheck(l) + } } } }()