From e372f51658423d9ee2210fd5cbbb34ffd628a998 Mon Sep 17 00:00:00 2001 From: ismail simsek <6005685+ismailsimsek@users.noreply.github.com> Date: Mon, 3 Jun 2024 22:31:41 +0200 Subject: [PATCH] Don't partition tables on append mode. User could partition the iceberg tables if needed. (#340) --- .../server/iceberg/IcebergChangeConsumer.java | 6 +----- .../io/debezium/server/iceberg/IcebergUtil.java | 15 +-------------- .../tableoperator/IcebergTableOperatorTest.java | 4 +--- .../iceberg/testresources/SourceMysqlDB.java | 2 +- 4 files changed, 4 insertions(+), 23 deletions(-) diff --git a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeConsumer.java b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeConsumer.java index 509b754a..57554642 100644 --- a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeConsumer.java +++ b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergChangeConsumer.java @@ -95,8 +95,6 @@ public class IcebergChangeConsumer extends BaseChangeConsumer implements Debeziu String catalogName; @ConfigProperty(name = "debezium.sink.iceberg.upsert", defaultValue = "true") boolean upsert; - @ConfigProperty(name = "debezium.sink.iceberg.partition-field", defaultValue = "__ts_ms") - String partitionField; @ConfigProperty(name = "debezium.sink.batch.batch-size-wait", defaultValue = "NoBatchSizeWait") String batchSizeWaitName; @ConfigProperty(name = "debezium.format.value.schemas.enable", defaultValue = "false") @@ -191,9 +189,7 @@ public Table loadIcebergTable(TableIdentifier tableId, IcebergChangeEvent sample throw new RuntimeException("Table '" + tableId + "' not found! " + "Set `debezium.format.value.schemas.enable` to true to create tables automatically!"); } try { - return IcebergUtil.createIcebergTable(icebergCatalog, tableId, sampleEvent.icebergSchema(), writeFormat, - !upsert, // partition if its append mode - partitionField); + return IcebergUtil.createIcebergTable(icebergCatalog, tableId, sampleEvent.icebergSchema(), writeFormat); } catch (Exception e){ throw new DebeziumException("Failed to create table from debezium event schema:"+tableId+" Error:" + e.getMessage(), e); } diff --git a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergUtil.java b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergUtil.java index ed0614db..bd07cbef 100644 --- a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergUtil.java +++ b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/IcebergUtil.java @@ -79,23 +79,11 @@ public static Table createIcebergTable(Catalog icebergCatalog, TableIdentifier t } public static Table createIcebergTable(Catalog icebergCatalog, TableIdentifier tableIdentifier, - Schema schema, String writeFormat, boolean partition, String partitionField) { + Schema schema, String writeFormat) { LOGGER.warn("Creating table:'{}'\nschema:{}\nrowIdentifier:{}", tableIdentifier, schema, schema.identifierFieldNames()); - final PartitionSpec ps; - if (partition) { - if (schema.findField(partitionField) == null) { - LOGGER.warn("Table schema dont contain partition field {}! Creating table without partition", partition); - ps = PartitionSpec.builderFor(schema).build(); - } else { - ps = PartitionSpec.builderFor(schema).day(partitionField).build(); - } - } else { - ps = PartitionSpec.builderFor(schema).build(); - } - if (!((SupportsNamespaces) icebergCatalog).namespaceExists(tableIdentifier.namespace())) { ((SupportsNamespaces) icebergCatalog).createNamespace(tableIdentifier.namespace()); LOGGER.warn("Created namespace:'{}'", tableIdentifier.namespace()); @@ -105,7 +93,6 @@ public static Table createIcebergTable(Catalog icebergCatalog, TableIdentifier t .withProperty(FORMAT_VERSION, "2") .withProperty(DEFAULT_FILE_FORMAT, writeFormat.toLowerCase(Locale.ENGLISH)) .withSortOrder(IcebergUtil.getIdentifierFieldsAsSortOrder(schema)) - .withPartitionSpec(ps) .create(); } diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/tableoperator/IcebergTableOperatorTest.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/tableoperator/IcebergTableOperatorTest.java index f0355885..871d3909 100644 --- a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/tableoperator/IcebergTableOperatorTest.java +++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/tableoperator/IcebergTableOperatorTest.java @@ -46,8 +46,6 @@ class IcebergTableOperatorTest extends BaseSparkTest { String namespace; @ConfigProperty(name = "debezium.sink.iceberg.upsert", defaultValue = "true") boolean upsert; - @ConfigProperty(name = "debezium.sink.iceberg.partition-field", defaultValue = "__ts_ms") - String partitionField; @ConfigProperty(name = "debezium.sink.iceberg." + DEFAULT_FILE_FORMAT, defaultValue = DEFAULT_FILE_FORMAT_DEFAULT) String writeFormat; @Inject @@ -57,7 +55,7 @@ class IcebergTableOperatorTest extends BaseSparkTest { public Table createTable(IcebergChangeEvent sampleEvent) { HadoopCatalog icebergCatalog = getIcebergCatalog(); final TableIdentifier tableId = TableIdentifier.of(Namespace.of(namespace), tablePrefix + sampleEvent.destination()); - return IcebergUtil.createIcebergTable(icebergCatalog, tableId, sampleEvent.icebergSchema(), writeFormat, !upsert, partitionField); + return IcebergUtil.createIcebergTable(icebergCatalog, tableId, sampleEvent.icebergSchema(), writeFormat); } @Test diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/testresources/SourceMysqlDB.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/testresources/SourceMysqlDB.java index 06501576..9451f50c 100644 --- a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/testresources/SourceMysqlDB.java +++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/testresources/SourceMysqlDB.java @@ -30,7 +30,7 @@ public class SourceMysqlDB implements QuarkusTestResourceLifecycleManager { public static final String MYSQL_PASSWORD = "mysqlpw"; public static final String MYSQL_DEBEZIUM_USER = "debezium"; public static final String MYSQL_DEBEZIUM_PASSWORD = "dbz"; - public static final String MYSQL_IMAGE = "debezium/example-mysql:2.1.2.Final"; + public static final String MYSQL_IMAGE = "debezium/example-mysql:2.5"; public static final String MYSQL_HOST = "127.0.0.1"; public static final String MYSQL_DATABASE = "inventory"; public static final Integer MYSQL_PORT_DEFAULT = 3306;