Skip to content

Commit

Permalink
hadoop: use BufferPool for inputstream
Browse files Browse the repository at this point in the history
  • Loading branch information
tangyoupeng committed Aug 29, 2022
1 parent bb84127 commit 42e7cee
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 1 deletion.
4 changes: 3 additions & 1 deletion sdk/java/src/main/java/io/juicefs/JuiceFileSystemImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import com.kenai.jffi.internal.StubLoader;
import io.juicefs.metrics.JuiceFSInstrumentation;
import io.juicefs.utils.BufferPool;
import io.juicefs.utils.ConsistentHash;
import io.juicefs.utils.NodesFetcher;
import io.juicefs.utils.NodesFetcherBuilder;
Expand Down Expand Up @@ -732,7 +733,7 @@ class FileInputStream extends FSInputStream implements ByteBufferReadable {
public FileInputStream(Path f, int fd, int size) throws IOException {
path = f;
this.fd = fd;
buf = ByteBuffer.allocate(size);
buf = BufferPool.getBuffer(size);
buf.limit(0);
position = 0;
}
Expand Down Expand Up @@ -885,6 +886,7 @@ public synchronized void close() throws IOException {
if (buf == null) {
return; // already closed
}
BufferPool.returnBuffer(buf);
buf = null;
int r = lib.jfs_close(Thread.currentThread().getId(), fd);
fd = 0;
Expand Down
48 changes: 48 additions & 0 deletions sdk/java/src/main/java/io/juicefs/utils/BufferPool.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package io.juicefs.utils;

import java.lang.ref.WeakReference;
import java.nio.ByteBuffer;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;

/**
* thread safe
*/
public class BufferPool {

private static final ConcurrentMap<Integer, Queue<WeakReference<ByteBuffer>>> buffersBySize = new ConcurrentHashMap<>();

public static ByteBuffer getBuffer(int size) {
Queue<WeakReference<ByteBuffer>> list = buffersBySize.get(size);
if (list == null) {
return ByteBuffer.allocate(size);
}

WeakReference<ByteBuffer> ref;
while ((ref = list.poll()) != null) {
ByteBuffer b = ref.get();
if (b != null) {
return b;
}
}

return ByteBuffer.allocate(size);
}

public static void returnBuffer(ByteBuffer buf) {
buf.clear();
int size = buf.capacity();
Queue<WeakReference<ByteBuffer>> list = buffersBySize.get(size);
if (list == null) {
list = new ConcurrentLinkedQueue<>();
Queue<WeakReference<ByteBuffer>> prev = buffersBySize.putIfAbsent(size, list);
// someone else put a queue in the map before we did
if (prev != null) {
list = prev;
}
}
list.add(new WeakReference<>(buf));
}
}

0 comments on commit 42e7cee

Please sign in to comment.