Skip to content

Commit

Permalink
Avoid deadlock if Stop is called before Serve
Browse files Browse the repository at this point in the history
Introduce a tiny state machine so that Serve can detect the direct transition init -> stopped (i.e. Stop was called before Serve, so the server never went through the serving state)

Fixes jaegertracing#2601
Related jaegertracing#103

Signed-off-by: Carl Henrik Lunde <[email protected]>
  • Loading branch information
chlunde committed Oct 30, 2020
1 parent ec03a2d commit e4bc0af
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 17 deletions.
13 changes: 0 additions & 13 deletions cmd/agent/app/agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,15 +128,6 @@ func withRunningAgent(t *testing.T, testcase func(string, chan error)) {

testcase(agent.HTTPAddr(), ch)

// TODO (wjang) We sleep because the processors in the agent might not have had time to
// start up yet. If we were to call Stop() before the processors have had time to start up,
// it would panic because of a DATA RACE between wg.Add() and wg.Wait() in thrift_processor.
// A real fix for this issue would be to add semaphores to tbuffered_server, thrift_processor,
// and the agent itself. Given all the extra overhead and testing required to get this to work
// "elegantly", I opted to just sleep here, given how unlikely this situation is to occur in
// production.
time.Sleep(2 * time.Second)

agent.Stop()
assert.NoError(t, <-ch)

Expand Down Expand Up @@ -181,10 +172,6 @@ func TestStartStopRace(t *testing.T) {
t.Fatalf("error from agent.Run(): %s", err)
}

// FIXME https://github.com/jaegertracing/jaeger/issues/2601
t.Log("give some time for processors to start")
time.Sleep(500 * time.Millisecond)

t.Log("stopping agent")
agent.Stop()

Expand Down
23 changes: 19 additions & 4 deletions cmd/agent/app/servers/tbuffered_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,16 @@ type TBufferedServer struct {
}
}

// Lifecycle states stored in TBufferedServer.serving.
//
// Permitted transitions:
//
//	init -> serving -> stopped
//	init -> stopped (Stop called before Serve; might happen in unit tests)
//
// The numeric values are written out explicitly because the field is read
// and written through the sync/atomic uint32 helpers.
const (
	stateStopped = 0 // Stop has been called; the server is not serving
	stateServing = 1 // Serve is running its read loop
	stateInit    = 2 // set by NewTBufferedServer; neither Serve nor Stop called yet
)

// NewTBufferedServer creates a TBufferedServer
func NewTBufferedServer(
transport ThriftTransport,
Expand All @@ -79,14 +89,20 @@ func NewTBufferedServer(
maxQueueSize: maxQueueSize,
maxPacketSize: maxPacketSize,
readBufPool: readBufPool,
serving: stateInit,
}

metrics.MustInit(&res.metrics, mFactory, nil)
return res, nil
}

// Serve initiates the readers and starts serving traffic
func (s *TBufferedServer) Serve() {
atomic.StoreUint32(&s.serving, 1)
defer close(s.dataChan)
if !atomic.CompareAndSwapUint32(&s.serving, stateInit, stateServing) {
return // Stop already called
}

for s.IsServing() {
readBuf := s.readBufPool.Get().(*ReadBuf)
n, err := s.transport.Read(readBuf.bytes)
Expand All @@ -104,7 +120,6 @@ func (s *TBufferedServer) Serve() {
s.metrics.ReadError.Inc(1)
}
}
close(s.dataChan)
}

func (s *TBufferedServer) updateQueueSize(delta int64) {
Expand All @@ -114,13 +129,13 @@ func (s *TBufferedServer) updateQueueSize(delta int64) {

// IsServing indicates whether the server is currently serving traffic
func (s *TBufferedServer) IsServing() bool {
return atomic.LoadUint32(&s.serving) == 1
return atomic.LoadUint32(&s.serving) == stateServing
}

// Stop stops the serving of traffic and waits until the queue is
// emptied by the readers
func (s *TBufferedServer) Stop() {
atomic.StoreUint32(&s.serving, 0)
atomic.StoreUint32(&s.serving, stateStopped)
_ = s.transport.Close()
}

Expand Down

0 comments on commit e4bc0af

Please sign in to comment.