From 09cc918f9fd5f95bd415d2300eee24c7b637d280 Mon Sep 17 00:00:00 2001 From: Ismail Simsek <6005685+ismailsimsek@users.noreply.github.com> Date: Thu, 11 May 2023 18:50:01 +0200 Subject: [PATCH 1/5] Use iceberg storage classes for tests --- .../iceberg/history/IcebergSchemaHistory.java | 3 -- .../offset/IcebergOffsetBackingStore.java | 31 +++++++------- .../server/iceberg/TestConfigSource.java | 8 ++-- .../history/IcebergSchemaHistoryTest.java | 2 - .../offset/IcebergOffsetBackingStoreTest.java | 40 +++++++------------ 5 files changed, 32 insertions(+), 52 deletions(-) diff --git a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/history/IcebergSchemaHistory.java b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/history/IcebergSchemaHistory.java index 326267b8..147395f8 100644 --- a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/history/IcebergSchemaHistory.java +++ b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/history/IcebergSchemaHistory.java @@ -63,9 +63,6 @@ @Incubating public final class IcebergSchemaHistory extends AbstractSchemaHistory { - public static final String DATABASE_HISTORY_STORAGE_TABLE_INSERT = "INSERT INTO %s VALUES ( ?, ?, ? )"; - public static final String DATABASE_HISTORY_STORAGE_TABLE_SELECT = "SELECT id, history_data, record_insert_ts FROM %s ORDER BY " + - "record_insert_ts ASC"; static final Schema DATABASE_HISTORY_TABLE_SCHEMA = new Schema( required(1, "id", Types.StringType.get()), optional(2, "history_data", Types.StringType.get()), diff --git a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/offset/IcebergOffsetBackingStore.java b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/offset/IcebergOffsetBackingStore.java index e3bb1cf0..5f79f4ed 100644 --- a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/offset/IcebergOffsetBackingStore.java +++ b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/offset/IcebergOffsetBackingStore.java @@ -11,7 +11,7 @@ import io.debezium.DebeziumException; import io.debezium.config.Configuration; import io.debezium.config.Field; -import io.debezium.server.iceberg.IcebergChangeConsumer; +import io.debezium.server.iceberg.IcebergUtil; import io.debezium.util.Strings; import java.io.Closeable; @@ -55,8 +55,10 @@ import org.apache.kafka.connect.storage.OffsetBackingStore; import org.apache.kafka.connect.util.Callback; import org.apache.kafka.connect.util.SafeObjectInputStream; +import org.eclipse.microprofile.config.ConfigProvider; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static io.debezium.server.iceberg.IcebergChangeConsumer.PROP_PREFIX; import static org.apache.iceberg.types.Types.NestedField.optional; import static org.apache.iceberg.types.Types.NestedField.required; @@ -73,6 +75,7 @@ public class IcebergOffsetBackingStore extends MemoryOffsetBackingStore implemen ) ); protected static final ObjectMapper mapper = new ObjectMapper(); + public static String CONFIGURATION_FIELD_PREFIX_STRING = "offset.storage."; private static final Logger LOG = LoggerFactory.getLogger(IcebergOffsetBackingStore.class); protected Map data = new HashMap<>(); Catalog icebergCatalog; @@ -86,7 +89,9 @@ public IcebergOffsetBackingStore() { @Override public void configure(WorkerConfig config) { super.configure(config); + offsetConfig = new IcebergOffsetBackingStoreConfig(Configuration.from(config.originalsStrings())); + icebergCatalog = CatalogUtil.buildIcebergCatalog(offsetConfig.catalogName(), offsetConfig.icebergProperties(), offsetConfig.hadoopConfig()); tableFullName = String.format("%s.%s", offsetConfig.catalogName(), offsetConfig.tableName()); @@ -241,12 +246,12 @@ public Future> get(final Collection keys }); } - public String fromByteBuffer(ByteBuffer data) { - return (data != null) ? String.valueOf(StandardCharsets.UTF_16.decode(data.asReadOnlyBuffer())) : null; + public static String fromByteBuffer(ByteBuffer data) { + return (data != null) ? String.valueOf(StandardCharsets.UTF_8.decode(data.asReadOnlyBuffer())) : null; } - public ByteBuffer toByteBuffer(String data) { - return (data != null) ? ByteBuffer.wrap(data.getBytes(StandardCharsets.UTF_16)) : null; + public static ByteBuffer toByteBuffer(String data) { + return (data != null) ? ByteBuffer.wrap(data.getBytes(StandardCharsets.UTF_8)) : null; } public static class IcebergOffsetBackingStoreConfig extends WorkerConfig { @@ -257,29 +262,21 @@ public static class IcebergOffsetBackingStoreConfig extends WorkerConfig { public IcebergOffsetBackingStoreConfig(Configuration config) { super(new ConfigDef(), config.asMap()); this.config = config; - - final Map conf = new HashMap<>(); - this.config.forEach((propName, value) -> { - if (propName.startsWith(IcebergChangeConsumer.PROP_PREFIX)) { - final String newPropName = propName.substring(IcebergChangeConsumer.PROP_PREFIX.length()); - conf.put(newPropName, value); - } - }); - + Map conf = IcebergUtil.getConfigSubset(ConfigProvider.getConfig(), PROP_PREFIX); conf.forEach(hadoopConfig::set); icebergProperties.putAll(conf); } public String catalogName() { - return this.config.getString(Field.create("debezium.sink.iceberg.catalog-name").withDefault("default")); + return this.config.getString(Field.create(PROP_PREFIX + "catalog-name").withDefault("default")); } public String tableName() { - return this.config.getString(Field.create("offset.storage.iceberg.table-name").withDefault("debezium_offset_storage")); + return this.config.getString(Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "iceberg.table-name").withDefault("debezium_offset_storage")); } public String getMigrateOffsetFile() { - return this.config.getString(Field.create("offset.storage.iceberg.migrate-offset-file").withDefault("")); + return this.config.getString(Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "iceberg.migrate-offset-file").withDefault("")); } public org.apache.hadoop.conf.Configuration hadoopConfig() { diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/TestConfigSource.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/TestConfigSource.java index 14e9312b..8416214e 100644 --- a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/TestConfigSource.java +++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/TestConfigSource.java @@ -56,10 +56,10 @@ public TestConfigSource() { config.put("debezium.transforms.unwrap.drop.tombstones", "true"); // DEBEZIUM SOURCE conf - config.put("debezium.source.offset.storage", "org.apache.kafka.connect.storage.MemoryOffsetBackingStore"); - //config.put("debezium.source.offset.storage", "io.debezium.server.iceberg.offset.IcebergOffsetBackingStore"); - config.put("debezium.source.database.history", "io.debezium.relational.history.MemorySchemaHistory"); - config.put("debezium.source.schema.history.internal", "io.debezium.relational.history.MemorySchemaHistory"); + config.put("debezium.source.offset.storage", "io.debezium.server.iceberg.offset.IcebergOffsetBackingStore"); + config.put("debezium.source.offset.storage.iceberg.table-name", "debezium_offset_storage_custom_table"); + config.put("debezium.source.schema.history.internal", "io.debezium.server.iceberg.history.IcebergSchemaHistory"); + config.put("debezium.source.schema.history.internal.iceberg.table-name", "debezium_database_history_storage_test"); config.put("debezium.source.offset.flush.interval.ms", "60000"); config.put("debezium.source.database.server.name", "testc"); config.put("debezium.source.database.server.id", "1234"); diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/history/IcebergSchemaHistoryTest.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/history/IcebergSchemaHistoryTest.java index d5152ccd..2296fbe1 100644 --- a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/history/IcebergSchemaHistoryTest.java +++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/history/IcebergSchemaHistoryTest.java @@ -65,8 +65,6 @@ public Map getConfigOverrides() { Map config = new HashMap<>(); config.put("quarkus.profile", "mysql"); config.put("%mysql.debezium.source.connector.class", "io.debezium.connector.mysql.MySqlConnector"); - config.put("debezium.source.schema.history", "io.debezium.server.iceberg.history.IcebergSchemaHistory"); - config.put("debezium.source.schema.history.iceberg.table-name", "debezium_database_history_storage_test"); config.put("debezium.source.schema.history.internal", "io.debezium.server.iceberg.history.IcebergSchemaHistory"); config.put("debezium.source.schema.history.internal.iceberg.table-name", "debezium_database_history_storage_test"); config.put("debezium.source.table.whitelist", "inventory.customers"); diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/offset/IcebergOffsetBackingStoreTest.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/offset/IcebergOffsetBackingStoreTest.java index c104235f..96222924 100644 --- a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/offset/IcebergOffsetBackingStoreTest.java +++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/offset/IcebergOffsetBackingStoreTest.java @@ -8,19 +8,20 @@ package io.debezium.server.iceberg.offset; +import io.debezium.server.iceberg.testresources.BaseTest; import io.debezium.server.iceberg.testresources.S3Minio; import io.quarkus.test.common.QuarkusTestResource; import io.quarkus.test.junit.QuarkusTest; -import io.quarkus.test.junit.QuarkusTestProfile; -import io.quarkus.test.junit.TestProfile; import java.nio.ByteBuffer; -import java.nio.charset.StandardCharsets; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.Map; +import com.google.common.collect.Lists; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.io.CloseableIterable; import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.connect.runtime.WorkerConfig; import org.apache.kafka.connect.util.Callback; @@ -28,24 +29,17 @@ import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; +import static io.debezium.server.iceberg.offset.IcebergOffsetBackingStore.fromByteBuffer; +import static io.debezium.server.iceberg.offset.IcebergOffsetBackingStore.toByteBuffer; import static org.junit.jupiter.api.Assertions.assertEquals; @QuarkusTest -@TestProfile(IcebergOffsetBackingStoreTest.TestProfile.class) @QuarkusTestResource(value = S3Minio.class, restrictToAnnotatedClass = true) -public class IcebergOffsetBackingStoreTest { +public class IcebergOffsetBackingStoreTest extends BaseTest { private static final Map firstSet = new HashMap<>(); private static final Map secondSet = new HashMap<>(); - public static String fromByteBuffer(ByteBuffer data) { - return (data != null) ? String.valueOf(StandardCharsets.UTF_16.decode(data.asReadOnlyBuffer())) : null; - } - - public static ByteBuffer toByteBuffer(String data) { - return (data != null) ? ByteBuffer.wrap(data.getBytes(StandardCharsets.UTF_16)) : null; - } - @BeforeAll public static void setup() { firstSet.put(toByteBuffer("key"), toByteBuffer("value")); @@ -59,7 +53,8 @@ public Map config() { for (String propName : ConfigProvider.getConfig().getPropertyNames()) { if (propName.startsWith("debezium")) { try { - conf.put(propName, ConfigProvider.getConfig().getValue(propName, String.class)); + conf.put(propName.replace("debezium.source.", IcebergOffsetBackingStore.CONFIGURATION_FIELD_PREFIX_STRING) + , ConfigProvider.getConfig().getValue(propName, String.class)); } catch (Exception e) { conf.put(propName, ""); } @@ -91,8 +86,12 @@ public void testGetSet() throws Exception { store.set(firstSet, cb).get(); Map values = store.get(Arrays.asList(toByteBuffer("key"), toByteBuffer("bad"))).get(); - assertEquals(toByteBuffer("value"), values.get(toByteBuffer("key"))); + assertEquals(("value"), fromByteBuffer(values.get(toByteBuffer("key")))); Assertions.assertNull(values.get(toByteBuffer("bad"))); + + CloseableIterable d = getTableDataV2("mycatalog", "debezium_offset_storage"); + d.forEach(System.out::println); + Assertions.assertEquals(1, Lists.newArrayList(d).size()); } @Test @@ -123,15 +122,4 @@ public TestWorkerConfig(Map props) { super(new ConfigDef(), props); } } - - public static class TestProfile implements QuarkusTestProfile { - @Override - public Map getConfigOverrides() { - Map config = new HashMap<>(); - config.put("debezium.source.offset.storage", "io.debezium.server.iceberg.offset.IcebergOffsetBackingStore"); - config.put("debezium.source.offset.flush.interval.ms", "60000"); - config.put("debezium.source.offset.storage.iceberg.table-name", "debezium_offset_storage_custom_table"); - return config; - } - } } \ No newline at end of file From 186457d56ff80876a9c94e9276d8a9c842f87b72 Mon Sep 17 00:00:00 2001 From: Ismail Simsek <6005685+ismailsimsek@users.noreply.github.com> Date: Thu, 11 May 2023 20:48:22 +0200 Subject: [PATCH 2/5] Use iceberg storage classes for tests --- .../debezium/server/iceberg/IcebergChangeConsumerTest.java | 7 +++++++ .../iceberg/offset/IcebergOffsetBackingStoreTest.java | 4 ++-- .../server/iceberg/testresources/BaseSparkTest.java | 2 +- 3 files changed, 10 insertions(+), 3 deletions(-) diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerTest.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerTest.java index 7271df26..c15463cc 100644 --- a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerTest.java +++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerTest.java @@ -20,13 +20,17 @@ import java.util.HashMap; import java.util.Map; +import com.google.common.collect.Lists; import jakarta.inject.Inject; import org.apache.iceberg.catalog.Namespace; import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.io.CloseableIterable; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import org.awaitility.Awaitility; import org.eclipse.microprofile.config.inject.ConfigProperty; +import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import org.slf4j.Logger; @@ -271,6 +275,9 @@ public void testSimpleUpload() { return false; } }); + + CloseableIterable d = getTableDataV2(TableIdentifier.of("mycatalog", "debezium_offset_storage")); + Assertions.assertEquals(1, Lists.newArrayList(d).size()); } diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/offset/IcebergOffsetBackingStoreTest.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/offset/IcebergOffsetBackingStoreTest.java index 96222924..225e9875 100644 --- a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/offset/IcebergOffsetBackingStoreTest.java +++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/offset/IcebergOffsetBackingStoreTest.java @@ -20,6 +20,7 @@ import java.util.Map; import com.google.common.collect.Lists; +import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.data.Record; import org.apache.iceberg.io.CloseableIterable; import org.apache.kafka.common.config.ConfigDef; @@ -89,8 +90,7 @@ public void testGetSet() throws Exception { assertEquals(("value"), fromByteBuffer(values.get(toByteBuffer("key")))); Assertions.assertNull(values.get(toByteBuffer("bad"))); - CloseableIterable d = getTableDataV2("mycatalog", "debezium_offset_storage"); - d.forEach(System.out::println); + CloseableIterable d = getTableDataV2(TableIdentifier.of("mycatalog", "debezium_offset_storage")); Assertions.assertEquals(1, Lists.newArrayList(d).size()); } diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/testresources/BaseSparkTest.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/testresources/BaseSparkTest.java index 611ec071..7e7e5e22 100644 --- a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/testresources/BaseSparkTest.java +++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/testresources/BaseSparkTest.java @@ -28,7 +28,7 @@ * * @author Ismail Simsek */ -public class BaseSparkTest { +public class BaseSparkTest extends BaseTest { protected static final SparkConf sparkconf = new SparkConf() .setAppName("CDC-S3-Batch-Spark-Sink") From f15f56e011d3b7802d72c3887d00ca0dcc35b40f Mon Sep 17 00:00:00 2001 From: Ismail Simsek <6005685+ismailsimsek@users.noreply.github.com> Date: Thu, 11 May 2023 21:06:59 +0200 Subject: [PATCH 3/5] Use iceberg storage classes for tests --- .../iceberg/offset/IcebergOffsetBackingStore.java | 10 +++++++--- .../server/iceberg/IcebergChangeConsumerTest.java | 2 +- 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/offset/IcebergOffsetBackingStore.java b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/offset/IcebergOffsetBackingStore.java index 5f79f4ed..360efb15 100644 --- a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/offset/IcebergOffsetBackingStore.java +++ b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/offset/IcebergOffsetBackingStore.java @@ -94,8 +94,8 @@ public void configure(WorkerConfig config) { icebergCatalog = CatalogUtil.buildIcebergCatalog(offsetConfig.catalogName(), offsetConfig.icebergProperties(), offsetConfig.hadoopConfig()); - tableFullName = String.format("%s.%s", offsetConfig.catalogName(), offsetConfig.tableName()); - tableId = TableIdentifier.of(Namespace.of(offsetConfig.catalogName()), offsetConfig.tableName()); + tableFullName = String.format("%s.%s", offsetConfig.tableNamespace(), offsetConfig.tableName()); + tableId = TableIdentifier.of(Namespace.of(offsetConfig.tableNamespace()), offsetConfig.tableName()); } @Override @@ -268,7 +268,11 @@ public IcebergOffsetBackingStoreConfig(Configuration config) { } public String catalogName() { - return this.config.getString(Field.create(PROP_PREFIX + "catalog-name").withDefault("default")); + return this.config.getString(Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "iceberg.catalog-name").withDefault("default")); + } + + public String tableNamespace() { + return this.config.getString(Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "iceberg.table-namespace").withDefault("default")); } public String tableName() { diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerTest.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerTest.java index c15463cc..e576a6fe 100644 --- a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerTest.java +++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerTest.java @@ -276,7 +276,7 @@ public void testSimpleUpload() { } }); - CloseableIterable d = getTableDataV2(TableIdentifier.of("mycatalog", "debezium_offset_storage")); + CloseableIterable d = getTableDataV2(TableIdentifier.of("debeziumevents", "debezium_offset_storage_custom_table")); Assertions.assertEquals(1, Lists.newArrayList(d).size()); } From c8ead9c96c5656b18a9f23ea771df6da6fd97c24 Mon Sep 17 00:00:00 2001 From: Ismail Simsek <6005685+ismailsimsek@users.noreply.github.com> Date: Thu, 11 May 2023 21:10:10 +0200 Subject: [PATCH 4/5] Use iceberg storage classes for tests --- .../server/iceberg/IcebergChangeConsumerTest.java | 11 ++++++++--- .../io/debezium/server/iceberg/TestConfigSource.java | 1 - 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerTest.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerTest.java index e576a6fe..75bf8829 100644 --- a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerTest.java +++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerTest.java @@ -30,7 +30,6 @@ import org.apache.spark.sql.Row; import org.awaitility.Awaitility; import org.eclipse.microprofile.config.inject.ConfigProperty; -import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import org.slf4j.Logger; @@ -276,8 +275,14 @@ public void testSimpleUpload() { } }); - CloseableIterable d = getTableDataV2(TableIdentifier.of("debeziumevents", "debezium_offset_storage_custom_table")); - Assertions.assertEquals(1, Lists.newArrayList(d).size()); + Awaitility.await().atMost(Duration.ofSeconds(120)).until(() -> { + try { + CloseableIterable d = getTableDataV2(TableIdentifier.of("debeziumevents", "debezium_offset_storage_custom_table")); + return Lists.newArrayList(d).size() == 1; + } catch (Exception e) { + return false; + } + }); } diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/TestConfigSource.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/TestConfigSource.java index 8416214e..5294a867 100644 --- a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/TestConfigSource.java +++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/TestConfigSource.java @@ -60,7 +60,6 @@ public TestConfigSource() { config.put("debezium.source.offset.storage.iceberg.table-name", "debezium_offset_storage_custom_table"); config.put("debezium.source.schema.history.internal", "io.debezium.server.iceberg.history.IcebergSchemaHistory"); config.put("debezium.source.schema.history.internal.iceberg.table-name", "debezium_database_history_storage_test"); - config.put("debezium.source.offset.flush.interval.ms", "60000"); config.put("debezium.source.database.server.name", "testc"); config.put("debezium.source.database.server.id", "1234"); config.put("debezium.source.topic.prefix", "testc"); From 1264ccb140f16db5a854bd60259aabbd25894ece Mon Sep 17 00:00:00 2001 From: Ismail Simsek <6005685+ismailsimsek@users.noreply.github.com> Date: Thu, 11 May 2023 21:29:01 +0200 Subject: [PATCH 5/5] Use iceberg storage classes for tests --- .../io/debezium/server/iceberg/IcebergChangeConsumerTest.java | 1 + .../test/java/io/debezium/server/iceberg/TestConfigSource.java | 1 + .../server/iceberg/offset/IcebergOffsetBackingStoreTest.java | 2 +- 3 files changed, 3 insertions(+), 1 deletion(-) diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerTest.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerTest.java index 75bf8829..98eef82f 100644 --- a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerTest.java +++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerTest.java @@ -278,6 +278,7 @@ public void testSimpleUpload() { Awaitility.await().atMost(Duration.ofSeconds(120)).until(() -> { try { CloseableIterable d = getTableDataV2(TableIdentifier.of("debeziumevents", "debezium_offset_storage_custom_table")); + System.out.println(Lists.newArrayList(d)); return Lists.newArrayList(d).size() == 1; } catch (Exception e) { return false; diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/TestConfigSource.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/TestConfigSource.java index 5294a867..76fd1c2e 100644 --- a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/TestConfigSource.java +++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/TestConfigSource.java @@ -60,6 +60,7 @@ public TestConfigSource() { config.put("debezium.source.offset.storage.iceberg.table-name", "debezium_offset_storage_custom_table"); config.put("debezium.source.schema.history.internal", "io.debezium.server.iceberg.history.IcebergSchemaHistory"); config.put("debezium.source.schema.history.internal.iceberg.table-name", "debezium_database_history_storage_test"); + config.put("debezium.source.offset.flush.interval.ms", "1000"); config.put("debezium.source.database.server.name", "testc"); config.put("debezium.source.database.server.id", "1234"); config.put("debezium.source.topic.prefix", "testc"); diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/offset/IcebergOffsetBackingStoreTest.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/offset/IcebergOffsetBackingStoreTest.java index 225e9875..d6724d07 100644 --- a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/offset/IcebergOffsetBackingStoreTest.java +++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/offset/IcebergOffsetBackingStoreTest.java @@ -90,7 +90,7 @@ public void testGetSet() throws Exception { assertEquals(("value"), fromByteBuffer(values.get(toByteBuffer("key")))); Assertions.assertNull(values.get(toByteBuffer("bad"))); - CloseableIterable d = getTableDataV2(TableIdentifier.of("mycatalog", "debezium_offset_storage")); + CloseableIterable d = getTableDataV2(TableIdentifier.of("default", "debezium_offset_storage")); Assertions.assertEquals(1, Lists.newArrayList(d).size()); }