Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 9 additions & 19 deletions src/main/java/com/github/luben/zstd/RecyclingBufferPool.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,7 @@

import java.lang.ref.SoftReference;
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.ConcurrentLinkedQueue;

/**
* A pool of buffers which uses a simple reference queue to recycle buffers.
Expand All @@ -15,36 +14,29 @@ public class RecyclingBufferPool implements BufferPool {
public static final BufferPool INSTANCE = new RecyclingBufferPool();

private static final int buffSize = Math.max(Math.max(
(int) ZstdOutputStreamNoFinalizer.recommendedCOutSize(),
(int) ZstdInputStreamNoFinalizer.recommendedDInSize()),
(int) ZstdOutputStreamNoFinalizer.recommendedCOutSize(),
(int) ZstdInputStreamNoFinalizer.recommendedDInSize()),
(int) ZstdInputStreamNoFinalizer.recommendedDOutSize());

private final Deque<SoftReference<ByteBuffer>> pool;
private final ConcurrentLinkedQueue<SoftReference<ByteBuffer>> pool;

private RecyclingBufferPool() {
// TODO: With Java 7 support, migrate this to a ConcurrentLinkedQueue and remove the 'synchronization' of it.
this.pool = new ArrayDeque<SoftReference<ByteBuffer>>();
this.pool = new ConcurrentLinkedQueue<>();
}

@Override
public ByteBuffer get(int capacity) {
if (capacity > buffSize) {
throw new RuntimeException(
"Unsupported buffer size: " + capacity +
". Supported buffer sizes: " + buffSize + " or smaller."
);
". Supported buffer sizes: " + buffSize + " or smaller."
);
}
while(true) {
SoftReference<ByteBuffer> sbuf = null;

// This if statement introduces a possible race condition of allocating a buffer while we're trying to
// release one. However, the extra allocation should be considered insignificant in terms of cost.
// Particularly with respect to throughput.
if (!pool.isEmpty()) {
synchronized (pool) {
sbuf = pool.pollFirst();
}
}
SoftReference<ByteBuffer> sbuf = pool.poll();

if (sbuf == null) {
return ByteBuffer.allocate(buffSize);
Expand All @@ -60,9 +52,7 @@ public ByteBuffer get(int capacity) {
public void release(ByteBuffer buffer) {
if (buffer.capacity() >= buffSize) {
buffer.clear();
synchronized (pool) {
pool.addLast(new SoftReference<ByteBuffer>(buffer));
}
pool.add(new SoftReference<>(buffer));
}
}
}