diff --git a/cmd/agent/app/agent_test.go b/cmd/agent/app/agent_test.go index 4c080e68aab..3e37e282bf7 100644 --- a/cmd/agent/app/agent_test.go +++ b/cmd/agent/app/agent_test.go @@ -128,15 +128,6 @@ func withRunningAgent(t *testing.T, testcase func(string, chan error)) { testcase(agent.HTTPAddr(), ch) - // TODO (wjang) We sleep because the processors in the agent might not have had time to - // start up yet. If we were to call Stop() before the processors have time to startup, - // it'll panic because of a DATA RACE between wg.Add() and wg.Wait() in thrift_processor. - // A real fix for this issue would be to add semaphores to tbuffered_server, thrift_processor, - // and agent itself. Given all this extra overhead and testing required to get this to work - // "elegantly", I opted to just sleep here given how unlikely this situation will occur in - // production. - time.Sleep(2 * time.Second) - agent.Stop() assert.NoError(t, <-ch) @@ -181,10 +172,6 @@ func TestStartStopRace(t *testing.T) { t.Fatalf("error from agent.Run(): %s", err) } - // FIXME https://github.com/jaegertracing/jaeger/issues/2601 - t.Log("give some time for processors to start") - time.Sleep(500 * time.Millisecond) - t.Log("stopping agent") agent.Stop() diff --git a/cmd/agent/app/servers/tbuffered_server.go b/cmd/agent/app/servers/tbuffered_server.go index 5ba757ffb65..93686e57571 100644 --- a/cmd/agent/app/servers/tbuffered_server.go +++ b/cmd/agent/app/servers/tbuffered_server.go @@ -59,6 +59,16 @@ type TBufferedServer struct { } } +// state values for TBufferedServer.serving +// +// init -> serving -> stopped +// init -> stopped (might happen in unit tests) +const ( + stateStopped = iota + stateServing + stateInit +) + // NewTBufferedServer creates a TBufferedServer func NewTBufferedServer( transport ThriftTransport, @@ -79,14 +89,20 @@ func NewTBufferedServer( maxQueueSize: maxQueueSize, maxPacketSize: maxPacketSize, readBufPool: readBufPool, + serving: stateInit, } + metrics.MustInit(&res.metrics, mFactory, nil) return res, nil } // Serve initiates the readers and starts serving traffic func (s *TBufferedServer) Serve() { - atomic.StoreUint32(&s.serving, 1) + defer close(s.dataChan) + if !atomic.CompareAndSwapUint32(&s.serving, stateInit, stateServing) { + return // Stop already called + } + for s.IsServing() { readBuf := s.readBufPool.Get().(*ReadBuf) n, err := s.transport.Read(readBuf.bytes) @@ -106,7 +122,6 @@ func (s *TBufferedServer) Serve() { s.metrics.ReadError.Inc(1) } } - close(s.dataChan) } func (s *TBufferedServer) updateQueueSize(delta int64) { @@ -116,13 +131,13 @@ func (s *TBufferedServer) updateQueueSize(delta int64) { // IsServing indicates whether the server is currently serving traffic func (s *TBufferedServer) IsServing() bool { - return atomic.LoadUint32(&s.serving) == 1 + return atomic.LoadUint32(&s.serving) == stateServing } // Stop stops the serving of traffic and waits until the queue is // emptied by the readers func (s *TBufferedServer) Stop() { - atomic.StoreUint32(&s.serving, 0) + atomic.StoreUint32(&s.serving, stateStopped) _ = s.transport.Close() }