From 9baede1d8da1f074c76cd89902393421de628096 Mon Sep 17 00:00:00 2001
From: ismail simsek <6005685+ismailsimsek@users.noreply.github.com>
Date: Fri, 5 May 2023 21:53:55 +0200
Subject: [PATCH] Fix IcebergSchemaHistory and IcebergOffsetBackingStore (#194)

---
 .../iceberg/history/IcebergSchemaHistory.java |  11 +++++-----
 .../offset/IcebergOffsetBackingStore.java     |  21 +++++++++++++------
 .../offset/IcebergOffsetBackingStoreTest.java |   2 +-
 3 files changed, 22 insertions(+), 12 deletions(-)

diff --git a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/history/IcebergSchemaHistory.java b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/history/IcebergSchemaHistory.java
index dd200d18..326267b8 100644
--- a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/history/IcebergSchemaHistory.java
+++ b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/history/IcebergSchemaHistory.java
@@ -93,13 +93,14 @@ public void configure(Configuration config, HistoryRecordComparator comparator,
     tableId = TableIdentifier.of(Namespace.of(this.historyConfig.catalogName()), this.historyConfig.tableName());
 
     if (running.get()) {
-      throw new SchemaHistoryException("Bigquery database history process already initialized table: " + tableFullName);
+      throw new SchemaHistoryException("Iceberg database history process already initialized table: " + tableFullName);
     }
   }
 
   @Override
   public void start() {
     super.start();
+    LOG.info("Starting IcebergSchemaHistory storage table:" + tableFullName);
     lock.write(() -> {
       if (running.compareAndSet(false, true)) {
         try {
@@ -159,7 +160,7 @@ protected void storeRecord(HistoryRecord record) throws SchemaHistoryException {
           .build();
       historyTable.newOverwrite().addFile(dataFile).commit();
       /// END iceberg append
-      LOG.trace("Successfully saved history data to bigquery table");
+      LOG.trace("Successfully saved history data to Iceberg table");
     } catch (IOException e) {
       throw new SchemaHistoryException("Failed to store record: " + record, e);
     }
@@ -226,7 +227,7 @@ public boolean exists() {
 
   @Override
   public String toString() {
-    return "Bigquery database history storage: " + (tableFullName != null ? tableFullName : "(unstarted)");
+    return "Iceberg database history storage: " + (tableFullName != null ? tableFullName : "(unstarted)");
   }
 
   @Override
@@ -250,7 +251,7 @@ public void initializeStorage() {
   }
 
   private void loadFileSchemaHistory(File file) {
-    LOG.warn(String.format("Migrating file database history from:'%s' to Bigquery database history storage: %s",
+    LOG.warn(String.format("Migrating file database history from:'%s' to Iceberg database history storage: %s",
         file.toPath(), tableFullName));
     AtomicInteger numRecords = new AtomicInteger();
     lock.write(() -> {
@@ -270,7 +271,7 @@ private void loadFileSchemaHistory(File file) {
       }
     });
     LOG.warn("Migrated {} database history record. " +
-        "Migrating file database history to Bigquery database history storage successfully completed", numRecords.get());
+        "Migrating file database history to Iceberg database history storage successfully completed", numRecords.get());
   }
 
   public static class IcebergSchemaHistoryConfig {
diff --git a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/offset/IcebergOffsetBackingStore.java b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/offset/IcebergOffsetBackingStore.java
index efaffe30..71c4a114 100644
--- a/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/offset/IcebergOffsetBackingStore.java
+++ b/debezium-server-iceberg-sink/src/main/java/io/debezium/server/iceberg/offset/IcebergOffsetBackingStore.java
@@ -20,7 +20,8 @@
 import java.nio.ByteBuffer;
 import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
-import java.sql.Timestamp;
+import java.time.OffsetDateTime;
+import java.time.ZoneOffset;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.Map;
@@ -40,6 +41,7 @@
 import org.apache.iceberg.data.IcebergGenerics;
 import org.apache.iceberg.data.Record;
 import org.apache.iceberg.data.parquet.GenericParquetWriter;
+import org.apache.iceberg.expressions.Expressions;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.io.FileAppender;
 import org.apache.iceberg.io.FileIO;
@@ -143,10 +145,13 @@ protected void save() {
     try {
       String dataJson = mapper.writeValueAsString(data);
       LOG.debug("Saving offset data {}", dataJson);
-      Timestamp currentTs = new Timestamp(System.currentTimeMillis());
+      OffsetDateTime currentTs = OffsetDateTime.now(ZoneOffset.UTC);
       GenericRecord record = GenericRecord.create(OFFSET_STORAGE_TABLE_SCHEMA);
-      Record row = record.copy("id", UUID.randomUUID().toString(), "offset_data", "record_insert_ts", dataJson, currentTs);
+      Record row = record.copy(
+          "id", UUID.randomUUID().toString(),
+          "offset_data", dataJson,
+          "record_insert_ts", currentTs);
       OutputFile out;
       try (FileIO tableIo = offsetTable.io()) {
         out = tableIo.newOutputFile(offsetTable.locationProvider().newDataLocation(tableId.name() + "-data-001"));
@@ -166,7 +171,11 @@ protected void save() {
           .withSplitOffsets(writer.splitOffsets())
           .withMetrics(writer.metrics())
          .build();
-      offsetTable.newOverwrite().addFile(dataFile).commit();
+
+      Transaction t = offsetTable.newTransaction();
+      t.newDelete().deleteFromRowFilter(Expressions.alwaysTrue()).commit();
+      t.newAppend().appendFile(dataFile).commit();
+      t.commitTransaction();
 
       LOG.debug("Successfully saved offset data to iceberg table");
     } catch (IOException e) {
@@ -233,11 +242,11 @@ public Future<Map<ByteBuffer, ByteBuffer>> get(final Collection<ByteBuffer> keys
   }
 
   public String fromByteBuffer(ByteBuffer data) {
-    return (data != null) ? String.valueOf(StandardCharsets.UTF_8.decode(data.asReadOnlyBuffer())) : null;
+    return (data != null) ? String.valueOf(StandardCharsets.UTF_16.decode(data.asReadOnlyBuffer())) : null;
   }
 
   public ByteBuffer toByteBuffer(String data) {
-    return (data != null) ? ByteBuffer.wrap(data.getBytes(StandardCharsets.UTF_8)) : null;
+    return (data != null) ? ByteBuffer.wrap(data.getBytes(StandardCharsets.UTF_16)) : null;
   }
 
   public static class IcebergOffsetBackingStoreConfig extends WorkerConfig {
diff --git a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/offset/IcebergOffsetBackingStoreTest.java b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/offset/IcebergOffsetBackingStoreTest.java
index 20aea30b..be6a38f4 100644
--- a/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/offset/IcebergOffsetBackingStoreTest.java
+++ b/debezium-server-iceberg-sink/src/test/java/io/debezium/server/iceberg/offset/IcebergOffsetBackingStoreTest.java
@@ -113,7 +113,7 @@ public void testSaveRestore() throws Exception {
     Map<ByteBuffer, ByteBuffer> values = restore.get(Collections.singletonList(toByteBuffer("key"))).get();
     Map<ByteBuffer, ByteBuffer> values2 = restore.get(Collections.singletonList(toByteBuffer("key1secondSet"))).get();
     Map<ByteBuffer, ByteBuffer> values3 = restore.get(Collections.singletonList(toByteBuffer("key2secondSet"))).get();
-    assertEquals(toByteBuffer("value"), values.get(toByteBuffer("key")));
+    assertEquals("value", fromByteBuffer(values.get(toByteBuffer("key"))));
     assertEquals(toByteBuffer("value1secondSet"), values2.get(toByteBuffer("key1secondSet")));
    assertEquals(toByteBuffer("value2secondSet"), values3.get(toByteBuffer("key2secondSet")));
   }
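
Note on the save() change in IcebergOffsetBackingStore: the old record.copy(...) call interleaved field names and values incorrectly. The sketch below is illustrative only, not project code; the schema is a simplified stand-in for OFFSET_STORAGE_TABLE_SCHEMA with the same three columns the patch writes. It shows the alternating name/value pairing that Iceberg's Record.copy(...) overloads expect.

// Illustrative sketch -- a stand-in schema and row construction, not copied from the project.
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.util.UUID;

import org.apache.iceberg.Schema;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.types.Types;

class OffsetRowSketch {

  // Hypothetical schema with the same three columns the patched save() populates.
  static final Schema OFFSET_SCHEMA = new Schema(
      Types.NestedField.required(1, "id", Types.StringType.get()),
      Types.NestedField.optional(2, "offset_data", Types.StringType.get()),
      Types.NestedField.optional(3, "record_insert_ts", Types.TimestampType.withZone()));

  static Record newOffsetRow(String dataJson) {
    GenericRecord record = GenericRecord.create(OFFSET_SCHEMA);
    // Each field name is immediately followed by its own value:
    // "offset_data" gets the JSON payload, "record_insert_ts" gets the timestamp.
    return record.copy(
        "id", UUID.randomUUID().toString(),
        "offset_data", dataJson,
        "record_insert_ts", OffsetDateTime.now(ZoneOffset.UTC));
  }
}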
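The commit also replaces offsetTable.newOverwrite() with an Iceberg transaction that clears the offsets table and appends the freshly written data file in one commit. A minimal standalone sketch of that pattern, assuming a Table and a DataFile like the ones already available in save(); the wrapper class and method name are illustrative.

// Sketch of the commit pattern introduced above: both operations are staged on one
// transaction, so readers see either the old offsets or the new ones, never both.
import org.apache.iceberg.DataFile;
import org.apache.iceberg.Table;
import org.apache.iceberg.Transaction;
import org.apache.iceberg.expressions.Expressions;

class OffsetCommitSketch {

  static void replaceTableContents(Table offsetTable, DataFile dataFile) {
    Transaction t = offsetTable.newTransaction();
    // Stage a delete of every existing row (the previous offset snapshot).
    t.newDelete().deleteFromRowFilter(Expressions.alwaysTrue()).commit();
    // Stage an append of the file holding the latest offsets.
    t.newAppend().appendFile(dataFile).commit();
    // Publish both staged operations as a single atomic commit.
    t.commitTransaction();
  }
}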
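Finally, the updated test compares decoded strings instead of raw ByteBuffers, which only holds if toByteBuffer() and fromByteBuffer() agree on a single charset (UTF-16 after this patch). A small self-contained sketch of that round-trip; the helper bodies mirror the patched methods, while the wrapper class and main method are illustrative.

// Sketch of the round-trip property the updated assertion relies on.
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

class CharsetRoundTripSketch {

  static ByteBuffer toByteBuffer(String data) {
    return (data != null) ? ByteBuffer.wrap(data.getBytes(StandardCharsets.UTF_16)) : null;
  }

  static String fromByteBuffer(ByteBuffer data) {
    return (data != null) ? String.valueOf(StandardCharsets.UTF_16.decode(data.asReadOnlyBuffer())) : null;
  }

  public static void main(String[] args) {
    String original = "value";
    // Decoding with the same charset used for encoding restores the original string,
    // which is why the test now compares strings via fromByteBuffer(...).
    System.out.println(original.equals(fromByteBuffer(toByteBuffer(original)))); // prints true
  }
}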