From 0558cbb2a9664d3e51fced0d6e5d28bcbd8fc3c6 Mon Sep 17 00:00:00 2001 From: ismail simsek Date: Fri, 31 Dec 2021 09:58:58 +0100 Subject: [PATCH] Support writing partitioned tables (#71) Support writing partitioned tables #71 --- .../server/iceberg/DebeziumMetrics.java | 1 - .../server/iceberg/IcebergChangeConsumer.java | 11 +- .../server/iceberg/IcebergChangeEvent.java | 64 ++++--- .../iceberg/IcebergEventsChangeConsumer.java | 16 +- .../debezium/server/iceberg/IcebergUtil.java | 33 +++- .../batchsizewait/DynamicBatchSizeWait.java | 9 +- .../batchsizewait/InterfaceBatchSizeWait.java | 2 +- .../AbstractIcebergTableOperator.java | 93 ---------- .../tableoperator/BaseDeltaTaskWriter.java | 75 ++++++++ .../tableoperator/IcebergTableOperator.java | 120 +++++++++++++ .../IcebergTableOperatorAppend.java | 48 ----- .../IcebergTableOperatorUpsert.java | 166 ------------------ .../IcebergTableWriterFactory.java | 65 +++++++ .../InterfaceIcebergTableOperator.java | 26 --- .../PartitionedAppendWriter.java | 32 ++++ .../tableoperator/PartitionedDeltaWriter.java | 67 +++++++ .../UnpartitionedDeltaWriter.java | 40 +++++ .../debezium/server/iceberg/ConfigSource.java | 14 +- .../IcebergChangeConsumerMysqlTest.java | 12 +- .../iceberg/IcebergChangeConsumerTest.java | 158 +++++++++-------- ...ChangeConsumerUpsertDeleteDeletesTest.java | 50 +++--- .../IcebergChangeConsumerUpsertTest.java | 74 ++++---- .../IcebergEventsChangeConsumerTest.java | 2 +- .../server/iceberg/TestIcebergUtil.java | 60 ++++--- .../batchsizewait/MaxBatchSizeWaitTest.java | 4 +- .../iceberg/testresources/BaseSparkTest.java | 81 +++++---- .../testresources/TestChangeEvent.java | 4 +- 27 files changed, 720 insertions(+), 607 deletions(-) delete mode 100644 debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/AbstractIcebergTableOperator.java create mode 100644 debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/BaseDeltaTaskWriter.java create mode 100644 debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/IcebergTableOperator.java delete mode 100644 debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/IcebergTableOperatorAppend.java delete mode 100644 debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/IcebergTableOperatorUpsert.java create mode 100644 debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/IcebergTableWriterFactory.java delete mode 100644 debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/InterfaceIcebergTableOperator.java create mode 100644 debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/PartitionedAppendWriter.java create mode 100644 debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/PartitionedDeltaWriter.java create mode 100644 debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/UnpartitionedDeltaWriter.java diff --git a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/DebeziumMetrics.java b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/DebeziumMetrics.java index 2a2cdbe7..561b5506 100644 --- a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/DebeziumMetrics.java +++ b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/DebeziumMetrics.java @@ -22,7 +22,6 @@ import org.slf4j.LoggerFactory; /** - * * @author Ismail Simsek 
*/ @Dependent diff --git a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeConsumer.java b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeConsumer.java index 1fea1c0b..0b1252f6 100644 --- a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeConsumer.java +++ b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeConsumer.java @@ -15,7 +15,7 @@ import io.debezium.serde.DebeziumSerdes; import io.debezium.server.BaseChangeConsumer; import io.debezium.server.iceberg.batchsizewait.InterfaceBatchSizeWait; -import io.debezium.server.iceberg.tableoperator.InterfaceIcebergTableOperator; +import io.debezium.server.iceberg.tableoperator.IcebergTableOperator; import io.debezium.util.Clock; import io.debezium.util.Strings; import io.debezium.util.Threads; @@ -104,9 +104,7 @@ public class IcebergChangeConsumer extends BaseChangeConsumer implements Debeziu InterfaceBatchSizeWait batchSizeWait; Catalog icebergCatalog; @Inject - @Any - Instance icebergTableOperatorInstances; - InterfaceIcebergTableOperator icebergTableOperator; + IcebergTableOperator icebergTableOperator; @PostConstruct void connect() { @@ -127,9 +125,6 @@ void connect() { batchSizeWait = IcebergUtil.selectInstance(batchSizeWaitInstances, batchSizeWaitName); batchSizeWait.initizalize(); - final String icebergTableOperatorName = upsert ? "IcebergTableOperatorUpsert" : "IcebergTableOperatorAppend"; - icebergTableOperator = IcebergUtil.selectInstance(icebergTableOperatorInstances, icebergTableOperatorName); - icebergTableOperator.initialize(); // configure and set valSerde.configure(Collections.emptyMap(), false); valDeserializer = valSerde.deserializer(); @@ -169,7 +164,7 @@ public void handleBatch(List> records, DebeziumEngin "Set `debezium.format.value.schemas.enable` to true to create tables automatically!"); } return IcebergUtil.createIcebergTable(icebergCatalog, tableIdentifier, - event.getValue().get(0).getSchema(), writeFormat); + event.getValue().get(0).getSchema(), writeFormat, !upsert); }); //addToTable(icebergTable, event.getValue()); icebergTableOperator.addToTable(icebergTable, event.getValue()); diff --git a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeEvent.java b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeEvent.java index 27cfb0e6..e9f77c8b 100644 --- a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeEvent.java +++ b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeEvent.java @@ -10,6 +10,9 @@ import java.io.IOException; import java.nio.ByteBuffer; +import java.time.Instant; +import java.time.OffsetDateTime; +import java.time.ZoneOffset; import java.util.*; import com.fasterxml.jackson.databind.JsonNode; @@ -53,8 +56,17 @@ public String destinationTable() { return destination.replace(".", "_"); } - public GenericRecord getIcebergRecord(Schema schema) { - return getIcebergRecord(schema.asStruct(), value); + public GenericRecord asIcebergRecord(Schema schema) { + final GenericRecord record = asIcebergRecord(schema.asStruct(), value); + + if (value != null && value.has("__source_ts_ms") && value.get("__source_ts_ms") != null) { + final long source_ts_ms = value.get("__source_ts_ms").longValue(); + final OffsetDateTime odt = OffsetDateTime.ofInstant(Instant.ofEpochMilli(source_ts_ms), ZoneOffset.UTC); + record.setField("__source_ts", odt); + } else { + 
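// event has no __source_ts_ms field; leave the __source_ts partition column null +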
record.setField("__source_ts", null); + } + return record; } public String schemaHashCode() { @@ -67,13 +79,13 @@ public Schema getSchema() { throw new RuntimeException("Failed to get event schema, event value is null, destination:" + this.destination); } - final List tableColumns = getValueFields(); + final List tableColumns = valueSchemaFields(); if (tableColumns.isEmpty()) { throw new RuntimeException("Failed to get schema destination:" + this.destination); } - final List keyColumns = getKeyFields(); + final List keyColumns = KeySchemaFields(); Set identifierFieldIds = new HashSet<>(); for (Types.NestedField ic : keyColumns) { @@ -100,43 +112,43 @@ public Schema getSchema() { return new Schema(tableColumns, identifierFieldIds); } - private GenericRecord getIcebergRecord(Types.StructType tableFields, JsonNode data) { - Map mappedResult = new HashMap<>(); + private GenericRecord asIcebergRecord(Types.StructType tableFields, JsonNode data) { LOGGER.debug("Processing nested field:{}", tableFields); + GenericRecord record = GenericRecord.create(tableFields); for (Types.NestedField field : tableFields.fields()) { // Set value to null if json event don't have the field if (data == null || !data.has(field.name()) || data.get(field.name()) == null) { - mappedResult.put(field.name(), null); + record.setField(field.name(), null); continue; } // get the value of the field from json event, map it to iceberg value - mappedResult.put(field.name(), jsonToGenericRecordVal(field, data.get(field.name()))); + record.setField(field.name(), jsonValToIcebergVal(field, data.get(field.name()))); } - return GenericRecord.create(tableFields).copy(mappedResult); + return record; } //getIcebergFieldsFromEventSchema - private List getKeyFields() { + private List KeySchemaFields() { if (keySchema != null && keySchema.has("fields") && keySchema.get("fields").isArray()) { LOGGER.debug(keySchema.toString()); - return getIcebergSchema(keySchema, "", 0); + return icebergSchema(keySchema, "", 0); } LOGGER.trace("Key schema not found!"); return new ArrayList<>(); } - private List getValueFields() { + private List valueSchemaFields() { if (valueSchema != null && valueSchema.has("fields") && valueSchema.get("fields").isArray()) { LOGGER.debug(valueSchema.toString()); - return getIcebergSchema(valueSchema, "", 0); + return icebergSchema(valueSchema, "", 0, true); } LOGGER.trace("Event schema not found!"); return new ArrayList<>(); } - private Type.PrimitiveType getIcebergFieldType(String fieldType) { + private Type.PrimitiveType icebergFieldType(String fieldType) { switch (fieldType) { case "int8": case "int16": @@ -163,7 +175,12 @@ private Type.PrimitiveType getIcebergFieldType(String fieldType) { } } - private List getIcebergSchema(JsonNode eventSchema, String schemaName, int columnId) { + private List icebergSchema(JsonNode eventSchema, String schemaName, int columnId) { + return icebergSchema(eventSchema, schemaName, columnId, false); + } + + private List icebergSchema(JsonNode eventSchema, String schemaName, int columnId, + boolean addSourceTsField) { List schemaColumns = new ArrayList<>(); String schemaType = eventSchema.get("type").textValue(); LOGGER.debug("Converting Schema of: {}::{}", schemaName, schemaType); @@ -180,7 +197,7 @@ private List getIcebergSchema(JsonNode eventSchema, String sc if (listItemType.equals("struct") || listItemType.equals("array") || listItemType.equals("map")) { throw new RuntimeException("Complex Array types are not supported array[" + listItemType + "], field " + fieldName); } - 
Type.PrimitiveType item = getIcebergFieldType(listItemType); + Type.PrimitiveType item = icebergFieldType(listItemType); schemaColumns.add(Types.NestedField.optional( columnId, fieldName, Types.ListType.ofOptional(++columnId, item))); //throw new RuntimeException("'" + fieldName + "' has Array type, Array type not supported!"); @@ -194,20 +211,25 @@ private List getIcebergSchema(JsonNode eventSchema, String sc //break; case "struct": // create it as struct, nested type - List subSchema = getIcebergSchema(jsonSchemaFieldNode, fieldName, columnId); + List subSchema = icebergSchema(jsonSchemaFieldNode, fieldName, columnId); schemaColumns.add(Types.NestedField.optional(columnId, fieldName, Types.StructType.of(subSchema))); columnId += subSchema.size(); break; default: //primitive types - schemaColumns.add(Types.NestedField.optional(columnId, fieldName, getIcebergFieldType(fieldType))); + schemaColumns.add(Types.NestedField.optional(columnId, fieldName, icebergFieldType(fieldType))); break; } } + + if (addSourceTsField) { + columnId++; + schemaColumns.add(Types.NestedField.optional(columnId, "__source_ts", Types.TimestampType.withZone())); + } return schemaColumns; } - private Object jsonToGenericRecordVal(Types.NestedField field, - JsonNode node) { + private Object jsonValToIcebergVal(Types.NestedField field, + JsonNode node) { LOGGER.debug("Processing Field:{} Type:{}", field.name(), field.type()); final Object val; switch (field.type().typeId()) { @@ -247,7 +269,7 @@ private Object jsonToGenericRecordVal(Types.NestedField field, case STRUCT: // create it as struct, nested type // recursive call to get nested data/record - val = getIcebergRecord(field.type().asStructType(), node); + val = asIcebergRecord(field.type().asStructType(), node); break; default: // default to String type diff --git a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergEventsChangeConsumer.java b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergEventsChangeConsumer.java index 34109b49..f4a40c7d 100644 --- a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergEventsChangeConsumer.java +++ b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergEventsChangeConsumer.java @@ -78,17 +78,16 @@ public class IcebergEventsChangeConsumer extends BaseChangeConsumer implements D static final SortOrder TABLE_SORT_ORDER = SortOrder.builderFor(TABLE_SCHEMA) .asc("event_sink_epoch_ms", NullOrder.NULLS_LAST) .build(); - - @ConfigProperty(name = "debezium.sink.iceberg." + CatalogProperties.WAREHOUSE_LOCATION) - String warehouseLocation; - private static final Logger LOGGER = LoggerFactory.getLogger(IcebergEventsChangeConsumer.class); private static final String PROP_PREFIX = "debezium.sink.iceberg."; + final Configuration hadoopConf = new Configuration(); + final Map icebergProperties = new ConcurrentHashMap<>(); + @ConfigProperty(name = "debezium.sink.iceberg." 
+ CatalogProperties.WAREHOUSE_LOCATION) + String warehouseLocation; @ConfigProperty(name = "debezium.format.value", defaultValue = "json") String valueFormat; @ConfigProperty(name = "debezium.format.key", defaultValue = "json") String keyFormat; - final Configuration hadoopConf = new Configuration(); @ConfigProperty(name = "debezium.sink.iceberg.fs.defaultFS") String defaultFs; @ConfigProperty(name = "debezium.sink.iceberg.table-namespace", defaultValue = "default") @@ -97,13 +96,10 @@ public class IcebergEventsChangeConsumer extends BaseChangeConsumer implements D String catalogName; @ConfigProperty(name = "debezium.sink.batch.batch-size-wait", defaultValue = "NoBatchSizeWait") String batchSizeWaitName; - @Inject @Any Instance batchSizeWaitInstances; InterfaceBatchSizeWait batchSizeWait; - - final Map icebergProperties = new ConcurrentHashMap<>(); Catalog icebergCatalog; Table eventTable; @@ -112,11 +108,11 @@ public class IcebergEventsChangeConsumer extends BaseChangeConsumer implements D void connect() { if (!valueFormat.equalsIgnoreCase(Json.class.getSimpleName().toLowerCase())) { throw new DebeziumException("debezium.format.value={" + valueFormat + "} not supported, " + - "Supported (debezium.format.value=*) formats are {json,}!"); + "Supported (debezium.format.value=*) formats are {json,}!"); } if (!keyFormat.equalsIgnoreCase(Json.class.getSimpleName().toLowerCase())) { throw new DebeziumException("debezium.format.key={" + valueFormat + "} not supported, " + - "Supported (debezium.format.key=*) formats are {json,}!"); + "Supported (debezium.format.key=*) formats are {json,}!"); } TableIdentifier tableIdentifier = TableIdentifier.of(Namespace.of(namespace), "debezium_events"); diff --git a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergUtil.java b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergUtil.java index 0177679f..7799239a 100644 --- a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergUtil.java +++ b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergUtil.java @@ -18,17 +18,16 @@ import javax.enterprise.inject.literal.NamedLiteral; import com.fasterxml.jackson.databind.ObjectMapper; -import org.apache.iceberg.Schema; -import org.apache.iceberg.SortOrder; -import org.apache.iceberg.Table; +import org.apache.iceberg.*; import org.apache.iceberg.catalog.Catalog; import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.data.GenericAppenderFactory; import org.apache.iceberg.exceptions.NoSuchTableException; +import org.apache.iceberg.relocated.com.google.common.primitives.Ints; import org.eclipse.microprofile.config.Config; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT; -import static org.apache.iceberg.TableProperties.FORMAT_VERSION; +import static org.apache.iceberg.TableProperties.*; /** * @author Ismail Simsek @@ -64,15 +63,23 @@ public static T selectInstance(Instance instances, String name) { } public static Table createIcebergTable(Catalog icebergCatalog, TableIdentifier tableIdentifier, - Schema schema, String writeFormat) { + Schema schema, String writeFormat, boolean partition) { LOGGER.warn("Creating table:'{}'\nschema:{}\nrowIdentifier:{}", tableIdentifier, schema, schema.identifierFieldNames()); + PartitionSpec ps; + if (partition) { + ps = PartitionSpec.builderFor(schema).day("__source_ts").build(); + } else { + ps = PartitionSpec.builderFor(schema).build(); + 
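// note: IcebergChangeConsumer calls this with partition = !upsert, so tables created in upsert mode stay unpartitioned +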
} + return icebergCatalog.buildTable(tableIdentifier, schema) .withProperty(FORMAT_VERSION, "2") .withProperty(DEFAULT_FILE_FORMAT, writeFormat.toLowerCase(Locale.ENGLISH)) .withSortOrder(IcebergUtil.getIdentifierFieldsAsSortOrder(schema)) + .withPartitionSpec(ps) .create(); } @@ -95,4 +102,18 @@ public static Optional loadIcebergTable(Catalog icebergCatalog, TableIden } } + public static FileFormat getTableFileFormat(Table icebergTable) { + String formatAsString = icebergTable.properties().getOrDefault(DEFAULT_FILE_FORMAT, DEFAULT_FILE_FORMAT_DEFAULT); + return FileFormat.valueOf(formatAsString.toUpperCase(Locale.ROOT)); + } + + public static GenericAppenderFactory getTableAppender(Table icebergTable) { + return new GenericAppenderFactory( + icebergTable.schema(), + icebergTable.spec(), + Ints.toArray(icebergTable.schema().identifierFieldIds()), + icebergTable.schema(), + null); + } + } diff --git a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/batchsizewait/DynamicBatchSizeWait.java b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/batchsizewait/DynamicBatchSizeWait.java index 65637ba1..ce8969c9 100644 --- a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/batchsizewait/DynamicBatchSizeWait.java +++ b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/batchsizewait/DynamicBatchSizeWait.java @@ -28,16 +28,13 @@ @Deprecated public class DynamicBatchSizeWait implements InterfaceBatchSizeWait { protected static final Logger LOGGER = LoggerFactory.getLogger(DynamicBatchSizeWait.class); - + final LinkedList batchSizeHistory = new LinkedList<>(); + final LinkedList sleepMsHistory = new LinkedList<>(); @ConfigProperty(name = "debezium.source.max.batch.size", defaultValue = DEFAULT_MAX_BATCH_SIZE + "") Integer maxBatchSize; - @ConfigProperty(name = "debezium.sink.batch.batch-size-wait.max-wait-ms", defaultValue = "300000") Integer maxWaitMs; - final LinkedList batchSizeHistory = new LinkedList<>(); - final LinkedList sleepMsHistory = new LinkedList<>(); - public DynamicBatchSizeWait() { batchSizeHistory.add(1); batchSizeHistory.add(1); @@ -82,7 +79,7 @@ else if ((getAverage(batchSizeHistory) / maxBatchSize) >= 0.90) { sleepMsHistory.removeFirst(); LOGGER.debug("Calculating Wait delay\n" + - "max.batch.size={}\npoll.interval.ms={}\nbatchSizeHistory{}\nsleepMsHistory{}\nval{}", + "max.batch.size={}\npoll.interval.ms={}\nbatchSizeHistory{}\nsleepMsHistory{}\nval{}", maxBatchSize, maxWaitMs, batchSizeHistory, sleepMsHistory, sleepMsHistory.getLast()); return sleepMsHistory.getLast(); diff --git a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/batchsizewait/InterfaceBatchSizeWait.java b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/batchsizewait/InterfaceBatchSizeWait.java index b04d04e7..60d328f0 100644 --- a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/batchsizewait/InterfaceBatchSizeWait.java +++ b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/batchsizewait/InterfaceBatchSizeWait.java @@ -18,7 +18,7 @@ public interface InterfaceBatchSizeWait { default void initizalize() { } - default void waitMs(Integer numRecordsProcessed, Integer processingTimeMs) throws InterruptedException{ + default void waitMs(Integer numRecordsProcessed, Integer processingTimeMs) throws InterruptedException { } } diff --git a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/AbstractIcebergTableOperator.java 
b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/AbstractIcebergTableOperator.java deleted file mode 100644 index 69f25d57..00000000 --- a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/AbstractIcebergTableOperator.java +++ /dev/null @@ -1,93 +0,0 @@ -/* - * - * * Copyright memiiso Authors. - * * - * * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 - * - */ - -package io.debezium.server.iceberg.tableoperator; - -import io.debezium.server.iceberg.IcebergChangeEvent; - -import java.io.IOException; -import java.time.Instant; -import java.util.ArrayList; -import java.util.List; -import java.util.Locale; -import java.util.UUID; - -import org.apache.iceberg.DataFile; -import org.apache.iceberg.FileFormat; -import org.apache.iceberg.Schema; -import org.apache.iceberg.Table; -import org.apache.iceberg.data.GenericAppenderFactory; -import org.apache.iceberg.data.GenericRecord; -import org.apache.iceberg.data.Record; -import org.apache.iceberg.io.DataWriter; -import org.apache.iceberg.io.OutputFile; -import org.apache.iceberg.relocated.com.google.common.primitives.Ints; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT; -import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT_DEFAULT; - -/** - * Wrapper to perform operations in iceberg tables - * - * @author Rafael Acevedo - */ -abstract class AbstractIcebergTableOperator implements InterfaceIcebergTableOperator { - private static final Logger LOGGER = LoggerFactory.getLogger(AbstractIcebergTableOperator.class); - - @Override - public void initialize() { - } - - protected ArrayList toIcebergRecords(Schema schema, List events) { - - ArrayList icebergRecords = new ArrayList<>(); - for (IcebergChangeEvent e : events) { - GenericRecord icebergRecord = e.getIcebergRecord(schema); - icebergRecords.add(icebergRecord); - } - - return icebergRecords; - } - - FileFormat getFileFormat(Table icebergTable) { - String formatAsString = icebergTable.properties().getOrDefault(DEFAULT_FILE_FORMAT, DEFAULT_FILE_FORMAT_DEFAULT); - return FileFormat.valueOf(formatAsString.toUpperCase(Locale.ROOT)); - } - - GenericAppenderFactory getAppender(Table icebergTable) { - return new GenericAppenderFactory( - icebergTable.schema(), - icebergTable.spec(), - Ints.toArray(icebergTable.schema().identifierFieldIds()), - icebergTable.schema(), - null); - } - - protected DataFile getDataFile(Table icebergTable, ArrayList icebergRecords) { - - FileFormat fileFormat = getFileFormat(icebergTable); - GenericAppenderFactory appender = getAppender(icebergTable); - final String fileName = UUID.randomUUID() + "-" + Instant.now().toEpochMilli() + "." 
+ fileFormat.name(); - OutputFile out = icebergTable.io().newOutputFile(icebergTable.locationProvider().newDataLocation(fileName)); - - DataWriter dw = appender.newDataWriter(icebergTable.encryption().encrypt(out), fileFormat, null); - - icebergRecords.stream().filter(this.filterEvents()).forEach(dw::add); - - try { - dw.close(); - } catch (IOException e) { - throw new RuntimeException(e); - } - - LOGGER.debug("Creating iceberg DataFile!"); - return dw.toDataFile(); - } - -} diff --git a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/BaseDeltaTaskWriter.java b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/BaseDeltaTaskWriter.java new file mode 100644 index 00000000..09fe1ec4 --- /dev/null +++ b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/BaseDeltaTaskWriter.java @@ -0,0 +1,75 @@ +package io.debezium.server.iceberg.tableoperator; + +import java.io.IOException; +import java.util.List; + +import org.apache.iceberg.*; +import org.apache.iceberg.data.InternalRecordWrapper; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.io.BaseTaskWriter; +import org.apache.iceberg.io.FileAppenderFactory; +import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.io.OutputFileFactory; +import org.apache.iceberg.relocated.com.google.common.collect.Sets; +import org.apache.iceberg.types.TypeUtil; + +abstract class BaseDeltaTaskWriter extends BaseTaskWriter { + + private final Schema schema; + private final Schema deleteSchema; + private final InternalRecordWrapper wrapper; + private final boolean upsert; + private final boolean upsertKeepDeletes; + + BaseDeltaTaskWriter(PartitionSpec spec, + FileFormat format, + FileAppenderFactory appenderFactory, + OutputFileFactory fileFactory, + FileIO io, + long targetFileSize, + Schema schema, + List equalityFieldIds, + boolean upsert, + boolean upsertKeepDeletes) { + super(spec, format, appenderFactory, fileFactory, io, targetFileSize); + this.schema = schema; + this.deleteSchema = TypeUtil.select(schema, Sets.newHashSet(equalityFieldIds)); + this.wrapper = new InternalRecordWrapper(schema.asStruct()); + this.upsert = upsert; + this.upsertKeepDeletes = upsertKeepDeletes; + } + + abstract RowDataDeltaWriter route(Record row); + + InternalRecordWrapper wrapper() { + return wrapper; + } + + @Override + public void write(Record row) throws IOException { + RowDataDeltaWriter writer = route(row); + if (upsert && !row.getField("__op").equals("c")) { // anything that is not an insert is treated as an upsert: delete the old copy first + writer.delete(row); + } + // if it's a deleted row and upsertKeepDeletes = true, write the deleted record to the target table; + // otherwise deleted records are removed from the target table + if ( + upsertKeepDeletes + || !(row.getField("__op").equals("d"))) // keep the row unless it is a delete + { + writer.write(row); + } + } + + public class RowDataDeltaWriter extends BaseEqualityDeltaWriter { + RowDataDeltaWriter(PartitionKey partition) { + super(partition, schema, deleteSchema); + } + + @Override + protected StructLike asStructLike(Record data) { + return wrapper.wrap(data); + } + } +} diff --git a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/IcebergTableOperator.java b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/IcebergTableOperator.java new file mode 100644 index 00000000..5222e238 --- /dev/null +++ 
b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/IcebergTableOperator.java @@ -0,0 +1,120 @@ +/* + * + * * Copyright memiiso Authors. + * * + * * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + * + */ + +package io.debezium.server.iceberg.tableoperator; + +import io.debezium.DebeziumException; +import io.debezium.server.iceberg.IcebergChangeEvent; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.ConcurrentHashMap; +import javax.enterprise.context.Dependent; +import javax.inject.Inject; + +import com.google.common.collect.ImmutableMap; +import org.apache.iceberg.RowDelta; +import org.apache.iceberg.Schema; +import org.apache.iceberg.Table; +import org.apache.iceberg.data.GenericRecord; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.io.BaseTaskWriter; +import org.apache.iceberg.io.WriteResult; +import org.eclipse.microprofile.config.inject.ConfigProperty; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Wrapper to perform operations on Iceberg tables + * + * @author Rafael Acevedo + */ +@Dependent +public class IcebergTableOperator { + + static final ImmutableMap cdcOperations = ImmutableMap.of("c", 1, "r", 2, "u", 3, "d", 4); + private static final Logger LOGGER = LoggerFactory.getLogger(IcebergTableOperator.class); + @ConfigProperty(name = "debezium.sink.iceberg.upsert-dedup-column", defaultValue = "__source_ts_ms") + String sourceTsMsColumn; + @ConfigProperty(name = "debezium.sink.iceberg.upsert-op-column", defaultValue = "__op") + String opColumn; + @Inject + IcebergTableWriterFactory writerFactory; + + @ConfigProperty(name = "debezium.sink.iceberg.upsert", defaultValue = "true") + boolean upsert; + + private ArrayList deduplicatedBatchRecords(Schema schema, List events) { + ConcurrentHashMap icebergRecordsMap = new ConcurrentHashMap<>(); + + for (IcebergChangeEvent e : events) { + GenericRecord icebergRecord = e.asIcebergRecord(schema); + + // deduplicate over the key (PK) + if (icebergRecordsMap.containsKey(e.key())) { + + // replace the stored record only if the incoming one is newer + if (this.compareByTsThenOp(icebergRecordsMap.get(e.key()), icebergRecord) <= 0) { + icebergRecordsMap.put(e.key(), icebergRecord); + } + + } else { + icebergRecordsMap.put(e.key(), icebergRecord); + } + + } + return new ArrayList<>(icebergRecordsMap.values()); + } + + private int compareByTsThenOp(GenericRecord lhs, GenericRecord rhs) { + + int result = Long.compare((Long) lhs.getField(sourceTsMsColumn), (Long) rhs.getField(sourceTsMsColumn)); + + if (result == 0) { + // same timestamp, fall back to the operation priority (c < r < u < d) + result = cdcOperations.getOrDefault(lhs.getField(opColumn), -1) + .compareTo( + cdcOperations.getOrDefault(rhs.getField(opColumn), -1) + ); + } + + return result; + } + + public void addToTable(Table icebergTable, List events) { + // Initialize a task writer to write both INSERT and equality DELETE. 
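+ // The factory returns an equality-delta writer when upsert is enabled and the table has identifier fields, + // otherwise it falls back to a plain append writer (see IcebergTableWriterFactory.create).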
+ BaseTaskWriter writer = writerFactory.create(icebergTable); + try { + if (upsert && !icebergTable.schema().identifierFieldIds().isEmpty()) { + ArrayList icebergRecords = deduplicatedBatchRecords(icebergTable.schema(), events); + for (Record icebergRecord : icebergRecords) { + writer.write(icebergRecord); + } + } else { + for (IcebergChangeEvent e : events) { + writer.write(e.asIcebergRecord(icebergTable.schema())); + } + } + + writer.close(); + WriteResult files = writer.complete(); + RowDelta newRowDelta = icebergTable.newRowDelta(); + Arrays.stream(files.dataFiles()).forEach(newRowDelta::addRows); + Arrays.stream(files.deleteFiles()).forEach(newRowDelta::addDeletes); + newRowDelta.commit(); + + } catch (IOException ex) { + throw new DebeziumException(ex); + } + + LOGGER.info("Committed {} events to table! {}", events.size(), icebergTable.location()); + } + +} diff --git a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/IcebergTableOperatorAppend.java b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/IcebergTableOperatorAppend.java deleted file mode 100644 index ccdb57a8..00000000 --- a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/IcebergTableOperatorAppend.java +++ /dev/null @@ -1,48 +0,0 @@ -/* - * - * * Copyright memiiso Authors. - * * - * * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 - * - */ - -package io.debezium.server.iceberg.tableoperator; - -import io.debezium.server.iceberg.IcebergChangeEvent; - -import java.util.ArrayList; -import java.util.List; -import java.util.function.Predicate; -import javax.enterprise.context.Dependent; -import javax.inject.Named; - -import org.apache.iceberg.AppendFiles; -import org.apache.iceberg.DataFile; -import org.apache.iceberg.Table; -import org.apache.iceberg.data.Record; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -@Dependent -@Named("IcebergTableOperatorAppend") -public class IcebergTableOperatorAppend extends AbstractIcebergTableOperator { - private static final Logger LOGGER = LoggerFactory.getLogger(AbstractIcebergTableOperator.class); - - @Override - public void addToTable(Table icebergTable, List events) { - - ArrayList icebergRecords = toIcebergRecords(icebergTable.schema(), events); - DataFile dataFile = getDataFile(icebergTable, icebergRecords); - LOGGER.debug("Committing new file as Append '{}' !", dataFile.path()); - AppendFiles c = icebergTable.newAppend() - .appendFile(dataFile); - - c.commit(); - LOGGER.info("Committed {} events to table! {}", events.size(), icebergTable.location()); - } - - @Override - public Predicate filterEvents() { - return p -> true; - } -} diff --git a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/IcebergTableOperatorUpsert.java b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/IcebergTableOperatorUpsert.java deleted file mode 100644 index 4d1ffdb4..00000000 --- a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/IcebergTableOperatorUpsert.java +++ /dev/null @@ -1,166 +0,0 @@ -/* - * - * * Copyright memiiso Authors. 
- * * - * * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 - * - */ - -package io.debezium.server.iceberg.tableoperator; - -import io.debezium.server.iceberg.IcebergChangeEvent; - -import java.io.IOException; -import java.time.Instant; -import java.util.ArrayList; -import java.util.List; -import java.util.Optional; -import java.util.UUID; -import java.util.concurrent.ConcurrentHashMap; -import java.util.function.Predicate; -import java.util.stream.Collectors; -import javax.enterprise.context.Dependent; -import javax.inject.Inject; -import javax.inject.Named; - -import com.google.common.collect.ImmutableMap; -import org.apache.iceberg.*; -import org.apache.iceberg.data.GenericAppenderFactory; -import org.apache.iceberg.data.GenericRecord; -import org.apache.iceberg.data.Record; -import org.apache.iceberg.deletes.EqualityDeleteWriter; -import org.apache.iceberg.encryption.EncryptedOutputFile; -import org.apache.iceberg.io.OutputFile; -import org.eclipse.microprofile.config.inject.ConfigProperty; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -@Dependent -@Named("IcebergTableOperatorUpsert") -public class IcebergTableOperatorUpsert extends AbstractIcebergTableOperator { - - static final ImmutableMap cdcOperations = ImmutableMap.of("c", 1, "r", 2, "u", 3, "d", 4); - private static final Logger LOGGER = LoggerFactory.getLogger(IcebergTableOperatorUpsert.class); - @ConfigProperty(name = "debezium.sink.iceberg.upsert-dedup-column", defaultValue = "__source_ts_ms") - String sourceTsMsColumn; - - @ConfigProperty(name = "debezium.sink.iceberg.upsert-keep-deletes", defaultValue = "true") - boolean upsertKeepDeletes; - @ConfigProperty(name = "debezium.sink.iceberg.upsert-op-column", defaultValue = "__op") - String opColumn; - - @Inject - IcebergTableOperatorAppend icebergTableAppend; - - - @Override - public void initialize() { - super.initialize(); - icebergTableAppend.initialize(); - } - - private Optional getDeleteFile(Table icebergTable, ArrayList icebergRecords) { - - FileFormat fileFormat = getFileFormat(icebergTable); - GenericAppenderFactory appender = getAppender(icebergTable); - final String fileName = "del-" + UUID.randomUUID() + "-" + Instant.now().toEpochMilli() + "." + fileFormat.name(); - OutputFile out = icebergTable.io().newOutputFile(icebergTable.locationProvider().newDataLocation(fileName)); - EncryptedOutputFile eout = icebergTable.encryption().encrypt(out); - - EqualityDeleteWriter edw = appender.newEqDeleteWriter(eout, fileFormat, null); - - // anything is not an insert. - // upsertKeepDeletes = false, which means delete deletes - List deleteRows = icebergRecords.stream() - .filter(r -> - // anything is not an insert. 
- !r.getField(opColumn).equals("c") - // upsertKeepDeletes = false and its deleted record, which means delete deletes - // || !(upsertKeepDeletes && r.getField(opColumn).equals("d")) - ).collect(Collectors.toList()); - - if (deleteRows.size() == 0) { - return Optional.empty(); - } - - edw.deleteAll(deleteRows); - - try { - edw.close(); - } catch (IOException e) { - throw new RuntimeException(e); - } - - LOGGER.debug("Creating iceberg equality delete file!"); - return Optional.of(edw.toDeleteFile()); - } - - private ArrayList toDeduppedIcebergRecords(Schema schema, List events) { - ConcurrentHashMap icebergRecordsmap = new ConcurrentHashMap<>(); - - for (IcebergChangeEvent e : events) { - GenericRecord icebergRecord = e.getIcebergRecord(schema); - - // only replace it if its newer - if (icebergRecordsmap.containsKey(e.key())) { - - if (this.compareByTsThenOp(icebergRecordsmap.get(e.key()), icebergRecord) <= 0) { - icebergRecordsmap.put(e.key(), icebergRecord); - } - - } else { - icebergRecordsmap.put(e.key(), icebergRecord); - } - - } - return new ArrayList<>(icebergRecordsmap.values()); - } - - private int compareByTsThenOp(GenericRecord lhs, GenericRecord rhs) { - if (lhs.getField(sourceTsMsColumn).equals(rhs.getField(sourceTsMsColumn))) { - // return (x < y) ? -1 : ((x == y) ? 0 : 1); - return - cdcOperations.getOrDefault(lhs.getField(opColumn), -1) - .compareTo( - cdcOperations.getOrDefault(rhs.getField(opColumn), -1) - ) - ; - } else { - return Long.compare((Long) lhs.getField(sourceTsMsColumn), (Long) rhs.getField(sourceTsMsColumn)); - } - } - - @Override - public void addToTable(Table icebergTable, List events) { - - if (icebergTable.sortOrder().isUnsorted()) { - LOGGER.info("Table don't have Pk defined upsert is not possible falling back to append!"); - // call append here! - icebergTableAppend.addToTable(icebergTable, events); - return; - } - - // DO UPSERT >>> DELETE + INSERT - ArrayList icebergRecords = toDeduppedIcebergRecords(icebergTable.schema(), events); - DataFile dataFile = getDataFile(icebergTable, icebergRecords); - Optional deleteDataFile = getDeleteFile(icebergTable, icebergRecords); - LOGGER.debug("Committing new file as Upsert (has deletes:{}) '{}' !", deleteDataFile.isPresent(), dataFile.path()); - RowDelta c = icebergTable - .newRowDelta() - .addRows(dataFile); - deleteDataFile.ifPresent(deleteFile -> c.addDeletes(deleteFile).validateDeletedFiles()); - - c.commit(); - LOGGER.info("Committed {} events to table! 
{}", events.size(), icebergTable.location()); - } - - @Override - public Predicate filterEvents() { - return p -> - // if its upsert and upsertKeepDeletes = true - upsertKeepDeletes - // if not then exclude deletes - || !(p.getField(opColumn).equals("d")); - } - -} diff --git a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/IcebergTableWriterFactory.java b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/IcebergTableWriterFactory.java new file mode 100644 index 00000000..96562767 --- /dev/null +++ b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/IcebergTableWriterFactory.java @@ -0,0 +1,65 @@ +package io.debezium.server.iceberg.tableoperator; + +import io.debezium.server.iceberg.IcebergUtil; + +import java.time.Instant; +import java.time.ZoneOffset; +import java.time.format.DateTimeFormatter; +import java.util.ArrayList; +import java.util.List; +import javax.enterprise.context.Dependent; + +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.Table; +import org.apache.iceberg.data.GenericAppenderFactory; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.io.BaseTaskWriter; +import org.apache.iceberg.io.OutputFileFactory; +import org.apache.iceberg.io.UnpartitionedWriter; +import org.eclipse.microprofile.config.inject.ConfigProperty; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@Dependent +public class IcebergTableWriterFactory { + protected static final DateTimeFormatter dtFormater = DateTimeFormatter.ofPattern("yyyyMMdd").withZone(ZoneOffset.UTC); + private static final Logger LOGGER = LoggerFactory.getLogger(IcebergTableOperator.class); + @ConfigProperty(name = "debezium.sink.iceberg.upsert", defaultValue = "true") + boolean upsert; + @ConfigProperty(name = "debezium.sink.iceberg.upsert-keep-deletes", defaultValue = "true") + boolean upsertKeepDeletes; + + public BaseTaskWriter create(Table icebergTable) { + + FileFormat format = IcebergUtil.getTableFileFormat(icebergTable); + GenericAppenderFactory appenderFactory = IcebergUtil.getTableAppender(icebergTable); + int partitionId = Integer.parseInt(dtFormater.format(Instant.now())); + OutputFileFactory fileFactory = OutputFileFactory.builderFor(icebergTable, partitionId, 1L) + .defaultSpec(icebergTable.spec()).format(format).build(); + List equalityFieldIds = new ArrayList<>(icebergTable.schema().identifierFieldIds()); + + BaseTaskWriter writer; + if (icebergTable.schema().identifierFieldIds().isEmpty() || !upsert) { + if (upsert) { + LOGGER.info("Table don't have Pk defined upsert is not possible falling back to append!"); + } + if (icebergTable.spec().isUnpartitioned()) { + writer = new UnpartitionedWriter<>( + icebergTable.spec(), format, appenderFactory, fileFactory, icebergTable.io(), Long.MAX_VALUE); + } else { + writer = new PartitionedAppendWriter( + icebergTable.spec(), format, appenderFactory, fileFactory, icebergTable.io(), Long.MAX_VALUE, icebergTable.schema()); + } + } else if (icebergTable.spec().isUnpartitioned()) { + writer = new UnpartitionedDeltaWriter(icebergTable.spec(), format, appenderFactory, fileFactory, + icebergTable.io(), + Long.MAX_VALUE, icebergTable.schema(), equalityFieldIds, true, upsertKeepDeletes); + } else { + writer = new PartitionedDeltaWriter(icebergTable.spec(), format, appenderFactory, fileFactory, + icebergTable.io(), + Long.MAX_VALUE, icebergTable.schema(), equalityFieldIds, true, upsertKeepDeletes); + } + + return writer; + } +} diff --git 
a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/InterfaceIcebergTableOperator.java b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/InterfaceIcebergTableOperator.java deleted file mode 100644 index 68d51eb6..00000000 --- a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/InterfaceIcebergTableOperator.java +++ /dev/null @@ -1,26 +0,0 @@ -/* - * - * * Copyright memiiso Authors. - * * - * * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 - * - */ - -package io.debezium.server.iceberg.tableoperator; - -import io.debezium.server.iceberg.IcebergChangeEvent; - -import java.util.List; -import java.util.function.Predicate; - -import org.apache.iceberg.Table; -import org.apache.iceberg.data.Record; - -public interface InterfaceIcebergTableOperator { - - void initialize(); - - void addToTable(Table icebergTable, List events); - - Predicate filterEvents(); -} diff --git a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/PartitionedAppendWriter.java b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/PartitionedAppendWriter.java new file mode 100644 index 00000000..87e02af1 --- /dev/null +++ b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/PartitionedAppendWriter.java @@ -0,0 +1,32 @@ +package io.debezium.server.iceberg.tableoperator; + +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.PartitionKey; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Schema; +import org.apache.iceberg.data.InternalRecordWrapper; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.io.FileAppenderFactory; +import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.io.OutputFileFactory; +import org.apache.iceberg.io.PartitionedWriter; + +public class PartitionedAppendWriter extends PartitionedWriter { + private final PartitionKey partitionKey; + InternalRecordWrapper wrapper; + + public PartitionedAppendWriter(PartitionSpec spec, FileFormat format, + FileAppenderFactory appenderFactory, + OutputFileFactory fileFactory, FileIO io, long targetFileSize, + Schema schema) { + super(spec, format, appenderFactory, fileFactory, io, targetFileSize); + this.partitionKey = new PartitionKey(spec, schema); + this.wrapper = new InternalRecordWrapper(schema.asStruct()); + } + + @Override + protected PartitionKey partition(Record row) { + partitionKey.partition(wrapper.wrap(row)); + return partitionKey; + } +} diff --git a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/PartitionedDeltaWriter.java b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/PartitionedDeltaWriter.java new file mode 100644 index 00000000..cab3ecea --- /dev/null +++ b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/PartitionedDeltaWriter.java @@ -0,0 +1,67 @@ +package io.debezium.server.iceberg.tableoperator; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.util.List; +import java.util.Map; + +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.PartitionKey; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Schema; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.io.FileAppenderFactory; +import org.apache.iceberg.io.FileIO; +import 
org.apache.iceberg.io.OutputFileFactory; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.util.Tasks; + +class PartitionedDeltaWriter extends BaseDeltaTaskWriter { + + private final PartitionKey partitionKey; + + private final Map writers = Maps.newHashMap(); + + PartitionedDeltaWriter(PartitionSpec spec, + FileFormat format, + FileAppenderFactory appenderFactory, + OutputFileFactory fileFactory, + FileIO io, + long targetFileSize, + Schema schema, + List equalityFieldIds, + boolean upsert, + boolean upsertKeepDeletes) { + super(spec, format, appenderFactory, fileFactory, io, targetFileSize, schema, equalityFieldIds, upsert, upsertKeepDeletes); + this.partitionKey = new PartitionKey(spec, schema); + } + + @Override + RowDataDeltaWriter route(Record row) { + partitionKey.partition(wrapper().wrap(row)); + + RowDataDeltaWriter writer = writers.get(partitionKey); + if (writer == null) { + // NOTICE: we need to copy a new partition key here, in case of messing up the keys in writers. + PartitionKey copiedKey = partitionKey.copy(); + writer = new RowDataDeltaWriter(copiedKey); + writers.put(copiedKey, writer); + } + + return writer; + } + + @Override + public void close() { + try { + Tasks.foreach(writers.values()) + .throwFailureWhenFinished() + .noRetry() + .run(RowDataDeltaWriter::close, IOException.class); + + writers.clear(); + } catch (IOException e) { + throw new UncheckedIOException("Failed to close equality delta writer", e); + } + } +} diff --git a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/UnpartitionedDeltaWriter.java b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/UnpartitionedDeltaWriter.java new file mode 100644 index 00000000..cdfb89f9 --- /dev/null +++ b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/UnpartitionedDeltaWriter.java @@ -0,0 +1,40 @@ +package io.debezium.server.iceberg.tableoperator; + +import java.io.IOException; +import java.util.List; + +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Schema; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.io.FileAppenderFactory; +import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.io.OutputFileFactory; + +class UnpartitionedDeltaWriter extends BaseDeltaTaskWriter { + private final RowDataDeltaWriter writer; + + UnpartitionedDeltaWriter(PartitionSpec spec, + FileFormat format, + FileAppenderFactory appenderFactory, + OutputFileFactory fileFactory, + FileIO io, + long targetFileSize, + Schema schema, + List equalityFieldIds, + boolean upsert, + boolean upsertKeepDeletes) { + super(spec, format, appenderFactory, fileFactory, io, targetFileSize, schema, equalityFieldIds, upsert, upsertKeepDeletes); + this.writer = new RowDataDeltaWriter(null); + } + + @Override + RowDataDeltaWriter route(Record row) { + return writer; + } + + @Override + public void close() throws IOException { + writer.close(); + } +} diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/ConfigSource.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/ConfigSource.java index 6eb10725..5ed093d2 100644 --- a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/ConfigSource.java +++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/ConfigSource.java @@ -15,13 +15,6 @@ public class ConfigSource extends TestConfigSource { public 
static final String S3_REGION = "us-east-1"; public static final String S3_BUCKET = "test-bucket"; - @Override - public int getOrdinal() { - // Configuration property precedence is based on ordinal values and since we override the - // properties in TestConfigSource, we should give this a higher priority. - return super.getOrdinal() + 1; - } - public ConfigSource() { config.put("quarkus.profile", "postgresql"); // common sink conf @@ -73,4 +66,11 @@ public ConfigSource() { config.put("quarkus.log.category.\"org.apache.iceberg\".level", "ERROR"); } + + @Override + public int getOrdinal() { + // Configuration property precedence is based on ordinal values and since we override the + // properties in TestConfigSource, we should give this a higher priority. + return super.getOrdinal() + 1; + } } diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerMysqlTest.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerMysqlTest.java index 2ffa4617..aac775ae 100644 --- a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerMysqlTest.java +++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerMysqlTest.java @@ -48,14 +48,14 @@ public void testSimpleUpload() throws Exception { // create test table String sqlCreate = "CREATE TABLE IF NOT EXISTS inventory.test_delete_table (" + - " c_id INTEGER ," + - " c_id2 INTEGER ," + - " c_data TEXT," + - " PRIMARY KEY (c_id, c_id2)" + - " );"; + " c_id INTEGER ," + + " c_id2 INTEGER ," + + " c_data TEXT," + + " PRIMARY KEY (c_id, c_id2)" + + " );"; String sqlInsert = "INSERT INTO inventory.test_delete_table (c_id, c_id2, c_data ) " + - "VALUES (1,1,'data'),(1,2,'data'),(1,3,'data'),(1,4,'data') ;"; + "VALUES (1,1,'data'),(1,2,'data'),(1,3,'data'),(1,4,'data') ;"; String sqlDelete = "DELETE FROM inventory.test_delete_table where c_id = 1 ;"; SourceMysqlDB.runSQL(sqlCreate); diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerTest.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerTest.java index ca99ebf0..a7fce283 100644 --- a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerTest.java +++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerTest.java @@ -48,48 +48,48 @@ public class IcebergChangeConsumerTest extends BaseSparkTest { public void testConsumingVariousDataTypes() throws Exception { assertEquals(sinkType, "iceberg"); String sql = "\n" + - " DROP TABLE IF EXISTS inventory.data_types;\n" + - " CREATE TABLE IF NOT EXISTS inventory.data_types (\n" + - " c_id INTEGER ,\n" + - " c_text TEXT,\n" + - " c_varchar VARCHAR,\n" + - " c_int INTEGER,\n" + - " c_date DATE,\n" + - " c_timestamp TIMESTAMP,\n" + - " c_timestamptz TIMESTAMPTZ,\n" + - " c_float FLOAT,\n" + - " c_decimal DECIMAL(18,4),\n" + - " c_numeric NUMERIC(18,4),\n" + - " c_interval INTERVAL,\n" + - " c_boolean BOOLEAN,\n" + - " c_uuid UUID,\n" + - " c_bytea BYTEA,\n" + - " c_json JSON,\n" + - " c_jsonb JSONB\n" + - " );"; + " DROP TABLE IF EXISTS inventory.data_types;\n" + + " CREATE TABLE IF NOT EXISTS inventory.data_types (\n" + + " c_id INTEGER ,\n" + + " c_text TEXT,\n" + + " c_varchar VARCHAR,\n" + + " c_int INTEGER,\n" + + " c_date DATE,\n" + + " c_timestamp TIMESTAMP,\n" + + " c_timestamptz TIMESTAMPTZ,\n" + + " c_float FLOAT,\n" + + " c_decimal 
DECIMAL(18,4),\n" + + " c_numeric NUMERIC(18,4),\n" + + " c_interval INTERVAL,\n" + + " c_boolean BOOLEAN,\n" + + " c_uuid UUID,\n" + + " c_bytea BYTEA,\n" + + " c_json JSON,\n" + + " c_jsonb JSONB\n" + + " );"; SourcePostgresqlDB.runSQL(sql); sql = "INSERT INTO inventory.data_types (" + - "c_id, " + - "c_text, c_varchar, c_int, c_date, c_timestamp, c_timestamptz, " + - "c_float, c_decimal,c_numeric,c_interval,c_boolean,c_uuid,c_bytea, " + - "c_json, c_jsonb) " + - "VALUES (1, null, null, null,null,null,null," + - "null,null,null,null,null,null,null," + - "null,null)," + - "(2, 'val_text', 'A', 123, current_date , current_timestamp, current_timestamp," + - "'1.23'::float,'1234566.34456'::decimal,'345672123.452'::numeric, interval '1 day',false," + - "'3f207ac6-5dba-11eb-ae93-0242ac130002'::UUID, 'aBC'::bytea," + - "'{\"reading\": 1123}'::json, '{\"reading\": 1123}'::jsonb" + - ")"; + "c_id, " + + "c_text, c_varchar, c_int, c_date, c_timestamp, c_timestamptz, " + + "c_float, c_decimal,c_numeric,c_interval,c_boolean,c_uuid,c_bytea, " + + "c_json, c_jsonb) " + + "VALUES (1, null, null, null,null,null,null," + + "null,null,null,null,null,null,null," + + "null,null)," + + "(2, 'val_text', 'A', 123, current_date , current_timestamp, current_timestamp," + + "'1.23'::float,'1234566.34456'::decimal,'345672123.452'::numeric, interval '1 day',false," + + "'3f207ac6-5dba-11eb-ae93-0242ac130002'::UUID, 'aBC'::bytea," + + "'{\"reading\": 1123}'::json, '{\"reading\": 1123}'::jsonb" + + ")"; SourcePostgresqlDB.runSQL(sql); Awaitility.await().atMost(Duration.ofSeconds(320)).until(() -> { try { Dataset df = getTableData("testc.inventory.data_types"); df.show(true); return df.where("c_text is null AND c_varchar is null AND c_int is null " + - "AND c_date is null AND c_timestamp is null AND c_timestamptz is null " + - "AND c_float is null AND c_decimal is null AND c_numeric is null AND c_interval is null " + - "AND c_boolean is null AND c_uuid is null AND c_bytea is null").count() == 1; + "AND c_date is null AND c_timestamp is null AND c_timestamptz is null " + + "AND c_float is null AND c_decimal is null AND c_numeric is null AND c_interval is null " + + "AND c_boolean is null AND c_uuid is null AND c_bytea is null").count() == 1; } catch (Exception e) { return false; } @@ -108,7 +108,7 @@ public void testConsumingArrayDataType() throws Exception { " VALUES " + "('Carol2',\n" + " ARRAY[20000, 25000, 25000, 25000],\n" + - " ARRAY[['breakfast', 'consulting'], ['meeting', 'lunch']]),\n"+ + " ARRAY[['breakfast', 'consulting'], ['meeting', 'lunch']]),\n" + "('Bill',\n" + " '{10000, 10000, 10000, 10000}',\n" + " '{{\"meeting\", \"lunch\"}, {\"training\", \"presentation\"}}'),\n" + @@ -117,7 +117,7 @@ public void testConsumingArrayDataType() throws Exception { " '{{\"breakfast\", \"consulting\"}, {\"meeting\", \"lunch\"}}')" + ";"; SourcePostgresqlDB.runSQL(sql); - + Awaitility.await().atMost(Duration.ofSeconds(320)).until(() -> { try { Dataset df = getTableData("testc.inventory.array_data"); @@ -140,7 +140,7 @@ public void testSchemaChanges() throws Exception { SourcePostgresqlDB.runSQL("UPDATE inventory.customers SET first_name='George__UPDATE1' WHERE id = 1002 ;"); SourcePostgresqlDB.runSQL("ALTER TABLE inventory.customers ALTER COLUMN email DROP NOT NULL;"); SourcePostgresqlDB.runSQL("INSERT INTO inventory.customers VALUES " + - "(default,'SallyUSer2','Thomas',null,'value1',false, '2020-01-01');"); + "(default,'SallyUSer2','Thomas',null,'value1',false, '2020-01-01');"); SourcePostgresqlDB.runSQL("ALTER TABLE 
inventory.customers ALTER COLUMN last_name DROP NOT NULL;"); SourcePostgresqlDB.runSQL("UPDATE inventory.customers SET last_name = NULL WHERE id = 1002 ;"); SourcePostgresqlDB.runSQL("DELETE FROM inventory.customers WHERE id = 1004 ;"); @@ -151,13 +151,13 @@ public void testSchemaChanges() throws Exception { //ds.show(); return ds.where("__op == 'r'").count() == 4 // snapshot rows. initial table data - && ds.where("__op == 'u'").count() == 3 // 3 update - && ds.where("__op == 'c'").count() == 1 // 1 insert - && ds.where("__op == 'd'").count() == 1 // 1 insert - && ds.where("first_name == 'George__UPDATE1'").count() == 3 - && ds.where("first_name == 'SallyUSer2'").count() == 1 - && ds.where("last_name is null").count() == 1 - && ds.where("id == '1004'").where("__op == 'd'").count() == 1; + && ds.where("__op == 'u'").count() == 3 // 3 update + && ds.where("__op == 'c'").count() == 1 // 1 insert + && ds.where("__op == 'd'").count() == 1 // 1 insert + && ds.where("first_name == 'George__UPDATE1'").count() == 3 + && ds.where("first_name == 'SallyUSer2'").count() == 1 + && ds.where("last_name is null").count() == 1 + && ds.where("id == '1004'").where("__op == 'd'").count() == 1; } catch (Exception e) { return false; } @@ -177,20 +177,20 @@ public void testSchemaChanges() throws Exception { .commit(); // insert row after defining new column in target iceberg table SourcePostgresqlDB.runSQL("INSERT INTO inventory.customers VALUES " + - "(default,'After-Defining-Iceberg-fields','Thomas',null,'value1',false, '2020-01-01');"); + "(default,'After-Defining-Iceberg-fields','Thomas',null,'value1',false, '2020-01-01');"); // remove column from source SourcePostgresqlDB.runSQL("ALTER TABLE inventory.customers DROP COLUMN email;"); SourcePostgresqlDB.runSQL("INSERT INTO inventory.customers VALUES " + - "(default,'User3','lastname_value3','after-dropping-email-column-from-source',true, '2020-01-01'::DATE);"); + "(default,'User3','lastname_value3','after-dropping-email-column-from-source',true, '2020-01-01'::DATE);"); Awaitility.await().atMost(Duration.ofSeconds(180)).until(() -> { try { Dataset ds = getTableData("testc.inventory.customers"); ds.show(); return ds.where("first_name == 'User3'").count() == 1 - && ds.where("first_name == 'After-Defining-Iceberg-fields'").count() == 1 - && ds.where("test_varchar_column == 'after-dropping-email-column-from-source' AND email is null").count() == 1; + && ds.where("first_name == 'After-Defining-Iceberg-fields'").count() == 1 + && ds.where("test_varchar_column == 'after-dropping-email-column-from-source' AND email is null").count() == 1; } catch (Exception e) { return false; } @@ -217,38 +217,38 @@ public void testSchemaChanges() throws Exception { @Disabled public void testDataTypeChanges() throws Exception { String sql = "\n" + - " DROP TABLE IF EXISTS inventory.data_type_changes;\n" + - " CREATE TABLE IF NOT EXISTS inventory.data_type_changes (\n" + - " c_id INTEGER ,\n" + - " c_varchar VARCHAR,\n" + - " c_int2string INTEGER,\n" + - " c_date2string DATE,\n" + - " c_timestamp2string TIMESTAMP,\n" + - " string2int VARCHAR,\n" + - " string2timestamp VARCHAR,\n" + - " string2boolean VARCHAR\n" + - " );"; + " DROP TABLE IF EXISTS inventory.data_type_changes;\n" + + " CREATE TABLE IF NOT EXISTS inventory.data_type_changes (\n" + + " c_id INTEGER ,\n" + + " c_varchar VARCHAR,\n" + + " c_int2string INTEGER,\n" + + " c_date2string DATE,\n" + + " c_timestamp2string TIMESTAMP,\n" + + " string2int VARCHAR,\n" + + " string2timestamp VARCHAR,\n" + + " string2boolean VARCHAR\n" + 
+ " );";
SourcePostgresqlDB.runSQL(sql);
sql = "INSERT INTO inventory.data_type_changes " +
- " (c_id, c_varchar, c_int2string, c_date2string, c_timestamp2string, string2int, string2timestamp, string2boolean) " +
- " VALUES (1, 'STRING-DATA-1', 123, current_date , current_timestamp, 111, current_timestamp, false)";
+ " (c_id, c_varchar, c_int2string, c_date2string, c_timestamp2string, string2int, string2timestamp, string2boolean) " +
+ " VALUES (1, 'STRING-DATA-1', 123, current_date , current_timestamp, 111, current_timestamp, false)";
SourcePostgresqlDB.runSQL(sql);
sql = "INSERT INTO inventory.data_type_changes " +
- " (c_id, c_varchar, c_int2string, c_date2string, c_timestamp2string, string2int, string2timestamp, string2boolean) " +
- " VALUES (2, 'STRING-DATA-2', 222, current_date , current_timestamp, 222, current_timestamp, true)";
+ " (c_id, c_varchar, c_int2string, c_date2string, c_timestamp2string, string2int, string2timestamp, string2boolean) " +
+ " VALUES (2, 'STRING-DATA-2', 222, current_date , current_timestamp, 222, current_timestamp, true)";
SourcePostgresqlDB.runSQL(sql);
SourcePostgresqlDB.runSQL("ALTER TABLE inventory.data_type_changes " +
- "ALTER COLUMN c_int2string TYPE VARCHAR(555), " +
- "ALTER COLUMN c_date2string TYPE VARCHAR(555), " +
- "ALTER COLUMN c_timestamp2string TYPE VARCHAR(555), " +
- "ALTER COLUMN string2int TYPE INTEGER USING string2int::integer, " +
- "ALTER COLUMN string2timestamp TYPE TIMESTAMP USING string2timestamp::TIMESTAMP, " +
- "ALTER COLUMN string2boolean TYPE boolean USING string2boolean::boolean"
+ "ALTER COLUMN c_int2string TYPE VARCHAR(555), " +
+ "ALTER COLUMN c_date2string TYPE VARCHAR(555), " +
+ "ALTER COLUMN c_timestamp2string TYPE VARCHAR(555), " +
+ "ALTER COLUMN string2int TYPE INTEGER USING string2int::integer, " +
+ "ALTER COLUMN string2timestamp TYPE TIMESTAMP USING string2timestamp::TIMESTAMP, " +
+ "ALTER COLUMN string2boolean TYPE boolean USING string2boolean::boolean"
);
sql = "INSERT INTO inventory.data_type_changes " +
- " (c_id, c_varchar, c_int2string, c_date2string, c_timestamp2string, string2int, string2timestamp, string2boolean) " +
- " VALUES (3, 'STRING-DATA-3', '333', 'current_date-3' , 'current_timestamp-3', 333, current_timestamp, false)";
+ " (c_id, c_varchar, c_int2string, c_date2string, c_timestamp2string, string2int, string2timestamp, string2boolean) " +
+ " VALUES (3, 'STRING-DATA-3', '333', 'current_date-3' , 'current_timestamp-3', 333, current_timestamp, false)";
SourcePostgresqlDB.runSQL(sql);
Awaitility.await().atMost(Duration.ofSeconds(180)).until(() -> {
@@ -286,4 +286,20 @@ public void testSimpleUpload() {
}
});
}
+
+
+ @Test
+ public void testPartitionedTable() {
+ Awaitility.await().atMost(Duration.ofSeconds(120)).until(() -> {
+ try {
+ Dataset<Row> ds = getTableData("testc.inventory.customers");
+ ds.show(false);
+ return ds.count() >= 3;
+ } catch (Exception e) {
+ return false;
+ }
+ });
+ S3Minio.listFiles();
+ }
+
}
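Note on the new testPartitionedTable() above: the await only asserts a minimum row count once the partitioned-write path has committed. Because getTableData() (see the BaseSparkTest changes later in this patch) projects input_file_name() as input_file, the same polling block could also cheaply inspect the physical layout. A minimal sketch, assuming the test's Spark session, imports, and helpers are in scope; the >= 1 threshold is illustrative, not part of the patch:

    Dataset<Row> ds = getTableData("testc.inventory.customers");
    // Each distinct input_file value is one data file behind the Iceberg table;
    // once several partition values are present, partitioned writes should
    // fan rows out across more than one file.
    long distinctDataFiles = ds.select("input_file").distinct().count();
    Assertions.assertTrue(distinctDataFiles >= 1);
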
diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerUpsertDeleteDeletesTest.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerUpsertDeleteDeletesTest.java
index 9ab5edba..3b1f0671 100644
--- a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerUpsertDeleteDeletesTest.java
+++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerUpsertDeleteDeletesTest.java
@@ -145,40 +145,40 @@ public void testSimpleUpsertCompositeKey() throws Exception {
private TestChangeEvent<Object, Object> getCustomerRecord(Integer id, String operation, String name, Long epoch) {
String key = "{\"schema\":{\"type\":\"struct\",\"fields\":[{\"type\":\"int32\",\"optional\":false," + "\"field\":\"id\"}]," +
- "\"optional\":false,\"name\":\"testc.inventory.customers.Key\"}," +
- "\"payload\":{\"id\":" + id + "}}";
+ "\"optional\":false,\"name\":\"testc.inventory.customers.Key\"}," +
+ "\"payload\":{\"id\":" + id + "}}";
String val = "{\"schema\":{\"type\":\"struct\",\"fields\":[{\"type\":\"int32\",\"optional\":false,\"field\":\"id\"}," +
- "{\"type\":\"string\",\"optional\":false,\"field\":\"first_name\"},{\"type\":\"string\",\"optional\":false,\"field\":\"last_name\"}," +
- "{\"type\":\"string\",\"optional\":false,\"field\":\"email\"},{\"type\":\"string\",\"optional\":true,\"field\":\"__op\"}," +
- "{\"type\":\"string\",\"optional\":true,\"field\":\"__table\"},{\"type\":\"int64\",\"optional\":true,\"field\":\"__lsn\"}," +
- "{\"type\":\"int64\",\"optional\":true,\"field\":\"__source_ts_ms\"},{\"type\":\"string\",\"optional\":true,\"field\":\"__deleted\"}]," +
- "\"optional\":false,\"name\":\"testc.inventory.customers.Value\"}," +
- "\"payload\":{\"id\":" + id + ",\"first_name\":\"" + name + "\",\"last_name\":\"Walker\",\"email\":\"ed@walker" +
- ".com\"," +
- "\"__op\":\"" + operation + "\",\"__table\":\"customers\",\"__lsn\":33832960,\"__source_ts_ms\":" + epoch + "," +
- "\"__deleted\":\"" + operation.equals("d") + "\"}} ";
+ "{\"type\":\"string\",\"optional\":false,\"field\":\"first_name\"},{\"type\":\"string\",\"optional\":false,\"field\":\"last_name\"}," +
+ "{\"type\":\"string\",\"optional\":false,\"field\":\"email\"},{\"type\":\"string\",\"optional\":true,\"field\":\"__op\"}," +
+ "{\"type\":\"string\",\"optional\":true,\"field\":\"__table\"},{\"type\":\"int64\",\"optional\":true,\"field\":\"__lsn\"}," +
+ "{\"type\":\"int64\",\"optional\":true,\"field\":\"__source_ts_ms\"},{\"type\":\"string\",\"optional\":true,\"field\":\"__deleted\"}]," +
+ "\"optional\":false,\"name\":\"testc.inventory.customers.Value\"}," +
+ "\"payload\":{\"id\":" + id + ",\"first_name\":\"" + name + "\",\"last_name\":\"Walker\",\"email\":\"ed@walker" +
+ ".com\"," +
+ "\"__op\":\"" + operation + "\",\"__table\":\"customers\",\"__lsn\":33832960,\"__source_ts_ms\":" + epoch + "," +
+ "\"__deleted\":\"" + operation.equals("d") + "\"}} ";
return new TestChangeEvent<>(key, val, "testc.inventory.customers_upsert");
}
private TestChangeEvent<Object, Object> getCustomerRecordCompositeKey(Integer id, String operation, String name, Long epoch) {
String key = "{\"schema\":{\"type\":\"struct\",\"fields\":[" +
- "{\"type\":\"int32\",\"optional\":false," + "\"field\":\"id\"}," +
- "{\"type\":\"string\",\"optional\":false," + "\"field\":\"first_name\"}" +
- "]," +
- "\"optional\":false,\"name\":\"testc.inventory.customers.Key\"}," +
- "\"payload\":{\"id\":" + id + ",\"first_name\":\"" + name + "\"}}";
+ "{\"type\":\"int32\",\"optional\":false," + "\"field\":\"id\"}," +
+ "{\"type\":\"string\",\"optional\":false," + "\"field\":\"first_name\"}" +
+ "]," +
+ "\"optional\":false,\"name\":\"testc.inventory.customers.Key\"}," +
+ "\"payload\":{\"id\":" + id + ",\"first_name\":\"" + name + "\"}}";
String val = "{\"schema\":{\"type\":\"struct\",\"fields\":[{\"type\":\"int32\",\"optional\":false,\"field\":\"id\"}," +
- "{\"type\":\"string\",\"optional\":false,\"field\":\"first_name\"},{\"type\":\"string\",\"optional\":false,\"field\":\"last_name\"}," +
"{\"type\":\"string\",\"optional\":false,\"field\":\"email\"},{\"type\":\"string\",\"optional\":true,\"field\":\"__op\"}," + - "{\"type\":\"string\",\"optional\":true,\"field\":\"__table\"},{\"type\":\"int64\",\"optional\":true,\"field\":\"__lsn\"}," + - "{\"type\":\"int64\",\"optional\":true,\"field\":\"__source_ts_ms\"},{\"type\":\"string\",\"optional\":true,\"field\":\"__deleted\"}]," + - "\"optional\":false,\"name\":\"testc.inventory.customers.Value\"}," + - "\"payload\":{\"id\":" + id + ",\"first_name\":\"" + name + "\",\"last_name\":\"Walker\",\"email\":\"ed@walker" + - ".com\"," + - "\"__op\":\"" + operation + "\",\"__table\":\"customers\",\"__lsn\":33832960,\"__source_ts_ms\":" + epoch + "," + - "\"__deleted\":\"" + operation.equals("d") + "\"}} "; + "{\"type\":\"string\",\"optional\":false,\"field\":\"first_name\"},{\"type\":\"string\",\"optional\":false,\"field\":\"last_name\"}," + + "{\"type\":\"string\",\"optional\":false,\"field\":\"email\"},{\"type\":\"string\",\"optional\":true,\"field\":\"__op\"}," + + "{\"type\":\"string\",\"optional\":true,\"field\":\"__table\"},{\"type\":\"int64\",\"optional\":true,\"field\":\"__lsn\"}," + + "{\"type\":\"int64\",\"optional\":true,\"field\":\"__source_ts_ms\"},{\"type\":\"string\",\"optional\":true,\"field\":\"__deleted\"}]," + + "\"optional\":false,\"name\":\"testc.inventory.customers.Value\"}," + + "\"payload\":{\"id\":" + id + ",\"first_name\":\"" + name + "\",\"last_name\":\"Walker\",\"email\":\"ed@walker" + + ".com\"," + + "\"__op\":\"" + operation + "\",\"__table\":\"customers\",\"__lsn\":33832960,\"__source_ts_ms\":" + epoch + "," + + "\"__deleted\":\"" + operation.equals("d") + "\"}} "; return new TestChangeEvent<>(key, val, "testc.inventory.customers_upsert_compositekey"); } diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerUpsertTest.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerUpsertTest.java index f79269fd..6e49dee6 100644 --- a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerUpsertTest.java +++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerUpsertTest.java @@ -62,7 +62,7 @@ public void testSimpleUpsert() throws Exception { ds = getTableData("testc.inventory.customers_upsert"); ds.show(); - Assertions.assertEquals(ds.count(), 4); + Assertions.assertEquals(4, ds.count()); Assertions.assertEquals(ds.where("id = 1 AND __op= 'r'").count(), 1); Assertions.assertEquals(ds.where("id = 2 AND __op= 'd'").count(), 1); Assertions.assertEquals(ds.where("id = 3 AND __op= 'u'").count(), 1); @@ -84,7 +84,7 @@ public void testSimpleUpsert() throws Exception { records.add(getCustomerRecord(6, "u", "Updatedname-6-V1", 11L)); consumer.handleBatch(records, TestUtil.getCommitter()); ds = getTableData("testc.inventory.customers_upsert"); - ds.show(); + ds.sort("id").show(false); Assertions.assertEquals(ds.count(), 6); Assertions.assertEquals(ds.where("id = 3 AND __op= 'u' AND first_name= 'UpdatednameV4'").count(), 1); Assertions.assertEquals(ds.where("id = 4 AND __op= 'r' AND first_name= 'Updatedname-4-V3'").count(), 1); @@ -114,7 +114,7 @@ public void testSimpleUpsert() throws Exception { ds = getTableData("testc.inventory.customers_upsert"); ds.show(); Assertions.assertEquals(ds.where("id = 7 AND __op= 'u' AND first_name= 'Updatedname-7-V1'").count(), 1); - + S3Minio.listFiles(); } @Test @@ -167,40 +167,40 @@ public void testSimpleUpsertNoKey() throws 
diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerUpsertTest.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerUpsertTest.java
index f79269fd..6e49dee6 100644
--- a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerUpsertTest.java
+++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerUpsertTest.java
@@ -62,7 +62,7 @@ public void testSimpleUpsert() throws Exception {
ds = getTableData("testc.inventory.customers_upsert");
ds.show();
- Assertions.assertEquals(ds.count(), 4);
+ Assertions.assertEquals(4, ds.count());
Assertions.assertEquals(ds.where("id = 1 AND __op= 'r'").count(), 1);
Assertions.assertEquals(ds.where("id = 2 AND __op= 'd'").count(), 1);
Assertions.assertEquals(ds.where("id = 3 AND __op= 'u'").count(), 1);
@@ -84,7 +84,7 @@ public void testSimpleUpsert() throws Exception {
records.add(getCustomerRecord(6, "u", "Updatedname-6-V1", 11L));
consumer.handleBatch(records, TestUtil.getCommitter());
ds = getTableData("testc.inventory.customers_upsert");
- ds.show();
+ ds.sort("id").show(false);
Assertions.assertEquals(ds.count(), 6);
Assertions.assertEquals(ds.where("id = 3 AND __op= 'u' AND first_name= 'UpdatednameV4'").count(), 1);
Assertions.assertEquals(ds.where("id = 4 AND __op= 'r' AND first_name= 'Updatedname-4-V3'").count(), 1);
@@ -114,7 +114,7 @@ public void testSimpleUpsert() throws Exception {
ds = getTableData("testc.inventory.customers_upsert");
ds.show();
Assertions.assertEquals(ds.where("id = 7 AND __op= 'u' AND first_name= 'Updatedname-7-V1'").count(), 1);
-
+ S3Minio.listFiles();
}
@Test
@@ -167,40 +167,40 @@ public void testSimpleUpsertNoKey() throws Exception {
private TestChangeEvent<Object, Object> getCustomerRecord(Integer id, String operation, String name, Long epoch) {
String key = "{\"schema\":{\"type\":\"struct\",\"fields\":[{\"type\":\"int32\",\"optional\":false," + "\"field\":\"id\"}]," +
- "\"optional\":false,\"name\":\"testc.inventory.customers.Key\"}," +
- "\"payload\":{\"id\":" + id + "}}";
+ "\"optional\":false,\"name\":\"testc.inventory.customers.Key\"}," +
+ "\"payload\":{\"id\":" + id + "}}";
String val = "{\"schema\":{\"type\":\"struct\",\"fields\":[{\"type\":\"int32\",\"optional\":false,\"field\":\"id\"}," +
- "{\"type\":\"string\",\"optional\":false,\"field\":\"first_name\"},{\"type\":\"string\",\"optional\":false,\"field\":\"last_name\"}," +
- "{\"type\":\"string\",\"optional\":false,\"field\":\"email\"},{\"type\":\"string\",\"optional\":true,\"field\":\"__op\"}," +
- "{\"type\":\"string\",\"optional\":true,\"field\":\"__table\"},{\"type\":\"int64\",\"optional\":true,\"field\":\"__lsn\"}," +
- "{\"type\":\"int64\",\"optional\":true,\"field\":\"__source_ts_ms\"},{\"type\":\"string\",\"optional\":true,\"field\":\"__deleted\"}]," +
- "\"optional\":false,\"name\":\"testc.inventory.customers.Value\"}," +
- "\"payload\":{\"id\":" + id + ",\"first_name\":\"" + name + "\",\"last_name\":\"Walker\",\"email\":\"ed@walker" +
- ".com\"," +
- "\"__op\":\"" + operation + "\",\"__table\":\"customers\",\"__lsn\":33832960,\"__source_ts_ms\":" + epoch + "," +
- "\"__deleted\":\"" + operation.equals("d") + "\"}} ";
+ "{\"type\":\"string\",\"optional\":false,\"field\":\"first_name\"},{\"type\":\"string\",\"optional\":false,\"field\":\"last_name\"}," +
+ "{\"type\":\"string\",\"optional\":false,\"field\":\"email\"},{\"type\":\"string\",\"optional\":true,\"field\":\"__op\"}," +
+ "{\"type\":\"string\",\"optional\":true,\"field\":\"__table\"},{\"type\":\"int64\",\"optional\":true,\"field\":\"__lsn\"}," +
+ "{\"type\":\"int64\",\"optional\":true,\"field\":\"__source_ts_ms\"},{\"type\":\"string\",\"optional\":true,\"field\":\"__deleted\"}]," +
+ "\"optional\":false,\"name\":\"testc.inventory.customers.Value\"}," +
+ "\"payload\":{\"id\":" + id + ",\"first_name\":\"" + name + "\",\"last_name\":\"Walker\",\"email\":\"ed@walker" +
+ ".com\"," +
+ "\"__op\":\"" + operation + "\",\"__table\":\"customers\",\"__lsn\":33832960,\"__source_ts_ms\":" + epoch + "," +
+ "\"__deleted\":\"" + operation.equals("d") + "\"}} ";
return new TestChangeEvent<>(key, val, "testc.inventory.customers_upsert");
}
private TestChangeEvent<Object, Object> getCustomerRecordCompositeKey(Integer id, String operation, String name, Long epoch) {
String key = "{\"schema\":{\"type\":\"struct\",\"fields\":[" +
- "{\"type\":\"int32\",\"optional\":false," + "\"field\":\"id\"}," +
- "{\"type\":\"string\",\"optional\":false," + "\"field\":\"first_name\"}" +
- "]," +
- "\"optional\":false,\"name\":\"testc.inventory.customers.Key\"}," +
- "\"payload\":{\"id\":" + id + ",\"first_name\":\"" + name + "\"}}";
+ "{\"type\":\"int32\",\"optional\":false," + "\"field\":\"id\"}," +
+ "{\"type\":\"string\",\"optional\":false," + "\"field\":\"first_name\"}" +
+ "]," +
+ "\"optional\":false,\"name\":\"testc.inventory.customers.Key\"}," +
+ "\"payload\":{\"id\":" + id + ",\"first_name\":\"" + name + "\"}}";
String val = "{\"schema\":{\"type\":\"struct\",\"fields\":[{\"type\":\"int32\",\"optional\":false,\"field\":\"id\"}," +
- "{\"type\":\"string\",\"optional\":false,\"field\":\"first_name\"},{\"type\":\"string\",\"optional\":false,\"field\":\"last_name\"}," +
"{\"type\":\"string\",\"optional\":false,\"field\":\"email\"},{\"type\":\"string\",\"optional\":true,\"field\":\"__op\"}," + - "{\"type\":\"string\",\"optional\":true,\"field\":\"__table\"},{\"type\":\"int64\",\"optional\":true,\"field\":\"__lsn\"}," + - "{\"type\":\"int64\",\"optional\":true,\"field\":\"__source_ts_ms\"},{\"type\":\"string\",\"optional\":true,\"field\":\"__deleted\"}]," + - "\"optional\":false,\"name\":\"testc.inventory.customers.Value\"}," + - "\"payload\":{\"id\":" + id + ",\"first_name\":\"" + name + "\",\"last_name\":\"Walker\",\"email\":\"ed@walker" + - ".com\"," + - "\"__op\":\"" + operation + "\",\"__table\":\"customers\",\"__lsn\":33832960,\"__source_ts_ms\":" + epoch + "," + - "\"__deleted\":\"" + operation.equals("d") + "\"}} "; + "{\"type\":\"string\",\"optional\":false,\"field\":\"first_name\"},{\"type\":\"string\",\"optional\":false,\"field\":\"last_name\"}," + + "{\"type\":\"string\",\"optional\":false,\"field\":\"email\"},{\"type\":\"string\",\"optional\":true,\"field\":\"__op\"}," + + "{\"type\":\"string\",\"optional\":true,\"field\":\"__table\"},{\"type\":\"int64\",\"optional\":true,\"field\":\"__lsn\"}," + + "{\"type\":\"int64\",\"optional\":true,\"field\":\"__source_ts_ms\"},{\"type\":\"string\",\"optional\":true,\"field\":\"__deleted\"}]," + + "\"optional\":false,\"name\":\"testc.inventory.customers.Value\"}," + + "\"payload\":{\"id\":" + id + ",\"first_name\":\"" + name + "\",\"last_name\":\"Walker\",\"email\":\"ed@walker" + + ".com\"," + + "\"__op\":\"" + operation + "\",\"__table\":\"customers\",\"__lsn\":33832960,\"__source_ts_ms\":" + epoch + "," + + "\"__deleted\":\"" + operation.equals("d") + "\"}} "; return new TestChangeEvent<>(key, val, "testc.inventory.customers_upsert_compositekey"); } @@ -208,15 +208,15 @@ private TestChangeEvent getCustomerRecordNoKey(Integer id, Strin Long epoch) { String val = "{\"schema\":{\"type\":\"struct\",\"fields\":[{\"type\":\"int32\",\"optional\":false,\"field\":\"id\"}," + - "{\"type\":\"string\",\"optional\":false,\"field\":\"first_name\"},{\"type\":\"string\",\"optional\":false,\"field\":\"last_name\"}," + - "{\"type\":\"string\",\"optional\":false,\"field\":\"email\"},{\"type\":\"string\",\"optional\":true,\"field\":\"__op\"}," + - "{\"type\":\"string\",\"optional\":true,\"field\":\"__table\"},{\"type\":\"int64\",\"optional\":true,\"field\":\"__lsn\"}," + - "{\"type\":\"int64\",\"optional\":true,\"field\":\"__source_ts_ms\"},{\"type\":\"string\",\"optional\":true,\"field\":\"__deleted\"}]," + - "\"optional\":false,\"name\":\"testc.inventory.customers.Value\"}," + - "\"payload\":{\"id\":" + id + ",\"first_name\":\"" + name + "\",\"last_name\":\"Walker\",\"email\":\"ed@walker" + - ".com\"," + - "\"__op\":\"" + operation + "\",\"__table\":\"customers\",\"__lsn\":33832960,\"__source_ts_ms\":" + epoch + "," + - "\"__deleted\":\"" + operation.equals("d") + "\"}} "; + "{\"type\":\"string\",\"optional\":false,\"field\":\"first_name\"},{\"type\":\"string\",\"optional\":false,\"field\":\"last_name\"}," + + "{\"type\":\"string\",\"optional\":false,\"field\":\"email\"},{\"type\":\"string\",\"optional\":true,\"field\":\"__op\"}," + + "{\"type\":\"string\",\"optional\":true,\"field\":\"__table\"},{\"type\":\"int64\",\"optional\":true,\"field\":\"__lsn\"}," + + "{\"type\":\"int64\",\"optional\":true,\"field\":\"__source_ts_ms\"},{\"type\":\"string\",\"optional\":true,\"field\":\"__deleted\"}]," + + "\"optional\":false,\"name\":\"testc.inventory.customers.Value\"}," + + "\"payload\":{\"id\":" + id + ",\"first_name\":\"" + 
+ ".com\"," +
+ "\"__op\":\"" + operation + "\",\"__table\":\"customers\",\"__lsn\":33832960,\"__source_ts_ms\":" + epoch + "," +
+ "\"__deleted\":\"" + operation.equals("d") + "\"}} ";
return new TestChangeEvent<>(null, val, "testc.inventory.customers_upsert_nokey");
}
diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergEventsChangeConsumerTest.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergEventsChangeConsumerTest.java
index caa899f4..1aceba8d 100644
--- a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergEventsChangeConsumerTest.java
+++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergEventsChangeConsumerTest.java
@@ -45,7 +45,7 @@ public void testSimpleUpload() {
Dataset<Row> ds = spark.newSession().sql("SELECT * FROM debeziumevents.debezium_events");
ds.show();
return ds.count() >= 5
- && ds.select("event_destination").distinct().count() >= 2;
+ && ds.select("event_destination").distinct().count() >= 2;
} catch (Exception e) {
return false;
}
diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/TestIcebergUtil.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/TestIcebergUtil.java
index abb28d9c..185fa27c 100644
--- a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/TestIcebergUtil.java
+++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/TestIcebergUtil.java
@@ -34,64 +34,66 @@ class TestIcebergUtil {
@Test
public void testNestedJsonRecord() throws JsonProcessingException {
IcebergChangeEvent e = new IcebergChangeEvent("test",
- mapper.readTree(serdeWithSchema).get("payload"),null,
- mapper.readTree(serdeWithSchema).get("schema"),null);
- Schema schema = e.getSchema();
+ mapper.readTree(serdeWithSchema).get("payload"), null,
+ mapper.readTree(serdeWithSchema).get("schema"), null);
+ Schema schema = e.getSchema();
assertTrue(schema.toString().contains("before: optional struct<2: id: optional int, 3: first_name: optional string, " +
- "4:"));
+ "4:"));
}
@Test
public void testUnwrapJsonRecord() throws IOException {
IcebergChangeEvent e = new IcebergChangeEvent("test",
- mapper.readTree(unwrapWithSchema).get("payload"),null,
- mapper.readTree(unwrapWithSchema).get("schema"),null);
- Schema schema = e.getSchema();
- GenericRecord record = e.getIcebergRecord(schema);
+ mapper.readTree(unwrapWithSchema).get("payload"), null,
+ mapper.readTree(unwrapWithSchema).get("schema"), null);
+ Schema schema = e.getSchema();
+ GenericRecord record = e.asIcebergRecord(schema);
assertEquals("orders", record.getField("__table").toString());
assertEquals(16850, record.getField("order_date"));
+ System.out.println(schema);
+ System.out.println(record);
}
@Test
public void testNestedArrayJsonRecord() throws JsonProcessingException {
IcebergChangeEvent e = new IcebergChangeEvent("test",
- mapper.readTree(unwrapWithArraySchema).get("payload"),null,
- mapper.readTree(unwrapWithArraySchema).get("schema"),null);
- Schema schema = e.getSchema();
+ mapper.readTree(unwrapWithArraySchema).get("payload"), null,
+ mapper.readTree(unwrapWithArraySchema).get("schema"), null);
+ Schema schema = e.getSchema();
assertTrue(schema.asStruct().toString().contains("struct<1: name: optional string, 2: pay_by_quarter: optional list<int>, 4: schedule: optional list<string>, 6:"));
System.out.println(schema.asStruct());
System.out.println(schema.findField("pay_by_quarter").type().asListType().elementType()); System.out.println(schema.findField("schedule").type().asListType().elementType()); - assertEquals(schema.findField("pay_by_quarter").type().asListType().elementType().toString(),"int"); - assertEquals(schema.findField("schedule").type().asListType().elementType().toString(),"string"); - GenericRecord record = e.getIcebergRecord(schema); + assertEquals(schema.findField("pay_by_quarter").type().asListType().elementType().toString(), "int"); + assertEquals(schema.findField("schedule").type().asListType().elementType().toString(), "string"); + GenericRecord record = e.asIcebergRecord(schema); //System.out.println(record); - assertTrue( record.toString().contains("[10000, 10001, 10002, 10003]")); + assertTrue(record.toString().contains("[10000, 10001, 10002, 10003]")); } - + @Test public void testNestedArray2JsonRecord() throws JsonProcessingException { assertThrows(RuntimeException.class, () -> { IcebergChangeEvent e = new IcebergChangeEvent("test", - mapper.readTree(unwrapWithArraySchema2).get("payload"),null, - mapper.readTree(unwrapWithArraySchema2).get("schema"),null); - Schema schema = e.getSchema(); - System.out.println(schema.asStruct()); - System.out.println(schema); - System.out.println(schema.findField("tableChanges")); - System.out.println(schema.findField("tableChanges").type().asListType().elementType()); - }); + mapper.readTree(unwrapWithArraySchema2).get("payload"), null, + mapper.readTree(unwrapWithArraySchema2).get("schema"), null); + Schema schema = e.getSchema(); + System.out.println(schema.asStruct()); + System.out.println(schema); + System.out.println(schema.findField("tableChanges")); + System.out.println(schema.findField("tableChanges").type().asListType().elementType()); + }); //GenericRecord record = IcebergUtil.getIcebergRecord(schema.asStruct(), jsonPayload); //System.out.println(record); } - + @Test public void testNestedGeomJsonRecord() throws JsonProcessingException { IcebergChangeEvent e = new IcebergChangeEvent("test", - mapper.readTree(unwrapWithGeomSchema).get("payload"),null, - mapper.readTree(unwrapWithGeomSchema).get("schema"),null); - Schema schema = e.getSchema(); - GenericRecord record = e.getIcebergRecord(schema); + mapper.readTree(unwrapWithGeomSchema).get("payload"), null, + mapper.readTree(unwrapWithGeomSchema).get("schema"), null); + Schema schema = e.getSchema(); + GenericRecord record = e.asIcebergRecord(schema); //System.out.println(schema); //System.out.println(record); assertTrue(schema.toString().contains("g: optional struct<3: wkb: optional string, 4: srid: optional int>")); diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/batchsizewait/MaxBatchSizeWaitTest.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/batchsizewait/MaxBatchSizeWaitTest.java index ea1463e0..f973984f 100644 --- a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/batchsizewait/MaxBatchSizeWaitTest.java +++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/batchsizewait/MaxBatchSizeWaitTest.java @@ -45,8 +45,8 @@ public void testBatchsizeWait() throws Exception { df.createOrReplaceGlobalTempView("test_data_batch_size"); df = spark .sql("SELECT substring(input_file,94,60) as input_file, " + - "count(*) as batch_size FROM global_temp.test_data_batch_size group " + - "by 1"); + "count(*) as batch_size FROM global_temp.test_data_batch_size group " + + "by 1"); //df.show(false); return 
df.filter("batch_size = " + maxBatchSize).count() >= 5;
} catch (Exception e) {
diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/testresources/BaseSparkTest.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/testresources/BaseSparkTest.java
index 5a0dc9c2..e485e41d 100644
--- a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/testresources/BaseSparkTest.java
+++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/testresources/BaseSparkTest.java
@@ -33,6 +33,11 @@
*/
public class BaseSparkTest {
+ protected static final SparkConf sparkconf = new SparkConf()
+ .setAppName("CDC-S3-Batch-Spark-Sink")
+ .setMaster("local[2]");
+ private static final String SPARK_PROP_PREFIX = "debezium.sink.sparkbatch.";
+ protected static SparkSession spark;
@ConfigProperty(name = "debezium.sink.iceberg.table-prefix", defaultValue = "")
String tablePrefix;
@ConfigProperty(name = "debezium.sink.iceberg.warehouse")
@@ -40,35 +45,6 @@ public class BaseSparkTest {
@ConfigProperty(name = "debezium.sink.iceberg.table-namespace", defaultValue = "default")
String namespace;
- protected static final SparkConf sparkconf = new SparkConf()
- .setAppName("CDC-S3-Batch-Spark-Sink")
- .setMaster("local[2]");
- private static final String SPARK_PROP_PREFIX = "debezium.sink.sparkbatch.";
- protected static SparkSession spark;
-
- protected org.apache.iceberg.Table getTable(String table) {
- HadoopCatalog catalog = getIcebergCatalog();
- return catalog.loadTable(TableIdentifier.of(Namespace.of(namespace), tablePrefix + table.replace(".", "_")));
- }
-
- protected HadoopCatalog getIcebergCatalog() {
- // loop and set hadoopConf
- Configuration hadoopConf = new Configuration();
- for (String name : ConfigProvider.getConfig().getPropertyNames()) {
- if (name.startsWith("debezium.sink.iceberg.")) {
- hadoopConf.set(name.substring("debezium.sink.iceberg.".length()),
- ConfigProvider.getConfig().getValue(name, String.class));
- }
- }
- HadoopCatalog icebergCatalog = new HadoopCatalog();
- icebergCatalog.setConf(hadoopConf);
-
- Map<String, String> configMap = new HashMap<>();
- hadoopConf.forEach(e-> configMap.put(e.getKey(), e.getValue()));
- icebergCatalog.initialize("iceberg", configMap);
- return icebergCatalog;
- }
-
@BeforeAll
static void setup() {
Map<String, String> appSparkConf = IcebergUtil.getConfigSubset(ConfigProvider.getConfig(), SPARK_PROP_PREFIX);
@@ -103,11 +79,11 @@ static void setup() {
public static void PGCreateTestDataTable() throws Exception {
// create test table
String sql = "" +
- " CREATE TABLE IF NOT EXISTS inventory.test_data (\n" +
- " c_id INTEGER ,\n" +
- " c_text TEXT,\n" +
- " c_varchar VARCHAR" +
- " );";
+ " CREATE TABLE IF NOT EXISTS inventory.test_data (\n" +
+ " c_id INTEGER ,\n" +
+ " c_text TEXT,\n" +
+ " c_varchar VARCHAR" +
+ " );";
SourcePostgresqlDB.runSQL(sql);
}
@@ -125,7 +101,7 @@ public static int PGLoadTestDataTable(int numRows, boolean addRandomDelay) {
Thread.sleep(TestUtil.randomInt(20000, 100000));
}
String sql = "INSERT INTO inventory.test_data (c_id, c_text, c_varchar ) " +
- "VALUES ";
+ "VALUES ";
StringBuilder values = new StringBuilder("\n(" + TestUtil.randomInt(15, 32) + ", '" + TestUtil.randomString(524) + "', '" + TestUtil.randomString(524) + "')");
for (int i = 0; i < 100; i++) {
values.append("\n,(").append(TestUtil.randomInt(15, 32)).append(", '").append(TestUtil.randomString(524)).append("', '").append(TestUtil.randomString(524)).append("')");
@@ -145,11 +121,11 @@ public static int PGLoadTestDataTable(int numRows, boolean addRandomDelay) {
public static void mysqlCreateTestDataTable() throws Exception {
// create test table
String sql = "\n" +
- " CREATE TABLE IF NOT EXISTS inventory.test_data (\n" +
- " c_id INTEGER ,\n" +
- " c_text TEXT,\n" +
- " c_varchar TEXT\n" +
- " );";
+ " CREATE TABLE IF NOT EXISTS inventory.test_data (\n" +
+ " c_id INTEGER ,\n" +
+ " c_text TEXT,\n" +
+ " c_varchar TEXT\n" +
+ " );";
SourceMysqlDB.runSQL(sql);
}
@@ -157,7 +133,7 @@ public static int mysqlLoadTestDataTable(int numRows) throws Exception {
int numInsert = 0;
do {
String sql = "INSERT INTO inventory.test_data (c_id, c_text, c_varchar ) " +
- "VALUES ";
+ "VALUES ";
StringBuilder values = new StringBuilder("\n(" + TestUtil.randomInt(15, 32) + ", '" + TestUtil.randomString(524) + "', '" + TestUtil.randomString(524) + "')");
for (int i = 0; i < 10; i++) {
values.append("\n,(").append(TestUtil.randomInt(15, 32)).append(", '").append(TestUtil.randomString(524)).append("', '").append(TestUtil.randomString(524)).append("')");
@@ -168,6 +144,29 @@ public static int mysqlLoadTestDataTable(int numRows) throws Exception {
return numInsert;
}
+ protected org.apache.iceberg.Table getTable(String table) {
+ HadoopCatalog catalog = getIcebergCatalog();
+ return catalog.loadTable(TableIdentifier.of(Namespace.of(namespace), tablePrefix + table.replace(".", "_")));
+ }
+
+ protected HadoopCatalog getIcebergCatalog() {
+ // loop and set hadoopConf
+ Configuration hadoopConf = new Configuration();
+ for (String name : ConfigProvider.getConfig().getPropertyNames()) {
+ if (name.startsWith("debezium.sink.iceberg.")) {
+ hadoopConf.set(name.substring("debezium.sink.iceberg.".length()),
+ ConfigProvider.getConfig().getValue(name, String.class));
+ }
+ }
+ HadoopCatalog icebergCatalog = new HadoopCatalog();
+ icebergCatalog.setConf(hadoopConf);
+
+ Map<String, String> configMap = new HashMap<>();
+ hadoopConf.forEach(e -> configMap.put(e.getKey(), e.getValue()));
+ icebergCatalog.initialize("iceberg", configMap);
+ return icebergCatalog;
+ }
+
public Dataset<Row> getTableData(String table) {
return spark.newSession().sql("SELECT *, input_file_name() as input_file FROM debeziumevents.debeziumcdc_" + table.replace(".", "_"));
}
diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/testresources/TestChangeEvent.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/testresources/TestChangeEvent.java
index 1120d549..eb20ab67 100644
--- a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/testresources/TestChangeEvent.java
+++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/testresources/TestChangeEvent.java
@@ -22,9 +22,9 @@ public TestChangeEvent(K key, V value, String destination) {
this.value = value;
this.destination = destination;
}
-
+
public TestChangeEvent(V value) {
- this(null,value,null);
+ this(null, value, null);
}
@Override