diff --git a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeConsumer.java b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeConsumer.java
index 0b1252f6..a208a839 100644
--- a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeConsumer.java
+++ b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeConsumer.java
@@ -155,22 +155,15 @@ public void handleBatch(List<ChangeEvent<Object, Object>> records, DebeziumEngin
         })
         .collect(Collectors.groupingBy(IcebergChangeEvent::destinationTable));

+    // consume the list of events for each destination table
     for (Map.Entry<String, List<IcebergChangeEvent>> event : result.entrySet()) {
       final TableIdentifier tableIdentifier = TableIdentifier.of(Namespace.of(namespace), tablePrefix + event.getKey());
-      Table icebergTable = IcebergUtil.loadIcebergTable(icebergCatalog, tableIdentifier)
-          .orElseGet(() -> {
-            if (!eventSchemaEnabled) {
-              throw new RuntimeException("Table '" + tableIdentifier + "' not found! " +
-                  "Set `debezium.format.value.schemas.enable` to true to create tables automatically!");
-            }
-            return IcebergUtil.createIcebergTable(icebergCatalog, tableIdentifier,
-                event.getValue().get(0).getSchema(), writeFormat, !upsert);
-          });
-      //addToTable(icebergTable, event.getValue());
+      Table icebergTable = this.loadIcebergTable(icebergCatalog, tableIdentifier, event.getValue().get(0));
       icebergTableOperator.addToTable(icebergTable, event.getValue());
     }
+
     // workaround! somehow offset is not saved to file unless we call committer.markProcessed
-    // even its should be saved to file periodically
+    // even though it should be saved to file periodically
     for (ChangeEvent<Object, Object> record : records) {
       LOGGER.trace("Processed event '{}'", record);
       committer.markProcessed(record);
@@ -179,9 +172,26 @@ public void handleBatch(List<ChangeEvent<Object, Object>> records, DebeziumEngin
     this.logConsumerProgress(records.size());

     batchSizeWait.waitMs(records.size(), (int) Duration.between(start, Instant.now()).toMillis());
+  }

+  /**
+   * @param icebergCatalog iceberg catalog
+   * @param tableId iceberg table identifier
+   * @param sampleEvent sample debezium event; its schema is used to create the iceberg table when the table is not found
+   * @return iceberg table; throws RuntimeException when the table is not found and cannot be created
+   */
+  public Table loadIcebergTable(Catalog icebergCatalog, TableIdentifier tableId, IcebergChangeEvent sampleEvent) {
+    return IcebergUtil.loadIcebergTable(icebergCatalog, tableId).orElseGet(() -> {
+      if (!eventSchemaEnabled) {
+        throw new RuntimeException("Table '" + tableId + "' not found! " +
+            "Set `debezium.format.value.schemas.enable` to true to create tables automatically!");
+      }
+      return IcebergUtil.createIcebergTable(icebergCatalog, tableId, sampleEvent.getSchema(), writeFormat, !upsert);
+    });
   }

+  /**
+   * @param numUploadedEvents number of events consumed in this batch; used to log progress periodically
+   */
   protected void logConsumerProgress(long numUploadedEvents) {
     numConsumedEvents += numUploadedEvents;
     if (logTimer.expired()) {
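Note: the refactored consumer relies on IcebergUtil.loadIcebergTable returning an Optional. A minimal sketch of that lookup, assuming it simply catches Iceberg's NoSuchTableException (IcebergUtil's implementation of this method is not part of this diff):

    import java.util.Optional;
    import org.apache.iceberg.Table;
    import org.apache.iceberg.catalog.Catalog;
    import org.apache.iceberg.catalog.TableIdentifier;
    import org.apache.iceberg.exceptions.NoSuchTableException;

    class IcebergUtilSketch {
      static Optional<Table> loadIcebergTable(Catalog catalog, TableIdentifier tableId) {
        try {
          // Catalog.loadTable throws NoSuchTableException when the table does not exist
          return Optional.of(catalog.loadTable(tableId));
        } catch (NoSuchTableException e) {
          return Optional.empty();
        }
      }
    }

With the Optional in hand, the new loadIcebergTable helper keeps the create-or-fail decision in one place instead of inlining it in handleBatch.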
" + "Set `debezium.format.value.schemas.enable` to true to create tables automatically!"); + } + return IcebergUtil.createIcebergTable(icebergCatalog, tableId, sampleEvent.getSchema(), writeFormat, !upsert); + }); } + /** + * @param numUploadedEvents periodically log number of events consumed + */ protected void logConsumerProgress(long numUploadedEvents) { numConsumedEvents += numUploadedEvents; if (logTimer.expired()) { diff --git a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergEventsChangeConsumer.java b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergEventsChangeConsumer.java index f4a40c7d..431cd557 100644 --- a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergEventsChangeConsumer.java +++ b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergEventsChangeConsumer.java @@ -12,42 +12,43 @@ import io.debezium.engine.ChangeEvent; import io.debezium.engine.DebeziumEngine; import io.debezium.engine.format.Json; +import io.debezium.serde.DebeziumSerdes; import io.debezium.server.BaseChangeConsumer; import io.debezium.server.iceberg.batchsizewait.InterfaceBatchSizeWait; +import io.debezium.server.iceberg.tableoperator.PartitionedAppendWriter; -import java.io.Closeable; import java.io.IOException; import java.time.Duration; import java.time.Instant; import java.time.OffsetDateTime; import java.time.ZoneOffset; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.UUID; +import java.time.format.DateTimeFormatter; +import java.util.*; import java.util.concurrent.ConcurrentHashMap; import java.util.stream.Collectors; import javax.annotation.PostConstruct; import javax.enterprise.context.Dependent; import javax.enterprise.inject.Any; import javax.enterprise.inject.Instance; -import javax.enterprise.inject.literal.NamedLiteral; import javax.inject.Inject; import javax.inject.Named; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.hadoop.conf.Configuration; import org.apache.iceberg.*; import org.apache.iceberg.catalog.Catalog; import org.apache.iceberg.catalog.Namespace; import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.data.GenericAppenderFactory; import org.apache.iceberg.data.GenericRecord; import org.apache.iceberg.data.Record; -import org.apache.iceberg.data.parquet.GenericParquetWriter; -import org.apache.iceberg.io.FileAppender; -import org.apache.iceberg.io.OutputFile; -import org.apache.iceberg.parquet.Parquet; +import org.apache.iceberg.io.BaseTaskWriter; +import org.apache.iceberg.io.OutputFileFactory; +import org.apache.iceberg.io.WriteResult; import org.apache.iceberg.types.Types; -import org.apache.iceberg.util.DateTimeUtil; +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.serialization.Serde; import org.eclipse.microprofile.config.ConfigProvider; import org.eclipse.microprofile.config.inject.ConfigProperty; import org.slf4j.Logger; @@ -64,22 +65,32 @@ @Dependent public class IcebergEventsChangeConsumer extends BaseChangeConsumer implements DebeziumEngine.ChangeConsumer> { + protected static final DateTimeFormatter dtFormater = DateTimeFormatter.ofPattern("yyyyMMdd").withZone(ZoneOffset.UTC); + protected static final ObjectMapper mapper = new ObjectMapper(); + protected static final Serde valSerde = DebeziumSerdes.payloadJson(JsonNode.class); + protected static final Serde keySerde = 
@@ -103,7 +114,6 @@ public class IcebergEventsChangeConsumer extends BaseChangeConsumer implements D
   Catalog icebergCatalog;
   Table eventTable;

-
   @PostConstruct
   void connect() {
     if (!valueFormat.equalsIgnoreCase(Json.class.getSimpleName().toLowerCase())) {
@@ -138,20 +148,41 @@ void connect() {
     // load table
     eventTable = icebergCatalog.loadTable(tableIdentifier);

-    Instance<InterfaceBatchSizeWait> instance = batchSizeWaitInstances.select(NamedLiteral.of(batchSizeWaitName));
     batchSizeWait = IcebergUtil.selectInstance(batchSizeWaitInstances, batchSizeWaitName);
     batchSizeWait.initizalize();
+
+    // configure the value serde and get its deserializer
+    valSerde.configure(Collections.emptyMap(), false);
+    valDeserializer = valSerde.deserializer();
+    // configure the key serde and get its deserializer
+    keySerde.configure(Collections.emptyMap(), true);
+    keyDeserializer = keySerde.deserializer();
+
     LOGGER.info("Using {}", batchSizeWait.getClass().getName());
   }

-  public GenericRecord getIcebergRecord(String destination, ChangeEvent<Object, Object> record, OffsetDateTime batchTime) {
-    GenericRecord rec = GenericRecord.create(TABLE_SCHEMA.asStruct());
-    rec.setField("event_destination", destination);
-    rec.setField("event_key", getString(record.key()));
-    rec.setField("event_value", getString(record.value()));
-    rec.setField("event_sink_epoch_ms", batchTime.toEpochSecond());
-    rec.setField("event_sink_timestamptz", batchTime);
-    return rec;
+  public GenericRecord getIcebergRecord(ChangeEvent<Object, Object> record, OffsetDateTime batchTime) {

+    try {
+      // deserialize the event key and value, keeping the raw schema part separately
+      JsonNode valueSchema = record.value() == null ? null : mapper.readTree(getBytes(record.value())).get("schema");
+      JsonNode valuePayload = valDeserializer.deserialize(record.destination(), getBytes(record.value()));
+      JsonNode keyPayload = record.key() == null ? null : keyDeserializer.deserialize(record.destination(), getBytes(record.key()));
+      JsonNode keySchema = record.key() == null ? null : mapper.readTree(getBytes(record.key())).get("schema");
+      // convert to GenericRecord
+      GenericRecord rec = GenericRecord.create(TABLE_SCHEMA.asStruct());
+      rec.setField("event_destination", record.destination());
+      rec.setField("event_key_schema", mapper.writeValueAsString(keySchema));
+      rec.setField("event_key_payload", mapper.writeValueAsString(keyPayload));
+      rec.setField("event_value_schema", mapper.writeValueAsString(valueSchema));
+      rec.setField("event_value_payload", mapper.writeValueAsString(valuePayload));
+      rec.setField("event_sink_epoch_ms", batchTime.toEpochSecond());
+      rec.setField("event_sink_timestamptz", batchTime);
+
+      return rec;
+    } catch (IOException e) {
+      throw new DebeziumException(e);
+    }
   }

   public String map(String destination) {
@@ -164,72 +195,49 @@ public void handleBatch(List<ChangeEvent<Object, Object>> records, DebeziumEngin
     Instant start = Instant.now();
     OffsetDateTime batchTime = OffsetDateTime.now(ZoneOffset.UTC);
-
-    Map<String, List<ChangeEvent<Object, Object>>> result = records.stream()
-        .collect(Collectors.groupingBy(
-            objectObjectChangeEvent -> map(objectObjectChangeEvent.destination()),
-            Collectors.mapping(p -> p,
-                Collectors.toCollection(ArrayList::new))));
-
-    for (Map.Entry<String, List<ChangeEvent<Object, Object>>> destEvents : result.entrySet()) {
-      // each destEvents is set of events for a single table
-      ArrayList<Record> destIcebergRecords = destEvents.getValue().stream()
-          .map(e -> getIcebergRecord(destEvents.getKey(), e, batchTime))
-          .collect(Collectors.toCollection(ArrayList::new));
-
-      commitBatch(destEvents.getKey(), batchTime, destIcebergRecords);
+    ArrayList<Record> icebergRecords = records.stream()
+        .map(e -> getIcebergRecord(e, batchTime))
+        .collect(Collectors.toCollection(ArrayList::new));
+    commitBatch(icebergRecords);
+
+    // workaround! somehow offset is not saved to file unless we call committer.markProcessed
+    // even though it should be saved to file periodically
+    for (ChangeEvent<Object, Object> record : records) {
+      LOGGER.trace("Processed event '{}'", record);
+      committer.markProcessed(record);
     }
-    // committer.markProcessed(record);
     committer.markBatchFinished();
-
     batchSizeWait.waitMs(records.size(), (int) Duration.between(start, Instant.now()).toMillis());
-
   }

-  private void commitBatch(String destination, OffsetDateTime batchTime, ArrayList<Record> icebergRecords) {
-    final String fileName = UUID.randomUUID() + "-" + Instant.now().toEpochMilli() + "." + FileFormat.PARQUET;
+  private void commitBatch(ArrayList<Record> icebergRecords) {

-    PartitionKey pk = new PartitionKey(TABLE_PARTITION, TABLE_SCHEMA);
-    Record pr = GenericRecord.create(TABLE_SCHEMA)
-        .copy("event_destination", destination,
-            "event_sink_timestamptz", DateTimeUtil.microsFromTimestamptz(batchTime));
-    pk.partition(pr);
+    FileFormat format = IcebergUtil.getTableFileFormat(eventTable);
+    GenericAppenderFactory appenderFactory = IcebergUtil.getTableAppender(eventTable);
+    int partitionId = Integer.parseInt(dtFormatter.format(Instant.now()));
+    OutputFileFactory fileFactory = OutputFileFactory.builderFor(eventTable, partitionId, 1L)
+        .defaultSpec(eventTable.spec()).format(format).build();

-    OutputFile out =
-        eventTable.io().newOutputFile(eventTable.locationProvider().newDataLocation(pk.toPath() + "/" + fileName));
+    BaseTaskWriter<Record> writer = new PartitionedAppendWriter(
+        eventTable.spec(), format, appenderFactory, fileFactory, eventTable.io(), Long.MAX_VALUE, eventTable.schema());

-    FileAppender<Record> writer;
     try {
-      writer = Parquet.write(out)
-          .createWriterFunc(GenericParquetWriter::buildWriter)
-          .forTable(eventTable)
-          .overwrite()
-          .build();
-
-      try (Closeable toClose = writer) {
-        writer.addAll(icebergRecords);
+      for (Record icebergRecord : icebergRecords) {
+        writer.write(icebergRecord);
       }
+      writer.close();
+      WriteResult files = writer.complete();
+      AppendFiles appendFiles = eventTable.newAppend();
+      Arrays.stream(files.dataFiles()).forEach(appendFiles::appendFile);
+      appendFiles.commit();
+
     } catch (IOException e) {
       LOGGER.error("Failed committing events to iceberg table!", e);
-      throw new RuntimeException("Failed commiting events to iceberg table!", e);
+      throw new DebeziumException("Failed committing events to iceberg table!", e);
     }

-    DataFile dataFile = DataFiles.builder(eventTable.spec())
-        .withFormat(FileFormat.PARQUET)
-        .withPath(out.location())
-        .withFileSizeInBytes(writer.length())
-        .withSplitOffsets(writer.splitOffsets())
-        .withMetrics(writer.metrics())
-        .withPartition(pk)
-        .build();
-
-    LOGGER.debug("Appending new file '{}'", dataFile.path());
-    eventTable.newAppend()
-        .appendFile(dataFile)
-        .commit();
-    LOGGER.info("Committed {} events to table {}", icebergRecords.size(), eventTable.location());
+    LOGGER.info("Committed {} events to table! {}", icebergRecords.size(), eventTable.location());
   }
 }
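Note: PartitionedAppendWriter is imported and instantiated above but not shown in this diff. A sketch of a plausible shape for it, assuming it follows Iceberg's PartitionedWriter pattern (the constructor arguments mirror the call site above; the actual class may differ):

    import org.apache.iceberg.FileFormat;
    import org.apache.iceberg.PartitionKey;
    import org.apache.iceberg.PartitionSpec;
    import org.apache.iceberg.Schema;
    import org.apache.iceberg.data.InternalRecordWrapper;
    import org.apache.iceberg.data.Record;
    import org.apache.iceberg.io.FileAppenderFactory;
    import org.apache.iceberg.io.FileIO;
    import org.apache.iceberg.io.OutputFileFactory;
    import org.apache.iceberg.io.PartitionedWriter;

    public class PartitionedAppendWriter extends PartitionedWriter<Record> {
      private final PartitionKey partitionKey;
      private final InternalRecordWrapper wrapper;

      public PartitionedAppendWriter(PartitionSpec spec, FileFormat format,
                                     FileAppenderFactory<Record> appenderFactory,
                                     OutputFileFactory fileFactory, FileIO io,
                                     long targetFileSize, Schema schema) {
        super(spec, format, appenderFactory, fileFactory, io, targetFileSize);
        this.partitionKey = new PartitionKey(spec, schema);
        this.wrapper = new InternalRecordWrapper(schema.asStruct());
      }

      @Override
      protected PartitionKey partition(Record row) {
        // wrap() converts record values to the internal representation partition transforms expect
        partitionKey.partition(wrapper.wrap(row));
        return partitionKey;
      }
    }

This lets commitBatch write one data file per partition (event_destination, hour of event_sink_timestamptz) while sharing a single task writer.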
{}", icebergRecords.size(), eventTable.location()); } } diff --git a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergUtil.java b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergUtil.java index 7799239a..a1c33452 100644 --- a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergUtil.java +++ b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergUtil.java @@ -68,8 +68,8 @@ public static Table createIcebergTable(Catalog icebergCatalog, TableIdentifier t LOGGER.warn("Creating table:'{}'\nschema:{}\nrowIdentifier:{}", tableIdentifier, schema, schema.identifierFieldNames()); - PartitionSpec ps; - if (partition) { + final PartitionSpec ps; + if (partition && schema.findField("__source_ts") != null) { ps = PartitionSpec.builderFor(schema).day("__source_ts").build(); } else { ps = PartitionSpec.builderFor(schema).build(); diff --git a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/IcebergTableOperator.java b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/IcebergTableOperator.java index 5222e238..82c369e4 100644 --- a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/IcebergTableOperator.java +++ b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/IcebergTableOperator.java @@ -16,6 +16,7 @@ import java.util.Arrays; import java.util.List; import java.util.concurrent.ConcurrentHashMap; +import java.util.stream.Collectors; import javax.enterprise.context.Dependent; import javax.inject.Inject; @@ -89,18 +90,22 @@ private int compareByTsThenOp(GenericRecord lhs, GenericRecord rhs) { } public void addToTable(Table icebergTable, List events) { + + final List batchEvents; + // when its operation mode is not upsert deduplicate the events to avoid inserting duplicate row + if (upsert && !icebergTable.schema().identifierFieldIds().isEmpty()) { + batchEvents = deduplicatedBatchRecords(icebergTable.schema(), events); + } else { + batchEvents = events.stream(). + map(e -> e.asIcebergRecord(icebergTable.schema())). + collect(Collectors.toList()); + } + // Initialize a task writer to write both INSERT and equality DELETE. 
diff --git a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/IcebergTableOperator.java b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/IcebergTableOperator.java
index 5222e238..82c369e4 100644
--- a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/IcebergTableOperator.java
+++ b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/IcebergTableOperator.java
@@ -16,6 +16,7 @@
 import java.util.Arrays;
 import java.util.List;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
 import javax.enterprise.context.Dependent;
 import javax.inject.Inject;
@@ -89,18 +90,22 @@ private int compareByTsThenOp(GenericRecord lhs, GenericRecord rhs) {
   }

   public void addToTable(Table icebergTable, List<IcebergChangeEvent> events) {
+
+    final List<Record> batchEvents;
+    // when operating in upsert mode, deduplicate the events so a batch cannot insert duplicate rows
+    if (upsert && !icebergTable.schema().identifierFieldIds().isEmpty()) {
+      batchEvents = deduplicatedBatchRecords(icebergTable.schema(), events);
+    } else {
+      batchEvents = events.stream()
+          .map(e -> e.asIcebergRecord(icebergTable.schema()))
+          .collect(Collectors.toList());
+    }
+    // Initialize a task writer to write both INSERT and equality DELETE.
     BaseTaskWriter<Record> writer = writerFactory.create(icebergTable);
     try {
-      if (upsert && !icebergTable.schema().identifierFieldIds().isEmpty()) {
-        ArrayList<Record> icebergRecords = deduplicatedBatchRecords(icebergTable.schema(), events);
-        for (Record icebergRecord : icebergRecords) {
-          writer.write(icebergRecord);
-        }
-      } else {
-        for (IcebergChangeEvent e : events) {
-          writer.write(e.asIcebergRecord(icebergTable.schema()));
-        }
+      for (Record icebergRecord : batchEvents) {
+        writer.write(icebergRecord);
       }
       writer.close();
diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergEventsChangeConsumerTest.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergEventsChangeConsumerTest.java
index 1aceba8d..78215c2a 100644
--- a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergEventsChangeConsumerTest.java
+++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergEventsChangeConsumerTest.java
@@ -43,13 +43,15 @@ public void testSimpleUpload() {
     Awaitility.await().atMost(Duration.ofSeconds(120)).until(() -> {
       try {
         Dataset<Row> ds = spark.newSession().sql("SELECT * FROM debeziumevents.debezium_events");
-        ds.show();
-        return ds.count() >= 5
+        ds.show(false);
+        return ds.count() >= 10
             && ds.select("event_destination").distinct().count() >= 2;
       } catch (Exception e) {
         return false;
       }
     });
+
+    S3Minio.listFiles();
   }
 }
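Note: deduplicatedBatchRecords in IcebergTableOperator is called above but not shown in this diff. A self-contained sketch of the idea it implements, keeping only the winning event per key within a batch, with ties decided by an ordering such as compareByTsThenOp; the generic names, key extractor, and class are hypothetical stand-ins:

    import java.util.ArrayList;
    import java.util.Comparator;
    import java.util.LinkedHashMap;
    import java.util.List;
    import java.util.function.Function;

    final class DedupSketch {
      // keeps only the winning event per key; `order` decides which of two events
      // with the same key wins (e.g. compare source timestamp, then operation)
      static <E, K> List<E> lastPerKey(List<E> events, Function<E, K> key, Comparator<E> order) {
        LinkedHashMap<K, E> latest = new LinkedHashMap<>();
        for (E e : events) {
          latest.merge(key.apply(e), e, (a, b) -> order.compare(a, b) <= 0 ? b : a);
        }
        return new ArrayList<>(latest.values());
      }
    }

Without this step, two upserts for the same key in one batch would both be written as inserts, since equality deletes only apply to files from earlier commits.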