Skip to content

Commit cb0d352

Browse files
authored
SDK: span processor interface and simple span processor. (#117)
* SDK: SpanProcessor Interface. * add simple span processor. * rename span processor. * fix logic to export or process span data.
1 parent 5df3c07 commit cb0d352

6 files changed

+333
-7
lines changed

sdk/trace/simple_span_processor.go

+47
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
// Copyright 2019, OpenTelemetry Authors
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package trace
16+
17+
// SimpleSpanProcessor implements SpanProcessor interfaces. It is used by
18+
// exporters to receive SpanData synchronously when span is finished.
19+
type SimpleSpanProcessor struct {
20+
exporter Exporter
21+
}
22+
23+
var _ SpanProcessor = (*SimpleSpanProcessor)(nil)
24+
25+
// NewSimpleSpanProcessor creates a new instance of SimpleSpanProcessor
26+
// for a given exporter.
27+
func NewSimpleSpanProcessor(exporter Exporter) *SimpleSpanProcessor {
28+
ssp := &SimpleSpanProcessor{
29+
exporter: exporter,
30+
}
31+
return ssp
32+
}
33+
34+
// OnStart method does nothing.
35+
func (ssp *SimpleSpanProcessor) OnStart(sd *SpanData) {
36+
}
37+
38+
// OnEnd method exports SpanData using associated exporter.
39+
func (ssp *SimpleSpanProcessor) OnEnd(sd *SpanData) {
40+
if ssp.exporter != nil {
41+
ssp.exporter.ExportSpan(sd)
42+
}
43+
}
44+
45+
// Shutdown method does nothing. There is no data to cleanup.
46+
func (ssp *SimpleSpanProcessor) Shutdown() {
47+
}
+77
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
// Copyright 2019, OpenTelemetry Authors
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package trace_test
16+
17+
import (
18+
"context"
19+
"testing"
20+
21+
"go.opentelemetry.io/api/core"
22+
apitrace "go.opentelemetry.io/api/trace"
23+
sdktrace "go.opentelemetry.io/sdk/trace"
24+
)
25+
26+
type testExporter struct {
27+
spans []*sdktrace.SpanData
28+
}
29+
30+
func (t *testExporter) ExportSpan(s *sdktrace.SpanData) {
31+
t.spans = append(t.spans, s)
32+
}
33+
34+
var _ sdktrace.Exporter = (*testExporter)(nil)
35+
36+
func init() {
37+
sdktrace.Register()
38+
sdktrace.ApplyConfig(sdktrace.Config{DefaultSampler: sdktrace.AlwaysSample()})
39+
}
40+
41+
func TestNewSimpleSpanProcessor(t *testing.T) {
42+
ssp := sdktrace.NewSimpleSpanProcessor(&testExporter{})
43+
if ssp == nil {
44+
t.Errorf("Error creating new instance of SimpleSpanProcessor\n")
45+
}
46+
}
47+
48+
func TestNewSimpleSpanProcessorWithNilExporter(t *testing.T) {
49+
ssp := sdktrace.NewSimpleSpanProcessor(nil)
50+
if ssp == nil {
51+
t.Errorf("Error creating new instance of SimpleSpanProcessor with nil Exporter\n")
52+
}
53+
}
54+
55+
func TestSimpleSpanProcessorOnEnd(t *testing.T) {
56+
te := testExporter{}
57+
ssp := sdktrace.NewSimpleSpanProcessor(&te)
58+
if ssp == nil {
59+
t.Errorf("Error creating new instance of SimpleSpanProcessor with nil Exporter\n")
60+
}
61+
sdktrace.RegisterSpanProcessor(ssp)
62+
tid := core.TraceID{High: 0x0102030405060708, Low: 0x0102040810203040}
63+
sid := uint64(0x0102040810203040)
64+
sc := core.SpanContext{
65+
TraceID: tid,
66+
SpanID: sid,
67+
TraceOptions: 0x1,
68+
}
69+
_, span := apitrace.GlobalTracer().Start(context.Background(), "OnEnd", apitrace.ChildOf(sc))
70+
span.Finish()
71+
72+
wantTraceID := tid
73+
gotTraceID := te.spans[0].SpanContext.TraceID
74+
if wantTraceID != gotTraceID {
75+
t.Errorf("SimplerSpanProcessor OnEnd() check: got %+v, want %+v\n", gotTraceID, wantTraceID)
76+
}
77+
}

sdk/trace/span.go

+10-7
Original file line numberDiff line numberDiff line change
@@ -123,23 +123,26 @@ func (s *span) Finish(options ...apitrace.FinishOption) {
123123
}
124124
s.endOnce.Do(func() {
125125
exp, _ := exporters.Load().(exportersMap)
126-
mustExport := s.spanContext.IsSampled() && len(exp) > 0
127-
//if s.spanStore != nil || mustExport {
128-
if mustExport {
126+
sps, _ := spanProcessors.Load().(spanProcessorMap)
127+
mustExportOrProcess := len(sps) > 0 || (s.spanContext.IsSampled() && len(exp) > 0)
128+
// TODO(rghetia): when exporter is migrated to use processors simply check for the number
129+
// of processors. Exporter will export based on sampling.
130+
if mustExportOrProcess {
129131
sd := s.makeSpanData()
130132
if opts.FinishTime.IsZero() {
131133
sd.EndTime = internal.MonotonicEndTime(sd.StartTime)
132134
} else {
133135
sd.EndTime = opts.FinishTime
134136
}
135-
//if s.spanStore != nil {
136-
// s.spanStore.finished(s, sd)
137-
//}
138-
if mustExport {
137+
// Sampling check would be in the processor if the processor is used for exporting.
138+
if s.spanContext.IsSampled() {
139139
for e := range exp {
140140
e.ExportSpan(sd)
141141
}
142142
}
143+
for sp := range sps {
144+
sp.OnEnd(sd)
145+
}
143146
}
144147
})
145148
}

sdk/trace/span_processor.go

+74
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
// Copyright 2019, OpenTelemetry Authors
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package trace
16+
17+
import (
18+
"sync"
19+
"sync/atomic"
20+
)
21+
22+
// SpanProcessor is interface to add hooks to start and end method invocations.
23+
type SpanProcessor interface {
24+
25+
// OnStart method is invoked when span is started. It is a synchronous call
26+
// and hence should not block.
27+
OnStart(sd *SpanData)
28+
29+
// OnEnd method is invoked when span is finished. It is a synchronous call
30+
// and hence should not block.
31+
OnEnd(sd *SpanData)
32+
33+
// Shutdown is invoked when SDK shutsdown. Use this call to cleanup any processor
34+
// data. No calls to OnStart and OnEnd method is invoked after Shutdown call is
35+
// made. It should not be blocked indefinitely.
36+
Shutdown()
37+
}
38+
39+
type spanProcessorMap map[SpanProcessor]struct{}
40+
41+
var (
42+
mu sync.Mutex
43+
spanProcessors atomic.Value
44+
)
45+
46+
// RegisterSpanProcessor adds to the list of SpanProcessors that will receive sampled
47+
// trace spans.
48+
func RegisterSpanProcessor(e SpanProcessor) {
49+
mu.Lock()
50+
defer mu.Unlock()
51+
new := make(spanProcessorMap)
52+
if old, ok := spanProcessors.Load().(spanProcessorMap); ok {
53+
for k, v := range old {
54+
new[k] = v
55+
}
56+
}
57+
new[e] = struct{}{}
58+
spanProcessors.Store(new)
59+
}
60+
61+
// UnregisterSpanProcessor removes from the list of SpanProcessors the SpanProcessor that was
62+
// registered with the given name.
63+
func UnregisterSpanProcessor(e SpanProcessor) {
64+
mu.Lock()
65+
defer mu.Unlock()
66+
new := make(spanProcessorMap)
67+
if old, ok := spanProcessors.Load().(spanProcessorMap); ok {
68+
for k, v := range old {
69+
new[k] = v
70+
}
71+
}
72+
delete(new, e)
73+
spanProcessors.Store(new)
74+
}

sdk/trace/span_processor_test.go

+118
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,118 @@
1+
// Copyright 2019, OpenTelemetry Authors
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package trace_test
16+
17+
import (
18+
"context"
19+
"testing"
20+
21+
apitrace "go.opentelemetry.io/api/trace"
22+
sdktrace "go.opentelemetry.io/sdk/trace"
23+
)
24+
25+
type testSpanProcesor struct {
26+
spansStarted []*sdktrace.SpanData
27+
spansEnded []*sdktrace.SpanData
28+
shutdownCount int
29+
}
30+
31+
func init() {
32+
sdktrace.Register()
33+
sdktrace.ApplyConfig(sdktrace.Config{DefaultSampler: sdktrace.AlwaysSample()})
34+
}
35+
36+
func (t *testSpanProcesor) OnStart(s *sdktrace.SpanData) {
37+
t.spansStarted = append(t.spansStarted, s)
38+
}
39+
40+
func (t *testSpanProcesor) OnEnd(s *sdktrace.SpanData) {
41+
t.spansEnded = append(t.spansEnded, s)
42+
}
43+
44+
func (t *testSpanProcesor) Shutdown() {
45+
t.shutdownCount++
46+
}
47+
48+
func TestRegisterSpanProcessort(t *testing.T) {
49+
name := "Register span processor before span starts"
50+
sp := NewTestSpanProcessor()
51+
sdktrace.RegisterSpanProcessor(sp)
52+
defer sdktrace.UnregisterSpanProcessor(sp)
53+
_, span := apitrace.GlobalTracer().Start(context.Background(), "OnStart")
54+
span.Finish()
55+
wantCount := 1
56+
gotCount := len(sp.spansStarted)
57+
if gotCount != wantCount {
58+
t.Errorf("%s: started count: got %d, want %d\n", name, gotCount, wantCount)
59+
}
60+
gotCount = len(sp.spansEnded)
61+
if gotCount != wantCount {
62+
t.Errorf("%s: ended count: got %d, want %d\n", name, gotCount, wantCount)
63+
}
64+
}
65+
66+
func TestUnregisterSpanProcessor(t *testing.T) {
67+
name := "Start span after unregistering span processor"
68+
sp := NewTestSpanProcessor()
69+
sdktrace.RegisterSpanProcessor(sp)
70+
_, span := apitrace.GlobalTracer().Start(context.Background(), "OnStart")
71+
span.Finish()
72+
sdktrace.UnregisterSpanProcessor(sp)
73+
74+
// start another span after unregistering span processor.
75+
_, span = apitrace.GlobalTracer().Start(context.Background(), "Start span after unregister")
76+
span.Finish()
77+
78+
wantCount := 1
79+
gotCount := len(sp.spansStarted)
80+
if gotCount != wantCount {
81+
t.Errorf("%s: started count: got %d, want %d\n", name, gotCount, wantCount)
82+
}
83+
84+
gotCount = len(sp.spansEnded)
85+
if gotCount != wantCount {
86+
t.Errorf("%s: ended count: got %d, want %d\n", name, gotCount, wantCount)
87+
}
88+
}
89+
90+
func TestUnregisterSpanProcessorWhileSpanIsActive(t *testing.T) {
91+
name := "Unregister span processor while span is active"
92+
sp := NewTestSpanProcessor()
93+
sdktrace.RegisterSpanProcessor(sp)
94+
_, span := apitrace.GlobalTracer().Start(context.Background(), "OnStart")
95+
sdktrace.UnregisterSpanProcessor(sp)
96+
97+
span.Finish()
98+
99+
wantCount := 1
100+
gotCount := len(sp.spansStarted)
101+
if gotCount != wantCount {
102+
t.Errorf("%s: started count: got %d, want %d\n", name, gotCount, wantCount)
103+
}
104+
105+
wantCount = 0
106+
gotCount = len(sp.spansEnded)
107+
if gotCount != wantCount {
108+
t.Errorf("%s: ended count: got %d, want %d\n", name, gotCount, wantCount)
109+
}
110+
}
111+
112+
// TODO(rghetia): Add Shutdown test when it is implemented.
113+
func TestShutdown(t *testing.T) {
114+
}
115+
116+
func NewTestSpanProcessor() *testSpanProcesor {
117+
return &testSpanProcesor{}
118+
}

sdk/trace/tracer.go

+7
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,13 @@ func (tr *tracer) Start(ctx context.Context, name string, o ...apitrace.SpanOpti
6161
span := startSpanInternal(name, parent, remoteParent, opts)
6262
span.tracer = tr
6363

64+
if span.IsRecordingEvents() {
65+
sps, _ := spanProcessors.Load().(spanProcessorMap)
66+
for sp := range sps {
67+
sp.OnStart(span.data)
68+
}
69+
}
70+
6471
ctx, end := startExecutionTracerTask(ctx, name)
6572
span.executionTracerTaskEnd = end
6673
return newContext(ctx, span), span

0 commit comments

Comments
 (0)