diff --git a/transport.go b/transport.go index 2c9f81008..bc290f021 100644 --- a/transport.go +++ b/transport.go @@ -6,6 +6,8 @@ import ( "encoding/json" "errors" "fmt" + "io" + "io/ioutil" "net/http" "net/url" "strconv" @@ -17,6 +19,17 @@ const defaultBufferSize = 30 const defaultRetryAfter = time.Second * 60 const defaultTimeout = time.Second * 30 +// maxDrainResponseBytes is the maximum number of bytes that transport +// implementations will read from response bodies when draining them. +// +// Sentry's ingestion API responses are typically short and the SDK doesn't need +// the contents of the response body. However, the net/http HTTP client requires +// response bodies to be fully drained (and closed) for TCP keep-alive to work. +// +// maxDrainResponseBytes strikes a balance between reading too much data (if the +// server is misbehaving) and reusing TCP connections. +const maxDrainResponseBytes = 16 << 10 + // Transport is used by the Client to deliver events to remote server. type Transport interface { Flush(timeout time.Duration) bool @@ -371,18 +384,21 @@ func (t *HTTPTransport) worker() { } response, err := t.client.Do(request) - if err != nil { Logger.Printf("There was an issue with sending an event: %v", err) + continue } - - if response != nil && response.StatusCode == http.StatusTooManyRequests { + if response.StatusCode == http.StatusTooManyRequests { deadline := time.Now().Add(retryAfter(time.Now(), response)) t.mu.Lock() t.disabledUntil = deadline t.mu.Unlock() Logger.Printf("Too many requests, backing off till: %s\n", deadline) } + // Drain body up to a limit and close it, allowing the + // transport to reuse TCP connections. + _, _ = io.CopyN(ioutil.Discard, response.Body, maxDrainResponseBytes) + response.Body.Close() } // Signal that processing of the batch is done. @@ -472,15 +488,18 @@ func (t *HTTPSyncTransport) SendEvent(event *Event) { ) response, err := t.client.Do(request) - if err != nil { Logger.Printf("There was an issue with sending an event: %v", err) + return } - - if response != nil && response.StatusCode == http.StatusTooManyRequests { + if response.StatusCode == http.StatusTooManyRequests { t.disabledUntil = time.Now().Add(retryAfter(time.Now(), response)) Logger.Printf("Too many requests, backing off till: %s\n", t.disabledUntil) } + // Drain body up to a limit and close it, allowing the + // transport to reuse TCP connections. + _, _ = io.CopyN(ioutil.Discard, response.Body, maxDrainResponseBytes) + response.Body.Close() } // Flush is a no-op for HTTPSyncTransport. It always returns true immediately. diff --git a/transport_test.go b/transport_test.go index 40a77822e..b4b73d5ca 100644 --- a/transport_test.go +++ b/transport_test.go @@ -5,6 +5,8 @@ import ( "fmt" "net/http" "net/http/httptest" + "net/http/httptrace" + "strings" "sync" "sync/atomic" "testing" @@ -361,3 +363,102 @@ func TestHTTPTransport(t *testing.T) { wg.Wait() }) } + +// httptraceRoundTripper implements http.RoundTripper by wrapping +// http.DefaultTransport and keeps track of whether TCP connections have been +// reused for every request. +// +// For simplicity, httptraceRoundTripper is not safe for concurrent use. +type httptraceRoundTripper struct { + reusedConn []bool +} + +func (rt *httptraceRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) { + trace := &httptrace.ClientTrace{ + GotConn: func(connInfo httptrace.GotConnInfo) { + rt.reusedConn = append(rt.reusedConn, connInfo.Reused) + }, + } + req = req.WithContext(httptrace.WithClientTrace(req.Context(), trace)) + return http.DefaultTransport.RoundTrip(req) +} + +func testKeepAlive(t *testing.T, tr Transport) { + // event is a test event. It is empty because here we only care about + // the reuse of TCP connections between client and server, not the + // specific contents of the event itself. + event := &Event{} + + // largeResponse controls whether the test server should simulate an + // unexpectedly large response from Relay -- the SDK should not try to + // consume arbitrarily large responses. + largeResponse := false + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + // Simulates a response from Relay. The event_id is arbitrary, + // it doesn't matter for this test. + fmt.Fprintln(w, `{"id":"ec71d87189164e79ab1e61030c183af0"}`) + if largeResponse { + fmt.Fprintln(w, strings.Repeat(" ", maxDrainResponseBytes)) + } + })) + defer srv.Close() + + dsn := strings.Replace(srv.URL, "//", "//pubkey@", 1) + "/1" + + rt := &httptraceRoundTripper{} + tr.Configure(ClientOptions{ + Dsn: dsn, + HTTPTransport: rt, + }) + + reqCount := 0 + checkLastConnReuse := func(reused bool) { + t.Helper() + reqCount++ + if !tr.Flush(time.Second) { + t.Fatal("Flush timed out") + } + if len(rt.reusedConn) != reqCount { + t.Fatalf("unexpected number of requests: got %d, want %d", len(rt.reusedConn), reqCount) + } + if rt.reusedConn[reqCount-1] != reused { + if reused { + t.Fatal("TCP connection not reused") + } + t.Fatal("unexpected TCP connection reuse") + } + } + + // First event creates a new TCP connection. + tr.SendEvent(event) + checkLastConnReuse(false) + + // Next events reuse the TCP connection. + for i := 0; i < 10; i++ { + tr.SendEvent(event) + checkLastConnReuse(true) + } + + // If server responses are too large, the SDK should close the + // connection instead of consuming an arbitrarily large number of bytes. + largeResponse = true + + // Next event, first one to get a large response, reuses the connection. + tr.SendEvent(event) + checkLastConnReuse(true) + + // All future events create a new TCP connection. + for i := 0; i < 10; i++ { + tr.SendEvent(event) + checkLastConnReuse(false) + } +} + +func TestKeepAlive(t *testing.T) { + t.Run("AsyncTransport", func(t *testing.T) { + testKeepAlive(t, NewHTTPTransport()) + }) + t.Run("SyncTransport", func(t *testing.T) { + testKeepAlive(t, NewHTTPSyncTransport()) + }) +}