Add iceberg schema history storage #175

Merged: 1 commit, Feb 12, 2023
@@ -1,10 +1,6 @@
 # Use iceberg sink
 debezium.sink.type=iceberg
 
-# Run without Kafka, use local file to store checkpoints
-debezium.source.database.history=io.debezium.relational.history.FileDatabaseHistory
-debezium.source.database.history.file.filename=data/status.dat
-
 # Iceberg sink config
 debezium.sink.iceberg.table-prefix=debeziumcdc_
 debezium.sink.iceberg.upsert=true
@@ -33,10 +29,11 @@ debezium.format.value=json
 debezium.format.key=json
 debezium.source.offset.storage=io.debezium.server.iceberg.offset.IcebergOffsetBackingStore
 debezium.source.offset.storage.iceberg.table-name=debezium_offset_storage_custom_table
+debezium.source.database.history=io.debezium.server.iceberg.history.IcebergSchemaHistory
+debezium.source.database.history.iceberg.table-name=debezium_database_history_storage_test
 
 # postgres source
 debezium.source.connector.class=io.debezium.connector.postgresql.PostgresConnector
-debezium.source.offset.storage.file.filename=data/offsets.dat
 debezium.source.offset.flush.interval.ms=0
 debezium.source.database.hostname=localhost
 debezium.source.database.port=5432
@@ -48,7 +45,6 @@ debezium.source.schema.include.list=inventory
 
 # sql server source
 #debezium.source.connector.class=io.debezium.connector.sqlserver.SqlServerConnector
-#debezium.source.offset.storage.file.filename=data/offsets.dat
-#debezium.source.offset.flush.interval.ms=0
 #debezium.source.offset.flush.interval.ms=0
 #debezium.source.database.hostname=localhost
 #debezium.source.database.port=5432
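
With these changes, a minimal configuration for the new Iceberg schema history storage looks like the sketch below. The keys are taken from the diff and from the defaults in IcebergSchemaHistory; the migrate-history-file entry is optional and its path is illustrative:

debezium.source.database.history=io.debezium.server.iceberg.history.IcebergSchemaHistory
# catalog and table name (defaults shown)
debezium.source.database.history.iceberg.catalog-name=default
debezium.source.database.history.iceberg.table-name=debezium_database_history_storage
# optional one-time migration of an existing file-based history (illustrative path)
debezium.source.database.history.iceberg.migrate-history-file=data/status.dat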
@@ -0,0 +1,319 @@
/*
 * Copyright memiiso Authors.
 *
 * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
 */

package io.debezium.server.iceberg.history;

import io.debezium.DebeziumException;
import io.debezium.annotation.ThreadSafe;
import io.debezium.common.annotation.Incubating;
import io.debezium.config.Configuration;
import io.debezium.config.Field;
import io.debezium.document.DocumentReader;
import io.debezium.document.DocumentWriter;
import io.debezium.relational.history.*;
import io.debezium.util.FunctionalReadWriteLock;
import io.debezium.util.Strings;

import java.io.BufferedReader;
import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

import org.apache.iceberg.*;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.data.IcebergGenerics;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.data.parquet.GenericParquetWriter;
import org.apache.iceberg.exceptions.NoSuchTableException;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.FileAppender;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.parquet.Parquet;
import org.apache.iceberg.types.Types;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.apache.iceberg.types.Types.NestedField.optional;
import static org.apache.iceberg.types.Types.NestedField.required;

/**
 * A {@link DatabaseHistory} implementation that stores the schema history in an Iceberg table.
 *
 * @author Ismail Simsek
 */
@ThreadSafe
@Incubating
public final class IcebergSchemaHistory extends AbstractDatabaseHistory {

  public static final String DATABASE_HISTORY_STORAGE_TABLE_INSERT = "INSERT INTO %s VALUES ( ?, ?, ? )";
  public static final String DATABASE_HISTORY_STORAGE_TABLE_SELECT = "SELECT id, history_data, record_insert_ts FROM %s ORDER BY " +
      "record_insert_ts ASC";
  static final Schema DATABASE_HISTORY_TABLE_SCHEMA = new Schema(
      required(1, "id", Types.StringType.get()),
      optional(2, "history_data", Types.StringType.get()),
      optional(3, "record_insert_ts", Types.TimestampType.withZone())
  );
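  // For reference, the schema above corresponds (hypothetically) to this SQL DDL:
  //   CREATE TABLE debezium_database_history_storage (
  //     id               string NOT NULL,   -- random UUID per stored record
  //     history_data     string,            -- JSON-serialized HistoryRecord document
  //     record_insert_ts timestamptz        -- UTC timestamp assigned at write time
  //   );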
  private static final Logger LOG = LoggerFactory.getLogger(IcebergSchemaHistory.class);
  private final FunctionalReadWriteLock lock = FunctionalReadWriteLock.reentrant();
  private final DocumentWriter writer = DocumentWriter.defaultWriter();
  private final DocumentReader reader = DocumentReader.defaultReader();
  private final AtomicBoolean running = new AtomicBoolean();
  IcebergSchemaHistoryConfig historyConfig;
  Catalog icebergCatalog;
  private String tableFullName;
  private TableIdentifier tableId;
  private Table historyTable;

  @Override
  public void configure(Configuration config, HistoryRecordComparator comparator, DatabaseHistoryListener listener, boolean useCatalogBeforeSchema) {
    super.configure(config, comparator, listener, useCatalogBeforeSchema);
    this.historyConfig = new IcebergSchemaHistoryConfig(config);
    icebergCatalog = CatalogUtil.buildIcebergCatalog(this.historyConfig.catalogName(),
        this.historyConfig.icebergProperties(), this.historyConfig.hadoopConfig());
    tableFullName = String.format("%s.%s", this.historyConfig.catalogName(), this.historyConfig.tableName());
    tableId = TableIdentifier.of(Namespace.of(this.historyConfig.catalogName()), this.historyConfig.tableName());

    if (running.get()) {
      throw new DatabaseHistoryException("Iceberg database history process already initialized table: " + tableFullName);
    }
  }

  @Override
  public void start() {
    super.start();
    lock.write(() -> {
      if (running.compareAndSet(false, true)) {
        try {
          if (!storageExists()) {
            initializeStorage();
          }
        } catch (Exception e) {
          throw new DatabaseHistoryException("Unable to create history table: " + tableFullName + " : " + e.getMessage(),
              e);
        }
      }
    });
  }

  public String getTableFullName() {
    return tableFullName;
  }

  @Override
  protected void storeRecord(HistoryRecord record) throws DatabaseHistoryException {
    if (record == null) {
      return;
    }
    lock.write(() -> {
      if (!running.get()) {
        throw new DebeziumException("The history has been stopped and will not accept more records");
      }
      try {
        String recordDocString = writer.write(record.document());
        LOG.trace("Saving history data {}", recordDocString);
        OffsetDateTime currentTs = OffsetDateTime.now(ZoneOffset.UTC);
        // Iceberg append: write the record to a new parquet data file, then commit it to the table
        GenericRecord icebergRecord = GenericRecord.create(DATABASE_HISTORY_TABLE_SCHEMA);
        Record row = icebergRecord.copy(
            "id", UUID.randomUUID().toString(),
            "history_data", recordDocString,
            "record_insert_ts", currentTs
        );
        OutputFile out;
        try (FileIO tableIo = historyTable.io()) {
          out = tableIo.newOutputFile(historyTable.locationProvider().newDataLocation(UUID.randomUUID() + "-data-001"));
        }
        FileAppender<Record> writer = Parquet.write(out)
            .createWriterFunc(GenericParquetWriter::buildWriter)
            .forTable(historyTable)
            .overwrite()
            .build();
        try (Closeable ignored = writer) {
          writer.add(row);
        }
        DataFile dataFile = DataFiles.builder(historyTable.spec())
            .withFormat(FileFormat.PARQUET)
            .withPath(out.location())
            .withFileSizeInBytes(writer.length())
            .withSplitOffsets(writer.splitOffsets())
            .withMetrics(writer.metrics())
            .build();
        historyTable.newOverwrite().addFile(dataFile).commit();
        // END Iceberg append
        LOG.trace("Successfully saved history data to Iceberg table");
      } catch (IOException e) {
        throw new DatabaseHistoryException("Failed to store record: " + record, e);
      }
    });
  }

  @Override
  public void stop() {
    running.set(false);
    super.stop();
  }

  @Override
  protected synchronized void recoverRecords(Consumer<HistoryRecord> records) {
    lock.write(() -> {
      if (exists()) {
        try (CloseableIterable<Record> rs = IcebergGenerics.read(historyTable)
            .build()) {
          for (Record row : rs) {
            String line = (String) row.getField("history_data");
            if (line == null) {
              break;
            }
            if (!line.isEmpty()) {
              records.accept(new HistoryRecord(reader.read(line)));
            }
          }
        } catch (IOException e) {
          throw new RuntimeException(e);
        }
      }
    });
  }

  @Override
  public boolean storageExists() {
    try {
      // Load and cache the table so it is available to exists(), storeRecord() and recoverRecords()
      // even when the table was created by a previous run rather than by initializeStorage().
      historyTable = icebergCatalog.loadTable(tableId);
      return historyTable != null;
    } catch (NoSuchTableException e) {
      return false;
    }
  }

  @Override
  public boolean exists() {

    if (!storageExists()) {
      return false;
    }

    int numRows = 0;
    try (CloseableIterable<Record> rs = IcebergGenerics.read(historyTable)
        .build()) {
      for (Record ignored : rs) {
        numRows++;
        break;
      }
    } catch (IOException e) {
      throw new RuntimeException(e);
    }
    return numRows > 0;
  }

  @Override
  public String toString() {
    return "Iceberg database history storage: " + (tableFullName != null ? tableFullName : "(unstarted)");
  }

  @Override
  public void initializeStorage() {
    if (!storageExists()) {
      try {
        LOG.debug("Creating table {} to store database history", tableFullName);
        historyTable = icebergCatalog.createTable(tableId, DATABASE_HISTORY_TABLE_SCHEMA);
        LOG.warn("Created database history storage table {} to store history", tableFullName);

        if (!Strings.isNullOrEmpty(historyConfig.getMigrateHistoryFile().strip())) {
          LOG.warn("Migrating history from file {}", historyConfig.getMigrateHistoryFile());
          this.loadFileDatabaseHistory(new File(historyConfig.getMigrateHistoryFile()));
        }
      } catch (Exception e) {
        throw new DatabaseHistoryException("Creation of database history table failed, please create the table manually", e);
      }
    } else {
      LOG.debug("Storage already exists, skipping initialization");
    }
  }

  private void loadFileDatabaseHistory(File file) {
    LOG.warn(String.format("Migrating file database history from:'%s' to Iceberg database history storage: %s",
        file.toPath(), tableFullName));
    AtomicInteger numRecords = new AtomicInteger();
    lock.write(() -> {
      try (BufferedReader historyReader = Files.newBufferedReader(file.toPath())) {
        while (true) {
          String line = historyReader.readLine();
          if (line == null) {
            break;
          }
          if (!line.isEmpty()) {
            this.storeRecord(new HistoryRecord(reader.read(line)));
            numRecords.getAndIncrement();
          }
        }
      } catch (IOException e) {
        LOG.error("Failed to migrate history record from history file at {}", file.toPath(), e);
      }
    });
    LOG.warn("Migrated {} database history records. " +
        "Migration of file database history to Iceberg database history storage successfully completed", numRecords.get());
  }

  public static class IcebergSchemaHistoryConfig {

    final org.apache.hadoop.conf.Configuration hadoopConfig = new org.apache.hadoop.conf.Configuration();
    private final Configuration config;
    Map<String, String> icebergProperties = new ConcurrentHashMap<>();

    public IcebergSchemaHistoryConfig(Configuration config) {
      this.config = config;

      final Map<String, String> conf = new HashMap<>();
      this.config.forEach((propName, value) -> {
        if (propName.startsWith(CONFIGURATION_FIELD_PREFIX_STRING + "iceberg.")) {
          final String newPropName = propName.substring((CONFIGURATION_FIELD_PREFIX_STRING + "iceberg.").length());
          conf.put(newPropName, value);
        }
      });

      conf.forEach(hadoopConfig::set);
      icebergProperties.putAll(conf);
    }

    public String catalogName() {
      return this.config.getString(Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "iceberg.catalog-name").withDefault("default"));
    }

    public String tableName() {
      return this.config.getString(Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "iceberg.table-name").withDefault(
          "debezium_database_history_storage"));
    }

    public org.apache.hadoop.conf.Configuration hadoopConfig() {
      return hadoopConfig;
    }

    public Map<String, String> icebergProperties() {
      return icebergProperties;
    }

    public String getMigrateHistoryFile() {
      return this.config.getString(Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "iceberg.migrate-history-file").withDefault(""));
    }
  }

}
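
For reference, a minimal sketch of how the database.history.iceberg.* properties above flow through IcebergSchemaHistoryConfig. It assumes Debezium's Configuration builder API; the class name and the "warehouse" property are illustrative only:

// Sketch: IcebergSchemaHistoryConfig strips the "database.history.iceberg."
// prefix and exposes the remainder as catalog properties for
// CatalogUtil.buildIcebergCatalog(...). "warehouse" below is a hypothetical
// catalog property used only to show the pass-through.
import io.debezium.config.Configuration;
import io.debezium.server.iceberg.history.IcebergSchemaHistory.IcebergSchemaHistoryConfig;

public class SchemaHistoryConfigExample {
  public static void main(String[] args) {
    Configuration config = Configuration.create()
        .with("database.history.iceberg.catalog-name", "default")
        .with("database.history.iceberg.table-name", "debezium_database_history_storage_test")
        .with("database.history.iceberg.warehouse", "s3a://my-bucket/warehouse") // hypothetical
        .build();

    IcebergSchemaHistoryConfig historyConfig = new IcebergSchemaHistoryConfig(config);
    System.out.println(historyConfig.catalogName()); // default
    System.out.println(historyConfig.tableName());   // debezium_database_history_storage_test
    // All stripped keys, including the hypothetical "warehouse", end up here (order may vary):
    System.out.println(historyConfig.icebergProperties());
  }
}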