Add IcebergChangeEvent class Part 1 (#66)
ismailsimsek committed Dec 3, 2021
1 parent d80c235 commit a0dc73f
Showing 10 changed files with 220 additions and 98 deletions.
src/main/java/io/debezium/server/iceberg/IcebergChangeConsumer.java
@@ -21,7 +21,10 @@

import java.time.Duration;
import java.time.Instant;
-import java.util.*;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import javax.annotation.PostConstruct;
@@ -41,7 +44,6 @@
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.exceptions.NoSuchTableException;
-import org.apache.iceberg.types.Types;
import org.eclipse.microprofile.config.ConfigProvider;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import org.slf4j.Logger;
@@ -137,23 +139,18 @@ void connect() {
LOGGER.info("Using {}", icebergTableOperator.getClass().getName());

}

-public String map(String destination) {
-return destination.replace(".", "_");
-}


@Override
public void handleBatch(List<ChangeEvent<Object, Object>> records, DebeziumEngine.RecordCommitter<ChangeEvent<Object, Object>> committer)
throws InterruptedException {
Instant start = Instant.now();

-Map<String, ArrayList<ChangeEvent<Object, Object>>> result = records.stream()
-.collect(Collectors.groupingBy(
-objectObjectChangeEvent -> map(objectObjectChangeEvent.destination()),
-Collectors.mapping(p -> p,
-Collectors.toCollection(ArrayList::new))));
+Map<String, List<IcebergChangeEvent<Object, Object>>> result =
+records.stream()
+.map(IcebergChangeEvent::new)
+.collect(Collectors.groupingBy(IcebergChangeEvent::destinationTable));

-for (Map.Entry<String, ArrayList<ChangeEvent<Object, Object>>> event : result.entrySet()) {
+for (Map.Entry<String, List<IcebergChangeEvent<Object, Object>>> event : result.entrySet()) {
final TableIdentifier tableIdentifier = TableIdentifier.of(Namespace.of(namespace), tablePrefix + event.getKey());
Table icebergTable = loadIcebergTable(tableIdentifier)
.orElseGet(() -> createIcebergTable(tableIdentifier, event.getValue().get(0)));
@@ -185,26 +182,14 @@ protected void logConsumerProgress(long numUploadedEvents) {


private Table createIcebergTable(TableIdentifier tableIdentifier,
-ChangeEvent<Object, Object> event) {
+IcebergChangeEvent<Object, Object> event) {

if (!eventSchemaEnabled) {
throw new RuntimeException("Table '" + tableIdentifier + "' not found! " +
"Set `debezium.format.value.schemas.enable` to true to create tables automatically!");
}

-if (event.value() == null) {
-throw new RuntimeException("Failed to get event schema for table '" + tableIdentifier + "' event value is null");
-}
-
-List<Types.NestedField> tableColumns = IcebergUtil.getIcebergFieldsFromEventSchema(getBytes(event.value()));
-List<Types.NestedField> keyColumns =
-IcebergUtil.getIcebergFieldsFromEventSchema(event.key() == null ? null : getBytes(event.key()));
-
-if (tableColumns.isEmpty()) {
-throw new RuntimeException("Failed to create table " + tableIdentifier);
-}
-
-Schema schema = IcebergUtil.getSchema(tableColumns, keyColumns);
+Schema schema = event.getSchema();

LOGGER.warn("Creating table:'{}'\nschema:{}\nrowIdentifier:{}", tableIdentifier, schema,
schema.identifierFieldNames());
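The new grouping step is compact enough to read in isolation. Below is a minimal, hypothetical sketch (not part of the commit) of what handleBatch now does before dispatching to the table operator, assuming only Debezium's ChangeEvent interface and the IcebergChangeEvent wrapper added in this commit; the class and method names are illustrative:

import io.debezium.engine.ChangeEvent;
import io.debezium.server.iceberg.IcebergChangeEvent;

import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Illustrative sketch: partition one Debezium batch per destination table by
// wrapping every raw event in an IcebergChangeEvent first.
class BatchGroupingSketch {
  static Map<String, List<IcebergChangeEvent<Object, Object>>> groupByTable(
      List<ChangeEvent<Object, Object>> records) {
    return records.stream()
        .map(IcebergChangeEvent::new) // wrap each raw event
        .collect(Collectors.groupingBy(IcebergChangeEvent::destinationTable));
  }
}

Compared to the old groupingBy/mapping collector, the wrapper also absorbs the consumer's map() helper, so the dot-to-underscore renaming now lives next to the rest of the event logic.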
src/main/java/io/debezium/server/iceberg/IcebergChangeEvent.java
@@ -0,0 +1,122 @@
/*
*
* * Copyright memiiso Authors.
* *
* * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*
*/

package io.debezium.server.iceberg;

import io.debezium.DebeziumException;
import io.debezium.engine.ChangeEvent;

import java.util.*;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.iceberg.Schema;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.types.Types;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* @author Ismail Simsek
*/
public class IcebergChangeEvent<K,V> implements ChangeEvent<K,V> {

private final ChangeEvent<K,V> event;
protected static final Logger LOGGER = LoggerFactory.getLogger(IcebergChangeEvent.class);
protected static final ObjectMapper objectMapper = new ObjectMapper();

public IcebergChangeEvent(ChangeEvent<K,V> e) {
event = e;
}

@Override
public K key() {
return event.key();
}

@Override
public V value() {
return event.value();
}

@Override
public String destination() {
return event.destination();
}

public String destinationTable() {
return event.destination().replace(".","_");
}

public GenericRecord getIcebergRecord(Schema schema, JsonNode data) {
return IcebergUtil.getIcebergRecord(schema.asStruct(), data);
}

public Schema getSchema() {

if (this.value() == null) {
throw new RuntimeException("Failed to get event schema event value is null, destination:" + this.destination());
}

List<Types.NestedField> tableColumns = IcebergUtil.getIcebergFieldsFromEventSchema(getBytes(this.value()));
List<Types.NestedField> keyColumns =
IcebergUtil.getIcebergFieldsFromEventSchema(this.key() == null ? null : getBytes(this.key()));

if (tableColumns.isEmpty()) {
throw new RuntimeException("Failed to get schema destination:" + this.destination());
}

return getSchema(tableColumns, keyColumns);
}

private Schema getSchema(List<Types.NestedField> tableColumns,
List<Types.NestedField> keyColumns) {

Set<Integer> identifierFieldIds = new HashSet<>();

for (Types.NestedField ic : keyColumns) {
boolean found = false;

ListIterator<Types.NestedField> colsIterator = tableColumns.listIterator();
while (colsIterator.hasNext()) {
Types.NestedField tc = colsIterator.next();
if (Objects.equals(tc.name(), ic.name())) {
identifierFieldIds.add(tc.fieldId());
// set the column as required since it is part of the row identifier
colsIterator.set(tc.asRequired());
found = true;
break;
}
}

if (!found) {
throw new ValidationException("Table Row identifier field `" + ic.name() + "` not found in table columns");
}

}

return new Schema(tableColumns, identifierFieldIds);
}


protected byte[] getBytes(Object object) {
if (object instanceof byte[]) {
return (byte[]) object;
}
else if (object instanceof String) {
return ((String) object).getBytes();
}
throw new DebeziumException(unsupportedTypeMessage(object));
}

protected String unsupportedTypeMessage(Object object) {
final String type = (object == null) ? "null" : object.getClass().getName();
return "Unexpected data type '" + type + "'";
}
}
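A short usage sketch (hypothetical, not part of the commit) showing the new wrapper in isolation; the topic name is made up, and the anonymous ChangeEvent stands in for what the Debezium engine would normally deliver:

import io.debezium.engine.ChangeEvent;
import io.debezium.server.iceberg.IcebergChangeEvent;

// Destinations such as Kafka topic names contain dots, which are not usable
// in Iceberg table names, so destinationTable() folds them to underscores.
class DestinationTableSketch {
  public static void main(String[] args) {
    ChangeEvent<Object, Object> raw = new ChangeEvent<>() {
      @Override public Object key() { return null; }
      @Override public Object value() { return "{}"; }
      @Override public String destination() { return "testc.inventory.customers"; }
    };
    IcebergChangeEvent<Object, Object> event = new IcebergChangeEvent<>(raw);
    System.out.println(event.destinationTable()); // testc_inventory_customers
  }
}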
src/main/java/io/debezium/server/iceberg/IcebergUtil.java
@@ -10,14 +10,16 @@

import java.io.IOException;
import java.nio.ByteBuffer;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.iceberg.Schema;
import org.apache.iceberg.SortOrder;
import org.apache.iceberg.data.GenericRecord;
-import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.types.Type.PrimitiveType;
import org.apache.iceberg.types.Types;
import org.eclipse.microprofile.config.Config;
@@ -31,12 +33,12 @@ public class IcebergUtil {
protected static final Logger LOGGER = LoggerFactory.getLogger(IcebergUtil.class);
protected static final ObjectMapper jsonObjectMapper = new ObjectMapper();

-public static List<Types.NestedField> getIcebergSchema(JsonNode eventSchema) {
+private static List<Types.NestedField> getIcebergSchema(JsonNode eventSchema) {
LOGGER.debug(eventSchema.toString());
return getIcebergSchema(eventSchema, "", 0);
}

-public static PrimitiveType getIcebergFieldType(String fieldType) {
+private static PrimitiveType getIcebergFieldType(String fieldType) {
switch (fieldType) {
case "int8":
case "int16":
@@ -63,7 +65,7 @@ public static PrimitiveType getIcebergFieldType(String fieldType) {
}
}

-public static List<Types.NestedField> getIcebergSchema(JsonNode eventSchema, String schemaName, int columnId) {
+private static List<Types.NestedField> getIcebergSchema(JsonNode eventSchema, String schemaName, int columnId) {
List<Types.NestedField> schemaColumns = new ArrayList<>();
String schemaType = eventSchema.get("type").textValue();
LOGGER.debug("Converting Schema of: {}::{}", schemaName, schemaType);
@@ -106,17 +108,13 @@ public static List<Types.NestedField> getIcebergSchema(JsonNode eventSchema, Str
return schemaColumns;
}

-public static boolean hasSchema(JsonNode jsonNode) {
+private static boolean hasSchema(JsonNode jsonNode) {
return jsonNode != null
&& jsonNode.has("schema")
&& jsonNode.get("schema").has("fields")
&& jsonNode.get("schema").get("fields").isArray();
}

-public static GenericRecord getIcebergRecord(Schema schema, JsonNode data) {
-return IcebergUtil.getIcebergRecord(schema.asStruct(), data);
-}
-
public static GenericRecord getIcebergRecord(Types.StructType tableFields, JsonNode data) {
Map<String, Object> mappedResult = new HashMap<>();
LOGGER.debug("Processing nested field:{}", tableFields);
@@ -219,34 +217,6 @@ public static List<Types.NestedField> getIcebergFieldsFromEventSchema(byte[] eve
}
}

-public static Schema getSchema(List<Types.NestedField> tableColumns,
-List<Types.NestedField> keyColumns) {
-
-Set<Integer> identifierFieldIds = new HashSet<>();
-
-for (Types.NestedField ic : keyColumns) {
-boolean found = false;
-
-ListIterator<Types.NestedField> colsIterator = tableColumns.listIterator();
-while (colsIterator.hasNext()) {
-Types.NestedField tc = colsIterator.next();
-if (Objects.equals(tc.name(), ic.name())) {
-identifierFieldIds.add(tc.fieldId());
-// set column as required its part of identifier filed
-colsIterator.set(tc.asRequired());
-found = true;
-break;
-}
-}
-
-if (!found) {
-throw new ValidationException("Table Row identifier field `" + ic.name() + "` not found in table columns");
-}
-
-}
-
-return new Schema(tableColumns, identifierFieldIds);
-}

public static SortOrder getIdentifierFieldsAsSortOrder(Schema schema) {
SortOrder.Builder sob = SortOrder.builderFor(schema);
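hasSchema() tests for the envelope Debezium produces when debezium.format.value.schemas.enable is set to true. A small sketch (hypothetical event body, not part of the commit) of the shape it expects, with the same checks inlined since the method is now private:

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

// With schemas enabled, every event value carries a `schema` section next to
// its `payload`; the field list is what getIcebergSchema() walks to build the
// Iceberg columns. The event below is a minimal made-up example.
class HasSchemaSketch {
  public static void main(String[] args) throws Exception {
    String event = "{"
        + "\"schema\": {\"type\": \"struct\", \"fields\": ["
        + "{\"field\": \"id\", \"type\": \"int32\", \"optional\": false}"
        + "]},"
        + "\"payload\": {\"id\": 1}"
        + "}";
    JsonNode node = new ObjectMapper().readTree(event);
    // same condition hasSchema() applies before deriving an Iceberg schema
    boolean hasSchema = node.has("schema")
        && node.get("schema").has("fields")
        && node.get("schema").get("fields").isArray();
    System.out.println(hasSchema); // true
  }
}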
src/main/java/io/debezium/server/iceberg/tableoperator/AbstractIcebergTableOperator.java
@@ -9,16 +9,12 @@
package io.debezium.server.iceberg.tableoperator;

import io.debezium.DebeziumException;
-import io.debezium.engine.ChangeEvent;
import io.debezium.serde.DebeziumSerdes;
-import io.debezium.server.iceberg.IcebergUtil;
+import io.debezium.server.iceberg.IcebergChangeEvent;

import java.io.IOException;
import java.time.Instant;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Locale;
-import java.util.UUID;
+import java.util.*;

import com.fasterxml.jackson.databind.JsonNode;
import org.apache.iceberg.DataFile;
@@ -76,11 +72,11 @@ protected String unsupportedTypeMessage(Object object) {
return "Unexpected data type '" + type + "'";
}

-protected ArrayList<Record> toIcebergRecords(Schema schema, ArrayList<ChangeEvent<Object, Object>> events) {
+protected ArrayList<Record> toIcebergRecords(Schema schema, List<IcebergChangeEvent<Object, Object>> events) {

ArrayList<Record> icebergRecords = new ArrayList<>();
-for (ChangeEvent<Object, Object> e : events) {
-GenericRecord icebergRecord = IcebergUtil.getIcebergRecord(schema, valDeserializer.deserialize(e.destination(),
+for (IcebergChangeEvent<Object, Object> e : events) {
+GenericRecord icebergRecord = e.getIcebergRecord(schema, valDeserializer.deserialize(e.destination(),
getBytes(e.value())));
icebergRecords.add(icebergRecord);
}
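toIcebergRecords() relies on valDeserializer to strip the Debezium JSON envelope before the payload is mapped onto the Iceberg schema. A standalone sketch of that deserialization, assuming the DebeziumSerdes.payloadJson setup this operator family imports above; the topic name and event body are made up:

import com.fasterxml.jackson.databind.JsonNode;
import io.debezium.serde.DebeziumSerdes;

import java.util.Collections;

import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;

// payloadJson() yields a Serde that unwraps the `payload` part of an
// enveloped event (and passes bare JSON through unchanged).
class DeserializeSketch {
  public static void main(String[] args) {
    Serde<JsonNode> valSerde = DebeziumSerdes.payloadJson(JsonNode.class);
    valSerde.configure(Collections.emptyMap(), false); // false = value serde
    Deserializer<JsonNode> valDeserializer = valSerde.deserializer();
    byte[] value = "{\"id\": 1, \"name\": \"test\"}".getBytes();
    JsonNode payload = valDeserializer.deserialize("testc.inventory.customers", value);
    System.out.println(payload.get("name").asText()); // test
  }
}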
src/main/java/io/debezium/server/iceberg/tableoperator/IcebergTableOperatorAppend.java
@@ -8,9 +8,10 @@

package io.debezium.server.iceberg.tableoperator;

-import io.debezium.engine.ChangeEvent;
+import io.debezium.server.iceberg.IcebergChangeEvent;

import java.util.ArrayList;
+import java.util.List;
import java.util.function.Predicate;
import javax.enterprise.context.Dependent;
import javax.inject.Named;
@@ -28,7 +29,7 @@ public class IcebergTableOperatorAppend extends AbstractIcebergTableOperator {
private static final Logger LOGGER = LoggerFactory.getLogger(AbstractIcebergTableOperator.class);

@Override
-public void addToTable(Table icebergTable, ArrayList<ChangeEvent<Object, Object>> events) {
+public void addToTable(Table icebergTable, List<IcebergChangeEvent<Object, Object>> events) {

ArrayList<Record> icebergRecords = toIcebergRecords(icebergTable.schema(), events);
DataFile dataFile = getDataFile(icebergTable, icebergRecords);
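The tail of addToTable is truncated above; based on the Iceberg API in use here (a DataFile handed to an append-only operator), the commit step is presumably the standard Iceberg append transaction, sketched below with table and dataFile as assumed inputs:

import org.apache.iceberg.DataFile;
import org.apache.iceberg.Table;

// Sketch: once records are written into a DataFile, an append-only operator
// stages the file and commits, producing a new table snapshot atomically.
class AppendCommitSketch {
  static void commitAppend(Table table, DataFile dataFile) {
    table.newAppend()
        .appendFile(dataFile) // stage the newly written file
        .commit();            // atomic snapshot commit
  }
}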
src/main/java/io/debezium/server/iceberg/tableoperator/IcebergTableOperatorUpsert.java
@@ -8,8 +8,7 @@

package io.debezium.server.iceberg.tableoperator;

-import io.debezium.engine.ChangeEvent;
-import io.debezium.server.iceberg.IcebergUtil;
+import io.debezium.server.iceberg.IcebergChangeEvent;

import java.io.IOException;
import java.time.Instant;
@@ -96,11 +95,11 @@ private Optional<DeleteFile> getDeleteFile(Table icebergTable, ArrayList<Record>
return Optional.of(edw.toDeleteFile());
}

-private ArrayList<Record> toDeduppedIcebergRecords(Schema schema, ArrayList<ChangeEvent<Object, Object>> events) {
+private ArrayList<Record> toDeduppedIcebergRecords(Schema schema, List<IcebergChangeEvent<Object, Object>> events) {
ConcurrentHashMap<Object, GenericRecord> icebergRecordsmap = new ConcurrentHashMap<>();

-for (ChangeEvent<Object, Object> e : events) {
-GenericRecord icebergRecord = IcebergUtil.getIcebergRecord(schema, valDeserializer.deserialize(e.destination(),
+for (IcebergChangeEvent<Object, Object> e : events) {
+GenericRecord icebergRecord = e.getIcebergRecord(schema, valDeserializer.deserialize(e.destination(),
getBytes(e.value())));

// only replace it if it's newer
@@ -133,7 +132,7 @@ private int compareByTsThenOp(GenericRecord lhs, GenericRecord rhs) {
}

@Override
-public void addToTable(Table icebergTable, ArrayList<ChangeEvent<Object, Object>> events) {
+public void addToTable(Table icebergTable, List<IcebergChangeEvent<Object, Object>> events) {

if (icebergTable.sortOrder().isUnsorted()) {
LOGGER.info("Table don't have Pk defined upsert is not possible falling back to append!");
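The dedup step above keeps only the newest change per key within a batch, so the upsert writes at most one delete and one data row per key. A simplified model, not part of the commit (Change is a hypothetical stand-in for the deserialized event; the real code compares GenericRecord fields via compareByTsThenOp, using the source timestamp and then the operation as a tiebreaker):

import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

// Simplified batch dedup: later changes win.
class DedupSketch {
  record Change(String key, long sourceTsMs, String op) {}

  static Collection<Change> dedup(Iterable<Change> events) {
    Map<String, Change> newest = new HashMap<>();
    for (Change e : events) {
      // only replace the buffered change if this one is newer
      newest.merge(e.key(), e, (old, cur) -> cur.sourceTsMs() >= old.sourceTsMs() ? cur : old);
    }
    return newest.values();
  }
}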