diff --git a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/history/IcebergSchemaHistory.java b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/history/IcebergSchemaHistory.java index 326267b8..147395f8 100644 --- a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/history/IcebergSchemaHistory.java +++ b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/history/IcebergSchemaHistory.java @@ -63,9 +63,6 @@ @Incubating public final class IcebergSchemaHistory extends AbstractSchemaHistory { - public static final String DATABASE_HISTORY_STORAGE_TABLE_INSERT = "INSERT INTO %s VALUES ( ?, ?, ? )"; - public static final String DATABASE_HISTORY_STORAGE_TABLE_SELECT = "SELECT id, history_data, record_insert_ts FROM %s ORDER BY " + - "record_insert_ts ASC"; static final Schema DATABASE_HISTORY_TABLE_SCHEMA = new Schema( required(1, "id", Types.StringType.get()), optional(2, "history_data", Types.StringType.get()), diff --git a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/offset/IcebergOffsetBackingStore.java b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/offset/IcebergOffsetBackingStore.java index e3bb1cf0..360efb15 100644 --- a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/offset/IcebergOffsetBackingStore.java +++ b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/offset/IcebergOffsetBackingStore.java @@ -11,7 +11,7 @@ import io.debezium.DebeziumException; import io.debezium.config.Configuration; import io.debezium.config.Field; -import io.debezium.server.iceberg.IcebergChangeConsumer; +import io.debezium.server.iceberg.IcebergUtil; import io.debezium.util.Strings; import java.io.Closeable; @@ -55,8 +55,10 @@ import org.apache.kafka.connect.storage.OffsetBackingStore; import org.apache.kafka.connect.util.Callback; import org.apache.kafka.connect.util.SafeObjectInputStream; +import org.eclipse.microprofile.config.ConfigProvider; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static io.debezium.server.iceberg.IcebergChangeConsumer.PROP_PREFIX; import static org.apache.iceberg.types.Types.NestedField.optional; import static org.apache.iceberg.types.Types.NestedField.required; @@ -73,6 +75,7 @@ public class IcebergOffsetBackingStore extends MemoryOffsetBackingStore implemen ) ); protected static final ObjectMapper mapper = new ObjectMapper(); + public static String CONFIGURATION_FIELD_PREFIX_STRING = "offset.storage."; private static final Logger LOG = LoggerFactory.getLogger(IcebergOffsetBackingStore.class); protected Map data = new HashMap<>(); Catalog icebergCatalog; @@ -86,11 +89,13 @@ public IcebergOffsetBackingStore() { @Override public void configure(WorkerConfig config) { super.configure(config); + offsetConfig = new IcebergOffsetBackingStoreConfig(Configuration.from(config.originalsStrings())); + icebergCatalog = CatalogUtil.buildIcebergCatalog(offsetConfig.catalogName(), offsetConfig.icebergProperties(), offsetConfig.hadoopConfig()); - tableFullName = String.format("%s.%s", offsetConfig.catalogName(), offsetConfig.tableName()); - tableId = TableIdentifier.of(Namespace.of(offsetConfig.catalogName()), offsetConfig.tableName()); + tableFullName = String.format("%s.%s", offsetConfig.tableNamespace(), offsetConfig.tableName()); + tableId = TableIdentifier.of(Namespace.of(offsetConfig.tableNamespace()), offsetConfig.tableName()); } @Override @@ -241,12 +246,12 @@ public Future> get(final Collection keys }); } - public String fromByteBuffer(ByteBuffer data) { - return (data != null) ? String.valueOf(StandardCharsets.UTF_16.decode(data.asReadOnlyBuffer())) : null; + public static String fromByteBuffer(ByteBuffer data) { + return (data != null) ? String.valueOf(StandardCharsets.UTF_8.decode(data.asReadOnlyBuffer())) : null; } - public ByteBuffer toByteBuffer(String data) { - return (data != null) ? ByteBuffer.wrap(data.getBytes(StandardCharsets.UTF_16)) : null; + public static ByteBuffer toByteBuffer(String data) { + return (data != null) ? ByteBuffer.wrap(data.getBytes(StandardCharsets.UTF_8)) : null; } public static class IcebergOffsetBackingStoreConfig extends WorkerConfig { @@ -257,29 +262,25 @@ public static class IcebergOffsetBackingStoreConfig extends WorkerConfig { public IcebergOffsetBackingStoreConfig(Configuration config) { super(new ConfigDef(), config.asMap()); this.config = config; - - final Map conf = new HashMap<>(); - this.config.forEach((propName, value) -> { - if (propName.startsWith(IcebergChangeConsumer.PROP_PREFIX)) { - final String newPropName = propName.substring(IcebergChangeConsumer.PROP_PREFIX.length()); - conf.put(newPropName, value); - } - }); - + Map conf = IcebergUtil.getConfigSubset(ConfigProvider.getConfig(), PROP_PREFIX); conf.forEach(hadoopConfig::set); icebergProperties.putAll(conf); } public String catalogName() { - return this.config.getString(Field.create("debezium.sink.iceberg.catalog-name").withDefault("default")); + return this.config.getString(Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "iceberg.catalog-name").withDefault("default")); + } + + public String tableNamespace() { + return this.config.getString(Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "iceberg.table-namespace").withDefault("default")); } public String tableName() { - return this.config.getString(Field.create("offset.storage.iceberg.table-name").withDefault("debezium_offset_storage")); + return this.config.getString(Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "iceberg.table-name").withDefault("debezium_offset_storage")); } public String getMigrateOffsetFile() { - return this.config.getString(Field.create("offset.storage.iceberg.migrate-offset-file").withDefault("")); + return this.config.getString(Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "iceberg.migrate-offset-file").withDefault("")); } public org.apache.hadoop.conf.Configuration hadoopConfig() { diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerTest.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerTest.java index 7271df26..98eef82f 100644 --- a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerTest.java +++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/IcebergChangeConsumerTest.java @@ -20,9 +20,12 @@ import java.util.HashMap; import java.util.Map; +import com.google.common.collect.Lists; import jakarta.inject.Inject; import org.apache.iceberg.catalog.Namespace; import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.io.CloseableIterable; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import org.awaitility.Awaitility; @@ -271,6 +274,16 @@ public void testSimpleUpload() { return false; } }); + + Awaitility.await().atMost(Duration.ofSeconds(120)).until(() -> { + try { + CloseableIterable d = getTableDataV2(TableIdentifier.of("debeziumevents", "debezium_offset_storage_custom_table")); + System.out.println(Lists.newArrayList(d)); + return Lists.newArrayList(d).size() == 1; + } catch (Exception e) { + return false; + } + }); } diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/TestConfigSource.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/TestConfigSource.java index 14e9312b..76fd1c2e 100644 --- a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/TestConfigSource.java +++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/TestConfigSource.java @@ -56,11 +56,11 @@ public TestConfigSource() { config.put("debezium.transforms.unwrap.drop.tombstones", "true"); // DEBEZIUM SOURCE conf - config.put("debezium.source.offset.storage", "org.apache.kafka.connect.storage.MemoryOffsetBackingStore"); - //config.put("debezium.source.offset.storage", "io.debezium.server.iceberg.offset.IcebergOffsetBackingStore"); - config.put("debezium.source.database.history", "io.debezium.relational.history.MemorySchemaHistory"); - config.put("debezium.source.schema.history.internal", "io.debezium.relational.history.MemorySchemaHistory"); - config.put("debezium.source.offset.flush.interval.ms", "60000"); + config.put("debezium.source.offset.storage", "io.debezium.server.iceberg.offset.IcebergOffsetBackingStore"); + config.put("debezium.source.offset.storage.iceberg.table-name", "debezium_offset_storage_custom_table"); + config.put("debezium.source.schema.history.internal", "io.debezium.server.iceberg.history.IcebergSchemaHistory"); + config.put("debezium.source.schema.history.internal.iceberg.table-name", "debezium_database_history_storage_test"); + config.put("debezium.source.offset.flush.interval.ms", "1000"); config.put("debezium.source.database.server.name", "testc"); config.put("debezium.source.database.server.id", "1234"); config.put("debezium.source.topic.prefix", "testc"); diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/history/IcebergSchemaHistoryTest.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/history/IcebergSchemaHistoryTest.java index d5152ccd..2296fbe1 100644 --- a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/history/IcebergSchemaHistoryTest.java +++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/history/IcebergSchemaHistoryTest.java @@ -65,8 +65,6 @@ public Map getConfigOverrides() { Map config = new HashMap<>(); config.put("quarkus.profile", "mysql"); config.put("%mysql.debezium.source.connector.class", "io.debezium.connector.mysql.MySqlConnector"); - config.put("debezium.source.schema.history", "io.debezium.server.iceberg.history.IcebergSchemaHistory"); - config.put("debezium.source.schema.history.iceberg.table-name", "debezium_database_history_storage_test"); config.put("debezium.source.schema.history.internal", "io.debezium.server.iceberg.history.IcebergSchemaHistory"); config.put("debezium.source.schema.history.internal.iceberg.table-name", "debezium_database_history_storage_test"); config.put("debezium.source.table.whitelist", "inventory.customers"); diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/offset/IcebergOffsetBackingStoreTest.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/offset/IcebergOffsetBackingStoreTest.java index c104235f..d6724d07 100644 --- a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/offset/IcebergOffsetBackingStoreTest.java +++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/offset/IcebergOffsetBackingStoreTest.java @@ -8,19 +8,21 @@ package io.debezium.server.iceberg.offset; +import io.debezium.server.iceberg.testresources.BaseTest; import io.debezium.server.iceberg.testresources.S3Minio; import io.quarkus.test.common.QuarkusTestResource; import io.quarkus.test.junit.QuarkusTest; -import io.quarkus.test.junit.QuarkusTestProfile; -import io.quarkus.test.junit.TestProfile; import java.nio.ByteBuffer; -import java.nio.charset.StandardCharsets; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.Map; +import com.google.common.collect.Lists; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.io.CloseableIterable; import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.connect.runtime.WorkerConfig; import org.apache.kafka.connect.util.Callback; @@ -28,24 +30,17 @@ import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; +import static io.debezium.server.iceberg.offset.IcebergOffsetBackingStore.fromByteBuffer; +import static io.debezium.server.iceberg.offset.IcebergOffsetBackingStore.toByteBuffer; import static org.junit.jupiter.api.Assertions.assertEquals; @QuarkusTest -@TestProfile(IcebergOffsetBackingStoreTest.TestProfile.class) @QuarkusTestResource(value = S3Minio.class, restrictToAnnotatedClass = true) -public class IcebergOffsetBackingStoreTest { +public class IcebergOffsetBackingStoreTest extends BaseTest { private static final Map firstSet = new HashMap<>(); private static final Map secondSet = new HashMap<>(); - public static String fromByteBuffer(ByteBuffer data) { - return (data != null) ? String.valueOf(StandardCharsets.UTF_16.decode(data.asReadOnlyBuffer())) : null; - } - - public static ByteBuffer toByteBuffer(String data) { - return (data != null) ? ByteBuffer.wrap(data.getBytes(StandardCharsets.UTF_16)) : null; - } - @BeforeAll public static void setup() { firstSet.put(toByteBuffer("key"), toByteBuffer("value")); @@ -59,7 +54,8 @@ public Map config() { for (String propName : ConfigProvider.getConfig().getPropertyNames()) { if (propName.startsWith("debezium")) { try { - conf.put(propName, ConfigProvider.getConfig().getValue(propName, String.class)); + conf.put(propName.replace("debezium.source.", IcebergOffsetBackingStore.CONFIGURATION_FIELD_PREFIX_STRING) + , ConfigProvider.getConfig().getValue(propName, String.class)); } catch (Exception e) { conf.put(propName, ""); } @@ -91,8 +87,11 @@ public void testGetSet() throws Exception { store.set(firstSet, cb).get(); Map values = store.get(Arrays.asList(toByteBuffer("key"), toByteBuffer("bad"))).get(); - assertEquals(toByteBuffer("value"), values.get(toByteBuffer("key"))); + assertEquals(("value"), fromByteBuffer(values.get(toByteBuffer("key")))); Assertions.assertNull(values.get(toByteBuffer("bad"))); + + CloseableIterable d = getTableDataV2(TableIdentifier.of("default", "debezium_offset_storage")); + Assertions.assertEquals(1, Lists.newArrayList(d).size()); } @Test @@ -123,15 +122,4 @@ public TestWorkerConfig(Map props) { super(new ConfigDef(), props); } } - - public static class TestProfile implements QuarkusTestProfile { - @Override - public Map getConfigOverrides() { - Map config = new HashMap<>(); - config.put("debezium.source.offset.storage", "io.debezium.server.iceberg.offset.IcebergOffsetBackingStore"); - config.put("debezium.source.offset.flush.interval.ms", "60000"); - config.put("debezium.source.offset.storage.iceberg.table-name", "debezium_offset_storage_custom_table"); - return config; - } - } } \ No newline at end of file diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/testresources/BaseSparkTest.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/testresources/BaseSparkTest.java index 611ec071..7e7e5e22 100644 --- a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/testresources/BaseSparkTest.java +++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/testresources/BaseSparkTest.java @@ -28,7 +28,7 @@ * * @author Ismail Simsek */ -public class BaseSparkTest { +public class BaseSparkTest extends BaseTest { protected static final SparkConf sparkconf = new SparkConf() .setAppName("CDC-S3-Batch-Spark-Sink")