Improve Offset and Database History Storage classes #199

Merged (1 commit) on May 30, 2023
Changes from all commits
@@ -8,11 +8,11 @@ debezium.sink.iceberg.upsert-keep-deletes=true
debezium.sink.iceberg.write.format.default=parquet
debezium.sink.iceberg.catalog-name=mycatalog
# Hadoop catalog; you can use other catalogs supported by Iceberg as well

# S3 config with hadoop and hadoop catalog
debezium.sink.iceberg.type=hadoop
debezium.sink.iceberg.warehouse=s3a://my-bucket/iceberg_warehouse
debezium.sink.iceberg.table-namespace=debeziumevents

# S3 config
debezium.sink.iceberg.fs.defaultFS=s3a://my-bucket
debezium.sink.iceberg.com.amazonaws.services.s3.enableV4=true
debezium.sink.iceberg.com.amazonaws.services.s3a.enableV4=true
@@ -22,11 +22,23 @@ debezium.sink.iceberg.fs.s3a.secret.key=AWS_SECRET_ACCESS_KEY
debezium.sink.iceberg.fs.s3a.path.style.access=true
debezium.sink.iceberg.fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem

# enable event schemas - mandate
# S3 config without Hadoop catalog, using InMemoryCatalog and S3FileIO
### using MinIO as S3
debezium.sink.iceberg.s3.endpoint=http://localhost:9000;
debezium.sink.iceberg.s3.path-style-access=true
debezium.sink.iceberg.s3.access-key-id=MY_ACCESS_KEY
debezium.sink.iceberg.s3.secret-access-key=MY_SECRET_KEY
debezium.sink.iceberg.io-impl=org.apache.iceberg.aws.s3.S3FileIO
debezium.sink.iceberg.warehouse=s3a://my-bucket/iceberg_warehouse
debezium.sink.iceberg.catalog-impl=org.apache.iceberg.inmemory.InMemoryCatalog

# enable event schemas - mandatory
debezium.format.value.schemas.enable=true
debezium.format.key.schemas.enable=true
debezium.format.value=json
debezium.format.key=json

# saving debezium state data (offsets and schema history) to destination iceberg tables
debezium.source.offset.storage=io.debezium.server.iceberg.offset.IcebergOffsetBackingStore
debezium.source.offset.storage.iceberg.table-name=debezium_offset_storage_custom_table
debezium.source.database.history=io.debezium.server.iceberg.history.IcebergSchemaHistory
@@ -10,6 +10,9 @@

import io.debezium.DebeziumException;

import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
@@ -20,9 +23,11 @@
import jakarta.enterprise.inject.literal.NamedLiteral;
import org.apache.iceberg.*;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.SupportsNamespaces;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.data.GenericAppenderFactory;
import org.apache.iceberg.exceptions.NoSuchTableException;
import org.apache.iceberg.io.OutputFileFactory;
import org.apache.iceberg.relocated.com.google.common.primitives.Ints;
import org.eclipse.microprofile.config.Config;
import org.slf4j.Logger;
@@ -35,6 +40,8 @@
public class IcebergUtil {
protected static final Logger LOGGER = LoggerFactory.getLogger(IcebergUtil.class);
protected static final ObjectMapper jsonObjectMapper = new ObjectMapper();
protected static final DateTimeFormatter dtFormater = DateTimeFormatter.ofPattern("yyyyMMdd").withZone(ZoneOffset.UTC);


public static Map<String, String> getConfigSubset(Config config, String prefix) {
final Map<String, String> ret = new HashMap<>();
@@ -62,6 +69,15 @@ public static <T> T selectInstance(Instance<T> instances, String name) {
return instance.get();
}

public static Table createIcebergTable(Catalog icebergCatalog, TableIdentifier tableIdentifier, Schema schema) {

if (!((SupportsNamespaces) icebergCatalog).namespaceExists(tableIdentifier.namespace())) {
((SupportsNamespaces) icebergCatalog).createNamespace(tableIdentifier.namespace());
LOGGER.warn("Created namespace:'{}'", tableIdentifier.namespace());
}
return icebergCatalog.createTable(tableIdentifier, schema);
}

public static Table createIcebergTable(Catalog icebergCatalog, TableIdentifier tableIdentifier,
Schema schema, String writeFormat, boolean partition, String partitionField) {

@@ -80,6 +96,11 @@ public static Table createIcebergTable(Catalog icebergCatalog, TableIdentifier t
ps = PartitionSpec.builderFor(schema).build();
}

if (!((SupportsNamespaces) icebergCatalog).namespaceExists(tableIdentifier.namespace())) {
((SupportsNamespaces) icebergCatalog).createNamespace(tableIdentifier.namespace());
LOGGER.warn("Created namespace:'{}'", tableIdentifier.namespace());
}

return icebergCatalog.buildTable(tableIdentifier, schema)
.withProperty(FORMAT_VERSION, "2")
.withProperty(DEFAULT_FILE_FORMAT, writeFormat.toLowerCase(Locale.ENGLISH))
@@ -121,4 +142,14 @@ public static GenericAppenderFactory getTableAppender(Table icebergTable) {
null);
}

public static OutputFileFactory getTableOutputFileFactory(Table icebergTable, FileFormat format) {
return OutputFileFactory.builderFor(icebergTable,
IcebergUtil.partitionId(), 1L)
.defaultSpec(icebergTable.spec()).format(format).build();
}

public static int partitionId() {
return Integer.parseInt(dtFormater.format(Instant.now()));
}

}
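The helpers above are meant to be used together by the offset and schema-history storage classes changed later in this PR: the new createIcebergTable overload creates the namespace on demand, and getTableOutputFileFactory (seeded with the UTC yyyyMMdd partitionId) supplies the OutputFileFactory that an UnpartitionedWriter needs. Below is a minimal sketch of that combination; the class name StateTableWriteSketch and the method writeSingleRecord are illustrative, not part of the PR.

```java
// Sketch only: shows how the IcebergUtil helpers added in this PR fit together.
import java.io.IOException;
import java.util.Arrays;

import io.debezium.server.iceberg.IcebergUtil;
import org.apache.iceberg.AppendFiles;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.data.GenericAppenderFactory;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.io.BaseTaskWriter;
import org.apache.iceberg.io.OutputFileFactory;
import org.apache.iceberg.io.UnpartitionedWriter;
import org.apache.iceberg.io.WriteResult;

public class StateTableWriteSketch {

  // Creates the table (namespace included) and appends a single record to it.
  // The catalog must implement SupportsNamespaces, since createIcebergTable casts to it.
  public static void writeSingleRecord(Catalog catalog, TableIdentifier id, Schema schema, Record row)
      throws IOException {
    Table table = IcebergUtil.createIcebergTable(catalog, id, schema);

    FileFormat format = IcebergUtil.getTableFileFormat(table);
    GenericAppenderFactory appenderFactory = IcebergUtil.getTableAppender(table);
    OutputFileFactory fileFactory = IcebergUtil.getTableOutputFileFactory(table, format);

    try (BaseTaskWriter<Record> writer = new UnpartitionedWriter<>(
        table.spec(), format, appenderFactory, fileFactory, table.io(), Long.MAX_VALUE)) {
      writer.write(row);
      WriteResult result = writer.complete(); // complete() closes the writer and returns the data files

      AppendFiles append = table.newAppend();
      Arrays.stream(result.dataFiles()).forEach(append::appendFile);
      append.commit();
    }
  }
}
```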
@@ -16,16 +16,17 @@
import io.debezium.document.DocumentReader;
import io.debezium.document.DocumentWriter;
import io.debezium.relational.history.*;
import io.debezium.server.iceberg.IcebergUtil;
import io.debezium.util.FunctionalReadWriteLock;
import io.debezium.util.Strings;

import java.io.BufferedReader;
import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
@@ -38,16 +39,13 @@
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.data.GenericAppenderFactory;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.data.IcebergGenerics;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.data.parquet.GenericParquetWriter;
import org.apache.iceberg.exceptions.NoSuchTableException;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.FileAppender;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.parquet.Parquet;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.io.*;
import org.apache.iceberg.types.Types;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -79,6 +77,9 @@ public final class IcebergSchemaHistory extends AbstractSchemaHistory {
private String tableFullName;
private TableIdentifier tableId;
private Table historyTable;
FileFormat format;
GenericAppenderFactory appenderFactory;
OutputFileFactory fileFactory;

@Override
public void configure(Configuration config, HistoryRecordComparator comparator, SchemaHistoryListener listener, boolean useCatalogBeforeSchema) {
@@ -110,6 +111,11 @@ public void start() {
}
}
});

historyTable = icebergCatalog.loadTable(tableId);
format = IcebergUtil.getTableFileFormat(historyTable);
appenderFactory = IcebergUtil.getTableAppender(historyTable);
fileFactory = IcebergUtil.getTableOutputFileFactory(historyTable, format);
}

public String getTableFullName() {
@@ -136,28 +142,19 @@ protected void storeRecord(HistoryRecord record) throws SchemaHistoryException {
"history_data", recordDocString,
"record_insert_ts", currentTs
);
OutputFile out;
try (FileIO tableIo = historyTable.io()) {
out = tableIo.newOutputFile(historyTable.locationProvider().newDataLocation(UUID.randomUUID() + "-data-001"));
}
FileAppender<Record> writer = Parquet.write(out)
.createWriterFunc(GenericParquetWriter::buildWriter)
.forTable(historyTable)
.overwrite()
.build();
try (Closeable ignored = writer) {
writer.add(row);

try (BaseTaskWriter<Record> writer = new UnpartitionedWriter<>(
historyTable.spec(), format, appenderFactory, fileFactory, historyTable.io(), Long.MAX_VALUE)) {
writer.write(row);
writer.close();
WriteResult files = writer.complete();

Transaction t = historyTable.newTransaction();
t.newDelete().deleteFromRowFilter(Expressions.alwaysTrue()).commit();
Arrays.stream(files.dataFiles()).forEach(t.newAppend()::appendFile);
t.commitTransaction();
LOG.trace("Successfully saved history data to Iceberg table");
}
DataFile dataFile = DataFiles.builder(historyTable.spec())
.withFormat(FileFormat.PARQUET)
.withPath(out.location())
.withFileSizeInBytes(writer.length())
.withSplitOffsets(writer.splitOffsets())
.withMetrics(writer.metrics())
.build();
historyTable.newOverwrite().addFile(dataFile).commit();
/// END iceberg append
LOG.trace("Successfully saved history data to Iceberg table");
} catch (IOException e) {
throw new SchemaHistoryException("Failed to store record: " + record, e);
}
@@ -232,7 +229,7 @@ public void initializeStorage() {
if (!storageExists()) {
try {
LOG.debug("Creating table {} to store database history", tableFullName);
historyTable = icebergCatalog.createTable(tableId, DATABASE_HISTORY_TABLE_SCHEMA);
historyTable = IcebergUtil.createIcebergTable(icebergCatalog, tableId, DATABASE_HISTORY_TABLE_SCHEMA);
LOG.warn("Created database history storage table {} to store history", tableFullName);

if (!Strings.isNullOrEmpty(historyConfig.getMigrateHistoryFile().strip())) {
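For orientation, the rows written by storeRecord() can be read back with Iceberg's generic reader, which is roughly what the recovery path (not shown in this diff) has to do. A hedged sketch, reusing only the history_data and record_insert_ts column names seen above; the class name HistoryReadSketch is illustrative.

```java
// Hedged sketch: reading schema-history rows back from the table written above.
import java.io.IOException;

import org.apache.iceberg.Table;
import org.apache.iceberg.data.IcebergGenerics;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.io.CloseableIterable;

public class HistoryReadSketch {

  public static void printHistory(Table historyTable) throws IOException {
    try (CloseableIterable<Record> rows = IcebergGenerics.read(historyTable).build()) {
      for (Record row : rows) {
        // history_data holds the serialized HistoryRecord document (recordDocString above);
        // it would be parsed with DocumentReader and replayed in record_insert_ts order.
        System.out.println(row.getField("record_insert_ts") + " -> " + row.getField("history_data"));
      }
    }
  }
}
```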
@@ -14,18 +14,14 @@
import io.debezium.server.iceberg.IcebergUtil;
import io.debezium.util.Strings;

import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Future;

@@ -37,16 +33,12 @@
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.data.GenericAppenderFactory;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.data.IcebergGenerics;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.data.parquet.GenericParquetWriter;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.FileAppender;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.parquet.Parquet;
import org.apache.iceberg.io.*;
import org.apache.iceberg.types.Types;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.errors.ConnectException;
@@ -83,6 +75,10 @@ public class IcebergOffsetBackingStore extends MemoryOffsetBackingStore implemen
private TableIdentifier tableId;
private Table offsetTable;
IcebergOffsetBackingStoreConfig offsetConfig;
FileFormat format;
GenericAppenderFactory appenderFactory;
OutputFileFactory fileFactory;

public IcebergOffsetBackingStore() {
}

@@ -111,7 +107,7 @@ private void initializeTable() {
offsetTable = icebergCatalog.loadTable(tableId);
} else {
LOG.debug("Creating table {} to store offset", tableFullName);
offsetTable = icebergCatalog.createTable(tableId, OFFSET_STORAGE_TABLE_SCHEMA);
offsetTable = IcebergUtil.createIcebergTable(icebergCatalog, tableId, OFFSET_STORAGE_TABLE_SCHEMA);
if (!icebergCatalog.tableExists(tableId)) {
throw new DebeziumException("Failed to create table " + tableId + " to store offset");
}
@@ -120,8 +116,11 @@ private void initializeTable() {
LOG.warn("Migrating offset from file {}", offsetConfig.getMigrateOffsetFile());
this.loadFileOffset(new File(offsetConfig.getMigrateOffsetFile()));
}

}

format = IcebergUtil.getTableFileFormat(offsetTable);
appenderFactory = IcebergUtil.getTableAppender(offsetTable);
fileFactory = IcebergUtil.getTableOutputFileFactory(offsetTable, format);
}

private void loadFileOffset(File file) {
@@ -157,31 +156,21 @@ protected void save() {
"id", UUID.randomUUID().toString(),
"offset_data", dataJson,
"record_insert_ts", currentTs);
OutputFile out;
try (FileIO tableIo = offsetTable.io()) {
out = tableIo.newOutputFile(offsetTable.locationProvider().newDataLocation(tableId.name() + "-data-001"));
}
FileAppender<Record> writer = Parquet.write(out)
.createWriterFunc(GenericParquetWriter::buildWriter)
.forTable(offsetTable)
.overwrite()
.build();
try (Closeable ignored = writer) {
writer.add(row);

try (BaseTaskWriter<Record> writer = new UnpartitionedWriter<>(
offsetTable.spec(), format, appenderFactory, fileFactory, offsetTable.io(), Long.MAX_VALUE)) {
writer.write(row);
writer.close();
WriteResult files = writer.complete();

Transaction t = offsetTable.newTransaction();
t.newDelete().deleteFromRowFilter(Expressions.alwaysTrue()).commit();
AppendFiles tableAppender = t.newAppend();
Arrays.stream(files.dataFiles()).forEach(tableAppender::appendFile);
tableAppender.commit();
t.commitTransaction();
LOG.debug("Successfully saved offset data to iceberg table");
}
DataFile dataFile = DataFiles.builder(offsetTable.spec())
.withFormat(FileFormat.PARQUET)
.withPath(out.location())
.withFileSizeInBytes(writer.length())
.withSplitOffsets(writer.splitOffsets())
.withMetrics(writer.metrics())
.build();

Transaction t = offsetTable.newTransaction();
t.newDelete().deleteFromRowFilter(Expressions.alwaysTrue()).commit();
t.newAppend().appendFile(dataFile).commit();
t.commitTransaction();
LOG.debug("Successfully saved offset data to iceberg table");

} catch (IOException e) {
throw new RuntimeException(e);
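The save() above keeps the offsets table to a single logical snapshot: the transaction first deletes every existing row and then appends the data file holding the freshly written row. That row is a three-column GenericRecord; the sketch below shows how one is built, with an inline schema standing in for OFFSET_STORAGE_TABLE_SCHEMA (field ids, types, and nullability are assumptions).

```java
// Hedged sketch: building the single offsets row that save() writes.
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.util.Map;
import java.util.UUID;

import org.apache.iceberg.Schema;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.types.Types;

public class OffsetRowSketch {

  // Assumed to mirror OFFSET_STORAGE_TABLE_SCHEMA; the real field ids/types may differ.
  static final Schema OFFSETS_SCHEMA = new Schema(
      Types.NestedField.required(1, "id", Types.StringType.get()),
      Types.NestedField.optional(2, "offset_data", Types.StringType.get()),
      Types.NestedField.optional(3, "record_insert_ts", Types.TimestampType.withZone()));

  public static Record offsetsRow(String offsetsJson) {
    Record row = GenericRecord.create(OFFSETS_SCHEMA);
    return row.copy(Map.of(
        "id", UUID.randomUUID().toString(),
        "offset_data", offsetsJson,
        "record_insert_ts", OffsetDateTime.now(ZoneOffset.UTC)));
  }
}
```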
@@ -168,8 +168,7 @@ public void addToTable(Table icebergTable, List<IcebergChangeEvent> events) {
*/
private void addToTablePerSchema(Table icebergTable, List<IcebergChangeEvent> events) {
// Initialize a task writer to write both INSERT and equality DELETE.
BaseTaskWriter<Record> writer = writerFactory.create(icebergTable);
try {
try (BaseTaskWriter<Record> writer = writerFactory.create(icebergTable)) {
for (IcebergChangeEvent e : events) {
writer.write(e.asIcebergRecord(icebergTable.schema()));
}
@@ -186,7 +185,6 @@ private void addToTablePerSchema(Table icebergTable, List<IcebergChangeEvent> ev
Arrays.stream(files.dataFiles()).forEach(appendFiles::appendFile);
appendFiles.commit();
}

} catch (IOException ex) {
throw new DebeziumException(ex);
}
@@ -2,9 +2,6 @@

import io.debezium.server.iceberg.IcebergUtil;

import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.List;

@@ -22,25 +19,19 @@

@Dependent
public class IcebergTableWriterFactory {
protected static final DateTimeFormatter dtFormater = DateTimeFormatter.ofPattern("yyyyMMdd").withZone(ZoneOffset.UTC);
private static final Logger LOGGER = LoggerFactory.getLogger(IcebergTableOperator.class);
@ConfigProperty(name = "debezium.sink.iceberg.upsert", defaultValue = "true")
boolean upsert;
@ConfigProperty(name = "debezium.sink.iceberg.upsert-keep-deletes", defaultValue = "true")
boolean upsertKeepDeletes;

private static int partitionId() {
return Integer.parseInt(dtFormater.format(Instant.now()));
}

public BaseTaskWriter<Record> create(Table icebergTable) {

// file format of the table parquet, orc ...
FileFormat format = IcebergUtil.getTableFileFormat(icebergTable);
// appender factory
GenericAppenderFactory appenderFactory = IcebergUtil.getTableAppender(icebergTable);
OutputFileFactory fileFactory = OutputFileFactory.builderFor(icebergTable, partitionId(), 1L)
.defaultSpec(icebergTable.spec()).format(format).build();
OutputFileFactory fileFactory = IcebergUtil.getTableOutputFileFactory(icebergTable, format);
// equality Field Ids
List<Integer> equalityFieldIds = new ArrayList<>(icebergTable.schema().identifierFieldIds());

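The partitionId()/dtFormater pair removed here now lives in IcebergUtil (see the earlier hunk), so this factory simply calls IcebergUtil.getTableOutputFileFactory. The id is just the current UTC date rendered as an integer, which OutputFileFactory uses when naming the data files it creates, so files written on the same day share a prefix. A tiny self-contained sketch of the computation:

```java
// Same computation as IcebergUtil.partitionId(): current UTC date as an int, e.g. 20230530.
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

public class PartitionIdSketch {

  private static final DateTimeFormatter FORMATTER =
      DateTimeFormatter.ofPattern("yyyyMMdd").withZone(ZoneOffset.UTC);

  public static int partitionId() {
    return Integer.parseInt(FORMATTER.format(Instant.now()));
  }

  public static void main(String[] args) {
    System.out.println(partitionId());
  }
}
```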