Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add zstd compression support to SAPM client #138

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 26 additions & 10 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,16 +47,26 @@ type sendRequest struct {
batches int64
}

// CompressionMethod strings MUST match the Content-Encoding http header values.
type CompressionMethod string

const CompressionMethodGzip CompressionMethod = "gzip"
const CompressionMethodZstd CompressionMethod = "zstd"

// Client implements an HTTP sender for the SAPM protocol
type Client struct {
tracerProvider trace.TracerProvider
numWorkers uint
maxIdleCons uint
endpoint string
accessToken string
httpClient *http.Client
tracerProvider trace.TracerProvider
numWorkers uint
maxIdleCons uint
endpoint string
accessToken string
httpClient *http.Client

disableCompression bool
closeCh chan struct{}
// compressionMethod to use for payload. Ignored if disableCompression==true.
compressionMethod CompressionMethod

closeCh chan struct{}

workers chan *worker
}
Expand All @@ -69,8 +79,9 @@ func New(opts ...Option) (*Client, error) {
}

c := &Client{
numWorkers: defaultNumWorkers,
maxIdleCons: defaultMaxIdleCons,
numWorkers: defaultNumWorkers,
maxIdleCons: defaultMaxIdleCons,
compressionMethod: CompressionMethodGzip,
}

for _, opt := range opts {
Expand Down Expand Up @@ -117,7 +128,12 @@ func New(opts ...Option) (*Client, error) {
c.closeCh = make(chan struct{})
c.workers = make(chan *worker, c.numWorkers)
for i := uint(0); i < c.numWorkers; i++ {
w := newWorker(c.httpClient, c.endpoint, c.accessToken, c.disableCompression, c.tracerProvider)
w, err := newWorker(
c.httpClient, c.endpoint, c.accessToken, c.disableCompression, c.compressionMethod, c.tracerProvider,
)
if err != nil {
return nil, err
}
c.workers <- w
}

Expand Down
108 changes: 78 additions & 30 deletions client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,10 @@ var defaultEndpointOption = WithEndpoint("http://local")

func assertRequestEqualBatches(t *testing.T, r *http.Request, b []*jaegerpb.Batch) {
psr, err := sapmprotocol.ParseTraceV2Request(r)
assert.NoError(t, err)
assert.EqualValues(t, b, psr.Batches)
require.NoError(t, err)
if psr != nil {
require.EqualValues(t, b, psr.Batches)
}
}

func TestDefaults(t *testing.T) {
Expand All @@ -48,37 +50,74 @@ func TestDefaults(t *testing.T) {
assert.Equal(t, defaultNumWorkers, uint(len(c.workers)))
}

func TestClient(t *testing.T) {
transport := &mockTransport{}
c, err := New(defaultEndpointOption, WithHTTPClient(newMockHTTPClient(transport)), WithAccessToken("ClientToken"))
require.NoError(t, err)
var compressionTests = []struct {
name string
disableCompression bool
compressionMethod CompressionMethod
}{
{
name: "none",
disableCompression: true,
},
{
name: "gzip",
compressionMethod: CompressionMethodGzip,
},
{
name: "zstd",
compressionMethod: CompressionMethodZstd,
},
}

requestsBatches := [][]*jaegerpb.Batch{}
func TestClient(t *testing.T) {

for i := 0; i < 10; i++ {
batches := []*jaegerpb.Batch{
{
Process: &jaegerpb.Process{ServiceName: "test_service_" + strconv.Itoa(i)},
Spans: []*jaegerpb.Span{{}},
for _, test := range compressionTests {
t.Run(
test.name, func(t *testing.T) {
transport := &mockTransport{}
opts := []Option{
defaultEndpointOption,
WithHTTPClient(newMockHTTPClient(transport)),
WithAccessToken("ClientToken"),
}
if test.disableCompression {
opts = append(opts, WithDisabledCompression())
} else {
opts = append(opts, WithCompressionMethod(test.compressionMethod))
}
client, err := New(opts...)
require.NoError(t, err)

requestsBatches := [][]*jaegerpb.Batch{}

for i := 0; i < 10; i++ {
batches := []*jaegerpb.Batch{
{
Process: &jaegerpb.Process{ServiceName: "test_service_" + strconv.Itoa(i)},
Spans: []*jaegerpb.Span{{}},
},
}
requestsBatches = append(requestsBatches, batches)
}

for _, batches := range requestsBatches {
err := client.Export(context.Background(), batches)
require.Nil(t, err)
}

requests := transport.requests()
assert.Len(t, requests, len(requestsBatches))

for i, want := range requestsBatches {
assertRequestEqualBatches(t, requests[i].r, want)
}

for _, request := range requests {
assert.Equal(t, request.r.Header.Get(headerAccessToken), "ClientToken")
assert.EqualValues(t, request.r.Header.Get(headerContentEncoding), test.compressionMethod)
}
},
}
requestsBatches = append(requestsBatches, batches)
}

for _, batches := range requestsBatches {
err := c.Export(context.Background(), batches)
require.Nil(t, err)
}

requests := transport.requests()
assert.Len(t, requests, len(requestsBatches))

for i, want := range requestsBatches {
assertRequestEqualBatches(t, requests[i].r, want)
}

for _, request := range requests {
assert.Equal(t, request.r.Header.Get(headerAccessToken), "ClientToken")
)
}
}

Expand Down Expand Up @@ -412,3 +451,12 @@ func TestPauses(t *testing.T) {
assert.True(t, e >= time.Second*time.Duration(retryDelaySeconds))
}
}

func TestInvalidCompressionMethod(t *testing.T) {
_, err := New(
defaultEndpointOption,
WithHTTPClient(newMockHTTPClient(&mockTransport{})),
WithCompressionMethod("wrong"),
)
require.Error(t, err)
}
1 change: 0 additions & 1 deletion client/oc_status_code.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ const (
headerRetryAfter = "Retry-After"
headerContentEncoding = "Content-Encoding"
headerContentType = "Content-Type"
headerValueGZIP = "gzip"
headerValueXProtobuf = "application/x-protobuf"
)

Expand Down
19 changes: 18 additions & 1 deletion client/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package client

import (
"fmt"
"net/http"

"go.opentelemetry.io/otel/trace"
Expand Down Expand Up @@ -64,14 +65,30 @@ func WithAccessToken(t string) Option {
}
}

// WithDisabledCompression configures the client to not apply GZip compression on the outgoing requests.
// WithDisabledCompression configures the client to not apply compression on the outgoing requests.
func WithDisabledCompression() Option {
return func(a *Client) error {
a.disableCompression = true
return nil
}
}

// WithCompressionMethod chooses the compression method for the outgoing requests.
// The default compression method is CompressionMethodGzip.
// This option is ignored if WithDisabledCompression() is used.
func WithCompressionMethod(compressionMethod CompressionMethod) Option {
return func(a *Client) error {
switch compressionMethod {
case CompressionMethodGzip, CompressionMethodZstd:
a.compressionMethod = compressionMethod
default:
return fmt.Errorf("invalid compression method %q", string(compressionMethod))
}

return nil
}
}

// WithTracerProvider returns an Option to use the TracerProvider when
// creating a Tracer.
func WithTracerProvider(tracerProvider trace.TracerProvider) Option {
Expand Down
54 changes: 44 additions & 10 deletions client/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"strconv"

jaegerpb "github.com/jaegertracing/jaeger/model"
"github.com/klauspost/compress/zstd"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/trace"
Expand All @@ -39,6 +40,11 @@ type IngestResponse struct {
Err error
}

type resetWriteCloser interface {
io.WriteCloser
Reset(w io.Writer)
}

// worker is not safe to be called from multiple goroutines. Each caller must use locks to avoid races
// and data corruption. In case a caller needs to export multiple requests at the same time, it should
// use one worker per request.
Expand All @@ -47,11 +53,19 @@ type worker struct {
client *http.Client
accessToken string
endpoint string
gzipWriter *gzip.Writer
compressWriter resetWriteCloser
disableCompression bool
compressionMethod CompressionMethod
}

func newWorker(client *http.Client, endpoint string, accessToken string, disableCompression bool, tracerProvider trace.TracerProvider) *worker {
func newWorker(
client *http.Client,
endpoint string,
accessToken string,
disableCompression bool,
compressionMethod CompressionMethod,
tracerProvider trace.TracerProvider,
) (*worker, error) {
if tracerProvider == nil {
tracerProvider = trace.NewNoopTracerProvider()
}
Expand All @@ -61,9 +75,29 @@ func newWorker(client *http.Client, endpoint string, accessToken string, disable
accessToken: accessToken,
endpoint: endpoint,
disableCompression: disableCompression,
gzipWriter: gzip.NewWriter(nil),
compressionMethod: compressionMethod,
}

if !disableCompression {
switch w.compressionMethod {
case CompressionMethodGzip:
w.compressWriter = gzip.NewWriter(nil)
case CompressionMethodZstd:
var err error
w.compressWriter, err = zstd.NewWriter(
nil,
// Enable sync mode to avoid concurrency overheads.
zstd.WithEncoderConcurrency(1),
)
if err != nil {
return nil, err
}
default:
return nil, fmt.Errorf("unknown compression method %v", w.compressionMethod)
}
}
return w

return w, nil
}

func (w *worker) export(ctx context.Context, batches []*jaegerpb.Batch, accessToken string) (*IngestResponse, *ErrSend) {
Expand Down Expand Up @@ -113,7 +147,7 @@ func (w *worker) send(ctx context.Context, r *sendRequest, accessToken string) (
req.Header.Add(headerContentType, headerValueXProtobuf)

if !w.disableCompression {
req.Header.Add(headerContentEncoding, headerValueGZIP)
req.Header.Add(headerContentEncoding, string(w.compressionMethod))
}

if accessToken == "" {
Expand Down Expand Up @@ -194,14 +228,14 @@ func (w *worker) prepare(batches []*jaegerpb.Batch, spansCount int) (*sendReques
}

buf := bytes.NewBuffer([]byte{})
w.gzipWriter.Reset(buf)
w.compressWriter.Reset(buf)

if _, err = w.gzipWriter.Write(encoded); err != nil {
return nil, fmt.Errorf("failed to gzip request: %w", err)
if _, err = w.compressWriter.Write(encoded); err != nil {
return nil, fmt.Errorf("failed to compress request: %w", err)
}

if err = w.gzipWriter.Close(); err != nil {
return nil, fmt.Errorf("failed to gzip request: %w", err)
if err = w.compressWriter.Close(); err != nil {
return nil, fmt.Errorf("failed to compress request: %w", err)
}
sr := &sendRequest{
message: buf.Bytes(),
Expand Down
Loading