1515
1616import java .nio .ByteBuffer ;
1717import java .util .ArrayList ;
18- import java .util .Arrays ;
1918import java .util .Iterator ;
2019import java .util .List ;
2120import java .util .concurrent .Executor ;
2221import java .util .function .Supplier ;
2322import java .util .logging .Logger ;
24- import zipkin2 .Call ;
2523import zipkin2 .Callback ;
2624import zipkin2 .Span ;
2725import zipkin2 .SpanBytesDecoderDetector ;
2826import zipkin2 .codec .BytesDecoder ;
2927import zipkin2 .codec .SpanBytesDecoder ;
30- import zipkin2 .collector .handler .CollectedSpanHandler ;
3128import zipkin2 .storage .StorageComponent ;
3229
3330import static java .lang .String .format ;
3633
3734/**
3835 * This component takes action on spans received from a transport. This includes deserializing,
39- * sampling, invoking any handlers, and scheduling for storage.
36+ * sampling and scheduling for storage.
4037 *
4138 * <p>Callbacks passed do not propagate to the storage layer. They only return success or failures
4239 * before storage is attempted. This ensures that calling threads are disconnected from storage
4340 * threads.
4441 */
4542public class Collector { // not final for mock
43+ static final Callback <Void > NOOP_CALLBACK = new Callback <Void >() {
44+ @ Override public void onSuccess (Void value ) {
45+ }
46+
47+ @ Override public void onError (Throwable t ) {
48+ }
49+ };
50+
4651 /** Needed to scope this to the correct logging category */
4752 public static Builder newBuilder (Class <?> loggingClass ) {
4853 if (loggingClass == null ) throw new NullPointerException ("loggingClass == null" );
@@ -51,35 +56,14 @@ public static Builder newBuilder(Class<?> loggingClass) {
5156
5257 public static final class Builder {
5358 final Logger logger ;
54- final ArrayList <CollectedSpanHandler > collectedSpanHandlers = new ArrayList <>();
5559 StorageComponent storage ;
60+ CollectorSampler sampler ;
5661 CollectorMetrics metrics ;
5762
5863 Builder (Logger logger ) {
5964 this .logger = logger ;
6065 }
6166
62- /** @see {@link CollectorComponent.Builder#sampler(CollectorSampler)} */
63- public Builder sampler (CollectorSampler sampler ) {
64- if (sampler == null ) throw new NullPointerException ("sampler == null" );
65- this .collectedSpanHandlers .add (0 , sampler ); // sample first
66- return this ;
67- }
68-
69- /**
70- * @see {@link CollectorComponent.Builder#addCollectedSpanHandler(CollectedSpanHandler)}
71- * @since 2.17
72- */
73- public Builder addCollectedSpanHandler (CollectedSpanHandler collectedSpanHandler ) {
74- if (collectedSpanHandler == null ) {
75- throw new NullPointerException ("collectedSpanHandler == null" );
76- }
77- if (collectedSpanHandler != CollectedSpanHandler .NOOP ) { // lenient on config bug
78- this .collectedSpanHandlers .add (collectedSpanHandler );
79- }
80- return this ;
81- }
82-
8367 /** @see {@link CollectorComponent.Builder#storage(StorageComponent)} */
8468 public Builder storage (StorageComponent storage ) {
8569 if (storage == null ) throw new NullPointerException ("storage == null" );
@@ -94,14 +78,21 @@ public Builder metrics(CollectorMetrics metrics) {
9478 return this ;
9579 }
9680
81+ /** @see {@link CollectorComponent.Builder#sampler(CollectorSampler)} */
82+ public Builder sampler (CollectorSampler sampler ) {
83+ if (sampler == null ) throw new NullPointerException ("sampler == null" );
84+ this .sampler = sampler ;
85+ return this ;
86+ }
87+
9788 public Collector build () {
9889 return new Collector (this );
9990 }
10091 }
10192
10293 final Logger logger ;
10394 final CollectorMetrics metrics ;
104- final CollectedSpanHandler handler ;
95+ final CollectorSampler sampler ;
10596 final StorageComponent storage ;
10697
10798 Collector (Builder builder ) {
@@ -110,7 +101,7 @@ public Collector build() {
110101 this .metrics = builder .metrics == null ? CollectorMetrics .NOOP_METRICS : builder .metrics ;
111102 if (builder .storage == null ) throw new NullPointerException ("storage == null" );
112103 this .storage = builder .storage ;
113- this .handler = consolidate ( builder .collectedSpanHandlers ) ;
104+ this .sampler = builder .sampler == null ? CollectorSampler . ALWAYS_SAMPLE : builder . sampler ;
114105 }
115106
116107 public void accept (List <Span > spans , Callback <Void > callback ) {
@@ -128,25 +119,24 @@ public void accept(List<Span> spans, Callback<Void> callback, Executor executor)
128119 callback .onSuccess (null );
129120 return ;
130121 }
122+ metrics .incrementSpans (spans .size ());
131123
132- try {
133- metrics .incrementSpans (spans .size ());
134-
135- List <Span > handledSpans = handle (spans );
136- if (handledSpans .isEmpty ()) {
137- callback .onSuccess (null );
138- return ;
139- }
124+ List <Span > sampledSpans = sample (spans );
125+ if (sampledSpans .isEmpty ()) {
126+ callback .onSuccess (null );
127+ return ;
128+ }
140129
141- // In order to ensure callers are not blocked, we swap callbacks when we get to the storage
142- // phase of this process. Here, we create a callback whose sole purpose is classifying later
143- // errors on this bundle of spans in the same log category. This allows people to only turn on
144- // debug logging in one place.
145- executor .execute (new StoreSpans (handledSpans ));
130+ // In order to ensure callers are not blocked, we swap callbacks when we get to the storage
131+ // phase of this process. Here, we create a callback whose sole purpose is classifying later
132+ // errors on this bundle of spans in the same log category. This allows people to only turn on
133+ // debug logging in one place.
134+ try {
135+ executor .execute (new StoreSpans (sampledSpans ));
146136 callback .onSuccess (null );
147137 } catch (Throwable unexpected ) { // ensure if a future is supplied we always set value or error
148- Call .propagateIfFatal (unexpected );
149138 callback .onError (unexpected );
139+ throw unexpected ;
150140 }
151141 }
152142
@@ -214,15 +204,17 @@ String idString(Span span) {
214204 return span .traceId () + "/" + span .id ();
215205 }
216206
217- List <Span > handle (List <Span > input ) {
218- List <Span > handled = new ArrayList <>(input .size ());
207+ List <Span > sample (List <Span > input ) {
208+ List <Span > sampled = new ArrayList <>(input .size ());
219209 for (int i = 0 , length = input .size (); i < length ; i ++) {
220- Span s = handler .handle (input .get (i ));
221- if (s != null ) handled .add (s );
210+ Span s = input .get (i );
211+ if (sampler .isSampled (s .traceId (), Boolean .TRUE .equals (s .debug ()))) {
212+ sampled .add (s );
213+ }
222214 }
223- int dropped = input .size () - handled .size ();
215+ int dropped = input .size () - sampled .size ();
224216 if (dropped > 0 ) metrics .incrementSpansDropped (dropped );
225- return handled ;
217+ return sampled ;
226218 }
227219
228220 class StoreSpans implements Callback <Void >, Runnable {
@@ -247,7 +239,7 @@ class StoreSpans implements Callback<Void>, Runnable {
247239 }
248240
249241 @ Override public void onError (Throwable t ) {
250- handleStorageError (spans , t , NOOP_VOID );
242+ handleStorageError (spans , t , NOOP_CALLBACK );
251243 }
252244
253245 @ Override public String toString () {
@@ -301,30 +293,4 @@ String appendSpanIds(List<Span> spans, StringBuilder message) {
301293
302294 return message .append ("]" ).toString ();
303295 }
304-
305- static CollectedSpanHandler consolidate (List <CollectedSpanHandler > handlers ) {
306- if (handlers .isEmpty ()) return CollectedSpanHandler .NOOP ;
307- if (handlers .size () == 1 ) return handlers .get (0 );
308- return new MultipleHandler (handlers );
309- }
310-
311- static final class MultipleHandler implements CollectedSpanHandler {
312- final CollectedSpanHandler [] handlers ; // Array ensures no iterators are created at runtime
313-
314- MultipleHandler (List <CollectedSpanHandler > handlers ) {
315- this .handlers = handlers .toArray (new CollectedSpanHandler [0 ]);
316- }
317-
318- @ Override public Span handle (Span span ) {
319- for (CollectedSpanHandler handler : handlers ) {
320- span = handler .handle (span );
321- if (span == null ) return null ;
322- }
323- return span ;
324- }
325-
326- @ Override public String toString () {
327- return Arrays .toString (handlers );
328- }
329- }
330296}
0 commit comments