diff --git a/client/client.go b/client/client.go
index 921533d..febcb17 100644
--- a/client/client.go
+++ b/client/client.go
@@ -47,16 +47,26 @@ type sendRequest struct {
 	batches int64
 }
 
+// CompressionMethod strings MUST match the Content-Encoding HTTP header values.
+type CompressionMethod string
+
+const CompressionMethodGzip CompressionMethod = "gzip"
+const CompressionMethodZstd CompressionMethod = "zstd"
+
 // Client implements an HTTP sender for the SAPM protocol
 type Client struct {
-	tracerProvider trace.TracerProvider
-	numWorkers     uint
-	maxIdleCons    uint
-	endpoint       string
-	accessToken    string
-	httpClient     *http.Client
+	tracerProvider     trace.TracerProvider
+	numWorkers         uint
+	maxIdleCons        uint
+	endpoint           string
+	accessToken        string
+	httpClient         *http.Client
+	disableCompression bool
 
-	closeCh chan struct{}
+	// compressionMethod to use for payload. Ignored if disableCompression==true.
+	compressionMethod CompressionMethod
+
+	closeCh chan struct{}
 	workers chan *worker
 }
 
@@ -69,8 +79,9 @@ func New(opts ...Option) (*Client, error) {
 	}
 
 	c := &Client{
-		numWorkers:  defaultNumWorkers,
-		maxIdleCons: defaultMaxIdleCons,
+		numWorkers:        defaultNumWorkers,
+		maxIdleCons:       defaultMaxIdleCons,
+		compressionMethod: CompressionMethodGzip,
 	}
 
 	for _, opt := range opts {
@@ -117,7 +128,12 @@ func New(opts ...Option) (*Client, error) {
 	c.closeCh = make(chan struct{})
 	c.workers = make(chan *worker, c.numWorkers)
 	for i := uint(0); i < c.numWorkers; i++ {
-		w := newWorker(c.httpClient, c.endpoint, c.accessToken, c.disableCompression, c.tracerProvider)
+		w, err := newWorker(
+			c.httpClient, c.endpoint, c.accessToken, c.disableCompression, c.compressionMethod, c.tracerProvider,
+		)
+		if err != nil {
+			return nil, err
+		}
 		c.workers <- w
 	}
 
diff --git a/client/client_test.go b/client/client_test.go
index 8bef2cb..1814a61 100644
--- a/client/client_test.go
+++ b/client/client_test.go
@@ -34,8 +34,10 @@ var defaultEndpointOption = WithEndpoint("http://local")
 
 func assertRequestEqualBatches(t *testing.T, r *http.Request, b []*jaegerpb.Batch) {
 	psr, err := sapmprotocol.ParseTraceV2Request(r)
-	assert.NoError(t, err)
-	assert.EqualValues(t, b, psr.Batches)
+	require.NoError(t, err)
+	if psr != nil {
+		require.EqualValues(t, b, psr.Batches)
+	}
 }
 
 func TestDefaults(t *testing.T) {
@@ -48,37 +50,74 @@ func TestDefaults(t *testing.T) {
 	assert.Equal(t, defaultNumWorkers, uint(len(c.workers)))
 }
 
-func TestClient(t *testing.T) {
-	transport := &mockTransport{}
-	c, err := New(defaultEndpointOption, WithHTTPClient(newMockHTTPClient(transport)), WithAccessToken("ClientToken"))
-	require.NoError(t, err)
+var compressionTests = []struct {
+	name               string
+	disableCompression bool
+	compressionMethod  CompressionMethod
+}{
+	{
+		name:               "none",
+		disableCompression: true,
+	},
+	{
+		name:              "gzip",
+		compressionMethod: CompressionMethodGzip,
+	},
+	{
+		name:              "zstd",
+		compressionMethod: CompressionMethodZstd,
+	},
+}
 
-	requestsBatches := [][]*jaegerpb.Batch{}
+func TestClient(t *testing.T) {
 
-	for i := 0; i < 10; i++ {
-		batches := []*jaegerpb.Batch{
-			{
-				Process: &jaegerpb.Process{ServiceName: "test_service_" + strconv.Itoa(i)},
-				Spans:   []*jaegerpb.Span{{}},
+	for _, test := range compressionTests {
+		t.Run(
+			test.name, func(t *testing.T) {
+				transport := &mockTransport{}
+				opts := []Option{
+					defaultEndpointOption,
+					WithHTTPClient(newMockHTTPClient(transport)),
+					WithAccessToken("ClientToken"),
+				}
+				if test.disableCompression {
+					opts = append(opts, WithDisabledCompression())
+				} else {
+					opts = append(opts, WithCompressionMethod(test.compressionMethod))
+				}
+				client, err := New(opts...)
+				require.NoError(t, err)
+
+				requestsBatches := [][]*jaegerpb.Batch{}
+
+				for i := 0; i < 10; i++ {
+					batches := []*jaegerpb.Batch{
+						{
+							Process: &jaegerpb.Process{ServiceName: "test_service_" + strconv.Itoa(i)},
+							Spans:   []*jaegerpb.Span{{}},
+						},
+					}
+					requestsBatches = append(requestsBatches, batches)
+				}
+
+				for _, batches := range requestsBatches {
+					err := client.Export(context.Background(), batches)
+					require.Nil(t, err)
+				}
+
+				requests := transport.requests()
+				assert.Len(t, requests, len(requestsBatches))
+
+				for i, want := range requestsBatches {
+					assertRequestEqualBatches(t, requests[i].r, want)
+				}
+
+				for _, request := range requests {
+					assert.Equal(t, request.r.Header.Get(headerAccessToken), "ClientToken")
+					assert.EqualValues(t, request.r.Header.Get(headerContentEncoding), test.compressionMethod)
+				}
 			},
-		}
-		requestsBatches = append(requestsBatches, batches)
-	}
-
-	for _, batches := range requestsBatches {
-		err := c.Export(context.Background(), batches)
-		require.Nil(t, err)
-	}
-
-	requests := transport.requests()
-	assert.Len(t, requests, len(requestsBatches))
-
-	for i, want := range requestsBatches {
-		assertRequestEqualBatches(t, requests[i].r, want)
-	}
-
-	for _, request := range requests {
-		assert.Equal(t, request.r.Header.Get(headerAccessToken), "ClientToken")
+		)
 	}
 }
 
@@ -412,3 +451,12 @@ func TestPauses(t *testing.T) {
 		assert.True(t, e >= time.Second*time.Duration(retryDelaySeconds))
 	}
 }
+
+func TestInvalidCompressionMethod(t *testing.T) {
+	_, err := New(
+		defaultEndpointOption,
+		WithHTTPClient(newMockHTTPClient(&mockTransport{})),
+		WithCompressionMethod("wrong"),
+	)
+	require.Error(t, err)
+}
diff --git a/client/oc_status_code.go b/client/oc_status_code.go
index c47ece8..c36552c 100644
--- a/client/oc_status_code.go
+++ b/client/oc_status_code.go
@@ -43,7 +43,6 @@ const (
 	headerRetryAfter      = "Retry-After"
 	headerContentEncoding = "Content-Encoding"
 	headerContentType     = "Content-Type"
-	headerValueGZIP       = "gzip"
 	headerValueXProtobuf  = "application/x-protobuf"
 )
 
diff --git a/client/options.go b/client/options.go
index d547d3b..820eef9 100644
--- a/client/options.go
+++ b/client/options.go
@@ -15,6 +15,7 @@
 package client
 
 import (
+	"fmt"
 	"net/http"
 
 	"go.opentelemetry.io/otel/trace"
@@ -64,7 +65,7 @@ func WithAccessToken(t string) Option {
 	}
 }
 
-// WithDisabledCompression configures the client to not apply GZip compression on the outgoing requests.
+// WithDisabledCompression configures the client not to apply compression to the outgoing requests.
 func WithDisabledCompression() Option {
 	return func(a *Client) error {
 		a.disableCompression = true
@@ -72,6 +73,22 @@
 	}
 }
 
+// WithCompressionMethod chooses the compression method for the outgoing requests.
+// The default compression method is CompressionMethodGzip.
+// This option is ignored if WithDisabledCompression() is used.
+func WithCompressionMethod(compressionMethod CompressionMethod) Option {
+	return func(a *Client) error {
+		switch compressionMethod {
+		case CompressionMethodGzip, CompressionMethodZstd:
+			a.compressionMethod = compressionMethod
+		default:
+			return fmt.Errorf("invalid compression method %q", string(compressionMethod))
+		}
+
+		return nil
+	}
+}
+
 // WithTracerProvider returns an Option to use the TracerProvider when
 // creating a Tracer.
 func WithTracerProvider(tracerProvider trace.TracerProvider) Option {
diff --git a/client/worker.go b/client/worker.go
index b6a202f..b965cdd 100644
--- a/client/worker.go
+++ b/client/worker.go
@@ -25,6 +25,7 @@ import (
 	"strconv"
 
 	jaegerpb "github.com/jaegertracing/jaeger/model"
+	"github.com/klauspost/compress/zstd"
 	"go.opentelemetry.io/otel/attribute"
 	"go.opentelemetry.io/otel/codes"
 	"go.opentelemetry.io/otel/trace"
@@ -39,6 +40,11 @@ type IngestResponse struct {
 	Err  error
 }
 
+type resetWriteCloser interface {
+	io.WriteCloser
+	Reset(w io.Writer)
+}
+
 // worker is not safe to be called from multiple goroutines. Each caller must use locks to avoid races
 // and data corruption. In case a caller needs to export multiple requests at the same time, it should
 // use one worker per request.
@@ -47,11 +53,19 @@ type worker struct {
 	client              *http.Client
 	accessToken         string
 	endpoint            string
-	gzipWriter          *gzip.Writer
+	compressWriter      resetWriteCloser
 	disableCompression  bool
+	compressionMethod   CompressionMethod
 }
 
-func newWorker(client *http.Client, endpoint string, accessToken string, disableCompression bool, tracerProvider trace.TracerProvider) *worker {
+func newWorker(
+	client *http.Client,
+	endpoint string,
+	accessToken string,
+	disableCompression bool,
+	compressionMethod CompressionMethod,
+	tracerProvider trace.TracerProvider,
+) (*worker, error) {
 	if tracerProvider == nil {
 		tracerProvider = trace.NewNoopTracerProvider()
 	}
@@ -61,9 +75,29 @@ func newWorker(client *http.Client, endpoint string, accessToken string, disable
 		accessToken:         accessToken,
 		endpoint:            endpoint,
 		disableCompression:  disableCompression,
-		gzipWriter:          gzip.NewWriter(nil),
+		compressionMethod:   compressionMethod,
+	}
+
+	if !disableCompression {
+		switch w.compressionMethod {
+		case CompressionMethodGzip:
+			w.compressWriter = gzip.NewWriter(nil)
+		case CompressionMethodZstd:
+			var err error
+			w.compressWriter, err = zstd.NewWriter(
+				nil,
+				// Enable sync mode to avoid concurrency overheads.
+				zstd.WithEncoderConcurrency(1),
+			)
+			if err != nil {
+				return nil, err
+			}
+		default:
+			return nil, fmt.Errorf("unknown compression method %v", w.compressionMethod)
+		}
 	}
-	return w
+
+	return w, nil
 }
 
 func (w *worker) export(ctx context.Context, batches []*jaegerpb.Batch, accessToken string) (*IngestResponse, *ErrSend) {
@@ -113,7 +147,7 @@ func (w *worker) send(ctx context.Context, r *sendRequest, accessToken string) (
 	req.Header.Add(headerContentType, headerValueXProtobuf)
 
 	if !w.disableCompression {
-		req.Header.Add(headerContentEncoding, headerValueGZIP)
+		req.Header.Add(headerContentEncoding, string(w.compressionMethod))
 	}
 
 	if accessToken == "" {
@@ -194,14 +228,14 @@ func (w *worker) prepare(batches []*jaegerpb.Batch, spansCount int) (*sendReques
 	}
 
 	buf := bytes.NewBuffer([]byte{})
-	w.gzipWriter.Reset(buf)
+	w.compressWriter.Reset(buf)
 
-	if _, err = w.gzipWriter.Write(encoded); err != nil {
-		return nil, fmt.Errorf("failed to gzip request: %w", err)
+	if _, err = w.compressWriter.Write(encoded); err != nil {
+		return nil, fmt.Errorf("failed to compress request: %w", err)
 	}
 
-	if err = w.gzipWriter.Close(); err != nil {
-		return nil, fmt.Errorf("failed to gzip request: %w", err)
+	if err = w.compressWriter.Close(); err != nil {
+		return nil, fmt.Errorf("failed to compress request: %w", err)
 	}
 	sr := &sendRequest{
 		message: buf.Bytes(),
diff --git a/client/worker_test.go b/client/worker_test.go
index e4d0e39..d8beb47 100644
--- a/client/worker_test.go
+++ b/client/worker_test.go
@@ -19,8 +19,10 @@ import (
 	"compress/gzip"
 	"context"
 	"errors"
+	"fmt"
 	"io/ioutil"
 	"net/http"
+	"strconv"
 	"testing"
 
 	"github.com/gogo/protobuf/proto"
@@ -30,6 +32,7 @@ import (
 	"go.opentelemetry.io/otel/trace"
 
 	gen "github.com/signalfx/sapm-proto/gen"
+	"github.com/signalfx/sapm-proto/internal/testhelpers"
 )
 
 var (
@@ -70,11 +73,19 @@ var (
 )
 
 func newTestWorker(c *http.Client) *worker {
-	return newWorker(c, "http://local", "", false, trace.NewNoopTracerProvider())
+	w, err := newWorker(c, "http://local", "", false, CompressionMethodGzip, trace.NewNoopTracerProvider())
+	if err != nil {
+		panic(err)
+	}
+	return w
 }
 
 func newTestWorkerWithCompression(c *http.Client, disableCompression bool) *worker {
-	return newWorker(c, "http://local", "", disableCompression, trace.NewNoopTracerProvider())
+	w, err := newWorker(c, "http://local", "", disableCompression, CompressionMethodGzip, trace.NewNoopTracerProvider())
+	if err != nil {
+		panic(err)
+	}
+	return w
 }
 
 func TestPrepare(t *testing.T) {
@@ -137,7 +148,7 @@ func TestWorkerSend(t *testing.T) {
 
 	r := received[0].r
 	assert.Equal(t, r.Method, "POST")
-	assert.Equal(t, r.Header.Get(headerContentEncoding), headerValueGZIP)
+	assert.EqualValues(t, r.Header.Get(headerContentEncoding), CompressionMethodGzip)
 	assert.Equal(t, r.Header.Get(headerContentType), headerValueXProtobuf)
 }
 
@@ -156,7 +167,7 @@ func TestWorkerSendWithAccessToken(t *testing.T) {
 
 	r := received[0].r
 	assert.Equal(t, r.Method, "POST")
-	assert.Equal(t, r.Header.Get(headerContentEncoding), headerValueGZIP)
+	assert.EqualValues(t, r.Header.Get(headerContentEncoding), CompressionMethodGzip)
 	assert.Equal(t, r.Header.Get(headerContentType), headerValueXProtobuf)
 	assert.Equal(t, r.Header.Get(headerAccessToken), "Preferential")
 }
@@ -177,7 +188,7 @@ func TestWorkerSendDefaultsToWorkerToken(t *testing.T) {
 
 	r := received[0].r
 	assert.Equal(t, r.Method, "POST")
-	assert.Equal(t, r.Header.Get(headerContentEncoding), headerValueGZIP)
+	assert.EqualValues(t, r.Header.Get(headerContentEncoding), CompressionMethodGzip)
 	assert.Equal(t, r.Header.Get(headerContentType), headerValueXProtobuf)
 	assert.Equal(t, r.Header.Get(headerAccessToken), "WorkerToken")
 }
@@ -278,3 +289,83 @@ func TestWorkerIngestResponse(t *testing.T) {
 	assert.Equal(t, 500, sendErr.StatusCode)
 	assert.Equal(t, response, string(ingestResponse.Body))
 }
+
+func TestCompressionSize(t *testing.T) {
+	fmt.Println("Message byte size by batch size and compression method.")
+	fmt.Printf("Compression ")
+	for _, test := range compressionTests {
+		fmt.Printf("%10v", test.name)
+	}
+	fmt.Printf("\n")
+
+	batchSizes := []int{1, 10, 100, 1000, 10000}
+	for _, batchSize := range batchSizes {
+
+		fmt.Printf("Batch=%-5v ", batchSize)
+
+		var byteSizes []int
+		for _, test := range compressionTests {
+			sapmData := testhelpers.CreateSapmData(batchSize)
+
+			transport := &mockTransport{}
+			client := newMockHTTPClient(transport)
+			w, err := newWorker(
+				client,
+				"http://local",
+				"",
+				test.disableCompression,
+				test.compressionMethod,
+				trace.NewNoopTracerProvider(),
+			)
+			require.NoError(t, err)
+			sr, err := w.prepare(sapmData.Batches, len(sapmData.Batches))
+			require.NoError(t, err)
+
+			byteSizes = append(byteSizes, len(sr.message))
+		}
+		for _, s := range byteSizes {
+			fmt.Printf("%10v", s)
+		}
+		fmt.Printf("\n")
+	}
+}
+
+func BenchmarkCompression(b *testing.B) {
+
+	batchSizes := []int{1, 100, 1000}
+	for _, batchSize := range batchSizes {
+		for _, test := range compressionTests {
+			sapmData := testhelpers.CreateSapmData(batchSize)
+
+			b.Run(
+				test.name+"/batch="+strconv.Itoa(batchSize), func(b *testing.B) {
+					transport := &mockTransport{}
+					client := newMockHTTPClient(transport)
+					w, err := newWorker(
+						client,
+						"http://local",
+						"",
+						test.disableCompression,
+						test.compressionMethod,
+						trace.NewNoopTracerProvider(),
+					)
+					require.NoError(b, err)
+
+					for i := 0; i < b.N; i++ {
+						sr, err := w.prepare(sapmData.Batches, len(sapmData.Batches))
+						require.NoError(b, err)
+
+						_, err = w.send(context.Background(), sr, "")
+						require.Nil(b, err)
+
+						received := transport.requests()
+						require.Len(b, received, i+1)
+
+						r := received[i].r
+						assert.EqualValues(b, r.Header.Get(headerContentEncoding), test.compressionMethod)
+					}
+				},
+			)
+		}
+	}
+}
diff --git a/go.mod b/go.mod
index 7dfaf15..5713ba0 100644
--- a/go.mod
+++ b/go.mod
@@ -6,6 +6,7 @@ require (
 	github.com/gogo/protobuf v1.3.2
 	github.com/golang/protobuf v1.5.2
 	github.com/jaegertracing/jaeger v1.38.0
+	github.com/klauspost/compress v1.16.5
 	github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/jaeger v0.60.0
 	github.com/stretchr/testify v1.8.0
 	go.opencensus.io v0.23.0
@@ -26,7 +27,6 @@ require (
 	github.com/go-logr/stdr v1.2.2 // indirect
 	github.com/grpc-ecosystem/grpc-gateway/v2 v2.7.0 // indirect
 	github.com/json-iterator/go v1.1.12 // indirect
-	github.com/klauspost/compress v1.16.5 // indirect
 	github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
 	github.com/modern-go/reflect2 v1.0.2 // indirect
 	github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal v0.60.0 // indirect
diff --git a/internal/testhelpers/testdata.go b/internal/testhelpers/testdata.go
new file mode 100644
index 0000000..8dac6e6
--- /dev/null
+++ b/internal/testhelpers/testdata.go
@@ -0,0 +1,47 @@
+package testhelpers
+
+import (
+	"strconv"
+	"time"
+
+	"github.com/jaegertracing/jaeger/model"
+
+	splunksapm "github.com/signalfx/sapm-proto/gen"
+)
+
+func CreateSapmData(batchSize int) *splunksapm.PostSpansRequest {
+	attrs := []string{
+		"service.name", "shoppingcart", "host.name", "spool.example.com", "service.id",
+		"adb80442-8437-46b5-a637-ce4a158ba9cf",
+	}
+
+	batch := &model.Batch{
+		Process: &model.Process{ServiceName: "spring"},
+		Spans:   []*model.Span{},
+	}
+	for i := 0; i < batchSize; i++ {
+		span := &model.Span{
+			TraceID:       model.NewTraceID(uint64(i*5), uint64(i*10)),
+			SpanID:        model.NewSpanID(uint64(i)),
+			OperationName: "jonatan" + strconv.Itoa(i),
+			Duration:      time.Millisecond * time.Duration(i),
+			Tags: model.KeyValues{
+				{Key: "span.kind", VStr: "client", VType: model.StringType},
+			},
+			StartTime: time.Now().UTC().Add(time.Second * time.Duration(i)),
+		}
+		for j := 0; j < 2; j++ {
+			span.Tags = append(
+				span.Tags,
+				model.KeyValue{
+					Key:   attrs[(i+j)%len(attrs)],
+					VStr:  attrs[(i+j+1)%len(attrs)],
+					VType: model.StringType,
+				},
+			)
+		}
+
+		batch.Spans = append(batch.Spans, span)
+	}
+	return &splunksapm.PostSpansRequest{Batches: []*model.Batch{batch}}
+}
diff --git a/sapmprotocol/parser_test.go b/sapmprotocol/parser_test.go
index d9398d0..4da104a 100644
--- a/sapmprotocol/parser_test.go
+++ b/sapmprotocol/parser_test.go
@@ -34,6 +34,7 @@ import (
 	"github.com/stretchr/testify/require"
 
 	splunksapm "github.com/signalfx/sapm-proto/gen"
+	"github.com/signalfx/sapm-proto/internal/testhelpers"
 )
 
 func TestNewV2TraceHandler(t *testing.T) {
@@ -274,43 +275,6 @@ func BenchmarkDecode(b *testing.B) {
 	}
 }
 
-func createSapmData(batchSize int) *splunksapm.PostSpansRequest {
-	attrs := []string{
-		"service.name", "shoppingcart", "host.name", "spool.example.com", "service.id",
-		"adb80442-8437-46b5-a637-ce4a158ba9cf",
-	}
-
-	batch := &model.Batch{
-		Process: &model.Process{ServiceName: "spring"},
-		Spans:   []*model.Span{},
-	}
-	for i := 0; i < batchSize; i++ {
-		span := &model.Span{
-			TraceID:       model.NewTraceID(uint64(i*5), uint64(i*10)),
-			SpanID:        model.NewSpanID(uint64(i)),
-			OperationName: "jonatan" + strconv.Itoa(i),
-			Duration:      time.Millisecond * time.Duration(i),
-			Tags: model.KeyValues{
-				{Key: "span.kind", VStr: "client", VType: model.StringType},
-			},
-			StartTime: time.Now().UTC().Add(time.Second * time.Duration(i)),
-		}
-		for j := 0; j < 2; j++ {
-			span.Tags = append(
-				span.Tags,
-				model.KeyValue{
-					Key:   attrs[(i+j)%len(attrs)],
-					VStr:  attrs[(i+j+1)%len(attrs)],
-					VType: model.StringType,
-				},
-			)
-		}
-
-		batch.Spans = append(batch.Spans, span)
-	}
-	return &splunksapm.PostSpansRequest{Batches: []*model.Batch{batch}}
-}
-
 func zstdBytes(uncompressedBytes []byte) []byte {
 	buf := bytes.NewBuffer(nil)
 	w, err := zstd.NewWriter(buf)
@@ -382,7 +346,7 @@ func BenchmarkDecodeMatrix(b *testing.B) {
 	for _, batchSize := range batchSizes {
 		// Encode the batch to binary ProtoBuf.
-		sapmData := createSapmData(batchSize)
+		sapmData := testhelpers.CreateSapmData(batchSize)
 		uncompressedBytes, err := sapmData.Marshal()
 		if err != nil {