Skip to content

Commit 266b941

Browse files
committed
Part of #1260: write a manually run concurrency test to tease out problem with LockFreePool
1 parent 33a99d3 commit 266b941

File tree

1 file changed

+159
-0
lines changed

1 file changed

+159
-0
lines changed
+159
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,159 @@
1+
package perf;
2+
3+
import java.io.*;
4+
import java.nio.charset.StandardCharsets;
5+
import java.util.Random;
6+
import java.util.concurrent.ExecutorService;
7+
import java.util.concurrent.Executors;
8+
import java.util.concurrent.TimeUnit;
9+
import java.util.concurrent.atomic.AtomicInteger;
10+
import java.util.concurrent.atomic.AtomicLong;
11+
12+
import com.fasterxml.jackson.core.JsonFactory;
13+
import com.fasterxml.jackson.core.JsonGenerator;
14+
import com.fasterxml.jackson.core.JsonParser;
15+
import com.fasterxml.jackson.core.util.BufferRecycler;
16+
import com.fasterxml.jackson.core.util.JsonRecyclerPools;
17+
import com.fasterxml.jackson.core.util.RecyclerPool;
18+
19+
/**
20+
* High-concurrency test that tries to see if unbounded {@link RecyclerPool}
21+
* implementations grow without bounds or not.
22+
*/
23+
public class RecyclerPoolTest
24+
{
25+
final static int THREAD_COUNT = 100;
26+
27+
final static int RUNTIME_SECS = 30;
28+
29+
private final int _threadCount;
30+
private final long _runtimeMsecs;
31+
32+
RecyclerPoolTest(int threadCount, int runtimeMinutes) {
33+
_threadCount = threadCount;
34+
_runtimeMsecs = TimeUnit.SECONDS.toMillis(runtimeMinutes);
35+
}
36+
37+
public void testPool(JsonFactory jsonF)
38+
throws InterruptedException
39+
{
40+
RecyclerPool<BufferRecycler> poolImpl = jsonF._getRecyclerPool();
41+
42+
final String poolName = poolImpl.getClass().getSimpleName();
43+
final ExecutorService exec = Executors.newFixedThreadPool(_threadCount);
44+
final AtomicLong calls = new AtomicLong();
45+
final long startTime = System.currentTimeMillis();
46+
final long endtimeMsecs = startTime + _runtimeMsecs;
47+
final AtomicInteger threadsRunning = new AtomicInteger();
48+
49+
System.out.printf("Starting test of '%s' with %d threads, for %d seconds.\n",
50+
poolImpl.getClass().getName(),
51+
_threadCount, _runtimeMsecs / 1000L);
52+
53+
for (int i = 0; i < _threadCount; ++i) {
54+
final int id = i;
55+
threadsRunning.incrementAndGet();
56+
exec.execute(new Runnable() {
57+
@Override
58+
public void run() {
59+
testUntil(jsonF, endtimeMsecs, id, calls);
60+
threadsRunning.decrementAndGet();
61+
}
62+
});
63+
}
64+
65+
long currentTime;
66+
long nextPrint = 0L;
67+
// Print if exceeds expected max whenever, otherwise every 2.5 seconds
68+
final int thresholdToPrint = _threadCount + 5;
69+
70+
while ((currentTime = System.currentTimeMillis()) < endtimeMsecs) {
71+
int poolSize;
72+
73+
if ((poolSize = poolImpl.pooledCount()) > thresholdToPrint
74+
|| (currentTime > nextPrint)) {
75+
double secs = (currentTime - startTime) / 1000.0;
76+
System.out.printf(" (%s) %.1fs, %d calls; %d threads; pool size: %d\n",
77+
poolName, secs, calls.get(), threadsRunning.get(), poolSize);
78+
Thread.sleep(100L);
79+
nextPrint = currentTime + 2500L;
80+
}
81+
}
82+
83+
System.out.printf("Completed test of '%s' with %d threads running... wait termination\n",
84+
poolImpl.getClass().getSimpleName(),
85+
threadsRunning.get());
86+
if (!exec.awaitTermination(2000, TimeUnit.MILLISECONDS)) {
87+
System.out.printf("WARNING: ExecutorService.awaitTermination() failed: %d threads left; will shut down.\n",
88+
threadsRunning.get());
89+
exec.shutdown();
90+
}
91+
}
92+
93+
void testUntil(JsonFactory jsonF,
94+
long endTimeMsecs, int threadId, AtomicLong calls)
95+
{
96+
final Random rnd = new Random(threadId);
97+
final byte[] JSON_INPUT = "\"abc\"".getBytes(StandardCharsets.UTF_8);
98+
99+
while (System.currentTimeMillis() < endTimeMsecs) {
100+
try {
101+
// Randomize call order a bit
102+
switch (rnd.nextInt() & 3) {
103+
case 0:
104+
_testRead(jsonF, JSON_INPUT);
105+
break;
106+
case 1:
107+
_testWrite(jsonF);
108+
break;
109+
case 2:
110+
_testRead(jsonF, JSON_INPUT);
111+
_testWrite(jsonF);
112+
break;
113+
default:
114+
_testWrite(jsonF);
115+
_testRead(jsonF, JSON_INPUT);
116+
break;
117+
}
118+
} catch (Exception e) {
119+
System.err.printf("ERROR: thread %d fail, will exit: (%s) %s\n",
120+
threadId, e.getClass().getName(), e.getMessage());
121+
break;
122+
}
123+
calls.incrementAndGet();
124+
}
125+
}
126+
127+
private void _testRead(JsonFactory jsonF, byte[] input) throws Exception
128+
{
129+
JsonParser p = jsonF.createParser(new ByteArrayInputStream(input));
130+
while (p.nextToken() != null) {
131+
;
132+
}
133+
p.close();
134+
}
135+
136+
private void _testWrite(JsonFactory jsonF) throws Exception
137+
{
138+
StringWriter w = new StringWriter(16);
139+
JsonGenerator g = jsonF.createGenerator(w);
140+
g.writeStartArray();
141+
g.writeString("foobar");
142+
g.writeEndArray();
143+
g.close();
144+
}
145+
146+
public static void main(String[] args) throws Exception
147+
{
148+
RecyclerPoolTest test = new RecyclerPoolTest(THREAD_COUNT, RUNTIME_SECS);
149+
test.testPool(JsonFactory.builder()
150+
.recyclerPool(JsonRecyclerPools.newLockFreePool())
151+
.build());
152+
test.testPool(JsonFactory.builder()
153+
.recyclerPool(JsonRecyclerPools.newConcurrentDequePool())
154+
.build());
155+
test.testPool(JsonFactory.builder()
156+
.recyclerPool(JsonRecyclerPools.newBoundedPool(THREAD_COUNT - 5))
157+
.build());
158+
}
159+
}

0 commit comments

Comments
 (0)