diff --git a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/DebeziumMetrics.java b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/DebeziumMetrics.java
index abf29bf7..2a2cdbe7 100644
--- a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/DebeziumMetrics.java
+++ b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/DebeziumMetrics.java
@@ -22,7 +22,6 @@
 import org.slf4j.LoggerFactory;
 
 /**
- * Optimizes batch size around 85%-90% of max,batch.size using dynamically calculated sleep(ms)
  *
  * @author Ismail Simsek
  */
diff --git a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/DebeziumToIcebergTable.java b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/DebeziumToIcebergTable.java
index a9be8f59..80f3c17a 100644
--- a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/DebeziumToIcebergTable.java
+++ b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/DebeziumToIcebergTable.java
@@ -25,7 +25,6 @@
 import org.slf4j.LoggerFactory;
 
 /**
- * Implementation of the consumer that delivers the messages into Amazon S3 destination.
  *
  * @author Ismail Simsek
 */
@@ -123,7 +122,7 @@ public Table create(Catalog icebergCatalog, TableIdentifier tableIdentifier) thr
       return tb.create();
     }
 
-    return null;
+    throw new Exception("Failed to create table " + tableIdentifier);
   }
 }
diff --git a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeConsumer.java b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeConsumer.java
index 01bd6a4f..2143e655 100644
--- a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeConsumer.java
+++ b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeConsumer.java
@@ -55,7 +55,7 @@
 import org.slf4j.LoggerFactory;
 
 /**
- * Implementation of the consumer that delivers the messages into Amazon S3 destination.
+ * Implementation of the consumer that delivers the messages to iceberg tables.
  *
  * @author Ismail Simsek
  */
@@ -65,6 +65,7 @@ public class IcebergChangeConsumer extends BaseChangeConsumer implements Debeziu
   private static final Logger LOGGER = LoggerFactory.getLogger(IcebergChangeConsumer.class);
   private static final String PROP_PREFIX = "debezium.sink.iceberg.";
+  static ImmutableMap<String, Integer> cdcOperations = ImmutableMap.of("c", 1, "r", 2, "u", 3, "d", 4);
   @ConfigProperty(name = "debezium.format.value", defaultValue = "json")
   String valueFormat;
   @ConfigProperty(name = "debezium.format.key", defaultValue = "json")
@@ -81,26 +82,20 @@ public class IcebergChangeConsumer extends BaseChangeConsumer implements Debeziu
   String namespace;
   @ConfigProperty(name = "debezium.sink.iceberg.catalog-name", defaultValue = "default")
   String catalogName;
-
   @ConfigProperty(name = "debezium.sink.iceberg.upsert", defaultValue = "true")
-  boolean upsertData;
+  boolean upsert;
   @ConfigProperty(name = "debezium.sink.iceberg.upsert-keep-deletes", defaultValue = "true")
-  boolean upsertDataKeepDeletes;
-
+  boolean upsertKeepDeletes;
   @ConfigProperty(name = "debezium.sink.iceberg.upsert-op-column", defaultValue = "__op")
   String opColumn;
   @ConfigProperty(name = "debezium.sink.iceberg.upsert-source-ts-ms-column", defaultValue = "__source_ts_ms")
   String sourceTsMsColumn;
-
   @ConfigProperty(name = "debezium.sink.batch.batch-size-wait", defaultValue = "NoBatchSizeWait")
   String batchSizeWaitName;
-
   @Inject
   @Any
   Instance<InterfaceBatchSizeWait> batchSizeWaitInstances;
   InterfaceBatchSizeWait batchSizeWait;
-  static ImmutableMap<String, Integer> cdcOperations = ImmutableMap.of("c", 1, "r", 2, "u", 3, "d", 4);
-
   Configuration hadoopConf = new Configuration();
   Catalog icebergCatalog;
   Map<String, String> icebergProperties = new ConcurrentHashMap<>();
@@ -142,14 +137,21 @@ public String map(String destination) {
   }
 
   private Table createIcebergTable(TableIdentifier tableIdentifier, ChangeEvent<Object, Object> event) throws Exception {
-    if (eventSchemaEnabled && event.value() != null) {
-      DebeziumToIcebergTable eventSchema = event.key() == null
-          ? new DebeziumToIcebergTable(getBytes(event.value()))
-          : new DebeziumToIcebergTable(getBytes(event.value()), getBytes(event.key()));
-      return eventSchema.create(icebergCatalog, tableIdentifier);
+    if (!eventSchemaEnabled) {
+      throw new Exception("Table '" + tableIdentifier + "' not found! " +
+          "Set `debezium.format.value.schemas.enable` to true to create tables automatically!");
+    }
+
+    if (event.value() == null) {
+      throw new Exception("Failed to get event schema for table '" + tableIdentifier + "', event value is null");
     }
-    return null;
+
+    DebeziumToIcebergTable eventSchema = event.key() == null
+        ? new DebeziumToIcebergTable(getBytes(event.value()))
+        : new DebeziumToIcebergTable(getBytes(event.value()), getBytes(event.key()));
+
+    return eventSchema.create(icebergCatalog, tableIdentifier);
   }
 
   @Override
@@ -170,7 +172,7 @@ public void handleBatch(List<ChangeEvent<Object, Object>> records, DebeziumEngin
         // load iceberg table
         icebergTable = icebergCatalog.loadTable(tableIdentifier);
       } catch (org.apache.iceberg.exceptions.NoSuchTableException e) {
-        // get schema fom an event and create iceberg table
+        // get schema from an event and create iceberg table
         try {
           icebergTable = createIcebergTable(tableIdentifier, event.getValue().get(0));
         } catch (Exception e2) {
@@ -195,10 +197,12 @@ public int compareByTsThenOp(GenericRecord lhs, GenericRecord rhs) {
     if (lhs.getField(sourceTsMsColumn).equals(rhs.getField(sourceTsMsColumn))) {
       // return (x < y) ? -1 : ((x == y) ? 0 : 1);
-      return Integer.compare(
-          cdcOperations.getOrDefault(lhs.getField(opColumn), -1),
-          cdcOperations.getOrDefault(rhs.getField(opColumn), -1)
-      );
+      return cdcOperations.getOrDefault(lhs.getField(opColumn), -1)
+          .compareTo(cdcOperations.getOrDefault(rhs.getField(opColumn), -1));
     } else {
       return Long.compare((Long) lhs.getField(sourceTsMsColumn), (Long) rhs.getField(sourceTsMsColumn));
     }
@@ -241,9 +245,9 @@ private ArrayList<Record> toDeduppedIcebergRecords(Schema schema, ArrayList<ChangeEvent<Object, Object>> events) throws InterruptedException {
 
-    if (!upsertData || icebergTable.sortOrder().isUnsorted()) {
+    if (!upsert || icebergTable.sortOrder().isUnsorted()) {
 
-      if (upsertData && icebergTable.sortOrder().isUnsorted()) {
+      if (upsert && icebergTable.sortOrder().isUnsorted()) {
         LOGGER.info("Table don't have Pk defined upsert is not possible falling back to append!");
       }
 
@@ -258,7 +262,7 @@ private void addToTable(Table icebergTable, ArrayList<ChangeEvent<Object, Objec
       // upsert ==> DELETE + INSERT
       ArrayList<Record> icebergRecords = toDeduppedIcebergRecords(icebergTable.schema(), events);
       DataFile dataFile = getDataFile(icebergTable, icebergRecords);
-      DeleteFile deleteDataFile = getDeleteDataFile(icebergTable, icebergRecords);
+      DeleteFile deleteDataFile = getDeleteFile(icebergTable, icebergRecords);
       LOGGER.debug("Committing new file as Upsert (has deletes:{}) '{}' !", deleteDataFile != null, dataFile.path());
       RowDelta c = icebergTable
           .newRowDelta()
@@ -271,11 +275,11 @@ private void addToTable(Table icebergTable, ArrayList<ChangeEvent<Object, Objec
     }
   }
 
-  private DeleteFile getDeleteDataFile(Table icebergTable, ArrayList<Record> icebergRecords) throws InterruptedException {
+  private DeleteFile getDeleteFile(Table icebergTable, ArrayList<Record> icebergRecords) throws InterruptedException {
 
     final String fileName = "del-" + UUID.randomUUID() + "-" + Instant.now().toEpochMilli() + "." + FileFormat.PARQUET;
     OutputFile out = icebergTable.io().newOutputFile(icebergTable.locationProvider().newDataLocation(fileName));
@@ -284,13 +288,13 @@ private DeleteFile getDeleteDataFile(Table icebergTable, ArrayList<Record> icebe
     EqualityDeleteWriter<Record> deleteWriter;
 
     // anything is not an insert.
-    // upsertDataKeepDeletes = false, which means delete deletes
+    // upsertKeepDeletes = false, which means delete records are removed
     List<Record> deleteRows = icebergRecords.stream()
         .filter(r ->
            // anything is not an insert.
            !r.getField(opColumn).equals("c")
-            // upsertDataKeepDeletes = false and its deleted record, which means delete deletes
-            // || !(upsertDataKeepDeletes && r.getField(opColumn).equals("d"))
+            // upsertKeepDeletes = false and it's a deleted record, which means delete records are removed
+            // || !(upsertKeepDeletes && r.getField(opColumn).equals("d"))
         ).collect(Collectors.toList());
 
     if (deleteRows.size() == 0) {
@@ -346,11 +350,11 @@ private DataFile getDataFile(Table icebergTable, ArrayList<Record> icebergRecord
     List<Record> newRecords = icebergRecords.stream()
         .filter(r ->
            // if its append OR upsert with keep deletes then add all the records to data file
-            !upsertData
+            !upsert
            // if table has no PK - which means fall back to append
            || icebergTable.sortOrder().isUnsorted()
            // if its upsert and upsertKeepDeletes = true
-            || upsertDataKeepDeletes
+            || upsertKeepDeletes
            // if nothing above then exclude deletes
            || !(r.getField(opColumn).equals("d")))
         .collect(Collectors.toList());
diff --git a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergEventsChangeConsumer.java b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergEventsChangeConsumer.java
index 271a3e7d..89513ba0 100644
--- a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergEventsChangeConsumer.java
+++ b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergEventsChangeConsumer.java
@@ -56,7 +56,7 @@
 import static org.apache.iceberg.types.Types.NestedField.required;
 
 /**
- * Implementation of the consumer that delivers the messages into Amazon S3 destination.
+ * Implementation of the consumer that delivers the messages to an iceberg table.
  *
  * @author Ismail Simsek
 */
@@ -112,10 +112,12 @@ public class IcebergEventsChangeConsumer extends BaseChangeConsumer implements D
   @PostConstruct
   void connect() throws InterruptedException {
     if (!valueFormat.equalsIgnoreCase(Json.class.getSimpleName().toLowerCase())) {
-      throw new InterruptedException("debezium.format.value={" + valueFormat + "} not supported! Supported (debezium.format.value=*) formats are {json,}!");
+      throw new InterruptedException("debezium.format.value={" + valueFormat + "} not supported, " +
+          "supported (debezium.format.value=*) formats are {json,}!");
     }
     if (!keyFormat.equalsIgnoreCase(Json.class.getSimpleName().toLowerCase())) {
-      throw new InterruptedException("debezium.format.key={" + valueFormat + "} not supported! Supported (debezium.format.key=*) formats are {json,}!");
+      throw new InterruptedException("debezium.format.key={" + keyFormat + "} not supported, " +
+          "supported (debezium.format.key=*) formats are {json,}!");
     }
 
     tableIdentifier = TableIdentifier.of(Namespace.of(namespace), "debezium_events");
@@ -233,11 +235,11 @@ private void commitBatch(String destination, OffsetDateTime batchTime, ArrayList
         .withPartition(pk)
         .build();
 
-    LOGGER.debug("Appending new file '{}' !", dataFile.path());
+    LOGGER.debug("Appending new file '{}'", dataFile.path());
     eventTable.newAppend()
         .appendFile(dataFile)
         .commit();
-    LOGGER.info("Committed {} events to table! {}", icebergRecords.size(), eventTable.location());
{}", icebergRecords.size(), eventTable.location()); + LOGGER.info("Committed {} events to table {}", icebergRecords.size(), eventTable.location()); } } diff --git a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergUtil.java b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergUtil.java index ca5eb40e..414fd9dd 100644 --- a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergUtil.java +++ b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergUtil.java @@ -17,7 +17,6 @@ import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; -import org.apache.hadoop.conf.Configuration; import org.apache.iceberg.Schema; import org.apache.iceberg.data.GenericRecord; import org.apache.iceberg.types.Types; @@ -26,8 +25,6 @@ import org.slf4j.LoggerFactory; /** - * Implementation of the consumer that delivers the messages into Amazon S3 destination. - * * @author Ismail Simsek */ public class IcebergUtil { @@ -110,56 +107,57 @@ public static GenericRecord getIcebergRecord(Schema schema, JsonNode data) throw return IcebergUtil.getIcebergRecord(schema.asStruct(), data); } - public static GenericRecord getIcebergRecord(Types.StructType nestedField, JsonNode data) throws InterruptedException { + public static GenericRecord getIcebergRecord(Types.StructType tableFields, JsonNode data) throws InterruptedException { Map mappedResult = new HashMap<>(); - LOGGER.debug("Processing nested field : " + nestedField); + LOGGER.debug("Processing nested field:{}", tableFields); - for (Types.NestedField field : nestedField.fields()) { + for (Types.NestedField field : tableFields.fields()) { if (data == null || !data.has(field.name()) || data.get(field.name()) == null) { mappedResult.put(field.name(), null); continue; } - jsonToGenericRecord(mappedResult, field, data.get(field.name())); + mappedResult.put(field.name(), jsonToGenericRecordVal(field, data.get(field.name()))); } - return GenericRecord.create(nestedField).copy(mappedResult); - } - private static void jsonToGenericRecord(Map mappedResult, Types.NestedField field, - JsonNode node) throws InterruptedException { - LOGGER.debug("Processing Field:" + field.name() + " Type:" + field.type()); + return GenericRecord.create(tableFields).copy(mappedResult); + } + private static Object jsonToGenericRecordVal(Types.NestedField field, + JsonNode node) throws InterruptedException { + LOGGER.debug("Processing Field:{} Type:{}", field.name(), field.type()); + final Object val; switch (field.type().typeId()) { case INTEGER: // int 4 bytes - mappedResult.put(field.name(), node.isNull() ? null : node.asInt()); + val = node.isNull() ? null : node.asInt(); break; case LONG: // long 8 bytes - mappedResult.put(field.name(), node.isNull() ? null : node.asLong()); + val = node.isNull() ? null : node.asLong(); break; case FLOAT: // float is represented in 32 bits, - mappedResult.put(field.name(), node.isNull() ? null : node.floatValue()); + val = node.isNull() ? null : node.floatValue(); break; case DOUBLE: // double is represented in 64 bits - mappedResult.put(field.name(), node.isNull() ? null : node.asDouble()); + val = node.isNull() ? null : node.asDouble(); break; case BOOLEAN: - mappedResult.put(field.name(), node.isNull() ? null : node.asBoolean()); + val = node.isNull() ? 
         break;
       case STRING:
-        mappedResult.put(field.name(), node.asText(null));
+        val = node.asText(null);
         break;
       case BINARY:
         try {
-          mappedResult.put(field.name(), node.isNull() ? null : ByteBuffer.wrap(node.binaryValue()));
+          val = node.isNull() ? null : ByteBuffer.wrap(node.binaryValue());
         } catch (IOException e) {
           LOGGER.error("Failed converting '" + field.name() + "' binary value to iceberg record", e);
           throw new InterruptedException("Failed Processing Event!" + e.getMessage());
         }
         break;
       case LIST:
-        mappedResult.put(field.name(), jsonObjectMapper.convertValue(node, List.class));
+        val = jsonObjectMapper.convertValue(node, List.class);
         break;
       case MAP:
-        mappedResult.put(field.name(), jsonObjectMapper.convertValue(node, Map.class));
+        val = jsonObjectMapper.convertValue(node, Map.class);
         break;
       case STRUCT:
         throw new RuntimeException("Cannot process recursive records!");
@@ -167,15 +165,16 @@
 //        // recursive call to get nested data/record
 //        Types.StructType nestedField = Types.StructType.of(field);
 //        GenericRecord r = getIcebergRecord(schema, nestedField, node.get(field.name()));
-//        mappedResult.put(field.name(), r);
+//        return r;
 //        // throw new RuntimeException("Cannot process recursive record!");
 //        break;
       default:
         // default to String type
-        mappedResult.put(field.name(), node.asText(null));
+        val = node.asText(null);
         break;
     }
+    return val;
   }
 
   public static Map<String, String> getConfigSubset(Config config, String prefix) {
@@ -191,13 +190,5 @@ public static Map<String, String> getConfigSubset(Config config, String prefix)
     return ret;
   }
 
-  public static Map<String, String> getConfigurationAsMap(Configuration conf) {
-    Map<String, String> config = new HashMap<String, String>();
-    for (Map.Entry<String, String> entry : conf) {
-      config.put(entry.getKey(), entry.getValue());
-    }
-    return config;
-  }
-
 }
diff --git a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/batchsizewait/DynamicBatchSizeWait.java b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/batchsizewait/DynamicBatchSizeWait.java
index 53908a3f..ef6de4f9 100644
--- a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/batchsizewait/DynamicBatchSizeWait.java
+++ b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/batchsizewait/DynamicBatchSizeWait.java
@@ -19,7 +19,7 @@
 import static io.debezium.config.CommonConnectorConfig.DEFAULT_MAX_BATCH_SIZE;
 
 /**
- * Optimizes batch size around 85%-90% of max,batch.size using dynamically calculated sleep(ms)
+ * Optimizes batch size around 85%-90% of max.batch.size by dynamically calculating sleep(ms)
  *
  * @author Ismail Simsek
 */
diff --git a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/batchsizewait/InterfaceBatchSizeWait.java b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/batchsizewait/InterfaceBatchSizeWait.java
index 29071f7b..b04d04e7 100644
--- a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/batchsizewait/InterfaceBatchSizeWait.java
+++ b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/batchsizewait/InterfaceBatchSizeWait.java
@@ -9,7 +9,7 @@
 package io.debezium.server.iceberg.batchsizewait;
 
 /**
- * Implementation of the consumer that delivers the messages into Amazon S3 destination.
+ * Interface for batch-size-wait implementations, used to optimize batch size by waiting between batches.
  *
  * @author Ismail Simsek
  */
@@ -18,6 +18,7 @@ public interface InterfaceBatchSizeWait {
 
   default void initizalize() {
   }
 
-  void waitMs(Integer numRecordsProcessed, Integer processingTimeMs) throws InterruptedException;
+  default void waitMs(Integer numRecordsProcessed, Integer processingTimeMs) throws InterruptedException {
+  }
 }
diff --git a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/batchsizewait/MaxBatchSizeWait.java b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/batchsizewait/MaxBatchSizeWait.java
index e71dde01..bd853de1 100644
--- a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/batchsizewait/MaxBatchSizeWait.java
+++ b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/batchsizewait/MaxBatchSizeWait.java
@@ -40,48 +40,41 @@ public class MaxBatchSizeWait implements InterfaceBatchSizeWait {
   int waitIntervalMs;
 
   @Inject
-  DebeziumMetrics debeziumMetrics;
+  DebeziumMetrics dbzMetrics;
 
   @Override
   public void initizalize() throws DebeziumException {
     assert waitIntervalMs < maxWaitMs : "`wait-interval-ms` cannot be bigger than `max-wait-ms`";
-    debeziumMetrics.initizalize();
+    dbzMetrics.initizalize();
   }
 
-//  log warning!
-//  if (streamingSecondsBehindSource > 30 * 60) { // behind 30 minutes
-//    LOGGER.warn("Streaming {} is behind by {} seconds, QueueCurrentSize:{}, QueueTotalCapacity:{}, " +
-//        "SnapshotCompleted:{}",
-//        numRecordsProcessed, streamingQueueCurrentSize, maxQueueSize, streamingSecondsBehindSource, snapshotCompleted
-//    );
-//  }
-
   @Override
   public void waitMs(Integer numRecordsProcessed, Integer processingTimeMs) throws InterruptedException {
-    if (debeziumMetrics.snapshotRunning()) {
+    // don't wait if snapshot process is running
+    if (dbzMetrics.snapshotRunning()) {
       return;
     }
 
-    final int streamingQueueCurrentSize = debeziumMetrics.streamingQueueCurrentSize();
-    final int streamingSecondsBehindSource = (int) (debeziumMetrics.streamingMilliSecondsBehindSource() / 1000);
-    final boolean snapshotCompleted = debeziumMetrics.snapshotCompleted();
-
     LOGGER.debug("Processed {}, QueueCurrentSize:{}, QueueTotalCapacity:{}, SecondsBehindSource:{}, SnapshotCompleted:{}",
-        numRecordsProcessed, streamingQueueCurrentSize, maxQueueSize, streamingSecondsBehindSource, snapshotCompleted
+        numRecordsProcessed,
+        dbzMetrics.streamingQueueCurrentSize(),
+        maxQueueSize,
+        (int) (dbzMetrics.streamingMilliSecondsBehindSource() / 1000),
+        dbzMetrics.snapshotCompleted()
     );
 
     int totalWaitMs = 0;
-    while (totalWaitMs < maxWaitMs && debeziumMetrics.streamingQueueCurrentSize() < maxBatchSize) {
+    while (totalWaitMs < maxWaitMs && dbzMetrics.streamingQueueCurrentSize() < maxBatchSize) {
       totalWaitMs += waitIntervalMs;
       LOGGER.debug("Sleeping {} Milliseconds, QueueCurrentSize:{} < maxBatchSize:{}",
-          waitIntervalMs, debeziumMetrics.streamingQueueCurrentSize(), maxBatchSize);
+          waitIntervalMs, dbzMetrics.streamingQueueCurrentSize(), maxBatchSize);
       Thread.sleep(waitIntervalMs);
     }
 
     LOGGER.debug("Total wait {} Milliseconds, QueueCurrentSize:{} < maxBatchSize:{}",
-        totalWaitMs, debeziumMetrics.streamingQueueCurrentSize(), maxBatchSize);
+        totalWaitMs, dbzMetrics.streamingQueueCurrentSize(), maxBatchSize);
   }
diff --git a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/batchsizewait/NoBatchSizeWait.java b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/batchsizewait/NoBatchSizeWait.java
index 5bd8d8f4..49b72166 100644
--- a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/batchsizewait/NoBatchSizeWait.java
+++ b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/batchsizewait/NoBatchSizeWait.java
@@ -19,8 +19,4 @@
 @Dependent
 @Named("NoBatchSizeWait")
 public class NoBatchSizeWait implements InterfaceBatchSizeWait {
-
-  public void waitMs(Integer numRecordsProcessed, Integer processingTimeMs) {
-  }
-
 }
diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerTest.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerTest.java
index 3df7957b..8be7c8a0 100644
--- a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerTest.java
+++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerTest.java
@@ -18,6 +18,8 @@
 import io.quarkus.test.junit.TestProfile;
 
 import java.time.Duration;
+import java.util.HashMap;
+import java.util.Map;
 import javax.inject.Inject;
 
 import org.apache.hadoop.conf.Configuration;
@@ -32,7 +34,6 @@
 import org.eclipse.microprofile.config.ConfigProvider;
 import org.eclipse.microprofile.config.inject.ConfigProperty;
 import org.junit.jupiter.api.Test;
-import static io.debezium.server.iceberg.IcebergUtil.getConfigurationAsMap;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 
 /**
@@ -74,7 +75,10 @@ private HadoopCatalog getIcebergCatalog() {
     }
     HadoopCatalog icebergCatalog = new HadoopCatalog();
     icebergCatalog.setConf(hadoopConf);
-    icebergCatalog.initialize("iceberg", getConfigurationAsMap(hadoopConf));
+
+    Map<String, String> configMap = new HashMap<>();
+    hadoopConf.forEach(e -> configMap.put(e.getKey(), e.getValue()));
+    icebergCatalog.initialize("iceberg", configMap);
     return icebergCatalog;
   }
diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/TestIcebergUtil.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/TestIcebergUtil.java
index e1df91b5..aeb34fd1 100644
--- a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/TestIcebergUtil.java
+++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/TestIcebergUtil.java
@@ -11,11 +11,16 @@
 import io.debezium.serde.DebeziumSerdes;
 import io.debezium.util.Testing;
 
+import java.io.IOException;
 import java.util.Collections;
+import java.util.List;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.types.Types;
 import org.apache.kafka.common.serialization.Serde;
 import org.junit.jupiter.api.Test;
 import static org.junit.jupiter.api.Assertions.*;
@@ -30,16 +35,16 @@ public void testNestedJsonRecord() throws JsonProcessingException {
     assertTrue(exception.getMessage().contains("nested data type"));
   }
 
-  /*
   @Test
   public void testUnwrapJsonRecord() throws IOException, InterruptedException {
     JsonNode event = new ObjectMapper().readTree(unwrapWithSchema).get("payload");
-    Schema schema = IcebergUtil.getIcebergSchema(new ObjectMapper().readTree(unwrapWithSchema).get("schema"));
+    List<Types.NestedField> fields = IcebergUtil.getIcebergSchema(new ObjectMapper().readTree(unwrapWithSchema)
+        .get("schema"));
+    Schema schema = new Schema(fields);
     GenericRecord record = IcebergUtil.getIcebergRecord(schema.asStruct(), event);
     assertEquals("orders", record.getField("__table").toString());
     assertEquals(16850, record.getField("order_date"));
   }
-  */
 
   @Test
   public void valuePayloadWithSchemaAsJsonNode() {
diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/testresources/S3Minio.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/testresources/S3Minio.java
index 3a01aebd..d36bc1ad 100644
--- a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/testresources/S3Minio.java
+++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/testresources/S3Minio.java
@@ -61,7 +61,7 @@ public static List<Item> getObjectList(String bucketName) {
         objects.add(item);
       }
     } catch (Exception e) {
-      LOGGER.info("Failed listing bucket");
+      e.printStackTrace();
     }
     return objects;
   }
@@ -71,11 +71,11 @@ public static void listFiles() {
     try {
       List<Bucket> bucketList = client.listBuckets();
       for (Bucket bucket : bucketList) {
-        LOGGER.info("Bucket:{} ROOT", bucket.name());
+        System.out.printf("Bucket:%s ROOT%n", bucket.name());
         Iterable<Result<Item>> results = client.listObjects(ListObjectsArgs.builder().bucket(bucket.name()).recursive(true).build());
         for (Result<Item> result : results) {
           Item item = result.get();
-          LOGGER.info("Bucket:{} Item:{} Size:{}", bucket.name(), item.objectName(), item.size());
+          System.out.printf("Bucket:%s Item:%s Size:%s%n", bucket.name(), item.objectName(), item.size());
         }
       }
     } catch (Exception e) {
diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/testresources/SourcePostgresqlDB.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/testresources/SourcePostgresqlDB.java
index 2467a9a4..ca37bd1c 100644
--- a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/testresources/SourcePostgresqlDB.java
+++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/testresources/SourcePostgresqlDB.java
@@ -45,7 +45,7 @@ public static void runSQL(String query) throws SQLException, ClassNotFoundExcept
       st.execute(query);
       con.close();
     } catch (Exception e) {
-      LOGGER.error(query);
+      e.printStackTrace();
       throw e;
     }
   }
diff --git a/pom.xml b/pom.xml
index 31a1ebe3..901b25b1 100644
--- a/pom.xml
+++ b/pom.xml
@@ -45,7 +45,7 @@
     1.7.0.Alpha1
-    2.0.0.Final
+    2.0.3.Final