Skip to content

Commit

Permalink
hadoop: fix write (#2592)
Browse files Browse the repository at this point in the history
  • Loading branch information
tangyoupeng authored Aug 24, 2022
1 parent 883301c commit 433dee9
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 3 deletions.
10 changes: 7 additions & 3 deletions sdk/java/src/main/java/io/juicefs/JuiceFileSystemImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ public static interface Libjfs {

int jfs_pread(long pid, int fd, @Out ByteBuffer b, int len, long offset);

int jfs_write(long pid, int fd, @In byte[] b, int len);
int jfs_write(long pid, int fd, @In ByteBuffer b, int len);

int jfs_flush(long pid, int fd);

Expand Down Expand Up @@ -954,7 +954,7 @@ public void write(byte[] b, int off, int len) throws IOException {
if (b.length - off < len) {
throw new IndexOutOfBoundsException();
}
int done = lib.jfs_write(Thread.currentThread().getId(), fd, b, len);
int done = lib.jfs_write(Thread.currentThread().getId(), fd, ByteBuffer.wrap(b, off, len), len);
if (done == EINVAL)
throw new IOException("stream was closed");
if (done < 0)
Expand All @@ -966,7 +966,7 @@ public void write(byte[] b, int off, int len) throws IOException {

@Override
public void write(int b) throws IOException {
int done = lib.jfs_write(Thread.currentThread().getId(), fd, new byte[]{(byte) b}, 1);
int done = lib.jfs_write(Thread.currentThread().getId(), fd, ByteBuffer.wrap(new byte[]{(byte) b}), 1);
if (done == EINVAL)
throw new IOException("stream was closed");
if (done < 0)
Expand Down Expand Up @@ -1010,6 +1010,10 @@ public void hsync() throws IOException {
flush();
((FSOutputStream) out).fsync();
}

public OutputStream getOutputStream() {
return out;
}
}

static class BufferedFSOutputStreamWithStreamCapabilities extends BufferedFSOutputStream
Expand Down
13 changes: 13 additions & 0 deletions sdk/java/src/test/java/io/juicefs/JuiceFileSystemTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@

import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.OutputStream;
import java.net.URI;
import java.nio.ByteBuffer;
import java.security.PrivilegedExceptionAction;
Expand Down Expand Up @@ -134,6 +135,18 @@ public FileSystem run() throws Exception {
}
}

public void testWrite() throws Exception {
Path f = new Path("/testWriteFile");
FSDataOutputStream fou = fs.create(f);
byte[] b = "hello world".getBytes();
OutputStream ou = ((JuiceFileSystemImpl.BufferedFSOutputStream)fou.getWrappedStream()).getOutputStream();
ou.write(b, 6, 5);
ou.close();
FSDataInputStream in = fs.open(f);
String str = IOUtils.toString(in);
assertEquals("world", str);
}

public void testReadSkip() throws Exception {
Path p = new Path("/test_readskip");
fs.create(p).close();
Expand Down

0 comments on commit 433dee9

Please sign in to comment.