Skip to content

Commit

Permalink
[#24789][Go SDK] Fix Minor race conditions (#24808)
Browse files Browse the repository at this point in the history
  • Loading branch information
lostluck authored Dec 29, 2022
1 parent 80de3ef commit ba3dcd1
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 8 deletions.
19 changes: 14 additions & 5 deletions sdks/go/pkg/beam/log/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"context"
"fmt"
"os"
"sync/atomic"
)

// Severity is the severity of the log message.
Expand All @@ -44,23 +45,31 @@ type Logger interface {
Log(ctx context.Context, sev Severity, calldepth int, msg string)
}

var (
logger Logger = &Standard{}
)
var logger atomic.Value

// concreteLogger works around atomic.Value's requirement that the type
// be identical for all callers.
type concreteLogger struct {
Logger
}

func init() {
logger.Store(&concreteLogger{&Standard{}})
}

// SetLogger sets the global Logger. Intended to be called during initialization
// only.
func SetLogger(l Logger) {
if l == nil {
panic("Logger cannot be nil")
}
logger = l
logger.Store(&concreteLogger{l})
}

// Output logs the given message to the global logger. Calldepth is the count
// of the number of frames to skip when computing the file name and line number.
func Output(ctx context.Context, sev Severity, calldepth int, msg string) {
logger.Log(ctx, sev, calldepth+1, msg) // +1 for this frame
logger.Load().(Logger).Log(ctx, sev, calldepth+1, msg) // +1 for this frame
}

// User-facing logging functions.
Expand Down
19 changes: 16 additions & 3 deletions sdks/go/pkg/beam/runners/universal/extworker/extworker.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,12 @@ func (s *Loopback) StartWorker(ctx context.Context, req *fnpb.StartWorkerRequest
log.Infof(ctx, "starting worker %v", req.GetWorkerId())
s.mu.Lock()
defer s.mu.Unlock()
if s.workers == nil {
return &fnpb.StartWorkerResponse{
Error: "worker pool shutting down",
}, nil
}

if _, ok := s.workers[req.GetWorkerId()]; ok {
return &fnpb.StartWorkerResponse{
Error: fmt.Sprintf("worker with ID %q already exists", req.GetWorkerId()),
Expand Down Expand Up @@ -92,6 +98,10 @@ func (s *Loopback) StopWorker(ctx context.Context, req *fnpb.StopWorkerRequest)
log.Infof(ctx, "stopping worker %v", req.GetWorkerId())
s.mu.Lock()
defer s.mu.Unlock()
if s.workers == nil {
// Worker pool is already shutting down, so no action is needed.
return &fnpb.StopWorkerResponse{}, nil
}
if cancelfn, ok := s.workers[req.GetWorkerId()]; ok {
cancelfn()
delete(s.workers, req.GetWorkerId())
Expand All @@ -106,12 +116,15 @@ func (s *Loopback) StopWorker(ctx context.Context, req *fnpb.StopWorkerRequest)
// Stop terminates the service and stops all workers.
func (s *Loopback) Stop(ctx context.Context) error {
s.mu.Lock()
defer s.mu.Unlock()

log.Infof(ctx, "stopping Loopback, and %d workers", len(s.workers))
s.workers = map[string]context.CancelFunc{}
s.lis.Close()
s.workers = nil
s.rootCancel()

// There can be a deadlock between the StopWorker RPC and GracefulStop
// which waits for all RPCs to finish, so it must be outside the critical section.
s.mu.Unlock()

s.grpcServer.GracefulStop()
return nil
}
Expand Down

0 comments on commit ba3dcd1

Please sign in to comment.