diff --git a/debezium-server-iceberg-dist/src/main/resources/distro/conf/application.properties.example b/debezium-server-iceberg-dist/src/main/resources/distro/conf/application.properties.example index 81dcb550..4f627a3b 100644 --- a/debezium-server-iceberg-dist/src/main/resources/distro/conf/application.properties.example +++ b/debezium-server-iceberg-dist/src/main/resources/distro/conf/application.properties.example @@ -41,7 +41,9 @@ debezium.source.database.user=postgres debezium.source.database.password=postgres debezium.source.database.dbname=postgres debezium.source.database.server.name=tutorial +debezium.source.database.server.id=1234 debezium.source.schema.include.list=inventory +debezium.source.topic.prefix=dbz_ # sql server source #debezium.source.connector.class=io.debezium.connector.sqlserver.SqlServerConnector diff --git a/debezium-server-iceberg-sink/pom.xml b/debezium-server-iceberg-sink/pom.xml index 19c0ac68..73d9e9c8 100644 --- a/debezium-server-iceberg-sink/pom.xml +++ b/debezium-server-iceberg-sink/pom.xml @@ -152,6 +152,13 @@ + + + org.antlr + antlr4-runtime + ${version.antlr} + test + org.apache.spark spark-core_2.13 @@ -170,6 +177,10 @@ org.slf4j jul-to-slf4j + + org.apache.logging.log4j + log4j-slf4j-impl + diff --git a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/history/IcebergSchemaHistory.java b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/history/IcebergSchemaHistory.java index 51687e28..dd200d18 100644 --- a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/history/IcebergSchemaHistory.java +++ b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/history/IcebergSchemaHistory.java @@ -55,13 +55,13 @@ import static org.apache.iceberg.types.Types.NestedField.required; /** - * A {@link DatabaseHistory} implementation that stores the schema history to database table + * A {@link SchemaHistory} implementation that stores the schema history to database table * * @author Ismail Simsek */ @ThreadSafe @Incubating -public final class IcebergSchemaHistory extends AbstractDatabaseHistory { +public final class IcebergSchemaHistory extends AbstractSchemaHistory { public static final String DATABASE_HISTORY_STORAGE_TABLE_INSERT = "INSERT INTO %s VALUES ( ?, ?, ? )"; public static final String DATABASE_HISTORY_STORAGE_TABLE_SELECT = "SELECT id, history_data, record_insert_ts FROM %s ORDER BY " + @@ -84,7 +84,7 @@ public final class IcebergSchemaHistory extends AbstractDatabaseHistory { private Table historyTable; @Override - public void configure(Configuration config, HistoryRecordComparator comparator, DatabaseHistoryListener listener, boolean useCatalogBeforeSchema) { + public void configure(Configuration config, HistoryRecordComparator comparator, SchemaHistoryListener listener, boolean useCatalogBeforeSchema) { super.configure(config, comparator, listener, useCatalogBeforeSchema); this.historyConfig = new IcebergSchemaHistoryConfig(config); icebergCatalog = CatalogUtil.buildIcebergCatalog(this.historyConfig.catalogName(), @@ -93,7 +93,7 @@ public void configure(Configuration config, HistoryRecordComparator comparator, tableId = TableIdentifier.of(Namespace.of(this.historyConfig.catalogName()), this.historyConfig.tableName()); if (running.get()) { - throw new DatabaseHistoryException("Bigquery database history process already initialized table: " + tableFullName); + throw new SchemaHistoryException("Bigquery database history process already initialized table: " + tableFullName); } } @@ -107,7 +107,7 @@ public void start() { initializeStorage(); } } catch (Exception e) { - throw new DatabaseHistoryException("Unable to create history table: " + tableFullName + " : " + e.getMessage(), + throw new SchemaHistoryException("Unable to create history table: " + tableFullName + " : " + e.getMessage(), e); } } @@ -119,7 +119,7 @@ public String getTableFullName() { } @Override - protected void storeRecord(HistoryRecord record) throws DatabaseHistoryException { + protected void storeRecord(HistoryRecord record) throws SchemaHistoryException { if (record == null) { return; } @@ -161,7 +161,7 @@ protected void storeRecord(HistoryRecord record) throws DatabaseHistoryException /// END iceberg append LOG.trace("Successfully saved history data to bigquery table"); } catch (IOException e) { - throw new DatabaseHistoryException("Failed to store record: " + record, e); + throw new SchemaHistoryException("Failed to store record: " + record, e); } }); } @@ -239,17 +239,17 @@ public void initializeStorage() { if (!Strings.isNullOrEmpty(historyConfig.getMigrateHistoryFile().strip())) { LOG.warn("Migrating history from file {}", historyConfig.getMigrateHistoryFile()); - this.loadFileDatabaseHistory(new File(historyConfig.getMigrateHistoryFile())); + this.loadFileSchemaHistory(new File(historyConfig.getMigrateHistoryFile())); } } catch (Exception e) { - throw new DatabaseHistoryException("Creation of database history topic failed, please create the topic manually", e); + throw new SchemaHistoryException("Creation of database history topic failed, please create the topic manually", e); } } else { LOG.debug("Storage is exists, skipping initialization"); } } - private void loadFileDatabaseHistory(File file) { + private void loadFileSchemaHistory(File file) { LOG.warn(String.format("Migrating file database history from:'%s' to Bigquery database history storage: %s", file.toPath(), tableFullName)); AtomicInteger numRecords = new AtomicInteger(); diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/TestConfigSource.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/TestConfigSource.java index 46d6bc24..14e9312b 100644 --- a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/TestConfigSource.java +++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/TestConfigSource.java @@ -58,9 +58,12 @@ public TestConfigSource() { // DEBEZIUM SOURCE conf config.put("debezium.source.offset.storage", "org.apache.kafka.connect.storage.MemoryOffsetBackingStore"); //config.put("debezium.source.offset.storage", "io.debezium.server.iceberg.offset.IcebergOffsetBackingStore"); - config.put("debezium.source.database.history", "io.debezium.relational.history.MemoryDatabaseHistory"); + config.put("debezium.source.database.history", "io.debezium.relational.history.MemorySchemaHistory"); + config.put("debezium.source.schema.history.internal", "io.debezium.relational.history.MemorySchemaHistory"); config.put("debezium.source.offset.flush.interval.ms", "60000"); config.put("debezium.source.database.server.name", "testc"); + config.put("debezium.source.database.server.id", "1234"); + config.put("debezium.source.topic.prefix", "testc"); config.put("%postgresql.debezium.source.schema.whitelist", "inventory"); config.put("%postgresql.debezium.source.database.whitelist", "inventory"); config.put("debezium.source.table.whitelist", "inventory.*"); diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/history/IcebergSchemaHistoryTest.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/history/IcebergSchemaHistoryTest.java index 2d0f9133..be6939b0 100644 --- a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/history/IcebergSchemaHistoryTest.java +++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/history/IcebergSchemaHistoryTest.java @@ -66,8 +66,10 @@ public Map getConfigOverrides() { Map config = new HashMap<>(); config.put("quarkus.profile", "mysql"); config.put("%mysql.debezium.source.connector.class", "io.debezium.connector.mysql.MySqlConnector"); - config.put("debezium.source.database.history", "io.debezium.server.iceberg.history.IcebergSchemaHistory"); - config.put("debezium.source.database.history.iceberg.table-name", "debezium_database_history_storage_test"); + config.put("debezium.source.schema.history", "io.debezium.server.iceberg.history.IcebergSchemaHistory"); + config.put("debezium.source.schema.history.iceberg.table-name", "debezium_database_history_storage_test"); + config.put("debezium.source.schema.history.internal", "io.debezium.server.iceberg.history.IcebergSchemaHistory"); + config.put("debezium.source.schema.history.internal.iceberg.table-name", "debezium_database_history_storage_test"); config.put("debezium.source.table.whitelist", "inventory.customers"); return config; } diff --git a/pom.xml b/pom.xml index 77481d84..a8dac217 100644 --- a/pom.xml +++ b/pom.xml @@ -29,36 +29,24 @@ 3.0.13 3.4.2 - 2.13.4.20221013 - 1.2.0 + 2.13.5 + 1.2.1 3.3.1 3.3.4 2.19.4 - 1.12.3 1.17.6 - 3.2.1 - 1.9.7.Final - 8.0.30 + 2.1.4.Final + 8.0.32 - 2.15.3.Final + 2.16.6.Final - - 4.8 + + 4.9.3 - - org.antlr - antlr4-runtime - ${version.antlr} - - - org.apache.kafka - kafka-clients - ${version.kafkaclients} - org.jboss.slf4j @@ -73,12 +61,6 @@ pom import - - - org.apache.parquet - parquet-hadoop-bundle - ${version.parquet} - io.quarkus quarkus-bom @@ -123,7 +105,6 @@ groovy-jsr223 ${version.groovy} - com.fasterxml.jackson