Skip to content

Commit

Permalink
remove directbuffer in FileInputStream
Browse files Browse the repository at this point in the history
  • Loading branch information
tangyoupeng committed Aug 23, 2022
1 parent 2a6744e commit 2c52d3c
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 32 deletions.
50 changes: 18 additions & 32 deletions sdk/java/src/main/java/io/juicefs/JuiceFileSystemImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.DataChecksum;
import org.apache.hadoop.util.DirectBufferPool;
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.VersionInfo;
import org.json.JSONObject;
Expand Down Expand Up @@ -90,7 +89,6 @@ public class JuiceFileSystemImpl extends FileSystem {
private ScheduledExecutorService nodesFetcherThread;
private ScheduledExecutorService refreshUidThread;
private Map<String, FileStatus> lastFileStatus = new HashMap<>();
private static final DirectBufferPool bufferPool = new DirectBufferPool();
private boolean metricsEnable = false;

/*
Expand Down Expand Up @@ -734,7 +732,7 @@ class FileInputStream extends FSInputStream implements ByteBufferReadable {
public FileInputStream(Path f, int fd, int size) throws IOException {
path = f;
this.fd = fd;
buf = bufferPool.getBuffer(size);
buf = ByteBuffer.allocate(size);
buf.limit(0);
position = 0;
}
Expand Down Expand Up @@ -818,7 +816,6 @@ private boolean refill() throws IOException {
buf.limit(0);
return false; // EOF
}
statistics.incrementBytesRead(-read);
buf.position(0);
buf.limit(read);
position += read;
Expand All @@ -839,14 +836,7 @@ public synchronized int read(long pos, byte[] b, int off, int len) throws IOExce
if (len > 128 << 20) {
len = 128 << 20;
}
ByteBuffer buf = ByteBuffer.wrap(b, off, len);
int got = lib.jfs_pread(Thread.currentThread().getId(), fd, buf, len, pos);
if (got == 0)
return -1;
if (got == EINVAL)
throw new IOException("stream was closed");
if (got < 0)
throw error(got, path);
int got = read(pos, ByteBuffer.wrap(b, off, len));
statistics.incrementBytesRead(got);
return got;
}
Expand All @@ -857,8 +847,11 @@ public synchronized int read(ByteBuffer b) throws IOException {
return 0;
if (buf == null)
throw new IOException("stream was closed");
if (!buf.hasRemaining() && b.remaining() <= buf.capacity() && !refill()) {
return -1;
// only refill in java side when it is heap bytebuffer
if (b.hasArray()) {
if (!buf.hasRemaining() && b.remaining() <= buf.capacity() && !refill()) {
return -1;
}
}
int got = 0;
while (b.hasRemaining() && buf.hasRemaining()) {
Expand All @@ -872,31 +865,25 @@ public synchronized int read(ByteBuffer b) throws IOException {
if (more <= 0)
return got > 0 ? got : -1;
position += more;
statistics.incrementBytesRead(more);
buf.position(0);
buf.limit(0);
return got + more;
}

public synchronized int read(long pos, ByteBuffer b) throws IOException {
private synchronized int read(long pos, ByteBuffer b) throws IOException {
if (!b.hasRemaining())
return 0;
int got;
if (b.hasArray()) {
got = read(pos, b.array(), b.position(), b.remaining());
if (got <= 0)
return got;
} else {
assert b.isDirect();
got = lib.jfs_pread(Thread.currentThread().getId(), fd, b, b.remaining(), pos);
if (got == EINVAL)
throw new IOException("stream was closed");
if (got < 0)
throw error(got, path);
if (got == 0)
return -1;
statistics.incrementBytesRead(got);
}
b.position(b.position() + got);
int startPos = b.position();
got = lib.jfs_pread(Thread.currentThread().getId(), fd, b, b.remaining(), pos);
if (got == EINVAL)
throw new IOException("stream was closed");
if (got < 0)
throw error(got, path);
if (got == 0)
return -1;
b.position(startPos + got);
return got;
}

Expand Down Expand Up @@ -937,7 +924,6 @@ public synchronized void close() throws IOException {
if (buf == null) {
return; // already closed
}
bufferPool.returnBuffer(buf);
buf = null;
int r = lib.jfs_close(Thread.currentThread().getId(), fd);
fd = 0;
Expand Down
4 changes: 4 additions & 0 deletions sdk/java/src/test/java/io/juicefs/JuiceFileSystemTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -422,6 +422,10 @@ public void testReadStats() throws IOException {
in.read(3000, new byte[6000], 0, 3000);
assertEquals(readSize * 2 + 3000 + 3000, statistics.getBytesRead());

in.read(new byte[3000], 0, 3000);
assertEquals(readSize * 2 + 3000 + 3000 + 3000, statistics.getBytesRead());

in.close();
}

public void testChecksum() throws IOException {
Expand Down

0 comments on commit 2c52d3c

Please sign in to comment.