Skip to content

Commit

Permalink
Add support for tuple ClickHouse (#29715)
Browse files Browse the repository at this point in the history
* First version WIP

* Implement write tuple in RowBinary format

* Added complex tuple test

* Disable debug of javacc

* Move tuple preprocessing logic

* Adding to CHANGES.md & auto generated docs.

* Fix CHANGES.md & fix docs

* Fix spotless syntax

* Remove :; from parsing. Only adding ' to field name.

* Fix CHANGES.md to the correct version

* Change new types javadoc

---------

Co-authored-by: mzitnik <[email protected]>
  • Loading branch information
mzitnik and mzitnik authored Jan 2, 2024
1 parent 9da9b8d commit 1c1cfa8
Show file tree
Hide file tree
Showing 7 changed files with 238 additions and 3 deletions.
4 changes: 3 additions & 1 deletion CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@
## I/Os

* Support for X source added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).
* Adding support for Tuples DataType in ClickHouse (Java) ([#29715](https://github.com/apache/beam/pull/29715)).


## New Features / Improvements

Expand Down Expand Up @@ -98,10 +100,10 @@

* TextIO now supports skipping multiple header lines (Java) ([#17990](https://github.com/apache/beam/issues/17990)).
* Python GCSIO is now implemented with GCP GCS Client instead of apitools ([#25676](https://github.com/apache/beam/issues/25676))
* Adding support for LowCardinality DataType in ClickHouse (Java) ([#29533](https://github.com/apache/beam/pull/29533)).
* Added support for handling bad records to KafkaIO (Java) ([#29546](https://github.com/apache/beam/pull/29546))
* Add support for generating text embeddings in MLTransform for Vertex AI and Hugging Face Hub models.([#29564](https://github.com/apache/beam/pull/29564))
* NATS IO connector added (Go) ([#29000](https://github.com/apache/beam/issues/29000)).
* Adding support for LowCardinality (Java) ([#29533](https://github.com/apache/beam/pull/29533)).

## New Features / Improvements

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -109,6 +110,7 @@
* <tr><td>{@link TableSchema.TypeName#ENUM8}</td> <td>{@link Schema.TypeName#STRING}</td></tr>
* <tr><td>{@link TableSchema.TypeName#ENUM16}</td> <td>{@link Schema.TypeName#STRING}</td></tr>
* <tr><td>{@link TableSchema.TypeName#BOOL}</td> <td>{@link Schema.TypeName#BOOLEAN}</td></tr>
* <tr><td>{@link TableSchema.TypeName#TUPLE}</td> <td>{@link Schema.TypeName#ROW}</td></tr>
* </table>
*
* Nullable row columns are supported through Nullable type in ClickHouse. Low cardinality hint is
Expand Down Expand Up @@ -475,6 +477,15 @@ abstract static class Builder<T> {
}
}

private static String tuplePreprocessing(String payload) {
List<String> l =
Arrays.stream(payload.trim().split(","))
.map(s -> s.trim().replaceAll(" +", "' "))
.collect(Collectors.toList());
String content =
String.join(",", l).trim().replaceAll("Tuple\\(", "Tuple('").replaceAll(",", ",'");
return content;
}
/**
* Returns {@link TableSchema} for a given table.
*
Expand All @@ -498,7 +509,13 @@ public static TableSchema getTableSchema(String jdbcUrl, String table) {
String defaultTypeStr = rs.getString("default_type");
String defaultExpression = rs.getString("default_expression");

ColumnType columnType = ColumnType.parse(type);
ColumnType columnType = null;
if (type.toLowerCase().trim().startsWith("tuple(")) {
String content = tuplePreprocessing(type);
columnType = ColumnType.parse(content);
} else {
columnType = ColumnType.parse(type);
}
DefaultType defaultType = DefaultType.parse(defaultTypeStr).orElse(null);

Object defaultValue;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,11 @@
import com.clickhouse.client.ClickHousePipedOutputStream;
import com.clickhouse.client.data.BinaryStreamUtils;
import java.io.IOException;
import java.util.Collection;
import java.util.List;
import org.apache.beam.sdk.io.clickhouse.TableSchema.ColumnType;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.RowWithStorage;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Charsets;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
import org.joda.time.Days;
Expand Down Expand Up @@ -146,6 +148,20 @@ static void writeValue(ClickHouseOutputStream stream, ColumnType columnType, Obj
case BOOL:
BinaryStreamUtils.writeBoolean(stream, (Boolean) value);
break;
case TUPLE:
RowWithStorage rowValues = (RowWithStorage) value;
List<Object> tupleValues = rowValues.getValues();
Collection<ColumnType> columnTypesList = columnType.tupleTypes().values();
int index = 0;
for (ColumnType ct : columnTypesList) {
if (ct.nullable()) {
writeNullableValue(stream, ct, tupleValues.get(index));
} else {
writeValue(stream, ct, tupleValues.get(index));
}
index++;
}
break;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.logicaltypes.FixedBytes;
import org.checkerframework.checker.nullness.qual.Nullable;
Expand Down Expand Up @@ -111,6 +112,14 @@ public static Schema.FieldType getEquivalentFieldType(ColumnType columnType) {
return Schema.FieldType.STRING;
case BOOL:
return Schema.FieldType.BOOLEAN;
case TUPLE:
List<Schema.Field> fields =
columnType.tupleTypes().entrySet().stream()
.map(x -> Schema.Field.of(x.getKey(), Schema.FieldType.DATETIME))
.collect(Collectors.toList());
Schema.Field[] array = fields.toArray(new Schema.Field[fields.size()]);
Schema schema = Schema.of(array);
return Schema.FieldType.row(schema);
}

// not possible, errorprone checks for exhaustive switch
Expand Down Expand Up @@ -168,7 +177,9 @@ public enum TypeName {
// Composite type
ARRAY,
// Primitive type
BOOL
BOOL,
// Composite type
TUPLE
}

/**
Expand Down Expand Up @@ -208,6 +219,7 @@ public abstract static class ColumnType implements Serializable {
public static final ColumnType UINT32 = ColumnType.of(TypeName.UINT32);
public static final ColumnType UINT64 = ColumnType.of(TypeName.UINT64);
public static final ColumnType BOOL = ColumnType.of(TypeName.BOOL);
public static final ColumnType TUPLE = ColumnType.of(TypeName.TUPLE);

// ClickHouse doesn't allow nested nullables, so boolean flag is enough
public abstract boolean nullable();
Expand All @@ -220,6 +232,8 @@ public abstract static class ColumnType implements Serializable {

public abstract @Nullable ColumnType arrayElementType();

public abstract @Nullable Map<String, ColumnType> tupleTypes();

public ColumnType withNullable(boolean nullable) {
return toBuilder().nullable(nullable).build();
}
Expand Down Expand Up @@ -265,6 +279,14 @@ public static ColumnType array(ColumnType arrayElementType) {
.build();
}

public static ColumnType tuple(Map<String, ColumnType> elements) {
return ColumnType.builder()
.typeName(TypeName.TUPLE)
.nullable(false)
.tupleTypes(elements)
.build();
}

/**
* Parse string with ClickHouse type to {@link ColumnType}.
*
Expand Down Expand Up @@ -339,6 +361,8 @@ abstract static class Builder {

public abstract Builder fixedStringSize(Integer size);

public abstract Builder tupleTypes(Map<String, ColumnType> tupleElements);

public abstract ColumnType build();
}
}
Expand Down
47 changes: 47 additions & 0 deletions sdks/java/io/clickhouse/src/main/javacc/ColumnTypeParser.jj
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@
*/
options {
IGNORE_CASE=true;
DEBUG_PARSER = false;
DEBUG_LOOKAHEAD = false;
DEBUG_TOKEN_MANAGER = false;
STATIC = false;
}

PARSER_BEGIN(ColumnTypeParser)
Expand Down Expand Up @@ -99,6 +103,7 @@ TOKEN :
| < EQ : "=" >
| < BOOL : "BOOL" >
| < LOWCARDINALITY : "LOWCARDINALITY" >
| < TUPLE : "TUPLE" >
}

public ColumnType columnType() :
Expand All @@ -113,6 +118,7 @@ public ColumnType columnType() :
| ct = array()
| ct = nullable()
| ct = lowcardenality()
| ct = tuple()
)
{
return ct;
Expand Down Expand Up @@ -263,6 +269,33 @@ private Map<String, Integer> enumElements() :
}
}

private Map.Entry<String, ColumnType> tupleElement() :
{
String key;
ColumnType value;
Token token;
}
{
( (key = string() ) ( value = columnType() ) ) {
return Maps.immutableEntry(key, value);
}
}

private Map<String, ColumnType> tupleElements() :
{
Map.Entry<String, ColumnType> el;
List<Map.Entry<String, ColumnType>> entries = Lists.newArrayList();
}
{
(
( el = tupleElement() { entries.add(el); } )
( <COMMA> ( el = tupleElement() { entries.add(el); } ) )*
)
{
return ImmutableMap.copyOf(entries);
}
}

private ColumnType enum_() :
{
Map<String, Integer> elements;
Expand All @@ -289,4 +322,18 @@ private ColumnType lowcardenality() :
(
(<LOWCARDINALITY> <LPAREN> (ct = primitive()) <RPAREN>) { return ct; }
)
}

private ColumnType tuple() :
{
Map<String, ColumnType> elements;
}
{
(
(<TUPLE> <LPAREN> ( elements = tupleElements() ) <RPAREN>)
{
return ColumnType.tuple(elements);
}
)

}
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,84 @@ public void testArrayOfArrayOfInt64() throws Exception {
assertEquals(15L, sum0);
}

@Test
public void testTupleType() throws Exception {
Schema tupleSchema =
Schema.of(
Schema.Field.of("f0", FieldType.STRING), Schema.Field.of("f1", FieldType.BOOLEAN));
Schema schema = Schema.of(Schema.Field.of("t0", FieldType.row(tupleSchema)));
Row row1Tuple = Row.withSchema(tupleSchema).addValue("tuple").addValue(true).build();

Row row1 = Row.withSchema(schema).addValue(row1Tuple).build();

executeSql(
"CREATE TABLE test_named_tuples (" + "t0 Tuple(`f0` String, `f1` Bool)" + ") ENGINE=Log");

pipeline.apply(Create.of(row1).withRowSchema(schema)).apply(write("test_named_tuples"));

pipeline.run().waitUntilFinish();

try (ResultSet rs = executeQuery("SELECT * FROM test_named_tuples")) {
rs.next();
assertEquals("('tuple',true)", rs.getString("t0"));
}

try (ResultSet rs = executeQuery("SELECT t0.f0 as f0, t0.f1 as f1 FROM test_named_tuples")) {
rs.next();
assertEquals("tuple", rs.getString("f0"));
assertEquals("true", rs.getString("f1"));
}
}

@Test
public void testComplexTupleType() throws Exception {
Schema sizeSchema =
Schema.of(
Schema.Field.of("width", FieldType.INT64.withNullable(true)),
Schema.Field.of("height", FieldType.INT64.withNullable(true)));

Schema browserSchema =
Schema.of(
Schema.Field.of("name", FieldType.STRING.withNullable(true)),
Schema.Field.of("size", FieldType.row(sizeSchema)),
Schema.Field.of("version", FieldType.STRING.withNullable(true)));

Schema propSchema =
Schema.of(
Schema.Field.of("browser", FieldType.row(browserSchema)),
Schema.Field.of("deviceCategory", FieldType.STRING.withNullable(true)));

Schema schema = Schema.of(Schema.Field.of("prop", FieldType.row(propSchema)));

Row sizeRow = Row.withSchema(sizeSchema).addValue(10L).addValue(20L).build();
Row browserRow =
Row.withSchema(browserSchema).addValue("test").addValue(sizeRow).addValue("1.0.0").build();
Row propRow = Row.withSchema(propSchema).addValue(browserRow).addValue("mobile").build();
Row row1 = Row.withSchema(schema).addValue(propRow).build();

executeSql(
"CREATE TABLE test_named_complex_tuples ("
+ "`prop` Tuple(`browser` Tuple(`name` Nullable(String),`size` Tuple(`width` Nullable(Int64), `height` Nullable(Int64)),`version` Nullable(String)),`deviceCategory` Nullable(String))"
+ ") ENGINE=Log");

pipeline.apply(Create.of(row1).withRowSchema(schema)).apply(write("test_named_complex_tuples"));

pipeline.run().waitUntilFinish();

try (ResultSet rs = executeQuery("SELECT * FROM test_named_complex_tuples")) {
rs.next();
assertEquals("(('test',(10,20),'1.0.0'),'mobile')", rs.getString("prop"));
}

try (ResultSet rs =
executeQuery(
"SELECT prop.browser.name as name, prop.browser.size as size FROM test_named_complex_tuples")) {
rs.next();
assertEquals("test", rs.getString("name"));
assertEquals("(10,20)", rs.getString("size"));
}
}

@Test
public void testPrimitiveTypes() throws Exception {
Schema schema =
Expand Down
Loading

0 comments on commit 1c1cfa8

Please sign in to comment.