Adding mysql test #12

Merged: 4 commits, Jun 18, 2021
EventToIcebergTable.java → DebeziumToIcebergTable.java
@@ -23,20 +23,20 @@
  *
  * @author Ismail Simsek
  */
-public class EventToIcebergTable {
-  protected static final Logger LOGGER = LoggerFactory.getLogger(EventToIcebergTable.class);
+public class DebeziumToIcebergTable {
+  protected static final Logger LOGGER = LoggerFactory.getLogger(DebeziumToIcebergTable.class);
 
-  private final Schema schemaTable;
-  private final Schema schemaTableRowKeyIdentifier;
+  private final Schema tableSchema;
+  private final Schema tableRowIdentifierSchema;
 
-  public EventToIcebergTable(byte[] eventKey, byte[] eventVal) throws IOException {
-    schemaTable = extractSchema(eventVal);
-    schemaTableRowKeyIdentifier = extractSchema(eventKey);
+  public DebeziumToIcebergTable(byte[] eventKey, byte[] eventVal) throws IOException {
+    tableSchema = extractSchema(eventVal);
+    tableRowIdentifierSchema = extractSchema(eventKey);
   }
 
-  public EventToIcebergTable(byte[] eventVal) throws IOException {
-    schemaTable = extractSchema(eventVal);
-    schemaTableRowKeyIdentifier = null;
+  public DebeziumToIcebergTable(byte[] eventVal) throws IOException {
+    tableSchema = extractSchema(eventVal);
+    tableRowIdentifierSchema = null;
   }
 
   private Schema extractSchema(byte[] eventVal) throws IOException {
@@ -51,38 +51,38 @@ private Schema extractSchema(byte[] eventVal) throws IOException {
     return null;
   }
 
-  public Schema getSchemaTable() {
-    return schemaTable;
+  public Schema getTableSchema() {
+    return tableSchema;
   }
 
-  public Schema getSchemaTableRowKeyIdentifier() {
-    return schemaTableRowKeyIdentifier;
+  public Schema getTableRowIdentifierSchema() {
+    return tableRowIdentifierSchema;
   }
 
   private Schema getIcebergSchema(JsonNode eventSchema) {
     return IcebergUtil.getIcebergSchema(eventSchema);
   }
 
   public boolean hasSchema() {
-    return schemaTable != null;
+    return tableSchema != null;
   }
 
   public Table create(Catalog icebergCatalog, TableIdentifier tableIdentifier) {
 
     if (this.hasSchema()) {
-      Catalog.TableBuilder tb = icebergCatalog.buildTable(tableIdentifier, this.schemaTable);
+      Catalog.TableBuilder tb = icebergCatalog.buildTable(tableIdentifier, this.tableSchema);
 
-      if (this.schemaTableRowKeyIdentifier != null) {
-        SortOrder.Builder sob = SortOrder.builderFor(schemaTable);
-        for (Types.NestedField coll : schemaTableRowKeyIdentifier.columns()) {
+      if (this.tableRowIdentifierSchema != null) {
+        SortOrder.Builder sob = SortOrder.builderFor(tableSchema);
+        for (Types.NestedField coll : tableRowIdentifierSchema.columns()) {
           sob = sob.asc(coll.name(), NullOrder.NULLS_FIRST);
         }
         tb.withSortOrder(sob.build());
         // @TODO waiting for spec v2; use as PK / row key identifier
       }
 
-      LOGGER.warn("Creating table:'{}'\nschema:{}\nrowIdentifier:{}", tableIdentifier, schemaTable,
-          schemaTableRowKeyIdentifier);
+      LOGGER.warn("Creating table:'{}'\nschema:{}\nrowIdentifier:{}", tableIdentifier, tableSchema,
+          tableRowIdentifierSchema);
       Table table = tb.create();
       // @TODO remove once spec v2 released
       return upgradeToFormatVersion2(table);
@@ -125,9 +125,9 @@ public String map(String destination) {
   private Table createIcebergTable(TableIdentifier tableIdentifier, ChangeEvent<Object, Object> event) {
     if (eventSchemaEnabled && event.value() != null) {
       try {
-        EventToIcebergTable eventSchema = event.key() == null
-            ? new EventToIcebergTable(getBytes(event.value()))
-            : new EventToIcebergTable(getBytes(event.key()), getBytes(event.value()));
+        DebeziumToIcebergTable eventSchema = event.key() == null
+            ? new DebeziumToIcebergTable(getBytes(event.value()))
+            : new DebeziumToIcebergTable(getBytes(event.key()), getBytes(event.value()));
 
         return eventSchema.create(icebergCatalog, tableIdentifier);
       } catch (Exception e) {
@@ -164,6 +164,12 @@ public void handleBatch(List<ChangeEvent<Object, Object>> records, DebeziumEngine.RecordCommitter<ChangeEvent<Object, Object>> committer)
       }
       addToTable(icebergTable, event.getValue());
     }
+    // Workaround: the offset is not saved to the offset file unless committer.markProcessed is called,
+    // even though it should be flushed periodically.
+    for (ChangeEvent<Object, Object> record : records) {
+      LOGGER.trace("Processed event '{}'", record);
+      committer.markProcessed(record);
+    }
     committer.markBatchFinished();
 
     if (dynamicWaitEnabled) {
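The workaround above relies on the DebeziumEngine committer contract: a record's offset only becomes eligible for flushing once that record has been marked processed, and markBatchFinished signals that the batch's offsets may be stored. A minimal sketch of that contract, using the standard io.debezium.engine API (the process() helper is hypothetical):

import io.debezium.engine.ChangeEvent;
import io.debezium.engine.DebeziumEngine;

import java.util.List;

public class CommitterContractSketch implements DebeziumEngine.ChangeConsumer<ChangeEvent<Object, Object>> {

  @Override
  public void handleBatch(List<ChangeEvent<Object, Object>> records,
                          DebeziumEngine.RecordCommitter<ChangeEvent<Object, Object>> committer)
      throws InterruptedException {
    for (ChangeEvent<Object, Object> record : records) {
      process(record);                 // hypothetical sink write
      committer.markProcessed(record); // marks this record's offset as safe to store
    }
    committer.markBatchFinished();     // signals the batch is done so offsets can be flushed
  }

  private void process(ChangeEvent<Object, Object> record) {
    // hypothetical: write the record to the destination
  }
}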
@@ -11,13 +11,33 @@
 debezium.source.database.dbname=postgres
 debezium.source.database.server.name=tutorial
 debezium.source.schema.include.list=inventory
 
-# @TODO add defaults!!
-debezium.sink.iceberg.fs.defaultFS=xyz
-debezium.sink.batch.objectkey.mapper=xyz
-debezium.format.schemas.enable=xyz
-debezium.sink.batch.batchwriter=xyz
-debezium.sink.iceberg.warehouse=xyz
-debezium.transforms=xyz
+# configure batch behaviour/size; the poll interval below is 10 seconds
+debezium.source.max.batch.size=2048
+debezium.source.poll.interval.ms=10000
+
+# enable schemas
+debezium.format.value.schemas.enable=true
+debezium.format.key.schemas.enable=true
+
+# @TODO add an example with multiple transforms
+# debezium unwrap message
+debezium.transforms=unwrap
+debezium.transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState
+debezium.transforms.unwrap.add.fields=op,table,lsn,source.ts_ms,db
+debezium.transforms.unwrap.delete.handling.mode=rewrite
+
+# iceberg settings
+debezium.sink.iceberg.upsert=false
+debezium.sink.iceberg.upsert-keep-deletes=true
+debezium.sink.iceberg.table-prefix=debeziumcdc_
+debezium.sink.iceberg.fs.defaultFS=s3a://S3_BUCKET
+debezium.sink.iceberg.warehouse=s3a://S3_BUCKET/iceberg_warehouse
+debezium.sink.iceberg.type=hadoop
+debezium.sink.iceberg.catalog-impl=org.apache.iceberg.hadoop.HadoopCatalog
+
+# set logging levels
+quarkus.log.level=INFO
+quarkus.log.category."io.debezium.server.iceberg".level=DEBUG
+quarkus.log.category."org.apache.hadoop".level=ERROR
+quarkus.log.category."org.apache.parquet".level=WARN
+quarkus.log.category."org.eclipse.jetty".level=WARN
+quarkus.log.category."org.apache.iceberg".level=ERROR
debezium-server-iceberg-sink/src/main/resources/testdata (12 changes: 0 additions & 12 deletions)
This file was deleted.
BatchSparkChangeConsumerMysqlTest.java (new file)
@@ -0,0 +1,57 @@
/*
 *
 *  * Copyright memiiso Authors.
 *  *
 *  * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
 *
 */

package io.debezium.server.iceberg;

import io.debezium.server.testresource.BaseSparkTest;
import io.debezium.server.testresource.S3Minio;
import io.debezium.server.testresource.SourceMysqlDB;
import io.debezium.util.Testing;
import io.quarkus.test.common.QuarkusTestResource;
import io.quarkus.test.junit.QuarkusTest;
import io.quarkus.test.junit.TestProfile;

import java.time.Duration;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.awaitility.Awaitility;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import org.junit.jupiter.api.Test;

/**
 * Integration test that verifies basic reading from a MySQL database and writing to an S3 destination.
 *
 * @author Ismail Simsek
 */
@QuarkusTest
@QuarkusTestResource(S3Minio.class)
@QuarkusTestResource(SourceMysqlDB.class)
@TestProfile(BatchSparkChangeConsumerMysqlTestProfile.class)
public class BatchSparkChangeConsumerMysqlTest extends BaseSparkTest {

  @ConfigProperty(name = "debezium.source.max.batch.size", defaultValue = "1000")
  Integer maxBatchSize;

  @Test
  public void testSimpleUpload() {
    Testing.Print.enable();

    Awaitility.await().atMost(Duration.ofSeconds(60)).until(() -> {
      try {
        Dataset<Row> df = getTableData("testc.inventory.customers");
        df.show(false);
        return df.filter("id is not null").count() >= 4;
      } catch (Exception e) {
        return false;
      }
    });
  }

}
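getTableData comes from the BaseSparkTest helper class; this hedged sketch shows one way such a helper could read the sink's Iceberg table with Spark. The warehouse path, table-name mapping, and SparkSession wiring are assumptions, not the repository's actual implementation:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class SparkReadSketch {

  // Assumes a HadoopCatalog warehouse like the one configured above and the
  // debeziumcdc_ table prefix; the exact layout depends on BaseSparkTest.
  static Dataset<Row> getTableData(SparkSession spark, String destination) {
    String tableName = "debeziumcdc_" + destination.replace(".", "_");
    return spark.read().format("iceberg")
        .load("s3a://test-bucket/iceberg_warehouse/" + tableName);
  }
}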
BatchSparkChangeConsumerMysqlTestProfile.java (new file)
@@ -0,0 +1,32 @@
/*
 *
 *  * Copyright memiiso Authors.
 *  *
 *  * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
 *
 */

package io.debezium.server.iceberg;

import io.quarkus.test.junit.QuarkusTestProfile;

import java.util.HashMap;
import java.util.Map;

public class BatchSparkChangeConsumerMysqlTestProfile implements QuarkusTestProfile {

  // This method allows us to override configuration properties.
  @Override
  public Map<String, String> getConfigOverrides() {
    Map<String, String> config = new HashMap<>();
    config.put("quarkus.profile", "mysql");
    config.put("%mysql.debezium.source.connector.class", "io.debezium.connector.mysql.MySqlConnector");
    return config;
  }

  @Override
  public String getConfigProfile() {
    return "mysql";
  }

}
ConfigSource.java
@@ -9,7 +9,9 @@
 package io.debezium.server.iceberg;
 
 import io.debezium.server.TestConfigSource;
+import io.debezium.util.Testing;
 
+import java.nio.file.Path;
 import java.util.HashMap;
 import java.util.Map;
@@ -21,8 +23,10 @@ public class ConfigSource extends TestConfigSource {
   public static final String S3_BUCKET = "test-bucket";
 
   final Map<String, String> s3Test = new HashMap<>();
+  public static final Path HISTORY_FILE = Testing.Files.createTestingPath("dbhistory.txt").toAbsolutePath();
 
   public ConfigSource() {
+    s3Test.put("quarkus.profile", "postgresql");
     // common sink conf
     s3Test.put("debezium.sink.type", "iceberg");
     s3Test.put("debezium.sink.iceberg.upsert", "false");
@@ -48,15 +52,16 @@ public ConfigSource() {
     // debezium unwrap message
     s3Test.put("debezium.transforms", "unwrap");
     s3Test.put("debezium.transforms.unwrap.type", "io.debezium.transforms.ExtractNewRecordState");
-    s3Test.put("debezium.transforms.unwrap.add.fields", "op,table,lsn,source.ts_ms");
-    s3Test.put("debezium.transforms.unwrap.add.headers", "db");
+    s3Test.put("debezium.transforms.unwrap.add.fields", "op,table,source.ts_ms,db");
     s3Test.put("debezium.transforms.unwrap.delete.handling.mode", "rewrite");
 
     // DEBEZIUM SOURCE conf
     s3Test.put("debezium.source." + StandaloneConfig.OFFSET_STORAGE_FILE_FILENAME_CONFIG, OFFSET_STORE_PATH.toAbsolutePath().toString());
+    s3Test.put("debezium.source.database.history", "io.debezium.relational.history.FileDatabaseHistory");
+    s3Test.put("debezium.source.database.history.file.filename", HISTORY_FILE.toAbsolutePath().toString());
     s3Test.put("debezium.source.offset.flush.interval.ms", "60000");
     s3Test.put("debezium.source.database.server.name", "testc");
-    s3Test.put("debezium.source.schema.whitelist", "inventory");
+    s3Test.put("%postgresql.debezium.source.schema.whitelist", "inventory");
     s3Test.put("debezium.source.table.whitelist", "inventory.customers,inventory.orders,inventory.products," +
         "inventory.geom,inventory.table_datatypes,inventory.test_date_table");
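The quarkus.profile key and the %postgresql./%mysql. prefixed keys added above use Quarkus profile-aware configuration: when a profile is active, a %profile.-prefixed key overrides the unprefixed one. A minimal sketch using the MicroProfile Config API, with the property name taken from the test profile in this PR:

import org.eclipse.microprofile.config.Config;
import org.eclipse.microprofile.config.ConfigProvider;

public class ProfileConfigSketch {
  public static void main(String[] args) {
    Config config = ConfigProvider.getConfig();
    // With quarkus.profile=mysql, "%mysql.debezium.source.connector.class"
    // wins over a plain "debezium.source.connector.class" entry.
    String connector = config.getValue("debezium.source.connector.class", String.class);
    System.out.println(connector);
  }
}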
SourceMysqlDB.java
@@ -36,7 +36,14 @@ public class SourceMysqlDB implements QuarkusTestResourceLifecycleManager {
   public static final Integer MYSQL_PORT_DEFAULT = 3306;
   private static final Logger LOGGER = LoggerFactory.getLogger(SourceMysqlDB.class);
 
-  private static GenericContainer<?> container;
+  static private GenericContainer<?> container;
+
+  @Override
+  public void stop() {
+    if (container != null) {
+      container.stop();
+    }
+  }
 
   public static void runSQL(String query) throws SQLException, ClassNotFoundException {
     try {
@@ -52,17 +59,6 @@ public static void runSQL(String query) throws SQLException, ClassNotFoundException {
     }
   }
 
-  public static Integer getMappedPort() {
-    return container.getMappedPort(MYSQL_PORT_DEFAULT);
-  }
-
-  @Override
-  public void stop() {
-    if (container != null) {
-      container.stop();
-    }
-  }
-
   @Override
   public Map<String, String> start() {
     container = new GenericContainer<>(MYSQL_IMAGE)
@@ -82,4 +78,9 @@ public Map<String, String> start() {
     return params;
   }
 
+  public static Integer getMappedPort() {
+    return container.getMappedPort(MYSQL_PORT_DEFAULT);
+  }
+
 
 }
@@ -45,6 +45,16 @@ public void markProcessed(Object record) throws InterruptedException {
       public synchronized void markBatchFinished() throws InterruptedException {
         return;
       }
+
+      @Override
+      public void markProcessed(Object record, DebeziumEngine.Offsets sourceOffsets) throws InterruptedException {
+        return;
+      }
+
+      @Override
+      public DebeziumEngine.Offsets buildOffsets() {
+        return null;
+      }
     };
   }

pom.xml (4 changes: 2 additions & 2 deletions)
@@ -37,9 +37,9 @@
     <version.awssdk>2.16.34</version.awssdk>
     <version.parquet>1.11.1</version.parquet>
     <!-- Debezium -->
-    <version.debezium>1.4.2.Final</version.debezium>
+    <version.debezium>1.5.2.Final</version.debezium>
     <!-- Quarkus -->
-    <version.quarkus>1.13.2.Final</version.quarkus>
+    <version.quarkus>1.13.7.Final</version.quarkus>
   </properties>

<dependencyManagement>