Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: drain and close API response bodies #322

Merged
merged 1 commit into from
Jan 19, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 25 additions & 6 deletions transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
"encoding/json"
"errors"
"fmt"
"io"
"io/ioutil"
"net/http"
"net/url"
"strconv"
Expand All @@ -17,6 +19,17 @@ const defaultBufferSize = 30
const defaultRetryAfter = time.Second * 60
const defaultTimeout = time.Second * 30

// maxDrainResponseBytes is the maximum number of bytes that transport
// implementations will read from response bodies when draining them.
//
// Sentry's ingestion API responses are typically short and the SDK doesn't need
// the contents of the response body. However, the net/http HTTP client requires
// response bodies to be fully drained (and closed) for TCP keep-alive to work.
//
// maxDrainResponseBytes strikes a balance between reading too much data (if the
// server is misbehaving) and reusing TCP connections.
const maxDrainResponseBytes = 16 << 10
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure why 16 << 10 instead of 1024 * 16 or just 16384, but ok :p


// Transport is used by the Client to deliver events to remote server.
type Transport interface {
Flush(timeout time.Duration) bool
Expand Down Expand Up @@ -371,18 +384,21 @@ func (t *HTTPTransport) worker() {
}

response, err := t.client.Do(request)

if err != nil {
Logger.Printf("There was an issue with sending an event: %v", err)
continue
}

if response != nil && response.StatusCode == http.StatusTooManyRequests {
if response.StatusCode == http.StatusTooManyRequests {
deadline := time.Now().Add(retryAfter(time.Now(), response))
t.mu.Lock()
t.disabledUntil = deadline
t.mu.Unlock()
Logger.Printf("Too many requests, backing off till: %s\n", deadline)
}
// Drain body up to a limit and close it, allowing the
// transport to reuse TCP connections.
_, _ = io.CopyN(ioutil.Discard, response.Body, maxDrainResponseBytes)
response.Body.Close()
}

// Signal that processing of the batch is done.
Expand Down Expand Up @@ -472,15 +488,18 @@ func (t *HTTPSyncTransport) SendEvent(event *Event) {
)

response, err := t.client.Do(request)

if err != nil {
rhcarvalho marked this conversation as resolved.
Show resolved Hide resolved
Logger.Printf("There was an issue with sending an event: %v", err)
return
}

if response != nil && response.StatusCode == http.StatusTooManyRequests {
if response.StatusCode == http.StatusTooManyRequests {
t.disabledUntil = time.Now().Add(retryAfter(time.Now(), response))
Logger.Printf("Too many requests, backing off till: %s\n", t.disabledUntil)
}
// Drain body up to a limit and close it, allowing the
// transport to reuse TCP connections.
_, _ = io.CopyN(ioutil.Discard, response.Body, maxDrainResponseBytes)
response.Body.Close()
}

// Flush is a no-op for HTTPSyncTransport. It always returns true immediately.
Expand Down
101 changes: 101 additions & 0 deletions transport_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"fmt"
"net/http"
"net/http/httptest"
"net/http/httptrace"
"strings"
"sync"
"sync/atomic"
"testing"
Expand Down Expand Up @@ -361,3 +363,102 @@ func TestHTTPTransport(t *testing.T) {
wg.Wait()
})
}

// httptraceRoundTripper implements http.RoundTripper by wrapping
// http.DefaultTransport and keeps track of whether TCP connections have been
// reused for every request.
//
// For simplicity, httptraceRoundTripper is not safe for concurrent use.
type httptraceRoundTripper struct {
reusedConn []bool
}

func (rt *httptraceRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) {
trace := &httptrace.ClientTrace{
GotConn: func(connInfo httptrace.GotConnInfo) {
rt.reusedConn = append(rt.reusedConn, connInfo.Reused)
},
}
req = req.WithContext(httptrace.WithClientTrace(req.Context(), trace))
return http.DefaultTransport.RoundTrip(req)
}

func testKeepAlive(t *testing.T, tr Transport) {
// event is a test event. It is empty because here we only care about
// the reuse of TCP connections between client and server, not the
// specific contents of the event itself.
event := &Event{}

// largeResponse controls whether the test server should simulate an
// unexpectedly large response from Relay -- the SDK should not try to
// consume arbitrarily large responses.
largeResponse := false
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
// Simulates a response from Relay. The event_id is arbitrary,
// it doesn't matter for this test.
fmt.Fprintln(w, `{"id":"ec71d87189164e79ab1e61030c183af0"}`)
if largeResponse {
fmt.Fprintln(w, strings.Repeat(" ", maxDrainResponseBytes))
}
}))
defer srv.Close()

dsn := strings.Replace(srv.URL, "//", "//pubkey@", 1) + "/1"

rt := &httptraceRoundTripper{}
tr.Configure(ClientOptions{
Dsn: dsn,
HTTPTransport: rt,
})

reqCount := 0
checkLastConnReuse := func(reused bool) {
t.Helper()
reqCount++
if !tr.Flush(time.Second) {
t.Fatal("Flush timed out")
}
if len(rt.reusedConn) != reqCount {
t.Fatalf("unexpected number of requests: got %d, want %d", len(rt.reusedConn), reqCount)
}
if rt.reusedConn[reqCount-1] != reused {
if reused {
t.Fatal("TCP connection not reused")
}
t.Fatal("unexpected TCP connection reuse")
}
}

// First event creates a new TCP connection.
tr.SendEvent(event)
checkLastConnReuse(false)

// Next events reuse the TCP connection.
for i := 0; i < 10; i++ {
tr.SendEvent(event)
checkLastConnReuse(true)
}

// If server responses are too large, the SDK should close the
// connection instead of consuming an arbitrarily large number of bytes.
largeResponse = true

// Next event, first one to get a large response, reuses the connection.
tr.SendEvent(event)
checkLastConnReuse(true)

// All future events create a new TCP connection.
rhcarvalho marked this conversation as resolved.
Show resolved Hide resolved
for i := 0; i < 10; i++ {
tr.SendEvent(event)
checkLastConnReuse(false)
}
}

func TestKeepAlive(t *testing.T) {
t.Run("AsyncTransport", func(t *testing.T) {
testKeepAlive(t, NewHTTPTransport())
})
t.Run("SyncTransport", func(t *testing.T) {
testKeepAlive(t, NewHTTPSyncTransport())
})
}