Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
The table of contents is too big for display.
Diff view
Diff view
  •  
  •  
  •  
249 changes: 125 additions & 124 deletions go.mod

Large diffs are not rendered by default.

554 changes: 278 additions & 276 deletions go.sum

Large diffs are not rendered by default.

10 changes: 6 additions & 4 deletions integration/util/harness_api_writes.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,11 @@ import (
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/config/configgrpc"
"go.opentelemetry.io/collector/config/configopaque"
"go.opentelemetry.io/collector/config/configoptional"
"go.opentelemetry.io/collector/config/configtls"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/exporter"
"go.opentelemetry.io/collector/exporter/exporterhelper"
"go.opentelemetry.io/collector/exporter/otlpexporter"
"go.opentelemetry.io/collector/pdata/ptrace"
mnoop "go.opentelemetry.io/otel/metric/noop"
Expand Down Expand Up @@ -127,12 +129,12 @@ func newOtelGRPCExporterWithAuth(endpoint, orgID, basicAuthToken string, useTLS
otlpCfg := exporterCfg.(*otlpexporter.Config)

// Configure headers for authentication (gRPC metadata format)
headers := make(map[string]configopaque.String)
var headers configopaque.MapList
if orgID != "" {
headers[xScopeOrgIDHeader] = configopaque.String(orgID)
headers.Set(xScopeOrgIDHeader, configopaque.String(orgID))
}
if basicAuthToken != "" {
headers[authorizationHeader] = configopaque.String("Basic " + basicAuthToken)
headers.Set(authorizationHeader, configopaque.String("Basic "+basicAuthToken))
}

otlpCfg.ClientConfig = configgrpc.ClientConfig{
Expand All @@ -146,7 +148,7 @@ func newOtelGRPCExporterWithAuth(endpoint, orgID, basicAuthToken string, useTLS
// Disable retries to get immediate error feedback
otlpCfg.RetryConfig.Enabled = false
// Disable queueing
otlpCfg.QueueConfig.Enabled = false
otlpCfg.QueueConfig = configoptional.None[exporterhelper.QueueBatchConfig]()
// beef up the timeout to 30 seconds to avoid flakes
otlpCfg.TimeoutConfig.Timeout = 30 * time.Second

Expand Down
8 changes: 4 additions & 4 deletions modules/distributor/receiver/metrics_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,10 @@
metrics *metrics
}

func (r Int64Counter) Enabled(_ context.Context) bool {
return r.Name != ""
}

Check notice on line 108 in modules/distributor/receiver/metrics_provider.go

View workflow job for this annotation

GitHub Actions / Coverage Annotations

Uncovered lines

Lines 106-108 are not covered by tests

func (r Int64Counter) Add(_ context.Context, value int64, options ...metric.AddOption) {
// don't do anything for metrics that we don't care
if r.Name == "" {
Expand Down Expand Up @@ -130,7 +134,3 @@
r.metrics.receiverRefusedSpans.WithLabelValues(receiver, transport).Add(float64(value))
}
}

func (r Int64Counter) Enabled(_ context.Context) bool {
return true
}
6 changes: 5 additions & 1 deletion modules/distributor/receiver/shim.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"go.opentelemetry.io/collector/pdata/ptrace"
"go.opentelemetry.io/collector/receiver"
"go.opentelemetry.io/collector/receiver/otlpreceiver"
"go.opentelemetry.io/collector/service/telemetry/otelconftelemetry"
"go.opentelemetry.io/otel"
otelcodes "go.opentelemetry.io/otel/codes"
"go.uber.org/multierr"
Expand Down Expand Up @@ -234,6 +235,7 @@ func New(receiverCfg map[string]interface{}, pusher TracesPusher, middleware Mid
conf, err := pro.Get(context.Background(), otelcol.Factories{
Receivers: receiverFactories,
Exporters: map[component.Type]exporter.Factory{component.MustNewType("nop"): exportertest.NewNopFactory()}, // nop exporter to avoid errors
Telemetry: otelconftelemetry.NewFactory(),
})
if err != nil {
return nil, err
Expand Down Expand Up @@ -361,7 +363,9 @@ func (r *receiversShim) ConsumeTraces(ctx context.Context, td ptrace.Traces) err
}

// GetExtensions implements component.Host
func (r *receiversShim) GetExtensions() map[component.ID]component.Component { return nil }
func (r *receiversShim) GetExtensions() map[component.ID]component.Component {
return map[component.ID]component.Component{}
}

// observability shims
func newLogger(level dslog.Level) *zap.Logger {
Expand Down
7 changes: 4 additions & 3 deletions modules/distributor/receiver/shim_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@ import (
// These tests use the OpenTelemetry Collector Exporters to validate the different protocols
func TestShim_integration(t *testing.T) {
randomTraces := testdata.GenerateTraces(5)
headers := map[string]configopaque.String{generator.NoGenerateMetricsContextKey: "true"}
var headers configopaque.MapList
headers.Set(generator.NoGenerateMetricsContextKey, "true")

testCases := []struct {
name string
Expand Down Expand Up @@ -195,11 +196,11 @@ type mockHost struct{}
var _ component.Host = (*mockHost)(nil)

func (m *mockHost) GetFactory(component.Kind, component.Type) component.Factory {
panic("implement me")
return nil
}

func (m *mockHost) GetExtensions() map[component.ID]component.Component {
panic("implement me")
return map[component.ID]component.Component{}
}

type capturingPusher struct {
Expand Down
64 changes: 50 additions & 14 deletions pkg/gogocodec/gogocodec.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
package gogocodec

import (
"fmt"
"reflect"
"strings"

Expand All @@ -23,7 +24,6 @@
jaegerProtoGenPkgPath = "github.com/jaegertracing/jaeger-idl/proto-gen"
jaegerModelPkgPath = "github.com/jaegertracing/jaeger-idl/model"
jaegerStorageV1PkgPath = "github.com/grafana/tempo/cmd/tempo-query/jaeger/storage_v1"
otelProtoPkgPath = "go.opentelemetry.io/collector"
// etcd path can be removed once upgrade to grpc >v1.38 is released (tentatively next release from v3.5.1)
etcdAPIProtoPkgPath = "go.etcd.io/etcd/api/v3"
)
Expand All @@ -33,6 +33,15 @@

var _ encoding.Codec = (*GogoCodec)(nil)

// This mirrors OTEL collector's internal, unexported otelEncoder interface:
// go.opentelemetry.io/collector/pdata/internal/otelgrpc/encoding.go
// We cannot import it here because it's unexported and under an internal package.
type otelProtoMessage interface {
SizeProto() int
MarshalProto([]byte) int
UnmarshalProto([]byte) error
}

func NewCodec() *GogoCodec {
return &GogoCodec{}
}
Expand All @@ -43,32 +52,59 @@
}

// Marshal implements encoding.Codec
func (c *GogoCodec) Marshal(v interface{}) ([]byte, error) {
t := reflect.TypeOf(v)
elem := t.Elem()
func (c *GogoCodec) Marshal(v any) ([]byte, error) {
if m, ok := v.(otelProtoMessage); ok {
size := m.SizeProto()
buf := make([]byte, size)
n := m.MarshalProto(buf)
return buf[:n], nil
}

// use gogo proto only for Tempo/Cortex/Jaeger/etcd types
if useGogo(elem) {
return gogoproto.Marshal(v.(gogoproto.Message))
if useGogo(reflect.TypeOf(v)) {
if msg, ok := v.(gogoproto.Message); ok {
return gogoproto.Marshal(msg)
}
}

msg, ok := v.(proto.Message)
if !ok {
return nil, fmt.Errorf("gogocodec: unsupported marshal type %T", v)
}
return proto.Marshal(v.(proto.Message))
return proto.Marshal(msg)
}

// Unmarshal implements encoding.Codec
func (c *GogoCodec) Unmarshal(data []byte, v interface{}) error {
t := reflect.TypeOf(v)
elem := t.Elem()
func (c *GogoCodec) Unmarshal(data []byte, v any) error {
if m, ok := v.(otelProtoMessage); ok {
return m.UnmarshalProto(data)
}

// use gogo proto only for Tempo/Cortex/Jaeger/etcd types
if useGogo(elem) {
return gogoproto.Unmarshal(data, v.(gogoproto.Message))
if useGogo(reflect.TypeOf(v)) {
if msg, ok := v.(gogoproto.Message); ok {
return gogoproto.Unmarshal(data, msg)
}
}
return proto.Unmarshal(data, v.(proto.Message))

msg, ok := v.(proto.Message)
if !ok {
return fmt.Errorf("gogocodec: unsupported unmarshal type %T", v)
}
return proto.Unmarshal(data, msg)
}

// useGogo checks if the element belongs to Tempo/Cortex/Jaeger/etcd packages
func useGogo(t reflect.Type) bool {
if t == nil {
return false
}
for t.Kind() == reflect.Pointer {
t = t.Elem()
if t == nil {
return false
}

Check notice on line 106 in pkg/gogocodec/gogocodec.go

View workflow job for this annotation

GitHub Actions / Coverage Annotations

Uncovered lines

Lines 105-106 are not covered by tests
}
pkgPath := t.PkgPath()
return strings.HasPrefix(pkgPath, frontendProtoGenPkgPath) || strings.HasPrefix(pkgPath, tempoProtoGenPkgPath) || strings.HasPrefix(pkgPath, jaegerProtoGenPkgPath) || strings.HasPrefix(pkgPath, jaegerModelPkgPath) || strings.HasPrefix(pkgPath, jaegerStorageV1PkgPath) || strings.HasPrefix(pkgPath, otelProtoPkgPath) || strings.HasPrefix(pkgPath, etcdAPIProtoPkgPath)
return strings.HasPrefix(pkgPath, frontendProtoGenPkgPath) || strings.HasPrefix(pkgPath, tempoProtoGenPkgPath) || strings.HasPrefix(pkgPath, jaegerProtoGenPkgPath) || strings.HasPrefix(pkgPath, jaegerModelPkgPath) || strings.HasPrefix(pkgPath, jaegerStorageV1PkgPath) || strings.HasPrefix(pkgPath, etcdAPIProtoPkgPath)
}
47 changes: 47 additions & 0 deletions pkg/gogocodec/gogocodec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,24 @@ import (
"github.com/grafana/tempo/pkg/tempopb"
)

type mockOTELProtoMessage struct {
Val byte
}

func (m *mockOTELProtoMessage) SizeProto() int {
return 1
}

func (m *mockOTELProtoMessage) MarshalProto(buf []byte) int {
buf[0] = m.Val
return 1
}

func (m *mockOTELProtoMessage) UnmarshalProto(buf []byte) error {
m.Val = buf[0]
return nil
}

func TestCodecMarshallAndUnmarshall_tempo_type(t *testing.T) {
// marshal a tempo object using the custom codec
c := NewCodec()
Expand Down Expand Up @@ -66,3 +84,32 @@ func TestWireCompatibility(t *testing.T) {
require.NoError(t, err)
assert.Equal(t, req1, req2)
}

func TestCodecMarshallAndUnmarshall_otel_proto_type(t *testing.T) {
c := NewCodec()

req1 := &mockOTELProtoMessage{Val: 42}
data, err := c.Marshal(req1)
require.NoError(t, err)

req2 := &mockOTELProtoMessage{}
err = c.Unmarshal(data, req2)
require.NoError(t, err)
assert.Equal(t, req1, req2)
}

func TestCodecMarshal_unsupported_type(t *testing.T) {
c := NewCodec()

_, err := c.Marshal(struct{}{})
require.Error(t, err)
assert.Contains(t, err.Error(), "unsupported marshal type")
}

func TestCodecUnmarshal_unsupported_type(t *testing.T) {
c := NewCodec()

err := c.Unmarshal([]byte{0x01}, struct{}{})
require.Error(t, err)
assert.Contains(t, err.Error(), "unsupported unmarshal type")
}

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 6 additions & 5 deletions vendor/github.com/IBM/sarama/admin.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading