From 522d2eab5dd35c1d245f5db820f113521f42e09c Mon Sep 17 00:00:00 2001
From: Ismail Simsek <6005685+ismailsimsek@users.noreply.github.com>
Date: Tue, 30 May 2023 12:58:44 +0200
Subject: [PATCH] Use memory catalog for tests

---
 .../conf/application.properties.example        | 18 +++++-
 .../debezium/server/iceberg/IcebergUtil.java   | 31 +++++++++
 .../iceberg/history/IcebergSchemaHistory.java  | 57 +++++++--------
 .../offset/IcebergOffsetBackingStore.java      | 63 ++++++----------
 .../tableoperator/IcebergTableOperator.java    |  4 +-
 .../IcebergTableWriterFactory.java             | 11 +---
 docs/DOCS.md                                   |  2 +-
 pom.xml                                        | 17 +++--
 8 files changed, 115 insertions(+), 88 deletions(-)

diff --git a/debezium-server-iceberg-dist/src/main/resources/distro/conf/application.properties.example b/debezium-server-iceberg-dist/src/main/resources/distro/conf/application.properties.example
index 4f627a3b..6625124a 100644
--- a/debezium-server-iceberg-dist/src/main/resources/distro/conf/application.properties.example
+++ b/debezium-server-iceberg-dist/src/main/resources/distro/conf/application.properties.example
@@ -8,11 +8,11 @@ debezium.sink.iceberg.upsert-keep-deletes=true
 debezium.sink.iceberg.write.format.default=parquet
 debezium.sink.iceberg.catalog-name=mycatalog
 # Hadoop catalog, you can use other catalog supported by iceberg as well
+
+# S3 config using Hadoop S3A and the Hadoop catalog
 debezium.sink.iceberg.type=hadoop
 debezium.sink.iceberg.warehouse=s3a://my-bucket/iceberg_warehouse
 debezium.sink.iceberg.table-namespace=debeziumevents
-
-# S3 config
 debezium.sink.iceberg.fs.defaultFS=s3a://my-bucket
 debezium.sink.iceberg.com.amazonaws.services.s3.enableV4=true
 debezium.sink.iceberg.com.amazonaws.services.s3a.enableV4=true
@@ -22,11 +22,23 @@ debezium.sink.iceberg.fs.s3a.secret.key=AWS_SECRET_ACCESS_KEY
 debezium.sink.iceberg.fs.s3a.path.style.access=true
 debezium.sink.iceberg.fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem
 
-# enable event schemas - mandate
+# S3 config without the Hadoop catalog, using InMemoryCatalog and S3FileIO
+### using MinIO as S3
+debezium.sink.iceberg.s3.endpoint=http://localhost:9000
+debezium.sink.iceberg.s3.path-style-access=true
+debezium.sink.iceberg.s3.access-key-id=MY_ACCESS_KEY
+debezium.sink.iceberg.s3.secret-access-key=MY_SECRET_KEY
+debezium.sink.iceberg.io-impl=org.apache.iceberg.aws.s3.S3FileIO
+debezium.sink.iceberg.warehouse=s3a://my-bucket/iceberg_warehouse
+debezium.sink.iceberg.catalog-impl=org.apache.iceberg.inmemory.InMemoryCatalog
+
+# enable event schemas - mandatory
 debezium.format.value.schemas.enable=true
 debezium.format.key.schemas.enable=true
 debezium.format.value=json
 debezium.format.key=json
+
+# save debezium state data to the destination, iceberg tables
 debezium.source.offset.storage=io.debezium.server.iceberg.offset.IcebergOffsetBackingStore
 debezium.source.offset.storage.iceberg.table-name=debezium_offset_storage_custom_table
 debezium.source.database.history=io.debezium.server.iceberg.history.IcebergSchemaHistory
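At runtime the sink hands every property under the `debezium.sink.iceberg.` prefix to Iceberg's catalog loader with the prefix stripped, which is how the `catalog-impl`, `io-impl`, and `s3.*` keys above reach `InMemoryCatalog` and `S3FileIO`. A minimal sketch of that wiring, assuming the `IcebergUtil.getConfigSubset` helper from the next file and a hypothetical `CatalogBootstrapSketch` entry point:

```java
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.CatalogUtil;
import org.apache.iceberg.catalog.Catalog;
import org.eclipse.microprofile.config.ConfigProvider;
import io.debezium.server.iceberg.IcebergUtil;

class CatalogBootstrapSketch {
  static Catalog loadCatalog() {
    // Strip the sink prefix: "debezium.sink.iceberg.catalog-impl" -> "catalog-impl", etc.
    Map<String, String> props =
        IcebergUtil.getConfigSubset(ConfigProvider.getConfig(), "debezium.sink.iceberg.");
    // "catalog-impl" (InMemoryCatalog above) takes precedence over "type" (hadoop),
    // and "io-impl" selects the FileIO, e.g. S3FileIO pointed at the MinIO endpoint.
    return CatalogUtil.buildIcebergCatalog("mycatalog", props, new Configuration());
  }
}
```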
diff --git a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergUtil.java b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergUtil.java
index 2e6d14c7..2547db30 100644
--- a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergUtil.java
+++ b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergUtil.java
@@ -10,6 +10,9 @@
 import io.debezium.DebeziumException;
 
+import java.time.Instant;
+import java.time.ZoneOffset;
+import java.time.format.DateTimeFormatter;
 import java.util.HashMap;
 import java.util.Locale;
 import java.util.Map;
@@ -20,9 +23,11 @@
 import jakarta.enterprise.inject.literal.NamedLiteral;
 import org.apache.iceberg.*;
 import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.SupportsNamespaces;
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.data.GenericAppenderFactory;
 import org.apache.iceberg.exceptions.NoSuchTableException;
+import org.apache.iceberg.io.OutputFileFactory;
 import org.apache.iceberg.relocated.com.google.common.primitives.Ints;
 import org.eclipse.microprofile.config.Config;
 import org.slf4j.Logger;
@@ -35,6 +40,8 @@ public class IcebergUtil {
   protected static final Logger LOGGER = LoggerFactory.getLogger(IcebergUtil.class);
   protected static final ObjectMapper jsonObjectMapper = new ObjectMapper();
+  protected static final DateTimeFormatter dtFormater = DateTimeFormatter.ofPattern("yyyyMMdd").withZone(ZoneOffset.UTC);
+
   public static Map<String, String> getConfigSubset(Config config, String prefix) {
     final Map<String, String> ret = new HashMap<>();
@@ -62,6 +69,15 @@ public static <T> T selectInstance(Instance<T> instances, String name) {
     return instance.get();
   }
 
+  public static Table createIcebergTable(Catalog icebergCatalog, TableIdentifier tableIdentifier, Schema schema) {
+
+    if (!((SupportsNamespaces) icebergCatalog).namespaceExists(tableIdentifier.namespace())) {
+      ((SupportsNamespaces) icebergCatalog).createNamespace(tableIdentifier.namespace());
+      LOGGER.warn("Created namespace:'{}'", tableIdentifier.namespace());
+    }
+    return icebergCatalog.createTable(tableIdentifier, schema);
+  }
+
   public static Table createIcebergTable(Catalog icebergCatalog, TableIdentifier tableIdentifier,
                                          Schema schema, String writeFormat, boolean partition, String partitionField) {
@@ -80,6 +96,11 @@ public static Table createIcebergTable(Catalog icebergCatalog, TableIdentifier t
       ps = PartitionSpec.builderFor(schema).build();
     }
 
+    if (!((SupportsNamespaces) icebergCatalog).namespaceExists(tableIdentifier.namespace())) {
+      ((SupportsNamespaces) icebergCatalog).createNamespace(tableIdentifier.namespace());
+      LOGGER.warn("Created namespace:'{}'", tableIdentifier.namespace());
+    }
+
     return icebergCatalog.buildTable(tableIdentifier, schema)
         .withProperty(FORMAT_VERSION, "2")
         .withProperty(DEFAULT_FILE_FORMAT, writeFormat.toLowerCase(Locale.ENGLISH))
@@ -121,4 +142,14 @@ public static GenericAppenderFactory getTableAppender(Table icebergTable) {
         null);
   }
 
+  public static OutputFileFactory getTableOutputFileFactory(Table icebergTable, FileFormat format) {
+    return OutputFileFactory.builderFor(icebergTable,
+            IcebergUtil.partitionId(), 1L)
+        .defaultSpec(icebergTable.spec()).format(format).build();
+  }
+
+  public static int partitionId() {
+    return Integer.parseInt(dtFormater.format(Instant.now()));
+  }
+
 }
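The `namespaceExists`/`createNamespace` guard duplicated into both `createIcebergTable` overloads matters for this PR's title: unlike the Hadoop catalog, `InMemoryCatalog` does not create namespaces implicitly, so creating a table in a fresh test catalog would otherwise fail. A hypothetical test snippet (table name and schema invented for illustration):

```java
import java.util.Map;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.inmemory.InMemoryCatalog;
import org.apache.iceberg.types.Types;
import io.debezium.server.iceberg.IcebergUtil;

class NamespaceGuardSketch {
  static Table createOffsetTable() {
    InMemoryCatalog catalog = new InMemoryCatalog();
    catalog.initialize("test", Map.of());
    Schema schema = new Schema(Types.NestedField.required(1, "id", Types.StringType.get()));
    // Without the namespaceExists/createNamespace guard this would throw
    // NoSuchNamespaceException, because "debeziumevents" does not exist yet.
    return IcebergUtil.createIcebergTable(
        catalog, TableIdentifier.of("debeziumevents", "debezium_offsets"), schema);
  }
}
```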
diff --git a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/history/IcebergSchemaHistory.java b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/history/IcebergSchemaHistory.java
index 147395f8..2d0342e3 100644
--- a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/history/IcebergSchemaHistory.java
+++ b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/history/IcebergSchemaHistory.java
@@ -16,16 +16,17 @@
 import io.debezium.document.DocumentReader;
 import io.debezium.document.DocumentWriter;
 import io.debezium.relational.history.*;
+import io.debezium.server.iceberg.IcebergUtil;
 import io.debezium.util.FunctionalReadWriteLock;
 import io.debezium.util.Strings;
 
 import java.io.BufferedReader;
-import java.io.Closeable;
 import java.io.File;
 import java.io.IOException;
 import java.nio.file.Files;
 import java.time.OffsetDateTime;
 import java.time.ZoneOffset;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.UUID;
@@ -38,16 +39,13 @@
 import org.apache.iceberg.catalog.Catalog;
 import org.apache.iceberg.catalog.Namespace;
 import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.data.GenericAppenderFactory;
 import org.apache.iceberg.data.GenericRecord;
 import org.apache.iceberg.data.IcebergGenerics;
 import org.apache.iceberg.data.Record;
-import org.apache.iceberg.data.parquet.GenericParquetWriter;
 import org.apache.iceberg.exceptions.NoSuchTableException;
-import org.apache.iceberg.io.CloseableIterable;
-import org.apache.iceberg.io.FileAppender;
-import org.apache.iceberg.io.FileIO;
-import org.apache.iceberg.io.OutputFile;
-import org.apache.iceberg.parquet.Parquet;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.io.*;
 import org.apache.iceberg.types.Types;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -79,6 +77,9 @@ public final class IcebergSchemaHistory extends AbstractSchemaHistory {
   private String tableFullName;
   private TableIdentifier tableId;
   private Table historyTable;
+  FileFormat format;
+  GenericAppenderFactory appenderFactory;
+  OutputFileFactory fileFactory;
 
   @Override
   public void configure(Configuration config, HistoryRecordComparator comparator, SchemaHistoryListener listener, boolean useCatalogBeforeSchema) {
@@ -110,6 +111,11 @@ public void start() {
         }
       }
     });
+
+    historyTable = icebergCatalog.loadTable(tableId);
+    format = IcebergUtil.getTableFileFormat(historyTable);
+    appenderFactory = IcebergUtil.getTableAppender(historyTable);
+    fileFactory = IcebergUtil.getTableOutputFileFactory(historyTable, format);
   }
 
   public String getTableFullName() {
@@ -136,28 +142,21 @@ protected void storeRecord(HistoryRecord record) throws SchemaHistoryException {
           "history_data", recordDocString,
           "record_insert_ts", currentTs
       );
-      OutputFile out;
-      try (FileIO tableIo = historyTable.io()) {
-        out = tableIo.newOutputFile(historyTable.locationProvider().newDataLocation(UUID.randomUUID() + "-data-001"));
-      }
-      FileAppender<Record> writer = Parquet.write(out)
-          .createWriterFunc(GenericParquetWriter::buildWriter)
-          .forTable(historyTable)
-          .overwrite()
-          .build();
-      try (Closeable ignored = writer) {
-        writer.add(row);
+
+      try (BaseTaskWriter<Record> writer = new UnpartitionedWriter<>(
+          historyTable.spec(), format, appenderFactory, fileFactory, historyTable.io(), Long.MAX_VALUE)) {
+        writer.write(row);
+        writer.close();
+        WriteResult files = writer.complete();
+
+        Transaction t = historyTable.newTransaction();
+        t.newDelete().deleteFromRowFilter(Expressions.alwaysTrue()).commit();
+        AppendFiles tableAppender = t.newAppend();
+        Arrays.stream(files.dataFiles()).forEach(tableAppender::appendFile);
+        tableAppender.commit();
+        t.commitTransaction();
+        LOG.trace("Successfully saved history data to Iceberg table");
       }
-      DataFile dataFile = DataFiles.builder(historyTable.spec())
-          .withFormat(FileFormat.PARQUET)
-          .withPath(out.location())
-          .withFileSizeInBytes(writer.length())
-          .withSplitOffsets(writer.splitOffsets())
-          .withMetrics(writer.metrics())
-          .build();
-      historyTable.newOverwrite().addFile(dataFile).commit();
-      /// END iceberg append
-      LOG.trace("Successfully saved history data to Iceberg table");
     } catch (IOException e) {
       throw new SchemaHistoryException("Failed to store record: " + record, e);
     }
@@ -232,7 +229,7 @@ public void initializeStorage() {
     if (!storageExists()) {
       try {
        LOG.debug("Creating table {} to store database history", tableFullName);
-        historyTable = icebergCatalog.createTable(tableId, DATABASE_HISTORY_TABLE_SCHEMA);
+        historyTable = IcebergUtil.createIcebergTable(icebergCatalog, tableId, DATABASE_HISTORY_TABLE_SCHEMA);
         LOG.warn("Created database history storage table {} to store history", tableFullName);
 
         if (!Strings.isNullOrEmpty(historyConfig.getMigrateHistoryFile().strip())) {
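Committing the delete and the append inside one transaction swaps the table's contents atomically, so a concurrent reader never observes the history table empty. Note the append must itself be committed before `commitTransaction()`, matching the offset-store hunk below; the per-file `t.newAppend()::appendFile` method reference alone never stages the files. On recovery the stored documents are read back with Iceberg's generic reader; roughly (a hedged sketch, record handling simplified):

```java
import java.io.IOException;
import org.apache.iceberg.Table;
import org.apache.iceberg.data.IcebergGenerics;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.io.CloseableIterable;

class HistoryReadSketch {
  static void replayHistory(Table historyTable) throws IOException {
    // Scan every row of the history table; each row carries one schema
    // change document in the history_data column.
    try (CloseableIterable<Record> rows = IcebergGenerics.read(historyTable).build()) {
      for (Record row : rows) {
        String historyData = (String) row.getField("history_data");
        // ... parse the JSON document and feed it to the schema history consumer
      }
    }
  }
}
```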
diff --git a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/offset/IcebergOffsetBackingStore.java b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/offset/IcebergOffsetBackingStore.java
index 360efb15..57af007f 100644
--- a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/offset/IcebergOffsetBackingStore.java
+++ b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/offset/IcebergOffsetBackingStore.java
@@ -14,7 +14,6 @@
 import io.debezium.server.iceberg.IcebergUtil;
 import io.debezium.util.Strings;
 
-import java.io.Closeable;
 import java.io.File;
 import java.io.IOException;
 import java.nio.ByteBuffer;
@@ -22,10 +21,7 @@
 import java.nio.file.Files;
 import java.time.OffsetDateTime;
 import java.time.ZoneOffset;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.UUID;
+import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Future;
@@ -37,16 +33,12 @@
 import org.apache.iceberg.catalog.Catalog;
 import org.apache.iceberg.catalog.Namespace;
 import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.data.GenericAppenderFactory;
 import org.apache.iceberg.data.GenericRecord;
 import org.apache.iceberg.data.IcebergGenerics;
 import org.apache.iceberg.data.Record;
-import org.apache.iceberg.data.parquet.GenericParquetWriter;
 import org.apache.iceberg.expressions.Expressions;
-import org.apache.iceberg.io.CloseableIterable;
-import org.apache.iceberg.io.FileAppender;
-import org.apache.iceberg.io.FileIO;
-import org.apache.iceberg.io.OutputFile;
-import org.apache.iceberg.parquet.Parquet;
+import org.apache.iceberg.io.*;
 import org.apache.iceberg.types.Types;
 import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.connect.errors.ConnectException;
@@ -83,6 +75,10 @@ public class IcebergOffsetBackingStore extends MemoryOffsetBackingStore implemen
   private TableIdentifier tableId;
   private Table offsetTable;
   IcebergOffsetBackingStoreConfig offsetConfig;
+  FileFormat format;
+  GenericAppenderFactory appenderFactory;
+  OutputFileFactory fileFactory;
+
 
   public IcebergOffsetBackingStore() {
   }
@@ -111,7 +107,7 @@ private void initializeTable() {
       offsetTable = icebergCatalog.loadTable(tableId);
     } else {
       LOG.debug("Creating table {} to store offset", tableFullName);
-      offsetTable = icebergCatalog.createTable(tableId, OFFSET_STORAGE_TABLE_SCHEMA);
+      offsetTable = IcebergUtil.createIcebergTable(icebergCatalog, tableId, OFFSET_STORAGE_TABLE_SCHEMA);
       if (!icebergCatalog.tableExists(tableId)) {
         throw new DebeziumException("Failed to create table " + tableId + " to store offset");
       }
@@ -120,8 +116,11 @@ private void initializeTable() {
         LOG.warn("Migrating offset from file {}", offsetConfig.getMigrateOffsetFile());
         this.loadFileOffset(new File(offsetConfig.getMigrateOffsetFile()));
       }
-    }
+    }
+
+    format = IcebergUtil.getTableFileFormat(offsetTable);
+    appenderFactory = IcebergUtil.getTableAppender(offsetTable);
+    fileFactory = IcebergUtil.getTableOutputFileFactory(offsetTable, format);
   }
 
   private void loadFileOffset(File file) {
@@ -157,31 +156,21 @@ protected void save() {
           "id", UUID.randomUUID().toString(),
           "offset_data", dataJson,
           "record_insert_ts", currentTs);
-      OutputFile out;
-      try (FileIO tableIo = offsetTable.io()) {
-        out = tableIo.newOutputFile(offsetTable.locationProvider().newDataLocation(tableId.name() + "-data-001"));
-      }
-      FileAppender<Record> writer = Parquet.write(out)
-          .createWriterFunc(GenericParquetWriter::buildWriter)
-          .forTable(offsetTable)
-          .overwrite()
-          .build();
-      try (Closeable ignored = writer) {
-        writer.add(row);
+
+      try (BaseTaskWriter<Record> writer = new UnpartitionedWriter<>(
+          offsetTable.spec(), format, appenderFactory, fileFactory, offsetTable.io(), Long.MAX_VALUE)) {
+        writer.write(row);
+        writer.close();
+        WriteResult files = writer.complete();
+
+        Transaction t = offsetTable.newTransaction();
+        t.newDelete().deleteFromRowFilter(Expressions.alwaysTrue()).commit();
+        AppendFiles tableAppender = t.newAppend();
+        Arrays.stream(files.dataFiles()).forEach(tableAppender::appendFile);
+        tableAppender.commit();
+        t.commitTransaction();
+        LOG.debug("Successfully saved offset data to iceberg table");
       }
-      DataFile dataFile = DataFiles.builder(offsetTable.spec())
-          .withFormat(FileFormat.PARQUET)
-          .withPath(out.location())
-          .withFileSizeInBytes(writer.length())
-          .withSplitOffsets(writer.splitOffsets())
-          .withMetrics(writer.metrics())
-          .build();
-
-      Transaction t = offsetTable.newTransaction();
-      t.newDelete().deleteFromRowFilter(Expressions.alwaysTrue()).commit();
-      t.newAppend().appendFile(dataFile).commit();
-      t.commitTransaction();
-      LOG.debug("Successfully saved offset data to iceberg table");
     } catch (IOException e) {
       throw new RuntimeException(e);
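The store keeps exactly one live row: `save()` serializes the whole Kafka Connect offset map to JSON, writes it into the `offset_data` column, and the delete-then-append transaction replaces the previous row atomically. A toy round-trip of that encoding (key/value shapes are illustrative, not taken from a real engine run):

```java
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.HashMap;
import java.util.Map;

class OffsetJsonSketch {
  public static void main(String[] args) throws Exception {
    ObjectMapper mapper = new ObjectMapper();
    Map<String, String> offsets = new HashMap<>();
    // Illustrative entry only; real keys/values come from Kafka Connect.
    offsets.put("[\"engine\",{\"server\":\"testc\"}]", "{\"lsn\":34896}");
    String dataJson = mapper.writeValueAsString(offsets);   // what save() stores in offset_data
    Map<String, String> restored =
        mapper.readValue(dataJson, new TypeReference<Map<String, String>>() {});
    System.out.println(restored.equals(offsets));           // true
  }
}
```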
diff --git a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/IcebergTableOperator.java b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/IcebergTableOperator.java
index bdf0d2d8..9e997070 100644
--- a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/IcebergTableOperator.java
+++ b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/IcebergTableOperator.java
@@ -168,8 +168,7 @@ public void addToTable(Table icebergTable, List<IcebergChangeEvent> events) {
    */
   private void addToTablePerSchema(Table icebergTable, List<IcebergChangeEvent> events) {
     // Initialize a task writer to write both INSERT and equality DELETE.
-    BaseTaskWriter<Record> writer = writerFactory.create(icebergTable);
-    try {
+    try (BaseTaskWriter<Record> writer = writerFactory.create(icebergTable)) {
       for (IcebergChangeEvent e : events) {
         writer.write(e.asIcebergRecord(icebergTable.schema()));
       }
@@ -186,7 +185,6 @@ private void addToTablePerSchema(Table icebergTable, List<IcebergChangeEvent> ev
       Arrays.stream(files.dataFiles()).forEach(appendFiles::appendFile);
       appendFiles.commit();
     }
-
     } catch (IOException ex) {
       throw new DebeziumException(ex);
     }
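The only behavioural change in this file is writer lifecycle: with try-with-resources the task writer is closed on every path, including when `write(...)` throws. Roughly what the new form guarantees, written out (a sketch using the surrounding factory and event types):

```java
import java.io.IOException;
import java.util.List;
import org.apache.iceberg.Table;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.io.BaseTaskWriter;
import io.debezium.server.iceberg.IcebergChangeEvent;
import io.debezium.server.iceberg.tableoperator.IcebergTableWriterFactory;

class WriterLifecycleSketch {
  static void writeAll(IcebergTableWriterFactory writerFactory, Table icebergTable,
                       List<IcebergChangeEvent> events) throws IOException {
    BaseTaskWriter<Record> writer = writerFactory.create(icebergTable);
    try {
      for (IcebergChangeEvent e : events) {
        writer.write(e.asIcebergRecord(icebergTable.schema()));
      }
      // ... writer.complete() and commit the returned files, as in the hunk above ...
    } finally {
      writer.close(); // previously skipped when write() threw, leaking open output files
    }
  }
}
```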
diff --git a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/IcebergTableWriterFactory.java b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/IcebergTableWriterFactory.java
index 2dc0595f..c936be57 100644
--- a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/IcebergTableWriterFactory.java
+++ b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/tableoperator/IcebergTableWriterFactory.java
@@ -2,9 +2,6 @@
 
 import io.debezium.server.iceberg.IcebergUtil;
 
-import java.time.Instant;
-import java.time.ZoneOffset;
-import java.time.format.DateTimeFormatter;
 import java.util.ArrayList;
 import java.util.List;
 
@@ -22,25 +19,19 @@
 @Dependent
 public class IcebergTableWriterFactory {
-  protected static final DateTimeFormatter dtFormater = DateTimeFormatter.ofPattern("yyyyMMdd").withZone(ZoneOffset.UTC);
   private static final Logger LOGGER = LoggerFactory.getLogger(IcebergTableOperator.class);
   @ConfigProperty(name = "debezium.sink.iceberg.upsert", defaultValue = "true")
   boolean upsert;
   @ConfigProperty(name = "debezium.sink.iceberg.upsert-keep-deletes", defaultValue = "true")
   boolean upsertKeepDeletes;
 
-  private static int partitionId() {
-    return Integer.parseInt(dtFormater.format(Instant.now()));
-  }
-
   public BaseTaskWriter<Record> create(Table icebergTable) {
     // file format of the table parquet, orc ...
     FileFormat format = IcebergUtil.getTableFileFormat(icebergTable);
     // appender factory
     GenericAppenderFactory appenderFactory = IcebergUtil.getTableAppender(icebergTable);
-    OutputFileFactory fileFactory = OutputFileFactory.builderFor(icebergTable, partitionId(), 1L)
-        .defaultSpec(icebergTable.spec()).format(format).build();
+    OutputFileFactory fileFactory = IcebergUtil.getTableOutputFileFactory(icebergTable, format);
     // equality Field Ids
     List<Integer> equalityFieldIds = new ArrayList<>(icebergTable.schema().identifierFieldIds());
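With `partitionId()` and the `OutputFileFactory` builder hoisted into `IcebergUtil`, the factory, the offset store, and the schema history all assemble writers from the same three helpers. A sketch of that assembly for the simplest case, an unpartitioned append-only table (the upsert/equality-delete branch of `create(...)` is outside this hunk):

```java
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.Table;
import org.apache.iceberg.data.GenericAppenderFactory;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.io.BaseTaskWriter;
import org.apache.iceberg.io.OutputFileFactory;
import org.apache.iceberg.io.UnpartitionedWriter;
import io.debezium.server.iceberg.IcebergUtil;

class SharedHelperSketch {
  static BaseTaskWriter<Record> appendWriter(Table table) {
    FileFormat format = IcebergUtil.getTableFileFormat(table);
    GenericAppenderFactory appenderFactory = IcebergUtil.getTableAppender(table);
    // partitionId() encodes today's UTC date (e.g. 20230530), so the file
    // names the factory produces sort by day.
    OutputFileFactory fileFactory = IcebergUtil.getTableOutputFileFactory(table, format);
    return new UnpartitionedWriter<>(
        table.spec(), format, appenderFactory, fileFactory, table.io(), Long.MAX_VALUE);
  }
}
```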
diff --git a/docs/DOCS.md b/docs/DOCS.md
index d02c0d70..f482d290 100644
--- a/docs/DOCS.md
+++ b/docs/DOCS.md
@@ -149,7 +149,7 @@ debezium.sink.iceberg.{iceberg.prop.name}=xyz-value # passed to iceberg!
 ```
 
 ### Example Configuration
 
-Read [application.properties.example](../debezium-server-iceberg-sink/src/main/resources/conf/application.properties.example)
+Read [application.properties.example](../debezium-server-iceberg-dist/src/main/resources/distro/conf/application.properties.example)
 
 ## Schema Change Behaviour
diff --git a/pom.xml b/pom.xml
index fd28f690..903eb2bf 100644
--- a/pom.xml
+++ b/pom.xml
@@ -30,20 +30,27 @@
     <version.groovy>3.0.17</version.groovy>
     <version.assembly.plugin>3.4.2</version.assembly.plugin>
     <version.jackson>2.13.5</version.jackson>
-    <version.iceberg>1.2.1</version.iceberg>
-    <version.spark>3.3.1</version.spark>
-    <version.hadoop>3.3.4</version.hadoop>
+    <version.iceberg>1.3.0</version.iceberg>
+    <version.spark>3.3.2</version.spark>
+    <version.hadoop>3.3.2</version.hadoop>
     <version.awssdk>2.19.4</version.awssdk>
     <version.testcontainers>1.17.6</version.testcontainers>
-    <version.debezium>2.2.0.Final</version.debezium>
+    <version.debezium>2.2.1.Final</version.debezium>
     <version.mysql.driver>8.0.32</version.mysql.driver>
-    <version.quarkus>3.0.2.Final</version.quarkus>
+    <version.quarkus>3.0.4.Final</version.quarkus>
     <version.antlr>4.8</version.antlr>
   </properties>
+
+  <repositories>
+    <repository>
+      <id>orgapacheiceberg-release</id>
+      <url>https://repository.apache.org/content/repositories/orgapacheiceberg-1134/</url>
+    </repository>
+  </repositories>