Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@
<project.current.year>2021</project.current.year>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<httpclient.version>4.5.13</httpclient.version>
<lz4.version>1.3.0</lz4.version>
<lz4.version>1.7.1</lz4.version>
<jackson-core.version>2.9.10</jackson-core.version>
<jackson-databind.version>2.9.10.8</jackson-databind.version>
<guava.version>29.0-jre</guava.version>
Expand Down Expand Up @@ -98,8 +98,8 @@
<version>${httpclient.version}</version>
</dependency>
<dependency>
<groupId>net.jpountz.lz4</groupId>
<artifactId>lz4</artifactId>
<groupId>org.lz4</groupId>
<artifactId>lz4-java</artifactId>
<version>${lz4.version}</version>
</dependency>
<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

public class ClickHouseLZ4Stream extends InputStream {

private static final LZ4Factory factory = LZ4Factory.safeInstance();
private static final LZ4Factory factory = LZ4Factory.fastestInstance();

public static final int MAGIC = 0x82;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
import java.io.OutputStream;

public class ClickHouseLZ4OutputStream extends OutputStream {
private static final LZ4Factory factory = LZ4Factory.safeInstance();
private static final LZ4Factory factory = LZ4Factory.fastestInstance();
private final LittleEndianDataOutputStream dataWrapper;

private final LZ4Compressor compressor;
Expand All @@ -26,8 +26,8 @@ public ClickHouseLZ4OutputStream(OutputStream stream, int maxCompressBlockSize)
}

/**
* @return Location of pointer in the byte buffer (bytes not yet flushed)
*/
* @return Location of pointer in the byte buffer (bytes not yet flushed)
*/
public int position() {
return pointer;
}
Expand All @@ -42,17 +42,44 @@ public void write(int b) throws IOException {
}
}

@Override
public void write(byte[] b, int off, int len) throws IOException {
if (b == null) {
throw new NullPointerException();
} else if (off < 0 || len < 0 || len > b.length - off) {
throw new IndexOutOfBoundsException();
} else if (len == 0) {
return;
}

int blockSize = currentBlock.length;
int rest = blockSize - pointer;
while (len >= rest) {
System.arraycopy(b, off, currentBlock, pointer, rest);
pointer += rest;
writeBlock();
off += rest;
len -= rest;
rest = blockSize;
}

if (len > 0) {
System.arraycopy(b, off, currentBlock, pointer, len);
pointer += len;
}
}

@Override
public void flush() throws IOException {
if (pointer != 0){
if (pointer != 0) {
writeBlock();
}
dataWrapper.flush();
}

private void writeBlock() throws IOException {
int compressed = compressor.compress(currentBlock, 0, pointer, compressedBlock, 0);
ClickHouseBlockChecksum checksum = ClickHouseBlockChecksum.calculateForBlock((byte)ClickHouseLZ4Stream.MAGIC,
ClickHouseBlockChecksum checksum = ClickHouseBlockChecksum.calculateForBlock((byte) ClickHouseLZ4Stream.MAGIC,
compressed + 9, pointer, compressedBlock, compressed);
dataWrapper.write(checksum.asBytes());
dataWrapper.writeByte(ClickHouseLZ4Stream.MAGIC);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,209 @@
package ru.yandex.clickhouse.util;

import java.io.ByteArrayOutputStream;
import java.io.IOException;

import org.testng.Assert;
import org.testng.annotations.Test;

public class ClickHouseLZ4OutputStreamTest {
private byte[] genCompressedByts(int b, int length, int blockSize) throws IOException {
ByteArrayOutputStream bas = new ByteArrayOutputStream(blockSize * 512);
try (ClickHouseLZ4OutputStream out = new ClickHouseLZ4OutputStream(bas, blockSize)) {
for (int i = 0; i < length; i++) {
out.write(b);
}
out.flush();
}

byte[] bytes = bas.toByteArray();
bas.close();
return bytes;
}

@Test
public void testWrite() throws IOException {
ByteArrayOutputStream bas = new ByteArrayOutputStream(64);

try (ClickHouseLZ4OutputStream out = new ClickHouseLZ4OutputStream(bas, 2)) {
byte[] bytes = new byte[] { (byte) -36, (byte) -86, (byte) 31, (byte) 113, (byte) -106, (byte) 44,
(byte) 99, (byte) 96, (byte) 112, (byte) -7, (byte) 47, (byte) 15, (byte) -63, (byte) 39,
(byte) -73, (byte) -104, (byte) -126, (byte) 12, (byte) 0, (byte) 0, (byte) 0, (byte) 2, (byte) 0,
(byte) 0, (byte) 0, (byte) 32, (byte) 1, (byte) 2 };
out.write(1);
Assert.assertEquals(bas.toByteArray(), new byte[0]);
out.write(2);
Assert.assertEquals(bas.toByteArray(), bytes);
out.write(3);
Assert.assertEquals(bas.toByteArray(), bytes);
out.flush();
Assert.assertEquals(bas.toByteArray(),
new byte[] { (byte) -36, (byte) -86, (byte) 31, (byte) 113, (byte) -106, (byte) 44, (byte) 99,
(byte) 96, (byte) 112, (byte) -7, (byte) 47, (byte) 15, (byte) -63, (byte) 39, (byte) -73,
(byte) -104, (byte) -126, (byte) 12, (byte) 0, (byte) 0, (byte) 0, (byte) 2, (byte) 0,
(byte) 0, (byte) 0, (byte) 32, (byte) 1, (byte) 2, (byte) 64, (byte) -39, (byte) 21,
(byte) 50, (byte) -77, (byte) -124, (byte) 25, (byte) 73, (byte) -59, (byte) 9, (byte) 112,
(byte) -38, (byte) 12, (byte) 99, (byte) 71, (byte) 74, (byte) -126, (byte) 11, (byte) 0,
(byte) 0, (byte) 0, (byte) 1, (byte) 0, (byte) 0, (byte) 0, (byte) 16, (byte) 3 });
bas.close();
}
}

@Test
public void testWriteBytes() throws IOException {
Assert.assertThrows(NullPointerException.class, new Assert.ThrowingRunnable() {
@Override
public void run() throws Throwable {
try (ClickHouseLZ4OutputStream out = new ClickHouseLZ4OutputStream(new ByteArrayOutputStream(), 3)) {
out.write(null);
}
}
});

ByteArrayOutputStream bas = new ByteArrayOutputStream(64);
try (ClickHouseLZ4OutputStream out = new ClickHouseLZ4OutputStream(bas, 3)) {
out.write(new byte[0]);
Assert.assertEquals(bas.toByteArray(), new byte[0]);
out.flush();
Assert.assertEquals(bas.toByteArray(), new byte[0]);

byte[] bytes = new byte[] { (byte) 13, (byte) 13 };
out.write(bytes);
Assert.assertEquals(bas.toByteArray(), new byte[0]);
out.flush();
Assert.assertEquals(bas.toByteArray(), genCompressedByts(13, 2, 3));
bas.close();
}

bas = new ByteArrayOutputStream(64);
try (ClickHouseLZ4OutputStream out = new ClickHouseLZ4OutputStream(bas, 3)) {
byte[] bytes = new byte[] { (byte) 13, (byte) 13, (byte) 13 };
out.write(bytes);
byte[] expected = genCompressedByts(13, 3, 3);
Assert.assertEquals(bas.toByteArray(), expected);
out.flush();
Assert.assertEquals(bas.toByteArray(), expected);
bas.close();
}

bas = new ByteArrayOutputStream(64);
try (ClickHouseLZ4OutputStream out = new ClickHouseLZ4OutputStream(bas, 3)) {
byte[] bytes = new byte[] { (byte) 13, (byte) 13, (byte) 13, (byte) 13 };
out.write(bytes);
Assert.assertEquals(bas.toByteArray(), genCompressedByts(13, 3, 3));
out.flush();
Assert.assertEquals(bas.toByteArray(), genCompressedByts(13, 4, 3));
bas.close();
}

bas = new ByteArrayOutputStream(64);
try (ClickHouseLZ4OutputStream out = new ClickHouseLZ4OutputStream(bas, 3)) {
byte[] bytes = new byte[] { (byte) 13, (byte) 13, (byte) 13, (byte) 13, (byte) 13, (byte) 13 };
out.write(bytes);
byte[] expected = genCompressedByts(13, 6, 3);
Assert.assertEquals(bas.toByteArray(), expected);
out.flush();
Assert.assertEquals(bas.toByteArray(), expected);
bas.close();
}

bas = new ByteArrayOutputStream(64);
try (ClickHouseLZ4OutputStream out = new ClickHouseLZ4OutputStream(bas, 3)) {
byte[] bytes = new byte[] { (byte) 13, (byte) 13, (byte) 13, (byte) 13, (byte) 13, (byte) 13, (byte) 13 };
out.write(bytes);
Assert.assertEquals(bas.toByteArray(), genCompressedByts(13, 6, 3));
out.flush();
Assert.assertEquals(bas.toByteArray(), genCompressedByts(13, 7, 3));
bas.close();
}
}

@Test
public void testWriteBytesWithOffset() throws IOException {
Assert.assertThrows(NullPointerException.class, new Assert.ThrowingRunnable() {
@Override
public void run() throws Throwable {
try (ClickHouseLZ4OutputStream out = new ClickHouseLZ4OutputStream(new ByteArrayOutputStream(), 3)) {
out.write(null, 0, 1);
}
}
});
Assert.assertThrows(IndexOutOfBoundsException.class, new Assert.ThrowingRunnable() {
@Override
public void run() throws Throwable {
try (ClickHouseLZ4OutputStream out = new ClickHouseLZ4OutputStream(new ByteArrayOutputStream(), 3)) {
out.write(new byte[0], 0, 1);
}
}
});
Assert.assertThrows(IndexOutOfBoundsException.class, new Assert.ThrowingRunnable() {
@Override
public void run() throws Throwable {
try (ClickHouseLZ4OutputStream out = new ClickHouseLZ4OutputStream(new ByteArrayOutputStream(), 3)) {
out.write(new byte[0], -1, 0);
}
}
});
Assert.assertThrows(IndexOutOfBoundsException.class, new Assert.ThrowingRunnable() {
@Override
public void run() throws Throwable {
try (ClickHouseLZ4OutputStream out = new ClickHouseLZ4OutputStream(new ByteArrayOutputStream(), 3)) {
out.write(new byte[1], 1, 1);
}
}
});

final byte[] bytes = new byte[] { (byte) 0, (byte) 13, (byte) 13, (byte) 13, (byte) 13, (byte) 13, (byte) 13,
(byte) 13, (byte) 0 };
ByteArrayOutputStream bas = new ByteArrayOutputStream(64);
try (ClickHouseLZ4OutputStream out = new ClickHouseLZ4OutputStream(bas, 3)) {
out.write(bytes, 1, 0);
Assert.assertEquals(bas.toByteArray(), new byte[0]);
out.flush();
Assert.assertEquals(bas.toByteArray(), new byte[0]);
out.write(bytes, 1, 2);
Assert.assertEquals(bas.toByteArray(), new byte[0]);
out.flush();
Assert.assertEquals(bas.toByteArray(), genCompressedByts(13, 2, 3));
bas.close();
}

bas = new ByteArrayOutputStream(64);
try (ClickHouseLZ4OutputStream out = new ClickHouseLZ4OutputStream(bas, 3)) {
out.write(bytes, 1, 3);
byte[] expected = genCompressedByts(13, 3, 3);
Assert.assertEquals(bas.toByteArray(), expected);
out.flush();
Assert.assertEquals(bas.toByteArray(), expected);
bas.close();
}

bas = new ByteArrayOutputStream(64);
try (ClickHouseLZ4OutputStream out = new ClickHouseLZ4OutputStream(bas, 3)) {
out.write(bytes, 1, 4);
Assert.assertEquals(bas.toByteArray(), genCompressedByts(13, 3, 3));
out.flush();
Assert.assertEquals(bas.toByteArray(), genCompressedByts(13, 4, 3));
bas.close();
}

bas = new ByteArrayOutputStream(64);
try (ClickHouseLZ4OutputStream out = new ClickHouseLZ4OutputStream(bas, 3)) {
out.write(bytes, 1, 6);
byte[] expected = genCompressedByts(13, 6, 3);
Assert.assertEquals(bas.toByteArray(), expected);
out.flush();
Assert.assertEquals(bas.toByteArray(), expected);
bas.close();
}

bas = new ByteArrayOutputStream(64);
try (ClickHouseLZ4OutputStream out = new ClickHouseLZ4OutputStream(bas, 3)) {
out.write(bytes, 1, 7);
Assert.assertEquals(bas.toByteArray(), genCompressedByts(13, 6, 3));
out.flush();
Assert.assertEquals(bas.toByteArray(), genCompressedByts(13, 7, 3));
bas.close();
}
}
}