From 985d4ce858966b1b95373a829dd7aed8ae0cb001 Mon Sep 17 00:00:00 2001 From: Tigran Najaryan Date: Tue, 30 May 2023 19:55:33 -0400 Subject: [PATCH] Add zstd compression support to SAPM client I added a simple benchmark of compression methods and a test to print compressed byte sizes. Take with a grain of salt since the input is based on just one trivial data generator and may not be representative of production loads. For the inputs used zstd wins both in compression size and speed (more apparent in larger input sizes). ## Compression Sizes ``` === RUN TestCompressionSize Message byte size by batch and compression method. Compression none gzip zstd Batch=1 148 139 135 Batch=10 1558 503 490 Batch=100 15958 2495 2360 Batch=1000 161590 22110 22978 Batch=10000 1644106 222217 181468 --- PASS: TestCompressionSize (0.13s) ``` ## Benchmark ``` BenchmarkCompression/none/batch=1-8 56662 85501 ns/op 233172 B/op 28 allocs/op BenchmarkCompression/gzip/batch=1-8 27688 64055 ns/op 117560 B/op 35 allocs/op BenchmarkCompression/zstd/batch=1-8 41965 69071 ns/op 174616 B/op 33 allocs/op BenchmarkCompression/none/batch=100-8 20131 63991 ns/op 117124 B/op 325 allocs/op BenchmarkCompression/gzip/batch=100-8 3043 408736 ns/op 54255 B/op 335 allocs/op BenchmarkCompression/zstd/batch=100-8 10276 120161 ns/op 80178 B/op 330 allocs/op BenchmarkCompression/none/batch=1000-8 3066 382949 ns/op 323067 B/op 3025 allocs/op BenchmarkCompression/gzip/batch=1000-8 243 4787768 ns/op 379947 B/op 3038 allocs/op BenchmarkCompression/zstd/batch=1000-8 1128 1014972 ns/op 391926 B/op 3034 allocs/op ``` --- client/client.go | 36 ++++++++--- client/client_test.go | 108 ++++++++++++++++++++++--------- client/oc_status_code.go | 1 - client/options.go | 19 +++++- client/worker.go | 54 +++++++++++++--- client/worker_test.go | 101 +++++++++++++++++++++++++++-- go.mod | 2 +- internal/testhelpers/testdata.go | 47 ++++++++++++++ sapmprotocol/parser_test.go | 40 +----------- 9 files changed, 312 insertions(+), 96 deletions(-) create mode 100644 internal/testhelpers/testdata.go diff --git a/client/client.go b/client/client.go index 921533d..febcb17 100644 --- a/client/client.go +++ b/client/client.go @@ -47,16 +47,26 @@ type sendRequest struct { batches int64 } +// CompressionMethod strings MUST match the Content-Encoding http header values. +type CompressionMethod string + +const CompressionMethodGzip CompressionMethod = "gzip" +const CompressionMethodZstd CompressionMethod = "zstd" + // Client implements an HTTP sender for the SAPM protocol type Client struct { - tracerProvider trace.TracerProvider - numWorkers uint - maxIdleCons uint - endpoint string - accessToken string - httpClient *http.Client + tracerProvider trace.TracerProvider + numWorkers uint + maxIdleCons uint + endpoint string + accessToken string + httpClient *http.Client + disableCompression bool - closeCh chan struct{} + // compressionMethod to use for payload. Ignored if disableCompression==true. + compressionMethod CompressionMethod + + closeCh chan struct{} workers chan *worker } @@ -69,8 +79,9 @@ func New(opts ...Option) (*Client, error) { } c := &Client{ - numWorkers: defaultNumWorkers, - maxIdleCons: defaultMaxIdleCons, + numWorkers: defaultNumWorkers, + maxIdleCons: defaultMaxIdleCons, + compressionMethod: CompressionMethodGzip, } for _, opt := range opts { @@ -117,7 +128,12 @@ func New(opts ...Option) (*Client, error) { c.closeCh = make(chan struct{}) c.workers = make(chan *worker, c.numWorkers) for i := uint(0); i < c.numWorkers; i++ { - w := newWorker(c.httpClient, c.endpoint, c.accessToken, c.disableCompression, c.tracerProvider) + w, err := newWorker( + c.httpClient, c.endpoint, c.accessToken, c.disableCompression, c.compressionMethod, c.tracerProvider, + ) + if err != nil { + return nil, err + } c.workers <- w } diff --git a/client/client_test.go b/client/client_test.go index 8bef2cb..1814a61 100644 --- a/client/client_test.go +++ b/client/client_test.go @@ -34,8 +34,10 @@ var defaultEndpointOption = WithEndpoint("http://local") func assertRequestEqualBatches(t *testing.T, r *http.Request, b []*jaegerpb.Batch) { psr, err := sapmprotocol.ParseTraceV2Request(r) - assert.NoError(t, err) - assert.EqualValues(t, b, psr.Batches) + require.NoError(t, err) + if psr != nil { + require.EqualValues(t, b, psr.Batches) + } } func TestDefaults(t *testing.T) { @@ -48,37 +50,74 @@ func TestDefaults(t *testing.T) { assert.Equal(t, defaultNumWorkers, uint(len(c.workers))) } -func TestClient(t *testing.T) { - transport := &mockTransport{} - c, err := New(defaultEndpointOption, WithHTTPClient(newMockHTTPClient(transport)), WithAccessToken("ClientToken")) - require.NoError(t, err) +var compressionTests = []struct { + name string + disableCompression bool + compressionMethod CompressionMethod +}{ + { + name: "none", + disableCompression: true, + }, + { + name: "gzip", + compressionMethod: CompressionMethodGzip, + }, + { + name: "zstd", + compressionMethod: CompressionMethodZstd, + }, +} - requestsBatches := [][]*jaegerpb.Batch{} +func TestClient(t *testing.T) { - for i := 0; i < 10; i++ { - batches := []*jaegerpb.Batch{ - { - Process: &jaegerpb.Process{ServiceName: "test_service_" + strconv.Itoa(i)}, - Spans: []*jaegerpb.Span{{}}, + for _, test := range compressionTests { + t.Run( + test.name, func(t *testing.T) { + transport := &mockTransport{} + opts := []Option{ + defaultEndpointOption, + WithHTTPClient(newMockHTTPClient(transport)), + WithAccessToken("ClientToken"), + } + if test.disableCompression { + opts = append(opts, WithDisabledCompression()) + } else { + opts = append(opts, WithCompressionMethod(test.compressionMethod)) + } + client, err := New(opts...) + require.NoError(t, err) + + requestsBatches := [][]*jaegerpb.Batch{} + + for i := 0; i < 10; i++ { + batches := []*jaegerpb.Batch{ + { + Process: &jaegerpb.Process{ServiceName: "test_service_" + strconv.Itoa(i)}, + Spans: []*jaegerpb.Span{{}}, + }, + } + requestsBatches = append(requestsBatches, batches) + } + + for _, batches := range requestsBatches { + err := client.Export(context.Background(), batches) + require.Nil(t, err) + } + + requests := transport.requests() + assert.Len(t, requests, len(requestsBatches)) + + for i, want := range requestsBatches { + assertRequestEqualBatches(t, requests[i].r, want) + } + + for _, request := range requests { + assert.Equal(t, request.r.Header.Get(headerAccessToken), "ClientToken") + assert.EqualValues(t, request.r.Header.Get(headerContentEncoding), test.compressionMethod) + } }, - } - requestsBatches = append(requestsBatches, batches) - } - - for _, batches := range requestsBatches { - err := c.Export(context.Background(), batches) - require.Nil(t, err) - } - - requests := transport.requests() - assert.Len(t, requests, len(requestsBatches)) - - for i, want := range requestsBatches { - assertRequestEqualBatches(t, requests[i].r, want) - } - - for _, request := range requests { - assert.Equal(t, request.r.Header.Get(headerAccessToken), "ClientToken") + ) } } @@ -412,3 +451,12 @@ func TestPauses(t *testing.T) { assert.True(t, e >= time.Second*time.Duration(retryDelaySeconds)) } } + +func TestInvalidCompressionMethod(t *testing.T) { + _, err := New( + defaultEndpointOption, + WithHTTPClient(newMockHTTPClient(&mockTransport{})), + WithCompressionMethod("wrong"), + ) + require.Error(t, err) +} diff --git a/client/oc_status_code.go b/client/oc_status_code.go index c47ece8..c36552c 100644 --- a/client/oc_status_code.go +++ b/client/oc_status_code.go @@ -43,7 +43,6 @@ const ( headerRetryAfter = "Retry-After" headerContentEncoding = "Content-Encoding" headerContentType = "Content-Type" - headerValueGZIP = "gzip" headerValueXProtobuf = "application/x-protobuf" ) diff --git a/client/options.go b/client/options.go index d547d3b..820eef9 100644 --- a/client/options.go +++ b/client/options.go @@ -15,6 +15,7 @@ package client import ( + "fmt" "net/http" "go.opentelemetry.io/otel/trace" @@ -64,7 +65,7 @@ func WithAccessToken(t string) Option { } } -// WithDisabledCompression configures the client to not apply GZip compression on the outgoing requests. +// WithDisabledCompression configures the client to not apply compression on the outgoing requests. func WithDisabledCompression() Option { return func(a *Client) error { a.disableCompression = true @@ -72,6 +73,22 @@ func WithDisabledCompression() Option { } } +// WithCompressionMethod chooses the compression method for the outgoing requests. +// The default compression method is CompressionMethodGzip. +// This option is ignored if WithDisabledCompression() is used. +func WithCompressionMethod(compressionMethod CompressionMethod) Option { + return func(a *Client) error { + switch compressionMethod { + case CompressionMethodGzip, CompressionMethodZstd: + a.compressionMethod = compressionMethod + default: + return fmt.Errorf("invalid compression method %q", string(compressionMethod)) + } + + return nil + } +} + // WithTracerProvider returns an Option to use the TracerProvider when // creating a Tracer. func WithTracerProvider(tracerProvider trace.TracerProvider) Option { diff --git a/client/worker.go b/client/worker.go index b6a202f..b965cdd 100644 --- a/client/worker.go +++ b/client/worker.go @@ -25,6 +25,7 @@ import ( "strconv" jaegerpb "github.com/jaegertracing/jaeger/model" + "github.com/klauspost/compress/zstd" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/codes" "go.opentelemetry.io/otel/trace" @@ -39,6 +40,11 @@ type IngestResponse struct { Err error } +type resetWriteCloser interface { + io.WriteCloser + Reset(w io.Writer) +} + // worker is not safe to be called from multiple goroutines. Each caller must use locks to avoid races // and data corruption. In case a caller needs to export multiple requests at the same time, it should // use one worker per request. @@ -47,11 +53,19 @@ type worker struct { client *http.Client accessToken string endpoint string - gzipWriter *gzip.Writer + compressWriter resetWriteCloser disableCompression bool + compressionMethod CompressionMethod } -func newWorker(client *http.Client, endpoint string, accessToken string, disableCompression bool, tracerProvider trace.TracerProvider) *worker { +func newWorker( + client *http.Client, + endpoint string, + accessToken string, + disableCompression bool, + compressionMethod CompressionMethod, + tracerProvider trace.TracerProvider, +) (*worker, error) { if tracerProvider == nil { tracerProvider = trace.NewNoopTracerProvider() } @@ -61,9 +75,29 @@ func newWorker(client *http.Client, endpoint string, accessToken string, disable accessToken: accessToken, endpoint: endpoint, disableCompression: disableCompression, - gzipWriter: gzip.NewWriter(nil), + compressionMethod: compressionMethod, + } + + if !disableCompression { + switch w.compressionMethod { + case CompressionMethodGzip: + w.compressWriter = gzip.NewWriter(nil) + case CompressionMethodZstd: + var err error + w.compressWriter, err = zstd.NewWriter( + nil, + // Enable sync mode to avoid concurrency overheads. + zstd.WithEncoderConcurrency(1), + ) + if err != nil { + return nil, err + } + default: + return nil, fmt.Errorf("unknown compression method %v", w.compressionMethod) + } } - return w + + return w, nil } func (w *worker) export(ctx context.Context, batches []*jaegerpb.Batch, accessToken string) (*IngestResponse, *ErrSend) { @@ -113,7 +147,7 @@ func (w *worker) send(ctx context.Context, r *sendRequest, accessToken string) ( req.Header.Add(headerContentType, headerValueXProtobuf) if !w.disableCompression { - req.Header.Add(headerContentEncoding, headerValueGZIP) + req.Header.Add(headerContentEncoding, string(w.compressionMethod)) } if accessToken == "" { @@ -194,14 +228,14 @@ func (w *worker) prepare(batches []*jaegerpb.Batch, spansCount int) (*sendReques } buf := bytes.NewBuffer([]byte{}) - w.gzipWriter.Reset(buf) + w.compressWriter.Reset(buf) - if _, err = w.gzipWriter.Write(encoded); err != nil { - return nil, fmt.Errorf("failed to gzip request: %w", err) + if _, err = w.compressWriter.Write(encoded); err != nil { + return nil, fmt.Errorf("failed to compress request: %w", err) } - if err = w.gzipWriter.Close(); err != nil { - return nil, fmt.Errorf("failed to gzip request: %w", err) + if err = w.compressWriter.Close(); err != nil { + return nil, fmt.Errorf("failed to compress request: %w", err) } sr := &sendRequest{ message: buf.Bytes(), diff --git a/client/worker_test.go b/client/worker_test.go index e4d0e39..d8beb47 100644 --- a/client/worker_test.go +++ b/client/worker_test.go @@ -19,8 +19,10 @@ import ( "compress/gzip" "context" "errors" + "fmt" "io/ioutil" "net/http" + "strconv" "testing" "github.com/gogo/protobuf/proto" @@ -30,6 +32,7 @@ import ( "go.opentelemetry.io/otel/trace" gen "github.com/signalfx/sapm-proto/gen" + "github.com/signalfx/sapm-proto/internal/testhelpers" ) var ( @@ -70,11 +73,19 @@ var ( ) func newTestWorker(c *http.Client) *worker { - return newWorker(c, "http://local", "", false, trace.NewNoopTracerProvider()) + w, err := newWorker(c, "http://local", "", false, CompressionMethodGzip, trace.NewNoopTracerProvider()) + if err != nil { + panic(err) + } + return w } func newTestWorkerWithCompression(c *http.Client, disableCompression bool) *worker { - return newWorker(c, "http://local", "", disableCompression, trace.NewNoopTracerProvider()) + w, err := newWorker(c, "http://local", "", disableCompression, CompressionMethodGzip, trace.NewNoopTracerProvider()) + if err != nil { + panic(err) + } + return w } func TestPrepare(t *testing.T) { @@ -137,7 +148,7 @@ func TestWorkerSend(t *testing.T) { r := received[0].r assert.Equal(t, r.Method, "POST") - assert.Equal(t, r.Header.Get(headerContentEncoding), headerValueGZIP) + assert.EqualValues(t, r.Header.Get(headerContentEncoding), CompressionMethodGzip) assert.Equal(t, r.Header.Get(headerContentType), headerValueXProtobuf) } @@ -156,7 +167,7 @@ func TestWorkerSendWithAccessToken(t *testing.T) { r := received[0].r assert.Equal(t, r.Method, "POST") - assert.Equal(t, r.Header.Get(headerContentEncoding), headerValueGZIP) + assert.EqualValues(t, r.Header.Get(headerContentEncoding), CompressionMethodGzip) assert.Equal(t, r.Header.Get(headerContentType), headerValueXProtobuf) assert.Equal(t, r.Header.Get(headerAccessToken), "Preferential") } @@ -177,7 +188,7 @@ func TestWorkerSendDefaultsToWorkerToken(t *testing.T) { r := received[0].r assert.Equal(t, r.Method, "POST") - assert.Equal(t, r.Header.Get(headerContentEncoding), headerValueGZIP) + assert.EqualValues(t, r.Header.Get(headerContentEncoding), CompressionMethodGzip) assert.Equal(t, r.Header.Get(headerContentType), headerValueXProtobuf) assert.Equal(t, r.Header.Get(headerAccessToken), "WorkerToken") } @@ -278,3 +289,83 @@ func TestWorkerIngestResponse(t *testing.T) { assert.Equal(t, 500, sendErr.StatusCode) assert.Equal(t, response, string(ingestResponse.Body)) } + +func TestCompressionSize(t *testing.T) { + fmt.Println("Message byte size by batch size and compression method.") + fmt.Printf("Compression ") + for _, test := range compressionTests { + fmt.Printf("%10v", test.name) + } + fmt.Printf("\n") + + batchSizes := []int{1, 10, 100, 1000, 10000} + for _, batchSize := range batchSizes { + + fmt.Printf("Batch=%-5v ", batchSize) + + var byteSizes []int + for _, test := range compressionTests { + sapmData := testhelpers.CreateSapmData(batchSize) + + transport := &mockTransport{} + client := newMockHTTPClient(transport) + w, err := newWorker( + client, + "http://local", + "", + test.disableCompression, + test.compressionMethod, + trace.NewNoopTracerProvider(), + ) + require.NoError(t, err) + sr, err := w.prepare(sapmData.Batches, len(sapmData.Batches)) + require.NoError(t, err) + + byteSizes = append(byteSizes, len(sr.message)) + } + for _, s := range byteSizes { + fmt.Printf("%10v", s) + } + fmt.Printf("\n") + } +} + +func BenchmarkCompression(b *testing.B) { + + batchSizes := []int{1, 100, 1000} + for _, batchSize := range batchSizes { + for _, test := range compressionTests { + sapmData := testhelpers.CreateSapmData(batchSize) + + b.Run( + test.name+"/batch="+strconv.Itoa(batchSize), func(b *testing.B) { + transport := &mockTransport{} + client := newMockHTTPClient(transport) + w, err := newWorker( + client, + "http://local", + "", + test.disableCompression, + test.compressionMethod, + trace.NewNoopTracerProvider(), + ) + require.NoError(b, err) + + for i := 0; i < b.N; i++ { + sr, err := w.prepare(sapmData.Batches, len(sapmData.Batches)) + require.NoError(b, err) + + _, err = w.send(context.Background(), sr, "") + require.Nil(b, err) + + received := transport.requests() + require.Len(b, received, i+1) + + r := received[i].r + assert.EqualValues(b, r.Header.Get(headerContentEncoding), test.compressionMethod) + } + }, + ) + } + } +} diff --git a/go.mod b/go.mod index 7dfaf15..5713ba0 100644 --- a/go.mod +++ b/go.mod @@ -6,6 +6,7 @@ require ( github.com/gogo/protobuf v1.3.2 github.com/golang/protobuf v1.5.2 github.com/jaegertracing/jaeger v1.38.0 + github.com/klauspost/compress v1.16.5 github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/jaeger v0.60.0 github.com/stretchr/testify v1.8.0 go.opencensus.io v0.23.0 @@ -26,7 +27,6 @@ require ( github.com/go-logr/stdr v1.2.2 // indirect github.com/grpc-ecosystem/grpc-gateway/v2 v2.7.0 // indirect github.com/json-iterator/go v1.1.12 // indirect - github.com/klauspost/compress v1.16.5 // indirect github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.2 // indirect github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal v0.60.0 // indirect diff --git a/internal/testhelpers/testdata.go b/internal/testhelpers/testdata.go new file mode 100644 index 0000000..8dac6e6 --- /dev/null +++ b/internal/testhelpers/testdata.go @@ -0,0 +1,47 @@ +package testhelpers + +import ( + "strconv" + "time" + + "github.com/jaegertracing/jaeger/model" + + splunksapm "github.com/signalfx/sapm-proto/gen" +) + +func CreateSapmData(batchSize int) *splunksapm.PostSpansRequest { + attrs := []string{ + "service.name", "shoppingcart", "host.name", "spool.example.com", "service.id", + "adb80442-8437-46b5-a637-ce4a158ba9cf", + } + + batch := &model.Batch{ + Process: &model.Process{ServiceName: "spring"}, + Spans: []*model.Span{}, + } + for i := 0; i < batchSize; i++ { + span := &model.Span{ + TraceID: model.NewTraceID(uint64(i*5), uint64(i*10)), + SpanID: model.NewSpanID(uint64(i)), + OperationName: "jonatan" + strconv.Itoa(i), + Duration: time.Millisecond * time.Duration(i), + Tags: model.KeyValues{ + {Key: "span.kind", VStr: "client", VType: model.StringType}, + }, + StartTime: time.Now().UTC().Add(time.Second * time.Duration(i)), + } + for j := 0; j < 2; j++ { + span.Tags = append( + span.Tags, + model.KeyValue{ + Key: attrs[(i+j)%len(attrs)], + VStr: attrs[(i+j+1)%len(attrs)], + VType: model.StringType, + }, + ) + } + + batch.Spans = append(batch.Spans, span) + } + return &splunksapm.PostSpansRequest{Batches: []*model.Batch{batch}} +} diff --git a/sapmprotocol/parser_test.go b/sapmprotocol/parser_test.go index d9398d0..4da104a 100644 --- a/sapmprotocol/parser_test.go +++ b/sapmprotocol/parser_test.go @@ -34,6 +34,7 @@ import ( "github.com/stretchr/testify/require" splunksapm "github.com/signalfx/sapm-proto/gen" + "github.com/signalfx/sapm-proto/internal/testhelpers" ) func TestNewV2TraceHandler(t *testing.T) { @@ -274,43 +275,6 @@ func BenchmarkDecode(b *testing.B) { } } -func createSapmData(batchSize int) *splunksapm.PostSpansRequest { - attrs := []string{ - "service.name", "shoppingcart", "host.name", "spool.example.com", "service.id", - "adb80442-8437-46b5-a637-ce4a158ba9cf", - } - - batch := &model.Batch{ - Process: &model.Process{ServiceName: "spring"}, - Spans: []*model.Span{}, - } - for i := 0; i < batchSize; i++ { - span := &model.Span{ - TraceID: model.NewTraceID(uint64(i*5), uint64(i*10)), - SpanID: model.NewSpanID(uint64(i)), - OperationName: "jonatan" + strconv.Itoa(i), - Duration: time.Millisecond * time.Duration(i), - Tags: model.KeyValues{ - {Key: "span.kind", VStr: "client", VType: model.StringType}, - }, - StartTime: time.Now().UTC().Add(time.Second * time.Duration(i)), - } - for j := 0; j < 2; j++ { - span.Tags = append( - span.Tags, - model.KeyValue{ - Key: attrs[(i+j)%len(attrs)], - VStr: attrs[(i+j+1)%len(attrs)], - VType: model.StringType, - }, - ) - } - - batch.Spans = append(batch.Spans, span) - } - return &splunksapm.PostSpansRequest{Batches: []*model.Batch{batch}} -} - func zstdBytes(uncompressedBytes []byte) []byte { buf := bytes.NewBuffer(nil) w, err := zstd.NewWriter(buf) @@ -382,7 +346,7 @@ func BenchmarkDecodeMatrix(b *testing.B) { for _, batchSize := range batchSizes { // Encode the batch to binary ProtoBuf. - sapmData := createSapmData(batchSize) + sapmData := testhelpers.CreateSapmData(batchSize) uncompressedBytes, err := sapmData.Marshal() if err != nil {