Make file type parametric (#46)
* use GenericAppenderFactory
ismailsimsek committed Oct 27, 2021
1 parent 2f3d56b · commit 26c482d
Showing 5 changed files with 55 additions and 70 deletions.
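
In short, the data file format is no longer hard-coded to Parquet: when a table is created, Iceberg's write.format.default property is set from the new debezium.sink.iceberg.write.format.default sink option, and data files and equality-delete files are written through GenericAppenderFactory using whatever format the table declares. Below is a minimal sketch of the new write path, assuming the Iceberg 0.12-era API used in this commit; the helper class and method names are illustrative, not part of the change.

import java.io.IOException;
import java.util.Locale;

import org.apache.iceberg.DataFile;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.Table;
import org.apache.iceberg.data.GenericAppenderFactory;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.io.DataWriter;
import org.apache.iceberg.io.OutputFile;
import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT;
import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT_DEFAULT;

// Hypothetical helper, not part of the commit; it condenses the pattern the commit introduces.
class FormatAwareWriteSketch {

  static DataFile writeDataFile(Table table, Iterable<Record> records, String fileName) throws IOException {
    // Resolve the format from the table's "write.format.default" property, falling back to parquet.
    String formatName = table.properties().getOrDefault(DEFAULT_FILE_FORMAT, DEFAULT_FILE_FORMAT_DEFAULT);
    FileFormat fileFormat = FileFormat.valueOf(formatName.toUpperCase(Locale.ROOT));

    OutputFile out = table.io().newOutputFile(table.locationProvider().newDataLocation(fileName));

    // GenericAppenderFactory builds the right writer (parquet/orc/avro) for the requested format,
    // so the sink no longer needs format-specific builders such as Parquet.write(...).
    GenericAppenderFactory appenderFactory = new GenericAppenderFactory(table.schema(), table.spec());
    DataWriter<Record> writer = appenderFactory.newDataWriter(table.encryption().encrypt(out), fileFormat, null);
    for (Record record : records) {
      writer.add(record);
    }
    writer.close();
    return writer.toDataFile(); // DataFile metadata, ready to be appended to the table in a commit
  }
}
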
@@ -23,6 +23,7 @@
import org.apache.iceberg.types.Types;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.apache.iceberg.TableProperties.*;

/**
*
@@ -107,13 +108,14 @@ private Set<Integer> getRowIdentifierFieldIds() {
return identifierFieldIds;
}

public Table create(Catalog icebergCatalog, TableIdentifier tableIdentifier) {
public Table create(Catalog icebergCatalog, TableIdentifier tableIdentifier, String writeFormat) {

Schema schema = new Schema(this.tableColumns, getRowIdentifierFieldIds());

if (this.hasSchema()) {
Catalog.TableBuilder tb = icebergCatalog.buildTable(tableIdentifier, schema)
.withProperty("format-version", "2")
.withProperty(FORMAT_VERSION, "2")
.withProperty(DEFAULT_FILE_FORMAT, writeFormat.toLowerCase(Locale.ENGLISH))
.withSortOrder(getSortOrder(schema));

LOGGER.warn("Creating table:'{}'\nschema:{}\nrowIdentifier:{}", tableIdentifier, schema,
@@ -47,6 +47,7 @@
import org.eclipse.microprofile.config.inject.ConfigProperty;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.apache.iceberg.TableProperties.*;

/**
* Implementation of the consumer that delivers the messages to iceberg tables.
@@ -81,6 +82,9 @@ public class IcebergChangeConsumer extends BaseChangeConsumer implements Debeziu
String batchSizeWaitName;
@ConfigProperty(name = "debezium.format.value.schemas.enable", defaultValue = "false")
boolean eventSchemaEnabled;
@ConfigProperty(name = "debezium.sink.iceberg." + DEFAULT_FILE_FORMAT, defaultValue = DEFAULT_FILE_FORMAT_DEFAULT)
String writeFormat;

@Inject
@Any
Instance<InterfaceBatchSizeWait> batchSizeWaitInstances;
@@ -199,15 +203,15 @@ private Table createIcebergTable(TableIdentifier tableIdentifier,
? new DebeziumToIcebergTable(getBytes(event.value()))
: new DebeziumToIcebergTable(getBytes(event.value()), getBytes(event.key()));

return eventSchema.create(icebergCatalog, tableIdentifier);
return eventSchema.create(icebergCatalog, tableIdentifier, writeFormat);
}

private Optional<Table> loadIcebergTable(TableIdentifier tableId) {
try {
Table table = icebergCatalog.loadTable(tableId);
return Optional.of(table);
} catch (NoSuchTableException e) {
LOGGER.warn("table not found: {}", tableId.toString());
LOGGER.warn("Table not found: {}", tableId.toString());
return Optional.empty();
}
}
@@ -13,27 +13,29 @@
import io.debezium.serde.DebeziumSerdes;
import io.debezium.server.iceberg.IcebergUtil;

import java.io.Closeable;
import java.io.IOException;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.UUID;
import java.util.stream.Collectors;

import com.fasterxml.jackson.databind.JsonNode;
import org.apache.iceberg.*;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.data.GenericAppenderFactory;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.data.parquet.GenericParquetWriter;
import org.apache.iceberg.io.FileAppender;
import org.apache.iceberg.io.DataWriter;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.parquet.Parquet;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT;
import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT_DEFAULT;

/**
* Wrapper to perform operations in iceberg tables
@@ -86,36 +88,26 @@ protected ArrayList<Record> toIcebergRecords(Schema schema, ArrayList<ChangeEven
}

protected DataFile getDataFile(Table icebergTable, ArrayList<Record> icebergRecords) {
final String fileName = UUID.randomUUID() + "-" + Instant.now().toEpochMilli() + "." + FileFormat.PARQUET;

String formatAsString = icebergTable.properties().getOrDefault(DEFAULT_FILE_FORMAT, DEFAULT_FILE_FORMAT_DEFAULT);
FileFormat fileFormat = FileFormat.valueOf(formatAsString.toUpperCase(Locale.ROOT));

final String fileName = UUID.randomUUID() + "-" + Instant.now().toEpochMilli() + "." + fileFormat.name();
OutputFile out = icebergTable.io().newOutputFile(icebergTable.locationProvider().newDataLocation(fileName));

GenericAppenderFactory apender = new GenericAppenderFactory(icebergTable.schema(), icebergTable.spec());
DataWriter<Record> dw = apender.newDataWriter(icebergTable.encryption().encrypt(out), fileFormat, null);

icebergRecords.stream().filter(this.filterEvents()).forEach(dw::add);

FileAppender<Record> writer;
List<Record> newRecords = icebergRecords.stream()
.filter(this.filterEvents()).collect(Collectors.toList());
try {
LOGGER.debug("Writing data to file: {}!", out);
writer = Parquet.write(out)
.createWriterFunc(GenericParquetWriter::buildWriter)
.forTable(icebergTable)
.overwrite()
.build();

try (Closeable toClose = writer) {
writer.addAll(newRecords);
}

dw.close();
} catch (IOException e) {
throw new RuntimeException(e);
}

LOGGER.debug("Creating iceberg DataFile!");
return DataFiles.builder(icebergTable.spec())
.withFormat(FileFormat.PARQUET)
.withPath(out.location())
.withFileSizeInBytes(writer.length())
.withSplitOffsets(writer.splitOffsets())
.withMetrics(writer.metrics())
.build();
return dw.toDataFile();
}

}
@@ -11,14 +11,9 @@
import io.debezium.engine.ChangeEvent;
import io.debezium.server.iceberg.IcebergUtil;

import java.io.Closeable;
import java.io.IOException;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Predicate;
import java.util.stream.Collectors;
@@ -27,24 +22,25 @@
import javax.inject.Named;

import com.google.common.collect.ImmutableMap;
import com.google.common.primitives.Ints;
import org.apache.iceberg.*;
import org.apache.iceberg.data.GenericAppenderFactory;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.data.parquet.GenericParquetWriter;
import org.apache.iceberg.deletes.EqualityDeleteWriter;
import org.apache.iceberg.encryption.EncryptedOutputFile;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.parquet.Parquet;
import org.apache.iceberg.relocated.com.google.common.primitives.Ints;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.apache.iceberg.TableProperties.*;

@Dependent
@Named("IcebergTableOperatorUpsert")
public class IcebergTableOperatorUpsert extends AbstractIcebergTableOperator {

private static final Logger LOGGER = LoggerFactory.getLogger(IcebergTableOperatorUpsert.class);
static final ImmutableMap<String, Integer> cdcOperations = ImmutableMap.of("c", 1, "r", 2, "u", 3, "d", 4);
private static final Logger LOGGER = LoggerFactory.getLogger(IcebergTableOperatorUpsert.class);
@ConfigProperty(name = "debezium.sink.iceberg.upsert-dedup-column", defaultValue = "__source_ts_ms")
String sourceTsMsColumn;

@@ -64,11 +60,21 @@ public void initialize() {
}

private Optional<DeleteFile> getDeleteFile(Table icebergTable, ArrayList<Record> icebergRecords) {

final String fileName = "del-" + UUID.randomUUID() + "-" + Instant.now().toEpochMilli() + "." + FileFormat.PARQUET;

String formatAsString = icebergTable.properties().getOrDefault(DEFAULT_FILE_FORMAT, DEFAULT_FILE_FORMAT_DEFAULT);
FileFormat fileFormat = FileFormat.valueOf(formatAsString.toUpperCase(Locale.ROOT));

final String fileName = "del-" + UUID.randomUUID() + "-" + Instant.now().toEpochMilli() + "." + fileFormat.name();
OutputFile out = icebergTable.io().newOutputFile(icebergTable.locationProvider().newDataLocation(fileName));
EncryptedOutputFile eout = icebergTable.encryption().encrypt(out);

EqualityDeleteWriter<Record> deleteWriter;
GenericAppenderFactory apender = new GenericAppenderFactory(
icebergTable.schema(),
icebergTable.spec(),
Ints.toArray(icebergTable.schema().identifierFieldIds()),
icebergTable.schema(),
null);
EqualityDeleteWriter<Record> edw = apender.newEqDeleteWriter(eout, fileFormat, null);

// anything is not an insert.
// upsertKeepDeletes = false, which means delete deletes
@@ -84,36 +90,16 @@ private Optional<DeleteFile> getDeleteFile(Table icebergTable, ArrayList<Record>
return Optional.empty();
}

try {
LOGGER.debug("Writing data to equality delete file: {}!", out);
deleteWriter = Parquet.writeDeletes(out)
.createWriterFunc(GenericParquetWriter::buildWriter)
.overwrite()
.forTable(icebergTable)
.equalityFieldIds(List.copyOf(icebergTable.schema().identifierFieldIds()))
.withSortOrder(icebergTable.sortOrder())
.buildEqualityWriter();

try (Closeable toClose = deleteWriter) {
deleteWriter.deleteAll(deleteRows);
}
edw.deleteAll(deleteRows);

try {
edw.close();
} catch (IOException e) {
throw new RuntimeException(e);
}

LOGGER.debug("Creating iceberg equality delete file!");
// Equality delete files identify deleted rows in a collection of data files by one or more column values,
// and may optionally contain additional columns of the deleted row.
return Optional.of(FileMetadata.deleteFileBuilder(icebergTable.spec())
.ofEqualityDeletes(Ints.toArray(icebergTable.schema().identifierFieldIds()))
.withFormat(FileFormat.PARQUET)
.withPath(out.location())
.withFileSizeInBytes(deleteWriter.length())
.withFileSizeInBytes(deleteWriter.length())
.withRecordCount(deleteRows.size())
.withSortOrder(icebergTable.sortOrder())
.build());
return Optional.of(edw.toDeleteFile());
}

private ArrayList<Record> toDeduppedIcebergRecords(Schema schema, ArrayList<ChangeEvent<Object, Object>> events) {
@@ -21,6 +21,7 @@ public Map<String, String> getConfigOverrides() {
Map<String, String> config = new HashMap<>();

config.put("debezium.sink.type", "iceberg");
config.put("debezium.sink.iceberg.write.format.default", "orc");

return config;
}
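
Usage note: with the change above, selecting a non-Parquet format is a one-line setting, as the test hunk here shows for ORC. In a debezium-server deployment this would be debezium.sink.iceberg.write.format.default=orc (or avro) in the sink configuration; the value is lower-cased and stored on newly created tables as write.format.default, and writers read the format back from the table's properties, so existing tables keep the format they were created with.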
