Commit 9baede1

Fix IcebergSchemaHistory and IcebergOffsetBackingStore (#194)

ismailsimsek committed May 5, 2023
1 parent 2c41f46 commit 9baede1

Showing 3 changed files with 22 additions and 12 deletions.
IcebergSchemaHistory.java

@@ -93,13 +93,14 @@ public void configure(Configuration config, HistoryRecordComparator comparator,
     tableId = TableIdentifier.of(Namespace.of(this.historyConfig.catalogName()), this.historyConfig.tableName());
 
     if (running.get()) {
-      throw new SchemaHistoryException("Bigquery database history process already initialized table: " + tableFullName);
+      throw new SchemaHistoryException("Iceberg database history process already initialized table: " + tableFullName);
     }
   }
 
   @Override
   public void start() {
     super.start();
+    LOG.info("Starting IcebergSchemaHistory storage table:" + tableFullName);
     lock.write(() -> {
       if (running.compareAndSet(false, true)) {
         try {
@@ -159,7 +160,7 @@ protected void storeRecord(HistoryRecord record) throws SchemaHistoryException {
           .build();
       historyTable.newOverwrite().addFile(dataFile).commit();
       /// END iceberg append
-      LOG.trace("Successfully saved history data to bigquery table");
+      LOG.trace("Successfully saved history data to Iceberg table");
     } catch (IOException e) {
       throw new SchemaHistoryException("Failed to store record: " + record, e);
     }
@@ -226,7 +227,7 @@ public boolean exists() {
 
   @Override
   public String toString() {
-    return "Bigquery database history storage: " + (tableFullName != null ? tableFullName : "(unstarted)");
+    return "Iceberg database history storage: " + (tableFullName != null ? tableFullName : "(unstarted)");
   }
 
   @Override
@@ -250,7 +251,7 @@ public void initializeStorage() {
   }
 
   private void loadFileSchemaHistory(File file) {
-    LOG.warn(String.format("Migrating file database history from:'%s' to Bigquery database history storage: %s",
+    LOG.warn(String.format("Migrating file database history from:'%s' to Iceberg database history storage: %s",
        file.toPath(), tableFullName));
     AtomicInteger numRecords = new AtomicInteger();
     lock.write(() -> {
@@ -270,7 +271,7 @@ private void loadFileSchemaHistory(File file) {
       }
     });
     LOG.warn("Migrated {} database history record. " +
-        "Migrating file database history to Bigquery database history storage successfully completed", numRecords.get());
+        "Migrating file database history to Iceberg database history storage successfully completed", numRecords.get());
   }
 
   public static class IcebergSchemaHistoryConfig {
IcebergOffsetBackingStore.java

@@ -20,7 +20,8 @@
 import java.nio.ByteBuffer;
 import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
-import java.sql.Timestamp;
+import java.time.OffsetDateTime;
+import java.time.ZoneOffset;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.Map;
@@ -40,6 +41,7 @@
 import org.apache.iceberg.data.IcebergGenerics;
 import org.apache.iceberg.data.Record;
 import org.apache.iceberg.data.parquet.GenericParquetWriter;
+import org.apache.iceberg.expressions.Expressions;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.io.FileAppender;
 import org.apache.iceberg.io.FileIO;
@@ -143,10 +145,13 @@ protected void save() {
     try {
       String dataJson = mapper.writeValueAsString(data);
       LOG.debug("Saving offset data {}", dataJson);
-      Timestamp currentTs = new Timestamp(System.currentTimeMillis());
+      OffsetDateTime currentTs = OffsetDateTime.now(ZoneOffset.UTC);
 
       GenericRecord record = GenericRecord.create(OFFSET_STORAGE_TABLE_SCHEMA);
-      Record row = record.copy("id", UUID.randomUUID().toString(), "offset_data", "record_insert_ts", dataJson, currentTs);
+      Record row = record.copy(
+          "id", UUID.randomUUID().toString(),
+          "offset_data", dataJson,
+          "record_insert_ts", currentTs);
       OutputFile out;
       try (FileIO tableIo = offsetTable.io()) {
         out = tableIo.newOutputFile(offsetTable.locationProvider().newDataLocation(tableId.name() + "-data-001"));
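Two fixes land in the hunk above. First, the insert timestamp becomes a java.time.OffsetDateTime, the representation Iceberg's generic data writers expect for a timestamptz column. Second, the arguments to Record.copy are re-paired: the old call matched the (name, value, name, value, name, value) overload but had drifted out of alternation, so "offset_data" was paired with the literal string "record_insert_ts" and the JSON payload landed where a field name belongs. A minimal sketch of the corrected pattern; the schema and field IDs below are illustrative stand-ins for the class's actual OFFSET_STORAGE_TABLE_SCHEMA:

import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.util.UUID;

import org.apache.iceberg.Schema;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.types.Types;

public class OffsetRowSketch {
  // Illustrative schema; mirrors the shape implied by the diff.
  static final Schema SCHEMA = new Schema(
      Types.NestedField.required(1, "id", Types.StringType.get()),
      Types.NestedField.optional(2, "offset_data", Types.StringType.get()),
      Types.NestedField.optional(3, "record_insert_ts", Types.TimestampType.withZone()));

  public static void main(String[] args) {
    String dataJson = "{\"source\":{\"pos\":42}}";
    // timestamptz columns take OffsetDateTime values.
    OffsetDateTime currentTs = OffsetDateTime.now(ZoneOffset.UTC);

    GenericRecord template = GenericRecord.create(SCHEMA);
    // copy(...) takes strictly alternating (field name, value) pairs:
    // each name is immediately followed by that field's value.
    Record row = template.copy(
        "id", UUID.randomUUID().toString(),
        "offset_data", dataJson,
        "record_insert_ts", currentTs);

    // Prints the JSON payload, not "record_insert_ts".
    System.out.println(row.getField("offset_data"));
  }
}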
@@ -166,7 +171,11 @@ protected void save() {
           .withSplitOffsets(writer.splitOffsets())
           .withMetrics(writer.metrics())
           .build();
-      offsetTable.newOverwrite().addFile(dataFile).commit();
+
+      Transaction t = offsetTable.newTransaction();
+      t.newDelete().deleteFromRowFilter(Expressions.alwaysTrue()).commit();
+      t.newAppend().appendFile(dataFile).commit();
+      t.commitTransaction();
       LOG.debug("Successfully saved offset data to iceberg table");
 
     } catch (IOException e) {
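Context for the hunk above: newOverwrite() with only addFile(...) deletes nothing, so every save() appended one more offsets row and a later restore could read a stale one. The replacement stages a delete of all existing rows plus the append of the new data file inside one transaction, so the table always exposes exactly one current offsets row. The same pattern in isolation, as a hedged sketch (the helper is illustrative, not a method from this commit):

import org.apache.iceberg.DataFile;
import org.apache.iceberg.Table;
import org.apache.iceberg.Transaction;
import org.apache.iceberg.expressions.Expressions;

public final class ReplaceAllRows {
  // Atomically replaces the full contents of the table with one data file.
  static void replaceAllRows(Table table, DataFile newFile) {
    Transaction txn = table.newTransaction();
    // Stage a delete matching every row currently in the table.
    txn.newDelete().deleteFromRowFilter(Expressions.alwaysTrue()).commit();
    // Stage the append of the freshly written data file.
    txn.newAppend().appendFile(newFile).commit();
    // Publish both staged operations together: readers see either the old
    // contents or the new single file, never an empty table in between.
    txn.commitTransaction();
  }
}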
@@ -233,11 +242,11 @@ public Future<Map<ByteBuffer, ByteBuffer>> get(final Collection<ByteBuffer> keys
   }
 
   public String fromByteBuffer(ByteBuffer data) {
-    return (data != null) ? String.valueOf(StandardCharsets.UTF_8.decode(data.asReadOnlyBuffer())) : null;
+    return (data != null) ? String.valueOf(StandardCharsets.UTF_16.decode(data.asReadOnlyBuffer())) : null;
   }
 
   public ByteBuffer toByteBuffer(String data) {
-    return (data != null) ? ByteBuffer.wrap(data.getBytes(StandardCharsets.UTF_8)) : null;
+    return (data != null) ? ByteBuffer.wrap(data.getBytes(StandardCharsets.UTF_16)) : null;
   }
 
   public static class IcebergOffsetBackingStoreConfig extends WorkerConfig {
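Both helpers above switch charsets together, which is the property that matters: encoding and decoding must agree on a single charset so every stored key and value round-trips losslessly. A quick self-contained check, using standalone copies of the two helpers for illustration:

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public final class CharsetRoundTrip {
  static ByteBuffer toByteBuffer(String data) {
    return (data != null) ? ByteBuffer.wrap(data.getBytes(StandardCharsets.UTF_16)) : null;
  }

  static String fromByteBuffer(ByteBuffer data) {
    return (data != null) ? String.valueOf(StandardCharsets.UTF_16.decode(data.asReadOnlyBuffer())) : null;
  }

  public static void main(String[] args) {
    String original = "offsets-äöü"; // non-ASCII survives the round trip
    String restored = fromByteBuffer(toByteBuffer(original));
    System.out.println(original.equals(restored)); // true
  }
}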
@@ -113,7 +113,7 @@ public void testSaveRestore() throws Exception {
     Map<ByteBuffer, ByteBuffer> values = restore.get(Collections.singletonList(toByteBuffer("key"))).get();
     Map<ByteBuffer, ByteBuffer> values2 = restore.get(Collections.singletonList(toByteBuffer("key1secondSet"))).get();
     Map<ByteBuffer, ByteBuffer> values3 = restore.get(Collections.singletonList(toByteBuffer("key2secondSet"))).get();
-    assertEquals(toByteBuffer("value"), values.get(toByteBuffer("key")));
+    assertEquals("value", fromByteBuffer(values.get(toByteBuffer("key"))));
     assertEquals(toByteBuffer("value1secondSet"), values2.get(toByteBuffer("key1secondSet")));
     assertEquals(toByteBuffer("value2secondSet"), values3.get(toByteBuffer("key2secondSet")));
   }
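The updated assertion compares decoded strings instead of raw buffers. Two ByteBuffers can decode to the same string yet differ byte-for-byte (UTF-16 permits byte-order-mark and endianness variants), so comparing through fromByteBuffer checks the contract the store actually guarantees. A sketch of that distinction, assuming JUnit 5 assertions:

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotEquals;

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

class DecodedComparisonSketch {
  void example() {
    ByteBuffer withBom = ByteBuffer.wrap("value".getBytes(StandardCharsets.UTF_16));   // BOM + big-endian
    ByteBuffer noBom = ByteBuffer.wrap("value".getBytes(StandardCharsets.UTF_16BE));   // big-endian, no BOM
    assertNotEquals(withBom, noBom); // raw buffers differ...
    assertEquals(StandardCharsets.UTF_16.decode(withBom).toString(),
        StandardCharsets.UTF_16.decode(noBom).toString()); // ...but decode to the same string
  }
}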