From 76ad0b8db5ec64a6cc815b10f32a012811ac433d Mon Sep 17 00:00:00 2001
From: wobu
Date: Mon, 21 Nov 2022 15:39:07 +0100
Subject: [PATCH] Reworked partition field implementation. Partitioning is now
 decoupled from the 'upsert' config key; any field can be specified. Based on
 https://github.com/memiiso/debezium-server-iceberg/pull/108

---
 CHANGELOG.md                                  |   1 +
 README.md                                     |  45 +-
 .../iceberg/sink/IcebergChangeConsumer.java   |   2 +-
 .../iceberg/sink/IcebergChangeEvent.java      | 510 +++++++++---------
 .../sink/IcebergSinkConfiguration.java        |   8 +
 .../connect/iceberg/sink/IcebergUtil.java     | 110 ++--
 .../connect/iceberg/sink/TestIcebergUtil.java |  50 +-
 .../IcebergTableOperatorTest.java             |   3 +-
 8 files changed, 393 insertions(+), 336 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 0a2d142..7b11ce1 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -3,6 +3,7 @@
 ## [Unreleased]

 - removed 'table.write-format', can be replaced with 'iceberg.table-default.write.format.default'
+- Reworked partition field implementation. Added config key 'table.partition-field'. Removed the dependency on the 'upsert' feature.

 ## [0.2.0] - 2022-11-16

diff --git a/README.md b/README.md
index 955ebb4..d83a8f5 100644
--- a/README.md
+++ b/README.md
@@ -12,21 +12,22 @@ mvn clean package

 ### Configuration reference

-| Key                     | Type    | Default value  | Description                                                                                                                             |
-|-------------------------|---------|----------------|-----------------------------------------------------------------------------------------------------------------------------------------|
-| upsert                  | boolean | true           | When *true* Iceberg rows will be updated based on table primary key. When *false* all modification will be added as separate rows.     |
-| upsert.keep-deletes     | boolean | true           | When *true* delete operation will leave a tombstone that will have only a primary key and *__deleted** flag set to true                |
-| upsert.dedup-column     | String  | __source_ts_ms | Column used to check which state is newer during upsert                                                                                |
-| upsert.op-column        | String  | __op           | Column used to check which state is newer during upsert when *upsert.dedup-column* is not enough to resolve                            |
-| allow-field-addition    | boolean | true           | When *true* sink will be adding new columns to Iceberg tables on schema changes                                                        |
-| table.auto-create       | boolean | false          | When *true* sink will automatically create new Iceberg tables                                                                          |
-| table.namespace         | String  | default        | Table namespace. In Glue it will be used as database name                                                                              |
-| table.prefix            | String  | *empty string* | Prefix added to all table names                                                                                                        |
-| iceberg.name            | String  | default        | Iceberg catalog name                                                                                                                   |
-| iceberg.catalog-impl    | String  | *null*         | Iceberg catalog implementation (Only one of iceberg.catalog-impl and iceberg.type can be set to non null value at the same time        |
-| iceberg.type            | String  | *null*         | Iceberg catalog type (Only one of iceberg.catalog-impl and iceberg.type can be set to non null value at the same time)                 |
-| iceberg.*               |         |                | All properties with this prefix will be passed to Iceberg Catalog implementation                                                       |
-| iceberg.table-default.* |         |                | Iceberg specific table settings can be changed with this prefix, e.g. 'iceberg.table-default.write.format.default' can be set to 'orc' |
+| Key                     | Type    | Default value  | Description                                                                                                                             |
+|-------------------------|---------|----------------|-----------------------------------------------------------------------------------------------------------------------------------------|
+| upsert                  | boolean | true           | When *true* Iceberg rows will be updated based on the table primary key. When *false* all modifications will be added as separate rows. |
+| upsert.keep-deletes     | boolean | true           | When *true* a delete operation will leave a tombstone that has only the primary key and the *__deleted* flag set to true               |
+| upsert.dedup-column     | String  | __source_ts_ms | Column used to check which state is newer during upsert                                                                                |
+| upsert.op-column        | String  | __op           | Column used to check which state is newer during upsert when *upsert.dedup-column* is not enough to resolve                            |
+| allow-field-addition    | boolean | true           | When *true* the sink will add new columns to Iceberg tables on schema changes                                                          |
+| table.auto-create       | boolean | false          | When *true* the sink will automatically create new Iceberg tables                                                                      |
+| table.namespace         | String  | default        | Table namespace. In Glue it will be used as the database name                                                                          |
+| table.prefix            | String  | *empty string* | Prefix added to all table names                                                                                                        |
+| table.partition-field   | String  | *null*         | Field name used for partitioning. For timestamp or date fields the 'day()' partition transform is applied; otherwise 'identity()' is used. |
+| iceberg.name            | String  | default        | Iceberg catalog name                                                                                                                   |
+| iceberg.catalog-impl    | String  | *null*         | Iceberg catalog implementation (only one of iceberg.catalog-impl and iceberg.type can be set to a non-null value at the same time)     |
+| iceberg.type            | String  | *null*         | Iceberg catalog type (only one of iceberg.catalog-impl and iceberg.type can be set to a non-null value at the same time)               |
+| iceberg.*               |         |                | All properties with this prefix will be passed to the Iceberg catalog implementation                                                   |
+| iceberg.table-default.* |         |                | Iceberg-specific table settings can be changed with this prefix, e.g. 'iceberg.table-default.write.format.default' can be set to 'orc' |

 ### REST / Manual based installation

@@ -302,18 +303,10 @@ Rows cannot be updated nor removed unless primary key is defined. In case of del

 ### Iceberg partitioning support

-Currently, partitioning is done automatically based on event time. Partitioning only works when Debezium is configured in append-only mode (`upsert: false`).
+When an Iceberg table is auto-created, a partition spec can be configured by setting the config key `table.partition-field` to the name of the field to partition by.
+When that field is of type `date` or `timestamp`, the [partition transform](https://iceberg.apache.org/spec/#partition-transforms) `day()` is applied; otherwise `identity()` is used.

-Any event produced by debezium source contains a source time at which the transaction was committed:
-
-```sql
-"sourceOffset": {
-  ...
-  "ts_ms": "1482918357011"
-}
-```
-
-From this value day part is extracted and used as partition.
+Note: at the moment only the fields `__ts_ms` and `__source_ts_ms` are converted to the type `timestamp`. These fields are normally added by a Debezium transform, e.g.
+[New Record State Extraction](https://debezium.io/documentation/reference/stable/transformations/event-flattening.html#_adding_metadata).
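+
+As a rough sketch (the schema and field names below are only examples), the transform selection corresponds to building the following with the Iceberg API:
+
+```java
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.types.Types;
+
+Schema schema = new Schema(
+        Types.NestedField.required(1, "id", Types.IntegerType.get()),
+        Types.NestedField.optional(2, "__source_ts_ms", Types.TimestampType.withZone()));
+
+// table.partition-field = __source_ts_ms  ->  day() transform
+PartitionSpec byDay = PartitionSpec.builderFor(schema).day("__source_ts_ms").build();
+// table.partition-field = id              ->  identity() transform
+PartitionSpec byId = PartitionSpec.builderFor(schema).identity("id").build();
+```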

 ## Debezium change event format support

diff --git a/src/main/java/com/getindata/kafka/connect/iceberg/sink/IcebergChangeConsumer.java b/src/main/java/com/getindata/kafka/connect/iceberg/sink/IcebergChangeConsumer.java
index eee1409..aa3ca26 100644
--- a/src/main/java/com/getindata/kafka/connect/iceberg/sink/IcebergChangeConsumer.java
+++ b/src/main/java/com/getindata/kafka/connect/iceberg/sink/IcebergChangeConsumer.java
@@ -60,7 +60,7 @@ private Table loadIcebergTable(Catalog icebergCatalog, TableIdentifier tableId,
             if (!configuration.isTableAutoCreate()) {
                 throw new ConnectException(String.format("Table '%s' not found! Set '%s' to true to create tables automatically!", tableId, TABLE_AUTO_CREATE));
             }
-            return IcebergUtil.createIcebergTable(icebergCatalog, tableId, sampleEvent.icebergSchema(), !configuration.isUpsert());
+            return IcebergUtil.createIcebergTable(icebergCatalog, tableId, sampleEvent.icebergSchema(), configuration.getTablePartitionField());
         });
     }
 }

diff --git a/src/main/java/com/getindata/kafka/connect/iceberg/sink/IcebergChangeEvent.java b/src/main/java/com/getindata/kafka/connect/iceberg/sink/IcebergChangeEvent.java
index 1e10d0b..9d55bd2 100644
--- a/src/main/java/com/getindata/kafka/connect/iceberg/sink/IcebergChangeEvent.java
+++ b/src/main/java/com/getindata/kafka/connect/iceberg/sink/IcebergChangeEvent.java
@@ -29,296 +29,296 @@
  * @author Ismail Simsek
  */
 public class IcebergChangeEvent {
-  private static final ObjectMapper MAPPER = new ObjectMapper();
-  private static final Logger LOGGER = LoggerFactory.getLogger(IcebergChangeEvent.class);
-  private final String destination;
-  private final JsonNode value;
-  private final JsonNode key;
-  private final JsonSchema jsonSchema;
-
-  public IcebergChangeEvent(String destination,
-                            JsonNode value,
-                            JsonNode key,
-                            JsonNode valueSchema,
-                            JsonNode keySchema) {
-    this.destination = destination;
-    this.value = value;
-    this.key = key;
-    this.jsonSchema = new JsonSchema(valueSchema, keySchema);
-  }
-
-  public JsonNode key() {
-    return key;
-  }
-
-  public JsonNode value() {
-    return value;
-  }
-
-  public JsonSchema jsonSchema() {
-    return jsonSchema;
-  }
-
-  public Schema icebergSchema() {
-    return jsonSchema.icebergSchema();
-  }
-
-  public String destinationTable() {
-    return destination.replace(".", "_").replace("-", "_");
-  }
-
-  public GenericRecord asIcebergRecord(Schema schema) {
-    final GenericRecord record = asIcebergRecord(schema.asStruct(), value);
-
-    if (value != null && value.has("__source_ts_ms") && value.get("__source_ts_ms") != null) {
-      final long source_ts_ms = value.get("__source_ts_ms").longValue();
-      final OffsetDateTime odt = OffsetDateTime.ofInstant(Instant.ofEpochMilli(source_ts_ms), ZoneOffset.UTC);
-      record.setField("__source_ts", odt);
-    } else {
-      record.setField("__source_ts", null);
-    }
-    return record;
-  }
-
-  private GenericRecord asIcebergRecord(Types.StructType tableFields, JsonNode data) {
-    LOGGER.debug("Processing nested field:{}", tableFields);
-    GenericRecord record = GenericRecord.create(tableFields);
-
-    for (Types.NestedField field : tableFields.fields()) {
-      // Set value to null if json event don't have the field
-      if (data == null || !data.has(field.name()) || data.get(field.name()) == null) {
-        record.setField(field.name(), null);
-        continue;
-      }
-      // get the value of the field from json event, map it to iceberg value
-      record.setField(field.name(),
-          jsonValToIcebergVal(field, data.get(field.name())));
-    }
-
-    return record;
-  }
-
-  private Type.PrimitiveType icebergFieldType(String fieldType) {
-    switch (fieldType) {
-      case "int8":
-      case "int16":
-      case "int32": // int 4 bytes
-        return Types.IntegerType.get();
-      case "int64": // long 8 bytes
-        return Types.LongType.get();
-      case "float8":
-      case "float16":
-      case "float32": // float is represented in 32 bits,
-        return Types.FloatType.get();
-      case "float64": // double is represented in 64 bits
-        return Types.DoubleType.get();
-      case "boolean":
-        return Types.BooleanType.get();
-      case "string":
-        return Types.StringType.get();
-      case "bytes":
-        return Types.BinaryType.get();
-      default:
-        // default to String type
-        return Types.StringType.get();
-        //throw new RuntimeException("'" + fieldName + "' has "+fieldType+" type, "+fieldType+" not supported!");
-    }
-  }
-
-  private Object jsonValToIcebergVal(Types.NestedField field,
-                                     JsonNode node) {
-    LOGGER.debug("Processing Field:{} Type:{}", field.name(), field.type());
-    final Object val;
-    switch (field.type().typeId()) {
-      case INTEGER: // int 4 bytes
-        val = node.isNull() ? null : node.asInt();
-        break;
-      case LONG: // long 8 bytes
-        val = node.isNull() ? null : node.asLong();
-        break;
-      case FLOAT: // float is represented in 32 bits,
-        val = node.isNull() ? null : node.floatValue();
-        break;
-      case DOUBLE: // double is represented in 64 bits
-        val = node.isNull() ? null : node.asDouble();
-        break;
-      case BOOLEAN:
-        val = node.isNull() ? null : node.asBoolean();
-        break;
-      case STRING:
-        // if the node is not a value node (method isValueNode returns false), convert it to string.
-        val = node.isValueNode() ? node.asText(null) : node.toString();
-        break;
-      case BINARY:
-        try {
-          val = node.isNull() ? null : ByteBuffer.wrap(node.binaryValue());
-        } catch (IOException e) {
-          LOGGER.error("Failed to convert binary value to iceberg value, field:" + field.name(), e);
-          throw new RuntimeException("Failed Processing Event!", e);
-        }
-        break;
-      case LIST:
-        val = MAPPER.convertValue(node, ArrayList.class);
-        break;
-      case MAP:
-        val = MAPPER.convertValue(node, Map.class);
-        break;
-      case STRUCT:
-        // create it as struct, nested type
-        // recursive call to get nested data/record
-        val = asIcebergRecord(field.type().asStructType(), node);
-        break;
-      default:
-        // default to String type
-        // if the node is not a value node (method isValueNode returns false), convert it to string.
-        val = node.isValueNode() ? node.asText(null) : node.toString();
-        break;
-    }
-
-    return val;
-  }
-
-  public class JsonSchema {
-    private final JsonNode valueSchema;
-    private final JsonNode keySchema;
-
-    JsonSchema(JsonNode valueSchema, JsonNode keySchema) {
-      this.valueSchema = valueSchema;
-      this.keySchema = keySchema;
-    }
-
-    public JsonNode valueSchema() {
-      return valueSchema;
-    }
-
-    public JsonNode keySchema() {
-      return keySchema;
-    }
-
-    @Override
-    public boolean equals(Object o) {
-      if (this == o) return true;
-      if (o == null || getClass() != o.getClass()) return false;
-      JsonSchema that = (JsonSchema) o;
-      return Objects.equals(valueSchema, that.valueSchema) && Objects.equals(keySchema, that.keySchema);
-    }
-
-    @Override
-    public int hashCode() {
-      return Objects.hash(valueSchema, keySchema);
-    }
-
-    //getIcebergFieldsFromEventSchema
-    private List<Types.NestedField> KeySchemaFields() {
-      if (keySchema != null && keySchema.has("fields") && keySchema.get("fields").isArray()) {
-        LOGGER.debug(keySchema.toString());
-        return icebergSchema(keySchema, "", 0);
-      }
-      LOGGER.trace("Key schema not found!");
-      return new ArrayList<>();
-    }
-
-    private List<Types.NestedField> valueSchemaFields() {
-      if (valueSchema != null && valueSchema.has("fields") && valueSchema.get("fields").isArray()) {
-        LOGGER.debug(valueSchema.toString());
-        return icebergSchema(valueSchema, "", 0, true);
-      }
-      LOGGER.trace("Event schema not found!");
-      return new ArrayList<>();
-    }
-
-    public Schema icebergSchema() {
-
-      if (this.valueSchema == null) {
-        throw new RuntimeException("Failed to get event schema, event schema is null");
-      }
-
-      final List<Types.NestedField> tableColumns = valueSchemaFields();
-
-      if (tableColumns.isEmpty()) {
-        throw new RuntimeException("Failed to get event schema, event schema has no fields!");
-      }
-
-      final List<Types.NestedField> keyColumns = KeySchemaFields();
-      Set<Integer> identifierFieldIds = new HashSet<>();
-
-      for (Types.NestedField ic : keyColumns) {
-        boolean found = false;
-
-        ListIterator<Types.NestedField> colsIterator = tableColumns.listIterator();
-        while (colsIterator.hasNext()) {
-          Types.NestedField tc = colsIterator.next();
-          if (Objects.equals(tc.name(), ic.name())) {
-            identifierFieldIds.add(tc.fieldId());
-            // set column as required its part of identifier filed
-            colsIterator.set(tc.asRequired());
-            found = true;
-            break;
-          }
-        }
-
-        if (!found) {
-          throw new ValidationException("Table Row identifier field `" + ic.name() + "` not found in table columns");
-        }
-
-      }
-
-      return new Schema(tableColumns, identifierFieldIds);
-    }
-
-    private List<Types.NestedField> icebergSchema(JsonNode eventSchema, String schemaName, int columnId) {
-      return icebergSchema(eventSchema, schemaName, columnId, false);
-    }
-
-    private List<Types.NestedField> icebergSchema(JsonNode eventSchema, String schemaName, int columnId,
-                                                  boolean addSourceTsField) {
-      List<Types.NestedField> schemaColumns = new ArrayList<>();
-      String schemaType = eventSchema.get("type").textValue();
-      LOGGER.debug("Converting Schema of: {}::{}", schemaName, schemaType);
-      for (JsonNode jsonSchemaFieldNode : eventSchema.get("fields")) {
-        columnId++;
-        String fieldName = jsonSchemaFieldNode.get("field").textValue();
-        String fieldType = jsonSchemaFieldNode.get("type").textValue();
-        LOGGER.debug("Processing Field: [{}] {}.{}::{}", columnId, schemaName, fieldName, fieldType);
-        switch (fieldType) {
-          case "array":
-            JsonNode items = jsonSchemaFieldNode.get("items");
-            if (items != null && items.has("type")) {
-              String listItemType = items.get("type").textValue();
-
-              if (listItemType.equals("struct") || listItemType.equals("array") || listItemType.equals("map")) {
-                throw new RuntimeException("Complex nested array types are not supported," +
-                    " array[" + listItemType + "], field " + fieldName);
-              }
-
-              Type.PrimitiveType item = icebergFieldType(listItemType);
-              schemaColumns.add(Types.NestedField.optional(
-                  columnId, fieldName, Types.ListType.ofOptional(++columnId, item)));
-            } else {
-              throw new RuntimeException("Unexpected Array type for field " + fieldName);
-            }
-            break;
-          case "map":
-            throw new RuntimeException("'" + fieldName + "' has Map type, Map type not supported!");
-            //break;
-          case "struct":
-            // create it as struct, nested type
-            List<Types.NestedField> subSchema = icebergSchema(jsonSchemaFieldNode, fieldName, columnId);
-            schemaColumns.add(Types.NestedField.optional(columnId, fieldName, Types.StructType.of(subSchema)));
-            columnId += subSchema.size();
-            break;
-          default: //primitive types
-            schemaColumns.add(Types.NestedField.optional(columnId, fieldName, icebergFieldType(fieldType)));
-            break;
-        }
-      }
-      if (addSourceTsField) {
-        columnId++;
-        schemaColumns.add(Types.NestedField.optional(columnId, "__source_ts", Types.TimestampType.withZone()));
-      }
-      return schemaColumns;
-    }
-  }
+    private static final ObjectMapper MAPPER = new ObjectMapper();
+    public static final List<String> TS_MS_FIELDS = List.of("__ts_ms", "__source_ts_ms");
+    private static final Logger LOGGER = LoggerFactory.getLogger(IcebergChangeEvent.class);
+    private final String destination;
+    private final JsonNode value;
+    private final JsonNode key;
+    private final JsonSchema jsonSchema;
+
+    public IcebergChangeEvent(String destination,
+                              JsonNode value,
+                              JsonNode key,
+                              JsonNode valueSchema,
+                              JsonNode keySchema) {
+        this.destination = destination;
+        this.value = value;
+        this.key = key;
+        this.jsonSchema = new JsonSchema(valueSchema, keySchema);
+    }
+
+    public JsonNode key() {
+        return key;
+    }
+
+    public JsonNode value() {
+        return value;
+    }
+
+    public JsonSchema jsonSchema() {
+        return jsonSchema;
+    }
+
+    public Schema icebergSchema() {
+        return jsonSchema.icebergSchema();
+    }
+
+    public String destinationTable() {
+        return destination.replace(".", "_").replace("-", "_");
+    }
+
+    public GenericRecord asIcebergRecord(Schema schema) {
+        return asIcebergRecord(schema.asStruct(), value);
+    }
+
+    private static GenericRecord asIcebergRecord(Types.StructType tableFields, JsonNode data) {
+        LOGGER.debug("Processing nested field:{}", tableFields);
+        GenericRecord record = GenericRecord.create(tableFields);
+
+        for (Types.NestedField field : tableFields.fields()) {
+            // Set the value to null if the json event doesn't have the field
+            if (data == null || !data.has(field.name()) || data.get(field.name()) == null) {
+                record.setField(field.name(), null);
+                continue;
+            }
+            // get the value of the field from the json event, map it to an iceberg value
+            record.setField(field.name(), jsonValToIcebergVal(field, data.get(field.name())));
+        }
+
+        return record;
+    }
+
+    private static Type.PrimitiveType icebergFieldType(String fieldName, String fieldType) {
+        switch (fieldType) {
+            case "int8":
+            case "int16":
+            case "int32": // int 4 bytes
+                return Types.IntegerType.get();
+            case "int64": // long 8 bytes
+                if (TS_MS_FIELDS.contains(fieldName)) {
+                    return Types.TimestampType.withZone();
+                } else {
+                    return Types.LongType.get();
+                }
+            case "float8":
+            case "float16":
+            case "float32": // float is represented in 32 bits,
+                return Types.FloatType.get();
+            case "float64": // double is represented in 64 bits
+                return Types.DoubleType.get();
+            case "boolean":
+                return Types.BooleanType.get();
+            case "string":
+                return Types.StringType.get();
+            case "uuid":
+                return Types.UUIDType.get();
+            case "bytes":
+                return Types.BinaryType.get();
+            default:
+                // default to String type
+                return Types.StringType.get();
+                //throw new RuntimeException("'" + fieldName + "' has " + fieldType + " type, " + fieldType + " not supported!");
+        }
+    }
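+
+    // Values for the well-known millisecond timestamp fields (TS_MS_FIELDS) arrive
+    // as epoch-millis longs and are converted below to OffsetDateTime in UTC;
+    // textual timestamp values are parsed as ISO-8601.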
+    private static Object jsonValToIcebergVal(Types.NestedField field, JsonNode node) {
+        LOGGER.debug("Processing Field:{} Type:{}", field.name(), field.type());
+        final Object val;
+        switch (field.type().typeId()) {
+            case INTEGER: // int 4 bytes
+                val = node.isNull() ? null : node.asInt();
+                break;
+            case LONG: // long 8 bytes
+                val = node.isNull() ? null : node.asLong();
+                break;
+            case FLOAT: // float is represented in 32 bits,
+                val = node.isNull() ? null : node.floatValue();
+                break;
+            case DOUBLE: // double is represented in 64 bits
+                val = node.isNull() ? null : node.asDouble();
+                break;
+            case BOOLEAN:
+                val = node.isNull() ? null : node.asBoolean();
+                break;
+            case STRING:
+                // if the node is not a value node (method isValueNode returns false), convert it to string.
+                val = node.isValueNode() ? node.asText(null) : node.toString();
+                break;
+            case UUID:
+                val = node.isValueNode() ? UUID.fromString(node.asText(null)) : UUID.fromString(node.toString());
+                break;
+            case TIMESTAMP:
+                if (node.isLong() && TS_MS_FIELDS.contains(field.name())) {
+                    val = OffsetDateTime.ofInstant(Instant.ofEpochMilli(node.longValue()), ZoneOffset.UTC);
+                } else if (node.isTextual()) {
+                    val = OffsetDateTime.parse(node.asText());
+                } else {
+                    throw new RuntimeException("Failed to convert timestamp value, field: " + field.name() + " value: " + node);
+                }
+                break;
+            case BINARY:
+                try {
+                    val = node.isNull() ? null : ByteBuffer.wrap(node.binaryValue());
+                } catch (IOException e) {
+                    LOGGER.error("Failed to convert binary value to iceberg value, field:" + field.name(), e);
+                    throw new RuntimeException("Failed Processing Event!", e);
+                }
+                break;
+            case LIST:
+                val = MAPPER.convertValue(node, ArrayList.class);
+                break;
+            case MAP:
+                val = MAPPER.convertValue(node, Map.class);
+                break;
+            case STRUCT:
+                // create it as struct, nested type
+                // recursive call to get nested data/record
+                val = asIcebergRecord(field.type().asStructType(), node);
+                break;
+            default:
+                // default to String type
+                // if the node is not a value node (method isValueNode returns false), convert it to string.
+                val = node.isValueNode() ? node.asText(null) : node.toString();
+                break;
+        }
+
+        return val;
+    }
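+
+    // Converts the Debezium JSON schemas ("fields" arrays) of the event value and
+    // key into an Iceberg schema; fields found in the key schema become the
+    // table's identifier (primary key) fields and are marked as required.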
items.get("type").textValue(); - - if (listItemType.equals("struct") || listItemType.equals("array") || listItemType.equals("map")) { - throw new RuntimeException("Complex nested array types are not supported," + - " array[" + listItemType + "], field " + fieldName); - } - - Type.PrimitiveType item = icebergFieldType(listItemType); - schemaColumns.add(Types.NestedField.optional( - columnId, fieldName, Types.ListType.ofOptional(++columnId, item))); - } else { - throw new RuntimeException("Unexpected Array type for field " + fieldName); + if (tableColumns.isEmpty()) { + throw new RuntimeException("Failed to get event schema, event schema has no fields!"); } - break; - case "map": - throw new RuntimeException("'" + fieldName + "' has Map type, Map type not supported!"); - //break; - case "struct": - // create it as struct, nested type - List subSchema = icebergSchema(jsonSchemaFieldNode, fieldName, columnId); - schemaColumns.add(Types.NestedField.optional(columnId, fieldName, Types.StructType.of(subSchema))); - columnId += subSchema.size(); - break; - default: //primitive types - schemaColumns.add(Types.NestedField.optional(columnId, fieldName, icebergFieldType(fieldType))); - break; + + final List keyColumns = KeySchemaFields(); + Set identifierFieldIds = new HashSet<>(); + + for (Types.NestedField ic : keyColumns) { + boolean found = false; + + ListIterator colsIterator = tableColumns.listIterator(); + while (colsIterator.hasNext()) { + Types.NestedField tc = colsIterator.next(); + if (Objects.equals(tc.name(), ic.name())) { + identifierFieldIds.add(tc.fieldId()); + // set column as required its part of identifier filed + colsIterator.set(tc.asRequired()); + found = true; + break; + } + } + + if (!found) { + throw new ValidationException("Table Row identifier field `" + ic.name() + "` not found in table columns"); + } + + } + + return new Schema(tableColumns, identifierFieldIds); } - } - if (addSourceTsField) { - columnId++; - schemaColumns.add(Types.NestedField.optional(columnId, "__source_ts", Types.TimestampType.withZone())); - } - return schemaColumns; - } + private List icebergSchema(JsonNode eventSchema, String schemaName, int columnId) { + List schemaColumns = new ArrayList<>(); + String schemaType = eventSchema.get("type").textValue(); + LOGGER.debug("Converting Schema of: {}::{}", schemaName, schemaType); + for (JsonNode jsonSchemaFieldNode : eventSchema.get("fields")) { + columnId++; + String fieldName = jsonSchemaFieldNode.get("field").textValue(); + String fieldType = jsonSchemaFieldNode.get("type").textValue(); + LOGGER.debug("Processing Field: [{}] {}.{}::{}", columnId, schemaName, fieldName, fieldType); + switch (fieldType) { + case "array": + JsonNode items = jsonSchemaFieldNode.get("items"); + if (items != null && items.has("type")) { + String listItemType = items.get("type").textValue(); + + if (listItemType.equals("struct") || listItemType.equals("array") || listItemType.equals("map")) { + throw new RuntimeException("Complex nested array types are not supported," + + " array[" + listItemType + "], field " + fieldName); + } + + Type.PrimitiveType item = icebergFieldType(fieldName, listItemType); + schemaColumns.add(Types.NestedField.optional( + columnId, fieldName, Types.ListType.ofOptional(++columnId, item))); + } else { + throw new RuntimeException("Unexpected Array type for field " + fieldName); + } + break; + case "map": + throw new RuntimeException("'" + fieldName + "' has Map type, Map type not supported!"); + //break; + case "struct": + // create it as struct, 
nested type + List subSchema = icebergSchema(jsonSchemaFieldNode, fieldName, columnId); + schemaColumns.add(Types.NestedField.optional(columnId, fieldName, Types.StructType.of(subSchema))); + columnId += subSchema.size(); + break; + default: //primitive types + schemaColumns.add(Types.NestedField.optional(columnId, fieldName, icebergFieldType(fieldName, fieldType))); + break; + } + } + + return schemaColumns; + } - } + } } diff --git a/src/main/java/com/getindata/kafka/connect/iceberg/sink/IcebergSinkConfiguration.java b/src/main/java/com/getindata/kafka/connect/iceberg/sink/IcebergSinkConfiguration.java index 4ea4db6..90359f2 100644 --- a/src/main/java/com/getindata/kafka/connect/iceberg/sink/IcebergSinkConfiguration.java +++ b/src/main/java/com/getindata/kafka/connect/iceberg/sink/IcebergSinkConfiguration.java @@ -5,6 +5,7 @@ import java.util.HashMap; import java.util.Map; +import java.util.Optional; import static org.apache.kafka.common.config.ConfigDef.Importance.LOW; import static org.apache.kafka.common.config.ConfigDef.Importance.MEDIUM; @@ -20,6 +21,7 @@ public class IcebergSinkConfiguration { public static final String TABLE_NAMESPACE = "table.namespace"; public static final String TABLE_PREFIX = "table.prefix"; public static final String TABLE_AUTO_CREATE = "table.auto-create"; + public static final String TABLE_PARTITION_FIELD = "table.partition-field"; public static final String ICEBERG_PREFIX = "iceberg."; public static final String CATALOG_NAME = ICEBERG_PREFIX + "name"; public static final String CATALOG_IMPL = ICEBERG_PREFIX + "catalog-impl"; @@ -44,6 +46,8 @@ public class IcebergSinkConfiguration { "Table namespace. In Glue it will be used as database name") .define(TABLE_PREFIX, STRING, "", MEDIUM, "Prefix added to all table names") + .define(TABLE_PARTITION_FIELD, STRING, null, LOW, + "Table field used for partitioning") .define(CATALOG_NAME, STRING, "default", MEDIUM, "Iceberg catalog name") .define(CATALOG_IMPL, STRING, null, MEDIUM, @@ -94,6 +98,10 @@ public String getTablePrefix() { return parsedConfig.getString(TABLE_PREFIX); } + public Optional getTablePartitionField() { + return Optional.ofNullable(parsedConfig.getString(TABLE_PARTITION_FIELD)); + } + public String getCatalogName() { return parsedConfig.getString(CATALOG_NAME); } diff --git a/src/main/java/com/getindata/kafka/connect/iceberg/sink/IcebergUtil.java b/src/main/java/com/getindata/kafka/connect/iceberg/sink/IcebergUtil.java index 6b4f31b..8a8e38d 100644 --- a/src/main/java/com/getindata/kafka/connect/iceberg/sink/IcebergUtil.java +++ b/src/main/java/com/getindata/kafka/connect/iceberg/sink/IcebergUtil.java @@ -8,17 +8,13 @@ package com.getindata.kafka.connect.iceberg.sink; -import com.fasterxml.jackson.databind.ObjectMapper; -import org.apache.iceberg.FileFormat; -import org.apache.iceberg.PartitionSpec; -import org.apache.iceberg.Schema; -import org.apache.iceberg.SortOrder; -import org.apache.iceberg.Table; +import org.apache.iceberg.*; import org.apache.iceberg.catalog.Catalog; import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.data.GenericAppenderFactory; import org.apache.iceberg.exceptions.NoSuchTableException; import org.apache.iceberg.relocated.com.google.common.primitives.Ints; +import org.apache.iceberg.types.Types; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -31,60 +27,72 @@ * @author Ismail Simsek */ public class IcebergUtil { - protected static final Logger LOGGER = LoggerFactory.getLogger(IcebergUtil.class); - protected static final ObjectMapper 
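+        // Resolve the partition spec: if the configured partition field is present
+        // in the schema, use day() for date/timestamp fields and identity() for
+        // everything else; otherwise fall back to an unpartitioned table.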
+        var partitionBuilder = PartitionSpec.builderFor(schema);
+
+        final PartitionSpec ps = partitionField.flatMap((fieldName) -> {
+            Optional<Types.NestedField> schemaField = Optional.ofNullable(schema.findField(fieldName));
+
+            if (schemaField.isEmpty()) {
+                LOGGER.warn("Table schema doesn't contain partition field {}! Creating table without partitioning", fieldName);
+            }
+
+            return schemaField;
+        }).map((field) -> {
+            switch (field.type().typeId()) {
+                case DATE:
+                case TIMESTAMP:
+                    return partitionBuilder.day(field.name()).build();
+                default:
+                    return partitionBuilder.identity(field.name()).build();
+            }
+        }).orElseGet(partitionBuilder::build);

-    return icebergCatalog.buildTable(tableIdentifier, schema)
-        .withProperty(FORMAT_VERSION, "2")
-        .withSortOrder(IcebergUtil.getIdentifierFieldsAsSortOrder(schema))
-        .withPartitionSpec(ps)
-        .create();
-  }
+        return icebergCatalog.buildTable(tableIdentifier, schema)
+                .withProperty(FORMAT_VERSION, "2")
+                .withSortOrder(IcebergUtil.getIdentifierFieldsAsSortOrder(schema))
+                .withPartitionSpec(ps)
+                .create();
+    }

-  private static SortOrder getIdentifierFieldsAsSortOrder(Schema schema) {
-    SortOrder.Builder sob = SortOrder.builderFor(schema);
-    for (String fieldName : schema.identifierFieldNames()) {
-      sob = sob.asc(fieldName);
-    }
+    private static SortOrder getIdentifierFieldsAsSortOrder(Schema schema) {
+        SortOrder.Builder sob = SortOrder.builderFor(schema);
+        for (String fieldName : schema.identifierFieldNames()) {
+            sob = sob.asc(fieldName);
+        }

-    return sob.build();
-  }
+        return sob.build();
+    }

-  public static Optional<Table> loadIcebergTable(Catalog icebergCatalog, TableIdentifier tableId) {
-    try {
-      Table table = icebergCatalog.loadTable(tableId);
-      return Optional.of(table);
-    } catch (NoSuchTableException e) {
-      LOGGER.info("Table not found: {}", tableId.toString());
-      return Optional.empty();
-    }
-  }
+    public static Optional<Table> loadIcebergTable(Catalog icebergCatalog, TableIdentifier tableId) {
+        try {
+            Table table = icebergCatalog.loadTable(tableId);
+            return Optional.of(table);
+        } catch (NoSuchTableException e) {
+            LOGGER.info("Table not found: {}", tableId.toString());
+            return Optional.empty();
+        }
+    }

-  public static FileFormat getTableFileFormat(Table icebergTable) {
-    String formatAsString = icebergTable.properties().getOrDefault(DEFAULT_FILE_FORMAT, DEFAULT_FILE_FORMAT_DEFAULT);
-    return FileFormat.valueOf(formatAsString.toUpperCase(Locale.ROOT));
-  }
+    public static FileFormat getTableFileFormat(Table icebergTable) {
+        String formatAsString = icebergTable.properties().getOrDefault(DEFAULT_FILE_FORMAT, DEFAULT_FILE_FORMAT_DEFAULT);
+        return FileFormat.valueOf(formatAsString.toUpperCase(Locale.ROOT));
+    }

-  public static GenericAppenderFactory getTableAppender(Table icebergTable) {
-    return new GenericAppenderFactory(
-        icebergTable.schema(),
-        icebergTable.spec(),
-        Ints.toArray(icebergTable.schema().identifierFieldIds()),
-        icebergTable.schema(),
-        null);
-  }
+    public static GenericAppenderFactory getTableAppender(Table icebergTable) {
+        return new GenericAppenderFactory(
+                icebergTable.schema(),
+                icebergTable.spec(),
+                Ints.toArray(icebergTable.schema().identifierFieldIds()),
+                icebergTable.schema(),
+                null);
+    }
 }

diff --git a/src/test/java/com/getindata/kafka/connect/iceberg/sink/TestIcebergUtil.java b/src/test/java/com/getindata/kafka/connect/iceberg/sink/TestIcebergUtil.java
index 682f203..6a30965 100644
--- a/src/test/java/com/getindata/kafka/connect/iceberg/sink/TestIcebergUtil.java
+++ b/src/test/java/com/getindata/kafka/connect/iceberg/sink/TestIcebergUtil.java
@@ -29,6 +29,7 @@
 import java.nio.file.Path;
 import java.util.Collections;
 import java.util.List;
+import java.util.Optional;
 import java.util.Set;

 import static org.junit.jupiter.api.Assertions.*;
@@ -132,7 +133,7 @@ public void valuePayloadWithSchemaAsJsonNode() {
     }

     @Test
-    public void createIcebergTablesWithCustomProperties(@TempDir Path localWarehouseDir) {
+    public void createIcebergTableWithCustomProperties(@TempDir Path localWarehouseDir) {
         IcebergSinkConfiguration config = TestConfig.builder()
                 .withLocalCatalog(localWarehouseDir)
                 .withUpsert(false)
@@ -148,8 +149,53 @@ public void createIcebergTablesWithCustomProperties(@TempDir Path localWarehouse
                 Set.of(1)
         );

-        Table table1 = IcebergUtil.createIcebergTable(catalog, TableIdentifier.of("test", "test"), schema, false);
+        Table table1 = IcebergUtil.createIcebergTable(catalog, TableIdentifier.of("test", "test"), schema, Optional.empty());

         assertTrue(IcebergUtil.getTableFileFormat(table1) == FileFormat.ORC);
     }
+
+    @Test
+    public void createIcebergTableWithDayPartitioning(@TempDir Path localWarehouseDir) {
+        IcebergSinkConfiguration config = TestConfig.builder()
+                .withLocalCatalog(localWarehouseDir)
+                .withUpsert(false)
+                .build();
+
+        Catalog catalog = IcebergCatalogFactory.create(config);
+
+        Schema schema = new Schema(
+                List.of(
+                        Types.NestedField.required(1, "id", Types.IntegerType.get()),
+                        Types.NestedField.required(2, "data", Types.StringType.get()),
+                        Types.NestedField.required(3, "__ts_ms", Types.TimestampType.withoutZone())),
+                Set.of(1)
+        );
+
+        Table table1 = IcebergUtil.createIcebergTable(catalog, TableIdentifier.of("test", "test"), schema, Optional.of("__ts_ms"));
+
+        assertTrue(table1.spec().isPartitioned());
+    }
Types.NestedField.required(2, "data", Types.StringType.get()), + Types.NestedField.required(3, "dimension", Types.StringType.get())), + Set.of(1) + ); + + Table table1 = IcebergUtil.createIcebergTable(catalog, TableIdentifier.of("test", "test"), schema, Optional.of("dimension")); + + assertTrue(table1.spec().isPartitioned()); + assertTrue(table1.spec().fields().get(0).transform().isIdentity()); + } } diff --git a/src/test/java/com/getindata/kafka/connect/iceberg/sink/tableoperator/IcebergTableOperatorTest.java b/src/test/java/com/getindata/kafka/connect/iceberg/sink/tableoperator/IcebergTableOperatorTest.java index c800f9f..6d3b74c 100644 --- a/src/test/java/com/getindata/kafka/connect/iceberg/sink/tableoperator/IcebergTableOperatorTest.java +++ b/src/test/java/com/getindata/kafka/connect/iceberg/sink/tableoperator/IcebergTableOperatorTest.java @@ -21,6 +21,7 @@ import java.util.ArrayList; import java.util.List; +import java.util.Optional; import static com.getindata.kafka.connect.iceberg.sink.testresources.TestConfig.TABLE_NAMESPACE; import static com.getindata.kafka.connect.iceberg.sink.testresources.TestConfig.TABLE_PREFIX; @@ -46,7 +47,7 @@ static void setup() throws Exception { public Table createTable(IcebergChangeEvent sampleEvent) { final TableIdentifier tableId = TableIdentifier.of(Namespace.of(TABLE_NAMESPACE), TABLE_PREFIX + sampleEvent.destinationTable()); - return IcebergUtil.createIcebergTable(icebergCatalog, tableId, sampleEvent.icebergSchema(), false); + return IcebergUtil.createIcebergTable(icebergCatalog, tableId, sampleEvent.icebergSchema(), Optional.empty()); } @Test