From 9da79765bb6d42075251245a79b966af0124f3dc Mon Sep 17 00:00:00 2001 From: Rafael Acevedo Date: Mon, 25 Oct 2021 11:09:12 -0300 Subject: [PATCH] refactor(IcebergTableOperatorUpsert): use `DeleteWriteBuilder.forTable` instead of setting table params manually We were setting the parameters "manually" when there is a `forTable` helper in the iceberg API. Also, this refactors `getDeleteFile` to return an `Optional`, providing more clarity that there can be no delete file for a given batch --- .../IcebergTableOperatorUpsert.java | 32 +++++++------------ 1 file changed, 11 insertions(+), 21 deletions(-) diff --git a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/IcebergTableOperatorUpsert.java b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/IcebergTableOperatorUpsert.java index c488deb4..b528d28d 100644 --- a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/IcebergTableOperatorUpsert.java +++ b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/IcebergTableOperatorUpsert.java @@ -16,6 +16,7 @@ import java.time.Instant; import java.util.ArrayList; import java.util.List; +import java.util.Optional; import java.util.Set; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; @@ -26,7 +27,6 @@ import javax.inject.Named; import com.google.common.collect.ImmutableMap; -import com.google.common.collect.Lists; import com.google.common.primitives.Ints; import org.apache.iceberg.*; import org.apache.iceberg.data.GenericRecord; @@ -63,11 +63,10 @@ public void initialize() { icebergTableAppend.initialize(); } - private DeleteFile getDeleteFile(Table icebergTable, ArrayList icebergRecords) { + private Optional getDeleteFile(Table icebergTable, ArrayList icebergRecords) { final String fileName = "del-" + UUID.randomUUID() + "-" + Instant.now().toEpochMilli() + "." 
+ FileFormat.PARQUET; OutputFile out = icebergTable.io().newOutputFile(icebergTable.locationProvider().newDataLocation(fileName)); - Set equalityDeleteFieldIds = icebergTable.schema().identifierFieldIds(); EqualityDeleteWriter deleteWriter; @@ -82,23 +81,18 @@ private DeleteFile getDeleteFile(Table icebergTable, ArrayList icebergRe ).collect(Collectors.toList()); if (deleteRows.size() == 0) { - return null; + return Optional.empty(); } try { LOGGER.debug("Writing data to equality delete file: {}!", out); - deleteWriter = Parquet.writeDeletes(out) .createWriterFunc(GenericParquetWriter::buildWriter) .overwrite() - .rowSchema(icebergTable.sortOrder().schema()) - .withSpec(icebergTable.spec()) - .equalityFieldIds(Lists.newArrayList(icebergTable.schema().identifierFieldIds())) - .metricsConfig(MetricsConfig.fromProperties(icebergTable.properties())) + .forTable(icebergTable) + .equalityFieldIds(List.copyOf(icebergTable.schema().identifierFieldIds())) .withSortOrder(icebergTable.sortOrder()) - .setAll(icebergTable.properties()) - .buildEqualityWriter() - ; + .buildEqualityWriter(); try (Closeable toClose = deleteWriter) { deleteWriter.deleteAll(deleteRows); @@ -111,7 +105,7 @@ private DeleteFile getDeleteFile(Table icebergTable, ArrayList icebergRe LOGGER.debug("Creating iceberg equality delete file!"); // Equality delete files identify deleted rows in a collection of data files by one or more column values, // and may optionally contain additional columns of the deleted row. 
- return FileMetadata.deleteFileBuilder(icebergTable.spec()) + return Optional.of(FileMetadata.deleteFileBuilder(icebergTable.spec()) .ofEqualityDeletes(Ints.toArray(icebergTable.schema().identifierFieldIds())) .withFormat(FileFormat.PARQUET) .withPath(out.location()) @@ -119,7 +113,7 @@ private DeleteFile getDeleteFile(Table icebergTable, ArrayList icebergRe .withFileSizeInBytes(deleteWriter.length()) .withRecordCount(deleteRows.size()) .withSortOrder(icebergTable.sortOrder()) - .build(); + .build()); } private ArrayList toDeduppedIcebergRecords(Schema schema, ArrayList> events) { @@ -171,16 +165,12 @@ public void addToTable(Table icebergTable, ArrayList // DO UPSERT >>> DELETE + INSERT ArrayList icebergRecords = toDeduppedIcebergRecords(icebergTable.schema(), events); DataFile dataFile = getDataFile(icebergTable, icebergRecords); - DeleteFile deleteDataFile = getDeleteFile(icebergTable, icebergRecords); - LOGGER.debug("Committing new file as Upsert (has deletes:{}) '{}' !", deleteDataFile != null, dataFile.path()); + Optional deleteDataFile = getDeleteFile(icebergTable, icebergRecords); + LOGGER.debug("Committing new file as Upsert (has deletes:{}) '{}' !", deleteDataFile.isPresent(), dataFile.path()); RowDelta c = icebergTable .newRowDelta() .addRows(dataFile); - - if (deleteDataFile != null) { - c.addDeletes(deleteDataFile) - .validateDeletedFiles(); - } + deleteDataFile.ifPresent(deleteFile -> c.addDeletes(deleteFile).validateDeletedFiles()); c.commit(); LOGGER.info("Committed {} events to table! {}", events.size(), icebergTable.location());