Improve Offset and Database History Storage classes (#199)
ismailsimsek committed May 30, 2023
1 parent 29092be commit 0728a23
Showing 8 changed files with 113 additions and 88 deletions.
==== example configuration (.properties) ====
@@ -8,11 +8,11 @@ debezium.sink.iceberg.upsert-keep-deletes=true
debezium.sink.iceberg.write.format.default=parquet
debezium.sink.iceberg.catalog-name=mycatalog
# Hadoop catalog; you can use any other catalog supported by Iceberg as well

# S3 config using Hadoop and the Hadoop catalog
debezium.sink.iceberg.type=hadoop
debezium.sink.iceberg.warehouse=s3a://my-bucket/iceberg_warehouse
debezium.sink.iceberg.table-namespace=debeziumevents

# S3 config
debezium.sink.iceberg.fs.defaultFS=s3a://my-bucket
debezium.sink.iceberg.com.amazonaws.services.s3.enableV4=true
debezium.sink.iceberg.com.amazonaws.services.s3a.enableV4=true
@@ -22,11 +22,23 @@ debezium.sink.iceberg.fs.s3a.secret.key=AWS_SECRET_ACCESS_KEY
debezium.sink.iceberg.fs.s3a.path.style.access=true
debezium.sink.iceberg.fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem

# enable event schemas - mandate
# S3 config without the Hadoop catalog, using InMemoryCatalog and S3FileIO
### using MinIO as S3
debezium.sink.iceberg.s3.endpoint=http://localhost:9000
debezium.sink.iceberg.s3.path-style-access=true
debezium.sink.iceberg.s3.access-key-id=MY_ACCESS_KEY
debezium.sink.iceberg.s3.secret-access-key=MY_SECRET_KEY
debezium.sink.iceberg.io-impl=org.apache.iceberg.aws.s3.S3FileIO
debezium.sink.iceberg.warehouse=s3a://my-bucket/iceberg_warehouse
debezium.sink.iceberg.catalog-impl=org.apache.iceberg.inmemory.InMemoryCatalog

# enable event schemas - mandatory
debezium.format.value.schemas.enable=true
debezium.format.key.schemas.enable=true
debezium.format.value=json
debezium.format.key=json

# store debezium state data (offsets, schema history) in the destination, iceberg tables
debezium.source.offset.storage=io.debezium.server.iceberg.offset.IcebergOffsetBackingStore
debezium.source.offset.storage.iceberg.table-name=debezium_offset_storage_custom_table
debezium.source.database.history=io.debezium.server.iceberg.history.IcebergSchemaHistory
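
For orientation: everything under the debezium.sink.iceberg. prefix is handed to the Iceberg catalog as plain key/value properties. A minimal sketch of that wiring, assuming the getConfigSubset helper shown further down in this commit and Iceberg's CatalogUtil; the class and variable names here are illustrative, not the sink's actual ones:

import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.CatalogUtil;
import org.apache.iceberg.catalog.Catalog;
import org.eclipse.microprofile.config.Config;
import org.eclipse.microprofile.config.ConfigProvider;

class CatalogWiringSketch {
  static Catalog load() {
    Config config = ConfigProvider.getConfig();
    // strips the prefix: debezium.sink.iceberg.warehouse becomes warehouse, etc.
    Map<String, String> props = IcebergUtil.getConfigSubset(config, "debezium.sink.iceberg.");
    Configuration hadoopConf = new Configuration();
    props.forEach(hadoopConf::set); // the fs.s3a.* keys above travel this way too
    return CatalogUtil.buildIcebergCatalog("mycatalog", props, hadoopConf);
  }
}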
==== IcebergUtil.java ====
@@ -10,6 +10,9 @@

import io.debezium.DebeziumException;

import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
@@ -20,9 +23,11 @@
import jakarta.enterprise.inject.literal.NamedLiteral;
import org.apache.iceberg.*;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.SupportsNamespaces;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.data.GenericAppenderFactory;
import org.apache.iceberg.exceptions.NoSuchTableException;
import org.apache.iceberg.io.OutputFileFactory;
import org.apache.iceberg.relocated.com.google.common.primitives.Ints;
import org.eclipse.microprofile.config.Config;
import org.slf4j.Logger;
@@ -35,6 +40,8 @@
public class IcebergUtil {
protected static final Logger LOGGER = LoggerFactory.getLogger(IcebergUtil.class);
protected static final ObjectMapper jsonObjectMapper = new ObjectMapper();
protected static final DateTimeFormatter dtFormater = DateTimeFormatter.ofPattern("yyyyMMdd").withZone(ZoneOffset.UTC);


public static Map<String, String> getConfigSubset(Config config, String prefix) {
final Map<String, String> ret = new HashMap<>();
@@ -62,6 +69,15 @@ public static <T> T selectInstance(Instance<T> instances, String name) {
return instance.get();
}

public static Table createIcebergTable(Catalog icebergCatalog, TableIdentifier tableIdentifier, Schema schema) {

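// ensure the target namespace exists before creating the table; most catalogs
// reject createTable against a missing namespace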
if (!((SupportsNamespaces) icebergCatalog).namespaceExists(tableIdentifier.namespace())) {
((SupportsNamespaces) icebergCatalog).createNamespace(tableIdentifier.namespace());
LOGGER.warn("Created namespace:'{}'", tableIdentifier.namespace());
}
return icebergCatalog.createTable(tableIdentifier, schema);
}

public static Table createIcebergTable(Catalog icebergCatalog, TableIdentifier tableIdentifier,
Schema schema, String writeFormat, boolean partition, String partitionField) {

@@ -80,6 +96,11 @@ public static Table createIcebergTable(Catalog icebergCatalog, TableIdentifier t
ps = PartitionSpec.builderFor(schema).build();
}

if (!((SupportsNamespaces) icebergCatalog).namespaceExists(tableIdentifier.namespace())) {
((SupportsNamespaces) icebergCatalog).createNamespace(tableIdentifier.namespace());
LOGGER.warn("Created namespace:'{}'", tableIdentifier.namespace());
}

return icebergCatalog.buildTable(tableIdentifier, schema)
.withProperty(FORMAT_VERSION, "2")
.withProperty(DEFAULT_FILE_FORMAT, writeFormat.toLowerCase(Locale.ENGLISH))
@@ -121,4 +142,14 @@ public static GenericAppenderFactory getTableAppender(Table icebergTable) {
null);
}

public static OutputFileFactory getTableOutputFileFactory(Table icebergTable, FileFormat format) {
return OutputFileFactory.builderFor(icebergTable,
IcebergUtil.partitionId(), 1L)
.defaultSpec(icebergTable.spec()).format(format).build();
}

public static int partitionId() {
return Integer.parseInt(dtFormater.format(Instant.now()));
}

}
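
Taken together, the new helpers give callers a single path from catalog to write-ready table. A hedged usage sketch (the catalog, table name, and schema are invented for illustration; only the IcebergUtil calls are the ones added above):

import org.apache.iceberg.FileFormat;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.data.GenericAppenderFactory;
import org.apache.iceberg.io.OutputFileFactory;
import org.apache.iceberg.types.Types;

class IcebergUtilUsageSketch {
  static OutputFileFactory bootstrap(Catalog catalog) {
    TableIdentifier id = TableIdentifier.parse("debeziumevents.state_table");
    Schema schema = new Schema(
        Types.NestedField.required(1, "id", Types.StringType.get()),
        Types.NestedField.optional(2, "payload", Types.StringType.get()));
    // creates the namespace on demand, then an unpartitioned table
    Table table = IcebergUtil.createIcebergTable(catalog, id, schema);
    FileFormat format = IcebergUtil.getTableFileFormat(table);
    GenericAppenderFactory appenders = IcebergUtil.getTableAppender(table);
    // generated file names embed partitionId(), today's UTC date as yyyyMMdd
    OutputFileFactory files = IcebergUtil.getTableOutputFileFactory(table, format);
    return files;
  }
}

Both storage classes below follow exactly this bootstrap order.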
==== IcebergSchemaHistory.java ====
@@ -16,16 +16,17 @@
import io.debezium.document.DocumentReader;
import io.debezium.document.DocumentWriter;
import io.debezium.relational.history.*;
import io.debezium.server.iceberg.IcebergUtil;
import io.debezium.util.FunctionalReadWriteLock;
import io.debezium.util.Strings;

import java.io.BufferedReader;
import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
@@ -38,16 +39,13 @@
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.data.GenericAppenderFactory;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.data.IcebergGenerics;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.data.parquet.GenericParquetWriter;
import org.apache.iceberg.exceptions.NoSuchTableException;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.FileAppender;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.parquet.Parquet;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.io.*;
import org.apache.iceberg.types.Types;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -79,6 +77,9 @@ public final class IcebergSchemaHistory extends AbstractSchemaHistory {
private String tableFullName;
private TableIdentifier tableId;
private Table historyTable;
FileFormat format;
GenericAppenderFactory appenderFactory;
OutputFileFactory fileFactory;

@Override
public void configure(Configuration config, HistoryRecordComparator comparator, SchemaHistoryListener listener, boolean useCatalogBeforeSchema) {
@@ -110,6 +111,11 @@ public void start() {
}
}
});

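// resolve the history table once at startup and cache the writer plumbing
// (file format, appender factory, output-file factory) for every later save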
historyTable = icebergCatalog.loadTable(tableId);
format = IcebergUtil.getTableFileFormat(historyTable);
appenderFactory = IcebergUtil.getTableAppender(historyTable);
fileFactory = IcebergUtil.getTableOutputFileFactory(historyTable, format);
}

public String getTableFullName() {
@@ -136,28 +142,19 @@ protected void storeRecord(HistoryRecord record) throws SchemaHistoryException {
"history_data", recordDocString,
"record_insert_ts", currentTs
);
OutputFile out;
try (FileIO tableIo = historyTable.io()) {
out = tableIo.newOutputFile(historyTable.locationProvider().newDataLocation(UUID.randomUUID() + "-data-001"));
}
FileAppender<Record> writer = Parquet.write(out)
.createWriterFunc(GenericParquetWriter::buildWriter)
.forTable(historyTable)
.overwrite()
.build();
try (Closeable ignored = writer) {
writer.add(row);

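// one unpartitioned task writer per save; Long.MAX_VALUE as the target file
// size keeps the single-row write in a single data file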
try (BaseTaskWriter<Record> writer = new UnpartitionedWriter<>(
historyTable.spec(), format, appenderFactory, fileFactory, historyTable.io(), Long.MAX_VALUE)) {
writer.write(row);
writer.close();
WriteResult files = writer.complete();

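// commit the delete of the previous contents and the append of the new data
// files as one atomic transaction, so readers never observe a partial state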
Transaction t = historyTable.newTransaction();
t.newDelete().deleteFromRowFilter(Expressions.alwaysTrue()).commit();
AppendFiles tableAppender = t.newAppend();
Arrays.stream(files.dataFiles()).forEach(tableAppender::appendFile);
tableAppender.commit();
t.commitTransaction();
LOG.trace("Successfully saved history data to Iceberg table");
}
DataFile dataFile = DataFiles.builder(historyTable.spec())
.withFormat(FileFormat.PARQUET)
.withPath(out.location())
.withFileSizeInBytes(writer.length())
.withSplitOffsets(writer.splitOffsets())
.withMetrics(writer.metrics())
.build();
historyTable.newOverwrite().addFile(dataFile).commit();
/// END iceberg append
LOG.trace("Successfully saved history data to Iceberg table");
} catch (IOException e) {
throw new SchemaHistoryException("Failed to store record: " + record, e);
}
@@ -232,7 +229,7 @@ public void initializeStorage() {
if (!storageExists()) {
try {
LOG.debug("Creating table {} to store database history", tableFullName);
historyTable = icebergCatalog.createTable(tableId, DATABASE_HISTORY_TABLE_SCHEMA);
historyTable = IcebergUtil.createIcebergTable(icebergCatalog, tableId, DATABASE_HISTORY_TABLE_SCHEMA);
LOG.warn("Created database history storage table {} to store history", tableFullName);

if (!Strings.isNullOrEmpty(historyConfig.getMigrateHistoryFile().strip())) {
==== IcebergOffsetBackingStore.java ====
@@ -14,18 +14,14 @@
import io.debezium.server.iceberg.IcebergUtil;
import io.debezium.util.Strings;

import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Future;

@@ -37,16 +33,12 @@
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.data.GenericAppenderFactory;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.data.IcebergGenerics;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.data.parquet.GenericParquetWriter;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.FileAppender;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.parquet.Parquet;
import org.apache.iceberg.io.*;
import org.apache.iceberg.types.Types;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.errors.ConnectException;
@@ -83,6 +75,10 @@ public class IcebergOffsetBackingStore extends MemoryOffsetBackingStore implemen
private TableIdentifier tableId;
private Table offsetTable;
IcebergOffsetBackingStoreConfig offsetConfig;
FileFormat format;
GenericAppenderFactory appenderFactory;
OutputFileFactory fileFactory;

public IcebergOffsetBackingStore() {
}

@@ -111,7 +107,7 @@ private void initializeTable() {
offsetTable = icebergCatalog.loadTable(tableId);
} else {
LOG.debug("Creating table {} to store offset", tableFullName);
offsetTable = icebergCatalog.createTable(tableId, OFFSET_STORAGE_TABLE_SCHEMA);
offsetTable = IcebergUtil.createIcebergTable(icebergCatalog, tableId, OFFSET_STORAGE_TABLE_SCHEMA);
if (!icebergCatalog.tableExists(tableId)) {
throw new DebeziumException("Failed to create table " + tableId + " to store offset");
}
@@ -120,8 +116,11 @@ private void initializeTable() {
LOG.warn("Migrating offset from file {}", offsetConfig.getMigrateOffsetFile());
this.loadFileOffset(new File(offsetConfig.getMigrateOffsetFile()));
}

}

format = IcebergUtil.getTableFileFormat(offsetTable);
appenderFactory = IcebergUtil.getTableAppender(offsetTable);
fileFactory = IcebergUtil.getTableOutputFileFactory(offsetTable, format);
}

private void loadFileOffset(File file) {
@@ -157,31 +156,21 @@ protected void save() {
"id", UUID.randomUUID().toString(),
"offset_data", dataJson,
"record_insert_ts", currentTs);
OutputFile out;
try (FileIO tableIo = offsetTable.io()) {
out = tableIo.newOutputFile(offsetTable.locationProvider().newDataLocation(tableId.name() + "-data-001"));
}
FileAppender<Record> writer = Parquet.write(out)
.createWriterFunc(GenericParquetWriter::buildWriter)
.forTable(offsetTable)
.overwrite()
.build();
try (Closeable ignored = writer) {
writer.add(row);

try (BaseTaskWriter<Record> writer = new UnpartitionedWriter<>(
offsetTable.spec(), format, appenderFactory, fileFactory, offsetTable.io(), Long.MAX_VALUE)) {
writer.write(row);
writer.close();
WriteResult files = writer.complete();

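// the offsets table keeps exactly one current row: clear the previous
// snapshot and append the fresh one inside the same transaction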
Transaction t = offsetTable.newTransaction();
t.newDelete().deleteFromRowFilter(Expressions.alwaysTrue()).commit();
AppendFiles tableAppender = t.newAppend();
Arrays.stream(files.dataFiles()).forEach(tableAppender::appendFile);
tableAppender.commit();
t.commitTransaction();
LOG.debug("Successfully saved offset data to iceberg table");
}
DataFile dataFile = DataFiles.builder(offsetTable.spec())
.withFormat(FileFormat.PARQUET)
.withPath(out.location())
.withFileSizeInBytes(writer.length())
.withSplitOffsets(writer.splitOffsets())
.withMetrics(writer.metrics())
.build();

Transaction t = offsetTable.newTransaction();
t.newDelete().deleteFromRowFilter(Expressions.alwaysTrue()).commit();
t.newAppend().appendFile(dataFile).commit();
t.commitTransaction();
LOG.debug("Successfully saved offset data to iceberg table");

} catch (IOException e) {
throw new RuntimeException(e);
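
On the read side the store only ever needs that single row back. A hedged sketch of scanning it with Iceberg's generic readers (the offset_data field name comes from this class's schema; the wrapper class and method are illustrative):

import java.io.IOException;
import org.apache.iceberg.Table;
import org.apache.iceberg.data.IcebergGenerics;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.io.CloseableIterable;

class OffsetReadSketch {
  static String readOffsetJson(Table offsetTable) throws IOException {
    try (CloseableIterable<Record> rows = IcebergGenerics.read(offsetTable).build()) {
      for (Record row : rows) {
        // offset_data holds the JSON-serialized map of connector offsets
        return (String) row.getField("offset_data");
      }
    }
    return null; // empty table: nothing persisted yet
  }
}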
==== IcebergTableOperator.java ====
@@ -168,8 +168,7 @@ public void addToTable(Table icebergTable, List<IcebergChangeEvent> events) {
*/
private void addToTablePerSchema(Table icebergTable, List<IcebergChangeEvent> events) {
// Initialize a task writer to write both INSERT and equality DELETE.
BaseTaskWriter<Record> writer = writerFactory.create(icebergTable);
try {
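// try-with-resources closes the task writer even when converting or writing
// an event throws, instead of leaking open data files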
try (BaseTaskWriter<Record> writer = writerFactory.create(icebergTable)) {
for (IcebergChangeEvent e : events) {
writer.write(e.asIcebergRecord(icebergTable.schema()));
}
@@ -186,7 +185,6 @@ private void addToTablePerSchema(Table icebergTable, List<IcebergChangeEvent> ev
Arrays.stream(files.dataFiles()).forEach(appendFiles::appendFile);
appendFiles.commit();
}

} catch (IOException ex) {
throw new DebeziumException(ex);
}
==== IcebergTableWriterFactory.java ====
@@ -2,9 +2,6 @@

import io.debezium.server.iceberg.IcebergUtil;

import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.List;

@@ -22,25 +19,19 @@

@Dependent
public class IcebergTableWriterFactory {
protected static final DateTimeFormatter dtFormater = DateTimeFormatter.ofPattern("yyyyMMdd").withZone(ZoneOffset.UTC);
private static final Logger LOGGER = LoggerFactory.getLogger(IcebergTableOperator.class);
@ConfigProperty(name = "debezium.sink.iceberg.upsert", defaultValue = "true")
boolean upsert;
@ConfigProperty(name = "debezium.sink.iceberg.upsert-keep-deletes", defaultValue = "true")
boolean upsertKeepDeletes;

private static int partitionId() {
return Integer.parseInt(dtFormater.format(Instant.now()));
}

public BaseTaskWriter<Record> create(Table icebergTable) {

// file format of the table (parquet, orc, ...)
FileFormat format = IcebergUtil.getTableFileFormat(icebergTable);
// appender factory
GenericAppenderFactory appenderFactory = IcebergUtil.getTableAppender(icebergTable);
OutputFileFactory fileFactory = OutputFileFactory.builderFor(icebergTable, partitionId(), 1L)
.defaultSpec(icebergTable.spec()).format(format).build();
OutputFileFactory fileFactory = IcebergUtil.getTableOutputFileFactory(icebergTable, format);
// equality Field Ids
List<Integer> equalityFieldIds = new ArrayList<>(icebergTable.schema().identifierFieldIds());

(diffs for the remaining two changed files are not included here)
