Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix http transport client blocking recv #2744

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 11 additions & 3 deletions transport/http_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,18 +111,26 @@ func (h *httpTransportClient) Recv(msg *Message) (err error) {
var req *http.Request

if !h.dialOpts.Stream {
rc, ok := <-h.req

var rc *http.Request
var ok bool

h.Lock()
select {
case rc, ok = <-h.req:
default:
}

if !ok {
h.Lock()
if len(h.reqList) == 0 {
h.Unlock()
return io.EOF
}

rc = h.reqList[0]
h.reqList = h.reqList[1:]
h.Unlock()
}
h.Unlock()

req = rc
}
Expand Down
77 changes: 77 additions & 0 deletions transport/http_client_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
package transport

import (
"fmt"
"testing"

"github.com/pkg/errors"
)

func TestHttpTransportClient(t *testing.T) {
// arrange
l, c, err := echoHttpTransportClient("127.0.0.1:")
if err != nil {
t.Error(err)
}
defer l.Close()
defer c.Close()

// act + assert
N := cap(c.req)
// Send N+1 messages to overflow the buffered channel and place the extra message in the internal buffer
for i := 0; i < N+1; i++ {
body := fmt.Sprintf("msg-%d", i)
if err := c.Send(&Message{Body: []byte(body)}); err != nil {
t.Errorf("Unexpected send err: %v", err)
}
}

// consume all requests from the buffered channel
for i := 0; i < N; i++ {
msg := Message{}
if err := c.Recv(&msg); err != nil {
t.Errorf("Unexpected recv err: %v", err)
}
}

if len(c.reqList) != 1 {
t.Error("Unexpected reqList")
}

msg := Message{}
if err := c.Recv(&msg); err != nil {
t.Errorf("Unexpected recv err: %v", err)
}
want := fmt.Sprintf("msg-%d", N)
got := string(msg.Body)
if want != got {
t.Errorf("Unexpected message: got %q, want %q", got, want)
}
}

func echoHttpTransportClient(addr string) (*httpTransportListener, *httpTransportClient, error) {
tr := NewHTTPTransport()
l, err := tr.Listen(addr)
if err != nil {
return nil, nil, errors.Errorf("Unexpected listen err: %v", err)
}
c, err := tr.Dial(l.Addr())
if err != nil {
return nil, nil, errors.Errorf("Unexpected dial err: %v", err)
}
go l.Accept(echoHandler)
return l.(*httpTransportListener), c.(*httpTransportClient), nil
}

func echoHandler(sock Socket) {
defer sock.Close()
for {
var msg Message
if err := sock.Recv(&msg); err != nil {
return
}
if err := sock.Send(&msg); err != nil {
return
}
}
}