Upgrade debezium to 2.x (#176)
ismailsimsek committed Apr 20, 2023
1 parent cdd1ac0 commit b9d1a67
Showing 6 changed files with 38 additions and 39 deletions.
@@ -41,7 +41,9 @@ debezium.source.database.user=postgres
 debezium.source.database.password=postgres
 debezium.source.database.dbname=postgres
 debezium.source.database.server.name=tutorial
+debezium.source.database.server.id=1234
 debezium.source.schema.include.list=inventory
+debezium.source.topic.prefix=dbz_
 
 # sql server source
 #debezium.source.connector.class=io.debezium.connector.sqlserver.SqlServerConnector
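Note: debezium.source.topic.prefix is the Debezium 2.x replacement for the connector-level database.server.name option. As a hedged sketch only (the class name, hostname, and port below are assumed for illustration and are not part of this commit), the equivalent 2.x properties can be handed to Debezium's embedded engine like this:

import java.util.Properties;

import io.debezium.engine.ChangeEvent;
import io.debezium.engine.DebeziumEngine;
import io.debezium.engine.format.Json;

public class PostgresEngineSketch {
  public static void main(String[] args) throws Exception {
    Properties props = new Properties();
    props.setProperty("name", "tutorial-engine");
    props.setProperty("connector.class", "io.debezium.connector.postgresql.PostgresConnector");
    props.setProperty("offset.storage", "org.apache.kafka.connect.storage.MemoryOffsetBackingStore");
    props.setProperty("database.hostname", "localhost");   // assumed value
    props.setProperty("database.port", "5432");            // assumed value
    props.setProperty("database.user", "postgres");
    props.setProperty("database.password", "postgres");
    props.setProperty("database.dbname", "postgres");
    props.setProperty("schema.include.list", "inventory");
    props.setProperty("topic.prefix", "dbz_");             // 2.x: replaces database.server.name

    // Print each change event; a real sink would hand it to Iceberg instead.
    try (DebeziumEngine<ChangeEvent<String, String>> engine = DebeziumEngine.create(Json.class)
        .using(props)
        .notifying(record -> System.out.println(record.value()))
        .build()) {
      engine.run(); // blocks until the engine is stopped or fails
    }
  }
}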
11 changes: 11 additions & 0 deletions debezium-server-iceberg-sink/pom.xml
@@ -152,6 +152,13 @@
 
     <!-- Testing -->
     <!-- spark for tests -->
+    <!-- antlr4 version compatible with spark, testing only -->
+    <dependency>
+      <groupId>org.antlr</groupId>
+      <artifactId>antlr4-runtime</artifactId>
+      <version>${version.antlr}</version>
+      <scope>test</scope>
+    </dependency>
     <dependency>
       <groupId>org.apache.spark</groupId>
       <artifactId>spark-core_2.13</artifactId>
@@ -170,6 +177,10 @@
         <groupId>org.slf4j</groupId>
         <artifactId>jul-to-slf4j</artifactId>
       </exclusion>
+      <exclusion>
+        <groupId>org.apache.logging.log4j</groupId>
+        <artifactId>log4j-slf4j-impl</artifactId>
+      </exclusion>
     </exclusions>
   </dependency>
   <dependency>
@@ -55,13 +55,13 @@
 import static org.apache.iceberg.types.Types.NestedField.required;
 
 /**
- * A {@link DatabaseHistory} implementation that stores the schema history to database table
+ * A {@link SchemaHistory} implementation that stores the schema history to database table
  *
  * @author Ismail Simsek
  */
 @ThreadSafe
 @Incubating
-public final class IcebergSchemaHistory extends AbstractDatabaseHistory {
+public final class IcebergSchemaHistory extends AbstractSchemaHistory {
 
   public static final String DATABASE_HISTORY_STORAGE_TABLE_INSERT = "INSERT INTO %s VALUES ( ?, ?, ? )";
   public static final String DATABASE_HISTORY_STORAGE_TABLE_SELECT = "SELECT id, history_data, record_insert_ts FROM %s ORDER BY " +
@@ -84,7 +84,7 @@ public final class IcebergSchemaHistory extends AbstractDatabaseHistory {
   private Table historyTable;
 
   @Override
-  public void configure(Configuration config, HistoryRecordComparator comparator, DatabaseHistoryListener listener, boolean useCatalogBeforeSchema) {
+  public void configure(Configuration config, HistoryRecordComparator comparator, SchemaHistoryListener listener, boolean useCatalogBeforeSchema) {
     super.configure(config, comparator, listener, useCatalogBeforeSchema);
     this.historyConfig = new IcebergSchemaHistoryConfig(config);
     icebergCatalog = CatalogUtil.buildIcebergCatalog(this.historyConfig.catalogName(),

@@ -93,7 +93,7 @@ public void configure(Configuration config, HistoryRecordComparator comparator,
     tableId = TableIdentifier.of(Namespace.of(this.historyConfig.catalogName()), this.historyConfig.tableName());
 
     if (running.get()) {
-      throw new DatabaseHistoryException("Bigquery database history process already initialized table: " + tableFullName);
+      throw new SchemaHistoryException("Bigquery database history process already initialized table: " + tableFullName);
     }
   }

@@ -107,7 +107,7 @@ public void start() {
         initializeStorage();
       }
     } catch (Exception e) {
-      throw new DatabaseHistoryException("Unable to create history table: " + tableFullName + " : " + e.getMessage(),
+      throw new SchemaHistoryException("Unable to create history table: " + tableFullName + " : " + e.getMessage(),
           e);
     }
   }

@@ -119,7 +119,7 @@ public String getTableFullName() {
   }
 
   @Override
-  protected void storeRecord(HistoryRecord record) throws DatabaseHistoryException {
+  protected void storeRecord(HistoryRecord record) throws SchemaHistoryException {
     if (record == null) {
       return;
     }

@@ -161,7 +161,7 @@ protected void storeRecord(HistoryRecord record) throws DatabaseHistoryException
       /// END iceberg append
       LOG.trace("Successfully saved history data to bigquery table");
     } catch (IOException e) {
-      throw new DatabaseHistoryException("Failed to store record: " + record, e);
+      throw new SchemaHistoryException("Failed to store record: " + record, e);
     }
   });
 }
@@ -239,17 +239,17 @@ public void initializeStorage() {
 
       if (!Strings.isNullOrEmpty(historyConfig.getMigrateHistoryFile().strip())) {
         LOG.warn("Migrating history from file {}", historyConfig.getMigrateHistoryFile());
-        this.loadFileDatabaseHistory(new File(historyConfig.getMigrateHistoryFile()));
+        this.loadFileSchemaHistory(new File(historyConfig.getMigrateHistoryFile()));
       }
     } catch (Exception e) {
-      throw new DatabaseHistoryException("Creation of database history topic failed, please create the topic manually", e);
+      throw new SchemaHistoryException("Creation of database history topic failed, please create the topic manually", e);
     }
   } else {
     LOG.debug("Storage is exists, skipping initialization");
   }
 }
 
-private void loadFileDatabaseHistory(File file) {
+private void loadFileSchemaHistory(File file) {
   LOG.warn(String.format("Migrating file database history from:'%s' to Bigquery database history storage: %s",
       file.toPath(), tableFullName));
   AtomicInteger numRecords = new AtomicInteger();
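Note: every change in this file follows the Debezium 2.x rename of the schema-history SPI: DatabaseHistory becomes SchemaHistory, AbstractDatabaseHistory becomes AbstractSchemaHistory, and DatabaseHistoryException/DatabaseHistoryListener become SchemaHistoryException/SchemaHistoryListener. A minimal sketch of the 2.x shape, with illustrative in-memory bodies rather than this file's Iceberg logic (the exact set of abstract methods may differ slightly across 2.x releases):

import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

import io.debezium.config.Configuration;
import io.debezium.relational.history.AbstractSchemaHistory;
import io.debezium.relational.history.HistoryRecord;
import io.debezium.relational.history.HistoryRecordComparator;
import io.debezium.relational.history.SchemaHistoryException;
import io.debezium.relational.history.SchemaHistoryListener;

public class SketchSchemaHistory extends AbstractSchemaHistory {

  private final List<HistoryRecord> records = new CopyOnWriteArrayList<>();

  @Override
  public void configure(Configuration config, HistoryRecordComparator comparator,
                        SchemaHistoryListener listener, boolean useCatalogBeforeSchema) {
    // 2.x: the listener parameter type was DatabaseHistoryListener in 1.x
    super.configure(config, comparator, listener, useCatalogBeforeSchema);
  }

  @Override
  protected void storeRecord(HistoryRecord record) throws SchemaHistoryException {
    // 2.x: declares SchemaHistoryException instead of DatabaseHistoryException
    records.add(record);
  }

  @Override
  protected void recoverRecords(Consumer<HistoryRecord> consumer) {
    records.forEach(consumer); // replay stored DDL history on connector start
  }

  @Override
  public boolean exists() {
    return !records.isEmpty();
  }

  @Override
  public boolean storageExists() {
    return true; // in-memory storage always "exists"
  }
}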
@@ -58,9 +58,12 @@ public TestConfigSource() {
     // DEBEZIUM SOURCE conf
     config.put("debezium.source.offset.storage", "org.apache.kafka.connect.storage.MemoryOffsetBackingStore");
     //config.put("debezium.source.offset.storage", "io.debezium.server.iceberg.offset.IcebergOffsetBackingStore");
-    config.put("debezium.source.database.history", "io.debezium.relational.history.MemoryDatabaseHistory");
+    config.put("debezium.source.database.history", "io.debezium.relational.history.MemorySchemaHistory");
+    config.put("debezium.source.schema.history.internal", "io.debezium.relational.history.MemorySchemaHistory");
     config.put("debezium.source.offset.flush.interval.ms", "60000");
     config.put("debezium.source.database.server.name", "testc");
+    config.put("debezium.source.database.server.id", "1234");
+    config.put("debezium.source.topic.prefix", "testc");
     config.put("%postgresql.debezium.source.schema.whitelist", "inventory");
     config.put("%postgresql.debezium.source.database.whitelist", "inventory");
     config.put("debezium.source.table.whitelist", "inventory.*");
@@ -66,8 +66,10 @@ public Map<String, String> getConfigOverrides() {
     Map<String, String> config = new HashMap<>();
     config.put("quarkus.profile", "mysql");
     config.put("%mysql.debezium.source.connector.class", "io.debezium.connector.mysql.MySqlConnector");
-    config.put("debezium.source.database.history", "io.debezium.server.iceberg.history.IcebergSchemaHistory");
-    config.put("debezium.source.database.history.iceberg.table-name", "debezium_database_history_storage_test");
+    config.put("debezium.source.schema.history", "io.debezium.server.iceberg.history.IcebergSchemaHistory");
+    config.put("debezium.source.schema.history.iceberg.table-name", "debezium_database_history_storage_test");
+    config.put("debezium.source.schema.history.internal", "io.debezium.server.iceberg.history.IcebergSchemaHistory");
+    config.put("debezium.source.schema.history.internal.iceberg.table-name", "debezium_database_history_storage_test");
     config.put("debezium.source.table.whitelist", "inventory.customers");
     return config;
   }
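Note: taken together, the two test configs above exercise the debezium-server property renames; a compact summary, drawn only from keys touched in this commit (not an exhaustive migration table):

import java.util.Map;

public class DebeziumServerPropertyRenames {
  public static void main(String[] args) {
    // Old (1.x) key -> new (2.x) key, per this commit's test configs.
    Map<String, String> renamed = Map.of(
        "debezium.source.database.history",
        "debezium.source.schema.history.internal",
        "debezium.source.database.history.iceberg.table-name",
        "debezium.source.schema.history.internal.iceberg.table-name");
    renamed.forEach((oldKey, newKey) -> System.out.printf("%s -> %s%n", oldKey, newKey));
    // debezium.source.topic.prefix is new in 2.x; the legacy
    // debezium.source.database.server.name is kept here alongside it.
  }
}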
33 changes: 7 additions & 26 deletions pom.xml
@@ -29,36 +29,24 @@
 
     <version.groovy>3.0.13</version.groovy>
     <version.assembly.plugin>3.4.2</version.assembly.plugin>
-    <version.jackson>2.13.4.20221013</version.jackson>
-    <version.iceberg>1.2.0</version.iceberg>
+    <version.jackson>2.13.5</version.jackson>
+    <version.iceberg>1.2.1</version.iceberg>
     <version.spark>3.3.1</version.spark>
     <version.hadoop>3.3.4</version.hadoop>
     <version.awssdk>2.19.4</version.awssdk>
-    <version.parquet>1.12.3</version.parquet>
     <version.testcontainers>1.17.6</version.testcontainers>
-    <version.kafkaclients>3.2.1</version.kafkaclients>
     <!-- Debezium -->
-    <version.debezium>1.9.7.Final</version.debezium>
-    <version.mysql.driver>8.0.30</version.mysql.driver>
+    <version.debezium>2.1.4.Final</version.debezium>
+    <version.mysql.driver>8.0.32</version.mysql.driver>
     <!-- Quarkus -->
-    <version.quarkus>2.15.3.Final</version.quarkus>
+    <version.quarkus>2.16.6.Final</version.quarkus>
     <!-- ANTLR -->
-    <!-- Align with Antlr runtime version pulled in via debezium -->
-    <version.antlr>4.8</version.antlr>
+    <!-- Align with Antlr runtime version with spark for unit tests -->
+    <version.antlr>4.9.3</version.antlr>
   </properties>
 
   <dependencyManagement>
     <dependencies>
-      <dependency>
-        <groupId>org.antlr</groupId>
-        <artifactId>antlr4-runtime</artifactId>
-        <version>${version.antlr}</version>
-      </dependency>
-      <dependency>
-        <groupId>org.apache.kafka</groupId>
-        <artifactId>kafka-clients</artifactId>
-        <version>${version.kafkaclients}</version>
-      </dependency>
       <!-- log -->
       <dependency>
         <groupId>org.jboss.slf4j</groupId>
@@ -73,12 +61,6 @@
         <type>pom</type>
         <scope>import</scope>
       </dependency>
-      <!-- parquet -->
-      <dependency>
-        <groupId>org.apache.parquet</groupId>
-        <artifactId>parquet-hadoop-bundle</artifactId>
-        <version>${version.parquet}</version>
-      </dependency>
       <dependency>
         <groupId>io.quarkus</groupId>
         <artifactId>quarkus-bom</artifactId>
@@ -123,7 +105,6 @@
         <artifactId>groovy-jsr223</artifactId>
         <version>${version.groovy}</version>
       </dependency>
-
       <!-- jackson version for spark -->
       <dependency>
         <groupId>com.fasterxml.jackson</groupId>