Skip to content

Commit

Permalink
Fix flaky prom remote write exporter concurrency test (#37430)
Browse files Browse the repository at this point in the history
Fix #37104

This is more an artifact of the test firing an unbounded number of go
routines each one making its own HTTP request. Although keepalive is
enabled by default the code ends up not re-using many of the connections
causing the many connections to end up in a TIME_WAIT state. In order to
avoid this the test now limits the number of concurrent requests and has
a small change to the actual code to facilitate re-use of existing TCP
connections used by the HTTP client.

Although there is a change to non-test code I don't consider this a bug
worth changelog because no user of the component should reach such high
burst of "push metrics" in any reasonable production scenario.
  • Loading branch information
pjanotti authored Jan 23, 2025
1 parent 11e18af commit 39816b7
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 7 deletions.
5 changes: 4 additions & 1 deletion exporter/prometheusremotewriteexporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -353,7 +353,10 @@ func (prwe *prwExporter) execute(ctx context.Context, writeReq *prompb.WriteRequ
if err != nil {
return err
}
defer resp.Body.Close()
defer func() {
_, _ = io.Copy(io.Discard, resp.Body)
resp.Body.Close()
}()

// 2xx status code is considered a success
// 5xx errors are recoverable and the exporter should retry
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
"io"
"net/http"
"net/http/httptest"
"os"
"runtime"
"strconv"
"sync"
"testing"
Expand All @@ -32,9 +32,6 @@ import (
// Test everything works when there is more than one goroutine calling PushMetrics.
// Today we only use 1 worker per exporter, but the intention of this test is to future-proof in case it changes.
func Test_PushMetricsConcurrent(t *testing.T) {
if os.Getenv("ImageOs") == "win25" && os.Getenv("GITHUB_ACTIONS") == "true" {
t.Skip("Skipping test on Windows 2025 GH runners, see https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/37104")
}
n := 1000
ms := make([]pmetric.Metrics, n)
testIDKey := "test_id"
Expand Down Expand Up @@ -137,15 +134,22 @@ func Test_PushMetricsConcurrent(t *testing.T) {
resp, checkRequestErr := http.Get(server.URL)
require.NoError(c, checkRequestErr)
assert.NoError(c, resp.Body.Close())
}, 5*time.Second, 100*time.Millisecond)
}, 15*time.Second, 100*time.Millisecond)

var wg sync.WaitGroup
wg.Add(n)
maxConcurrentGoroutines := runtime.NumCPU() * 4
semaphore := make(chan struct{}, maxConcurrentGoroutines)
for _, m := range ms {
semaphore <- struct{}{}
go func() {
defer func() {
<-semaphore
wg.Done()
}()

err := prwe.PushMetrics(ctx, m)
assert.NoError(t, err)
wg.Done()
}()
}
wg.Wait()
Expand Down

0 comments on commit 39816b7

Please sign in to comment.