Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Upgrade debezium to 2.x #176

Merged
merged 1 commit into from
Apr 20, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,9 @@ debezium.source.database.user=postgres
debezium.source.database.password=postgres
debezium.source.database.dbname=postgres
debezium.source.database.server.name=tutorial
debezium.source.database.server.id=1234
debezium.source.schema.include.list=inventory
debezium.source.topic.prefix=dbz_

# sql server source
#debezium.source.connector.class=io.debezium.connector.sqlserver.SqlServerConnector
Expand Down
11 changes: 11 additions & 0 deletions debezium-server-iceberg-sink/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,13 @@

<!-- Testing -->
<!-- spark for tests -->
<!-- antlr4 version compatible with spark, testing only -->
<dependency>
<groupId>org.antlr</groupId>
<artifactId>antlr4-runtime</artifactId>
<version>${version.antlr}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.13</artifactId>
Expand All @@ -170,6 +177,10 @@
<groupId>org.slf4j</groupId>
<artifactId>jul-to-slf4j</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,13 +55,13 @@
import static org.apache.iceberg.types.Types.NestedField.required;

/**
* A {@link DatabaseHistory} implementation that stores the schema history to database table
* A {@link SchemaHistory} implementation that stores the schema history to database table
*
* @author Ismail Simsek
*/
@ThreadSafe
@Incubating
public final class IcebergSchemaHistory extends AbstractDatabaseHistory {
public final class IcebergSchemaHistory extends AbstractSchemaHistory {

public static final String DATABASE_HISTORY_STORAGE_TABLE_INSERT = "INSERT INTO %s VALUES ( ?, ?, ? )";
public static final String DATABASE_HISTORY_STORAGE_TABLE_SELECT = "SELECT id, history_data, record_insert_ts FROM %s ORDER BY " +
Expand All @@ -84,7 +84,7 @@ public final class IcebergSchemaHistory extends AbstractDatabaseHistory {
private Table historyTable;

@Override
public void configure(Configuration config, HistoryRecordComparator comparator, DatabaseHistoryListener listener, boolean useCatalogBeforeSchema) {
public void configure(Configuration config, HistoryRecordComparator comparator, SchemaHistoryListener listener, boolean useCatalogBeforeSchema) {
super.configure(config, comparator, listener, useCatalogBeforeSchema);
this.historyConfig = new IcebergSchemaHistoryConfig(config);
icebergCatalog = CatalogUtil.buildIcebergCatalog(this.historyConfig.catalogName(),
Expand All @@ -93,7 +93,7 @@ public void configure(Configuration config, HistoryRecordComparator comparator,
tableId = TableIdentifier.of(Namespace.of(this.historyConfig.catalogName()), this.historyConfig.tableName());

if (running.get()) {
throw new DatabaseHistoryException("Bigquery database history process already initialized table: " + tableFullName);
throw new SchemaHistoryException("Bigquery database history process already initialized table: " + tableFullName);
}
}

Expand All @@ -107,7 +107,7 @@ public void start() {
initializeStorage();
}
} catch (Exception e) {
throw new DatabaseHistoryException("Unable to create history table: " + tableFullName + " : " + e.getMessage(),
throw new SchemaHistoryException("Unable to create history table: " + tableFullName + " : " + e.getMessage(),
e);
}
}
Expand All @@ -119,7 +119,7 @@ public String getTableFullName() {
}

@Override
protected void storeRecord(HistoryRecord record) throws DatabaseHistoryException {
protected void storeRecord(HistoryRecord record) throws SchemaHistoryException {
if (record == null) {
return;
}
Expand Down Expand Up @@ -161,7 +161,7 @@ protected void storeRecord(HistoryRecord record) throws DatabaseHistoryException
/// END iceberg append
LOG.trace("Successfully saved history data to bigquery table");
} catch (IOException e) {
throw new DatabaseHistoryException("Failed to store record: " + record, e);
throw new SchemaHistoryException("Failed to store record: " + record, e);
}
});
}
Expand Down Expand Up @@ -239,17 +239,17 @@ public void initializeStorage() {

if (!Strings.isNullOrEmpty(historyConfig.getMigrateHistoryFile().strip())) {
LOG.warn("Migrating history from file {}", historyConfig.getMigrateHistoryFile());
this.loadFileDatabaseHistory(new File(historyConfig.getMigrateHistoryFile()));
this.loadFileSchemaHistory(new File(historyConfig.getMigrateHistoryFile()));
}
} catch (Exception e) {
throw new DatabaseHistoryException("Creation of database history topic failed, please create the topic manually", e);
throw new SchemaHistoryException("Creation of database history topic failed, please create the topic manually", e);
}
} else {
LOG.debug("Storage is exists, skipping initialization");
}
}

private void loadFileDatabaseHistory(File file) {
private void loadFileSchemaHistory(File file) {
LOG.warn(String.format("Migrating file database history from:'%s' to Bigquery database history storage: %s",
file.toPath(), tableFullName));
AtomicInteger numRecords = new AtomicInteger();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,12 @@ public TestConfigSource() {
// DEBEZIUM SOURCE conf
config.put("debezium.source.offset.storage", "org.apache.kafka.connect.storage.MemoryOffsetBackingStore");
//config.put("debezium.source.offset.storage", "io.debezium.server.iceberg.offset.IcebergOffsetBackingStore");
config.put("debezium.source.database.history", "io.debezium.relational.history.MemoryDatabaseHistory");
config.put("debezium.source.database.history", "io.debezium.relational.history.MemorySchemaHistory");
config.put("debezium.source.schema.history.internal", "io.debezium.relational.history.MemorySchemaHistory");
config.put("debezium.source.offset.flush.interval.ms", "60000");
config.put("debezium.source.database.server.name", "testc");
config.put("debezium.source.database.server.id", "1234");
config.put("debezium.source.topic.prefix", "testc");
config.put("%postgresql.debezium.source.schema.whitelist", "inventory");
config.put("%postgresql.debezium.source.database.whitelist", "inventory");
config.put("debezium.source.table.whitelist", "inventory.*");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,10 @@ public Map<String, String> getConfigOverrides() {
Map<String, String> config = new HashMap<>();
config.put("quarkus.profile", "mysql");
config.put("%mysql.debezium.source.connector.class", "io.debezium.connector.mysql.MySqlConnector");
config.put("debezium.source.database.history", "io.debezium.server.iceberg.history.IcebergSchemaHistory");
config.put("debezium.source.database.history.iceberg.table-name", "debezium_database_history_storage_test");
config.put("debezium.source.schema.history", "io.debezium.server.iceberg.history.IcebergSchemaHistory");
config.put("debezium.source.schema.history.iceberg.table-name", "debezium_database_history_storage_test");
config.put("debezium.source.schema.history.internal", "io.debezium.server.iceberg.history.IcebergSchemaHistory");
config.put("debezium.source.schema.history.internal.iceberg.table-name", "debezium_database_history_storage_test");
config.put("debezium.source.table.whitelist", "inventory.customers");
return config;
}
Expand Down
33 changes: 7 additions & 26 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -29,36 +29,24 @@

<version.groovy>3.0.13</version.groovy>
<version.assembly.plugin>3.4.2</version.assembly.plugin>
<version.jackson>2.13.4.20221013</version.jackson>
<version.iceberg>1.2.0</version.iceberg>
<version.jackson>2.13.5</version.jackson>
<version.iceberg>1.2.1</version.iceberg>
<version.spark>3.3.1</version.spark>
<version.hadoop>3.3.4</version.hadoop>
<version.awssdk>2.19.4</version.awssdk>
<version.parquet>1.12.3</version.parquet>
<version.testcontainers>1.17.6</version.testcontainers>
<version.kafkaclients>3.2.1</version.kafkaclients>
<!-- Debezium -->
<version.debezium>1.9.7.Final</version.debezium>
<version.mysql.driver>8.0.30</version.mysql.driver>
<version.debezium>2.1.4.Final</version.debezium>
<version.mysql.driver>8.0.32</version.mysql.driver>
<!-- Quarkus -->
<version.quarkus>2.15.3.Final</version.quarkus>
<version.quarkus>2.16.6.Final</version.quarkus>
<!-- ANTLR -->
<!-- Align with Antlr runtime version pulled in via debezium -->
<version.antlr>4.8</version.antlr>
<!-- Align with Antlr runtime version with spark for unit tests -->
<version.antlr>4.9.3</version.antlr>
</properties>

<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.antlr</groupId>
<artifactId>antlr4-runtime</artifactId>
<version>${version.antlr}</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>${version.kafkaclients}</version>
</dependency>
<!-- log -->
<dependency>
<groupId>org.jboss.slf4j</groupId>
Expand All @@ -73,12 +61,6 @@
<type>pom</type>
<scope>import</scope>
</dependency>
<!-- parquet -->
<dependency>
<groupId>org.apache.parquet</groupId>
<artifactId>parquet-hadoop-bundle</artifactId>
<version>${version.parquet}</version>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-bom</artifactId>
Expand Down Expand Up @@ -123,7 +105,6 @@
<artifactId>groovy-jsr223</artifactId>
<version>${version.groovy}</version>
</dependency>

<!-- jackson version for spark -->
<dependency>
<groupId>com.fasterxml.jackson</groupId>
Expand Down