Skip to content

Commit

Permalink
Add zstd compression support to SAPM client (#138)
Browse files Browse the repository at this point in the history
This change adds a simple benchmark of the compression methods and a test that prints
compressed byte sizes. Take the results with a grain of salt: the input comes from just one
trivial data generator and may not be representative of production loads.

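For the inputs used, zstd generally wins in both compressed size and speed, and the gap
widens at larger input sizes. The exceptions in the numbers below are Batch=1000, where gzip's
output is slightly smaller, and batch=1, where gzip is marginally faster.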

## Compression Sizes

```
=== RUN   TestCompressionSize
Message byte size by batch and compression method.
Compression       none      gzip      zstd
Batch=1            148       139       135
Batch=10          1558       503       490
Batch=100        15958      2495      2360
Batch=1000      161590     22110     22978
Batch=10000    1644106    222217    181468
--- PASS: TestCompressionSize (0.13s)
```

## Benchmark

```
BenchmarkCompression/none/batch=1-8                56662             85501 ns/op          233172 B/op         28 allocs/op
BenchmarkCompression/gzip/batch=1-8                27688             64055 ns/op          117560 B/op         35 allocs/op
BenchmarkCompression/zstd/batch=1-8                41965             69071 ns/op          174616 B/op         33 allocs/op
BenchmarkCompression/none/batch=100-8              20131             63991 ns/op          117124 B/op        325 allocs/op
BenchmarkCompression/gzip/batch=100-8               3043            408736 ns/op           54255 B/op        335 allocs/op
BenchmarkCompression/zstd/batch=100-8              10276            120161 ns/op           80178 B/op        330 allocs/op
BenchmarkCompression/none/batch=1000-8              3066            382949 ns/op          323067 B/op       3025 allocs/op
BenchmarkCompression/gzip/batch=1000-8               243           4787768 ns/op          379947 B/op       3038 allocs/op
BenchmarkCompression/zstd/batch=1000-8              1128           1014972 ns/op          391926 B/op       3034 allocs/op
```
  • Loading branch information
tigrannajaryan authored Jun 8, 2023
1 parent 6c867a0 commit 755ac0a
Show file tree
Hide file tree
Showing 9 changed files with 312 additions and 96 deletions.
36 changes: 26 additions & 10 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,16 +47,26 @@ type sendRequest struct {
batches int64
}

// CompressionMethod names a payload compression algorithm. The string value of
// each constant MUST match the corresponding Content-Encoding HTTP header value,
// because it is written into that header as-is.
type CompressionMethod string

// Supported compression methods.
const (
	// CompressionMethodGzip selects gzip compression.
	CompressionMethodGzip CompressionMethod = "gzip"
	// CompressionMethodZstd selects zstd compression.
	CompressionMethodZstd CompressionMethod = "zstd"
)

// Client implements an HTTP sender for the SAPM protocol
type Client struct {
tracerProvider trace.TracerProvider
numWorkers uint
maxIdleCons uint
endpoint string
accessToken string
httpClient *http.Client
tracerProvider trace.TracerProvider
numWorkers uint
maxIdleCons uint
endpoint string
accessToken string
httpClient *http.Client

disableCompression bool
closeCh chan struct{}
// compressionMethod to use for payload. Ignored if disableCompression==true.
compressionMethod CompressionMethod

closeCh chan struct{}

workers chan *worker
}
Expand All @@ -69,8 +79,9 @@ func New(opts ...Option) (*Client, error) {
}

c := &Client{
numWorkers: defaultNumWorkers,
maxIdleCons: defaultMaxIdleCons,
numWorkers: defaultNumWorkers,
maxIdleCons: defaultMaxIdleCons,
compressionMethod: CompressionMethodGzip,
}

for _, opt := range opts {
Expand Down Expand Up @@ -117,7 +128,12 @@ func New(opts ...Option) (*Client, error) {
c.closeCh = make(chan struct{})
c.workers = make(chan *worker, c.numWorkers)
for i := uint(0); i < c.numWorkers; i++ {
w := newWorker(c.httpClient, c.endpoint, c.accessToken, c.disableCompression, c.tracerProvider)
w, err := newWorker(
c.httpClient, c.endpoint, c.accessToken, c.disableCompression, c.compressionMethod, c.tracerProvider,
)
if err != nil {
return nil, err
}
c.workers <- w
}

Expand Down
108 changes: 78 additions & 30 deletions client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,10 @@ var defaultEndpointOption = WithEndpoint("http://local")

func assertRequestEqualBatches(t *testing.T, r *http.Request, b []*jaegerpb.Batch) {
psr, err := sapmprotocol.ParseTraceV2Request(r)
assert.NoError(t, err)
assert.EqualValues(t, b, psr.Batches)
require.NoError(t, err)
if psr != nil {
require.EqualValues(t, b, psr.Batches)
}
}

func TestDefaults(t *testing.T) {
Expand All @@ -48,37 +50,74 @@ func TestDefaults(t *testing.T) {
assert.Equal(t, defaultNumWorkers, uint(len(c.workers)))
}

func TestClient(t *testing.T) {
transport := &mockTransport{}
c, err := New(defaultEndpointOption, WithHTTPClient(newMockHTTPClient(transport)), WithAccessToken("ClientToken"))
require.NoError(t, err)
// compressionTests is the table of payload compression configurations iterated
// by the table-driven tests: compression disabled, gzip, and zstd.
// compressionMethod is left at its zero value for the "none" case.
var compressionTests = []struct {
	name               string
	disableCompression bool
	compressionMethod  CompressionMethod
}{
	{name: "none", disableCompression: true},
	{name: "gzip", compressionMethod: CompressionMethodGzip},
	{name: "zstd", compressionMethod: CompressionMethodZstd},
}

requestsBatches := [][]*jaegerpb.Batch{}
func TestClient(t *testing.T) {

for i := 0; i < 10; i++ {
batches := []*jaegerpb.Batch{
{
Process: &jaegerpb.Process{ServiceName: "test_service_" + strconv.Itoa(i)},
Spans: []*jaegerpb.Span{{}},
for _, test := range compressionTests {
t.Run(
test.name, func(t *testing.T) {
transport := &mockTransport{}
opts := []Option{
defaultEndpointOption,
WithHTTPClient(newMockHTTPClient(transport)),
WithAccessToken("ClientToken"),
}
if test.disableCompression {
opts = append(opts, WithDisabledCompression())
} else {
opts = append(opts, WithCompressionMethod(test.compressionMethod))
}
client, err := New(opts...)
require.NoError(t, err)

requestsBatches := [][]*jaegerpb.Batch{}

for i := 0; i < 10; i++ {
batches := []*jaegerpb.Batch{
{
Process: &jaegerpb.Process{ServiceName: "test_service_" + strconv.Itoa(i)},
Spans: []*jaegerpb.Span{{}},
},
}
requestsBatches = append(requestsBatches, batches)
}

for _, batches := range requestsBatches {
err := client.Export(context.Background(), batches)
require.Nil(t, err)
}

requests := transport.requests()
assert.Len(t, requests, len(requestsBatches))

for i, want := range requestsBatches {
assertRequestEqualBatches(t, requests[i].r, want)
}

for _, request := range requests {
assert.Equal(t, request.r.Header.Get(headerAccessToken), "ClientToken")
assert.EqualValues(t, request.r.Header.Get(headerContentEncoding), test.compressionMethod)
}
},
}
requestsBatches = append(requestsBatches, batches)
}

for _, batches := range requestsBatches {
err := c.Export(context.Background(), batches)
require.Nil(t, err)
}

requests := transport.requests()
assert.Len(t, requests, len(requestsBatches))

for i, want := range requestsBatches {
assertRequestEqualBatches(t, requests[i].r, want)
}

for _, request := range requests {
assert.Equal(t, request.r.Header.Get(headerAccessToken), "ClientToken")
)
}
}

Expand Down Expand Up @@ -411,3 +450,12 @@ func TestPauses(t *testing.T) {
assert.True(t, e >= time.Second*time.Duration(retryDelaySeconds))
}
}

func TestInvalidCompressionMethod(t *testing.T) {
_, err := New(
defaultEndpointOption,
WithHTTPClient(newMockHTTPClient(&mockTransport{})),
WithCompressionMethod("wrong"),
)
require.Error(t, err)
}
1 change: 0 additions & 1 deletion client/oc_status_code.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ const (
headerRetryAfter = "Retry-After"
headerContentEncoding = "Content-Encoding"
headerContentType = "Content-Type"
headerValueGZIP = "gzip"
headerValueXProtobuf = "application/x-protobuf"
)

Expand Down
19 changes: 18 additions & 1 deletion client/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package client

import (
"fmt"
"net/http"

"go.opentelemetry.io/otel/trace"
Expand Down Expand Up @@ -64,14 +65,30 @@ func WithAccessToken(t string) Option {
}
}

// WithDisabledCompression configures the client to not apply GZip compression on the outgoing requests.
// WithDisabledCompression configures the client to not apply compression on the outgoing requests.
func WithDisabledCompression() Option {
	disable := func(c *Client) error {
		// When set, the compressionMethod setting is ignored.
		c.disableCompression = true
		return nil
	}
	return disable
}

// WithCompressionMethod selects the compression method applied to outgoing
// requests. When this option is not supplied the client uses
// CompressionMethodGzip. The option has no effect if WithDisabledCompression()
// is also used.
//
// The returned Option fails with an error if compressionMethod is neither
// CompressionMethodGzip nor CompressionMethodZstd.
func WithCompressionMethod(compressionMethod CompressionMethod) Option {
	return func(c *Client) error {
		// Reject anything outside the supported set before touching the client.
		if compressionMethod != CompressionMethodGzip && compressionMethod != CompressionMethodZstd {
			return fmt.Errorf("invalid compression method %q", string(compressionMethod))
		}

		c.compressionMethod = compressionMethod
		return nil
	}
}

// WithTracerProvider returns an Option to use the TracerProvider when
// creating a Tracer.
func WithTracerProvider(tracerProvider trace.TracerProvider) Option {
Expand Down
54 changes: 44 additions & 10 deletions client/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"strconv"

jaegerpb "github.com/jaegertracing/jaeger/model"
"github.com/klauspost/compress/zstd"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/trace"
Expand All @@ -39,6 +40,11 @@ type IngestResponse struct {
Err error
}

// resetWriteCloser is the common surface of the compressing writers used by a
// worker: it accepts bytes, can be closed, and can be re-targeted at a new
// destination writer via Reset.
type resetWriteCloser interface {
	io.Writer
	io.Closer
	// Reset points the writer at w.
	Reset(w io.Writer)
}

// worker is not safe to be called from multiple goroutines. Each caller must use locks to avoid races
// and data corruption. In case a caller needs to export multiple requests at the same time, it should
// use one worker per request.
Expand All @@ -47,11 +53,19 @@ type worker struct {
client *http.Client
accessToken string
endpoint string
gzipWriter *gzip.Writer
compressWriter resetWriteCloser
disableCompression bool
compressionMethod CompressionMethod
}

func newWorker(client *http.Client, endpoint string, accessToken string, disableCompression bool, tracerProvider trace.TracerProvider) *worker {
func newWorker(
client *http.Client,
endpoint string,
accessToken string,
disableCompression bool,
compressionMethod CompressionMethod,
tracerProvider trace.TracerProvider,
) (*worker, error) {
if tracerProvider == nil {
tracerProvider = trace.NewNoopTracerProvider()
}
Expand All @@ -61,9 +75,29 @@ func newWorker(client *http.Client, endpoint string, accessToken string, disable
accessToken: accessToken,
endpoint: endpoint,
disableCompression: disableCompression,
gzipWriter: gzip.NewWriter(nil),
compressionMethod: compressionMethod,
}

if !disableCompression {
switch w.compressionMethod {
case CompressionMethodGzip:
w.compressWriter = gzip.NewWriter(nil)
case CompressionMethodZstd:
var err error
w.compressWriter, err = zstd.NewWriter(
nil,
// Enable sync mode to avoid concurrency overheads.
zstd.WithEncoderConcurrency(1),
)
if err != nil {
return nil, err
}
default:
return nil, fmt.Errorf("unknown compression method %v", w.compressionMethod)
}
}
return w

return w, nil
}

func (w *worker) export(ctx context.Context, batches []*jaegerpb.Batch, accessToken string) (*IngestResponse, *ErrSend) {
Expand Down Expand Up @@ -113,7 +147,7 @@ func (w *worker) send(ctx context.Context, r *sendRequest, accessToken string) (
req.Header.Add(headerContentType, headerValueXProtobuf)

if !w.disableCompression {
req.Header.Add(headerContentEncoding, headerValueGZIP)
req.Header.Add(headerContentEncoding, string(w.compressionMethod))
}

if accessToken == "" {
Expand Down Expand Up @@ -194,14 +228,14 @@ func (w *worker) prepare(batches []*jaegerpb.Batch, spansCount int) (*sendReques
}

buf := bytes.NewBuffer([]byte{})
w.gzipWriter.Reset(buf)
w.compressWriter.Reset(buf)

if _, err = w.gzipWriter.Write(encoded); err != nil {
return nil, fmt.Errorf("failed to gzip request: %w", err)
if _, err = w.compressWriter.Write(encoded); err != nil {
return nil, fmt.Errorf("failed to compress request: %w", err)
}

if err = w.gzipWriter.Close(); err != nil {
return nil, fmt.Errorf("failed to gzip request: %w", err)
if err = w.compressWriter.Close(); err != nil {
return nil, fmt.Errorf("failed to compress request: %w", err)
}
sr := &sendRequest{
message: buf.Bytes(),
Expand Down
Loading

0 comments on commit 755ac0a

Please sign in to comment.