diff --git a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/IcebergTableWriterFactory.java b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/IcebergTableWriterFactory.java index 96562767..958951a2 100644 --- a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/IcebergTableWriterFactory.java +++ b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/IcebergTableWriterFactory.java @@ -29,32 +29,56 @@ public class IcebergTableWriterFactory { @ConfigProperty(name = "debezium.sink.iceberg.upsert-keep-deletes", defaultValue = "true") boolean upsertKeepDeletes; + private static int partitionId() { + return Integer.parseInt(dtFormater.format(Instant.now())); + } + public BaseTaskWriter create(Table icebergTable) { + // file format of the table parquet, orc ... FileFormat format = IcebergUtil.getTableFileFormat(icebergTable); + // appender factory GenericAppenderFactory appenderFactory = IcebergUtil.getTableAppender(icebergTable); - int partitionId = Integer.parseInt(dtFormater.format(Instant.now())); - OutputFileFactory fileFactory = OutputFileFactory.builderFor(icebergTable, partitionId, 1L) + OutputFileFactory fileFactory = OutputFileFactory.builderFor(icebergTable, partitionId(), 1L) .defaultSpec(icebergTable.spec()).format(format).build(); + // equality Field Ids List equalityFieldIds = new ArrayList<>(icebergTable.schema().identifierFieldIds()); BaseTaskWriter writer; + + // 1. TABLE DONT HAVE identifierFieldIds + // 2. OR RUNNING APPEND MODE + // THEN USE APPEND WRITERS if (icebergTable.schema().identifierFieldIds().isEmpty() || !upsert) { + if (upsert) { + // running in upsert mode but table dont have identifierFieldIds to do delete! LOGGER.info("Table don't have Pk defined upsert is not possible falling back to append!"); } + if (icebergTable.spec().isUnpartitioned()) { + // table is un partitioned use un partitioned append writer writer = new UnpartitionedWriter<>( icebergTable.spec(), format, appenderFactory, fileFactory, icebergTable.io(), Long.MAX_VALUE); + } else { + // table is partitioned use partitioned append writer writer = new PartitionedAppendWriter( icebergTable.spec(), format, appenderFactory, fileFactory, icebergTable.io(), Long.MAX_VALUE, icebergTable.schema()); } - } else if (icebergTable.spec().isUnpartitioned()) { + + } + + // ITS UPSERT MODE + // AND TABLE HAS identifierFieldIds + // USE DELTA WRITERS + else if (icebergTable.spec().isUnpartitioned()) { + // running with upsert mode + un partitioned table writer = new UnpartitionedDeltaWriter(icebergTable.spec(), format, appenderFactory, fileFactory, icebergTable.io(), Long.MAX_VALUE, icebergTable.schema(), equalityFieldIds, true, upsertKeepDeletes); } else { + // running with upsert mode + partitioned table writer = new PartitionedDeltaWriter(icebergTable.spec(), format, appenderFactory, fileFactory, icebergTable.io(), Long.MAX_VALUE, icebergTable.schema(), equalityFieldIds, true, upsertKeepDeletes);