diff --git a/debezium-server-iceberg-dist/src/main/resources/distro/conf/application.properties.example b/debezium-server-iceberg-dist/src/main/resources/distro/conf/application.properties.example
index 0733217e..2eb7a573 100644
--- a/debezium-server-iceberg-dist/src/main/resources/distro/conf/application.properties.example
+++ b/debezium-server-iceberg-dist/src/main/resources/distro/conf/application.properties.example
@@ -31,18 +31,20 @@ debezium.format.value.schemas.enable=true
 debezium.format.key.schemas.enable=true
 debezium.format.value=json
 debezium.format.key=json
+debezium.source.offset.storage=io.debezium.server.iceberg.offset.IcebergOffsetBackingStore
+debezium.source.offset.storage.iceberg.table-name=debezium_offset_storage_custom_table
 
 # postgres source
-#debezium.source.connector.class=io.debezium.connector.postgresql.PostgresConnector
-#debezium.source.offset.storage.file.filename=data/offsets.dat
-#debezium.source.offset.flush.interval.ms=0
-#debezium.source.database.hostname=localhost
-#debezium.source.database.port=5432
-#debezium.source.database.user=postgres
-#debezium.source.database.password=postgres
-#debezium.source.database.dbname=postgres
-#debezium.source.database.server.name=tutorial
-#debezium.source.schema.include.list=inventory
+debezium.source.connector.class=io.debezium.connector.postgresql.PostgresConnector
+debezium.source.offset.storage.file.filename=data/offsets.dat
+debezium.source.offset.flush.interval.ms=0
+debezium.source.database.hostname=localhost
+debezium.source.database.port=5432
+debezium.source.database.user=postgres
+debezium.source.database.password=postgres
+debezium.source.database.dbname=postgres
+debezium.source.database.server.name=tutorial
+debezium.source.schema.include.list=inventory
 
 # sql server source
 #debezium.source.connector.class=io.debezium.connector.sqlserver.SqlServerConnector
diff --git a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeConsumer.java b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeConsumer.java
index 4b45de8a..245c8eb0 100644
--- a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeConsumer.java
+++ b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeConsumer.java
@@ -68,7 +68,7 @@ public class IcebergChangeConsumer extends BaseChangeConsumer implements DebeziumEngine.ChangeConsumer<ChangeEvent<Object, Object>> {
   protected static final Serde<JsonNode> valSerde = DebeziumSerdes.payloadJson(JsonNode.class);
   protected static final Serde<JsonNode> keySerde = DebeziumSerdes.payloadJson(JsonNode.class);
   private static final Logger LOGGER = LoggerFactory.getLogger(IcebergChangeConsumer.class);
-  private static final String PROP_PREFIX = "debezium.sink.iceberg.";
+  public static final String PROP_PREFIX = "debezium.sink.iceberg.";
   static Deserializer<JsonNode> valDeserializer;
   static Deserializer<JsonNode> keyDeserializer;
   protected final Clock clock = Clock.system();
diff --git a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/offset/IcebergOffsetBackingStore.java b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/offset/IcebergOffsetBackingStore.java
new file mode 100644
index 00000000..83b44a21
--- /dev/null
+++ b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/offset/IcebergOffsetBackingStore.java
@@ -0,0 +1,249 @@
+/*
+ *
+ * * Copyright memiiso Authors.
+ * *
+ * * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
+ *
+ */
+
+package io.debezium.server.iceberg.offset;
+
+import io.debezium.DebeziumException;
+import io.debezium.config.Configuration;
+import io.debezium.config.Field;
+import io.debezium.server.iceberg.IcebergChangeConsumer;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.time.OffsetDateTime;
+import java.time.ZoneOffset;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Future;
+import javax.enterprise.context.Dependent;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.iceberg.*;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.IcebergGenerics;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.data.parquet.GenericParquetWriter;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.FileAppender;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.parquet.Parquet;
+import org.apache.iceberg.types.Types;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.connect.runtime.WorkerConfig;
+import org.apache.kafka.connect.storage.MemoryOffsetBackingStore;
+import org.apache.kafka.connect.storage.OffsetBackingStore;
+import org.apache.kafka.connect.util.Callback;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import static org.apache.iceberg.types.Types.NestedField.optional;
+import static org.apache.iceberg.types.Types.NestedField.required;
+
+/**
+ * Implementation of OffsetBackingStore that saves offset data to an Iceberg table.
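+ * <p>
+ * Offsets are serialized into a single JSON document and kept in a single table row;
+ * every flush overwrites the table content with the latest snapshot, and the row is
+ * read back on startup to resume from the last committed position.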
+ */
+@Dependent
+public class IcebergOffsetBackingStore extends MemoryOffsetBackingStore implements OffsetBackingStore {
+
+  static final Schema OFFSET_STORAGE_TABLE_SCHEMA = new Schema(
+      required(1, "id", Types.StringType.get()),
+      optional(2, "offset_data", Types.StringType.get()),
+      optional(3, "record_insert_ts", Types.TimestampType.withZone())
+  );
+  protected static final ObjectMapper mapper = new ObjectMapper();
+  private static final Logger LOG = LoggerFactory.getLogger(IcebergOffsetBackingStore.class);
+  protected Map<String, String> data = new HashMap<>();
+  Catalog icebergCatalog;
+  private String tableFullName;
+  private TableIdentifier tableId;
+  private Table offsetTable;
+
+  public IcebergOffsetBackingStore() {
+  }
+
+  @Override
+  public void configure(WorkerConfig config) {
+    super.configure(config);
+    IcebergOffsetBackingStoreConfig offsetConfig = new IcebergOffsetBackingStoreConfig(Configuration.from(config.originalsStrings()));
+    icebergCatalog = CatalogUtil.buildIcebergCatalog(offsetConfig.catalogName(),
+        offsetConfig.icebergProperties(), offsetConfig.hadoopConfig());
+    tableFullName = String.format("%s.%s", offsetConfig.catalogName(), offsetConfig.tableName());
+    tableId = TableIdentifier.of(Namespace.of(offsetConfig.catalogName()), offsetConfig.tableName());
+  }
+
+  @Override
+  public synchronized void start() {
+    super.start();
+    LOG.info("Starting IcebergOffsetBackingStore table:{}", tableFullName);
+    initializeTable();
+    load();
+  }
+
+  private void initializeTable() {
+    if (icebergCatalog.tableExists(tableId)) {
+      offsetTable = icebergCatalog.loadTable(tableId);
+    } else {
+      LOG.debug("Creating table {} to store offsets", tableFullName);
+      offsetTable = icebergCatalog.createTable(tableId, OFFSET_STORAGE_TABLE_SCHEMA);
+      if (!icebergCatalog.tableExists(tableId)) {
+        throw new DebeziumException("Failed to create table " + tableId + " to store offsets");
+      }
+    }
+  }
+
+  protected void save() {
+    LOG.debug("Saving offset data to iceberg table...");
+    try {
+      String dataJson = mapper.writeValueAsString(data);
+      LOG.debug("Saving offset data {}", dataJson);
+      OffsetDateTime currentTs = OffsetDateTime.now(ZoneOffset.UTC);
+
+      GenericRecord record = GenericRecord.create(OFFSET_STORAGE_TABLE_SCHEMA);
+      // Record.copy(...) takes alternating field-name/value pairs
+      Record row = record.copy("id", UUID.randomUUID().toString(), "offset_data", dataJson, "record_insert_ts", currentTs);
+      OutputFile out;
+      try (FileIO tableIo = offsetTable.io()) {
+        out = tableIo.newOutputFile(offsetTable.locationProvider().newDataLocation(tableId.name() + "-data-001"));
+      }
+      FileAppender<Record> writer = Parquet.write(out)
+          .createWriterFunc(GenericParquetWriter::buildWriter)
+          .forTable(offsetTable)
+          .overwrite()
+          .build();
+      try (Closeable ignored = writer) {
+        writer.add(row);
+      }
+      DataFile dataFile = DataFiles.builder(offsetTable.spec())
+          .withFormat(FileFormat.PARQUET)
+          .withPath(out.location())
+          .withFileSizeInBytes(writer.length())
+          .withSplitOffsets(writer.splitOffsets())
+          .withMetrics(writer.metrics())
+          .build();
+      // replace the previous offset snapshot with the new single-row data file
+      offsetTable.newOverwrite().addFile(dataFile).commit();
+      LOG.debug("Successfully saved offset data to iceberg table");
+
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
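+  /**
+   * Reads the offset json back from the table. At most one row is expected,
+   * since save() always overwrites the previous snapshot.
+   */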
+  private void load() {
+    try {
+      String dataJsonString = null;
+
+      int rowNum = 0;
+      try (CloseableIterable<Record> rs = IcebergGenerics.read(offsetTable)
+          .build()) {
+        for (Record row : rs) {
+          dataJsonString = (String) row.getField("offset_data");
+          rowNum++;
+        }
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+      if (rowNum > 1) {
+        throw new DebeziumException("Failed to recover offset data from Iceberg, found multiple offset rows!");
+      }
+
+      if (dataJsonString != null) {
+        this.data = mapper.readValue(dataJsonString, new TypeReference<>() {
+        });
+        LOG.debug("Loaded offset data {}", dataJsonString);
+      }
+    } catch (JsonProcessingException e) {
+      throw new DebeziumException("Failed to recover offset data from Iceberg", e);
+    }
+  }
+
+  @Override
+  public Future<Void> set(final Map<ByteBuffer, ByteBuffer> values,
+                          final Callback<Void> callback) {
+    return executor.submit(() -> {
+      for (Map.Entry<ByteBuffer, ByteBuffer> entry : values.entrySet()) {
+        if (entry.getKey() == null) {
+          continue;
+        }
+        data.put(fromByteBuffer(entry.getKey()), fromByteBuffer(entry.getValue()));
+      }
+      save();
+      if (callback != null) {
+        callback.onCompletion(null, null);
+      }
+      return null;
+    });
+  }
+
+  @Override
+  public Future<Map<ByteBuffer, ByteBuffer>> get(final Collection<ByteBuffer> keys) {
+    return executor.submit(() -> {
+      Map<ByteBuffer, ByteBuffer> result = new HashMap<>();
+      for (ByteBuffer key : keys) {
+        result.put(key, toByteBuffer(data.get(fromByteBuffer(key))));
+      }
+      return result;
+    });
+  }
+
+  public String fromByteBuffer(ByteBuffer data) {
+    return (data != null) ? String.valueOf(StandardCharsets.UTF_16.decode(data.asReadOnlyBuffer())) : null;
+  }
+
+  public ByteBuffer toByteBuffer(String data) {
+    return (data != null) ? ByteBuffer.wrap(data.getBytes(StandardCharsets.UTF_16)) : null;
+  }
+
+  public static class IcebergOffsetBackingStoreConfig extends WorkerConfig {
+    final org.apache.hadoop.conf.Configuration hadoopConfig = new org.apache.hadoop.conf.Configuration();
+    private final Configuration config;
+    Map<String, String> icebergProperties = new ConcurrentHashMap<>();
+
+    public IcebergOffsetBackingStoreConfig(Configuration config) {
+      super(new ConfigDef(), config.asMap());
+      this.config = config;
+
+      final Map<String, String> conf = new HashMap<>();
+      this.config.forEach((propName, value) -> {
+        if (propName.startsWith(IcebergChangeConsumer.PROP_PREFIX)) {
+          final String newPropName = propName.substring(IcebergChangeConsumer.PROP_PREFIX.length());
+          conf.put(newPropName, value);
+        }
+      });
+
+      conf.forEach(hadoopConfig::set);
+      icebergProperties.putAll(conf);
+    }
+
+    public String catalogName() {
+      return this.config.getString(Field.create("debezium.sink.iceberg.catalog-name").withDefault("default"));
+    }
+
+    public String tableName() {
+      return this.config.getString(Field.create("offset.storage.iceberg.table-name").withDefault("debezium_offset_storage"));
+    }
+
+    public org.apache.hadoop.conf.Configuration hadoopConfig() {
+      return hadoopConfig;
+    }
+
+    public Map<String, String> icebergProperties() {
+      return icebergProperties;
+    }
+  }
+
+}
diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/ConfigSource.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/ConfigSource.java
index 82ef3ae4..98b5009c 100644
--- a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/ConfigSource.java
+++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/ConfigSource.java
@@ -50,6 +50,7 @@ public ConfigSource() {
 
     // DEBEZIUM SOURCE conf
     config.put("debezium.source.offset.storage", "org.apache.kafka.connect.storage.MemoryOffsetBackingStore");
+    //config.put("debezium.source.offset.storage", "io.debezium.server.iceberg.offset.IcebergOffsetBackingStore");
     config.put("debezium.source.database.history", "io.debezium.relational.history.MemoryDatabaseHistory");
     config.put("debezium.source.offset.flush.interval.ms", "60000");
     config.put("debezium.source.database.server.name", "testc");
diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/offset/IcebergOffsetBackingStoreTest.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/offset/IcebergOffsetBackingStoreTest.java
new file mode 100644
index 00000000..20aea30b
--- /dev/null
+++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/offset/IcebergOffsetBackingStoreTest.java
@@ -0,0 +1,137 @@
+/*
+ *
+ * * Copyright memiiso Authors.
+ * *
+ * * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
+ *
+ */
+
+package io.debezium.server.iceberg.offset;
+
+import io.debezium.server.iceberg.testresources.S3Minio;
+import io.quarkus.test.common.QuarkusTestResource;
+import io.quarkus.test.junit.QuarkusTest;
+import io.quarkus.test.junit.QuarkusTestProfile;
+import io.quarkus.test.junit.TestProfile;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.connect.runtime.WorkerConfig;
+import org.apache.kafka.connect.util.Callback;
+import org.eclipse.microprofile.config.ConfigProvider;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+
+@QuarkusTest
+@TestProfile(IcebergOffsetBackingStoreTest.TestProfile.class)
+@QuarkusTestResource(value = S3Minio.class, restrictToAnnotatedClass = true)
+public class IcebergOffsetBackingStoreTest {
+
+  private static final Map<ByteBuffer, ByteBuffer> firstSet = new HashMap<>();
+  private static final Map<ByteBuffer, ByteBuffer> secondSet = new HashMap<>();
+
+  public static String fromByteBuffer(ByteBuffer data) {
+    return (data != null) ? String.valueOf(StandardCharsets.UTF_16.decode(data.asReadOnlyBuffer())) : null;
+  }
+
+  public static ByteBuffer toByteBuffer(String data) {
+    return (data != null) ? ByteBuffer.wrap(data.getBytes(StandardCharsets.UTF_16)) : null;
+  }
+
+  @BeforeAll
+  public static void setup() {
+    firstSet.put(toByteBuffer("key"), toByteBuffer("value"));
+    firstSet.put(toByteBuffer("key2"), null);
+    secondSet.put(toByteBuffer("key1secondSet"), toByteBuffer("value1secondSet"));
+    secondSet.put(toByteBuffer("key2secondSet"), toByteBuffer("value2secondSet"));
+  }
+
+  public Map<String, String> config() {
+    Map<String, String> conf = new HashMap<>();
+    for (String propName : ConfigProvider.getConfig().getPropertyNames()) {
+      if (propName.startsWith("debezium")) {
+        try {
+          conf.put(propName, ConfigProvider.getConfig().getValue(propName, String.class));
+        } catch (Exception e) {
+          conf.put(propName, "");
+        }
+      }
+    }
+    return conf;
+  }
+
+  @Test
+  public void testInitialize() {
+    // multiple initializations should not fail:
+    // the first one creates the table and the following ones reuse it
+    IcebergOffsetBackingStore store = new IcebergOffsetBackingStore();
+    store.configure(new TestWorkerConfig(config()));
+    store.start();
+    store.start();
+    store.start();
+    store.stop();
+  }
+
+  @Test
+  public void testGetSet() throws Exception {
+    Callback<Void> cb = (error, result) -> {
+    };
+
+    IcebergOffsetBackingStore store = new IcebergOffsetBackingStore();
+    store.configure(new TestWorkerConfig(config()));
+    store.start();
+    store.set(firstSet, cb).get();
+
+    Map<ByteBuffer, ByteBuffer> values = store.get(Arrays.asList(toByteBuffer("key"), toByteBuffer("bad"))).get();
+    assertEquals(toByteBuffer("value"), values.get(toByteBuffer("key")));
+    assertNull(values.get(toByteBuffer("bad")));
+  }
+
+  @Test
+  public void testSaveRestore() throws Exception {
+    Callback<Void> cb = (error, result) -> {
+    };
+
+    IcebergOffsetBackingStore store = new IcebergOffsetBackingStore();
+    store.configure(new TestWorkerConfig(config()));
+    store.start();
+    store.set(firstSet, cb).get();
+    store.set(secondSet, cb).get();
+    store.stop();
+    // Restore into a new store and make sure the data is correctly reloaded
+    IcebergOffsetBackingStore restore = new IcebergOffsetBackingStore();
+    restore.configure(new TestWorkerConfig(config()));
+    restore.start();
+    Map<ByteBuffer, ByteBuffer> values = restore.get(Collections.singletonList(toByteBuffer("key"))).get();
+    Map<ByteBuffer, ByteBuffer> values2 = restore.get(Collections.singletonList(toByteBuffer("key1secondSet"))).get();
+    Map<ByteBuffer, ByteBuffer> values3 = restore.get(Collections.singletonList(toByteBuffer("key2secondSet"))).get();
+    assertEquals(toByteBuffer("value"), values.get(toByteBuffer("key")));
+    assertEquals(toByteBuffer("value1secondSet"), values2.get(toByteBuffer("key1secondSet")));
+    assertEquals(toByteBuffer("value2secondSet"), values3.get(toByteBuffer("key2secondSet")));
+  }
+
+  public static class TestWorkerConfig extends WorkerConfig {
+    public TestWorkerConfig(Map<String, String> props) {
+      super(new ConfigDef(), props);
+    }
+  }
+
+  public static class TestProfile implements QuarkusTestProfile {
+    @Override
+    public Map<String, String> getConfigOverrides() {
+      Map<String, String> config = new HashMap<>();
+      config.put("debezium.source.offset.storage", "io.debezium.server.iceberg.offset.IcebergOffsetBackingStore");
+      config.put("debezium.source.offset.flush.interval.ms", "60000");
+      config.put("debezium.source.offset.storage.iceberg.table-name", "debezium_offset_storage_custom_table");
+      return config;
+    }
+  }
+}
\ No newline at end of file