diff --git a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/DebeziumToIcebergTable.java b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/DebeziumToIcebergTable.java deleted file mode 100644 index 7ff4fb2e..00000000 --- a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/DebeziumToIcebergTable.java +++ /dev/null @@ -1,130 +0,0 @@ -/* - * - * * Copyright memiiso Authors. - * * - * * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 - * - */ - -package io.debezium.server.iceberg; - -import java.io.IOException; -import java.util.*; - -import com.fasterxml.jackson.databind.JsonNode; -import org.apache.iceberg.NullOrder; -import org.apache.iceberg.Schema; -import org.apache.iceberg.SortOrder; -import org.apache.iceberg.Table; -import org.apache.iceberg.catalog.Catalog; -import org.apache.iceberg.catalog.TableIdentifier; -import org.apache.iceberg.exceptions.ValidationException; -import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet; -import org.apache.iceberg.types.Types; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import static org.apache.iceberg.TableProperties.*; - -/** - * - * @author Ismail Simsek - */ -public class DebeziumToIcebergTable { - protected static final Logger LOGGER = LoggerFactory.getLogger(DebeziumToIcebergTable.class); - - private final List tableColumns; - private final List tableRowIdentifierColumns; - - public DebeziumToIcebergTable(byte[] eventVal, byte[] eventKey) { - tableColumns = extractSchema(eventVal); - tableRowIdentifierColumns = (eventKey == null) ? null : extractSchema(eventKey); - } - - public DebeziumToIcebergTable(byte[] eventVal) { - this(eventVal, null); - } - - private List extractSchema(byte[] eventVal) { - try { - JsonNode jsonEvent = IcebergUtil.jsonObjectMapper.readTree(eventVal); - if (IcebergUtil.hasSchema(jsonEvent)) { - return IcebergUtil.getIcebergSchema(jsonEvent.get("schema")); - } - - LOGGER.trace("Event schema not found in the given data:!"); - return null; - } catch (IOException e) { - throw new RuntimeException(e); - } - } - - public boolean hasSchema() { - return tableColumns != null; - } - - private SortOrder getSortOrder(Schema schema) { - SortOrder so = SortOrder.unsorted(); - - if (this.tableRowIdentifierColumns != null) { - SortOrder.Builder sob = SortOrder.builderFor(schema); - for (Types.NestedField coll : tableRowIdentifierColumns) { - sob = sob.asc(coll.name(), NullOrder.NULLS_FIRST); - } - so = sob.build(); - } - - return so; - } - - private Set getRowIdentifierFieldIds() { - - if (this.tableRowIdentifierColumns == null) { - return ImmutableSet.of(); - } - - Set identifierFieldIds = new HashSet<>(); - - for (Types.NestedField ic : this.tableRowIdentifierColumns) { - boolean found = false; - - ListIterator colsIterator = this.tableColumns.listIterator(); - while (colsIterator.hasNext()) { - Types.NestedField tc = colsIterator.next(); - if (Objects.equals(tc.name(), ic.name())) { - identifierFieldIds.add(tc.fieldId()); - // set columns as required its part of identifier filed - colsIterator.set(tc.asRequired()); - found = true; - break; - } - } - - if (!found) { - throw new ValidationException("Table Row identifier field `" + ic.name() + "` not found in table columns"); - } - - } - - return identifierFieldIds; - } - - public Table create(Catalog icebergCatalog, TableIdentifier tableIdentifier, String writeFormat) { - - Schema schema = new Schema(this.tableColumns, getRowIdentifierFieldIds()); - - if (this.hasSchema()) { - Catalog.TableBuilder tb = icebergCatalog.buildTable(tableIdentifier, schema) - .withProperty(FORMAT_VERSION, "2") - .withProperty(DEFAULT_FILE_FORMAT, writeFormat.toLowerCase(Locale.ENGLISH)) - .withSortOrder(getSortOrder(schema)); - - LOGGER.warn("Creating table:'{}'\nschema:{}\nrowIdentifier:{}", tableIdentifier, schema, - schema.identifierFieldNames()); - - return tb.create(); - } - - throw new RuntimeException("Failed to create table "+ tableIdentifier); - } - -} diff --git a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeConsumer.java b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeConsumer.java index e3d7f4f7..6d7c2a4e 100644 --- a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeConsumer.java +++ b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeConsumer.java @@ -21,10 +21,7 @@ import java.time.Duration; import java.time.Instant; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.Optional; +import java.util.*; import java.util.concurrent.ConcurrentHashMap; import java.util.stream.Collectors; import javax.annotation.PostConstruct; @@ -38,11 +35,13 @@ import org.apache.hadoop.conf.Configuration; import org.apache.iceberg.CatalogProperties; import org.apache.iceberg.CatalogUtil; +import org.apache.iceberg.Schema; import org.apache.iceberg.Table; import org.apache.iceberg.catalog.Catalog; import org.apache.iceberg.catalog.Namespace; import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.exceptions.NoSuchTableException; +import org.apache.iceberg.types.Types; import org.eclipse.microprofile.config.ConfigProvider; import org.eclipse.microprofile.config.inject.ConfigProperty; import org.slf4j.Logger; @@ -58,10 +57,15 @@ @Dependent public class IcebergChangeConsumer extends BaseChangeConsumer implements DebeziumEngine.ChangeConsumer> { + protected static final Duration LOG_INTERVAL = Duration.ofMinutes(15); private static final Logger LOGGER = LoggerFactory.getLogger(IcebergChangeConsumer.class); private static final String PROP_PREFIX = "debezium.sink.iceberg."; + protected final Clock clock = Clock.system(); final Configuration hadoopConf = new Configuration(); final Map icebergProperties = new ConcurrentHashMap<>(); + protected long consumerStart = clock.currentTimeInMillis(); + protected long numConsumedEvents = 0; + protected Threads.Timer logTimer = Threads.timer(clock, LOG_INTERVAL); @ConfigProperty(name = "debezium.format.value", defaultValue = "json") String valueFormat; @ConfigProperty(name = "debezium.format.key", defaultValue = "json") @@ -84,7 +88,6 @@ public class IcebergChangeConsumer extends BaseChangeConsumer implements Debeziu boolean eventSchemaEnabled; @ConfigProperty(name = "debezium.sink.iceberg." + DEFAULT_FILE_FORMAT, defaultValue = DEFAULT_FILE_FORMAT_DEFAULT) String writeFormat; - @Inject @Any Instance batchSizeWaitInstances; @@ -95,12 +98,6 @@ public class IcebergChangeConsumer extends BaseChangeConsumer implements Debeziu Instance icebergTableOperatorInstances; InterfaceIcebergTableOperator icebergTableOperator; - protected static final Duration LOG_INTERVAL = Duration.ofMinutes(15); - protected final Clock clock = Clock.system(); - protected long consumerStart = clock.currentTimeInMillis(); - protected long numConsumedEvents = 0; - protected Threads.Timer logTimer = Threads.timer(clock, LOG_INTERVAL); - @PostConstruct void connect() { if (!valueFormat.equalsIgnoreCase(Json.class.getSimpleName().toLowerCase())) { @@ -199,13 +196,27 @@ private Table createIcebergTable(TableIdentifier tableIdentifier, throw new RuntimeException("Failed to get event schema for table '" + tableIdentifier + "' event value is null"); } - DebeziumToIcebergTable eventSchema = event.key() == null - ? new DebeziumToIcebergTable(getBytes(event.value())) - : new DebeziumToIcebergTable(getBytes(event.value()), getBytes(event.key())); + List tableColumns = IcebergUtil.getIcebergFieldsFromEventSchema(getBytes(event.value())); + List keyColumns = + IcebergUtil.getIcebergFieldsFromEventSchema(event.key() == null ? null : getBytes(event.key())); + + if (tableColumns.isEmpty()) { + throw new RuntimeException("Failed to create table " + tableIdentifier); + } - return eventSchema.create(icebergCatalog, tableIdentifier, writeFormat); + Schema schema = IcebergUtil.getSchema(tableColumns, keyColumns); + + LOGGER.warn("Creating table:'{}'\nschema:{}\nrowIdentifier:{}", tableIdentifier, schema, + schema.identifierFieldNames()); + + return icebergCatalog.buildTable(tableIdentifier, schema) + .withProperty(FORMAT_VERSION, "2") + .withProperty(DEFAULT_FILE_FORMAT, writeFormat.toLowerCase(Locale.ENGLISH)) + .withSortOrder(IcebergUtil.getIdentifierFieldsAsSortOrder(schema)) + .create(); } + private Optional loadIcebergTable(TableIdentifier tableId) { try { Table table = icebergCatalog.loadTable(tableId); @@ -216,5 +227,4 @@ private Optional
loadIcebergTable(TableIdentifier tableId) { } } - } diff --git a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergUtil.java b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergUtil.java index bc852ba4..45fe8f30 100644 --- a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergUtil.java +++ b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergUtil.java @@ -10,15 +10,14 @@ import java.io.IOException; import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; +import java.util.*; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.iceberg.Schema; +import org.apache.iceberg.SortOrder; import org.apache.iceberg.data.GenericRecord; +import org.apache.iceberg.exceptions.ValidationException; import org.apache.iceberg.types.Types; import org.eclipse.microprofile.config.Config; import org.slf4j.Logger; @@ -188,5 +187,61 @@ public static Map getConfigSubset(Config config, String prefix) return ret; } + public static List getIcebergFieldsFromEventSchema(byte[] eventVal) { + + if(eventVal == null){ + return new ArrayList<>(); + } + + try { + JsonNode jsonEvent = IcebergUtil.jsonObjectMapper.readTree(eventVal); + if (IcebergUtil.hasSchema(jsonEvent)) { + return IcebergUtil.getIcebergSchema(jsonEvent.get("schema")); + } + + LOGGER.trace("Event schema not found in the given data:!"); + return new ArrayList<>(); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + public static Schema getSchema(List tableColumns, + List keyColumns) { + + Set identifierFieldIds = new HashSet<>(); + + for (Types.NestedField ic : keyColumns) { + boolean found = false; + + ListIterator colsIterator = tableColumns.listIterator(); + while (colsIterator.hasNext()) { + Types.NestedField tc = colsIterator.next(); + if (Objects.equals(tc.name(), ic.name())) { + identifierFieldIds.add(tc.fieldId()); + // set column as required its part of identifier filed + colsIterator.set(tc.asRequired()); + found = true; + break; + } + } + + if (!found) { + throw new ValidationException("Table Row identifier field `" + ic.name() + "` not found in table columns"); + } + + } + + return new Schema(tableColumns, identifierFieldIds); + } + + public static SortOrder getIdentifierFieldsAsSortOrder(Schema schema) { + SortOrder.Builder sob = SortOrder.builderFor(schema); + for (String fieldName : schema.identifierFieldNames()) { + sob = sob.asc(fieldName); + } + + return sob.build(); + } } diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerTest.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerTest.java index 4106bb23..5cbc4587 100644 --- a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerTest.java +++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerTest.java @@ -219,7 +219,7 @@ public void testDataTypeChanges() throws Exception { " (c_id, c_varchar, c_int2string, c_date2string, c_timestamp2string, string2int, string2timestamp, string2boolean) " + " VALUES (2, 'STRING-DATA-2', 222, current_date , current_timestamp, 222, current_timestamp, true)"; SourcePostgresqlDB.runSQL(sql); - + SourcePostgresqlDB.runSQL("ALTER TABLE inventory.data_type_changes " + "ALTER COLUMN c_int2string TYPE VARCHAR(555), " + "ALTER COLUMN c_date2string TYPE VARCHAR(555), " + @@ -227,7 +227,7 @@ public void testDataTypeChanges() throws Exception { "ALTER COLUMN string2int TYPE INTEGER USING string2int::integer, " + "ALTER COLUMN string2timestamp TYPE TIMESTAMP USING string2timestamp::TIMESTAMP, " + "ALTER COLUMN string2boolean TYPE boolean USING string2boolean::boolean" - ); + ); sql = "INSERT INTO inventory.data_type_changes " + " (c_id, c_varchar, c_int2string, c_date2string, c_timestamp2string, string2int, string2timestamp, string2boolean) " + " VALUES (3, 'STRING-DATA-3', '333', 'current_date-3' , 'current_timestamp-3', 333, current_timestamp, false)"; diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerTestProfile.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerTestProfile.java index 8592b817..f7923cf8 100644 --- a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerTestProfile.java +++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerTestProfile.java @@ -19,8 +19,6 @@ public class IcebergChangeConsumerTestProfile implements QuarkusTestProfile { @Override public Map getConfigOverrides() { Map config = new HashMap<>(); - - config.put("debezium.sink.type", "iceberg"); config.put("debezium.sink.iceberg.write.format.default", "orc"); return config; diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerUpsertTestDeleteDeletesProfile.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerUpsertTestDeleteDeletesProfile.java index 72495432..3255deb3 100644 --- a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerUpsertTestDeleteDeletesProfile.java +++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerUpsertTestDeleteDeletesProfile.java @@ -20,7 +20,6 @@ public class IcebergChangeConsumerUpsertTestDeleteDeletesProfile implements Quar public Map getConfigOverrides() { Map config = new HashMap<>(); - config.put("debezium.sink.type", "iceberg"); config.put("debezium.sink.iceberg.upsert", "true"); config.put("debezium.sink.iceberg.upsert-keep-deletes", "false"); return config; diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerUpsertTestProfile.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerUpsertTestProfile.java index eafe7183..c3849096 100644 --- a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerUpsertTestProfile.java +++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerUpsertTestProfile.java @@ -20,7 +20,6 @@ public class IcebergChangeConsumerUpsertTestProfile implements QuarkusTestProfil public Map getConfigOverrides() { Map config = new HashMap<>(); - config.put("debezium.sink.type", "iceberg"); config.put("debezium.sink.iceberg.upsert", "true"); config.put("debezium.sink.iceberg.upsert-keep-deletes", "true"); return config;