11package datadog .trace .common .writer ;
22
3+ import static java .util .concurrent .TimeUnit .MILLISECONDS ;
4+ import static java .util .concurrent .TimeUnit .NANOSECONDS ;
5+ import static java .util .concurrent .TimeUnit .SECONDS ;
6+
37import datadog .trace .core .DDSpan ;
48import datadog .trace .core .MetadataConsumer ;
5- import datadog .trace .core .tagprocessor .PeerServiceCalculator ;
6- import java .util .ArrayList ;
79import java .util .List ;
810import java .util .concurrent .CopyOnWriteArrayList ;
9- import java .util .concurrent .CountDownLatch ;
1011import java .util .concurrent .TimeUnit ;
1112import java .util .concurrent .TimeoutException ;
1213import java .util .concurrent .atomic .AtomicInteger ;
14+ import java .util .function .BooleanSupplier ;
1315import org .slf4j .Logger ;
1416import org .slf4j .LoggerFactory ;
1517
1618/** List writer used by tests mostly */
1719public class ListWriter extends CopyOnWriteArrayList <List <DDSpan >> implements Writer {
18-
1920 private static final Logger log = LoggerFactory .getLogger (ListWriter .class );
21+ private static final Filter ACCEPT_ALL = trace -> true ;
2022
21- public static final Filter ACCEPT_ALL =
22- new Filter () {
23- @ Override
24- public boolean accept (List <DDSpan > trace ) {
25- return true ;
26- }
27- };
28-
29- private final List <CountDownLatch > latches = new ArrayList <>();
3023 private final AtomicInteger traceCount = new AtomicInteger ();
3124 private final TraceStructureWriter structureWriter = new TraceStructureWriter (true );
25+ private final Object monitor = new Object ();
3226
33- private final PeerServiceCalculator peerServiceCalculator = new PeerServiceCalculator ();
3427 private Filter filter = ACCEPT_ALL ;
3528
3629 public List <DDSpan > firstTrace () {
@@ -47,30 +40,44 @@ public void write(List<DDSpan> trace) {
4740 // remotely realistic so the test actually test something
4841 span .processTagsAndBaggage (MetadataConsumer .NO_OP );
4942 }
43+
44+ add (trace );
45+ structureWriter .write (trace );
46+
5047 traceCount .incrementAndGet ();
51- synchronized (latches ) {
52- add (trace );
53- for (final CountDownLatch latch : latches ) {
54- if (size () >= latch .getCount ()) {
55- while (latch .getCount () > 0 ) {
56- latch .countDown ();
57- }
58- }
59- }
48+ synchronized (monitor ) {
49+ monitor .notifyAll ();
6050 }
61- structureWriter .write (trace );
6251 }
6352
64- public boolean waitForTracesMax (final int number , int seconds )
65- throws InterruptedException , TimeoutException {
66- final CountDownLatch latch = new CountDownLatch (number );
67- synchronized (latches ) {
68- if (size () >= number ) {
53+ private boolean awaitUntilDeadline (long timeout , TimeUnit unit , BooleanSupplier predicate )
54+ throws InterruptedException {
55+ final long deadline = System .nanoTime () + unit .toNanos (timeout );
56+
57+ while (true ) {
58+ if (predicate .getAsBoolean ()) {
6959 return true ;
7060 }
71- latches .add (latch );
61+
62+ long now = System .nanoTime ();
63+ long remaining = deadline - now ;
64+ if (remaining <= 0 ) {
65+ break ;
66+ }
67+
68+ long millis = NANOSECONDS .toMillis (remaining );
69+ long nanos = remaining - MILLISECONDS .toNanos (millis );
70+
71+ synchronized (monitor ) {
72+ monitor .wait (millis , (int ) nanos );
73+ }
7274 }
73- return latch .await (seconds , TimeUnit .SECONDS );
75+
76+ return false ;
77+ }
78+
79+ public boolean waitForTracesMax (final int number , int seconds ) throws InterruptedException {
80+ return awaitUntilDeadline (seconds , SECONDS , () -> traceCount .get () >= number );
7481 }
7582
7683 public void waitForTraces (final int number ) throws InterruptedException , TimeoutException {
@@ -88,24 +95,17 @@ public void waitForTraces(final int number) throws InterruptedException, Timeout
8895 }
8996
9097 public void waitUntilReported (final DDSpan span ) throws InterruptedException , TimeoutException {
91- waitUntilReported (span , 20 , TimeUnit . SECONDS );
98+ waitUntilReported (span , 20 , SECONDS );
9299 }
93100
94101 public void waitUntilReported (final DDSpan span , int timeout , TimeUnit unit )
95102 throws InterruptedException , TimeoutException {
96- while (true ) {
97- final CountDownLatch latch = new CountDownLatch (size () + 1 );
98- synchronized (latches ) {
99- latches .add (latch );
100- }
101- if (isReported (span )) {
102- return ;
103- }
104- if (!latch .await (timeout , unit )) {
105- String msg = "Timeout waiting for span to be reported: " + span ;
106- log .warn (msg );
107- throw new TimeoutException (msg );
108- }
103+ boolean reported = awaitUntilDeadline (timeout , unit , () -> isReported (span ));
104+
105+ if (!reported ) {
106+ String msg = "Timeout waiting for span to be reported: " + span ;
107+ log .warn (msg );
108+ throw new TimeoutException (msg );
109109 }
110110 }
111111
@@ -142,17 +142,16 @@ public boolean flush() {
142142 return true ;
143143 }
144144
145+ @ Override
146+ public void clear () {
147+ super .clear ();
148+
149+ traceCount .set (0 );
150+ }
151+
145152 @ Override
146153 public void close () {
147154 clear ();
148- synchronized (latches ) {
149- for (final CountDownLatch latch : latches ) {
150- while (latch .getCount () > 0 ) {
151- latch .countDown ();
152- }
153- }
154- latches .clear ();
155- }
156155 }
157156
158157 @ Override
0 commit comments