Minor improvements (#73)
* minor improvements
ismailsimsek committed Dec 31, 2021
1 parent 0558cbb commit 6bdfab1
Showing 5 changed files with 125 additions and 100 deletions.
@@ -155,22 +155,15 @@ public void handleBatch(List<ChangeEvent<Object, Object>> records, DebeziumEngin
})
.collect(Collectors.groupingBy(IcebergChangeEvent::destinationTable));

// consume list of events for each destination table
for (Map.Entry<String, List<IcebergChangeEvent>> event : result.entrySet()) {
final TableIdentifier tableIdentifier = TableIdentifier.of(Namespace.of(namespace), tablePrefix + event.getKey());
Table icebergTable = IcebergUtil.loadIcebergTable(icebergCatalog, tableIdentifier)
.orElseGet(() -> {
if (!eventSchemaEnabled) {
throw new RuntimeException("Table '" + tableIdentifier + "' not found! " +
"Set `debezium.format.value.schemas.enable` to true to create tables automatically!");
}
return IcebergUtil.createIcebergTable(icebergCatalog, tableIdentifier,
event.getValue().get(0).getSchema(), writeFormat, !upsert);
});
//addToTable(icebergTable, event.getValue());
Table icebergTable = this.loadIcebergTable(icebergCatalog, tableIdentifier, event.getValue().get(0));
icebergTableOperator.addToTable(icebergTable, event.getValue());
}

// workaround! somehow offset is not saved to file unless we call committer.markProcessed
// even its should be saved to file periodically
// even though it should be saved to the file periodically
for (ChangeEvent<Object, Object> record : records) {
LOGGER.trace("Processed event '{}'", record);
committer.markProcessed(record);
@@ -179,9 +172,26 @@ public void handleBatch(List<ChangeEvent<Object, Object>> records, DebeziumEngin
this.logConsumerProgress(records.size());

batchSizeWait.waitMs(records.size(), (int) Duration.between(start, Instant.now()).toMillis());
}

/**
* @param icebergCatalog iceberg catalog
* @param tableId iceberg table identifier
* @param sampleEvent sample Debezium event; its schema is used to create the Iceberg table when the table is not found
* @return the Iceberg table; a RuntimeException is thrown when the table is not found and cannot be created
*/
public Table loadIcebergTable(Catalog icebergCatalog, TableIdentifier tableId, IcebergChangeEvent sampleEvent) {
return IcebergUtil.loadIcebergTable(icebergCatalog, tableId).orElseGet(() -> {
if (!eventSchemaEnabled) {
throw new RuntimeException("Table '" + tableId + "' not found! " + "Set `debezium.format.value.schemas.enable` to true to create tables automatically!");
}
return IcebergUtil.createIcebergTable(icebergCatalog, tableId, sampleEvent.getSchema(), writeFormat, !upsert);
});
}
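
For reference, the Optional-returning lookup this helper builds on (IcebergUtil.loadIcebergTable) is not shown in this diff. A plausible shape for it, assuming it simply translates Iceberg's NoSuchTableException into an empty Optional, is sketched below; treat it as an illustration, not the project's actual code.

  // Hypothetical sketch of IcebergUtil.loadIcebergTable; the real implementation is not part of this diff.
  public static Optional<Table> loadIcebergTable(Catalog icebergCatalog, TableIdentifier tableId) {
    try {
      return Optional.of(icebergCatalog.loadTable(tableId));
    } catch (org.apache.iceberg.exceptions.NoSuchTableException e) {
      return Optional.empty();
    }
  }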

/**
* @param numUploadedEvents number of events consumed in the current batch; overall consumer progress is logged periodically
*/
protected void logConsumerProgress(long numUploadedEvents) {
numConsumedEvents += numUploadedEvents;
if (logTimer.expired()) {
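Both consumers throttle between batches through a pluggable InterfaceBatchSizeWait bean: the waitMs call in handleBatch above and the IcebergUtil.selectInstance/initizalize calls in connect() below. As a rough illustration, a no-op implementation could look like the sketch that follows; the class name NoopBatchSizeWait is made up here, and the interface is assumed to declare exactly the two methods invoked in this diff (initizalize() and waitMs(int, int)).

  import io.debezium.server.iceberg.batchsizewait.InterfaceBatchSizeWait;
  import javax.enterprise.context.Dependent;
  import javax.inject.Named;

  // Hypothetical no-op strategy, registered as a named CDI bean so it can be selected by name.
  @Dependent
  @Named("NoopBatchSizeWait")
  public class NoopBatchSizeWait implements InterfaceBatchSizeWait {
    @Override
    public void initizalize() {
      // nothing to configure for the no-op strategy
    }

    @Override
    public void waitMs(int numRecordsProcessed, int processingTimeMs) {
      // return immediately: no delay between batches
    }
  }
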
@@ -12,42 +12,43 @@
import io.debezium.engine.ChangeEvent;
import io.debezium.engine.DebeziumEngine;
import io.debezium.engine.format.Json;
import io.debezium.serde.DebeziumSerdes;
import io.debezium.server.BaseChangeConsumer;
import io.debezium.server.iceberg.batchsizewait.InterfaceBatchSizeWait;
import io.debezium.server.iceberg.tableoperator.PartitionedAppendWriter;

import java.io.Closeable;
import java.io.IOException;
import java.time.Duration;
import java.time.Instant;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.time.format.DateTimeFormatter;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import javax.annotation.PostConstruct;
import javax.enterprise.context.Dependent;
import javax.enterprise.inject.Any;
import javax.enterprise.inject.Instance;
import javax.enterprise.inject.literal.NamedLiteral;
import javax.inject.Inject;
import javax.inject.Named;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.*;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.data.GenericAppenderFactory;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.data.parquet.GenericParquetWriter;
import org.apache.iceberg.io.FileAppender;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.parquet.Parquet;
import org.apache.iceberg.io.BaseTaskWriter;
import org.apache.iceberg.io.OutputFileFactory;
import org.apache.iceberg.io.WriteResult;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.DateTimeUtil;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.eclipse.microprofile.config.ConfigProvider;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import org.slf4j.Logger;
@@ -64,22 +65,32 @@
@Dependent
public class IcebergEventsChangeConsumer extends BaseChangeConsumer implements DebeziumEngine.ChangeConsumer<ChangeEvent<Object, Object>> {

protected static final DateTimeFormatter dtFormater = DateTimeFormatter.ofPattern("yyyyMMdd").withZone(ZoneOffset.UTC);
protected static final ObjectMapper mapper = new ObjectMapper();
protected static final Serde<JsonNode> valSerde = DebeziumSerdes.payloadJson(JsonNode.class);
protected static final Serde<JsonNode> keySerde = DebeziumSerdes.payloadJson(JsonNode.class);
static final Schema TABLE_SCHEMA = new Schema(
required(1, "event_destination", Types.StringType.get()),
optional(2, "event_key", Types.StringType.get()),
optional(3, "event_value", Types.StringType.get()),
optional(4, "event_sink_epoch_ms", Types.LongType.get()),
optional(5, "event_sink_timestamptz", Types.TimestampType.withZone())
optional(2, "event_key_schema", Types.StringType.get()),
optional(3, "event_key_payload", Types.StringType.get()),
optional(4, "event_value_schema", Types.StringType.get()),
optional(5, "event_value_payload", Types.StringType.get()),
optional(6, "event_sink_epoch_ms", Types.LongType.get()),
optional(7, "event_sink_timestamptz", Types.TimestampType.withZone())
);

static final PartitionSpec TABLE_PARTITION = PartitionSpec.builderFor(TABLE_SCHEMA)
.identity("event_destination")
.hour("event_sink_timestamptz")
.build();
static final SortOrder TABLE_SORT_ORDER = SortOrder.builderFor(TABLE_SCHEMA)
.asc("event_sink_epoch_ms", NullOrder.NULLS_LAST)
.asc("event_sink_timestamptz", NullOrder.NULLS_LAST)
.build();
private static final Logger LOGGER = LoggerFactory.getLogger(IcebergEventsChangeConsumer.class);
private static final String PROP_PREFIX = "debezium.sink.iceberg.";
static Deserializer<JsonNode> valDeserializer;
static Deserializer<JsonNode> keyDeserializer;
final Configuration hadoopConf = new Configuration();
final Map<String, String> icebergProperties = new ConcurrentHashMap<>();
@ConfigProperty(name = "debezium.sink.iceberg." + CatalogProperties.WAREHOUSE_LOCATION)
@@ -103,7 +114,6 @@ public class IcebergEventsChangeConsumer extends BaseChangeConsumer implements D
Catalog icebergCatalog;
Table eventTable;


@PostConstruct
void connect() {
if (!valueFormat.equalsIgnoreCase(Json.class.getSimpleName().toLowerCase())) {
@@ -138,20 +148,41 @@ void connect() {
// load table
eventTable = icebergCatalog.loadTable(tableIdentifier);

Instance<InterfaceBatchSizeWait> instance = batchSizeWaitInstances.select(NamedLiteral.of(batchSizeWaitName));
batchSizeWait = IcebergUtil.selectInstance(batchSizeWaitInstances, batchSizeWaitName);
batchSizeWait.initizalize();

// configure and set the value serde
valSerde.configure(Collections.emptyMap(), false);
valDeserializer = valSerde.deserializer();
// configure and set the key serde
keySerde.configure(Collections.emptyMap(), true);
keyDeserializer = keySerde.deserializer();

LOGGER.info("Using {}", batchSizeWait.getClass().getName());
}
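
connect() expects the events table to already exist before icebergCatalog.loadTable(tableIdentifier) is called above. A minimal, hypothetical one-time bootstrap using the schema, partition spec and sort order declared at the top of this class could look like the following; the namespace and table name are taken from the integration test at the end of this diff, everything else is an assumption.

  // Hypothetical bootstrap, not part of this commit. Assumes it runs in the same package,
  // so the package-private TABLE_SCHEMA/TABLE_PARTITION/TABLE_SORT_ORDER constants are visible.
  TableIdentifier tableId = TableIdentifier.of(Namespace.of("debeziumevents"), "debezium_events");
  if (!icebergCatalog.tableExists(tableId)) {
    icebergCatalog.buildTable(tableId, IcebergEventsChangeConsumer.TABLE_SCHEMA)
        .withPartitionSpec(IcebergEventsChangeConsumer.TABLE_PARTITION)
        .withSortOrder(IcebergEventsChangeConsumer.TABLE_SORT_ORDER)
        .create();
  }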

public GenericRecord getIcebergRecord(String destination, ChangeEvent<Object, Object> record, OffsetDateTime batchTime) {
GenericRecord rec = GenericRecord.create(TABLE_SCHEMA.asStruct());
rec.setField("event_destination", destination);
rec.setField("event_key", getString(record.key()));
rec.setField("event_value", getString(record.value()));
rec.setField("event_sink_epoch_ms", batchTime.toEpochSecond());
rec.setField("event_sink_timestamptz", batchTime);
return rec;
public GenericRecord getIcebergRecord(ChangeEvent<Object, Object> record, OffsetDateTime batchTime) {

try {
// deserialize
JsonNode valueSchema = record.value() == null ? null : mapper.readTree(getBytes(record.value())).get("schema");
JsonNode valuePayload = valDeserializer.deserialize(record.destination(), getBytes(record.value()));
JsonNode keyPayload = record.key() == null ? null : keyDeserializer.deserialize(record.destination(), getBytes(record.key()));
JsonNode keySchema = record.key() == null ? null : mapper.readTree(getBytes(record.key())).get("schema");
// convert to GenericRecord
GenericRecord rec = GenericRecord.create(TABLE_SCHEMA.asStruct());
rec.setField("event_destination", record.destination());
rec.setField("event_key_schema", mapper.writeValueAsString(keySchema));
rec.setField("event_key_payload", mapper.writeValueAsString(keyPayload));
rec.setField("event_value_schema", mapper.writeValueAsString(valueSchema));
rec.setField("event_value_payload", mapper.writeValueAsString(valuePayload));
rec.setField("event_sink_epoch_ms", batchTime.toEpochSecond());
rec.setField("event_sink_timestamptz", batchTime);

return rec;
} catch (IOException e) {
throw new DebeziumException(e);
}
}

public String map(String destination) {
@@ -164,72 +195,49 @@ public void handleBatch(List<ChangeEvent<Object, Object>> records, DebeziumEngin
Instant start = Instant.now();

OffsetDateTime batchTime = OffsetDateTime.now(ZoneOffset.UTC);

Map<String, ArrayList<ChangeEvent<Object, Object>>> result = records.stream()
.collect(Collectors.groupingBy(
objectObjectChangeEvent -> map(objectObjectChangeEvent.destination()),
Collectors.mapping(p -> p,
Collectors.toCollection(ArrayList::new))));

for (Map.Entry<String, ArrayList<ChangeEvent<Object, Object>>> destEvents : result.entrySet()) {
// each destEvents is set of events for a single table
ArrayList<Record> destIcebergRecords = destEvents.getValue().stream()
.map(e -> getIcebergRecord(destEvents.getKey(), e, batchTime))
.collect(Collectors.toCollection(ArrayList::new));

commitBatch(destEvents.getKey(), batchTime, destIcebergRecords);
ArrayList<Record> icebergRecords = records.stream()
.map(e -> getIcebergRecord(e, batchTime))
.collect(Collectors.toCollection(ArrayList::new));
commitBatch(icebergRecords);

// workaround! somehow offset is not saved to file unless we call committer.markProcessed
// even though it should be saved to the file periodically
for (ChangeEvent<Object, Object> record : records) {
LOGGER.trace("Processed event '{}'", record);
committer.markProcessed(record);
}
// committer.markProcessed(record);
committer.markBatchFinished();

batchSizeWait.waitMs(records.size(), (int) Duration.between(start, Instant.now()).toMillis());

}

private void commitBatch(String destination, OffsetDateTime batchTime, ArrayList<Record> icebergRecords) {
final String fileName = UUID.randomUUID() + "-" + Instant.now().toEpochMilli() + "." + FileFormat.PARQUET;
private void commitBatch(ArrayList<Record> icebergRecords) {

PartitionKey pk = new PartitionKey(TABLE_PARTITION, TABLE_SCHEMA);
Record pr = GenericRecord.create(TABLE_SCHEMA)
.copy("event_destination",
destination, "event_sink_timestamptz",
DateTimeUtil.microsFromTimestamptz(batchTime));
pk.partition(pr);
FileFormat format = IcebergUtil.getTableFileFormat(eventTable);
GenericAppenderFactory appenderFactory = IcebergUtil.getTableAppender(eventTable);
int partitionId = Integer.parseInt(dtFormater.format(Instant.now()));
OutputFileFactory fileFactory = OutputFileFactory.builderFor(eventTable, partitionId, 1L)
.defaultSpec(eventTable.spec()).format(format).build();

OutputFile out =
eventTable.io().newOutputFile(eventTable.locationProvider().newDataLocation(pk.toPath() + "/" + fileName));
BaseTaskWriter<Record> writer = new PartitionedAppendWriter(
eventTable.spec(), format, appenderFactory, fileFactory, eventTable.io(), Long.MAX_VALUE, eventTable.schema());

FileAppender<Record> writer;
try {
writer = Parquet.write(out)
.createWriterFunc(GenericParquetWriter::buildWriter)
.forTable(eventTable)
.overwrite()
.build();

try (Closeable toClose = writer) {
writer.addAll(icebergRecords);
for (Record icebergRecord : icebergRecords) {
writer.write(icebergRecord);
}

writer.close();
WriteResult files = writer.complete();
AppendFiles appendFiles = eventTable.newAppend();
Arrays.stream(files.dataFiles()).forEach(appendFiles::appendFile);
appendFiles.commit();

} catch (IOException e) {
LOGGER.error("Failed committing events to iceberg table!", e);
throw new RuntimeException("Failed commiting events to iceberg table!", e);
throw new DebeziumException("Failed committing events to iceberg table!", e);
}

DataFile dataFile = DataFiles.builder(eventTable.spec())
.withFormat(FileFormat.PARQUET)
.withPath(out.location())
.withFileSizeInBytes(writer.length())
.withSplitOffsets(writer.splitOffsets())
.withMetrics(writer.metrics())
.withPartition(pk)
.build();

LOGGER.debug("Appending new file '{}'", dataFile.path());
eventTable.newAppend()
.appendFile(dataFile)
.commit();
LOGGER.info("Committed {} events to table {}", icebergRecords.size(), eventTable.location());
LOGGER.info("Committed {} events to table! {}", icebergRecords.size(), eventTable.location());
}

}
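
commitBatch above leans on two small IcebergUtil helpers whose implementations are not shown in this diff. Plausible shapes for them, assuming the file format is read from the table's write.format.default property, are sketched below as an illustration only.

  // Hypothetical sketches; the real helpers live in IcebergUtil.
  static FileFormat getTableFileFormat(Table table) {
    String format = table.properties().getOrDefault(
        TableProperties.DEFAULT_FILE_FORMAT, TableProperties.DEFAULT_FILE_FORMAT_DEFAULT);
    return FileFormat.valueOf(format.toUpperCase(java.util.Locale.ROOT));
  }

  static GenericAppenderFactory getTableAppender(Table table) {
    // append-only writer factory built from the table's own schema and partition spec
    return new GenericAppenderFactory(table.schema(), table.spec());
  }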
@@ -68,8 +68,8 @@ public static Table createIcebergTable(Catalog icebergCatalog, TableIdentifier t
LOGGER.warn("Creating table:'{}'\nschema:{}\nrowIdentifier:{}", tableIdentifier, schema,
schema.identifierFieldNames());

PartitionSpec ps;
if (partition) {
final PartitionSpec ps;
if (partition && schema.findField("__source_ts") != null) {
ps = PartitionSpec.builderFor(schema).day("__source_ts").build();
} else {
ps = PartitionSpec.builderFor(schema).build();
@@ -16,6 +16,7 @@
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import javax.enterprise.context.Dependent;
import javax.inject.Inject;

@@ -89,18 +90,22 @@ private int compareByTsThenOp(GenericRecord lhs, GenericRecord rhs) {
}

public void addToTable(Table icebergTable, List<IcebergChangeEvent> events) {

final List<Record> batchEvents;
// when operation mode is upsert, deduplicate the events to avoid inserting duplicate rows
if (upsert && !icebergTable.schema().identifierFieldIds().isEmpty()) {
batchEvents = deduplicatedBatchRecords(icebergTable.schema(), events);
} else {
batchEvents = events.stream().
map(e -> e.asIcebergRecord(icebergTable.schema())).
collect(Collectors.toList());
}

// Initialize a task writer to write both INSERT and equality DELETE.
BaseTaskWriter<Record> writer = writerFactory.create(icebergTable);
try {
if (upsert && !icebergTable.schema().identifierFieldIds().isEmpty()) {
ArrayList<Record> icebergRecords = deduplicatedBatchRecords(icebergTable.schema(), events);
for (Record icebergRecord : icebergRecords) {
writer.write(icebergRecord);
}
} else {
for (IcebergChangeEvent e : events) {
writer.write(e.asIcebergRecord(icebergTable.schema()));
}
for (Record icebergRecord : batchEvents) {
writer.write(icebergRecord);
}

writer.close();
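For illustration, deduplicatedBatchRecords (whose body is not shown in this diff) can be thought of as keeping a single, latest record per key before the batch reaches the writer. A purely hypothetical sketch, assuming the change event exposes a key accessor (the name key() is invented here):

  // Illustrative only; not the project's actual implementation.
  Map<Object, IcebergChangeEvent> latestByKey = new LinkedHashMap<>();
  for (IcebergChangeEvent e : events) {
    latestByKey.put(e.key(), e); // a later event for the same key replaces the earlier one
  }
  List<Record> deduplicated = latestByKey.values().stream()
      .map(e -> e.asIcebergRecord(icebergTable.schema()))
      .collect(Collectors.toList());
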
@@ -43,13 +43,15 @@ public void testSimpleUpload() {
Awaitility.await().atMost(Duration.ofSeconds(120)).until(() -> {
try {
Dataset<Row> ds = spark.newSession().sql("SELECT * FROM debeziumevents.debezium_events");
ds.show();
return ds.count() >= 5
ds.show(false);
return ds.count() >= 10
&& ds.select("event_destination").distinct().count() >= 2;
} catch (Exception e) {
return false;
}
});

S3Minio.listFiles();
}

}
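
Since the events table now stores key/value schema and payload in separate columns, a quick follow-up check on the new layout could look like this; it is hypothetical and reuses the Spark session and table name from the test above.

  Dataset<Row> payloads = spark.newSession()
      .sql("SELECT event_destination, event_key_payload, event_value_payload "
          + "FROM debeziumevents.debezium_events LIMIT 5");
  payloads.show(false);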
