diff --git a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/DebeziumToIcebergTable.java b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/DebeziumToIcebergTable.java index 0996e7ce..7ff4fb2e 100644 --- a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/DebeziumToIcebergTable.java +++ b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/DebeziumToIcebergTable.java @@ -23,6 +23,7 @@ import org.apache.iceberg.types.Types; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static org.apache.iceberg.TableProperties.*; /** * @@ -107,13 +108,14 @@ private Set getRowIdentifierFieldIds() { return identifierFieldIds; } - public Table create(Catalog icebergCatalog, TableIdentifier tableIdentifier) { + public Table create(Catalog icebergCatalog, TableIdentifier tableIdentifier, String writeFormat) { Schema schema = new Schema(this.tableColumns, getRowIdentifierFieldIds()); if (this.hasSchema()) { Catalog.TableBuilder tb = icebergCatalog.buildTable(tableIdentifier, schema) - .withProperty("format-version", "2") + .withProperty(FORMAT_VERSION, "2") + .withProperty(DEFAULT_FILE_FORMAT, writeFormat.toLowerCase(Locale.ENGLISH)) .withSortOrder(getSortOrder(schema)); LOGGER.warn("Creating table:'{}'\nschema:{}\nrowIdentifier:{}", tableIdentifier, schema, diff --git a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeConsumer.java b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeConsumer.java index 819ad266..e3d7f4f7 100644 --- a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeConsumer.java +++ b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeConsumer.java @@ -47,6 +47,7 @@ import org.eclipse.microprofile.config.inject.ConfigProperty; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static org.apache.iceberg.TableProperties.*; /** * Implementation of the consumer that delivers the messages to iceberg tables. @@ -81,6 +82,9 @@ public class IcebergChangeConsumer extends BaseChangeConsumer implements Debeziu String batchSizeWaitName; @ConfigProperty(name = "debezium.format.value.schemas.enable", defaultValue = "false") boolean eventSchemaEnabled; + @ConfigProperty(name = "debezium.sink.iceberg." + DEFAULT_FILE_FORMAT, defaultValue = DEFAULT_FILE_FORMAT_DEFAULT) + String writeFormat; + @Inject @Any Instance batchSizeWaitInstances; @@ -199,7 +203,7 @@ private Table createIcebergTable(TableIdentifier tableIdentifier, ? new DebeziumToIcebergTable(getBytes(event.value())) : new DebeziumToIcebergTable(getBytes(event.value()), getBytes(event.key())); - return eventSchema.create(icebergCatalog, tableIdentifier); + return eventSchema.create(icebergCatalog, tableIdentifier, writeFormat); } private Optional loadIcebergTable(TableIdentifier tableId) { @@ -207,7 +211,7 @@ private Optional
loadIcebergTable(TableIdentifier tableId) { Table table = icebergCatalog.loadTable(tableId); return Optional.of(table); } catch (NoSuchTableException e) { - LOGGER.warn("table not found: {}", tableId.toString()); + LOGGER.warn("Table not found: {}", tableId.toString()); return Optional.empty(); } } diff --git a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/AbstractIcebergTableOperator.java b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/AbstractIcebergTableOperator.java index 3016ae74..53e1a183 100644 --- a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/AbstractIcebergTableOperator.java +++ b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/AbstractIcebergTableOperator.java @@ -13,27 +13,29 @@ import io.debezium.serde.DebeziumSerdes; import io.debezium.server.iceberg.IcebergUtil; -import java.io.Closeable; import java.io.IOException; import java.time.Instant; import java.util.ArrayList; import java.util.Collections; -import java.util.List; +import java.util.Locale; import java.util.UUID; -import java.util.stream.Collectors; import com.fasterxml.jackson.databind.JsonNode; -import org.apache.iceberg.*; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.Schema; +import org.apache.iceberg.Table; +import org.apache.iceberg.data.GenericAppenderFactory; import org.apache.iceberg.data.GenericRecord; import org.apache.iceberg.data.Record; -import org.apache.iceberg.data.parquet.GenericParquetWriter; -import org.apache.iceberg.io.FileAppender; +import org.apache.iceberg.io.DataWriter; import org.apache.iceberg.io.OutputFile; -import org.apache.iceberg.parquet.Parquet; import org.apache.kafka.common.serialization.Deserializer; import org.apache.kafka.common.serialization.Serde; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT; +import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT_DEFAULT; /** * Wrapper to perform operations in iceberg tables @@ -86,36 +88,26 @@ protected ArrayList toIcebergRecords(Schema schema, ArrayList icebergRecords) { - final String fileName = UUID.randomUUID() + "-" + Instant.now().toEpochMilli() + "." + FileFormat.PARQUET; + + String formatAsString = icebergTable.properties().getOrDefault(DEFAULT_FILE_FORMAT, DEFAULT_FILE_FORMAT_DEFAULT); + FileFormat fileFormat = FileFormat.valueOf(formatAsString.toUpperCase(Locale.ROOT)); + + final String fileName = UUID.randomUUID() + "-" + Instant.now().toEpochMilli() + "." + fileFormat.name(); OutputFile out = icebergTable.io().newOutputFile(icebergTable.locationProvider().newDataLocation(fileName)); + + GenericAppenderFactory apender = new GenericAppenderFactory(icebergTable.schema(), icebergTable.spec()); + DataWriter dw = apender.newDataWriter(icebergTable.encryption().encrypt(out), fileFormat, null); + + icebergRecords.stream().filter(this.filterEvents()).forEach(dw::add); - FileAppender writer; - List newRecords = icebergRecords.stream() - .filter(this.filterEvents()).collect(Collectors.toList()); try { - LOGGER.debug("Writing data to file: {}!", out); - writer = Parquet.write(out) - .createWriterFunc(GenericParquetWriter::buildWriter) - .forTable(icebergTable) - .overwrite() - .build(); - - try (Closeable toClose = writer) { - writer.addAll(newRecords); - } - + dw.close(); } catch (IOException e) { throw new RuntimeException(e); } LOGGER.debug("Creating iceberg DataFile!"); - return DataFiles.builder(icebergTable.spec()) - .withFormat(FileFormat.PARQUET) - .withPath(out.location()) - .withFileSizeInBytes(writer.length()) - .withSplitOffsets(writer.splitOffsets()) - .withMetrics(writer.metrics()) - .build(); + return dw.toDataFile(); } } diff --git a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/IcebergTableOperatorUpsert.java b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/IcebergTableOperatorUpsert.java index b528d28d..731cf782 100644 --- a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/IcebergTableOperatorUpsert.java +++ b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/IcebergTableOperatorUpsert.java @@ -11,14 +11,9 @@ import io.debezium.engine.ChangeEvent; import io.debezium.server.iceberg.IcebergUtil; -import java.io.Closeable; import java.io.IOException; import java.time.Instant; -import java.util.ArrayList; -import java.util.List; -import java.util.Optional; -import java.util.Set; -import java.util.UUID; +import java.util.*; import java.util.concurrent.ConcurrentHashMap; import java.util.function.Predicate; import java.util.stream.Collectors; @@ -27,24 +22,25 @@ import javax.inject.Named; import com.google.common.collect.ImmutableMap; -import com.google.common.primitives.Ints; import org.apache.iceberg.*; +import org.apache.iceberg.data.GenericAppenderFactory; import org.apache.iceberg.data.GenericRecord; import org.apache.iceberg.data.Record; -import org.apache.iceberg.data.parquet.GenericParquetWriter; import org.apache.iceberg.deletes.EqualityDeleteWriter; +import org.apache.iceberg.encryption.EncryptedOutputFile; import org.apache.iceberg.io.OutputFile; -import org.apache.iceberg.parquet.Parquet; +import org.apache.iceberg.relocated.com.google.common.primitives.Ints; import org.eclipse.microprofile.config.inject.ConfigProperty; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static org.apache.iceberg.TableProperties.*; @Dependent @Named("IcebergTableOperatorUpsert") public class IcebergTableOperatorUpsert extends AbstractIcebergTableOperator { - private static final Logger LOGGER = LoggerFactory.getLogger(IcebergTableOperatorUpsert.class); static final ImmutableMap cdcOperations = ImmutableMap.of("c", 1, "r", 2, "u", 3, "d", 4); + private static final Logger LOGGER = LoggerFactory.getLogger(IcebergTableOperatorUpsert.class); @ConfigProperty(name = "debezium.sink.iceberg.upsert-dedup-column", defaultValue = "__source_ts_ms") String sourceTsMsColumn; @@ -64,11 +60,21 @@ public void initialize() { } private Optional getDeleteFile(Table icebergTable, ArrayList icebergRecords) { - - final String fileName = "del-" + UUID.randomUUID() + "-" + Instant.now().toEpochMilli() + "." + FileFormat.PARQUET; + + String formatAsString = icebergTable.properties().getOrDefault(DEFAULT_FILE_FORMAT, DEFAULT_FILE_FORMAT_DEFAULT); + FileFormat fileFormat = FileFormat.valueOf(formatAsString.toUpperCase(Locale.ROOT)); + + final String fileName = "del-" + UUID.randomUUID() + "-" + Instant.now().toEpochMilli() + "." + fileFormat.name(); OutputFile out = icebergTable.io().newOutputFile(icebergTable.locationProvider().newDataLocation(fileName)); + EncryptedOutputFile eout = icebergTable.encryption().encrypt(out); - EqualityDeleteWriter deleteWriter; + GenericAppenderFactory apender = new GenericAppenderFactory( + icebergTable.schema(), + icebergTable.spec(), + Ints.toArray(icebergTable.schema().identifierFieldIds()), + icebergTable.schema(), + null); + EqualityDeleteWriter edw = apender.newEqDeleteWriter(eout, fileFormat, null); // anything is not an insert. // upsertKeepDeletes = false, which means delete deletes @@ -84,36 +90,16 @@ private Optional getDeleteFile(Table icebergTable, ArrayList return Optional.empty(); } - try { - LOGGER.debug("Writing data to equality delete file: {}!", out); - deleteWriter = Parquet.writeDeletes(out) - .createWriterFunc(GenericParquetWriter::buildWriter) - .overwrite() - .forTable(icebergTable) - .equalityFieldIds(List.copyOf(icebergTable.schema().identifierFieldIds())) - .withSortOrder(icebergTable.sortOrder()) - .buildEqualityWriter(); - - try (Closeable toClose = deleteWriter) { - deleteWriter.deleteAll(deleteRows); - } + edw.deleteAll(deleteRows); + try { + edw.close(); } catch (IOException e) { throw new RuntimeException(e); } LOGGER.debug("Creating iceberg equality delete file!"); - // Equality delete files identify deleted rows in a collection of data files by one or more column values, - // and may optionally contain additional columns of the deleted row. - return Optional.of(FileMetadata.deleteFileBuilder(icebergTable.spec()) - .ofEqualityDeletes(Ints.toArray(icebergTable.schema().identifierFieldIds())) - .withFormat(FileFormat.PARQUET) - .withPath(out.location()) - .withFileSizeInBytes(deleteWriter.length()) - .withFileSizeInBytes(deleteWriter.length()) - .withRecordCount(deleteRows.size()) - .withSortOrder(icebergTable.sortOrder()) - .build()); + return Optional.of(edw.toDeleteFile()); } private ArrayList toDeduppedIcebergRecords(Schema schema, ArrayList> events) { diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerTestProfile.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerTestProfile.java index f85d977b..8592b817 100644 --- a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerTestProfile.java +++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerTestProfile.java @@ -21,6 +21,7 @@ public Map getConfigOverrides() { Map config = new HashMap<>(); config.put("debezium.sink.type", "iceberg"); + config.put("debezium.sink.iceberg.write.format.default", "orc"); return config; }