Use new storage classes for tests (#195)
* Use iceberg storage classes for tests
ismailsimsek committed May 11, 2023
1 parent c0e07a2 commit 7b022d3
Showing 7 changed files with 53 additions and 56 deletions.
@@ -63,9 +63,6 @@
@Incubating
public final class IcebergSchemaHistory extends AbstractSchemaHistory {

public static final String DATABASE_HISTORY_STORAGE_TABLE_INSERT = "INSERT INTO %s VALUES ( ?, ?, ? )";
public static final String DATABASE_HISTORY_STORAGE_TABLE_SELECT = "SELECT id, history_data, record_insert_ts FROM %s ORDER BY " +
"record_insert_ts ASC";
static final Schema DATABASE_HISTORY_TABLE_SCHEMA = new Schema(
required(1, "id", Types.StringType.get()),
optional(2, "history_data", Types.StringType.get()),
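Rows written to the history table follow the `DATABASE_HISTORY_TABLE_SCHEMA` above. A minimal sketch of building one such row with Iceberg's generic records — illustrative only: the field names come from the schema and the removed SELECT constant, and the timestamp column type is an assumption:

```java
import java.time.OffsetDateTime;
import java.util.UUID;

import org.apache.iceberg.data.GenericRecord;

class HistoryRowSketch {
  // Builds a row matching DATABASE_HISTORY_TABLE_SCHEMA
  // (id, history_data, record_insert_ts).
  static GenericRecord historyRow(String historyJson) {
    GenericRecord row = GenericRecord.create(IcebergSchemaHistory.DATABASE_HISTORY_TABLE_SCHEMA);
    row.setField("id", UUID.randomUUID().toString());
    row.setField("history_data", historyJson);              // serialized schema-history change
    row.setField("record_insert_ts", OffsetDateTime.now()); // assumes a timestamptz column
    return row;
  }
}
```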
Expand Up @@ -11,7 +11,7 @@
import io.debezium.DebeziumException;
import io.debezium.config.Configuration;
import io.debezium.config.Field;
import io.debezium.server.iceberg.IcebergChangeConsumer;
import io.debezium.server.iceberg.IcebergUtil;
import io.debezium.util.Strings;

import java.io.Closeable;
@@ -55,8 +55,10 @@
import org.apache.kafka.connect.storage.OffsetBackingStore;
import org.apache.kafka.connect.util.Callback;
import org.apache.kafka.connect.util.SafeObjectInputStream;
import org.eclipse.microprofile.config.ConfigProvider;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static io.debezium.server.iceberg.IcebergChangeConsumer.PROP_PREFIX;
import static org.apache.iceberg.types.Types.NestedField.optional;
import static org.apache.iceberg.types.Types.NestedField.required;

@@ -73,6 +75,7 @@ public class IcebergOffsetBackingStore extends MemoryOffsetBackingStore implemen
)
);
protected static final ObjectMapper mapper = new ObjectMapper();
public static String CONFIGURATION_FIELD_PREFIX_STRING = "offset.storage.";
private static final Logger LOG = LoggerFactory.getLogger(IcebergOffsetBackingStore.class);
protected Map<String, String> data = new HashMap<>();
Catalog icebergCatalog;
@@ -86,11 +89,13 @@ public IcebergOffsetBackingStore() {
@Override
public void configure(WorkerConfig config) {
super.configure(config);

offsetConfig = new IcebergOffsetBackingStoreConfig(Configuration.from(config.originalsStrings()));

icebergCatalog = CatalogUtil.buildIcebergCatalog(offsetConfig.catalogName(),
offsetConfig.icebergProperties(), offsetConfig.hadoopConfig());
tableFullName = String.format("%s.%s", offsetConfig.catalogName(), offsetConfig.tableName());
tableId = TableIdentifier.of(Namespace.of(offsetConfig.catalogName()), offsetConfig.tableName());
tableFullName = String.format("%s.%s", offsetConfig.tableNamespace(), offsetConfig.tableName());
tableId = TableIdentifier.of(Namespace.of(offsetConfig.tableNamespace()), offsetConfig.tableName());
}

@Override
@@ -241,12 +246,12 @@ public Future<Map<ByteBuffer, ByteBuffer>> get(final Collection<ByteBuffer> keys
});
}

public String fromByteBuffer(ByteBuffer data) {
return (data != null) ? String.valueOf(StandardCharsets.UTF_16.decode(data.asReadOnlyBuffer())) : null;
public static String fromByteBuffer(ByteBuffer data) {
return (data != null) ? String.valueOf(StandardCharsets.UTF_8.decode(data.asReadOnlyBuffer())) : null;
}

public ByteBuffer toByteBuffer(String data) {
return (data != null) ? ByteBuffer.wrap(data.getBytes(StandardCharsets.UTF_16)) : null;
public static ByteBuffer toByteBuffer(String data) {
return (data != null) ? ByteBuffer.wrap(data.getBytes(StandardCharsets.UTF_8)) : null;
}

public static class IcebergOffsetBackingStoreConfig extends WorkerConfig {
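With the switch from UTF-16 to UTF-8 and from instance to static methods, the two helpers can now be imported directly by tests (as `IcebergOffsetBackingStoreTest` does below). A self-contained round trip, copying the two methods as they appear in the new version:

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class RoundTripDemo {
  public static String fromByteBuffer(ByteBuffer data) {
    return (data != null) ? String.valueOf(StandardCharsets.UTF_8.decode(data.asReadOnlyBuffer())) : null;
  }

  public static ByteBuffer toByteBuffer(String data) {
    return (data != null) ? ByteBuffer.wrap(data.getBytes(StandardCharsets.UTF_8)) : null;
  }

  public static void main(String[] args) {
    ByteBuffer key = toByteBuffer("key");
    System.out.println(fromByteBuffer(key)); // prints: key
  }
}
```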
@@ -257,29 +262,25 @@ public static class IcebergOffsetBackingStoreConfig extends WorkerConfig {
public IcebergOffsetBackingStoreConfig(Configuration config) {
super(new ConfigDef(), config.asMap());
this.config = config;

final Map<String, String> conf = new HashMap<>();
this.config.forEach((propName, value) -> {
if (propName.startsWith(IcebergChangeConsumer.PROP_PREFIX)) {
final String newPropName = propName.substring(IcebergChangeConsumer.PROP_PREFIX.length());
conf.put(newPropName, value);
}
});

Map<String, String> conf = IcebergUtil.getConfigSubset(ConfigProvider.getConfig(), PROP_PREFIX);
conf.forEach(hadoopConfig::set);
icebergProperties.putAll(conf);
}

public String catalogName() {
return this.config.getString(Field.create("debezium.sink.iceberg.catalog-name").withDefault("default"));
return this.config.getString(Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "iceberg.catalog-name").withDefault("default"));
}

public String tableNamespace() {
return this.config.getString(Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "iceberg.table-namespace").withDefault("default"));
}

public String tableName() {
return this.config.getString(Field.create("offset.storage.iceberg.table-name").withDefault("debezium_offset_storage"));
return this.config.getString(Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "iceberg.table-name").withDefault("debezium_offset_storage"));
}

public String getMigrateOffsetFile() {
return this.config.getString(Field.create("offset.storage.iceberg.migrate-offset-file").withDefault(""));
return this.config.getString(Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "iceberg.migrate-offset-file").withDefault(""));
}

public org.apache.hadoop.conf.Configuration hadoopConfig() {
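The constructor above replaces the hand-rolled prefix loop with `IcebergUtil.getConfigSubset(ConfigProvider.getConfig(), PROP_PREFIX)`. A sketch of what such a helper plausibly does — filter MicroProfile config properties by prefix and strip the prefix — stated here as an assumption, not the project's actual implementation:

```java
import java.util.HashMap;
import java.util.Map;

import org.eclipse.microprofile.config.Config;

public class ConfigSubsetSketch {
  // Returns all properties starting with `prefix`, with the prefix removed.
  public static Map<String, String> getConfigSubset(Config config, String prefix) {
    Map<String, String> ret = new HashMap<>();
    for (String propName : config.getPropertyNames()) {
      if (propName.startsWith(prefix)) {
        ret.put(propName.substring(prefix.length()),
                config.getValue(propName, String.class));
      }
    }
    return ret;
  }
}
```

The accessors then resolve the relocated keys `offset.storage.iceberg.catalog-name`, `offset.storage.iceberg.table-namespace`, `offset.storage.iceberg.table-name`, and `offset.storage.iceberg.migrate-offset-file` in place of the old `debezium.sink.iceberg.*` and hard-coded names.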
@@ -20,9 +20,12 @@
import java.util.HashMap;
import java.util.Map;

import com.google.common.collect.Lists;
import jakarta.inject.Inject;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.awaitility.Awaitility;
@@ -271,6 +274,16 @@ public void testSimpleUpload() {
return false;
}
});

Awaitility.await().atMost(Duration.ofSeconds(120)).until(() -> {
try {
CloseableIterable<Record> d = getTableDataV2(TableIdentifier.of("debeziumevents", "debezium_offset_storage_custom_table"));
System.out.println(Lists.newArrayList(d));
return Lists.newArrayList(d).size() == 1;
} catch (Exception e) {
return false;
}
});
}
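The new assertion reads the offset table back through a `getTableDataV2` helper, presumably defined on the shared `BaseTest` that the test classes below now extend. One plausible shape for such a helper, using Iceberg's generic reader — the real signature may differ, and the explicit catalog parameter is an assumption:

```java
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.data.IcebergGenerics;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.io.CloseableIterable;

public class TableReadSketch {
  // Loads the table from the test catalog and scans all of its rows.
  public static CloseableIterable<Record> getTableDataV2(Catalog catalog, TableIdentifier tableId) {
    Table table = catalog.loadTable(tableId);
    return IcebergGenerics.read(table).build();
  }
}
```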


@@ -56,11 +56,11 @@ public TestConfigSource() {
config.put("debezium.transforms.unwrap.drop.tombstones", "true");

// DEBEZIUM SOURCE conf
config.put("debezium.source.offset.storage", "org.apache.kafka.connect.storage.MemoryOffsetBackingStore");
//config.put("debezium.source.offset.storage", "io.debezium.server.iceberg.offset.IcebergOffsetBackingStore");
config.put("debezium.source.database.history", "io.debezium.relational.history.MemorySchemaHistory");
config.put("debezium.source.schema.history.internal", "io.debezium.relational.history.MemorySchemaHistory");
config.put("debezium.source.offset.flush.interval.ms", "60000");
config.put("debezium.source.offset.storage", "io.debezium.server.iceberg.offset.IcebergOffsetBackingStore");
config.put("debezium.source.offset.storage.iceberg.table-name", "debezium_offset_storage_custom_table");
config.put("debezium.source.schema.history.internal", "io.debezium.server.iceberg.history.IcebergSchemaHistory");
config.put("debezium.source.schema.history.internal.iceberg.table-name", "debezium_database_history_storage_test");
config.put("debezium.source.offset.flush.interval.ms", "1000");
config.put("debezium.source.database.server.name", "testc");
config.put("debezium.source.database.server.id", "1234");
config.put("debezium.source.topic.prefix", "testc");
@@ -65,8 +65,6 @@ public Map<String, String> getConfigOverrides() {
Map<String, String> config = new HashMap<>();
config.put("quarkus.profile", "mysql");
config.put("%mysql.debezium.source.connector.class", "io.debezium.connector.mysql.MySqlConnector");
config.put("debezium.source.schema.history", "io.debezium.server.iceberg.history.IcebergSchemaHistory");
config.put("debezium.source.schema.history.iceberg.table-name", "debezium_database_history_storage_test");
config.put("debezium.source.schema.history.internal", "io.debezium.server.iceberg.history.IcebergSchemaHistory");
config.put("debezium.source.schema.history.internal.iceberg.table-name", "debezium_database_history_storage_test");
config.put("debezium.source.table.whitelist", "inventory.customers");
@@ -8,44 +8,39 @@

package io.debezium.server.iceberg.offset;

import io.debezium.server.iceberg.testresources.BaseTest;
import io.debezium.server.iceberg.testresources.S3Minio;
import io.quarkus.test.common.QuarkusTestResource;
import io.quarkus.test.junit.QuarkusTest;
import io.quarkus.test.junit.QuarkusTestProfile;
import io.quarkus.test.junit.TestProfile;

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

import com.google.common.collect.Lists;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.runtime.WorkerConfig;
import org.apache.kafka.connect.util.Callback;
import org.eclipse.microprofile.config.ConfigProvider;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import static io.debezium.server.iceberg.offset.IcebergOffsetBackingStore.fromByteBuffer;
import static io.debezium.server.iceberg.offset.IcebergOffsetBackingStore.toByteBuffer;
import static org.junit.jupiter.api.Assertions.assertEquals;

@QuarkusTest
@TestProfile(IcebergOffsetBackingStoreTest.TestProfile.class)
@QuarkusTestResource(value = S3Minio.class, restrictToAnnotatedClass = true)
public class IcebergOffsetBackingStoreTest {
public class IcebergOffsetBackingStoreTest extends BaseTest {

private static final Map<ByteBuffer, ByteBuffer> firstSet = new HashMap<>();
private static final Map<ByteBuffer, ByteBuffer> secondSet = new HashMap<>();

public static String fromByteBuffer(ByteBuffer data) {
return (data != null) ? String.valueOf(StandardCharsets.UTF_16.decode(data.asReadOnlyBuffer())) : null;
}

public static ByteBuffer toByteBuffer(String data) {
return (data != null) ? ByteBuffer.wrap(data.getBytes(StandardCharsets.UTF_16)) : null;
}

@BeforeAll
public static void setup() {
firstSet.put(toByteBuffer("key"), toByteBuffer("value"));
@@ -59,7 +54,8 @@ public Map<String, String> config() {
for (String propName : ConfigProvider.getConfig().getPropertyNames()) {
if (propName.startsWith("debezium")) {
try {
conf.put(propName, ConfigProvider.getConfig().getValue(propName, String.class));
conf.put(propName.replace("debezium.source.", IcebergOffsetBackingStore.CONFIGURATION_FIELD_PREFIX_STRING)
, ConfigProvider.getConfig().getValue(propName, String.class));
} catch (Exception e) {
conf.put(propName, "");
}
@@ -91,8 +87,11 @@ public void testGetSet() throws Exception {
store.set(firstSet, cb).get();

Map<ByteBuffer, ByteBuffer> values = store.get(Arrays.asList(toByteBuffer("key"), toByteBuffer("bad"))).get();
assertEquals(toByteBuffer("value"), values.get(toByteBuffer("key")));
assertEquals(("value"), fromByteBuffer(values.get(toByteBuffer("key"))));
Assertions.assertNull(values.get(toByteBuffer("bad")));

CloseableIterable<Record> d = getTableDataV2(TableIdentifier.of("default", "debezium_offset_storage"));
Assertions.assertEquals(1, Lists.newArrayList(d).size());
}

@Test
@@ -123,15 +122,4 @@ public TestWorkerConfig(Map<String, String> props) {
super(new ConfigDef(), props);
}
}

public static class TestProfile implements QuarkusTestProfile {
@Override
public Map<String, String> getConfigOverrides() {
Map<String, String> config = new HashMap<>();
config.put("debezium.source.offset.storage", "io.debezium.server.iceberg.offset.IcebergOffsetBackingStore");
config.put("debezium.source.offset.flush.interval.ms", "60000");
config.put("debezium.source.offset.storage.iceberg.table-name", "debezium_offset_storage_custom_table");
return config;
}
}
}
@@ -28,7 +28,7 @@
*
* @author Ismail Simsek
*/
public class BaseSparkTest {
public class BaseSparkTest extends BaseTest {

protected static final SparkConf sparkconf = new SparkConf()
.setAppName("CDC-S3-Batch-Spark-Sink")
