Skip to content

Commit d5a79fb

Browse files
authored
Merge branch 'master' into fix-proto-layout
2 parents bc4850e + 8e76a50 commit d5a79fb

15 files changed

+222
-194
lines changed

Diff for: CHANGELOG.md

+28-1
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,33 @@ Changes by Version
77
#### Backend Changes
88

99
##### Breaking Changes
10+
- The `kafka` flags were removed in favor of `kafka.producer` and `kafka.consumer` flags ([#1424](https://github.com/jaegertracing/jaeger/pull/1424), [@ledor473](https://github.com/ledor473))
11+
12+
The following flags have been **removed** in the Collector and the Ingester:
13+
```
14+
--kafka.brokers
15+
--kafka.encoding
16+
--kafka.topic
17+
--ingester.brokers
18+
--ingester.encoding
19+
--ingester.topic
20+
--ingester.group-id
21+
```
22+
23+
In the Collector, they are replaced by:
24+
```
25+
--kafka.producer.brokers
26+
--kafka.producer.encoding
27+
--kafka.producer.topic
28+
```
29+
30+
In the Ingester, they are replaced by:
31+
```
32+
--kafka.consumer.brokers
33+
--kafka.consumer.encoding
34+
--kafka.consumer.topic
35+
--kafka.consumer.group-id
36+
```
1037
1138
##### New Features
1239
@@ -21,7 +48,7 @@ Changes by Version
2148
#### Backend Changes
2249
2350
##### Breaking Changes
24-
- Introduce `kafka.producer` and `kafka.consumer` flags to replace `kafka` flags ([1360](https://github.com/jaegertracing/jaeger/pull/1360), [@ledor473](https://github.com/ledor473))
51+
- Introduce `kafka.producer` and `kafka.consumer` flags to replace `kafka` flags ([#1360](https://github.com/jaegertracing/jaeger/pull/1360), [@ledor473](https://github.com/ledor473))
2552
2653
The following flags have been deprecated in the Collector and the Ingester:
2754
```

Diff for: cmd/agent/app/builder.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -230,7 +230,7 @@ func CreateCollectorProxy(
230230
}
231231
switch opts.ReporterType {
232232
case reporter.GRPC:
233-
return grpc.NewCollectorProxy(grpcRepOpts, mFactory, logger)
233+
return grpc.NewCollectorProxy(grpcRepOpts, opts.AgentTags, mFactory, logger)
234234
case reporter.TCHANNEL:
235235
return tchannel.NewCollectorProxy(tchanRep, mFactory, logger)
236236
default:

Diff for: cmd/agent/app/reporter/flags.go

+34
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,17 @@ package reporter
1717
import (
1818
"flag"
1919
"fmt"
20+
"os"
21+
"strings"
2022

2123
"github.com/spf13/viper"
2224
)
2325

2426
const (
27+
// Whether to use grpc or tchannel reporter.
2528
reporterType = "reporter.type"
29+
// Agent tags
30+
agentTags = "jaeger.tags"
2631
// TCHANNEL is name of tchannel reporter.
2732
TCHANNEL Type = "tchannel"
2833
// GRPC is name of gRPC reporter.
@@ -35,15 +40,44 @@ type Type string
3540
// Options holds generic reporter configuration.
3641
type Options struct {
3742
ReporterType Type
43+
AgentTags map[string]string
3844
}
3945

4046
// AddFlags adds flags for Options.
4147
func AddFlags(flags *flag.FlagSet) {
4248
flags.String(reporterType, string(GRPC), fmt.Sprintf("Reporter type to use e.g. %s, %s", string(GRPC), string(TCHANNEL)))
49+
flags.String(agentTags, "", "One or more tags to be added to the Process tags of all spans passing through this agent. Ex: key1=value1,key2=${envVar:defaultValue}")
4350
}
4451

4552
// InitFromViper initializes Options with properties retrieved from Viper.
4653
func (b *Options) InitFromViper(v *viper.Viper) *Options {
4754
b.ReporterType = Type(v.GetString(reporterType))
55+
b.AgentTags = parseAgentTags(v.GetString(agentTags))
4856
return b
4957
}
58+
59+
// Parsing logic borrowed from jaegertracing/jaeger-client-go
60+
func parseAgentTags(agentTags string) map[string]string {
61+
if agentTags == "" {
62+
return nil
63+
}
64+
tagPairs := strings.Split(string(agentTags), ",")
65+
tags := make(map[string]string)
66+
for _, p := range tagPairs {
67+
kv := strings.SplitN(p, "=", 2)
68+
k, v := strings.TrimSpace(kv[0]), strings.TrimSpace(kv[1])
69+
70+
if strings.HasPrefix(v, "${") && strings.HasSuffix(v, "}") {
71+
ed := strings.SplitN(string(v[2:len(v)-1]), ":", 2)
72+
e, d := ed[0], ed[1]
73+
v = os.Getenv(e)
74+
if v == "" && d != "" {
75+
v = d
76+
}
77+
}
78+
79+
tags[k] = v
80+
}
81+
82+
return tags
83+
}

Diff for: cmd/agent/app/reporter/flags_test.go

+31
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ package reporter
1616

1717
import (
1818
"flag"
19+
"os"
1920
"testing"
2021

2122
"github.com/spf13/cobra"
@@ -24,6 +25,25 @@ import (
2425
"github.com/stretchr/testify/require"
2526
)
2627

28+
func TestBindFlags_NoJaegerTags(t *testing.T) {
29+
v := viper.New()
30+
command := cobra.Command{}
31+
flags := &flag.FlagSet{}
32+
AddFlags(flags)
33+
command.PersistentFlags().AddGoFlagSet(flags)
34+
v.BindPFlags(command.PersistentFlags())
35+
36+
err := command.ParseFlags([]string{
37+
"--reporter.type=grpc",
38+
})
39+
require.NoError(t, err)
40+
41+
b := &Options{}
42+
b.InitFromViper(v)
43+
assert.Equal(t, Type("grpc"), b.ReporterType)
44+
assert.Len(t, b.AgentTags, 0)
45+
}
46+
2747
func TestBindFlags(t *testing.T) {
2848
v := viper.New()
2949
command := cobra.Command{}
@@ -34,10 +54,21 @@ func TestBindFlags(t *testing.T) {
3454

3555
err := command.ParseFlags([]string{
3656
"--reporter.type=grpc",
57+
"--jaeger.tags=key=value,envVar1=${envKey1:defaultVal1},envVar2=${envKey2:defaultVal2}",
3758
})
3859
require.NoError(t, err)
3960

4061
b := &Options{}
62+
os.Setenv("envKey1", "envVal1")
4163
b.InitFromViper(v)
64+
65+
expectedTags := map[string]string{
66+
"key" : "value",
67+
"envVar1" : "envVal1",
68+
"envVar2" : "defaultVal2",
69+
}
70+
4271
assert.Equal(t, Type("grpc"), b.ReporterType)
72+
assert.Equal(t, expectedTags, b.AgentTags)
73+
os.Unsetenv("envKey1")
4374
}

Diff for: cmd/agent/app/reporter/grpc/collector_proxy.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ type ProxyBuilder struct {
4242
var systemCertPool = x509.SystemCertPool // to allow overriding in unit test
4343

4444
// NewCollectorProxy creates ProxyBuilder
45-
func NewCollectorProxy(o *Options, mFactory metrics.Factory, logger *zap.Logger) (*ProxyBuilder, error) {
45+
func NewCollectorProxy(o *Options, agentTags map[string]string, mFactory metrics.Factory, logger *zap.Logger) (*ProxyBuilder, error) {
4646
if len(o.CollectorHostPort) == 0 {
4747
return nil, errors.New("could not create collector proxy, address is missing")
4848
}
@@ -87,7 +87,7 @@ func NewCollectorProxy(o *Options, mFactory metrics.Factory, logger *zap.Logger)
8787
grpcMetrics := mFactory.Namespace(metrics.NSOptions{Name: "", Tags: map[string]string{"protocol": "grpc"}})
8888
return &ProxyBuilder{
8989
conn: conn,
90-
reporter: aReporter.WrapWithMetrics(NewReporter(conn, logger), grpcMetrics),
90+
reporter: aReporter.WrapWithMetrics(NewReporter(conn, agentTags, logger), grpcMetrics),
9191
manager: configmanager.WrapWithMetrics(grpcManager.NewConfigManager(conn), grpcMetrics)}, nil
9292
}
9393

Diff for: cmd/agent/app/reporter/grpc/collector_proxy_test.go

+4-4
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ iPKnCkzNgxMzQtwdgpAOXIAqXyNibvyOAv1C+3QSMLKbuPEHaIxlCuvl1suX/g25
5050
-----END CERTIFICATE-----`
5151

5252
func TestProxyBuilderMissingAddress(t *testing.T) {
53-
proxy, err := NewCollectorProxy(&Options{}, metrics.NullFactory, zap.NewNop())
53+
proxy, err := NewCollectorProxy(&Options{}, nil, metrics.NullFactory, zap.NewNop())
5454
require.Nil(t, proxy)
5555
assert.EqualError(t, err, "could not create collector proxy, address is missing")
5656
}
@@ -99,7 +99,7 @@ func TestProxyBuilder(t *testing.T) {
9999

100100
for _, test := range tests {
101101
t.Run(test.name, func(t *testing.T) {
102-
proxy, err := NewCollectorProxy(test.proxyOptions, metrics.NullFactory, zap.NewNop())
102+
proxy, err := NewCollectorProxy(test.proxyOptions, nil, metrics.NullFactory, zap.NewNop())
103103
if test.expectError {
104104
require.Error(t, err)
105105
} else {
@@ -125,7 +125,7 @@ func TestSystemCertPoolError(t *testing.T) {
125125
_, err := NewCollectorProxy(&Options{
126126
CollectorHostPort: []string{"foo", "bar"},
127127
TLS: true,
128-
}, nil, nil)
128+
}, nil, nil, nil)
129129
assert.Equal(t, fakeErr, err)
130130
}
131131

@@ -142,7 +142,7 @@ func TestMultipleCollectors(t *testing.T) {
142142
defer s2.Stop()
143143

144144
mFactory := metricstest.NewFactory(time.Microsecond)
145-
proxy, err := NewCollectorProxy(&Options{CollectorHostPort: []string{addr1.String(), addr2.String()}}, mFactory, zap.NewNop())
145+
proxy, err := NewCollectorProxy(&Options{CollectorHostPort: []string{addr1.String(), addr2.String()}}, nil, mFactory, zap.NewNop())
146146
require.NoError(t, err)
147147
require.NotNil(t, proxy)
148148
assert.NotNil(t, proxy.GetReporter())

Diff for: cmd/agent/app/reporter/grpc/reporter.go

+30-1
Original file line numberDiff line numberDiff line change
@@ -32,14 +32,16 @@ import (
3232
// Reporter reports data to collector over gRPC.
3333
type Reporter struct {
3434
collector api_v2.CollectorServiceClient
35+
agentTags []model.KeyValue
3536
logger *zap.Logger
3637
sanitizer zipkin2.Sanitizer
3738
}
3839

3940
// NewReporter creates gRPC reporter.
40-
func NewReporter(conn *grpc.ClientConn, logger *zap.Logger) *Reporter {
41+
func NewReporter(conn *grpc.ClientConn, agentTags map[string]string, logger *zap.Logger) *Reporter {
4142
return &Reporter{
4243
collector: api_v2.NewCollectorServiceClient(conn),
44+
agentTags: makeModelKeyValue(agentTags),
4345
logger: logger,
4446
sanitizer: zipkin2.NewChainedSanitizer(zipkin2.StandardSanitizers...),
4547
}
@@ -63,6 +65,7 @@ func (r *Reporter) EmitZipkinBatch(zSpans []*zipkincore.Span) error {
6365
}
6466

6567
func (r *Reporter) send(spans []*model.Span, process *model.Process) error {
68+
spans, process = addProcessTags(spans, process, r.agentTags)
6669
batch := model.Batch{Spans: spans, Process: process}
6770
req := &api_v2.PostSpansRequest{Batch: batch}
6871
_, err := r.collector.PostSpans(context.Background(), req)
@@ -71,3 +74,29 @@ func (r *Reporter) send(spans []*model.Span, process *model.Process) error {
7174
}
7275
return err
7376
}
77+
78+
// addTags appends jaeger tags for the agent to every span it sends to the collector.
79+
func addProcessTags(spans []*model.Span, process *model.Process, agentTags []model.KeyValue) ([]*model.Span, *model.Process) {
80+
if len(agentTags) == 0 {
81+
return spans, process
82+
}
83+
if process != nil {
84+
process.Tags = append(process.Tags, agentTags...)
85+
}
86+
for _, span := range spans {
87+
if span.Process != nil {
88+
span.Process.Tags = append(span.Process.Tags, agentTags...)
89+
}
90+
}
91+
return spans, process
92+
}
93+
94+
func makeModelKeyValue(agentTags map[string]string) []model.KeyValue {
95+
tags := make([]model.KeyValue, 0, len(agentTags))
96+
for k, v := range agentTags {
97+
tag := model.String(k, v)
98+
tags = append(tags, tag)
99+
}
100+
101+
return tags
102+
}

Diff for: cmd/agent/app/reporter/grpc/reporter_test.go

+49-4
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,8 @@ func TestReporter_EmitZipkinBatch(t *testing.T) {
5858
conn, err := grpc.Dial(addr.String(), grpc.WithInsecure())
5959
defer conn.Close()
6060
require.NoError(t, err)
61-
rep := NewReporter(conn, zap.NewNop())
61+
62+
rep := NewReporter(conn, nil, zap.NewNop())
6263

6364
tm := time.Unix(158, 0)
6465
a := tm.Unix() * 1000 * 1000
@@ -71,7 +72,8 @@ func TestReporter_EmitZipkinBatch(t *testing.T) {
7172
{in: &zipkincore.Span{Name: "jonatan", TraceID: 1, ID: 2, Timestamp: &a, Annotations: []*zipkincore.Annotation{{Value: zipkincore.CLIENT_SEND, Host: &zipkincore.Endpoint{ServiceName: "spring"}}}},
7273
expected: model.Batch{
7374
Spans: []*model.Span{{TraceID: model.NewTraceID(0, 1), SpanID: model.NewSpanID(2), OperationName: "jonatan", Duration: time.Microsecond * 1,
74-
Tags: model.KeyValues{{Key: "span.kind", VStr: "client", VType: model.StringType}}, Process: &model.Process{ServiceName: "spring"}, StartTime: tm.UTC()}}}},
75+
Tags: model.KeyValues{{Key: "span.kind", VStr: "client", VType: model.StringType}},
76+
Process: &model.Process{ServiceName: "spring"}, StartTime: tm.UTC()}}}},
7577
}
7678
for _, test := range tests {
7779
err = rep.EmitZipkinBatch([]*zipkincore.Span{test.in})
@@ -93,7 +95,7 @@ func TestReporter_EmitBatch(t *testing.T) {
9395
conn, err := grpc.Dial(addr.String(), grpc.WithInsecure())
9496
defer conn.Close()
9597
require.NoError(t, err)
96-
rep := NewReporter(conn, zap.NewNop())
98+
rep := NewReporter(conn, nil, zap.NewNop())
9799

98100
tm := time.Unix(158, 0)
99101
tests := []struct {
@@ -118,7 +120,50 @@ func TestReporter_EmitBatch(t *testing.T) {
118120
func TestReporter_SendFailure(t *testing.T) {
119121
conn, err := grpc.Dial("", grpc.WithInsecure())
120122
require.NoError(t, err)
121-
rep := NewReporter(conn, zap.NewNop())
123+
rep := NewReporter(conn, nil, zap.NewNop())
122124
err = rep.send(nil, nil)
123125
assert.EqualError(t, err, "rpc error: code = Unavailable desc = all SubConns are in TransientFailure, latest connection error: connection error: desc = \"transport: Error while dialing dial tcp: missing address\"")
124126
}
127+
128+
func TestReporter_AddProcessTags_EmptyTags(t *testing.T) {
129+
tags := map[string]string{}
130+
spans := []*model.Span{{TraceID: model.NewTraceID(0, 1), SpanID: model.NewSpanID(2), OperationName: "jonatan"}}
131+
actualSpans, _ := addProcessTags(spans, nil, makeModelKeyValue(tags))
132+
assert.Equal(t, spans, actualSpans)
133+
}
134+
135+
func TestReporter_AddProcessTags_ZipkinBatch(t *testing.T) {
136+
tags := map[string]string{ "key" : "value" }
137+
spans := []*model.Span{{TraceID: model.NewTraceID(0, 1), SpanID: model.NewSpanID(2), OperationName: "jonatan", Process: &model.Process{ServiceName: "spring"}}}
138+
139+
expectedSpans := []*model.Span{
140+
{
141+
TraceID: model.NewTraceID(0, 1),
142+
SpanID: model.NewSpanID(2),
143+
OperationName: "jonatan",
144+
Process: &model.Process{ServiceName: "spring", Tags: []model.KeyValue{model.String("key", "value")}},
145+
},
146+
}
147+
actualSpans, _ := addProcessTags(spans, nil, makeModelKeyValue(tags))
148+
149+
assert.Equal(t, expectedSpans, actualSpans)
150+
}
151+
152+
func TestReporter_AddProcessTags_JaegerBatch(t *testing.T) {
153+
tags := map[string]string{ "key" : "value" }
154+
spans := []*model.Span{{TraceID: model.NewTraceID(0, 1), SpanID: model.NewSpanID(2), OperationName: "jonatan"}}
155+
process := &model.Process{ServiceName: "spring"}
156+
157+
expectedProcess := &model.Process{ServiceName: "spring", Tags: []model.KeyValue{model.String("key", "value")}}
158+
_, actualProcess := addProcessTags(spans, process, makeModelKeyValue(tags))
159+
160+
assert.Equal(t, expectedProcess, actualProcess)
161+
}
162+
163+
func TestReporter_MakeModelKeyValue(t *testing.T) {
164+
expectedTags := []model.KeyValue{model.String("key", "value")}
165+
stringTags := map[string]string{ "key" : "value" }
166+
actualTags := makeModelKeyValue(stringTags)
167+
168+
assert.Equal(t, expectedTags, actualTags)
169+
}

Diff for: cmd/agent/app/reporter/metrics_test.go

+1-2
Original file line numberDiff line numberDiff line change
@@ -104,8 +104,7 @@ func TestMetricsReporter(t *testing.T) {
104104
err := reporter.EmitBatch(&jaeger.Batch{Spans: []*jaeger.Span{{}}})
105105
require.Error(t, err)
106106
}, rep: &noopReporter{err: errors.New("foo")}},
107-
{expectedCounters:
108-
[]metricstest.ExpectedMetric{
107+
{expectedCounters: []metricstest.ExpectedMetric{
109108
{Name: "reporter.batches.failures", Tags: map[string]string{"format": "zipkin"}, Value: 1},
110109
{Name: "reporter.spans.failures", Tags: map[string]string{"format": "zipkin"}, Value: 2},
111110
}, expectedGauges: []metricstest.ExpectedMetric{

0 commit comments

Comments
 (0)