Skip to content

Commit

Permalink
Support more encodings for Kafka in OTel Ingester (#2580)
Browse files Browse the repository at this point in the history
* Support more encodings for kafka.consumer in OTel Ingester

Signed-off-by: Sam Xie <xsambundy@gmail.com>

* Support `otlp-proto` encoding for kafka in OTel app

Signed-off-by: Sam Xie <xsambundy@gmail.com>
XSAM authored Oct 24, 2020
1 parent 9f954fe commit 20d5459
Showing 7 changed files with 111 additions and 17 deletions.
1 change: 0 additions & 1 deletion cmd/opentelemetry/app/exporter/badgerexporter/doc.go
Original file line number Diff line number Diff line change
@@ -12,6 +12,5 @@
// See the License for the specific language governing permissions and
// limitations under the License.


// Package badgerexporter implements Jaeger Badger storage as OpenTelemetry exporter.
package badgerexporter
13 changes: 13 additions & 0 deletions cmd/opentelemetry/app/exporter/kafkaexporter/flags.go
Original file line number Diff line number Diff line change
@@ -16,12 +16,25 @@ package kafkaexporter

import (
"flag"
"fmt"

"github.com/jaegertracing/jaeger/plugin/storage/kafka"
)

const (
// encodingOTLPProto is used for spans encoded as OTLP Protobuf.
encodingOTLPProto = "otlp-proto"
)

// AddFlags adds Ingester flags.
func AddFlags(flags *flag.FlagSet) {
opts := &kafka.Options{}
opts.AddOTELFlags(flags)
// Modify kafka.producer.encoding flag
flags.Lookup("kafka.producer.encoding").Usage = fmt.Sprintf(
`Encoding of spans ("%s", "%s" or "%s") sent to kafka.`,
kafka.EncodingJSON,
kafka.EncodingProto,
encodingOTLPProto,
)
}
17 changes: 15 additions & 2 deletions cmd/opentelemetry/app/exporter/kafkaexporter/kafka_exporter.go
Original file line number Diff line number Diff line change
@@ -23,7 +23,6 @@ import (
"go.opentelemetry.io/collector/config/configtls"
"go.opentelemetry.io/collector/exporter/kafkaexporter"

"github.com/jaegertracing/jaeger/cmd/opentelemetry/app/receiver/kafkareceiver"
"github.com/jaegertracing/jaeger/plugin/storage/kafka"
)

@@ -59,7 +58,7 @@ func (f Factory) CreateDefaultConfig() configmodels.Exporter {
opts := &kafka.Options{}
opts.InitFromViper(f.Viper)

cfg.Encoding = kafkareceiver.MustOtelEncodingForJaegerEncoding(opts.Encoding)
cfg.Encoding = mustOtelEncodingForJaegerEncoding(opts.Encoding)
cfg.Topic = opts.Topic
cfg.Brokers = opts.Config.Brokers
cfg.ProtocolVersion = opts.Config.ProtocolVersion
@@ -127,3 +126,17 @@ func (f Factory) CreateLogsExporter(
) (component.LogsExporter, error) {
return f.Wrapped.CreateLogsExporter(ctx, params, cfg)
}

// mustOtelEncodingForJaegerEncoding translates a jaeger encoding to a otel encoding
func mustOtelEncodingForJaegerEncoding(jaegerEncoding string) string {
switch jaegerEncoding {
case kafka.EncodingProto:
return "jaeger_proto"
case kafka.EncodingJSON:
return "jaeger_json"
case encodingOTLPProto:
return "otlp_proto"
}

panic(jaegerEncoding + " is not a supported kafka encoding in the OTEL collector.")
}
Original file line number Diff line number Diff line change
@@ -27,6 +27,7 @@ import (

"github.com/jaegertracing/jaeger/cmd/flags"
jConfig "github.com/jaegertracing/jaeger/pkg/config"
"github.com/jaegertracing/jaeger/plugin/storage/kafka"
)

func TestDefaultConfig(t *testing.T) {
@@ -86,3 +87,37 @@ func TestLoadConfigAndFlags(t *testing.T) {
assert.Equal(t, "/etc/foo", kafkaCfg.Authentication.Kerberos.ConfigPath)
assert.Equal(t, "from-jaeger-config", kafkaCfg.Authentication.Kerberos.Username)
}

func TestMustOtelEncodingForJaegerEncoding(t *testing.T) {
tests := []struct {
in string
expected string
expectsPanic bool
}{
{
in: kafka.EncodingProto,
expected: "jaeger_proto",
},
{
in: kafka.EncodingJSON,
expected: "jaeger_json",
},
{
in: encodingOTLPProto,
expected: "otlp_proto",
},
{
in: "not-an-encoding",
expectsPanic: true,
},
}

for _, tt := range tests {
if tt.expectsPanic {
assert.Panics(t, func() { mustOtelEncodingForJaegerEncoding(tt.in) })
continue
}

assert.Equal(t, tt.expected, mustOtelEncodingForJaegerEncoding(tt.in))
}
}
19 changes: 19 additions & 0 deletions cmd/opentelemetry/app/receiver/kafkareceiver/flags.go
Original file line number Diff line number Diff line change
@@ -16,11 +16,30 @@ package kafkareceiver

import (
"flag"
"fmt"
"strings"

ingesterApp "github.com/jaegertracing/jaeger/cmd/ingester/app"
"github.com/jaegertracing/jaeger/plugin/storage/kafka"
)

const (
// encodingZipkinProto is used for spans encoded as Zipkin Protobuf.
encodingZipkinProto = "zipkin-proto"
// encodingZipkinJSON is used for spans encoded as Zipkin JSON.
encodingZipkinJSON = "zipkin-json"
// encodingOTLPProto is used for spans encoded as OTLP Protobuf.
encodingOTLPProto = "otlp-proto"
)

// AddFlags adds Ingester flags.
func AddFlags(flags *flag.FlagSet) {
ingesterApp.AddOTELFlags(flags)
// Modify kafka.consumer.encoding flag
flags.Lookup(ingesterApp.KafkaConsumerConfigPrefix + ingesterApp.SuffixEncoding).Usage = fmt.Sprintf(`The encoding of spans ("%s") consumed from kafka`, strings.Join(
append(kafka.AllEncodings,
encodingZipkinJSON,
encodingZipkinProto,
encodingOTLPProto,
), "\", \""))
}
14 changes: 11 additions & 3 deletions cmd/opentelemetry/app/receiver/kafkareceiver/kafka_receiver.go
Original file line number Diff line number Diff line change
@@ -59,7 +59,7 @@ func (f *Factory) CreateDefaultConfig() configmodels.Receiver {

cfg.Brokers = opts.Brokers
cfg.ClientID = opts.ClientID
cfg.Encoding = MustOtelEncodingForJaegerEncoding(opts.Encoding)
cfg.Encoding = mustOtelEncodingForJaegerEncoding(opts.Encoding)
cfg.GroupID = opts.GroupID
cfg.Topic = opts.Topic
cfg.ProtocolVersion = opts.ProtocolVersion
@@ -138,13 +138,21 @@ func (f Factory) CreateLogsReceiver(
return f.Wrapped.CreateLogsReceiver(ctx, params, cfg, nextConsumer)
}

// MustOtelEncodingForJaegerEncoding translates a jaeger encoding to a otel encoding
func MustOtelEncodingForJaegerEncoding(jaegerEncoding string) string {
// mustOtelEncodingForJaegerEncoding translates a jaeger encoding to a otel encoding
func mustOtelEncodingForJaegerEncoding(jaegerEncoding string) string {
switch jaegerEncoding {
case kafka.EncodingProto:
return "jaeger_proto"
case kafka.EncodingJSON:
return "jaeger_json"
case encodingOTLPProto:
return "otlp_proto"
case encodingZipkinProto:
return "zipkin_proto"
case encodingZipkinJSON:
return "zipkin_json"
case kafka.EncodingZipkinThrift:
return "zipkin_thrift"
}

panic(jaegerEncoding + " is not a supported kafka encoding in the OTEL collector.")
29 changes: 18 additions & 11 deletions cmd/opentelemetry/app/receiver/kafkareceiver/kafka_receiver_test.go
Original file line number Diff line number Diff line change
@@ -104,6 +104,22 @@ func TestMustOtelEncodingForJaegerEncoding(t *testing.T) {
in: kafka.EncodingJSON,
expected: "jaeger_json",
},
{
in: encodingOTLPProto,
expected: "otlp_proto",
},
{
in: encodingZipkinProto,
expected: "zipkin_proto",
},
{
in: encodingZipkinJSON,
expected: "zipkin_json",
},
{
in: kafka.EncodingZipkinThrift,
expected: "zipkin_thrift",
},
{
in: "not-an-encoding",
expectsPanic: true,
@@ -112,19 +128,10 @@ func TestMustOtelEncodingForJaegerEncoding(t *testing.T) {

for _, tt := range tests {
if tt.expectsPanic {
assertPanic(t, func() { MustOtelEncodingForJaegerEncoding(tt.in) })
assert.Panics(t, func() { mustOtelEncodingForJaegerEncoding(tt.in) })
continue
}

assert.Equal(t, tt.expected, MustOtelEncodingForJaegerEncoding(tt.in))
assert.Equal(t, tt.expected, mustOtelEncodingForJaegerEncoding(tt.in))
}
}

func assertPanic(t *testing.T, f func()) {
defer func() {
if r := recover(); r == nil {
t.Errorf("The code did not panic")
}
}()
f()
}

0 comments on commit 20d5459

Please sign in to comment.