Make file type parametric #46

Merged 9 commits on Oct 27, 2021

@@ -23,6 +23,7 @@
import org.apache.iceberg.types.Types;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.apache.iceberg.TableProperties.*;

/**
*
@@ -107,13 +108,14 @@ private Set<Integer> getRowIdentifierFieldIds() {
return identifierFieldIds;
}

public Table create(Catalog icebergCatalog, TableIdentifier tableIdentifier) {
public Table create(Catalog icebergCatalog, TableIdentifier tableIdentifier, String writeFormat) {

Schema schema = new Schema(this.tableColumns, getRowIdentifierFieldIds());

if (this.hasSchema()) {
Catalog.TableBuilder tb = icebergCatalog.buildTable(tableIdentifier, schema)
.withProperty("format-version", "2")
.withProperty(FORMAT_VERSION, "2")
.withProperty(DEFAULT_FILE_FORMAT, writeFormat.toLowerCase(Locale.ENGLISH))
.withSortOrder(getSortOrder(schema));

LOGGER.warn("Creating table:'{}'\nschema:{}\nrowIdentifier:{}", tableIdentifier, schema,

@@ -47,6 +47,7 @@
import org.eclipse.microprofile.config.inject.ConfigProperty;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.apache.iceberg.TableProperties.*;

/**
* Implementation of the consumer that delivers the messages to iceberg tables.
@@ -81,6 +82,9 @@ public class IcebergChangeConsumer extends BaseChangeConsumer implements Debeziu
String batchSizeWaitName;
@ConfigProperty(name = "debezium.format.value.schemas.enable", defaultValue = "false")
boolean eventSchemaEnabled;
@ConfigProperty(name = "debezium.sink.iceberg." + DEFAULT_FILE_FORMAT, defaultValue = DEFAULT_FILE_FORMAT_DEFAULT)
String writeFormat;

@Inject
@Any
Instance<InterfaceBatchSizeWait> batchSizeWaitInstances;
@@ -199,15 +203,15 @@ private Table createIcebergTable(TableIdentifier tableIdentifier,
? new DebeziumToIcebergTable(getBytes(event.value()))
: new DebeziumToIcebergTable(getBytes(event.value()), getBytes(event.key()));

return eventSchema.create(icebergCatalog, tableIdentifier);
return eventSchema.create(icebergCatalog, tableIdentifier, writeFormat);
}

private Optional<Table> loadIcebergTable(TableIdentifier tableId) {
try {
Table table = icebergCatalog.loadTable(tableId);
return Optional.of(table);
} catch (NoSuchTableException e) {
LOGGER.warn("table not found: {}", tableId.toString());
LOGGER.warn("Table not found: {}", tableId.toString());
return Optional.empty();
}
}
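
Note on the new config property: Iceberg's DEFAULT_FILE_FORMAT constant is "write.format.default" (and DEFAULT_FILE_FORMAT_DEFAULT is "parquet"), so the @ConfigProperty added above resolves to the key debezium.sink.iceberg.write.format.default, defaulting to parquet. A minimal usage sketch (the configuration file name is an assumption, not part of this PR), matching the test override at the end of this diff:

    # assumed Debezium Server config file (e.g. conf/application.properties)
    # selects the Iceberg file format used for newly created tables; falls back to parquet when unset
    debezium.sink.type=iceberg
    debezium.sink.iceberg.write.format.default=orc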

@@ -13,27 +13,29 @@
import io.debezium.serde.DebeziumSerdes;
import io.debezium.server.iceberg.IcebergUtil;

import java.io.Closeable;
import java.io.IOException;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.UUID;
import java.util.stream.Collectors;

import com.fasterxml.jackson.databind.JsonNode;
import org.apache.iceberg.*;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.data.GenericAppenderFactory;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.data.parquet.GenericParquetWriter;
import org.apache.iceberg.io.FileAppender;
import org.apache.iceberg.io.DataWriter;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.parquet.Parquet;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT;
import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT_DEFAULT;

/**
* Wrapper to perform operations in iceberg tables
@@ -86,36 +88,26 @@ protected ArrayList<Record> toIcebergRecords(Schema schema, ArrayList<ChangeEven
}

protected DataFile getDataFile(Table icebergTable, ArrayList<Record> icebergRecords) {
final String fileName = UUID.randomUUID() + "-" + Instant.now().toEpochMilli() + "." + FileFormat.PARQUET;

String formatAsString = icebergTable.properties().getOrDefault(DEFAULT_FILE_FORMAT, DEFAULT_FILE_FORMAT_DEFAULT);
FileFormat fileFormat = FileFormat.valueOf(formatAsString.toUpperCase(Locale.ROOT));

final String fileName = UUID.randomUUID() + "-" + Instant.now().toEpochMilli() + "." + fileFormat.name();
OutputFile out = icebergTable.io().newOutputFile(icebergTable.locationProvider().newDataLocation(fileName));

GenericAppenderFactory apender = new GenericAppenderFactory(icebergTable.schema(), icebergTable.spec());
DataWriter<Record> dw = apender.newDataWriter(icebergTable.encryption().encrypt(out), fileFormat, null);

icebergRecords.stream().filter(this.filterEvents()).forEach(dw::add);

FileAppender<Record> writer;
List<Record> newRecords = icebergRecords.stream()
.filter(this.filterEvents()).collect(Collectors.toList());
try {
LOGGER.debug("Writing data to file: {}!", out);
writer = Parquet.write(out)
.createWriterFunc(GenericParquetWriter::buildWriter)
.forTable(icebergTable)
.overwrite()
.build();

try (Closeable toClose = writer) {
writer.addAll(newRecords);
}

dw.close();
} catch (IOException e) {
throw new RuntimeException(e);
}

LOGGER.debug("Creating iceberg DataFile!");
return DataFiles.builder(icebergTable.spec())
.withFormat(FileFormat.PARQUET)
.withPath(out.location())
.withFileSizeInBytes(writer.length())
.withSplitOffsets(writer.splitOffsets())
.withMetrics(writer.metrics())
.build();
return dw.toDataFile();
}

}

@@ -11,14 +11,9 @@
import io.debezium.engine.ChangeEvent;
import io.debezium.server.iceberg.IcebergUtil;

import java.io.Closeable;
import java.io.IOException;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Predicate;
import java.util.stream.Collectors;
@@ -27,24 +22,25 @@
import javax.inject.Named;

import com.google.common.collect.ImmutableMap;
import com.google.common.primitives.Ints;
import org.apache.iceberg.*;
import org.apache.iceberg.data.GenericAppenderFactory;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.data.parquet.GenericParquetWriter;
import org.apache.iceberg.deletes.EqualityDeleteWriter;
import org.apache.iceberg.encryption.EncryptedOutputFile;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.parquet.Parquet;
import org.apache.iceberg.relocated.com.google.common.primitives.Ints;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.apache.iceberg.TableProperties.*;

@Dependent
@Named("IcebergTableOperatorUpsert")
public class IcebergTableOperatorUpsert extends AbstractIcebergTableOperator {

private static final Logger LOGGER = LoggerFactory.getLogger(IcebergTableOperatorUpsert.class);
static final ImmutableMap<String, Integer> cdcOperations = ImmutableMap.of("c", 1, "r", 2, "u", 3, "d", 4);
private static final Logger LOGGER = LoggerFactory.getLogger(IcebergTableOperatorUpsert.class);
@ConfigProperty(name = "debezium.sink.iceberg.upsert-dedup-column", defaultValue = "__source_ts_ms")
String sourceTsMsColumn;

@@ -64,11 +60,21 @@ public void initialize() {
}

private Optional<DeleteFile> getDeleteFile(Table icebergTable, ArrayList<Record> icebergRecords) {

final String fileName = "del-" + UUID.randomUUID() + "-" + Instant.now().toEpochMilli() + "." + FileFormat.PARQUET;

String formatAsString = icebergTable.properties().getOrDefault(DEFAULT_FILE_FORMAT, DEFAULT_FILE_FORMAT_DEFAULT);
FileFormat fileFormat = FileFormat.valueOf(formatAsString.toUpperCase(Locale.ROOT));

final String fileName = "del-" + UUID.randomUUID() + "-" + Instant.now().toEpochMilli() + "." + fileFormat.name();
OutputFile out = icebergTable.io().newOutputFile(icebergTable.locationProvider().newDataLocation(fileName));
EncryptedOutputFile eout = icebergTable.encryption().encrypt(out);

EqualityDeleteWriter<Record> deleteWriter;
GenericAppenderFactory apender = new GenericAppenderFactory(
icebergTable.schema(),
icebergTable.spec(),
Ints.toArray(icebergTable.schema().identifierFieldIds()),
icebergTable.schema(),
null);
EqualityDeleteWriter<Record> edw = apender.newEqDeleteWriter(eout, fileFormat, null);

// anything is not an insert.
// upsertKeepDeletes = false, which means delete deletes
@@ -84,36 +90,16 @@ private Optional<DeleteFile> getDeleteFile(Table icebergTable, ArrayList<Record>
return Optional.empty();
}

try {
LOGGER.debug("Writing data to equality delete file: {}!", out);
deleteWriter = Parquet.writeDeletes(out)
.createWriterFunc(GenericParquetWriter::buildWriter)
.overwrite()
.forTable(icebergTable)
.equalityFieldIds(List.copyOf(icebergTable.schema().identifierFieldIds()))
.withSortOrder(icebergTable.sortOrder())
.buildEqualityWriter();

try (Closeable toClose = deleteWriter) {
deleteWriter.deleteAll(deleteRows);
}
edw.deleteAll(deleteRows);

try {
edw.close();
} catch (IOException e) {
throw new RuntimeException(e);
}

LOGGER.debug("Creating iceberg equality delete file!");
// Equality delete files identify deleted rows in a collection of data files by one or more column values,
// and may optionally contain additional columns of the deleted row.
return Optional.of(FileMetadata.deleteFileBuilder(icebergTable.spec())
.ofEqualityDeletes(Ints.toArray(icebergTable.schema().identifierFieldIds()))
.withFormat(FileFormat.PARQUET)
.withPath(out.location())
.withFileSizeInBytes(deleteWriter.length())
.withFileSizeInBytes(deleteWriter.length())
.withRecordCount(deleteRows.size())
.withSortOrder(icebergTable.sortOrder())
.build());
return Optional.of(edw.toDeleteFile());
}

private ArrayList<Record> toDeduppedIcebergRecords(Schema schema, ArrayList<ChangeEvent<Object, Object>> events) {

@@ -21,6 +21,7 @@ public Map<String, String> getConfigOverrides() {
Map<String, String> config = new HashMap<>();

config.put("debezium.sink.type", "iceberg");
config.put("debezium.sink.iceberg.write.format.default", "orc");

return config;
}