diff --git a/lang/java/avro/src/test/java/org/apache/avro/TestDataFile.java b/lang/java/avro/src/test/java/org/apache/avro/TestDataFile.java index ef20201a37b..f72c7d7fdb7 100644 --- a/lang/java/avro/src/test/java/org/apache/avro/TestDataFile.java +++ b/lang/java/avro/src/test/java/org/apache/avro/TestDataFile.java @@ -25,6 +25,7 @@ import java.io.ByteArrayOutputStream; import java.io.File; import java.io.IOException; +import java.io.RandomAccessFile; import java.util.ArrayList; import java.util.List; import java.util.Random; @@ -36,11 +37,15 @@ import org.apache.avro.file.DataFileWriter; import org.apache.avro.file.FileReader; import org.apache.avro.file.SeekableFileInput; +import org.apache.avro.file.SyncableFileOutputStream; import org.apache.avro.file.Syncable; import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericDatumReader; import org.apache.avro.generic.GenericDatumWriter; import org.apache.avro.io.DatumReader; +import org.apache.avro.io.DatumWriter; +import org.apache.avro.specific.SpecificDatumReader; +import org.apache.avro.specific.SpecificDatumWriter; import org.apache.avro.util.RandomData; import org.junit.jupiter.api.Test; @@ -102,6 +107,7 @@ public void runTestsInOrder(CodecFactory codec) throws Exception { testReadWithHeader(codec); testFSync(codec, false); testFSync(codec, true); + testAppendStream(codec); } private void testGenericWrite(CodecFactory codec) throws IOException { @@ -333,6 +339,64 @@ private void testFSync(CodecFactory codec, boolean useFile) throws IOException { } } + private void testAppendStream(CodecFactory codec) throws IOException { + File file = makeFile(codec); + + DatumWriter datumWriter = new SpecificDatumWriter<>(); + + // write COUNT objects to datafile + try (DataFileWriter writer = new DataFileWriter<>(datumWriter)) { + try (RandomAccessFile raf = new RandomAccessFile(file, "rw")) { + + SyncableFileOutputStream fileOutputStream = new SyncableFileOutputStream(raf.getFD()); + + writer.create(SCHEMA, fileOutputStream); + for (Object datum : new RandomData(SCHEMA, COUNT, SEED)) { + writer.append(datum); + writer.flush(); + } + } + } + + // append to existing file + try (DataFileWriter writer = new DataFileWriter<>(datumWriter)) { + try (RandomAccessFile raf = new RandomAccessFile(file, "rw")) { + try (RandomAccessFile rif = new RandomAccessFile(file, "r")) { + + SyncableFileOutputStream fileOutputStream = new SyncableFileOutputStream(raf.getFD()); + + // seek to end + fileOutputStream.getChannel().position(fileOutputStream.getChannel().size()); + + // append to existing fileStream + try (SeekableFileInput in = new SeekableFileInput(rif.getFD())) { + writer.appendTo(in, fileOutputStream); + } + + for (Object datum : new RandomData(SCHEMA, COUNT, SEED + 1)) { + writer.append(datum); + writer.flush(); + } + } + } + } + + // verify objects in file + long recordCounter = 0; + + try (RandomAccessFile raf = new RandomAccessFile(file, "r")) { + SeekableFileInput in = new SeekableFileInput(raf.getFD()); + DatumReader datumReader = new SpecificDatumReader<>(); + try (FileReader reader = DataFileReader.openReader(in, datumReader)) { + while (reader.hasNext()) { + reader.next(); + recordCounter++; + } + } + } + assertEquals(COUNT * 2, recordCounter); + } + static void readFile(File f, DatumReader datumReader) throws IOException { try (FileReader reader = DataFileReader.openReader(f, datumReader)) { for (Object datum : reader) {