split table operations append and upsert to separate classes (#33)
* WIP table operators

* WIP table operators

* WIP table operators
ismailsimsek committed Oct 16, 2021
1 parent eb337b7 commit f1d8802
Showing 6 changed files with 466 additions and 277 deletions.
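The diff below rewires IcebergChangeConsumer to delegate table writes to a named operator bean instead of doing append/upsert inline. Based only on the calls visible in this diff (initialize, loadIcebergTable, createIcebergTable, addToTable), the operator contract looks roughly like the sketch below; the exact signatures and package layout of the committed InterfaceIcebergTableOperator are assumptions here, not confirmed by this page.

import java.util.ArrayList;
import java.util.Optional;

import io.debezium.engine.ChangeEvent;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;

// Hypothetical reconstruction of the operator contract used below.
public interface InterfaceIcebergTableOperator {
  // one-time setup after the bean is selected
  void initialize();
  // returns the table if it already exists in the catalog
  Optional<Table> loadIcebergTable(Catalog catalog, TableIdentifier tableId);
  // creates the table from the first event's schema
  Table createIcebergTable(Catalog catalog, TableIdentifier tableId, ChangeEvent<Object, Object> sampleEvent);
  // writes a batch of events, as append or upsert depending on the implementation
  void addToTable(Table table, ArrayList<ChangeEvent<Object, Object>> events) throws InterruptedException;
}

With this split, IcebergTableOperatorAppend and IcebergTableOperatorUpsert can each own their write path, and connect() picks one by bean name based on the debezium.sink.iceberg.upsert flag.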
@@ -12,15 +12,15 @@
import io.debezium.engine.ChangeEvent;
import io.debezium.engine.DebeziumEngine;
import io.debezium.engine.format.Json;
import io.debezium.serde.DebeziumSerdes;
import io.debezium.server.BaseChangeConsumer;
import io.debezium.server.iceberg.batchsizewait.InterfaceBatchSizeWait;
import io.debezium.server.iceberg.tableoperator.InterfaceIcebergTableOperator;

import java.io.Closeable;
import java.io.IOException;
import java.time.Duration;
import java.time.Instant;
import java.util.*;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import javax.annotation.PostConstruct;
@@ -31,24 +31,13 @@
import javax.inject.Inject;
import javax.inject.Named;

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.primitives.Ints;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.*;
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.CatalogUtil;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.data.parquet.GenericParquetWriter;
import org.apache.iceberg.deletes.EqualityDeleteWriter;
import org.apache.iceberg.io.FileAppender;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.parquet.Parquet;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.eclipse.microprofile.config.ConfigProvider;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import org.slf4j.Logger;
@@ -65,13 +54,10 @@ public class IcebergChangeConsumer extends BaseChangeConsumer implements Debeziu

private static final Logger LOGGER = LoggerFactory.getLogger(IcebergChangeConsumer.class);
private static final String PROP_PREFIX = "debezium.sink.iceberg.";
static ImmutableMap<String, Integer> cdcOperations = ImmutableMap.of("c", 1, "r", 2, "u", 3, "d", 4);
@ConfigProperty(name = "debezium.format.value", defaultValue = "json")
String valueFormat;
@ConfigProperty(name = "debezium.format.key", defaultValue = "json")
String keyFormat;
@ConfigProperty(name = "debezium.format.value.schemas.enable", defaultValue = "false")
boolean eventSchemaEnabled;
@ConfigProperty(name = PROP_PREFIX + CatalogProperties.WAREHOUSE_LOCATION)
String warehouseLocation;
@ConfigProperty(name = "debezium.sink.iceberg.fs.defaultFS")
@@ -84,24 +70,22 @@
String catalogName;
@ConfigProperty(name = "debezium.sink.iceberg.upsert", defaultValue = "true")
boolean upsert;
@ConfigProperty(name = "debezium.sink.iceberg.upsert-keep-deletes", defaultValue = "true")
boolean upsertKeepDeletes;
@ConfigProperty(name = "debezium.sink.iceberg.upsert-op-column", defaultValue = "__op")
String opColumn;
@ConfigProperty(name = "debezium.sink.iceberg.upsert-dedup-column", defaultValue = "__source_ts_ms")
String sourceTsMsColumn;
@ConfigProperty(name = "debezium.sink.batch.batch-size-wait", defaultValue = "NoBatchSizeWait")
String batchSizeWaitName;

@Inject
@Any
Instance<InterfaceBatchSizeWait> batchSizeWaitInstances;
InterfaceBatchSizeWait batchSizeWait;

Configuration hadoopConf = new Configuration();
Catalog icebergCatalog;
Map<String, String> icebergProperties = new ConcurrentHashMap<>();
Serde<JsonNode> valSerde = DebeziumSerdes.payloadJson(JsonNode.class);
Deserializer<JsonNode> valDeserializer;
private IcebergTableOperations icebergTableOperations;

@Inject
@Any
Instance<InterfaceIcebergTableOperator> icebergTableOperatorInstances;
InterfaceIcebergTableOperator icebergTableOperator;

@PostConstruct
void connect() throws InterruptedException {
@@ -118,10 +102,6 @@ void connect() throws InterruptedException {
conf.forEach(this.icebergProperties::put);

icebergCatalog = CatalogUtil.buildIcebergCatalog(catalogName, icebergProperties, hadoopConf);
icebergTableOperations = new IcebergTableOperations(icebergCatalog);

valSerde.configure(Collections.emptyMap(), false);
valDeserializer = valSerde.deserializer();

Instance<InterfaceBatchSizeWait> instance = batchSizeWaitInstances.select(NamedLiteral.of(batchSizeWaitName));
if (instance.isAmbiguous()) {
@@ -132,28 +112,23 @@
batchSizeWait = instance.get();
batchSizeWait.initizalize();
LOGGER.info("Using {}", batchSizeWait.getClass().getName());
}

public String map(String destination) {
return destination.replace(".", "_");
}

private Table createIcebergTable(TableIdentifier tableIdentifier, ChangeEvent<Object, Object> event) {

if (!eventSchemaEnabled) {
throw new RuntimeException("Table '" + tableIdentifier + "' not found! " +
"Set `debezium.format.value.schemas.enable` to true to create tables automatically!");
String icebergTableOperatorName = upsert ? "IcebergTableOperatorUpsert" : "IcebergTableOperatorAppend";
Instance<InterfaceIcebergTableOperator> toInstance = icebergTableOperatorInstances.select(NamedLiteral.of(icebergTableOperatorName));
if (toInstance.isAmbiguous()) {
throw new DebeziumException("Multiple classes named `" + icebergTableOperatorName + "` were found");
}

if (event.value() == null) {
throw new RuntimeException("Failed to get event schema for table '" + tableIdentifier + "' event value is null");
if (toInstance.isUnsatisfied()) {
throw new DebeziumException("No class named `" + icebergTableOperatorName + "` found");
}
icebergTableOperator = toInstance.get();
icebergTableOperator.initialize();
LOGGER.info("Using {}", icebergTableOperator.getClass().getName());

DebeziumToIcebergTable eventSchema = event.key() == null
? new DebeziumToIcebergTable(getBytes(event.value()))
: new DebeziumToIcebergTable(getBytes(event.value()), getBytes(event.key()));
}

return eventSchema.create(icebergCatalog, tableIdentifier);
public String map(String destination) {
return destination.replace(".", "_");
}

@Override
@@ -169,9 +144,12 @@ public void handleBatch(List<ChangeEvent<Object, Object>> records, DebeziumEngin

for (Map.Entry<String, ArrayList<ChangeEvent<Object, Object>>> event : result.entrySet()) {
final TableIdentifier tableIdentifier = TableIdentifier.of(Namespace.of(namespace), tablePrefix + event.getKey());
Table icebergTable = icebergTableOperations.loadTable(tableIdentifier)
.orElseGet(() -> createIcebergTable(tableIdentifier, event.getValue().get(0)));
addToTable(icebergTable, event.getValue());
Table icebergTable = icebergTableOperator.loadIcebergTable(icebergCatalog, tableIdentifier)
.orElseGet(() -> icebergTableOperator.createIcebergTable(icebergCatalog, tableIdentifier, event.getValue().get(0)));
icebergTableOperator.addToTable(icebergTable, event.getValue());
}
// workaround! somehow the offset is not saved to the file unless we call committer.markProcessed,
// even though it should be saved to the file periodically
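// A minimal sketch of the markProcessed workaround described above, assuming the
// standard io.debezium.engine.DebeziumEngine.RecordCommitter API (markProcessed
// and markBatchFinished both exist on that interface); illustrative, not the
// committed code:
//
//   for (ChangeEvent<Object, Object> record : records) {
//     committer.markProcessed(record); // forces the offset to be flushed with the batch
//   }
//   committer.markBatchFinished();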
@@ -185,194 +163,4 @@

}

public int compareByTsThenOp(GenericRecord lhs, GenericRecord rhs) {
if (lhs.getField(sourceTsMsColumn).equals(rhs.getField(sourceTsMsColumn))) {
return cdcOperations.getOrDefault(lhs.getField(opColumn), -1)
.compareTo(cdcOperations.getOrDefault(rhs.getField(opColumn), -1));
} else {
return Long.compare((Long) lhs.getField(sourceTsMsColumn), (Long) rhs.getField(sourceTsMsColumn));
}
}
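// Illustrative example of the ordering above: cdcOperations ranks c=1 < r=2 < u=3 < d=4,
// so for two events with equal __source_ts_ms, compareByTsThenOp(read, update) returns
// a negative value and the dedup step in toDeduppedIcebergRecords keeps the update.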


private ArrayList<Record> toIcebergRecords(Schema schema, ArrayList<ChangeEvent<Object, Object>> events) throws InterruptedException {

ArrayList<Record> icebergRecords = new ArrayList<>();
for (ChangeEvent<Object, Object> e : events) {
GenericRecord icebergRecord = IcebergUtil.getIcebergRecord(schema, valDeserializer.deserialize(e.destination(),
getBytes(e.value())));
icebergRecords.add(icebergRecord);
}

return icebergRecords;
}

private ArrayList<Record> toDeduppedIcebergRecords(Schema schema, ArrayList<ChangeEvent<Object, Object>> events) throws InterruptedException {
ConcurrentHashMap<Object, GenericRecord> icebergRecordsmap = new ConcurrentHashMap<>();

for (ChangeEvent<Object, Object> e : events) {
GenericRecord icebergRecord = IcebergUtil.getIcebergRecord(schema, valDeserializer.deserialize(e.destination(),
getBytes(e.value())));

// only replace the existing record if the incoming one is at least as new
if (icebergRecordsmap.containsKey(e.key())) {
if (this.compareByTsThenOp(icebergRecordsmap.get(e.key()), icebergRecord) <= 0) {
icebergRecordsmap.put(e.key(), icebergRecord);
}
} else {
icebergRecordsmap.put(e.key(), icebergRecord);
}

}
return new ArrayList<>(icebergRecordsmap.values());
}

private void addToTable(Table icebergTable, ArrayList<ChangeEvent<Object, Object>> events) throws InterruptedException {

if (!upsert || icebergTable.sortOrder().isUnsorted()) {

if (upsert && icebergTable.sortOrder().isUnsorted()) {
LOGGER.info("Table don't have Pk defined upsert is not possible falling back to append!");
}

ArrayList<Record> icebergRecords = toIcebergRecords(icebergTable.schema(), events);
DataFile dataFile = getDataFile(icebergTable, icebergRecords);
LOGGER.debug("Committing new file as Append '{}' !", dataFile.path());
icebergTable.newAppend()
.appendFile(dataFile)
.commit();

} else {
// DO UPSERT >>> DELETE + INSERT
ArrayList<Record> icebergRecords = toDeduppedIcebergRecords(icebergTable.schema(), events);
DataFile dataFile = getDataFile(icebergTable, icebergRecords);
DeleteFile deleteDataFile = getDeleteFile(icebergTable, icebergRecords);
LOGGER.debug("Committing new file as Upsert (has deletes:{}) '{}' !", deleteDataFile != null, dataFile.path());
RowDelta c = icebergTable
.newRowDelta()
.addRows(dataFile);

if (deleteDataFile != null) {
c.addDeletes(deleteDataFile)
.validateDeletedFiles();
}

c.commit();
}
LOGGER.info("Committed {} events to table! {}", events.size(), icebergTable.location());

}
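// Note: sortOrder().isUnsorted() serves as the "table has no PK" check throughout
// this class; presumably table creation derives the sort order from the event key,
// so an unsorted table is one created without identifier fields and cannot be upserted.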

private DeleteFile getDeleteFile(Table icebergTable, ArrayList<Record> icebergRecords) throws InterruptedException {

final String fileName = "del-" + UUID.randomUUID() + "-" + Instant.now().toEpochMilli() + "." + FileFormat.PARQUET;
OutputFile out = icebergTable.io().newOutputFile(icebergTable.locationProvider().newDataLocation(fileName));

EqualityDeleteWriter<Record> deleteWriter;

// everything that is not an insert becomes an equality delete row
List<Record> deleteRows = icebergRecords.stream()
.filter(r ->
// anything that is not an insert
!r.getField(opColumn).equals("c")
// upsertKeepDeletes = false and it's a delete record: delete the delete as well
// || !(upsertKeepDeletes && r.getField(opColumn).equals("d"))
).collect(Collectors.toList());

if (deleteRows.isEmpty()) {
return null;
}

try {
LOGGER.debug("Writing data to equality delete file: {}!", out);

deleteWriter = Parquet.writeDeletes(out)
.createWriterFunc(GenericParquetWriter::buildWriter)
.overwrite()
.rowSchema(icebergTable.sortOrder().schema())
.withSpec(icebergTable.spec())
.equalityFieldIds(Lists.newArrayList(icebergTable.schema().identifierFieldIds()))
.metricsConfig(MetricsConfig.fromProperties(icebergTable.properties()))
.withSortOrder(icebergTable.sortOrder())
.setAll(icebergTable.properties())
.buildEqualityWriter()
;

try (Closeable toClose = deleteWriter) {
deleteWriter.deleteAll(deleteRows);
}

} catch (IOException e) {
throw new InterruptedException(e.getMessage());
}

LOGGER.debug("Creating iceberg equality delete file!");
// Equality delete files identify deleted rows in a collection of data files by one or more column values,
// and may optionally contain additional columns of the deleted row.
return FileMetadata.deleteFileBuilder(icebergTable.spec())
.ofEqualityDeletes(Ints.toArray(icebergTable.schema().identifierFieldIds()))
.withFormat(FileFormat.PARQUET)
.withPath(out.location())
.withFileSizeInBytes(deleteWriter.length())
.withRecordCount(deleteRows.size())
.withSortOrder(icebergTable.sortOrder())
.build();
}
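// Illustrative note: if `id` is the identifier field, a single equality delete row
// {id=42} logically deletes every row with id=42 in data files with earlier sequence
// numbers; any extra columns written to the delete file are informational only.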

private DataFile getDataFile(Table icebergTable, ArrayList<Record> icebergRecords) throws InterruptedException {
final String fileName = UUID.randomUUID() + "-" + Instant.now().toEpochMilli() + "." + FileFormat.PARQUET;
OutputFile out = icebergTable.io().newOutputFile(icebergTable.locationProvider().newDataLocation(fileName));

FileAppender<Record> writer;
// append mode, or upsert with keep-deletes, writes all records to the data file;
// otherwise delete records are excluded
List<Record> newRecords = icebergRecords.stream()
.filter(r ->
// plain append mode: keep everything
!upsert
// table has no PK: upsert falls back to append, keep everything
|| icebergTable.sortOrder().isUnsorted()
// upsert with upsertKeepDeletes = true: keep deleted rows in the data file
|| upsertKeepDeletes
// otherwise: exclude deletes
|| !(r.getField(opColumn).equals("d")))
.collect(Collectors.toList());
try {
LOGGER.debug("Writing data to file: {}!", out);
writer = Parquet.write(out)
.createWriterFunc(GenericParquetWriter::buildWriter)
.forTable(icebergTable)
.overwrite()
.build();

try (Closeable toClose = writer) {
writer.addAll(newRecords);
}

} catch (IOException e) {
throw new InterruptedException(e.getMessage());
}

LOGGER.debug("Creating iceberg DataFile!");
return DataFiles.builder(icebergTable.spec())
.withFormat(FileFormat.PARQUET)
.withPath(out.location())
.withFileSizeInBytes(writer.length())
.withSplitOffsets(writer.splitOffsets())
.withMetrics(writer.metrics())
.build();
}

}
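For contrast with the removed helpers above, the upsert commit sequence that presumably moves into IcebergTableOperatorUpsert boils down to: write one data file and one equality delete file per batch, then commit both atomically as a row delta. A hedged sketch using only the Iceberg calls already visible in this diff (dataFile and deleteDataFile stand for the outputs of the removed getDataFile/getDeleteFile helpers):

RowDelta rowDelta = icebergTable.newRowDelta().addRows(dataFile);
if (deleteDataFile != null) {
  // only validate deletes when the batch actually produced any
  rowDelta.addDeletes(deleteDataFile).validateDeletedFiles();
}
rowDelta.commit(); // one atomic snapshot containing both deletes and inserts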
