Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions CHANGELOG
Original file line number Diff line number Diff line change
@@ -1,3 +1,16 @@
0.3.0
* BREAKING CHANGE - dropped JDK 7 support
* BREAKING CHANGE - removed Guava dependency(and so is UnsignedLong)
* JDBC 4.2 support
* add connection setting client_name for load-balancing and troubleshooting
* add writeBytes & writeUUIDArray and remove UnsignedLong related methods in ClickHouseRowBinaryStream
* support more data types: IPv4, IPv6, Int128, UInt128, Int256, UInt256, Decimal256, DateTime*, and Map
* support ORC/Parquet streaming
* support read/write Bitmap from/into AggregateFunction(groupBitmap, UInt[8-64]) column
* throw SQLException instead of RuntimeException when instantiating ClickHouseConnectionImpl
* fix error when using ClickHouseCompression.none against 19.16
* fix NegativeArraySizeException when dealing with large array
* fix datetime/date display issue caused by timezone differences(between client and column/server)
0.2.6
* add new feature for sending compressed files/streams
* introduce an experimental SQL parser to fix parsing related issues - set connection setting use_new_parser to false to disable
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,11 @@ public enum ClickHouseFormat {
Vertical,
JSON,
JSONCompact,
JSONCompactString,
JSONEachRow,
JSONStringEachRow,
JSONCompactEachRow,
JSONCompactStringEachRow,
TSKV,
TSV,
Pretty,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package ru.yandex.clickhouse.util;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutput;
Expand All @@ -9,13 +10,11 @@
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.Objects;

import org.roaringbitmap.RoaringBitmap;
import org.roaringbitmap.buffer.ImmutableRoaringBitmap;
import org.roaringbitmap.buffer.MutableRoaringBitmap;
import org.roaringbitmap.longlong.Roaring64Bitmap;
import org.roaringbitmap.longlong.Roaring64NavigableMap;

import ru.yandex.clickhouse.domain.ClickHouseDataType;

public abstract class ClickHouseBitmap {
Expand Down Expand Up @@ -131,16 +130,24 @@ public long getLongCardinality() {
@Override
public void serialize(ByteBuffer buffer) {
int size = serializedSizeInBytes();
// TODO use custom data output so that we can handle large byte array
try (ByteArrayOutputStream bas = new ByteArrayOutputStream(size)) {
DataOutput out = new DataOutputStream(bas);
try {
// https://github.com/RoaringBitmap/RoaringBitmap/blob/0.9.9/RoaringBitmap/src/main/java/org/roaringbitmap/longlong/Roaring64NavigableMap.java#L1105
rb.serialize(out);
} catch (IOException e) {
throw new IllegalArgumentException("Failed to serialize given bitmap", e);
}
buffer.put(bas.toByteArray(), 5, size - 5);

byte[] bytes = bas.toByteArray();
for (int i = 4; i > 0; i--) {
buffer.put(bytes[i]);
}
buffer.putInt(0);
buffer.put(bytes, 5, size - 5);
} catch (IOException e) {
throw new IllegalArgumentException("Failed to serialize given bitmap", e);
throw new IllegalStateException("Failed to serialize given bitmap", e);
}
}

Expand Down Expand Up @@ -253,6 +260,8 @@ public static ClickHouseBitmap wrap(Object bitmap, ClickHouseDataType innerType)
}

public static ClickHouseBitmap deserialize(DataInputStream in, ClickHouseDataType innerType) throws IOException {
final ClickHouseBitmap rb;

int byteLen = byteLength(innerType);
int flag = in.readUnsignedByte();
if (flag == 0) {
Expand All @@ -262,20 +271,36 @@ public static ClickHouseBitmap deserialize(DataInputStream in, ClickHouseDataTyp
bytes[1] = cardinality;
in.read(bytes, 2, bytes.length - 2);

return ClickHouseBitmap.deserialize(bytes, innerType);
} else if (byteLen <= 4) {
rb = ClickHouseBitmap.deserialize(bytes, innerType);
} else {
int len = Utils.readVarInt(in);
byte[] bytes = new byte[len];
Utils.readFully(in, bytes);
RoaringBitmap b = new RoaringBitmap();
b.deserialize(flip(newBuffer(len).put(bytes)));
return ClickHouseBitmap.wrap(b, innerType);
} else {
// why? when serializing Roaring64NavigableMap, the initial 5 bytes were removed
// with 8 unknown bytes appended
throw new UnsupportedOperationException(
"Deserializing Roaring64NavigableMap with cardinality larger than 32 is currently not supported.");

if (byteLen <= 4) {
Utils.readFully(in, bytes);
RoaringBitmap b = new RoaringBitmap();
b.deserialize(flip(newBuffer(len).put(bytes)));
rb = ClickHouseBitmap.wrap(b, innerType);
} else {
// TODO implement a wrapper of DataInput to get rid of byte array here
bytes[0] = (byte) 0; // always unsigned
// read map size in big-endian byte order
for (int i = 4; i > 0; i--) {
bytes[i] = in.readByte();
}
if (in.readByte() != 0 || in.readByte() != 0 || in.readByte() != 0 || in.readByte() != 0) {
throw new IllegalStateException(
"Not able to deserialize ClickHouseBitmap for too many bitmaps(>" + 0xFFFFFFFFL + ")!");
}
// read the rest
Utils.readFully(in, bytes, 5, len - 5);
Roaring64NavigableMap b = new Roaring64NavigableMap();
b.deserialize(new DataInputStream(new ByteArrayInputStream(bytes)));
rb = ClickHouseBitmap.wrap(b, innerType);
}
}

return rb;
}

public static ClickHouseBitmap deserialize(byte[] bytes, ClickHouseDataType innerType) throws IOException {
Expand All @@ -287,10 +312,7 @@ public static ClickHouseBitmap deserialize(byte[] bytes, ClickHouseDataType inne
}

int byteLen = byteLength(innerType);
ByteBuffer buffer = ByteBuffer.allocate(bytes.length);
if (buffer.order() != ByteOrder.LITTLE_ENDIAN) {
buffer = buffer.slice().order(ByteOrder.LITTLE_ENDIAN);
}
ByteBuffer buffer = newBuffer(bytes.length);
buffer = (ByteBuffer) ((Buffer) buffer.put(bytes)).flip();

if (buffer.get() == (byte) 0) { // small set
Expand Down Expand Up @@ -331,10 +353,29 @@ public static ClickHouseBitmap deserialize(byte[] bytes, ClickHouseDataType inne
b.deserialize(buffer);
rb = ClickHouseBitmap.wrap(b, innerType);
} else {
// why? when serializing Roaring64NavigableMap, the initial 5 bytes were removed
// with 8 unknown bytes appended
throw new UnsupportedOperationException(
"Deserializing Roaring64NavigableMap with cardinality larger than 32 is currently not supported.");
// consume map size(long in little-endian byte order)
byte[] bitmaps = new byte[4];
buffer.get(bitmaps);
if (buffer.get() != 0 || buffer.get() != 0 || buffer.get() != 0 || buffer.get() != 0) {
throw new IllegalStateException(
"Not able to deserialize ClickHouseBitmap for too many bitmaps(>" + 0xFFFFFFFFL + ")!");
}
// replace the last 5 bytes to flag(boolean for signed/unsigned) and map
// size(integer)
buffer.position(buffer.position() - 5);
// always unsigned due to limit of CRoaring
buffer.put((byte) 0);
// big-endian -> little-endian
for (int i = 3; i >= 0; i--) {
buffer.put(bitmaps[i]);
}

buffer.position(buffer.position() - 5);
bitmaps = new byte[buffer.remaining()];
buffer.get(bitmaps);
Roaring64NavigableMap b = new Roaring64NavigableMap();
b.deserialize(new DataInputStream(new ByteArrayInputStream(bitmaps)));
rb = ClickHouseBitmap.wrap(b, innerType);
}
}

Expand Down Expand Up @@ -436,15 +477,17 @@ public long[] toLongArray() {
return longs;
}

/**
* Serialize the bitmap into a flipped ByteBuffer.
*
* @return flipped byte buffer
*/
public ByteBuffer toByteBuffer() {
ByteBuffer buf;

int cardinality = getCardinality();
if (cardinality <= 32) {
buf = ByteBuffer.allocate(2 + byteLen * cardinality);
if (buf.order() != ByteOrder.LITTLE_ENDIAN) {
buf = buf.slice().order(ByteOrder.LITTLE_ENDIAN);
}
buf = newBuffer(2 + byteLen * cardinality);
buf.put((byte) 0);
buf.put((byte) cardinality);
if (byteLen == 1) {
Expand All @@ -468,28 +511,23 @@ public ByteBuffer toByteBuffer() {
int size = serializedSizeInBytes();
int varIntSize = Utils.getVarIntSize(size);

buf = ByteBuffer.allocate(1 + varIntSize + size);
if (buf.order() != ByteOrder.LITTLE_ENDIAN) {
buf = buf.slice().order(ByteOrder.LITTLE_ENDIAN);
}
buf = newBuffer(1 + varIntSize + size);
buf.put((byte) 1);
Utils.writeVarInt(size, buf);
serialize(buf);
} else { // 64
// 1) exclude the leading 5 bytes - boolean flag + map size, see below:
// 1) deduct one to exclude the leading byte - boolean flag, see below:
// https://github.com/RoaringBitmap/RoaringBitmap/blob/0.9.9/RoaringBitmap/src/main/java/org/roaringbitmap/longlong/Roaring64NavigableMap.java#L1107
// 2) not sure what's the extra 8 bytes?
long size = serializedSizeInBytesAsLong() - 5 + 8;
// 2) add 4 bytes because CRoaring uses long to store count of 32-bit bitmaps,
// while Java uses int - see
// https://github.com/RoaringBitmap/CRoaring/blob/v0.2.66/cpp/roaring64map.hh#L597
long size = serializedSizeInBytesAsLong() - 1 + 4;
int varIntSize = Utils.getVarLongSize(size);
// TODO add serialize(DataOutput) to handle more
int intSize = (int) size;
buf = ByteBuffer.allocate(1 + varIntSize + intSize);
if (buf.order() != ByteOrder.LITTLE_ENDIAN) {
buf = buf.slice().order(ByteOrder.LITTLE_ENDIAN);
}
buf = newBuffer(1 + varIntSize + intSize);
buf.put((byte) 1);
Utils.writeVarInt(intSize, buf);
buf.putLong(1L); // what's this?
serialize(buf);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,11 +152,11 @@ private int[] genRoaringBitmapValues(int length, ClickHouseDataType innerType) {
return values;
}

private long[] genRoaring64BitmapValues(int length) {
private long[] gen64BitmapValues(int length, long base, long step) {
long[] values = new long[length];

for (int i = 0; i < length; i++) {
values[i] = 100000L + i;
values[i] = base + i * step;
}

return values;
Expand Down Expand Up @@ -219,11 +219,11 @@ public void writeTo(ClickHouseRowBinaryStream stream) throws IOException {
}
}

private void testBitmap64(int valueLength) throws Exception {
private void testBitmap64(int valueLength, long base, long step) throws Exception {
ClickHouseDataType innerType = ClickHouseDataType.UInt64;
try (ClickHouseStatement statement = connection.createStatement()) {
String tableName = createtestBitmapTable(innerType);
long[] values = genRoaring64BitmapValues(valueLength);
long[] values = gen64BitmapValues(valueLength, base, step);
statement.sendRowBinaryStream("insert into table " + tableName, new ClickHouseStreamCallback() {
@Override
public void writeTo(ClickHouseRowBinaryStream stream) throws IOException {
Expand All @@ -247,11 +247,7 @@ public void writeTo(ClickHouseRowBinaryStream stream) throws IOException {

sql = "select b from " + tableName + " order by i";
try (ClickHouseRowBinaryInputStream in = statement.executeQueryClickhouseRowBinaryStream(sql)) {
if (valueLength <= 32) {
assertEquals(in.readBitmap(innerType), ClickHouseBitmap.wrap(Roaring64NavigableMap.bitmapOf(values), innerType));
} else {
assertThrows(UnsupportedOperationException.class, () -> in.readBitmap(innerType));
}
assertEquals(in.readBitmap(innerType), ClickHouseBitmap.wrap(Roaring64NavigableMap.bitmapOf(values), innerType));
}

statement.execute("drop table if exists " + tableName);
Expand All @@ -268,13 +264,15 @@ public void testBitmap() throws Exception {
testBitmap(ClickHouseDataType.UInt32, 32);
testBitmap(ClickHouseDataType.UInt32, 65537);

testBitmap64(32);
testBitmap64(32, 0L, 1L);
testBitmap64(32, Long.MAX_VALUE, -1L);

String versionNumber = connection.getServerVersion();
int majorVersion = ClickHouseVersionNumberUtil.getMajorVersion(versionNumber);
int minorVersion = ClickHouseVersionNumberUtil.getMinorVersion(versionNumber);
if (majorVersion > 20 || (majorVersion == 20 && minorVersion > 8)) {
testBitmap64(65537);
testBitmap64(65537, 100000L, 1L); // highToBitmap.size() == 1
testBitmap64(65537, 9223372036854775807L, -1000000000L); // highToBitmap.size() > 1
}
}

Expand Down
6 changes: 3 additions & 3 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -65,11 +65,11 @@
<distributionManagement>
<snapshotRepository>
<id>ossrh</id>
<url>https://oss.sonatype.org/content/repositories/snapshots</url>
<url>https://s01.oss.sonatype.org/content/repositories/snapshots</url>
</snapshotRepository>
<repository>
<id>ossrh</id>
<url>https://oss.sonatype.org/service/local/staging/deploy/maven2/</url>
<url>https://s01.oss.sonatype.org/service/local/staging/deploy/maven2/</url>
</repository>
</distributionManagement>

Expand Down Expand Up @@ -394,7 +394,7 @@
</executions>
<configuration>
<serverId>ossrh</serverId>
<nexusUrl>https://oss.sonatype.org/</nexusUrl>
<nexusUrl>https://s01.oss.sonatype.org/</nexusUrl>
</configuration>
</plugin>
</plugins>
Expand Down