Skip to content

Commit

Permalink
fix: ensure HTTP client sending given request body (#156)
Browse files Browse the repository at this point in the history
  • Loading branch information
jeroenrinzema authored Oct 21, 2020
1 parent d01c2cf commit 3364c82
Show file tree
Hide file tree
Showing 4 changed files with 25 additions and 60 deletions.
9 changes: 4 additions & 5 deletions pkg/codec/json/json.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,16 +57,15 @@ func (manager *Manager) Marshal(store references.Store) (io.Reader, error) {

go func() {
defer encoder.Release()
defer writer.Close()

encodeElement(encoder, manager.resource, manager.property.Template, store)

if _, err := encoder.Write(); err != nil {
_ = writer.CloseWithError(err)

_, err := encoder.Write()
if err != nil {
writer.CloseWithError(err)
return
}

writer.Close()
}()

return reader, nil
Expand Down
2 changes: 0 additions & 2 deletions pkg/transport/graphql/graphql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"encoding/json"
"fmt"
"io/ioutil"
"log"
"net"
"net/http"
"testing"
Expand Down Expand Up @@ -177,7 +176,6 @@ func TestNewListener(t *testing.T) {
t.Fatal(err)
}

log.Println(body)
if diff := deep.Equal(body, test.expected); diff != nil {
t.Fatal(diff)
}
Expand Down
18 changes: 14 additions & 4 deletions pkg/transport/http/caller.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package http
import (
"context"
"fmt"
"io"
"net/http"
"net/http/httputil"
"net/url"
Expand Down Expand Up @@ -180,15 +181,24 @@ func (call *Call) SendMsg(ctx context.Context, rw transport.ResponseWriter, pr *
req.Header.Add(ContentTypeHeaderKey, ContentTypes[pr.RequestCodec])
req.Header.Add(AcceptHeaderKey, ContentTypes[pr.ResponseCodec])

res := NewTransportResponseWriter(ctx, rw)
// TODO: configure http client
res, err := http.DefaultClient.Do(req)
if err != nil {
return err
}

rw.HeaderStatus(res.StatusCode)
rw.HeaderMessage(http.StatusText(res.StatusCode))
AppendHTTPHeader(rw.Header(), res.Header)

go func() {
defer rw.Close()
call.proxy.ServeHTTP(res, req)
_, err = io.Copy(rw, res.Body)
if err != nil && err != io.EOF {
logger.Debug(call.ctx, "unexpected error while copying response body", zap.Error(err))
}
}()

res.AwaitStatus()

return nil
}

Expand Down
56 changes: 7 additions & 49 deletions pkg/transport/http/http.go
Original file line number Diff line number Diff line change
@@ -1,16 +1,21 @@
package http

import (
"context"
"net/http"
"regexp"
"strings"
"sync"

"github.com/jexia/semaphore/pkg/codec/metadata"
"github.com/jexia/semaphore/pkg/transport"
)

// AppendHTTPHeader appends the given HTTP header into a transport header
func AppendHTTPHeader(dest metadata.MD, src http.Header) {
for key, vals := range src {
dest[strings.ToLower(key)] = strings.Join(vals, ";")
}
}

// CopyHTTPHeader copies the given HTTP header into a transport header
func CopyHTTPHeader(source http.Header) metadata.MD {
result := metadata.MD{}
Expand Down Expand Up @@ -38,53 +43,6 @@ func CopyMetadataHeader(header metadata.MD) http.Header {
return result
}

// NewTransportResponseWriter constructs a new HTTP response writer of the given transport response writer
func NewTransportResponseWriter(ctx context.Context, rw transport.ResponseWriter) *TransportResponseWriter {
result := &TransportResponseWriter{
header: http.Header{},
transport: rw,
}

result.mutex.Lock()
return result
}

// A TransportResponseWriter interface is used by an HTTP handler to
// construct an HTTP response.
type TransportResponseWriter struct {
header http.Header
transport transport.ResponseWriter
once sync.Once
mutex sync.RWMutex
}

// Header returns the header map that will be sent by
// WriteHeader. The Header map also is the mechanism with which
// Handlers can set HTTP trailers.
func (rw *TransportResponseWriter) Header() http.Header {
return rw.header
}

// Write writes the data to the connection as part of an HTTP reply.
func (rw *TransportResponseWriter) Write(bb []byte) (int, error) {
return rw.transport.Write(bb)
}

// WriteHeader sends an HTTP response header with the provided
// status code.
func (rw *TransportResponseWriter) WriteHeader(status int) {
rw.once.Do(func() {
rw.transport.HeaderStatus(status)
rw.transport.HeaderMessage(http.StatusText(status))
rw.mutex.Unlock()
})
}

// AwaitStatus awaits till the response header status code has been written
func (rw *TransportResponseWriter) AwaitStatus() {
rw.mutex.RLock()
}

// NewRequest constructs a new transport request of the given http request
func NewRequest(req *http.Request) *transport.Request {
return &transport.Request{
Expand Down

0 comments on commit 3364c82

Please sign in to comment.