From a603a518579f18cc6d7c10b07f815f945f4a3993 Mon Sep 17 00:00:00 2001
From: Ismail Simsek
Date: Fri, 31 Dec 2021 10:15:51 +0100
Subject: [PATCH 1/8] minor improvements

---
 .../tableoperator/IcebergTableOperator.java | 23 +++++++++++---------
 1 file changed, 14 insertions(+), 9 deletions(-)

diff --git a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/IcebergTableOperator.java b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/IcebergTableOperator.java
index 5222e238..ed5ddc70 100644
--- a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/IcebergTableOperator.java
+++ b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/IcebergTableOperator.java
@@ -16,6 +16,7 @@
 import java.util.Arrays;
 import java.util.List;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
 import javax.enterprise.context.Dependent;
 import javax.inject.Inject;
@@ -89,18 +90,22 @@ private int compareByTsThenOp(GenericRecord lhs, GenericRecord rhs) {
   }

   public void addToTable(Table icebergTable, List<IcebergChangeEvent> events) {
+
+    List<Record> batchEvents;
+    if (upsert && !icebergTable.schema().identifierFieldIds().isEmpty()) {
+      // deduplicate the events to avoid inserting duplicate rows
+      batchEvents = deduplicatedBatchRecords(icebergTable.schema(), events);
+    } else {
+      batchEvents = events.stream().
+          map(e -> e.asIcebergRecord(icebergTable.schema())).
+          collect(Collectors.toList());
+    }
+
     // Initialize a task writer to write both INSERT and equality DELETE.
     BaseTaskWriter<Record> writer = writerFactory.create(icebergTable);
     try {
-      if (upsert && !icebergTable.schema().identifierFieldIds().isEmpty()) {
-        ArrayList<Record> icebergRecords = deduplicatedBatchRecords(icebergTable.schema(), events);
-        for (Record icebergRecord : icebergRecords) {
-          writer.write(icebergRecord);
-        }
-      } else {
-        for (IcebergChangeEvent e : events) {
-          writer.write(e.asIcebergRecord(icebergTable.schema()));
-        }
+      for (Record icebergRecord : batchEvents) {
+        writer.write(icebergRecord);
       }
       writer.close();
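Review note on the dedup path above: `deduplicatedBatchRecords(...)` collapses a batch so each key survives only once, choosing the winner by source timestamp and then by operation precedence (the `compareByTsThenOp` helper visible in the hunk header). A minimal, self-contained sketch of that idea; the event shape, field names, and priority table here are illustrative assumptions, not the project's actual classes:

```java
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class DedupSketch {

  // Illustrative stand-in for an incoming change event; not the project's IcebergChangeEvent.
  record Event(String key, long sourceTsMs, String op) {}

  // Operation precedence when timestamps tie: delete > update > insert > read (assumed ops).
  static final Map<String, Integer> OP_PRIORITY = Map.of("c", 1, "r", 0, "u", 2, "d", 3);

  static int compareByTsThenOp(Event lhs, Event rhs) {
    int byTs = Long.compare(lhs.sourceTsMs(), rhs.sourceTsMs());
    return byTs != 0 ? byTs : Integer.compare(OP_PRIORITY.get(lhs.op()), OP_PRIORITY.get(rhs.op()));
  }

  // Keeps exactly one event per key; on a tie the later event in the batch wins.
  static Collection<Event> deduplicate(List<Event> events) {
    Map<String, Event> latest = new LinkedHashMap<>();
    for (Event e : events) {
      latest.merge(e.key(), e, (prev, next) -> compareByTsThenOp(next, prev) >= 0 ? next : prev);
    }
    return latest.values();
  }

  public static void main(String[] args) {
    List<Event> batch = List.of(
        new Event("id=1", 100, "c"),
        new Event("id=1", 100, "u"), // same ts; update outranks insert
        new Event("id=2", 90, "c"));
    System.out.println(deduplicate(batch)); // keeps only the "u" event for id=1
  }
}
```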
" + - "Set `debezium.format.value.schemas.enable` to true to create tables automatically!"); - } - return IcebergUtil.createIcebergTable(icebergCatalog, tableIdentifier, - event.getValue().get(0).getSchema(), writeFormat, !upsert); - }); - //addToTable(icebergTable, event.getValue()); + Table icebergTable = this.loadIcebergTable(icebergCatalog, tableIdentifier, event.getValue().get(0)); icebergTableOperator.addToTable(icebergTable, event.getValue()); } + // workaround! somehow offset is not saved to file unless we call committer.markProcessed // even its should be saved to file periodically for (ChangeEvent record : records) { @@ -182,6 +175,15 @@ public void handleBatch(List> records, DebeziumEngin } + public Table loadIcebergTable(Catalog icebergCatalog, TableIdentifier tableId, IcebergChangeEvent sampleEvent) { + return IcebergUtil.loadIcebergTable(icebergCatalog, tableId).orElseGet(() -> { + if (!eventSchemaEnabled) { + throw new RuntimeException("Table '" + tableId + "' not found! " + "Set `debezium.format.value.schemas.enable` to true to create tables automatically!"); + } + return IcebergUtil.createIcebergTable(icebergCatalog, tableId, sampleEvent.getSchema(), writeFormat, !upsert); + }); + } + protected void logConsumerProgress(long numUploadedEvents) { numConsumedEvents += numUploadedEvents; if (logTimer.expired()) { From cd29202ddd7d3a8221a355813c3e05169736f1f1 Mon Sep 17 00:00:00 2001 From: Ismail Simsek Date: Fri, 31 Dec 2021 10:24:33 +0100 Subject: [PATCH 3/8] minor improvements --- .../java/io/debezium/server/iceberg/IcebergChangeConsumer.java | 1 - 1 file changed, 1 deletion(-) diff --git a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeConsumer.java b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeConsumer.java index f5aac544..c92ffa5e 100644 --- a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeConsumer.java +++ b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeConsumer.java @@ -172,7 +172,6 @@ public void handleBatch(List> records, DebeziumEngin this.logConsumerProgress(records.size()); batchSizeWait.waitMs(records.size(), (int) Duration.between(start, Instant.now()).toMillis()); - } public Table loadIcebergTable(Catalog icebergCatalog, TableIdentifier tableId, IcebergChangeEvent sampleEvent) { From 3a76d053a449319820a1705564b44a4795ee5b62 Mon Sep 17 00:00:00 2001 From: Ismail Simsek Date: Fri, 31 Dec 2021 10:33:57 +0100 Subject: [PATCH 4/8] minor improvements --- .../server/iceberg/IcebergChangeConsumer.java | 11 ++++++++++- .../java/io/debezium/server/iceberg/IcebergUtil.java | 4 ++-- 2 files changed, 12 insertions(+), 3 deletions(-) diff --git a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeConsumer.java b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeConsumer.java index c92ffa5e..725b8ac6 100644 --- a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeConsumer.java +++ b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeConsumer.java @@ -155,7 +155,7 @@ public void handleBatch(List> records, DebeziumEngin }) .collect(Collectors.groupingBy(IcebergChangeEvent::destinationTable)); - // consume list of events to each destination table + // consume list of events for each destination table for (Map.Entry> event : result.entrySet()) { final TableIdentifier tableIdentifier = 
From cd29202ddd7d3a8221a355813c3e05169736f1f1 Mon Sep 17 00:00:00 2001
From: Ismail Simsek
Date: Fri, 31 Dec 2021 10:24:33 +0100
Subject: [PATCH 3/8] minor improvements

---
 .../java/io/debezium/server/iceberg/IcebergChangeConsumer.java | 1 -
 1 file changed, 1 deletion(-)

diff --git a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeConsumer.java b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeConsumer.java
index f5aac544..c92ffa5e 100644
--- a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeConsumer.java
+++ b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeConsumer.java
@@ -172,7 +172,6 @@ public void handleBatch(List<ChangeEvent<Object, Object>> records, DebeziumEngin
     this.logConsumerProgress(records.size());
     batchSizeWait.waitMs(records.size(), (int) Duration.between(start, Instant.now()).toMillis());
-
   }

From 3a76d053a449319820a1705564b44a4795ee5b62 Mon Sep 17 00:00:00 2001
From: Ismail Simsek
Date: Fri, 31 Dec 2021 10:33:57 +0100
Subject: [PATCH 4/8] minor improvements

---
 .../server/iceberg/IcebergChangeConsumer.java        | 11 ++++++++++-
 .../java/io/debezium/server/iceberg/IcebergUtil.java |  4 ++--
 2 files changed, 12 insertions(+), 3 deletions(-)

diff --git a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeConsumer.java b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeConsumer.java
index c92ffa5e..725b8ac6 100644
--- a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeConsumer.java
+++ b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeConsumer.java
@@ -155,7 +155,7 @@ public void handleBatch(List<ChangeEvent<Object, Object>> records, DebeziumEngin
         })
         .collect(Collectors.groupingBy(IcebergChangeEvent::destinationTable));

-    // consume list of events to each destination table
+    // consume list of events for each destination table
     for (Map.Entry<String, List<IcebergChangeEvent>> event : result.entrySet()) {
       final TableIdentifier tableIdentifier = TableIdentifier.of(Namespace.of(namespace), tablePrefix + event.getKey());
       Table icebergTable = this.loadIcebergTable(icebergCatalog, tableIdentifier, event.getValue().get(0));
@@ -174,6 +174,12 @@ public void handleBatch(List<ChangeEvent<Object, Object>> records, DebeziumEngin
     batchSizeWait.waitMs(records.size(), (int) Duration.between(start, Instant.now()).toMillis());
   }

+  /**
+   * @param icebergCatalog iceberg catalog
+   * @param tableId iceberg table identifier
+   * @param sampleEvent sample Debezium event; its schema is used to create the Iceberg table when the table is not found
+   * @return the Iceberg table; throws RuntimeException when the table is not found and cannot be created
+   */
   public Table loadIcebergTable(Catalog icebergCatalog, TableIdentifier tableId, IcebergChangeEvent sampleEvent) {
     return IcebergUtil.loadIcebergTable(icebergCatalog, tableId).orElseGet(() -> {
       if (!eventSchemaEnabled) {
@@ -183,6 +189,9 @@ public Table loadIcebergTable(Catalog icebergCatalog, TableIdentifier tableId, I
     });
   }

+  /**
+   * Periodically logs the number of events consumed.
+   */
   protected void logConsumerProgress(long numUploadedEvents) {
     numConsumedEvents += numUploadedEvents;
     if (logTimer.expired()) {
diff --git a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergUtil.java b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergUtil.java
index 7799239a..a1c33452 100644
--- a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergUtil.java
+++ b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergUtil.java
@@ -68,8 +68,8 @@ public static Table createIcebergTable(Catalog icebergCatalog, TableIdentifier t
     LOGGER.warn("Creating table:'{}'\nschema:{}\nrowIdentifier:{}", tableIdentifier, schema, schema.identifierFieldNames());

-    PartitionSpec ps;
-    if (partition) {
+    final PartitionSpec ps;
+    if (partition && schema.findField("__source_ts") != null) {
       ps = PartitionSpec.builderFor(schema).day("__source_ts").build();
     } else {
       ps = PartitionSpec.builderFor(schema).build();
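The new `schema.findField("__source_ts")` guard in IcebergUtil matters because `PartitionSpec.Builder.day(...)` rejects columns that are absent from the schema, so a schema without `__source_ts` must fall back to an unpartitioned spec. A runnable illustration; the two schemas below are stand-ins, not the sink's real event schemas:

```java
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.types.Types;

import static org.apache.iceberg.types.Types.NestedField.optional;
import static org.apache.iceberg.types.Types.NestedField.required;

public class PartitionSpecSketch {
  public static void main(String[] args) {
    Schema withTs = new Schema(
        required(1, "id", Types.LongType.get()),
        optional(2, "__source_ts", Types.TimestampType.withZone()));
    Schema withoutTs = new Schema(required(1, "id", Types.LongType.get()));

    for (Schema schema : new Schema[]{withTs, withoutTs}) {
      // day() would throw for the second schema, hence the findField guard
      PartitionSpec spec = (schema.findField("__source_ts") != null)
          ? PartitionSpec.builderFor(schema).day("__source_ts").build()
          : PartitionSpec.builderFor(schema).build(); // unpartitioned fallback
      System.out.println(spec);
    }
  }
}
```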
From 320fa740d5819135890d2113dc69610b9db8aaba Mon Sep 17 00:00:00 2001
From: Ismail Simsek
Date: Fri, 31 Dec 2021 10:35:43 +0100
Subject: [PATCH 5/8] minor improvements

---
 .../server/iceberg/tableoperator/IcebergTableOperator.java | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/IcebergTableOperator.java b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/IcebergTableOperator.java
index ed5ddc70..82c369e4 100644
--- a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/IcebergTableOperator.java
+++ b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/IcebergTableOperator.java
@@ -91,9 +91,9 @@ private int compareByTsThenOp(GenericRecord lhs, GenericRecord rhs) {

   public void addToTable(Table icebergTable, List<IcebergChangeEvent> events) {

-    List<Record> batchEvents;
+    final List<Record> batchEvents;
+    // when running in upsert mode, deduplicate the events to avoid inserting duplicate rows
     if (upsert && !icebergTable.schema().identifierFieldIds().isEmpty()) {
-      // deduplicate the events to avoid inserting duplicate rows
       batchEvents = deduplicatedBatchRecords(icebergTable.schema(), events);
     } else {
       batchEvents = events.stream().
From 3c5f88d7d8544b38f62623a207c1c79404707482 Mon Sep 17 00:00:00 2001
From: Ismail Simsek
Date: Fri, 31 Dec 2021 14:37:18 +0100
Subject: [PATCH 6/8] minor improvements

---
 .../server/iceberg/IcebergChangeConsumer.java |  2 +-
 .../iceberg/IcebergEventsChangeConsumer.java  | 94 +++++++------------
 .../IcebergEventsChangeConsumerTest.java      |  6 +-
 3 files changed, 41 insertions(+), 61 deletions(-)

diff --git a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeConsumer.java b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeConsumer.java
index 725b8ac6..a208a839 100644
--- a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeConsumer.java
+++ b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeConsumer.java
@@ -163,7 +163,7 @@ public void handleBatch(List<ChangeEvent<Object, Object>> records, DebeziumEngin
     // workaround! somehow offset is not saved to file unless we call committer.markProcessed
-    // even its should be saved to file periodically
+    // even though it should be saved to file periodically
     for (ChangeEvent<Object, Object> record : records) {
       LOGGER.trace("Processed event '{}'", record);
       committer.markProcessed(record);
diff --git a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergEventsChangeConsumer.java b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergEventsChangeConsumer.java
index f4a40c7d..fd3b4bda 100644
--- a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergEventsChangeConsumer.java
+++ b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergEventsChangeConsumer.java
@@ -14,17 +14,18 @@
 import io.debezium.engine.format.Json;
 import io.debezium.server.BaseChangeConsumer;
 import io.debezium.server.iceberg.batchsizewait.InterfaceBatchSizeWait;
+import io.debezium.server.iceberg.tableoperator.PartitionedAppendWriter;

-import java.io.Closeable;
 import java.io.IOException;
 import java.time.Duration;
 import java.time.Instant;
 import java.time.OffsetDateTime;
 import java.time.ZoneOffset;
+import java.time.format.DateTimeFormatter;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
-import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.stream.Collectors;
 import javax.annotation.PostConstruct;
@@ -40,14 +41,13 @@
 import org.apache.iceberg.catalog.Catalog;
 import org.apache.iceberg.catalog.Namespace;
 import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.data.GenericAppenderFactory;
 import org.apache.iceberg.data.GenericRecord;
 import org.apache.iceberg.data.Record;
-import org.apache.iceberg.data.parquet.GenericParquetWriter;
-import org.apache.iceberg.io.FileAppender;
-import org.apache.iceberg.io.OutputFile;
-import org.apache.iceberg.parquet.Parquet;
+import org.apache.iceberg.io.BaseTaskWriter;
+import org.apache.iceberg.io.OutputFileFactory;
+import org.apache.iceberg.io.WriteResult;
 import org.apache.iceberg.types.Types;
-import org.apache.iceberg.util.DateTimeUtil;
 import org.eclipse.microprofile.config.ConfigProvider;
 import org.eclipse.microprofile.config.inject.ConfigProperty;
 import org.slf4j.Logger;
@@ -79,6 +79,7 @@ public class IcebergEventsChangeConsumer extends BaseChangeConsumer implements D
       .asc("event_sink_epoch_ms", NullOrder.NULLS_LAST)
       .build();
   private static final Logger LOGGER = LoggerFactory.getLogger(IcebergEventsChangeConsumer.class);
+  protected static final DateTimeFormatter dtFormater = DateTimeFormatter.ofPattern("yyyyMMdd").withZone(ZoneOffset.UTC);
   private static final String PROP_PREFIX = "debezium.sink.iceberg.";
   final Configuration hadoopConf = new Configuration();
   final Map<String, String> icebergProperties = new ConcurrentHashMap<>();
@@ -164,72 +165,49 @@ public void handleBatch(List<ChangeEvent<Object, Object>> records, DebeziumEngin
     Instant start = Instant.now();
     OffsetDateTime batchTime = OffsetDateTime.now(ZoneOffset.UTC);

-    Map<String, ArrayList<ChangeEvent<Object, Object>>> result = records.stream()
-        .collect(Collectors.groupingBy(
-            objectObjectChangeEvent -> map(objectObjectChangeEvent.destination()),
-            Collectors.mapping(p -> p,
-                Collectors.toCollection(ArrayList::new))));
-
-    for (Map.Entry<String, ArrayList<ChangeEvent<Object, Object>>> destEvents : result.entrySet()) {
-      // each destEvents is set of events for a single table
-      ArrayList<Record> destIcebergRecords = destEvents.getValue().stream()
-          .map(e -> getIcebergRecord(destEvents.getKey(), e, batchTime))
-          .collect(Collectors.toCollection(ArrayList::new));
-
-      commitBatch(destEvents.getKey(), batchTime, destIcebergRecords);
+    ArrayList<Record> icebergRecords = records.stream()
+        .map(e -> getIcebergRecord(e.destination(), e, batchTime))
+        .collect(Collectors.toCollection(ArrayList::new));
+    commitBatch(icebergRecords);
+
+    // workaround! somehow offset is not saved to file unless we call committer.markProcessed
+    // even though it should be saved to file periodically
+    for (ChangeEvent<Object, Object> record : records) {
+      LOGGER.trace("Processed event '{}'", record);
+      committer.markProcessed(record);
     }
-    // committer.markProcessed(record);
     committer.markBatchFinished();
     batchSizeWait.waitMs(records.size(), (int) Duration.between(start, Instant.now()).toMillis());
   }

-  private void commitBatch(String destination, OffsetDateTime batchTime, ArrayList<Record> icebergRecords) {
-    final String fileName = UUID.randomUUID() + "-" + Instant.now().toEpochMilli() + "." + FileFormat.PARQUET;
+  private void commitBatch(ArrayList<Record> icebergRecords) {

-    PartitionKey pk = new PartitionKey(TABLE_PARTITION, TABLE_SCHEMA);
-    Record pr = GenericRecord.create(TABLE_SCHEMA)
-        .copy("event_destination", destination, "event_sink_timestamptz",
-            DateTimeUtil.microsFromTimestamptz(batchTime));
-    pk.partition(pr);
+    FileFormat format = IcebergUtil.getTableFileFormat(eventTable);
+    GenericAppenderFactory appenderFactory = IcebergUtil.getTableAppender(eventTable);
+    int partitionId = Integer.parseInt(dtFormater.format(Instant.now()));
+    OutputFileFactory fileFactory = OutputFileFactory.builderFor(eventTable, partitionId, 1L)
+        .defaultSpec(eventTable.spec()).format(format).build();

-    OutputFile out = eventTable.io().newOutputFile(eventTable.locationProvider().newDataLocation(pk.toPath() + "/" + fileName));
+    BaseTaskWriter<Record> writer = new PartitionedAppendWriter(
+        eventTable.spec(), format, appenderFactory, fileFactory, eventTable.io(), Long.MAX_VALUE, eventTable.schema());

-    FileAppender<Record> writer;
     try {
-      writer = Parquet.write(out)
-          .createWriterFunc(GenericParquetWriter::buildWriter)
-          .forTable(eventTable)
-          .overwrite()
-          .build();
-
-      try (Closeable toClose = writer) {
-        writer.addAll(icebergRecords);
+      for (Record icebergRecord : icebergRecords) {
+        writer.write(icebergRecord);
       }
+      writer.close();
+      WriteResult files = writer.complete();
+      AppendFiles appendFiles = eventTable.newAppend();
+      Arrays.stream(files.dataFiles()).forEach(appendFiles::appendFile);
+      appendFiles.commit();

     } catch (IOException e) {
       LOGGER.error("Failed committing events to iceberg table!", e);
-      throw new RuntimeException("Failed commiting events to iceberg table!", e);
+      throw new DebeziumException("Failed committing events to iceberg table!", e);
     }

-    DataFile dataFile = DataFiles.builder(eventTable.spec())
-        .withFormat(FileFormat.PARQUET)
-        .withPath(out.location())
-        .withFileSizeInBytes(writer.length())
-        .withSplitOffsets(writer.splitOffsets())
-        .withMetrics(writer.metrics())
-        .withPartition(pk)
-        .build();
-
-    LOGGER.debug("Appending new file '{}'", dataFile.path());
-    eventTable.newAppend()
-        .appendFile(dataFile)
-        .commit();
-    LOGGER.info("Committed {} events to table {}", icebergRecords.size(), eventTable.location());
+    LOGGER.info("Committed {} events to table! {}", icebergRecords.size(), eventTable.location());
   }
 }
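The rewritten `commitBatch` above moves from a hand-rolled Parquet `FileAppender` plus `DataFiles.builder` to Iceberg's task-writer stack: write records through a task writer, then append the produced data files in one atomic commit. The same flow with only stock Iceberg classes; `UnpartitionedWriter` is used here for brevity where the sink's `PartitionedAppendWriter` handles partitioned tables, and the factory arguments are illustrative:

```java
import java.io.IOException;
import java.util.Arrays;
import java.util.List;

import org.apache.iceberg.AppendFiles;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.Table;
import org.apache.iceberg.data.GenericAppenderFactory;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.io.OutputFileFactory;
import org.apache.iceberg.io.UnpartitionedWriter;
import org.apache.iceberg.io.WriteResult;

public class AppendCommitSketch {

  static void appendBatch(Table table, List<Record> records) {
    FileFormat format = FileFormat.PARQUET;
    GenericAppenderFactory appenderFactory = new GenericAppenderFactory(table.schema(), table.spec());
    OutputFileFactory fileFactory =
        OutputFileFactory.builderFor(table, /* partitionId */ 1, /* taskId */ 1L)
            .format(format)
            .build();

    UnpartitionedWriter<Record> writer = new UnpartitionedWriter<>(
        table.spec(), format, appenderFactory, fileFactory, table.io(), Long.MAX_VALUE);

    try {
      for (Record record : records) {
        writer.write(record);
      }
      writer.close();

      WriteResult result = writer.complete();
      AppendFiles append = table.newAppend();
      Arrays.stream(result.dataFiles()).forEach(append::appendFile);
      append.commit(); // atomic: readers see all new files or none
    } catch (IOException e) {
      throw new RuntimeException("Failed committing batch to table " + table.name(), e);
    }
  }
}
```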
diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergEventsChangeConsumerTest.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergEventsChangeConsumerTest.java
index 1aceba8d..78215c2a 100644
--- a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergEventsChangeConsumerTest.java
+++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergEventsChangeConsumerTest.java
@@ -43,13 +43,15 @@ public void testSimpleUpload() {
     Awaitility.await().atMost(Duration.ofSeconds(120)).until(() -> {
       try {
         Dataset<Row> ds = spark.newSession().sql("SELECT * FROM debeziumevents.debezium_events");
-        ds.show();
-        return ds.count() >= 5
+        ds.show(false);
+        return ds.count() >= 10
             && ds.select("event_destination").distinct().count() >= 2;
       } catch (Exception e) {
         return false;
       }
     });
+
+    S3Minio.listFiles();
   }
 }
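A quick way to eyeball what the consumer committed, outside the test above: read the events table back with Spark SQL. The catalog and table names follow the test; the `SparkSession` is assumed to be configured with the Iceberg catalog `debeziumevents`:

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class InspectEventsTable {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder()
        .appName("inspect-debezium-events")
        .getOrCreate();

    Dataset<Row> ds = spark.sql("SELECT * FROM debeziumevents.debezium_events");
    // one row per change event; destinations should cover every captured table
    ds.groupBy("event_destination").count().show(false);
  }
}
```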
{}", icebergRecords.size(), eventTable.location()); } } diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergEventsChangeConsumerTest.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergEventsChangeConsumerTest.java index 1aceba8d..78215c2a 100644 --- a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergEventsChangeConsumerTest.java +++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergEventsChangeConsumerTest.java @@ -43,13 +43,15 @@ public void testSimpleUpload() { Awaitility.await().atMost(Duration.ofSeconds(120)).until(() -> { try { Dataset ds = spark.newSession().sql("SELECT * FROM debeziumevents.debezium_events"); - ds.show(); - return ds.count() >= 5 + ds.show(false); + return ds.count() >= 10 && ds.select("event_destination").distinct().count() >= 2; } catch (Exception e) { return false; } }); + + S3Minio.listFiles(); } } From baf06b5bad9e097246adacd2c01f66889da506a2 Mon Sep 17 00:00:00 2001 From: Ismail Simsek Date: Fri, 31 Dec 2021 15:01:45 +0100 Subject: [PATCH 7/8] minor improvements --- .../iceberg/IcebergEventsChangeConsumer.java | 72 +++++++++++++------ 1 file changed, 51 insertions(+), 21 deletions(-) diff --git a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergEventsChangeConsumer.java b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergEventsChangeConsumer.java index fd3b4bda..bef385ed 100644 --- a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergEventsChangeConsumer.java +++ b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergEventsChangeConsumer.java @@ -12,6 +12,7 @@ import io.debezium.engine.ChangeEvent; import io.debezium.engine.DebeziumEngine; import io.debezium.engine.format.Json; +import io.debezium.serde.DebeziumSerdes; import io.debezium.server.BaseChangeConsumer; import io.debezium.server.iceberg.batchsizewait.InterfaceBatchSizeWait; import io.debezium.server.iceberg.tableoperator.PartitionedAppendWriter; @@ -22,20 +23,18 @@ import java.time.OffsetDateTime; import java.time.ZoneOffset; import java.time.format.DateTimeFormatter; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; -import java.util.Map; +import java.util.*; import java.util.concurrent.ConcurrentHashMap; import java.util.stream.Collectors; import javax.annotation.PostConstruct; import javax.enterprise.context.Dependent; import javax.enterprise.inject.Any; import javax.enterprise.inject.Instance; -import javax.enterprise.inject.literal.NamedLiteral; import javax.inject.Inject; import javax.inject.Named; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.hadoop.conf.Configuration; import org.apache.iceberg.*; import org.apache.iceberg.catalog.Catalog; @@ -48,6 +47,8 @@ import org.apache.iceberg.io.OutputFileFactory; import org.apache.iceberg.io.WriteResult; import org.apache.iceberg.types.Types; +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.serialization.Serde; import org.eclipse.microprofile.config.ConfigProvider; import org.eclipse.microprofile.config.inject.ConfigProperty; import org.slf4j.Logger; @@ -64,23 +65,32 @@ @Dependent public class IcebergEventsChangeConsumer extends BaseChangeConsumer implements DebeziumEngine.ChangeConsumer> { + protected static final DateTimeFormatter dtFormater = 
@@ -104,7 +114,6 @@ public class IcebergEventsChangeConsumer extends BaseChangeConsumer implements D
   Catalog icebergCatalog;
   Table eventTable;

-
   @PostConstruct
   void connect() {
     if (!valueFormat.equalsIgnoreCase(Json.class.getSimpleName().toLowerCase())) {
@@ -139,20 +148,41 @@ void connect() {
     // load table
     eventTable = icebergCatalog.loadTable(tableIdentifier);

-    Instance<InterfaceBatchSizeWait> instance = batchSizeWaitInstances.select(NamedLiteral.of(batchSizeWaitName));
     batchSizeWait = IcebergUtil.selectInstance(batchSizeWaitInstances, batchSizeWaitName);
     batchSizeWait.initizalize();
+
+    // configure and set the value deserializer
+    valSerde.configure(Collections.emptyMap(), false);
+    valDeserializer = valSerde.deserializer();
+    // configure and set the key deserializer
+    keySerde.configure(Collections.emptyMap(), true);
+    keyDeserializer = keySerde.deserializer();
+
     LOGGER.info("Using {}", batchSizeWait.getClass().getName());
   }

-  public GenericRecord getIcebergRecord(String destination, ChangeEvent<Object, Object> record, OffsetDateTime batchTime) {
-    GenericRecord rec = GenericRecord.create(TABLE_SCHEMA.asStruct());
-    rec.setField("event_destination", destination);
-    rec.setField("event_key", getString(record.key()));
-    rec.setField("event_value", getString(record.value()));
-    rec.setField("event_sink_epoch_ms", batchTime.toEpochSecond());
-    rec.setField("event_sink_timestamptz", batchTime);
-    return rec;
+  public GenericRecord getIcebergRecord(ChangeEvent<Object, Object> record, OffsetDateTime batchTime) {
+
+    try {
+      // deserialize
+      JsonNode valueSchema = record.value() == null ? null : mapper.readTree(getBytes(record.value())).get("schema");
+      JsonNode valuePayload = valDeserializer.deserialize(record.destination(), getBytes(record.value()));
+      JsonNode keyPayload = record.key() == null ? null : keyDeserializer.deserialize(record.destination(), getBytes(record.key()));
+      JsonNode keySchema = record.key() == null ? null : mapper.readTree(getBytes(record.key())).get("schema");
+      // convert to GenericRecord
+      GenericRecord rec = GenericRecord.create(TABLE_SCHEMA.asStruct());
+      rec.setField("event_destination", record.destination());
+      rec.setField("event_key_schema", mapper.writeValueAsString(keySchema));
+      rec.setField("event_key_payload", mapper.writeValueAsString(keyPayload));
+      rec.setField("event_value_schema", mapper.writeValueAsString(valueSchema));
+      rec.setField("event_value_payload", mapper.writeValueAsString(valuePayload));
+      rec.setField("event_sink_epoch_ms", batchTime.toEpochSecond());
+      rec.setField("event_sink_timestamptz", batchTime);
+
+      return rec;
+    } catch (IOException e) {
+      throw new DebeziumException(e);
+    }
   }

   public String map(String destination) {
@@ -166,7 +196,7 @@ public void handleBatch(List<ChangeEvent<Object, Object>> records, DebeziumEngin
     OffsetDateTime batchTime = OffsetDateTime.now(ZoneOffset.UTC);

     ArrayList<Record> icebergRecords = records.stream()
-        .map(e -> getIcebergRecord(e.destination(), e, batchTime))
+        .map(e -> getIcebergRecord(e, batchTime))
         .collect(Collectors.toCollection(ArrayList::new));
     commitBatch(icebergRecords);
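The schema/payload split introduced above relies on `DebeziumSerdes.payloadJson`, which unwraps the Debezium envelope and returns only the payload, while the raw `schema` node is read separately with a plain `ObjectMapper`. A standalone sketch; the inline JSON is a made-up event, not real sink output:

```java
import java.nio.charset.StandardCharsets;
import java.util.Collections;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

import io.debezium.serde.DebeziumSerdes;
import org.apache.kafka.common.serialization.Serde;

public class SerdeSketch {
  public static void main(String[] args) throws Exception {
    String value = "{\"schema\":{\"type\":\"struct\"},\"payload\":{\"id\":1,\"name\":\"a\"}}";
    byte[] bytes = value.getBytes(StandardCharsets.UTF_8);

    Serde<JsonNode> valSerde = DebeziumSerdes.payloadJson(JsonNode.class);
    valSerde.configure(Collections.emptyMap(), false); // false marks this as a value serde

    JsonNode payload = valSerde.deserializer().deserialize("topic", bytes);
    JsonNode schema = new ObjectMapper().readTree(bytes).get("schema");

    System.out.println("payload: " + payload); // {"id":1,"name":"a"}
    System.out.println("schema : " + schema);  // {"type":"struct"}
  }
}
```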
From 44b909321c32df599e07daf367bf0fd48f8fa852 Mon Sep 17 00:00:00 2001
From: Ismail Simsek
Date: Fri, 31 Dec 2021 15:03:01 +0100
Subject: [PATCH 8/8] minor improvements

---
 .../io/debezium/server/iceberg/IcebergEventsChangeConsumer.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergEventsChangeConsumer.java b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergEventsChangeConsumer.java
index bef385ed..431cd557 100644
--- a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergEventsChangeConsumer.java
+++ b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergEventsChangeConsumer.java
@@ -178,7 +178,7 @@ public GenericRecord getIcebergRecord(ChangeEvent<Object, Object> record, Offset
       rec.setField("event_value_payload", mapper.writeValueAsString(valuePayload));
       rec.setField("event_sink_epoch_ms", batchTime.toEpochSecond());
       rec.setField("event_sink_timestamptz", batchTime);
-      
+
       return rec;
     } catch (IOException e) {
       throw new DebeziumException(e);