Skip to content

Commit

Permalink
Refactor the encoding definition
Browse files Browse the repository at this point in the history
  • Loading branch information
geobeau committed Dec 16, 2018
1 parent 04d8a2b commit 83c8a2d
Show file tree
Hide file tree
Showing 4 changed files with 21 additions and 22 deletions.
11 changes: 6 additions & 5 deletions cmd/ingester/app/builder/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package builder

import (
"fmt"
"strings"

"github.com/uber/jaeger-lib/metrics"
"go.uber.org/zap"
Expand All @@ -31,15 +32,15 @@ import (
// CreateConsumer creates a new span consumer for the ingester
func CreateConsumer(logger *zap.Logger, metricsFactory metrics.Factory, spanWriter spanstore.Writer, options app.Options) (*consumer.Consumer, error) {
var unmarshaller kafka.Unmarshaller
if options.Encoding == app.EncodingJSON {
if options.Encoding == kafka.EncodingJSON {
unmarshaller = kafka.NewJSONUnmarshaller()
} else if options.Encoding == app.EncodingProto {
} else if options.Encoding == kafka.EncodingProto {
unmarshaller = kafka.NewProtobufUnmarshaller()
} else if options.Encoding == app.EncodingZipkinT {
} else if options.Encoding == kafka.EncodingZipkinThrift {
unmarshaller = kafka.NewZipkinThriftUnmarshaller()
} else {
return nil, fmt.Errorf(`encoding '%s' not recognised, use one of ("%s", "%s" or "%s")`,
options.Encoding, app.EncodingProto, app.EncodingJSON, app.EncodingZipkinT)
return nil, fmt.Errorf(`encoding '%s' not recognised, use one of ("%s")`,
options.Encoding, strings.Join(kafka.AllEncodings, "\", \""))
}

spParams := processor.SpanProcessorParams{
Expand Down
12 changes: 3 additions & 9 deletions cmd/ingester/app/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,16 +24,10 @@ import (
"github.com/spf13/viper"

kafkaConsumer "github.com/jaegertracing/jaeger/pkg/kafka/consumer"
"github.com/jaegertracing/jaeger/plugin/storage/kafka"
)

const (
// EncodingJSON indicates spans are encoded as a json byte array
EncodingJSON = "json"
// EncodingProto indicates spans are encoded as a protobuf byte array
EncodingProto = "protobuf"
// EncodingZipkinT indicates spans are encoded as a Zipkin thrift byte array
EncodingZipkinT = "zipkinThrift"

// ConfigPrefix is a prefix for the ingester flags
ConfigPrefix = "ingester"
// KafkaConfigPrefix is a prefix for the Kafka flags
Expand Down Expand Up @@ -62,7 +56,7 @@ const (
// DefaultParallelism is the default parallelism for the span processor
DefaultParallelism = 1000
// DefaultEncoding is the default span encoding
DefaultEncoding = EncodingProto
DefaultEncoding = kafka.EncodingProto
// DefaultDeadlockInterval is the default deadlock interval
DefaultDeadlockInterval = 1 * time.Minute
// DefaultHTTPPort is the default HTTP port (e.g. for /metrics)
Expand Down Expand Up @@ -98,7 +92,7 @@ func AddFlags(flagSet *flag.FlagSet) {
flagSet.String(
KafkaConfigPrefix+SuffixEncoding,
DefaultEncoding,
fmt.Sprintf(`The encoding of spans ("%s", "%s" or "%s") consumed from kafka`, EncodingProto, EncodingJSON, EncodingZipkinT))
fmt.Sprintf(`The encoding of spans ("%s") consumed from kafka`, strings.Join(kafka.AllEncodings, "\", \"")))
flagSet.String(
ConfigPrefix+SuffixParallelism,
strconv.Itoa(DefaultParallelism),
Expand Down
6 changes: 3 additions & 3 deletions plugin/storage/kafka/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,12 +68,12 @@ func (f *Factory) Initialize(metricsFactory metrics.Factory, logger *zap.Logger)
}
f.producer = p
switch f.options.encoding {
case encodingProto:
case EncodingProto:
f.marshaller = newProtobufMarshaller()
case encodingJSON:
case EncodingJSON:
f.marshaller = newJSONMarshaller()
default:
return errors.New("kafka encoding is not one of '" + encodingJSON + "' or '" + encodingProto + "'")
return errors.New("kafka encoding is not one of '" + EncodingJSON + "' or '" + EncodingProto + "'")
}
return nil
}
Expand Down
14 changes: 9 additions & 5 deletions plugin/storage/kafka/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,17 @@ const (
suffixTopic = ".topic"
suffixEncoding = ".encoding"

encodingJSON = "json"
encodingProto = "protobuf"
EncodingZipkinT = "zipkinThrift"
EncodingJSON = "json"
EncodingProto = "protobuf"
EncodingZipkinThrift = "zipkin-thrift"

defaultBroker = "127.0.0.1:9092"
defaultTopic = "jaeger-spans"
defaultEncoding = encodingProto
defaultEncoding = EncodingProto
)

var (
AllEncodings = []string{EncodingJSON, EncodingProto, EncodingZipkinThrift}
)

// Options stores the configuration options for Kafka
Expand All @@ -59,7 +63,7 @@ func (opt *Options) AddFlags(flagSet *flag.FlagSet) {
flagSet.String(
configPrefix+suffixEncoding,
defaultEncoding,
fmt.Sprintf(`(experimental) Encoding of spans ("%s", "%s" or "%s") sent to kafka.`, encodingProto, encodingJSON, EncodingZipkinT),
fmt.Sprintf(`(experimental) Encoding of spans ("%s" or "%s") sent to kafka.`, EncodingJSON, EncodingProto),
)
}

Expand Down

0 comments on commit 83c8a2d

Please sign in to comment.