diff --git a/sdk/java/src/main/java/io/juicefs/JuiceFileSystemImpl.java b/sdk/java/src/main/java/io/juicefs/JuiceFileSystemImpl.java index 8e5f2e598da0f..534442f935ce0 100644 --- a/sdk/java/src/main/java/io/juicefs/JuiceFileSystemImpl.java +++ b/sdk/java/src/main/java/io/juicefs/JuiceFileSystemImpl.java @@ -734,7 +734,7 @@ class FileInputStream extends FSInputStream implements ByteBufferReadable { public FileInputStream(Path f, int fd, int size) throws IOException { path = f; this.fd = fd; - buf = bufferPool.getBuffer(size); + buf = ByteBuffer.allocate(size); buf.limit(0); position = 0; } @@ -818,7 +818,6 @@ private boolean refill() throws IOException { buf.limit(0); return false; // EOF } - statistics.incrementBytesRead(-read); buf.position(0); buf.limit(read); position += read; @@ -839,14 +838,7 @@ public synchronized int read(long pos, byte[] b, int off, int len) throws IOExce if (len > 128 << 20) { len = 128 << 20; } - ByteBuffer buf = ByteBuffer.wrap(b, off, len); - int got = lib.jfs_pread(Thread.currentThread().getId(), fd, buf, len, pos); - if (got == 0) - return -1; - if (got == EINVAL) - throw new IOException("stream was closed"); - if (got < 0) - throw error(got, path); + int got = read(pos, ByteBuffer.wrap(b, off, len)); statistics.incrementBytesRead(got); return got; } @@ -857,8 +849,11 @@ public synchronized int read(ByteBuffer b) throws IOException { return 0; if (buf == null) throw new IOException("stream was closed"); - if (!buf.hasRemaining() && b.remaining() <= buf.capacity() && !refill()) { - return -1; + // only refill in java side when it is heap bytebuffer + if (b.hasArray()) { + if (!buf.hasRemaining() && b.remaining() <= buf.capacity() && !refill()) { + return -1; + } } int got = 0; while (b.hasRemaining() && buf.hasRemaining()) { @@ -872,31 +867,25 @@ public synchronized int read(ByteBuffer b) throws IOException { if (more <= 0) return got > 0 ? got : -1; position += more; + statistics.incrementBytesRead(more); buf.position(0); buf.limit(0); return got + more; } - public synchronized int read(long pos, ByteBuffer b) throws IOException { + private synchronized int read(long pos, ByteBuffer b) throws IOException { if (!b.hasRemaining()) return 0; int got; - if (b.hasArray()) { - got = read(pos, b.array(), b.position(), b.remaining()); - if (got <= 0) - return got; - } else { - assert b.isDirect(); - got = lib.jfs_pread(Thread.currentThread().getId(), fd, b, b.remaining(), pos); - if (got == EINVAL) - throw new IOException("stream was closed"); - if (got < 0) - throw error(got, path); - if (got == 0) - return -1; - statistics.incrementBytesRead(got); - } - b.position(b.position() + got); + int startPos = b.position(); + got = lib.jfs_pread(Thread.currentThread().getId(), fd, b, b.remaining(), pos); + if (got == EINVAL) + throw new IOException("stream was closed"); + if (got < 0) + throw error(got, path); + if (got == 0) + return -1; + b.position(startPos + got); return got; } diff --git a/sdk/java/src/test/java/io/juicefs/JuiceFileSystemTest.java b/sdk/java/src/test/java/io/juicefs/JuiceFileSystemTest.java index 02252cf8aa5bf..206cccd9839d0 100644 --- a/sdk/java/src/test/java/io/juicefs/JuiceFileSystemTest.java +++ b/sdk/java/src/test/java/io/juicefs/JuiceFileSystemTest.java @@ -422,6 +422,10 @@ public void testReadStats() throws IOException { in.read(3000, new byte[6000], 0, 3000); assertEquals(readSize * 2 + 3000 + 3000, statistics.getBytesRead()); + in.read(new byte[3000], 0, 3000); + assertEquals(readSize * 2 + 3000 + 3000 + 3000, statistics.getBytesRead()); + + in.close(); } public void testChecksum() throws IOException {