split table operations append and upsert to separate classes #33

Merged · 3 commits · Oct 16, 2021
Changes from all commits
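The diff below splits the write path of IcebergChangeConsumer in two: the append-only and upsert logic move out of the consumer into dedicated operator classes behind a common interface, selected by name at startup. A minimal sketch of that interface, with the method set inferred from the call sites in the diff (the real signatures in the PR may differ):

```java
import java.util.List;
import java.util.Optional;

import io.debezium.engine.ChangeEvent;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;

// Inferred sketch: the interface name and @Named bean names match the diff,
// but the signatures are reconstructed from how connect() and handleBatch()
// call the operator below.
public interface InterfaceIcebergTableOperator {
  void initialize();
  Optional<Table> loadIcebergTable(Catalog catalog, TableIdentifier tableId);
  Table createIcebergTable(Catalog catalog, TableIdentifier tableId,
                           ChangeEvent<Object, Object> sampleEvent);
  void addToTable(Table table, List<ChangeEvent<Object, Object>> events)
      throws InterruptedException;
}
```

Two beans named `IcebergTableOperatorAppend` and `IcebergTableOperatorUpsert` implement it; the `debezium.sink.iceberg.upsert` flag picks between them.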
@@ -12,15 +12,15 @@
import io.debezium.engine.ChangeEvent;
import io.debezium.engine.DebeziumEngine;
import io.debezium.engine.format.Json;
import io.debezium.serde.DebeziumSerdes;
import io.debezium.server.BaseChangeConsumer;
import io.debezium.server.iceberg.batchsizewait.InterfaceBatchSizeWait;
import io.debezium.server.iceberg.tableoperator.InterfaceIcebergTableOperator;

import java.io.Closeable;
import java.io.IOException;
import java.time.Duration;
import java.time.Instant;
import java.util.*;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import javax.annotation.PostConstruct;
@@ -31,24 +31,13 @@
import javax.inject.Inject;
import javax.inject.Named;

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.primitives.Ints;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.*;
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.CatalogUtil;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.data.parquet.GenericParquetWriter;
import org.apache.iceberg.deletes.EqualityDeleteWriter;
import org.apache.iceberg.io.FileAppender;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.parquet.Parquet;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.eclipse.microprofile.config.ConfigProvider;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import org.slf4j.Logger;
@@ -65,13 +54,10 @@ public class IcebergChangeConsumer extends BaseChangeConsumer implements Debeziu

private static final Logger LOGGER = LoggerFactory.getLogger(IcebergChangeConsumer.class);
private static final String PROP_PREFIX = "debezium.sink.iceberg.";
static ImmutableMap<String, Integer> cdcOperations = ImmutableMap.of("c", 1, "r", 2, "u", 3, "d", 4);
@ConfigProperty(name = "debezium.format.value", defaultValue = "json")
String valueFormat;
@ConfigProperty(name = "debezium.format.key", defaultValue = "json")
String keyFormat;
@ConfigProperty(name = "debezium.format.value.schemas.enable", defaultValue = "false")
boolean eventSchemaEnabled;
@ConfigProperty(name = PROP_PREFIX + CatalogProperties.WAREHOUSE_LOCATION)
String warehouseLocation;
@ConfigProperty(name = "debezium.sink.iceberg.fs.defaultFS")
@@ -84,24 +70,22 @@ public class IcebergChangeConsumer extends BaseChangeConsumer implements Debeziu
String catalogName;
@ConfigProperty(name = "debezium.sink.iceberg.upsert", defaultValue = "true")
boolean upsert;
@ConfigProperty(name = "debezium.sink.iceberg.upsert-keep-deletes", defaultValue = "true")
boolean upsertKeepDeletes;
@ConfigProperty(name = "debezium.sink.iceberg.upsert-op-column", defaultValue = "__op")
String opColumn;
@ConfigProperty(name = "debezium.sink.iceberg.upsert-dedup-column", defaultValue = "__source_ts_ms")
String sourceTsMsColumn;
@ConfigProperty(name = "debezium.sink.batch.batch-size-wait", defaultValue = "NoBatchSizeWait")
String batchSizeWaitName;

@Inject
@Any
Instance<InterfaceBatchSizeWait> batchSizeWaitInstances;
InterfaceBatchSizeWait batchSizeWait;

Configuration hadoopConf = new Configuration();
Catalog icebergCatalog;
Map<String, String> icebergProperties = new ConcurrentHashMap<>();
Serde<JsonNode> valSerde = DebeziumSerdes.payloadJson(JsonNode.class);
Deserializer<JsonNode> valDeserializer;
private IcebergTableOperations icebergTableOperations;

@Inject
@Any
Instance<InterfaceIcebergTableOperator> icebergTableOperatorInstances;
InterfaceIcebergTableOperator icebergTableOperator;

@PostConstruct
void connect() throws InterruptedException {
@@ -118,10 +102,6 @@ void connect() throws InterruptedException {
conf.forEach(this.icebergProperties::put);

icebergCatalog = CatalogUtil.buildIcebergCatalog(catalogName, icebergProperties, hadoopConf);
icebergTableOperations = new IcebergTableOperations(icebergCatalog);

valSerde.configure(Collections.emptyMap(), false);
valDeserializer = valSerde.deserializer();

Instance<InterfaceBatchSizeWait> instance = batchSizeWaitInstances.select(NamedLiteral.of(batchSizeWaitName));
if (instance.isAmbiguous()) {
@@ -132,28 +112,23 @@ void connect() throws InterruptedException {
batchSizeWait = instance.get();
batchSizeWait.initizalize();
LOGGER.info("Using {}", batchSizeWait.getClass().getName());
}

public String map(String destination) {
return destination.replace(".", "_");
}

private Table createIcebergTable(TableIdentifier tableIdentifier, ChangeEvent<Object, Object> event) {

if (!eventSchemaEnabled) {
throw new RuntimeException("Table '" + tableIdentifier + "' not found! " +
"Set `debezium.format.value.schemas.enable` to true to create tables automatically!");
String icebergTableOperatorName = upsert ? "IcebergTableOperatorUpsert" : "IcebergTableOperatorAppend";
Instance<InterfaceIcebergTableOperator> toInstance = icebergTableOperatorInstances.select(NamedLiteral.of(icebergTableOperatorName));
if (toInstance.isAmbiguous()) {
throw new DebeziumException("Multiple class named `" + icebergTableOperatorName + "` were found");
}

if (event.value() == null) {
throw new RuntimeException("Failed to get event schema for table '" + tableIdentifier + "' event value is null");
if (toInstance.isUnsatisfied()) {
throw new DebeziumException("No class named `" + icebergTableOperatorName + "` found");
}
icebergTableOperator = toInstance.get();
icebergTableOperator.initialize();
LOGGER.info("Using {}", icebergTableOperator.getClass().getName());

DebeziumToIcebergTable eventSchema = event.key() == null
? new DebeziumToIcebergTable(getBytes(event.value()))
: new DebeziumToIcebergTable(getBytes(event.value()), getBytes(event.key()));
}

return eventSchema.create(icebergCatalog, tableIdentifier);
public String map(String destination) {
return destination.replace(".", "_");
}
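The operator is resolved with the same CDI name-based lookup already used for `batchSizeWait`: inject all candidate beans via `Instance`, narrow the set with `NamedLiteral`, and fail fast on ambiguous or missing names. A self-contained sketch of that pattern, using a hypothetical `Greeter` type in place of the operator interface:

```java
import javax.enterprise.inject.Any;
import javax.enterprise.inject.Instance;
import javax.enterprise.inject.literal.NamedLiteral;
import javax.inject.Inject;

import io.debezium.DebeziumException;

// Hypothetical example type; stands in for InterfaceIcebergTableOperator.
interface Greeter { String greet(); }

class GreeterResolver {
  @Inject
  @Any
  Instance<Greeter> greeters; // CDI injects all Greeter beans here

  Greeter resolve(String beanName) {
    // Narrow the injected set to beans annotated @Named(beanName)
    Instance<Greeter> selected = greeters.select(NamedLiteral.of(beanName));
    if (selected.isAmbiguous()) {
      throw new DebeziumException("Multiple beans named `" + beanName + "` were found");
    }
    if (selected.isUnsatisfied()) {
      throw new DebeziumException("No bean named `" + beanName + "` found");
    }
    return selected.get();
  }
}
```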

@Override
@@ -169,9 +144,12 @@ public void handleBatch(List<ChangeEvent<Object, Object>> records, DebeziumEngin

for (Map.Entry<String, ArrayList<ChangeEvent<Object, Object>>> event : result.entrySet()) {
final TableIdentifier tableIdentifier = TableIdentifier.of(Namespace.of(namespace), tablePrefix + event.getKey());
Table icebergTable = icebergTableOperations.loadTable(tableIdentifier)
.orElseGet(() -> createIcebergTable(tableIdentifier, event.getValue().get(0)));
addToTable(icebergTable, event.getValue());
Table icebergTable = icebergTableOperator
.loadIcebergTable(icebergCatalog, tableIdentifier)
.orElseGet(() ->
icebergTableOperator.createIcebergTable(icebergCatalog, tableIdentifier, event.getValue().get(0)));
icebergTableOperator.addToTable(icebergTable, event.getValue());
}
// workaround: somehow the offset is not saved to file unless we call committer.markProcessed,
// even though it should be saved to file periodically
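For each destination, handleBatch loads the Iceberg table or lazily creates it from the first event, then hands the whole batch to the operator. The load-or-create step is a plain `Optional.orElseGet`; a sketch of how a `loadIcebergTable`-style helper can wrap the catalog call (the helper names here are illustrative, not the PR's code):

```java
import java.util.Optional;

import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.exceptions.NoSuchTableException;

class TableLoader {

  // Catalog.loadTable throws NoSuchTableException when the table is missing;
  // wrapping the call in Optional gives the orElseGet shape used above.
  static Optional<Table> tryLoad(Catalog catalog, TableIdentifier id) {
    try {
      return Optional.of(catalog.loadTable(id));
    } catch (NoSuchTableException e) {
      return Optional.empty();
    }
  }

  // Load-or-create, mirroring the orElseGet chain in handleBatch.
  static Table loadOrCreate(Catalog catalog, TableIdentifier id, Schema schema) {
    return tryLoad(catalog, id).orElseGet(() -> catalog.createTable(id, schema));
  }
}
```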
@@ -185,194 +163,4 @@

}

public int compareByTsThenOp(GenericRecord lhs, GenericRecord rhs) {
if (lhs.getField(sourceTsMsColumn).equals(rhs.getField(sourceTsMsColumn))) {
// return (x < y) ? -1 : ((x == y) ? 0 : 1);
return
cdcOperations.getOrDefault(lhs.getField(opColumn), -1)
.compareTo(
cdcOperations.getOrDefault(rhs.getField(opColumn), -1)
)
;
} else {
return Long.compare((Long) lhs.getField(sourceTsMsColumn), (Long) rhs.getField(sourceTsMsColumn));
}
}


private ArrayList<Record> toIcebergRecords(Schema schema, ArrayList<ChangeEvent<Object, Object>> events) throws InterruptedException {

ArrayList<Record> icebergRecords = new ArrayList<>();
for (ChangeEvent<Object, Object> e : events) {
GenericRecord icebergRecord = IcebergUtil.getIcebergRecord(schema, valDeserializer.deserialize(e.destination(),
getBytes(e.value())));
icebergRecords.add(icebergRecord);
}

return icebergRecords;
}

private ArrayList<Record> toDeduppedIcebergRecords(Schema schema, ArrayList<ChangeEvent<Object, Object>> events) throws InterruptedException {
ConcurrentHashMap<Object, GenericRecord> icebergRecordsmap = new ConcurrentHashMap<>();

for (ChangeEvent<Object, Object> e : events) {
GenericRecord icebergRecord = IcebergUtil.getIcebergRecord(schema, valDeserializer.deserialize(e.destination(),
getBytes(e.value())));

// only replace it if it's newer
if (icebergRecordsmap.containsKey(e.key())) {

if (this.compareByTsThenOp(icebergRecordsmap.get(e.key()), icebergRecord) <= 0) {
icebergRecordsmap.put(e.key(), icebergRecord);
}

} else {
icebergRecordsmap.put(e.key(), icebergRecord);
}

}
return new ArrayList<>(icebergRecordsmap.values());
}

private void addToTable(Table icebergTable, ArrayList<ChangeEvent<Object, Object>> events) throws InterruptedException {

if (!upsert || icebergTable.sortOrder().isUnsorted()) {

if (upsert && icebergTable.sortOrder().isUnsorted()) {
LOGGER.info("Table doesn't have a PK defined, upsert is not possible, falling back to append!");
}

ArrayList<Record> icebergRecords = toIcebergRecords(icebergTable.schema(), events);
DataFile dataFile = getDataFile(icebergTable, icebergRecords);
LOGGER.debug("Committing new file as Append '{}' !", dataFile.path());
icebergTable.newAppend()
.appendFile(dataFile)
.commit();

} else {
// DO UPSERT >>> DELETE + INSERT
ArrayList<Record> icebergRecords = toDeduppedIcebergRecords(icebergTable.schema(), events);
DataFile dataFile = getDataFile(icebergTable, icebergRecords);
DeleteFile deleteDataFile = getDeleteFile(icebergTable, icebergRecords);
LOGGER.debug("Committing new file as Upsert (has deletes:{}) '{}' !", deleteDataFile != null, dataFile.path());
RowDelta c = icebergTable
.newRowDelta()
.addRows(dataFile);

if (deleteDataFile != null) {
c.addDeletes(deleteDataFile)
.validateDeletedFiles();
}

c.commit();
}
LOGGER.info("Committed {} events to table! {}", events.size(), icebergTable.location());

}

private DeleteFile getDeleteFile(Table icebergTable, ArrayList<Record> icebergRecords) throws InterruptedException {

final String fileName = "del-" + UUID.randomUUID() + "-" + Instant.now().toEpochMilli() + "." + FileFormat.PARQUET;
OutputFile out = icebergTable.io().newOutputFile(icebergTable.locationProvider().newDataLocation(fileName));
Set<Integer> equalityDeleteFieldIds = icebergTable.schema().identifierFieldIds();

EqualityDeleteWriter<Record> deleteWriter;

List<Record> deleteRows = icebergRecords.stream()
.filter(r ->
// anything is not an insert.
!r.getField(opColumn).equals("c")
// upsertKeepDeletes = false and its deleted record, which means delete deletes
// || !(upsertKeepDeletes && r.getField(opColumn).equals("d"))
).collect(Collectors.toList());

if (deleteRows.isEmpty()) {
return null;
}

try {
LOGGER.debug("Writing data to equality delete file: {}!", out);

deleteWriter = Parquet.writeDeletes(out)
.createWriterFunc(GenericParquetWriter::buildWriter)
.overwrite()
.rowSchema(icebergTable.sortOrder().schema())
.withSpec(icebergTable.spec())
.equalityFieldIds(Lists.newArrayList(icebergTable.schema().identifierFieldIds()))
.metricsConfig(MetricsConfig.fromProperties(icebergTable.properties()))
.withSortOrder(icebergTable.sortOrder())
.setAll(icebergTable.properties())
.buildEqualityWriter()
;

try (Closeable toClose = deleteWriter) {
deleteWriter.deleteAll(deleteRows);
}

} catch (IOException e) {
throw new InterruptedException(e.getMessage());
}

LOGGER.debug("Creating iceberg equality delete file!");
// Equality delete files identify deleted rows in a collection of data files by one or more column values,
// and may optionally contain additional columns of the deleted row.
return FileMetadata.deleteFileBuilder(icebergTable.spec())
.ofEqualityDeletes(Ints.toArray(icebergTable.schema().identifierFieldIds()))
.withFormat(FileFormat.PARQUET)
.withPath(out.location())
.withFileSizeInBytes(deleteWriter.length())
.withRecordCount(deleteRows.size())
.withSortOrder(icebergTable.sortOrder())
.build();
}

private DataFile getDataFile(Table icebergTable, ArrayList<Record> icebergRecords) throws InterruptedException {
final String fileName = UUID.randomUUID() + "-" + Instant.now().toEpochMilli() + "." + FileFormat.PARQUET;
OutputFile out = icebergTable.io().newOutputFile(icebergTable.locationProvider().newDataLocation(fileName));

FileAppender<Record> writer;
List<Record> newRecords = icebergRecords.stream()
.filter(r ->
// if its append OR upsert with keep deletes then add all the records to data file
!upsert
// if table has no PK - which means fall back to append
|| icebergTable.sortOrder().isUnsorted()
// if its upsert and upsertKeepDeletes = true
|| upsertKeepDeletes
// if nothing above then exclude deletes
|| !(r.getField(opColumn).equals("d")))
.collect(Collectors.toList());
try {
LOGGER.debug("Writing data to file: {}!", out);
writer = Parquet.write(out)
.createWriterFunc(GenericParquetWriter::buildWriter)
.forTable(icebergTable)
.overwrite()
.build();

try (Closeable toClose = writer) {
writer.addAll(newRecords);
}

} catch (IOException e) {
throw new InterruptedException(e.getMessage());
}

LOGGER.debug("Creating iceberg DataFile!");
return DataFiles.builder(icebergTable.spec())
.withFormat(FileFormat.PARQUET)
.withPath(out.location())
.withFileSizeInBytes(writer.length())
.withSplitOffsets(writer.splitOffsets())
.withMetrics(writer.metrics())
.build();
}

}
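The roughly 190 lines removed above (the timestamp/operation comparator, record conversion, dedup, and the data-file and equality-delete writers) are relocated into the new operator classes, not dropped. The core of the upsert path is the per-key dedup that keeps only the newest event per key, ordering by `__source_ts_ms` and then by operation precedence c < r < u < d. A standalone sketch of that reduction, using a plain value type instead of Iceberg's GenericRecord and modern Java for brevity:

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import com.google.common.collect.ImmutableMap;

// Sketch of the batch dedup that the removed toDeduppedIcebergRecords performed.
class BatchDedup {
  static final ImmutableMap<String, Integer> CDC_OPS =
      ImmutableMap.of("c", 1, "r", 2, "u", 3, "d", 4);

  record Event(String key, String op, long sourceTsMs) {}

  // Newest-wins ordering: compare by __source_ts_ms first, then by op precedence.
  static final Comparator<Event> TS_THEN_OP =
      Comparator.comparingLong(Event::sourceTsMs)
          .thenComparing(e -> CDC_OPS.getOrDefault(e.op(), -1));

  static List<Event> dedup(List<Event> events) {
    Map<String, Event> latest = new HashMap<>();
    for (Event e : events) {
      // Replace the stored event only if the incoming one is at least as new,
      // matching the <= 0 comparison in the removed code.
      latest.merge(e.key(), e, (oldE, newE) ->
          TS_THEN_OP.compare(oldE, newE) <= 0 ? newE : oldE);
    }
    return new ArrayList<>(latest.values());
  }
}
```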
