From ffdb56ffca8f186ed577f2eede529538daa7c35c Mon Sep 17 00:00:00 2001
From: Ismail Simsek <6005685+ismailsimsek@users.noreply.github.com>
Date: Thu, 9 Feb 2023 19:47:59 +0100
Subject: [PATCH] Add iceberg schema history storage

---
 .../conf/application.properties.example       |   8 +-
 .../iceberg/history/IcebergSchemaHistory.java | 319 ++++++++++++++++++
 .../offset/IcebergOffsetBackingStore.java     |  40 ++-
 .../server/iceberg/TestConfigSource.java      |   1 +
 .../history/IcebergSchemaHistoryTest.java     |  80 +++++
 .../testresources/SourcePostgresqlDB.java     |   1 -
 6 files changed, 440 insertions(+), 9 deletions(-)
 create mode 100644 debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/history/IcebergSchemaHistory.java
 create mode 100644 debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/history/IcebergSchemaHistoryTest.java

diff --git a/debezium-server-iceberg-dist/src/main/resources/distro/conf/application.properties.example b/debezium-server-iceberg-dist/src/main/resources/distro/conf/application.properties.example
index 2eb7a573..81dcb550 100644
--- a/debezium-server-iceberg-dist/src/main/resources/distro/conf/application.properties.example
+++ b/debezium-server-iceberg-dist/src/main/resources/distro/conf/application.properties.example
@@ -1,10 +1,6 @@
 # Use iceberg sink
 debezium.sink.type=iceberg
 
-# Run without Kafka, use local file to store checkpoints
-debezium.source.database.history=io.debezium.relational.history.FileDatabaseHistory
-debezium.source.database.history.file.filename=data/status.dat
-
 # Iceberg sink config
 debezium.sink.iceberg.table-prefix=debeziumcdc_
 debezium.sink.iceberg.upsert=true
@@ -33,10 +29,11 @@ debezium.format.value=json
 debezium.format.key=json
 debezium.source.offset.storage=io.debezium.server.iceberg.offset.IcebergOffsetBackingStore
 debezium.source.offset.storage.iceberg.table-name=debezium_offset_storage_custom_table
+debezium.source.database.history=io.debezium.server.iceberg.history.IcebergSchemaHistory
+debezium.source.database.history.iceberg.table-name=debezium_database_history_storage_test
 
 # postgres source
 debezium.source.connector.class=io.debezium.connector.postgresql.PostgresConnector
-debezium.source.offset.storage.file.filename=data/offsets.dat
 debezium.source.offset.flush.interval.ms=0
 debezium.source.database.hostname=localhost
 debezium.source.database.port=5432
@@ -48,7 +45,6 @@ debezium.source.schema.include.list=inventory
 
 # sql server source
 #debezium.source.connector.class=io.debezium.connector.sqlserver.SqlServerConnector
-#debezium.source.offset.storage.file.filename=data/offsets.dat
 #debezium.source.offset.flush.interval.ms=0
 #debezium.source.database.hostname=localhost
 #debezium.source.database.port=5432
diff --git a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/history/IcebergSchemaHistory.java b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/history/IcebergSchemaHistory.java
new file mode 100644
index 00000000..51687e28
--- /dev/null
+++ b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/history/IcebergSchemaHistory.java
@@ -0,0 +1,319 @@
+/*
+ *
+ * * Copyright memiiso Authors.
+ * *
+ * * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
+ *
+ */
+
+package io.debezium.server.iceberg.history;
+
+import io.debezium.DebeziumException;
+import io.debezium.annotation.ThreadSafe;
+import io.debezium.common.annotation.Incubating;
+import io.debezium.config.Configuration;
+import io.debezium.config.Field;
+import io.debezium.document.DocumentReader;
+import io.debezium.document.DocumentWriter;
+import io.debezium.relational.history.*;
+import io.debezium.util.FunctionalReadWriteLock;
+import io.debezium.util.Strings;
+
+import java.io.BufferedReader;
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.time.OffsetDateTime;
+import java.time.ZoneOffset;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Consumer;
+
+import org.apache.iceberg.*;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.IcebergGenerics;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.data.parquet.GenericParquetWriter;
+import org.apache.iceberg.exceptions.NoSuchTableException;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.FileAppender;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.parquet.Parquet;
+import org.apache.iceberg.types.Types;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import static org.apache.iceberg.types.Types.NestedField.optional;
+import static org.apache.iceberg.types.Types.NestedField.required;
+
+/**
+ * A {@link DatabaseHistory} implementation that stores the schema history in an Iceberg table.
+ *
+ * @author Ismail Simsek
+ */
+@ThreadSafe
+@Incubating
+public final class IcebergSchemaHistory extends AbstractDatabaseHistory {
+
+  public static final String DATABASE_HISTORY_STORAGE_TABLE_INSERT = "INSERT INTO %s VALUES ( ?, ?, ? )";
+  public static final String DATABASE_HISTORY_STORAGE_TABLE_SELECT = "SELECT id, history_data, record_insert_ts FROM %s ORDER BY " +
+      "record_insert_ts ASC";
+  static final Schema DATABASE_HISTORY_TABLE_SCHEMA = new Schema(
+      required(1, "id", Types.StringType.get()),
+      optional(2, "history_data", Types.StringType.get()),
+      optional(3, "record_insert_ts", Types.TimestampType.withZone())
+  );
+  private static final Logger LOG = LoggerFactory.getLogger(IcebergSchemaHistory.class);
+  private final FunctionalReadWriteLock lock = FunctionalReadWriteLock.reentrant();
+  private final DocumentWriter writer = DocumentWriter.defaultWriter();
+  private final DocumentReader reader = DocumentReader.defaultReader();
+  private final AtomicBoolean running = new AtomicBoolean();
+  IcebergSchemaHistoryConfig historyConfig;
+  Catalog icebergCatalog;
+  private String tableFullName;
+  private TableIdentifier tableId;
+  private Table historyTable;
+
+  @Override
+  public void configure(Configuration config, HistoryRecordComparator comparator, DatabaseHistoryListener listener, boolean useCatalogBeforeSchema) {
+    super.configure(config, comparator, listener, useCatalogBeforeSchema);
+    this.historyConfig = new IcebergSchemaHistoryConfig(config);
+    icebergCatalog = CatalogUtil.buildIcebergCatalog(this.historyConfig.catalogName(),
+        this.historyConfig.icebergProperties(), this.historyConfig.hadoopConfig());
+    tableFullName = String.format("%s.%s", this.historyConfig.catalogName(), this.historyConfig.tableName());
+    tableId = TableIdentifier.of(Namespace.of(this.historyConfig.catalogName()), this.historyConfig.tableName());
+
+    if (running.get()) {
+      throw new DatabaseHistoryException("Iceberg database history storage already initialized table: " + tableFullName);
+    }
+  }
+
+  @Override
+  public void start() {
+    super.start();
+    lock.write(() -> {
+      if (running.compareAndSet(false, true)) {
+        try {
+          if (!storageExists()) {
+            initializeStorage();
+          }
+        } catch (Exception e) {
+          throw new DatabaseHistoryException("Unable to create history table: " + tableFullName + " : " + e.getMessage(),
+              e);
+        }
+      }
+    });
+  }
+
+  public String getTableFullName() {
+    return tableFullName;
+  }
+
+  @Override
+  protected void storeRecord(HistoryRecord record) throws DatabaseHistoryException {
+    if (record == null) {
+      return;
+    }
+    lock.write(() -> {
+      if (!running.get()) {
+        throw new DebeziumException("The history has been stopped and will not accept more records");
+      }
+      try {
+        String recordDocString = writer.write(record.document());
+        LOG.trace("Saving history data {}", recordDocString);
+        OffsetDateTime currentTs = OffsetDateTime.now(ZoneOffset.UTC);
+        /// iceberg append
+        GenericRecord icebergRecord = GenericRecord.create(DATABASE_HISTORY_TABLE_SCHEMA);
+        Record row = icebergRecord.copy(
+            "id", UUID.randomUUID().toString(),
+            "history_data", recordDocString,
+            "record_insert_ts", currentTs
+        );
+        OutputFile out;
+        try (FileIO tableIo = historyTable.io()) {
+          out = tableIo.newOutputFile(historyTable.locationProvider().newDataLocation(UUID.randomUUID() + "-data-001"));
+        }
+        FileAppender<Record> writer = Parquet.write(out)
+            .createWriterFunc(GenericParquetWriter::buildWriter)
+            .forTable(historyTable)
+            .overwrite()
+            .build();
+        try (Closeable ignored = writer) {
+          writer.add(row);
+        }
+        DataFile dataFile = DataFiles.builder(historyTable.spec())
+            .withFormat(FileFormat.PARQUET)
+            .withPath(out.location())
+            .withFileSizeInBytes(writer.length())
+            .withSplitOffsets(writer.splitOffsets())
+            .withMetrics(writer.metrics())
+            .build();
+        historyTable.newOverwrite().addFile(dataFile).commit();
+        /// END iceberg append
+        LOG.trace("Successfully saved history data to Iceberg table");
+      } catch (IOException e) {
+        throw new DatabaseHistoryException("Failed to store record: " + record, e);
+      }
+    });
+  }
+
+  @Override
+  public void stop() {
+    running.set(false);
+    super.stop();
+  }
+
+  @Override
+  protected synchronized void recoverRecords(Consumer<HistoryRecord> records) {
+    lock.write(() -> {
+      if (exists()) {
+        try (CloseableIterable<Record> rs = IcebergGenerics.read(historyTable)
+            .build()) {
+          for (Record row : rs) {
+            String line = (String) row.getField("history_data");
+            if (line == null) {
+              break;
+            }
+            if (!line.isEmpty()) {
+              records.accept(new HistoryRecord(reader.read(line)));
+            }
+          }
+        } catch (IOException e) {
+          throw new RuntimeException(e);
+        }
+      }
+    });
+  }
+
+  @Override
+  public boolean storageExists() {
+    try {
+      // keep a handle on the loaded table so recoverRecords()/exists() also work when the table already exists
+      historyTable = icebergCatalog.loadTable(tableId);
+      return historyTable != null;
+    } catch (NoSuchTableException e) {
+      return false;
+    }
+  }
+
+  @Override
+  public boolean exists() {
+
+    if (!storageExists()) {
+      return false;
+    }
+
+    int numRows = 0;
+    try (CloseableIterable<Record> rs = IcebergGenerics.read(historyTable)
+        .build()) {
+      for (Record ignored : rs) {
+        numRows++;
+        break;
+      }
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+    return numRows > 0;
+  }
+
+  @Override
+  public String toString() {
+    return "Iceberg database history storage: " + (tableFullName != null ? tableFullName : "(unstarted)");
+  }
+
+  @Override
+  public void initializeStorage() {
+    if (!storageExists()) {
+      try {
+        LOG.debug("Creating table {} to store database history", tableFullName);
+        historyTable = icebergCatalog.createTable(tableId, DATABASE_HISTORY_TABLE_SCHEMA);
+        LOG.warn("Created database history storage table {} to store history", tableFullName);
+
+        if (!Strings.isNullOrEmpty(historyConfig.getMigrateHistoryFile().strip())) {
+          LOG.warn("Migrating history from file {}", historyConfig.getMigrateHistoryFile());
+          this.loadFileDatabaseHistory(new File(historyConfig.getMigrateHistoryFile()));
+        }
+      } catch (Exception e) {
+        throw new DatabaseHistoryException("Creation of database history storage table failed, please create the table manually", e);
+      }
+    } else {
+      LOG.debug("Storage already exists, skipping initialization");
+    }
+  }
+
+  private void loadFileDatabaseHistory(File file) {
+    LOG.warn(String.format("Migrating file database history from:'%s' to Iceberg database history storage: %s",
+        file.toPath(), tableFullName));
+    AtomicInteger numRecords = new AtomicInteger();
+    lock.write(() -> {
+      try (BufferedReader historyReader = Files.newBufferedReader(file.toPath())) {
+        while (true) {
+          String line = historyReader.readLine();
+          if (line == null) {
+            break;
+          }
+          if (!line.isEmpty()) {
+            this.storeRecord(new HistoryRecord(reader.read(line)));
+            numRecords.getAndIncrement();
+          }
+        }
+      } catch (IOException e) {
+        LOG.error("Failed to migrate history record from history file at {}", file.toPath(), e);
+      }
+    });
+    LOG.warn("Migrated {} database history records. " +
+        "Migrating file database history to Iceberg database history storage successfully completed", numRecords.get());
+  }
+
+  public static class IcebergSchemaHistoryConfig {
+
+    final org.apache.hadoop.conf.Configuration hadoopConfig = new org.apache.hadoop.conf.Configuration();
+    private final Configuration config;
+    Map<String, String> icebergProperties = new ConcurrentHashMap<>();
+
+    public IcebergSchemaHistoryConfig(Configuration config) {
+      this.config = config;
+
+      final Map<String, String> conf = new HashMap<>();
+      this.config.forEach((propName, value) -> {
+        if (propName.startsWith(CONFIGURATION_FIELD_PREFIX_STRING + "iceberg.")) {
+          final String newPropName = propName.substring((CONFIGURATION_FIELD_PREFIX_STRING + "iceberg.").length());
+          conf.put(newPropName, value);
+        }
+      });
+
+      conf.forEach(hadoopConfig::set);
+      icebergProperties.putAll(conf);
+    }
+
+    public String catalogName() {
+      return this.config.getString(Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "iceberg.catalog-name").withDefault("default"));
+    }
+
+    public String tableName() {
+      return this.config.getString(Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "iceberg.table-name").withDefault(
+          "debezium_database_history_storage"));
+    }
+
+    public org.apache.hadoop.conf.Configuration hadoopConfig() {
+      return hadoopConfig;
+    }
+
+    public Map<String, String> icebergProperties() {
+      return icebergProperties;
+    }
+
+    public String getMigrateHistoryFile() {
+      return this.config.getString(Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "iceberg.migrate-history-file").withDefault(""));
+    }
+  }
+
+}
diff --git a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/offset/IcebergOffsetBackingStore.java b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/offset/IcebergOffsetBackingStore.java
index 41a6476e..efaffe30 100644
--- a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/offset/IcebergOffsetBackingStore.java
+++ b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/offset/IcebergOffsetBackingStore.java
@@ -12,11 +12,14 @@
 import io.debezium.config.Configuration;
 import io.debezium.config.Field;
 import io.debezium.server.iceberg.IcebergChangeConsumer;
+import io.debezium.util.Strings;
 
 import java.io.Closeable;
+import java.io.File;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
 import java.sql.Timestamp;
 import java.util.Collection;
 import java.util.HashMap;
@@ -44,10 +47,12 @@
 import org.apache.iceberg.parquet.Parquet;
 import org.apache.iceberg.types.Types;
 import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.connect.errors.ConnectException;
 import org.apache.kafka.connect.runtime.WorkerConfig;
 import org.apache.kafka.connect.storage.MemoryOffsetBackingStore;
 import org.apache.kafka.connect.storage.OffsetBackingStore;
 import org.apache.kafka.connect.util.Callback;
+import org.apache.kafka.connect.util.SafeObjectInputStream;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import static org.apache.iceberg.types.Types.NestedField.optional;
@@ -72,14 +77,14 @@ public class IcebergOffsetBackingStore extends MemoryOffsetBackingStore implements OffsetBackingStore {
   private String tableFullName;
   private TableIdentifier tableId;
   private Table offsetTable;
-
+  IcebergOffsetBackingStoreConfig offsetConfig;
   public IcebergOffsetBackingStore() {
   }
 
   @Override
   public void configure(WorkerConfig config) {
     super.configure(config);
-    IcebergOffsetBackingStoreConfig offsetConfig = new IcebergOffsetBackingStoreConfig(Configuration.from(config.originalsStrings()));
+    offsetConfig = new IcebergOffsetBackingStoreConfig(Configuration.from(config.originalsStrings()));
     icebergCatalog = CatalogUtil.buildIcebergCatalog(offsetConfig.catalogName(),
         offsetConfig.icebergProperties(), offsetConfig.hadoopConfig());
     tableFullName = String.format("%s.%s", offsetConfig.catalogName(), offsetConfig.tableName());
@@ -103,9 +108,36 @@ private void initializeTable() {
     if (!icebergCatalog.tableExists(tableId)) {
       throw new DebeziumException("Failed to create table " + tableId + " to store offset");
     }
+
+    if (!Strings.isNullOrEmpty(offsetConfig.getMigrateOffsetFile().strip())) {
+      LOG.warn("Migrating offset from file {}", offsetConfig.getMigrateOffsetFile());
+      this.loadFileOffset(new File(offsetConfig.getMigrateOffsetFile()));
+    }
+
   }
 }
 
+  private void loadFileOffset(File file) {
+    try (SafeObjectInputStream is = new SafeObjectInputStream(Files.newInputStream(file.toPath()))) {
+      Object obj = is.readObject();
+
+      if (!(obj instanceof HashMap))
+        throw new ConnectException("Expected HashMap but found " + obj.getClass());
+
+      Map<byte[], byte[]> raw = (Map<byte[], byte[]>) obj;
+      for (Map.Entry<byte[], byte[]> mapEntry : raw.entrySet()) {
+        ByteBuffer key = (mapEntry.getKey() != null) ? ByteBuffer.wrap(mapEntry.getKey()) : null;
+        ByteBuffer value = (mapEntry.getValue() != null) ? ByteBuffer.wrap(mapEntry.getValue()) : null;
+        data.put(fromByteBuffer(key), fromByteBuffer(value));
+      }
+    } catch (IOException | ClassNotFoundException e) {
+      throw new DebeziumException("Failed migrating offset from file", e);
+    }
+
+    LOG.warn("Loaded file offset, saving it to iceberg offset storage");
+    save();
+  }
+
   protected void save() {
     LOG.debug("Saving offset data to iceberg table...");
     try {
@@ -237,6 +269,10 @@ public String tableName() {
       return this.config.getString(Field.create("offset.storage.iceberg.table-name").withDefault("debezium_offset_storage"));
     }
 
+    public String getMigrateOffsetFile() {
+      return this.config.getString(Field.create("offset.storage.iceberg.migrate-offset-file").withDefault(""));
+    }
+
     public org.apache.hadoop.conf.Configuration hadoopConfig() {
       return hadoopConfig;
     }
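For reference, a minimal sketch of the Debezium Server properties that enable the Iceberg-backed schema history added above. The history class and table-name keys mirror the example config at the top of this patch (the table name shown here is the code default); the catalog-name and migrate-history-file keys are inferred from IcebergSchemaHistoryConfig (the "database.history.iceberg." prefix), and the file path is illustrative only:

# Iceberg-backed schema history (replaces FileDatabaseHistory)
debezium.source.database.history=io.debezium.server.iceberg.history.IcebergSchemaHistory
debezium.source.database.history.iceberg.table-name=debezium_database_history_storage
# inferred keys, defaults shown; the migration path below is an example only
debezium.source.database.history.iceberg.catalog-name=default
debezium.source.database.history.iceberg.migrate-history-file=data/history.dat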
diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/TestConfigSource.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/TestConfigSource.java
index f3498def..46d6bc24 100644
--- a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/TestConfigSource.java
+++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/TestConfigSource.java
@@ -27,6 +27,7 @@ public TestConfigSource() {
     config.put("debezium.sink.type", "iceberg");
     config.put("debezium.sink.iceberg.upsert", "false");
     config.put("debezium.sink.iceberg.upsert-keep-deletes", "true");
+    config.put("debezium.source.connector.class", "io.debezium.connector.postgresql.PostgresConnector");
 
     // ==== configure batch behaviour/size ====
     // Positive integer value that specifies the maximum size of each batch of events that should be processed during
diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/history/IcebergSchemaHistoryTest.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/history/IcebergSchemaHistoryTest.java
new file mode 100644
index 00000000..2d0f9133
--- /dev/null
+++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/history/IcebergSchemaHistoryTest.java
@@ -0,0 +1,80 @@
+/*
+ *
+ * * Copyright memiiso Authors.
+ * *
+ * * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
+ *
+ */
+
+package io.debezium.server.iceberg.history;
+
+import io.debezium.server.iceberg.testresources.BaseSparkTest;
+import io.debezium.server.iceberg.testresources.S3Minio;
+import io.debezium.server.iceberg.testresources.SourceMysqlDB;
+import io.quarkus.test.common.QuarkusTestResource;
+import io.quarkus.test.junit.QuarkusTest;
+import io.quarkus.test.junit.QuarkusTestProfile;
+import io.quarkus.test.junit.TestProfile;
+
+import java.time.Duration;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.awaitility.Awaitility;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Integration test that verifies basic reading from MySQL database and writing to iceberg destination.
+ *
+ * @author Ismail Simsek
+ */
+@QuarkusTest
+@QuarkusTestResource(value = S3Minio.class, restrictToAnnotatedClass = true)
+@QuarkusTestResource(value = SourceMysqlDB.class, restrictToAnnotatedClass = true)
+@TestProfile(IcebergSchemaHistoryTest.TestProfile.class)
+public class IcebergSchemaHistoryTest extends BaseSparkTest {
+  @Test
+  public void testSimpleUpload() {
+    Awaitility.await().atMost(Duration.ofSeconds(120)).until(() -> {
+      try {
+        Dataset<Row> ds = getTableData("testc.inventory.customers");
+        return ds.count() >= 3;
+      } catch (Exception e) {
+        return false;
+      }
+    });
+
+    // verify schema history records are stored in the iceberg table
+    Awaitility.await().atMost(Duration.ofSeconds(120)).until(() -> {
+      try {
+        Dataset<Row> ds = spark.newSession().sql("SELECT * FROM mycatalog.debezium_database_history_storage_test");
+        ds.show(false);
+        return ds.count() >= 5;
+      } catch (Exception e) {
+        e.printStackTrace();
+        return false;
+      }
+    });
+  }
+
+
+  public static class TestProfile implements QuarkusTestProfile {
+    @Override
+    public Map<String, String> getConfigOverrides() {
+      Map<String, String> config = new HashMap<>();
+      config.put("quarkus.profile", "mysql");
+      config.put("%mysql.debezium.source.connector.class", "io.debezium.connector.mysql.MySqlConnector");
+      config.put("debezium.source.database.history", "io.debezium.server.iceberg.history.IcebergSchemaHistory");
+      config.put("debezium.source.database.history.iceberg.table-name", "debezium_database_history_storage_test");
+      config.put("debezium.source.table.whitelist", "inventory.customers");
+      return config;
+    }
+
+    @Override
+    public String getConfigProfile() {
+      return "mysql";
+    }
+  }
+}
\ No newline at end of file
diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/testresources/SourcePostgresqlDB.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/testresources/SourcePostgresqlDB.java
index 7e380682..a7c01cfc 100644
--- a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/testresources/SourcePostgresqlDB.java
+++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/testresources/SourcePostgresqlDB.java
@@ -63,7 +63,6 @@ public Map<String, String> start() {
     container.start();
 
     Map<String, String> params = new ConcurrentHashMap<>();
-    params.put("debezium.source.connector.class", "io.debezium.connector.postgresql.PostgresConnector");
     params.put("debezium.source.database.hostname", POSTGRES_HOST);
     params.put("debezium.source.database.port", container.getMappedPort(POSTGRES_PORT_DEFAULT).toString());
    params.put("debezium.source.database.user", POSTGRES_USER);
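Likewise, a sketch of the offset storage counterpart extended earlier in this patch. The storage class and table-name keys mirror the example config above (the table name shown here is the code default); the migrate-offset-file key is inferred from IcebergOffsetBackingStoreConfig (the "offset.storage.iceberg." prefix), and the path is illustrative only:

# Iceberg-backed offset storage
debezium.source.offset.storage=io.debezium.server.iceberg.offset.IcebergOffsetBackingStore
debezium.source.offset.storage.iceberg.table-name=debezium_offset_storage
# inferred key; one-time import of an existing file offset store (example path)
debezium.source.offset.storage.iceberg.migrate-offset-file=data/offsets.dat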