diff --git a/pom.xml b/pom.xml index 91c88ef6e..a90f22521 100644 --- a/pom.xml +++ b/pom.xml @@ -57,7 +57,7 @@ 2021 UTF-8 4.5.13 - 1.3.0 + 1.7.1 2.9.10 2.9.10.8 29.0-jre @@ -98,8 +98,8 @@ ${httpclient.version} - net.jpountz.lz4 - lz4 + org.lz4 + lz4-java ${lz4.version} diff --git a/src/main/java/ru/yandex/clickhouse/response/ClickHouseLZ4Stream.java b/src/main/java/ru/yandex/clickhouse/response/ClickHouseLZ4Stream.java index dda09690d..75139a5e3 100644 --- a/src/main/java/ru/yandex/clickhouse/response/ClickHouseLZ4Stream.java +++ b/src/main/java/ru/yandex/clickhouse/response/ClickHouseLZ4Stream.java @@ -14,7 +14,7 @@ public class ClickHouseLZ4Stream extends InputStream { - private static final LZ4Factory factory = LZ4Factory.safeInstance(); + private static final LZ4Factory factory = LZ4Factory.fastestInstance(); public static final int MAGIC = 0x82; diff --git a/src/main/java/ru/yandex/clickhouse/util/ClickHouseLZ4OutputStream.java b/src/main/java/ru/yandex/clickhouse/util/ClickHouseLZ4OutputStream.java index 5c20ec2c3..07805be88 100644 --- a/src/main/java/ru/yandex/clickhouse/util/ClickHouseLZ4OutputStream.java +++ b/src/main/java/ru/yandex/clickhouse/util/ClickHouseLZ4OutputStream.java @@ -9,7 +9,7 @@ import java.io.OutputStream; public class ClickHouseLZ4OutputStream extends OutputStream { - private static final LZ4Factory factory = LZ4Factory.safeInstance(); + private static final LZ4Factory factory = LZ4Factory.fastestInstance(); private final LittleEndianDataOutputStream dataWrapper; private final LZ4Compressor compressor; @@ -26,8 +26,8 @@ public ClickHouseLZ4OutputStream(OutputStream stream, int maxCompressBlockSize) } /** - * @return Location of pointer in the byte buffer (bytes not yet flushed) - */ + * @return Location of pointer in the byte buffer (bytes not yet flushed) + */ public int position() { return pointer; } @@ -42,9 +42,36 @@ public void write(int b) throws IOException { } } + @Override + public void write(byte[] b, int off, int len) throws IOException { + if (b == null) { + throw new NullPointerException(); + } else if (off < 0 || len < 0 || len > b.length - off) { + throw new IndexOutOfBoundsException(); + } else if (len == 0) { + return; + } + + int blockSize = currentBlock.length; + int rest = blockSize - pointer; + while (len >= rest) { + System.arraycopy(b, off, currentBlock, pointer, rest); + pointer += rest; + writeBlock(); + off += rest; + len -= rest; + rest = blockSize; + } + + if (len > 0) { + System.arraycopy(b, off, currentBlock, pointer, len); + pointer += len; + } + } + @Override public void flush() throws IOException { - if (pointer != 0){ + if (pointer != 0) { writeBlock(); } dataWrapper.flush(); @@ -52,7 +79,7 @@ public void flush() throws IOException { private void writeBlock() throws IOException { int compressed = compressor.compress(currentBlock, 0, pointer, compressedBlock, 0); - ClickHouseBlockChecksum checksum = ClickHouseBlockChecksum.calculateForBlock((byte)ClickHouseLZ4Stream.MAGIC, + ClickHouseBlockChecksum checksum = ClickHouseBlockChecksum.calculateForBlock((byte) ClickHouseLZ4Stream.MAGIC, compressed + 9, pointer, compressedBlock, compressed); dataWrapper.write(checksum.asBytes()); dataWrapper.writeByte(ClickHouseLZ4Stream.MAGIC); diff --git a/src/test/java/ru/yandex/clickhouse/util/ClickHouseLZ4OutputStreamTest.java b/src/test/java/ru/yandex/clickhouse/util/ClickHouseLZ4OutputStreamTest.java new file mode 100644 index 000000000..8746941da --- /dev/null +++ b/src/test/java/ru/yandex/clickhouse/util/ClickHouseLZ4OutputStreamTest.java @@ -0,0 +1,209 @@ +package ru.yandex.clickhouse.util; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; + +import org.testng.Assert; +import org.testng.annotations.Test; + +public class ClickHouseLZ4OutputStreamTest { + private byte[] genCompressedByts(int b, int length, int blockSize) throws IOException { + ByteArrayOutputStream bas = new ByteArrayOutputStream(blockSize * 512); + try (ClickHouseLZ4OutputStream out = new ClickHouseLZ4OutputStream(bas, blockSize)) { + for (int i = 0; i < length; i++) { + out.write(b); + } + out.flush(); + } + + byte[] bytes = bas.toByteArray(); + bas.close(); + return bytes; + } + + @Test + public void testWrite() throws IOException { + ByteArrayOutputStream bas = new ByteArrayOutputStream(64); + + try (ClickHouseLZ4OutputStream out = new ClickHouseLZ4OutputStream(bas, 2)) { + byte[] bytes = new byte[] { (byte) -36, (byte) -86, (byte) 31, (byte) 113, (byte) -106, (byte) 44, + (byte) 99, (byte) 96, (byte) 112, (byte) -7, (byte) 47, (byte) 15, (byte) -63, (byte) 39, + (byte) -73, (byte) -104, (byte) -126, (byte) 12, (byte) 0, (byte) 0, (byte) 0, (byte) 2, (byte) 0, + (byte) 0, (byte) 0, (byte) 32, (byte) 1, (byte) 2 }; + out.write(1); + Assert.assertEquals(bas.toByteArray(), new byte[0]); + out.write(2); + Assert.assertEquals(bas.toByteArray(), bytes); + out.write(3); + Assert.assertEquals(bas.toByteArray(), bytes); + out.flush(); + Assert.assertEquals(bas.toByteArray(), + new byte[] { (byte) -36, (byte) -86, (byte) 31, (byte) 113, (byte) -106, (byte) 44, (byte) 99, + (byte) 96, (byte) 112, (byte) -7, (byte) 47, (byte) 15, (byte) -63, (byte) 39, (byte) -73, + (byte) -104, (byte) -126, (byte) 12, (byte) 0, (byte) 0, (byte) 0, (byte) 2, (byte) 0, + (byte) 0, (byte) 0, (byte) 32, (byte) 1, (byte) 2, (byte) 64, (byte) -39, (byte) 21, + (byte) 50, (byte) -77, (byte) -124, (byte) 25, (byte) 73, (byte) -59, (byte) 9, (byte) 112, + (byte) -38, (byte) 12, (byte) 99, (byte) 71, (byte) 74, (byte) -126, (byte) 11, (byte) 0, + (byte) 0, (byte) 0, (byte) 1, (byte) 0, (byte) 0, (byte) 0, (byte) 16, (byte) 3 }); + bas.close(); + } + } + + @Test + public void testWriteBytes() throws IOException { + Assert.assertThrows(NullPointerException.class, new Assert.ThrowingRunnable() { + @Override + public void run() throws Throwable { + try (ClickHouseLZ4OutputStream out = new ClickHouseLZ4OutputStream(new ByteArrayOutputStream(), 3)) { + out.write(null); + } + } + }); + + ByteArrayOutputStream bas = new ByteArrayOutputStream(64); + try (ClickHouseLZ4OutputStream out = new ClickHouseLZ4OutputStream(bas, 3)) { + out.write(new byte[0]); + Assert.assertEquals(bas.toByteArray(), new byte[0]); + out.flush(); + Assert.assertEquals(bas.toByteArray(), new byte[0]); + + byte[] bytes = new byte[] { (byte) 13, (byte) 13 }; + out.write(bytes); + Assert.assertEquals(bas.toByteArray(), new byte[0]); + out.flush(); + Assert.assertEquals(bas.toByteArray(), genCompressedByts(13, 2, 3)); + bas.close(); + } + + bas = new ByteArrayOutputStream(64); + try (ClickHouseLZ4OutputStream out = new ClickHouseLZ4OutputStream(bas, 3)) { + byte[] bytes = new byte[] { (byte) 13, (byte) 13, (byte) 13 }; + out.write(bytes); + byte[] expected = genCompressedByts(13, 3, 3); + Assert.assertEquals(bas.toByteArray(), expected); + out.flush(); + Assert.assertEquals(bas.toByteArray(), expected); + bas.close(); + } + + bas = new ByteArrayOutputStream(64); + try (ClickHouseLZ4OutputStream out = new ClickHouseLZ4OutputStream(bas, 3)) { + byte[] bytes = new byte[] { (byte) 13, (byte) 13, (byte) 13, (byte) 13 }; + out.write(bytes); + Assert.assertEquals(bas.toByteArray(), genCompressedByts(13, 3, 3)); + out.flush(); + Assert.assertEquals(bas.toByteArray(), genCompressedByts(13, 4, 3)); + bas.close(); + } + + bas = new ByteArrayOutputStream(64); + try (ClickHouseLZ4OutputStream out = new ClickHouseLZ4OutputStream(bas, 3)) { + byte[] bytes = new byte[] { (byte) 13, (byte) 13, (byte) 13, (byte) 13, (byte) 13, (byte) 13 }; + out.write(bytes); + byte[] expected = genCompressedByts(13, 6, 3); + Assert.assertEquals(bas.toByteArray(), expected); + out.flush(); + Assert.assertEquals(bas.toByteArray(), expected); + bas.close(); + } + + bas = new ByteArrayOutputStream(64); + try (ClickHouseLZ4OutputStream out = new ClickHouseLZ4OutputStream(bas, 3)) { + byte[] bytes = new byte[] { (byte) 13, (byte) 13, (byte) 13, (byte) 13, (byte) 13, (byte) 13, (byte) 13 }; + out.write(bytes); + Assert.assertEquals(bas.toByteArray(), genCompressedByts(13, 6, 3)); + out.flush(); + Assert.assertEquals(bas.toByteArray(), genCompressedByts(13, 7, 3)); + bas.close(); + } + } + + @Test + public void testWriteBytesWithOffset() throws IOException { + Assert.assertThrows(NullPointerException.class, new Assert.ThrowingRunnable() { + @Override + public void run() throws Throwable { + try (ClickHouseLZ4OutputStream out = new ClickHouseLZ4OutputStream(new ByteArrayOutputStream(), 3)) { + out.write(null, 0, 1); + } + } + }); + Assert.assertThrows(IndexOutOfBoundsException.class, new Assert.ThrowingRunnable() { + @Override + public void run() throws Throwable { + try (ClickHouseLZ4OutputStream out = new ClickHouseLZ4OutputStream(new ByteArrayOutputStream(), 3)) { + out.write(new byte[0], 0, 1); + } + } + }); + Assert.assertThrows(IndexOutOfBoundsException.class, new Assert.ThrowingRunnable() { + @Override + public void run() throws Throwable { + try (ClickHouseLZ4OutputStream out = new ClickHouseLZ4OutputStream(new ByteArrayOutputStream(), 3)) { + out.write(new byte[0], -1, 0); + } + } + }); + Assert.assertThrows(IndexOutOfBoundsException.class, new Assert.ThrowingRunnable() { + @Override + public void run() throws Throwable { + try (ClickHouseLZ4OutputStream out = new ClickHouseLZ4OutputStream(new ByteArrayOutputStream(), 3)) { + out.write(new byte[1], 1, 1); + } + } + }); + + final byte[] bytes = new byte[] { (byte) 0, (byte) 13, (byte) 13, (byte) 13, (byte) 13, (byte) 13, (byte) 13, + (byte) 13, (byte) 0 }; + ByteArrayOutputStream bas = new ByteArrayOutputStream(64); + try (ClickHouseLZ4OutputStream out = new ClickHouseLZ4OutputStream(bas, 3)) { + out.write(bytes, 1, 0); + Assert.assertEquals(bas.toByteArray(), new byte[0]); + out.flush(); + Assert.assertEquals(bas.toByteArray(), new byte[0]); + out.write(bytes, 1, 2); + Assert.assertEquals(bas.toByteArray(), new byte[0]); + out.flush(); + Assert.assertEquals(bas.toByteArray(), genCompressedByts(13, 2, 3)); + bas.close(); + } + + bas = new ByteArrayOutputStream(64); + try (ClickHouseLZ4OutputStream out = new ClickHouseLZ4OutputStream(bas, 3)) { + out.write(bytes, 1, 3); + byte[] expected = genCompressedByts(13, 3, 3); + Assert.assertEquals(bas.toByteArray(), expected); + out.flush(); + Assert.assertEquals(bas.toByteArray(), expected); + bas.close(); + } + + bas = new ByteArrayOutputStream(64); + try (ClickHouseLZ4OutputStream out = new ClickHouseLZ4OutputStream(bas, 3)) { + out.write(bytes, 1, 4); + Assert.assertEquals(bas.toByteArray(), genCompressedByts(13, 3, 3)); + out.flush(); + Assert.assertEquals(bas.toByteArray(), genCompressedByts(13, 4, 3)); + bas.close(); + } + + bas = new ByteArrayOutputStream(64); + try (ClickHouseLZ4OutputStream out = new ClickHouseLZ4OutputStream(bas, 3)) { + out.write(bytes, 1, 6); + byte[] expected = genCompressedByts(13, 6, 3); + Assert.assertEquals(bas.toByteArray(), expected); + out.flush(); + Assert.assertEquals(bas.toByteArray(), expected); + bas.close(); + } + + bas = new ByteArrayOutputStream(64); + try (ClickHouseLZ4OutputStream out = new ClickHouseLZ4OutputStream(bas, 3)) { + out.write(bytes, 1, 7); + Assert.assertEquals(bas.toByteArray(), genCompressedByts(13, 6, 3)); + out.flush(); + Assert.assertEquals(bas.toByteArray(), genCompressedByts(13, 7, 3)); + bas.close(); + } + } +}