diff --git a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeConsumer.java b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeConsumer.java index 4e147ac7..0c93103f 100644 --- a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeConsumer.java +++ b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeConsumer.java @@ -21,6 +21,7 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; import java.util.stream.Collectors; import javax.annotation.PostConstruct; @@ -38,6 +39,7 @@ import org.apache.iceberg.catalog.Catalog; import org.apache.iceberg.catalog.Namespace; import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.exceptions.NoSuchTableException; import org.eclipse.microprofile.config.ConfigProvider; import org.eclipse.microprofile.config.inject.ConfigProperty; import org.slf4j.Logger; @@ -54,6 +56,8 @@ public class IcebergChangeConsumer extends BaseChangeConsumer implements Debeziu private static final Logger LOGGER = LoggerFactory.getLogger(IcebergChangeConsumer.class); private static final String PROP_PREFIX = "debezium.sink.iceberg."; + final Configuration hadoopConf = new Configuration(); + final Map icebergProperties = new ConcurrentHashMap<>(); @ConfigProperty(name = "debezium.format.value", defaultValue = "json") String valueFormat; @ConfigProperty(name = "debezium.format.key", defaultValue = "json") @@ -72,16 +76,13 @@ public class IcebergChangeConsumer extends BaseChangeConsumer implements Debeziu boolean upsert; @ConfigProperty(name = "debezium.sink.batch.batch-size-wait", defaultValue = "NoBatchSizeWait") String batchSizeWaitName; - + @ConfigProperty(name = "debezium.format.value.schemas.enable", defaultValue = "false") + boolean eventSchemaEnabled; @Inject @Any Instance batchSizeWaitInstances; InterfaceBatchSizeWait batchSizeWait; - - final Configuration hadoopConf = new Configuration(); Catalog icebergCatalog; - final Map icebergProperties = new ConcurrentHashMap<>(); - @Inject @Any Instance icebergTableOperatorInstances; @@ -144,10 +145,8 @@ public void handleBatch(List> records, DebeziumEngin for (Map.Entry>> event : result.entrySet()) { final TableIdentifier tableIdentifier = TableIdentifier.of(Namespace.of(namespace), tablePrefix + event.getKey()); - Table icebergTable = icebergTableOperator - .loadIcebergTable(icebergCatalog, tableIdentifier) - .orElseGet(() -> - icebergTableOperator.createIcebergTable(icebergCatalog, tableIdentifier, event.getValue().get(0))); + Table icebergTable = loadIcebergTable(tableIdentifier) + .orElseGet(() -> createIcebergTable(tableIdentifier, event.getValue().get(0))); //addToTable(icebergTable, event.getValue()); icebergTableOperator.addToTable(icebergTable, event.getValue()); } @@ -163,4 +162,35 @@ public void handleBatch(List> records, DebeziumEngin } + + private Table createIcebergTable(TableIdentifier tableIdentifier, + ChangeEvent event) { + + if (!eventSchemaEnabled) { + throw new RuntimeException("Table '" + tableIdentifier + "' not found! " + + "Set `debezium.format.value.schemas.enable` to true to create tables automatically!"); + } + + if (event.value() == null) { + throw new RuntimeException("Failed to get event schema for table '" + tableIdentifier + "' event value is null"); + } + + DebeziumToIcebergTable eventSchema = event.key() == null + ? new DebeziumToIcebergTable(getBytes(event.value())) + : new DebeziumToIcebergTable(getBytes(event.value()), getBytes(event.key())); + + return eventSchema.create(icebergCatalog, tableIdentifier); + } + + private Optional loadIcebergTable(TableIdentifier tableId) { + try { + Table table = icebergCatalog.loadTable(tableId); + return Optional.of(table); + } catch (NoSuchTableException e) { + LOGGER.warn("table not found: {}", tableId.toString()); + return Optional.empty(); + } + } + + } diff --git a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/AbstractIcebergTableOperator.java b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/AbstractIcebergTableOperator.java index 16054284..098ae3e5 100644 --- a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/AbstractIcebergTableOperator.java +++ b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/AbstractIcebergTableOperator.java @@ -11,29 +11,27 @@ import io.debezium.DebeziumException; import io.debezium.engine.ChangeEvent; import io.debezium.serde.DebeziumSerdes; -import io.debezium.server.iceberg.DebeziumToIcebergTable; import io.debezium.server.iceberg.IcebergUtil; import java.io.Closeable; import java.io.IOException; import java.time.Instant; -import java.util.*; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.UUID; import java.util.stream.Collectors; import com.fasterxml.jackson.databind.JsonNode; import org.apache.iceberg.*; -import org.apache.iceberg.catalog.Catalog; -import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.data.GenericRecord; import org.apache.iceberg.data.Record; import org.apache.iceberg.data.parquet.GenericParquetWriter; -import org.apache.iceberg.exceptions.NoSuchTableException; import org.apache.iceberg.io.FileAppender; import org.apache.iceberg.io.OutputFile; import org.apache.iceberg.parquet.Parquet; import org.apache.kafka.common.serialization.Deserializer; import org.apache.kafka.common.serialization.Serde; -import org.eclipse.microprofile.config.inject.ConfigProperty; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -45,8 +43,6 @@ abstract class AbstractIcebergTableOperator implements InterfaceIcebergTableOperator { private static final Logger LOGGER = LoggerFactory.getLogger(AbstractIcebergTableOperator.class); - @ConfigProperty(name = "debezium.format.value.schemas.enable", defaultValue = "false") - boolean eventSchemaEnabled; final Serde valSerde = DebeziumSerdes.payloadJson(JsonNode.class); Deserializer valDeserializer; @@ -122,33 +118,4 @@ protected DataFile getDataFile(Table icebergTable, ArrayList icebergReco .build(); } - public Table createIcebergTable(Catalog catalog, - TableIdentifier tableIdentifier, - ChangeEvent event) { - - if (!eventSchemaEnabled) { - throw new RuntimeException("Table '" + tableIdentifier + "' not found! " + - "Set `debezium.format.value.schemas.enable` to true to create tables automatically!"); - } - - if (event.value() == null) { - throw new RuntimeException("Failed to get event schema for table '" + tableIdentifier + "' event value is null"); - } - - DebeziumToIcebergTable eventSchema = event.key() == null - ? new DebeziumToIcebergTable(getBytes(event.value())) - : new DebeziumToIcebergTable(getBytes(event.value()), getBytes(event.key())); - - return eventSchema.create(catalog, tableIdentifier); - } - - public Optional
loadIcebergTable(Catalog catalog, TableIdentifier tableId) { - try { - Table table = catalog.loadTable(tableId); - return Optional.of(table); - } catch (NoSuchTableException e) { - LOGGER.warn("table not found: {}", tableId.toString()); - return Optional.empty(); - } - } } diff --git a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/InterfaceIcebergTableOperator.java b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/InterfaceIcebergTableOperator.java index fd14ec96..de49a96b 100644 --- a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/InterfaceIcebergTableOperator.java +++ b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/InterfaceIcebergTableOperator.java @@ -11,12 +11,9 @@ import io.debezium.engine.ChangeEvent; import java.util.ArrayList; -import java.util.Optional; import java.util.function.Predicate; import org.apache.iceberg.Table; -import org.apache.iceberg.catalog.Catalog; -import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.data.Record; public interface InterfaceIcebergTableOperator { @@ -26,10 +23,4 @@ public interface InterfaceIcebergTableOperator { void addToTable(Table icebergTable, ArrayList> events) throws InterruptedException; Predicate filterEvents(); - - Table createIcebergTable(Catalog catalog, - TableIdentifier tableIdentifier, - ChangeEvent event); - - Optional
loadIcebergTable(Catalog catalog, TableIdentifier tableId); }