Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Zipkin Thrift as kafka ingestion format #1256

Merged
merged 6 commits into from
Dec 18, 2018
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 7 additions & 4 deletions cmd/ingester/app/builder/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package builder

import (
"fmt"
"strings"

"github.com/uber/jaeger-lib/metrics"
"go.uber.org/zap"
Expand All @@ -31,13 +32,15 @@ import (
// CreateConsumer creates a new span consumer for the ingester
func CreateConsumer(logger *zap.Logger, metricsFactory metrics.Factory, spanWriter spanstore.Writer, options app.Options) (*consumer.Consumer, error) {
var unmarshaller kafka.Unmarshaller
if options.Encoding == app.EncodingJSON {
if options.Encoding == kafka.EncodingJSON {
unmarshaller = kafka.NewJSONUnmarshaller()
} else if options.Encoding == app.EncodingProto {
} else if options.Encoding == kafka.EncodingProto {
unmarshaller = kafka.NewProtobufUnmarshaller()
} else if options.Encoding == kafka.EncodingZipkinThrift {
unmarshaller = kafka.NewZipkinThriftUnmarshaller()
} else {
return nil, fmt.Errorf(`encoding '%s' not recognised, use one of ("%s" or "%s")`,
options.Encoding, app.EncodingProto, app.EncodingJSON)
return nil, fmt.Errorf(`encoding '%s' not recognised, use one of ("%s")`,
options.Encoding, strings.Join(kafka.AllEncodings, "\", \""))
}

spParams := processor.SpanProcessorParams{
Expand Down
10 changes: 3 additions & 7 deletions cmd/ingester/app/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,10 @@ import (
"github.com/spf13/viper"

kafkaConsumer "github.com/jaegertracing/jaeger/pkg/kafka/consumer"
"github.com/jaegertracing/jaeger/plugin/storage/kafka"
)

const (
// EncodingJSON indicates spans are encoded as a json byte array
EncodingJSON = "json"
// EncodingProto indicates spans are encoded as a protobuf byte array
EncodingProto = "protobuf"

// ConfigPrefix is a prefix for the ingester flags
ConfigPrefix = "ingester"
// KafkaConfigPrefix is a prefix for the Kafka flags
Expand Down Expand Up @@ -60,7 +56,7 @@ const (
// DefaultParallelism is the default parallelism for the span processor
DefaultParallelism = 1000
// DefaultEncoding is the default span encoding
DefaultEncoding = EncodingProto
DefaultEncoding = kafka.EncodingProto
// DefaultDeadlockInterval is the default deadlock interval
DefaultDeadlockInterval = 1 * time.Minute
// DefaultHTTPPort is the default HTTP port (e.g. for /metrics)
Expand Down Expand Up @@ -96,7 +92,7 @@ func AddFlags(flagSet *flag.FlagSet) {
flagSet.String(
KafkaConfigPrefix+SuffixEncoding,
DefaultEncoding,
fmt.Sprintf(`The encoding of spans ("%s" or "%s") consumed from kafka`, EncodingProto, EncodingJSON))
fmt.Sprintf(`The encoding of spans ("%s") consumed from kafka`, strings.Join(kafka.AllEncodings, "\", \"")))
flagSet.String(
ConfigPrefix+SuffixParallelism,
strconv.Itoa(DefaultParallelism),
Expand Down
6 changes: 3 additions & 3 deletions plugin/storage/kafka/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,12 +68,12 @@ func (f *Factory) Initialize(metricsFactory metrics.Factory, logger *zap.Logger)
}
f.producer = p
switch f.options.encoding {
case encodingProto:
case EncodingProto:
f.marshaller = newProtobufMarshaller()
case encodingJSON:
case EncodingJSON:
f.marshaller = newJSONMarshaller()
default:
return errors.New("kafka encoding is not one of '" + encodingJSON + "' or '" + encodingProto + "'")
return errors.New("kafka encoding is not one of '" + EncodingJSON + "' or '" + EncodingProto + "'")
}
return nil
}
Expand Down
13 changes: 9 additions & 4 deletions plugin/storage/kafka/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,17 @@ const (
suffixTopic = ".topic"
suffixEncoding = ".encoding"

encodingJSON = "json"
encodingProto = "protobuf"
EncodingJSON = "json"
EncodingProto = "protobuf"
EncodingZipkinThrift = "zipkin-thrift"

defaultBroker = "127.0.0.1:9092"
defaultTopic = "jaeger-spans"
defaultEncoding = encodingProto
defaultEncoding = EncodingProto
)

var (
	// AllEncodings lists every span encoding supported for Kafka messages
	AllEncodings = []string{EncodingJSON, EncodingProto, EncodingZipkinThrift}
)

// Options stores the configuration options for Kafka
Expand All @@ -58,7 +63,7 @@ func (opt *Options) AddFlags(flagSet *flag.FlagSet) {
flagSet.String(
configPrefix+suffixEncoding,
defaultEncoding,
fmt.Sprintf(`(experimental) Encoding of spans ("%s" or "%s") sent to kafka.`, encodingProto, encodingJSON),
fmt.Sprintf(`(experimental) Encoding of spans ("%s" or "%s") sent to kafka.`, EncodingJSON, EncodingProto),
)
}

Expand Down
34 changes: 34 additions & 0 deletions plugin/storage/kafka/unmarshaller.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,13 @@ package kafka
import (
	"bytes"
	"errors"

	"github.com/apache/thrift/lib/go/thrift"
	"github.com/gogo/protobuf/jsonpb"
	"github.com/gogo/protobuf/proto"

	"github.com/jaegertracing/jaeger/model"
	"github.com/jaegertracing/jaeger/model/converter/thrift/zipkin"
	"github.com/jaegertracing/jaeger/thrift-gen/zipkincore"
)

// Unmarshaller decodes a byte array to a span
Expand Down Expand Up @@ -57,3 +60,34 @@ func (h *JSONUnmarshaller) Unmarshal(msg []byte) (*model.Span, error) {
err := jsonpb.Unmarshal(bytes.NewReader(msg), newSpan)
return newSpan, err
}

// zipkinThriftUnmarshaller is an Unmarshaller for spans serialized with the
// Zipkin thrift encoding.
type zipkinThriftUnmarshaller struct{}

// NewZipkinThriftUnmarshaller creates a ready-to-use zipkinThriftUnmarshaller.
func NewZipkinThriftUnmarshaller() *zipkinThriftUnmarshaller {
	u := &zipkinThriftUnmarshaller{}
	return u
}

// Unmarshal decodes a json byte array to a span
func (h *zipkinThriftUnmarshaller) Unmarshal(msg []byte) (*model.Span, error) {
buffer := thrift.NewTMemoryBuffer()
buffer.Write(msg)

transport := thrift.NewTBinaryProtocolTransport(buffer)
_, _, err := transport.ReadListBegin() // Ignore the returned element type
if err != nil {
return nil, err
}

zs := &zipkincore.Span{}
if err = zs.Read(transport); err != nil {
return nil, err
}
mSpan, err := zipkin.ToDomainSpan(zs)

if err != nil {
return nil, err
}
return mSpan[0], err
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@yurishkuro I am not sure it's some good code, any advices to avoid using [0]. I thought about changing the interface to return []model.Span and handle the array in the calling function.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the ingester is currently built with the assumption that a single Kafka message contains just a single span. Even in this case a single Zipkin span can sometimes be transformed to two Jaeger spans, so it would be better to extend the ingester to support the notion of many spans per message, at least after unmarshaling. However, I would not recommend doing it in this PR, it should be a separate PR as it might get quite large. For now I would suggest logging a warning.

There are good reasons to expect a single span per Kafka message, it makes the streaming jobs easier to write.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks

}