Skip to content

Commit

Permalink
Shutdown close channel returned by RequestCtx.Done
Browse files Browse the repository at this point in the history
RequestCtx.Done() now returns a channel that will be closed when
Server.Shutdown() is called.
  • Loading branch information
erikdubbelboer committed Feb 2, 2019
1 parent 9574c37 commit 51532b9
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 18 deletions.
21 changes: 12 additions & 9 deletions server.go
Original file line number Diff line number Diff line change
Expand Up @@ -353,6 +353,7 @@ type Server struct {
mu sync.Mutex
open int32
stop int32
done chan struct{}
}

// TimeoutHandler creates RequestHandler, which returns StatusRequestTimeout
Expand Down Expand Up @@ -1526,6 +1527,7 @@ func (s *Server) Serve(ln net.Listener) error {
}

s.ln = ln
s.done = make(chan struct{})
}
s.mu.Unlock()

Expand Down Expand Up @@ -1607,6 +1609,10 @@ func (s *Server) Shutdown() error {
return err
}

if s.done != nil {
close(s.done)
}

// Closing the listener will make Serve() call Stop on the worker pool.
// Setting .stop to 1 will make serveConn() break out of its loop.
// Now we just have to wait until all workers are done.
Expand Down Expand Up @@ -1824,11 +1830,6 @@ func (s *Server) serveConn(c net.Conn) error {
isHTTP11 bool
)
for {
if atomic.LoadInt32(&s.stop) == 1 {
err = nil
break
}

connRequestNum++
ctx.time = currentTime

Expand Down Expand Up @@ -2013,6 +2014,11 @@ func (s *Server) serveConn(c net.Conn) error {

currentTime = time.Now()
s.setState(c, StateIdle)

if atomic.LoadInt32(&s.stop) == 1 {
err = nil
break
}
}

if br != nil {
Expand Down Expand Up @@ -2297,11 +2303,8 @@ func (ctx *RequestCtx) Deadline() (deadline time.Time, ok bool) {
// Done returns a channel that's closed when work done on behalf of this
// context should be canceled. Done may return nil if this context can
// never be canceled. Successive calls to Done return the same value.
//
// This method always returns nil and is only present to make
// RequestCtx implement the context interface.
func (ctx *RequestCtx) Done() <-chan struct{} {
return nil
return ctx.s.done
}

// Err returns a non-nil error value after Done is closed,
Expand Down
50 changes: 41 additions & 9 deletions server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2722,12 +2722,11 @@ func TestServeConnMultiRequests(t *testing.T) {

func TestShutdown(t *testing.T) {
ln := fasthttputil.NewInmemoryListener()
h := func(ctx *RequestCtx) {
time.Sleep(time.Millisecond * 500)
ctx.Success("aaa/bbb", []byte("real response"))
}
s := &Server{
Handler: h,
Handler: func(ctx *RequestCtx) {
time.Sleep(time.Millisecond * 500)
ctx.Success("aaa/bbb", []byte("real response"))
},
}
serveCh := make(chan struct{})
go func() {
Expand Down Expand Up @@ -2778,13 +2777,13 @@ func TestShutdown(t *testing.T) {
}
}
}

func TestShutdownReuse(t *testing.T) {
ln := fasthttputil.NewInmemoryListener()
h := func(ctx *RequestCtx) {
ctx.Success("aaa/bbb", []byte("real response"))
}
s := &Server{
Handler: h,
Handler: func(ctx *RequestCtx) {
ctx.Success("aaa/bbb", []byte("real response"))
},
ReadTimeout: time.Second,
Logger: &customLogger{}, // Ignore log output.
}
Expand Down Expand Up @@ -2825,6 +2824,39 @@ func TestShutdownReuse(t *testing.T) {
}
}

func TestShutdownDone(t *testing.T) {
ln := fasthttputil.NewInmemoryListener()
s := &Server{
Handler: func(ctx *RequestCtx) {
<-ctx.Done()
ctx.Success("aaa/bbb", []byte("real response"))
},
}
go func() {
if err := s.Serve(ln); err != nil {
t.Fatalf("unexepcted error: %s", err)
}
}()
conn, err := ln.Dial()
if err != nil {
t.Fatalf("unexepcted error: %s", err)
}
if _, err = conn.Write([]byte("GET / HTTP/1.1\r\nHost: google.com\r\n\r\n")); err != nil {
t.Fatalf("unexpected error: %s", err)
}
go func() {
// Shutdown won't return if the connection doesn't close,
// which doesn't happen until we read the response.
if err := s.Shutdown(); err != nil {
t.Fatalf("unexepcted error: %s", err)
}
}()
// We can only reach this point and get a valid response
// if reading from ctx.Done() returned.
br := bufio.NewReader(conn)
verifyResponse(t, br, StatusOK, "aaa/bbb", "real response")
}

func verifyResponse(t *testing.T, r *bufio.Reader, expectedStatusCode int, expectedContentType, expectedBody string) {
var resp Response
if err := resp.Read(r); err != nil {
Expand Down

1 comment on commit 51532b9

@peczenyj
Copy link
Contributor

@peczenyj peczenyj commented on 51532b9 Feb 14, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hello

when I try to use the new feature, I got a panic.

the reason is: method Err() returns nil hard coded, but the documentation is explicit

2321 // Err returns a non-nil error value after Done is closed,
2322 // successive calls to Err return the same error.
2323 // If Done is not yet closed, Err returns nil.
2324 // If Done is closed, Err returns a non-nil error explaining why:
2325 // Canceled if the context was canceled
2326 // or DeadlineExceeded if the context's deadline passed.
2327 //
2328 // This method always returns nil and is only present to make
2329 // RequestCtx implement the context interface.
2330 func (ctx *RequestCtx) Err() error {
2331   return nil
2332 }

A better solution can be

func (ctx *RequestCtx) Err() error {
   select {
       case <- ctx.Done():
          return context.Canceled
       default:
          return nil
   } 
}

example of panic:

panic: context: internal error: missing cancel error

goroutine 22 [running]:
context.(*cancelCtx).cancel(0xc0000886c0, 0x0, 0x0, 0x0)
	/usr/local/go/src/context/context.go:350 +0x258
context.(*timerCtx).cancel(0xc0000886c0, 0x0, 0x0, 0x0)
	/usr/local/go/src/context/context.go:428 +0x4a
context.propagateCancel.func1(0x1076d40, 0xc0001c0000, 0x106f740, 0xc0000886c0)
	/usr/local/go/src/context/context.go:262 +0x145
created by context.propagateCancel
	/usr/local/go/src/context/context.go:259 +0x18d

where /usr/local/go/src/context/context.go:350

346 // cancel closes c.done, cancels each of c's children, and, if
347 // removeFromParent is true, removes c from its parent's children.
348 func (c *cancelCtx) cancel(removeFromParent bool, err error) {
349   if err == nil {
350     panic("context: internal error: missing cancel error")
351   }

Please sign in to comment.