From 9fc44a8ca0fee6090df5ae16673549f8daa88e84 Mon Sep 17 00:00:00 2001 From: Takeshi Yamamuro Date: Sat, 24 Jun 2017 11:24:50 +0900 Subject: [PATCH 1/4] Concat byte stream --- .../net/jpountz/lz4/LZ4BlockInputStream.java | 12 ++++- .../jpountz/lz4/LZ4BlockStreamingTest.java | 45 +++++++++++++++++++ 2 files changed, 55 insertions(+), 2 deletions(-) diff --git a/src/java/net/jpountz/lz4/LZ4BlockInputStream.java b/src/java/net/jpountz/lz4/LZ4BlockInputStream.java index 21ff671c..9bf6fcee 100644 --- a/src/java/net/jpountz/lz4/LZ4BlockInputStream.java +++ b/src/java/net/jpountz/lz4/LZ4BlockInputStream.java @@ -147,7 +147,15 @@ public long skip(long n) throws IOException { } private void refill() throws IOException { - readFully(compressedBuffer, HEADER_LENGTH); + if (finished || o < originalLen) { + return; + } + try { + readFully(compressedBuffer, HEADER_LENGTH); + } catch (EOFException e) { + finished = true; + return; + } for (int i = 0; i < MAGIC_LENGTH; ++i) { if (compressedBuffer[i] != MAGIC[i]) { throw new IOException("Stream is corrupted"); @@ -175,7 +183,7 @@ private void refill() throws IOException { if (check != 0) { throw new IOException("Stream is corrupted"); } - finished = true; + refill(); return; } if (buffer.length < originalLen) { diff --git a/src/test/net/jpountz/lz4/LZ4BlockStreamingTest.java b/src/test/net/jpountz/lz4/LZ4BlockStreamingTest.java index 3bb491da..6127b065 100644 --- a/src/test/net/jpountz/lz4/LZ4BlockStreamingTest.java +++ b/src/test/net/jpountz/lz4/LZ4BlockStreamingTest.java @@ -293,4 +293,49 @@ public void testDoubleClose() throws IOException { in.close(); in.close(); } + + private static int readFully(InputStream in, byte[] b) throws IOException { + int total; + int result; + for (total = 0; total < b.length; total += result) { + result = in.read(b, total, b.length - total); + if(result == -1) { + break; + } + } + return total; + } + + @Test + public void testConcatenationOfSerializedStreams() throws IOException { + final byte[] testBytes1 = randomArray(64, 256); + final byte[] testBytes2 = randomArray(64, 256); + byte[] expected = new byte[128]; + System.arraycopy(testBytes1, 0, expected, 0, 64); + System.arraycopy(testBytes2, 0, expected, 64, 64); + + ByteArrayOutputStream bytes1os = new ByteArrayOutputStream(); + LZ4BlockOutputStream out1 = new LZ4BlockOutputStream(bytes1os); + out1.write(testBytes1); + out1.close(); + + ByteArrayOutputStream bytes2os = new ByteArrayOutputStream(); + LZ4BlockOutputStream out2 = new LZ4BlockOutputStream(bytes2os); + out2.write(testBytes2); + out2.close(); + + byte[] bytes1 = bytes1os.toByteArray(); + byte[] bytes2 = bytes2os.toByteArray(); + byte[] concatenatedBytes = new byte[bytes1.length + bytes2.length]; + System.arraycopy(bytes1, 0, concatenatedBytes, 0, bytes1.length); + System.arraycopy(bytes2, 0, concatenatedBytes, bytes1.length, bytes2.length); + + LZ4BlockInputStream in = new LZ4BlockInputStream(new ByteArrayInputStream(concatenatedBytes)); + byte[] actual = new byte[128]; + assertEquals(128, readFully(in, actual)); + assertEquals(-1, in.read()); + in.close(); + + assertArrayEquals(expected, actual); + } } From 22e173320cb45e079cf0757acb0617caba659d64 Mon Sep 17 00:00:00 2001 From: Takeshi Yamamuro Date: Wed, 28 Jun 2017 10:48:14 +0900 Subject: [PATCH 2/4] Update code to keep the default behaviour --- .../net/jpountz/lz4/LZ4BlockInputStream.java | 21 ++++++++++--- .../jpountz/lz4/LZ4BlockStreamingTest.java | 31 ++++++++++++++----- 2 files changed, 41 insertions(+), 11 deletions(-) diff --git a/src/java/net/jpountz/lz4/LZ4BlockInputStream.java b/src/java/net/jpountz/lz4/LZ4BlockInputStream.java index 9bf6fcee..8bf18f55 100644 --- a/src/java/net/jpountz/lz4/LZ4BlockInputStream.java +++ b/src/java/net/jpountz/lz4/LZ4BlockInputStream.java @@ -29,7 +29,6 @@ import java.util.zip.Checksum; import net.jpountz.util.SafeUtils; -import net.jpountz.util.Utils; import net.jpountz.xxhash.StreamingXXHash32; import net.jpountz.xxhash.XXHash32; import net.jpountz.xxhash.XXHashFactory; @@ -40,7 +39,7 @@ * support {@link #mark(int)}/{@link #reset()}. * @see LZ4BlockOutputStream */ -public final class LZ4BlockInputStream extends FilterInputStream { +public class LZ4BlockInputStream extends FilterInputStream { private final LZ4FastDecompressor decompressor; private final Checksum checksum; @@ -49,6 +48,7 @@ public final class LZ4BlockInputStream extends FilterInputStream { private int originalLen; private int o; private boolean finished; + private boolean stopOnEmptyBlock; /** * Create a new {@link InputStream}. @@ -68,6 +68,11 @@ public LZ4BlockInputStream(InputStream in, LZ4FastDecompressor decompressor, Che this.compressedBuffer = new byte[HEADER_LENGTH]; o = originalLen = 0; finished = false; + stopOnEmptyBlock = true; + } + + protected void setStopOnEmptyBlock(boolean stopOnEmptyBlock) { + this.stopOnEmptyBlock = stopOnEmptyBlock; } /** @@ -153,7 +158,11 @@ private void refill() throws IOException { try { readFully(compressedBuffer, HEADER_LENGTH); } catch (EOFException e) { - finished = true; + if (!stopOnEmptyBlock) { + finished = true; + } else { + throw e; + } return; } for (int i = 0; i < MAGIC_LENGTH; ++i) { @@ -183,7 +192,11 @@ private void refill() throws IOException { if (check != 0) { throw new IOException("Stream is corrupted"); } - refill(); + if (!stopOnEmptyBlock) { + refill(); + } else { + finished = true; + } return; } if (buffer.length < originalLen) { diff --git a/src/test/net/jpountz/lz4/LZ4BlockStreamingTest.java b/src/test/net/jpountz/lz4/LZ4BlockStreamingTest.java index 6127b065..9e592ae9 100644 --- a/src/test/net/jpountz/lz4/LZ4BlockStreamingTest.java +++ b/src/test/net/jpountz/lz4/LZ4BlockStreamingTest.java @@ -306,6 +306,14 @@ private static int readFully(InputStream in, byte[] b) throws IOException { return total; } + static class LZ4BlockInputStreamSupportStreamConcatenation extends LZ4BlockInputStream { + + public LZ4BlockInputStreamSupportStreamConcatenation(InputStream in) { + super(in); + setStopOnEmptyBlock(false); + } + } + @Test public void testConcatenationOfSerializedStreams() throws IOException { final byte[] testBytes1 = randomArray(64, 256); @@ -330,12 +338,21 @@ public void testConcatenationOfSerializedStreams() throws IOException { System.arraycopy(bytes1, 0, concatenatedBytes, 0, bytes1.length); System.arraycopy(bytes2, 0, concatenatedBytes, bytes1.length, bytes2.length); - LZ4BlockInputStream in = new LZ4BlockInputStream(new ByteArrayInputStream(concatenatedBytes)); - byte[] actual = new byte[128]; - assertEquals(128, readFully(in, actual)); - assertEquals(-1, in.read()); - in.close(); - - assertArrayEquals(expected, actual); + // In a default behaviour, we can read the first block of the concatenated bytes only + LZ4BlockInputStream in1 = new LZ4BlockInputStream(new ByteArrayInputStream(concatenatedBytes)); + byte[] actual1 = new byte[128]; + assertEquals(64, readFully(in1, actual1)); + assertEquals(-1, in1.read()); + in1.close(); + + // Check if we can read concatenated byte stream + LZ4BlockInputStream in2 = new LZ4BlockInputStreamSupportStreamConcatenation( + new ByteArrayInputStream(concatenatedBytes)); + byte[] actual2 = new byte[128]; + assertEquals(128, readFully(in2, actual2)); + assertEquals(-1, in2.read()); + in2.close(); + + assertArrayEquals(expected, actual2); } } From c3ee31fc461159b2d62e1b218811526e09dc52a7 Mon Sep 17 00:00:00 2001 From: Takeshi Yamamuro Date: Thu, 29 Jun 2017 10:00:28 +0900 Subject: [PATCH 3/4] Add a new constructor for stopOnEmptyBlock --- .../net/jpountz/lz4/LZ4BlockInputStream.java | 46 ++++++++++++------- .../jpountz/lz4/LZ4BlockStreamingTest.java | 11 +---- 2 files changed, 30 insertions(+), 27 deletions(-) diff --git a/src/java/net/jpountz/lz4/LZ4BlockInputStream.java b/src/java/net/jpountz/lz4/LZ4BlockInputStream.java index 8bf18f55..e2a08363 100644 --- a/src/java/net/jpountz/lz4/LZ4BlockInputStream.java +++ b/src/java/net/jpountz/lz4/LZ4BlockInputStream.java @@ -39,49 +39,64 @@ * support {@link #mark(int)}/{@link #reset()}. * @see LZ4BlockOutputStream */ -public class LZ4BlockInputStream extends FilterInputStream { +public final class LZ4BlockInputStream extends FilterInputStream { private final LZ4FastDecompressor decompressor; private final Checksum checksum; + private final boolean stopOnEmptyBlock; private byte[] buffer; private byte[] compressedBuffer; private int originalLen; private int o; private boolean finished; - private boolean stopOnEmptyBlock; /** * Create a new {@link InputStream}. * - * @param in the {@link InputStream} to poll - * @param decompressor the {@link LZ4FastDecompressor decompressor} instance to - * use - * @param checksum the {@link Checksum} instance to use, must be - * equivalent to the instance which has been used to - * write the stream + * @param in the {@link InputStream} to poll + * @param decompressor the {@link LZ4FastDecompressor decompressor} instance to + * use + * @param checksum the {@link Checksum} instance to use, must be + * equivalent to the instance which has been used to + * write the stream + * @param stopOnEmptyBlock whether read is stopped on an empty block */ - public LZ4BlockInputStream(InputStream in, LZ4FastDecompressor decompressor, Checksum checksum) { + public LZ4BlockInputStream(InputStream in, LZ4FastDecompressor decompressor, Checksum checksum, Boolean stopOnEmptyBlock) { super(in); this.decompressor = decompressor; this.checksum = checksum; + this.stopOnEmptyBlock = stopOnEmptyBlock; this.buffer = new byte[0]; this.compressedBuffer = new byte[HEADER_LENGTH]; o = originalLen = 0; finished = false; - stopOnEmptyBlock = true; } - protected void setStopOnEmptyBlock(boolean stopOnEmptyBlock) { - this.stopOnEmptyBlock = stopOnEmptyBlock; + /** + * Create a new instance using {@link XXHash32} for checksuming. + * @see #LZ4BlockInputStream(InputStream, LZ4FastDecompressor, Checksum) + * @see StreamingXXHash32#asChecksum() + */ + public LZ4BlockInputStream(InputStream in, LZ4FastDecompressor decompressor, Checksum checksum) { + this(in, decompressor, checksum, true); } /** * Create a new instance using {@link XXHash32} for checksuming. - * @see #LZ4BlockInputStream(InputStream, LZ4FastDecompressor, Checksum) + * @see #LZ4BlockInputStream(InputStream, LZ4FastDecompressor, Checksum, Boolean) * @see StreamingXXHash32#asChecksum() */ public LZ4BlockInputStream(InputStream in, LZ4FastDecompressor decompressor) { - this(in, decompressor, XXHashFactory.fastestInstance().newStreamingHash32(DEFAULT_SEED).asChecksum()); + this(in, decompressor, XXHashFactory.fastestInstance().newStreamingHash32(DEFAULT_SEED).asChecksum(), true); + } + + /** + * Create a new instance using {@link XXHash32} for checksuming. + * @see #LZ4BlockInputStream(InputStream, LZ4FastDecompressor, Checksum, Boolean) + * @see StreamingXXHash32#asChecksum() + */ + public LZ4BlockInputStream(InputStream in, Boolean stopOnEmptyBlock) { + this(in, LZ4Factory.fastestInstance().fastDecompressor(), XXHashFactory.fastestInstance().newStreamingHash32(DEFAULT_SEED).asChecksum(), stopOnEmptyBlock); } /** @@ -152,9 +167,6 @@ public long skip(long n) throws IOException { } private void refill() throws IOException { - if (finished || o < originalLen) { - return; - } try { readFully(compressedBuffer, HEADER_LENGTH); } catch (EOFException e) { diff --git a/src/test/net/jpountz/lz4/LZ4BlockStreamingTest.java b/src/test/net/jpountz/lz4/LZ4BlockStreamingTest.java index 9e592ae9..903fceb9 100644 --- a/src/test/net/jpountz/lz4/LZ4BlockStreamingTest.java +++ b/src/test/net/jpountz/lz4/LZ4BlockStreamingTest.java @@ -306,14 +306,6 @@ private static int readFully(InputStream in, byte[] b) throws IOException { return total; } - static class LZ4BlockInputStreamSupportStreamConcatenation extends LZ4BlockInputStream { - - public LZ4BlockInputStreamSupportStreamConcatenation(InputStream in) { - super(in); - setStopOnEmptyBlock(false); - } - } - @Test public void testConcatenationOfSerializedStreams() throws IOException { final byte[] testBytes1 = randomArray(64, 256); @@ -346,8 +338,7 @@ public void testConcatenationOfSerializedStreams() throws IOException { in1.close(); // Check if we can read concatenated byte stream - LZ4BlockInputStream in2 = new LZ4BlockInputStreamSupportStreamConcatenation( - new ByteArrayInputStream(concatenatedBytes)); + LZ4BlockInputStream in2 = new LZ4BlockInputStream(new ByteArrayInputStream(concatenatedBytes), false); byte[] actual2 = new byte[128]; assertEquals(128, readFully(in2, actual2)); assertEquals(-1, in2.read()); From 247155cf4553da1f610b2d481087d8a385bb159f Mon Sep 17 00:00:00 2001 From: Takeshi Yamamuro Date: Fri, 30 Jun 2017 09:11:23 +0900 Subject: [PATCH 4/4] Change Boolean to boolean --- src/java/net/jpountz/lz4/LZ4BlockInputStream.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/java/net/jpountz/lz4/LZ4BlockInputStream.java b/src/java/net/jpountz/lz4/LZ4BlockInputStream.java index e2a08363..ddf425fd 100644 --- a/src/java/net/jpountz/lz4/LZ4BlockInputStream.java +++ b/src/java/net/jpountz/lz4/LZ4BlockInputStream.java @@ -61,7 +61,7 @@ public final class LZ4BlockInputStream extends FilterInputStream { * write the stream * @param stopOnEmptyBlock whether read is stopped on an empty block */ - public LZ4BlockInputStream(InputStream in, LZ4FastDecompressor decompressor, Checksum checksum, Boolean stopOnEmptyBlock) { + public LZ4BlockInputStream(InputStream in, LZ4FastDecompressor decompressor, Checksum checksum, boolean stopOnEmptyBlock) { super(in); this.decompressor = decompressor; this.checksum = checksum; @@ -83,7 +83,7 @@ public LZ4BlockInputStream(InputStream in, LZ4FastDecompressor decompressor, Che /** * Create a new instance using {@link XXHash32} for checksuming. - * @see #LZ4BlockInputStream(InputStream, LZ4FastDecompressor, Checksum, Boolean) + * @see #LZ4BlockInputStream(InputStream, LZ4FastDecompressor, Checksum, boolean) * @see StreamingXXHash32#asChecksum() */ public LZ4BlockInputStream(InputStream in, LZ4FastDecompressor decompressor) { @@ -92,10 +92,10 @@ public LZ4BlockInputStream(InputStream in, LZ4FastDecompressor decompressor) { /** * Create a new instance using {@link XXHash32} for checksuming. - * @see #LZ4BlockInputStream(InputStream, LZ4FastDecompressor, Checksum, Boolean) + * @see #LZ4BlockInputStream(InputStream, LZ4FastDecompressor, Checksum, boolean) * @see StreamingXXHash32#asChecksum() */ - public LZ4BlockInputStream(InputStream in, Boolean stopOnEmptyBlock) { + public LZ4BlockInputStream(InputStream in, boolean stopOnEmptyBlock) { this(in, LZ4Factory.fastestInstance().fastDecompressor(), XXHashFactory.fastestInstance().newStreamingHash32(DEFAULT_SEED).asChecksum(), stopOnEmptyBlock); }