From 1419aae1f1ebb0af38972c2a5287a379b3478fc8 Mon Sep 17 00:00:00 2001 From: ismail simsek <6005685+ismailsimsek@users.noreply.github.com> Date: Thu, 8 Sep 2022 19:49:01 +0200 Subject: [PATCH] Convert __source_ts_ms and __ts_ms to TimestampType.withZone type (#108) --- .../server/iceberg/IcebergChangeConsumer.java | 6 ++- .../server/iceberg/IcebergChangeEvent.java | 47 +++++++++---------- .../debezium/server/iceberg/IcebergUtil.java | 11 +++-- .../IcebergChangeEventBuilderTest.java | 6 +-- .../IcebergTableOperatorTest.java | 4 +- 5 files changed, 40 insertions(+), 34 deletions(-) diff --git a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeConsumer.java b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeConsumer.java index 9aa00bbc..4b45de8a 100644 --- a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeConsumer.java +++ b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeConsumer.java @@ -95,6 +95,8 @@ public class IcebergChangeConsumer extends BaseChangeConsumer implements Debeziu String catalogName; @ConfigProperty(name = "debezium.sink.iceberg.upsert", defaultValue = "true") boolean upsert; + @ConfigProperty(name = "debezium.sink.iceberg.partition-field", defaultValue = "__ts_ms") + String partitionField; @ConfigProperty(name = "debezium.sink.batch.batch-size-wait", defaultValue = "NoBatchSizeWait") String batchSizeWaitName; @ConfigProperty(name = "debezium.format.value.schemas.enable", defaultValue = "false") @@ -188,7 +190,9 @@ public Table loadIcebergTable(Catalog icebergCatalog, TableIdentifier tableId, I if (!eventSchemaEnabled) { throw new RuntimeException("Table '" + tableId + "' not found! " + "Set `debezium.format.value.schemas.enable` to true to create tables automatically!"); } - return IcebergUtil.createIcebergTable(icebergCatalog, tableId, sampleEvent.icebergSchema(), writeFormat, !upsert); + return IcebergUtil.createIcebergTable(icebergCatalog, tableId, sampleEvent.icebergSchema(), writeFormat, + !upsert, // partition if its append mode + partitionField); }); } diff --git a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeEvent.java b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeEvent.java index e2539d9f..0f77308f 100644 --- a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeEvent.java +++ b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeEvent.java @@ -30,6 +30,7 @@ public class IcebergChangeEvent { protected static final Logger LOGGER = LoggerFactory.getLogger(IcebergChangeEvent.class); + public static final List TS_MS_FIELDS = List.of("__ts_ms", "__source_ts_ms"); protected final String destination; protected final JsonNode value; protected final JsonNode key; @@ -63,19 +64,10 @@ public String destination() { } public GenericRecord asIcebergRecord(Schema schema) { - final GenericRecord record = asIcebergRecord(schema.asStruct(), value); - - if (value != null && value.has("__source_ts_ms") && value.get("__source_ts_ms") != null) { - final long source_ts_ms = value.get("__source_ts_ms").longValue(); - final OffsetDateTime odt = OffsetDateTime.ofInstant(Instant.ofEpochMilli(source_ts_ms), ZoneOffset.UTC); - record.setField("__source_ts", odt); - } else { - record.setField("__source_ts", null); - } - return record; + return asIcebergRecord(schema.asStruct(), value); } - private GenericRecord asIcebergRecord(Types.StructType tableFields, JsonNode data) { + private static GenericRecord asIcebergRecord(Types.StructType tableFields, JsonNode data) { LOGGER.debug("Processing nested field:{}", tableFields); GenericRecord record = GenericRecord.create(tableFields); @@ -92,14 +84,18 @@ private GenericRecord asIcebergRecord(Types.StructType tableFields, JsonNode dat return record; } - private Type.PrimitiveType icebergFieldType(String fieldType) { + private static Type.PrimitiveType icebergFieldType(String fieldName, String fieldType) { switch (fieldType) { case "int8": case "int16": case "int32": // int 4 bytes return Types.IntegerType.get(); case "int64": // long 8 bytes - return Types.LongType.get(); + if (TS_MS_FIELDS.contains(fieldName)) { + return Types.TimestampType.withZone(); + } else { + return Types.LongType.get(); + } case "float8": case "float16": case "float32": // float is represented in 32 bits, @@ -121,7 +117,7 @@ private Type.PrimitiveType icebergFieldType(String fieldType) { } } - private Object jsonValToIcebergVal(Types.NestedField field, JsonNode node) { + private static Object jsonValToIcebergVal(Types.NestedField field, JsonNode node) { LOGGER.debug("Processing Field:{} Type:{}", field.name(), field.type()); final Object val; switch (field.type().typeId()) { @@ -147,6 +143,15 @@ private Object jsonValToIcebergVal(Types.NestedField field, JsonNode node) { case UUID: val = node.isValueNode() ? UUID.fromString(node.asText(null)) : UUID.fromString(node.toString()); break; + case TIMESTAMP: + if (node.isLong() && TS_MS_FIELDS.contains(field.name())) { + val = OffsetDateTime.ofInstant(Instant.ofEpochMilli(node.longValue()), ZoneOffset.UTC); + } else if (node.isTextual()) { + val = OffsetDateTime.parse(node.asText()); + } else { + throw new RuntimeException("Failed to convert timestamp value, field: " + field.name() + " value: " + node); + } + break; case BINARY: try { val = node.isNull() ? null : ByteBuffer.wrap(node.binaryValue()); @@ -218,7 +223,7 @@ private List KeySchemaFields() { private List valueSchemaFields() { if (valueSchema != null && valueSchema.has("fields") && valueSchema.get("fields").isArray()) { LOGGER.debug(valueSchema.toString()); - return icebergSchema(valueSchema, "", 0, true); + return icebergSchema(valueSchema, "", 0); } LOGGER.trace("Event schema not found!"); return new ArrayList<>(); @@ -264,10 +269,6 @@ public Schema icebergSchema() { } private List icebergSchema(JsonNode eventSchema, String schemaName, int columnId) { - return icebergSchema(eventSchema, schemaName, columnId, false); - } - - private List icebergSchema(JsonNode eventSchema, String schemaName, int columnId, boolean addSourceTsField) { List schemaColumns = new ArrayList<>(); String schemaType = eventSchema.get("type").textValue(); LOGGER.debug("Converting Schema of: {}::{}", schemaName, schemaType); @@ -286,7 +287,7 @@ private List icebergSchema(JsonNode eventSchema, String schem throw new RuntimeException("Complex nested array types are not supported," + " array[" + listItemType + "], field " + fieldName); } - Type.PrimitiveType item = icebergFieldType(listItemType); + Type.PrimitiveType item = icebergFieldType(fieldName, listItemType); schemaColumns.add(Types.NestedField.optional(columnId, fieldName, Types.ListType.ofOptional(++columnId, item))); } else { throw new RuntimeException("Unexpected Array type for field " + fieldName); @@ -302,15 +303,11 @@ private List icebergSchema(JsonNode eventSchema, String schem columnId += subSchema.size(); break; default: //primitive types - schemaColumns.add(Types.NestedField.optional(columnId, fieldName, icebergFieldType(fieldType))); + schemaColumns.add(Types.NestedField.optional(columnId, fieldName, icebergFieldType(fieldName, fieldType))); break; } } - if (addSourceTsField) { - columnId++; - schemaColumns.add(Types.NestedField.optional(columnId, "__source_ts", Types.TimestampType.withZone())); - } return schemaColumns; } diff --git a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergUtil.java b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergUtil.java index a1c33452..5e8c8775 100644 --- a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergUtil.java +++ b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergUtil.java @@ -63,14 +63,19 @@ public static T selectInstance(Instance instances, String name) { } public static Table createIcebergTable(Catalog icebergCatalog, TableIdentifier tableIdentifier, - Schema schema, String writeFormat, boolean partition) { + Schema schema, String writeFormat, boolean partition, String partitionField) { LOGGER.warn("Creating table:'{}'\nschema:{}\nrowIdentifier:{}", tableIdentifier, schema, schema.identifierFieldNames()); final PartitionSpec ps; - if (partition && schema.findField("__source_ts") != null) { - ps = PartitionSpec.builderFor(schema).day("__source_ts").build(); + if (partition) { + if (schema.findField(partitionField) == null) { + LOGGER.warn("Table schema dont contain partition field {}! Creating table without partition", partition); + ps = PartitionSpec.builderFor(schema).build(); + } else { + ps = PartitionSpec.builderFor(schema).day(partitionField).build(); + } } else { ps = PartitionSpec.builderFor(schema).build(); } diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/tableoperator/IcebergChangeEventBuilderTest.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/tableoperator/IcebergChangeEventBuilderTest.java index 4398084b..63d326f4 100644 --- a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/tableoperator/IcebergChangeEventBuilderTest.java +++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/tableoperator/IcebergChangeEventBuilderTest.java @@ -36,8 +36,7 @@ public void testIcebergChangeEventBuilder() { optional(3, "preferences", Types.StructType.of( optional(4, "feature1", Types.BooleanType.get()), optional(5, "feature2", Types.BooleanType.get()) - )), - optional(6, "__source_ts", Types.TimestampType.withZone()) + )) ) , Set.of(1) ); @@ -57,8 +56,7 @@ public void testIcebergChangeEventBuilder() { optional(3, "preferences", Types.StructType.of( optional(4, "feature1", Types.BooleanType.get()), optional(5, "feature2", Types.BooleanType.get()) - )), - optional(6, "__source_ts", Types.TimestampType.withZone()) + )) ); b = new IcebergChangeEventBuilder(); diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/tableoperator/IcebergTableOperatorTest.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/tableoperator/IcebergTableOperatorTest.java index 8e7fe929..6303c91d 100644 --- a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/tableoperator/IcebergTableOperatorTest.java +++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/tableoperator/IcebergTableOperatorTest.java @@ -47,6 +47,8 @@ class IcebergTableOperatorTest extends BaseSparkTest { String namespace; @ConfigProperty(name = "debezium.sink.iceberg.upsert", defaultValue = "true") boolean upsert; + @ConfigProperty(name = "debezium.sink.iceberg.partition-field", defaultValue = "__ts_ms") + String partitionField; @ConfigProperty(name = "debezium.sink.iceberg." + DEFAULT_FILE_FORMAT, defaultValue = DEFAULT_FILE_FORMAT_DEFAULT) String writeFormat; @Inject @@ -56,7 +58,7 @@ class IcebergTableOperatorTest extends BaseSparkTest { public Table createTable(IcebergChangeEvent sampleEvent) { HadoopCatalog icebergCatalog = getIcebergCatalog(); final TableIdentifier tableId = TableIdentifier.of(Namespace.of(namespace), tablePrefix + sampleEvent.destination()); - return IcebergUtil.createIcebergTable(icebergCatalog, tableId, sampleEvent.icebergSchema(), writeFormat, !upsert); + return IcebergUtil.createIcebergTable(icebergCatalog, tableId, sampleEvent.icebergSchema(), writeFormat, !upsert, partitionField); } @Test