Skip to content
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.util.Options;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.fs.StreamCapabilities;
import org.apache.hadoop.fs.Options.CreateOpts;
import org.apache.hadoop.io.compress.CodecPool;
import org.apache.hadoop.io.compress.CompressionCodec;
Expand Down Expand Up @@ -834,7 +835,8 @@ public String toString() {
}

/** Write key/value pairs to a sequence-format file. */
public static class Writer implements java.io.Closeable, Syncable {
public static class Writer implements java.io.Closeable, Syncable,
Flushable, StreamCapabilities {
private Configuration conf;
FSDataOutputStream out;
boolean ownOutputStream = true;
Expand Down Expand Up @@ -1367,6 +1369,21 @@ public void hflush() throws IOException {
out.hflush();
}
}

@Override
public void flush() throws IOException {
if (out != null) {
out.flush();
}
}

@Override
public boolean hasCapability(String capability) {
if (out !=null && capability != null) {
return out.hasCapability(capability);
}
return false;
}

/** Returns the configuration of this file. */
Configuration getConf() { return conf; }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.conf.*;
import org.assertj.core.api.Assertions;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
Expand Down Expand Up @@ -730,6 +731,31 @@ public void testSerializationAvailability() throws IOException {
}
}

@Test
public void testSequenceFileWriter() throws Exception {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see what you are trying to do here and think it's actually overkill. And as mockito backporting is an utter nightmare (believe me), I don't want mockito code unless need be.

how about just creating a writer on the localfs (no mocking), verify that you can verify that the stream supports "HSYNC"; call hflush and hsync on it.

Now, a full end to end test would be to write data, flush it and verify that the file length is now > 0.

writer.write()
writer.hflush();
writer.hsync();
Assertions.assertThat(fs.getFileStatus(p).getLen()).isGreaterThan(0);

that should be enough. Probe passthrough and the write goes through.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@steveloughran Thank you for the feedback. I have addressed all your concerns. Please take a look.

Configuration conf = new Configuration();
// This test only works with Raw File System and not Local File System
FileSystem fs = FileSystem.getLocal(conf).getRaw();
Path p = new Path(GenericTestUtils
.getTempPath("testSequenceFileWriter.seq"));
try(SequenceFile.Writer writer = SequenceFile.createWriter(
fs, conf, p, LongWritable.class, Text.class)) {
Assertions.assertThat(writer.hasCapability
(StreamCapabilities.HSYNC)).isEqualTo(true);
Assertions.assertThat(writer.hasCapability(
StreamCapabilities.HFLUSH)).isEqualTo(true);
LongWritable key = new LongWritable();
key.set(1);
Text value = new Text();
value.set("value");
writer.append(key, value);
writer.flush();
writer.hflush();
writer.hsync();
Assertions.assertThat(fs.getFileStatus(p).getLen()).isGreaterThan(0);
}
}

/** For debugging and testing. */
public static void main(String[] args) throws Exception {
int count = 1024 * 1024;
Expand Down