Add IcebergChangeEvent class Part 1 #66

Merged
merged 1 commit on Dec 3, 2021
@@ -21,7 +21,10 @@

import java.time.Duration;
import java.time.Instant;
import java.util.*;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import javax.annotation.PostConstruct;
@@ -41,7 +44,6 @@
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.exceptions.NoSuchTableException;
import org.apache.iceberg.types.Types;
import org.eclipse.microprofile.config.ConfigProvider;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import org.slf4j.Logger;
@@ -137,23 +139,18 @@ void connect() {
LOGGER.info("Using {}", icebergTableOperator.getClass().getName());

}

public String map(String destination) {
return destination.replace(".", "_");
}


@Override
public void handleBatch(List<ChangeEvent<Object, Object>> records, DebeziumEngine.RecordCommitter<ChangeEvent<Object, Object>> committer)
throws InterruptedException {
Instant start = Instant.now();

Map<String, ArrayList<ChangeEvent<Object, Object>>> result = records.stream()
.collect(Collectors.groupingBy(
objectObjectChangeEvent -> map(objectObjectChangeEvent.destination()),
Collectors.mapping(p -> p,
Collectors.toCollection(ArrayList::new))));
Map<String, List<IcebergChangeEvent<Object, Object>>> result =
records.stream()
.map(IcebergChangeEvent::new)
.collect(Collectors.groupingBy(IcebergChangeEvent::destinationTable));

for (Map.Entry<String, ArrayList<ChangeEvent<Object, Object>>> event : result.entrySet()) {
for (Map.Entry<String, List<IcebergChangeEvent<Object, Object>>> event : result.entrySet()) {
final TableIdentifier tableIdentifier = TableIdentifier.of(Namespace.of(namespace), tablePrefix + event.getKey());
Table icebergTable = loadIcebergTable(tableIdentifier)
.orElseGet(() -> createIcebergTable(tableIdentifier, event.getValue().get(0)));
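Reviewer note (illustrative, not part of this diff): the per-table grouping key now comes from IcebergChangeEvent.destinationTable(), which replaces dots in the Debezium destination with underscores. A minimal sketch, assuming a hypothetical change event whose destination is "testc.inventory.customers":

// Sketch only; `debeziumEvent` is a hypothetical ChangeEvent<Object, Object>
// with destination "testc.inventory.customers".
IcebergChangeEvent<Object, Object> e = new IcebergChangeEvent<>(debeziumEvent);
String tableName = e.destinationTable(); // "testc_inventory_customers"
// handleBatch groups all events sharing this table name into one batch and
// then loads or creates the Iceberg table "<tablePrefix>testc_inventory_customers".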
@@ -185,26 +182,14 @@ protected void logConsumerProgress(long numUploadedEvents) {


private Table createIcebergTable(TableIdentifier tableIdentifier,
ChangeEvent<Object, Object> event) {
IcebergChangeEvent<Object, Object> event) {

if (!eventSchemaEnabled) {
throw new RuntimeException("Table '" + tableIdentifier + "' not found! " +
"Set `debezium.format.value.schemas.enable` to true to create tables automatically!");
}

if (event.value() == null) {
throw new RuntimeException("Failed to get event schema for table '" + tableIdentifier + "' event value is null");
}

List<Types.NestedField> tableColumns = IcebergUtil.getIcebergFieldsFromEventSchema(getBytes(event.value()));
List<Types.NestedField> keyColumns =
IcebergUtil.getIcebergFieldsFromEventSchema(event.key() == null ? null : getBytes(event.key()));

if (tableColumns.isEmpty()) {
throw new RuntimeException("Failed to create table " + tableIdentifier);
"Set `debezium.format.value.schemas.enable` to true to create tables automatically!");
}

Schema schema = IcebergUtil.getSchema(tableColumns, keyColumns);
Schema schema = event.getSchema();

LOGGER.warn("Creating table:'{}'\nschema:{}\nrowIdentifier:{}", tableIdentifier, schema,
schema.identifierFieldNames());
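Reviewer note (illustrative, not part of this diff): schema derivation for auto-created tables now lives on the event itself. The rest of createIcebergTable is collapsed in this view; a rough sketch of how the derived schema could be used to build the table, assuming the consumer holds an org.apache.iceberg.catalog.Catalog (the project's exact builder calls are not shown here):

// Sketch under assumptions: `catalog` is an Iceberg Catalog instance.
Schema schema = event.getSchema(); // value fields, with key fields marked as identifier fields
Table table = catalog.buildTable(tableIdentifier, schema)
    .withSortOrder(IcebergUtil.getIdentifierFieldsAsSortOrder(schema))
    .create();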
@@ -0,0 +1,122 @@
/*
*
* * Copyright memiiso Authors.
* *
* * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*
*/

package io.debezium.server.iceberg;

import io.debezium.DebeziumException;
import io.debezium.engine.ChangeEvent;

import java.util.*;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.iceberg.Schema;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.types.Types;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* @author Ismail Simsek
*/
public class IcebergChangeEvent<K,V> implements ChangeEvent<K,V> {

private final ChangeEvent<K,V> event;
protected static final Logger LOGGER = LoggerFactory.getLogger(IcebergChangeEvent.class);
protected static final ObjectMapper objectMapper = new ObjectMapper();

public IcebergChangeEvent(ChangeEvent<K,V> e) {
event = e;
}

@Override
public K key() {
return event.key();
}

@Override
public V value() {
return event.value();
}

@Override
public String destination() {
return event.destination();
}

public String destinationTable() {
return event.destination().replace(".","_");
}

public GenericRecord getIcebergRecord(Schema schema, JsonNode data) {
return IcebergUtil.getIcebergRecord(schema.asStruct(), data);
}

public Schema getSchema() {

if (this.value() == null) {
throw new RuntimeException("Failed to get event schema event value is null, destination:" + this.destination());
}

List<Types.NestedField> tableColumns = IcebergUtil.getIcebergFieldsFromEventSchema(getBytes(this.value()));
List<Types.NestedField> keyColumns =
IcebergUtil.getIcebergFieldsFromEventSchema(this.key() == null ? null : getBytes(this.key()));

if (tableColumns.isEmpty()) {
throw new RuntimeException("Failed to get schema destination:" + this.destination());
}

return getSchema(tableColumns, keyColumns);
}

private Schema getSchema(List<Types.NestedField> tableColumns,
List<Types.NestedField> keyColumns) {

Set<Integer> identifierFieldIds = new HashSet<>();

for (Types.NestedField ic : keyColumns) {
boolean found = false;

ListIterator<Types.NestedField> colsIterator = tableColumns.listIterator();
while (colsIterator.hasNext()) {
Types.NestedField tc = colsIterator.next();
if (Objects.equals(tc.name(), ic.name())) {
identifierFieldIds.add(tc.fieldId());
// set column as required since it's part of the identifier field
colsIterator.set(tc.asRequired());
found = true;
break;
}
}

if (!found) {
throw new ValidationException("Table Row identifier field `" + ic.name() + "` not found in table columns");
}

}

return new Schema(tableColumns, identifierFieldIds);
}


protected byte[] getBytes(Object object) {
if (object instanceof byte[]) {
return (byte[]) object;
}
else if (object instanceof String) {
return ((String) object).getBytes();
}
throw new DebeziumException(unsupportedTypeMessage(object));
}

protected String unsupportedTypeMessage(Object object) {
final String type = (object == null) ? "null" : object.getClass().getName();
return "Unexpected data type '" + type + "'";
}
}
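Reviewer note (illustrative, not part of this diff): a minimal sketch of how the table operators use the new wrapper, mirroring the toIcebergRecords() change below. `valDeserializer` and `getBytes()` are the existing helpers from AbstractIcebergTableOperator shown in this PR; `debeziumEvent` is a hypothetical incoming ChangeEvent.

// Wrap the raw Debezium event.
IcebergChangeEvent<Object, Object> e = new IcebergChangeEvent<>(debeziumEvent);

// Derive the Iceberg schema: value fields, with key fields promoted to
// required identifier fields (throws if the value or its schema is missing).
Schema schema = e.getSchema();

// Deserialize the value payload and map it onto the schema as a GenericRecord.
JsonNode value = valDeserializer.deserialize(e.destination(), getBytes(e.value()));
GenericRecord record = e.getIcebergRecord(schema, value);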
@@ -10,14 +10,16 @@

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.*;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.iceberg.Schema;
import org.apache.iceberg.SortOrder;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.types.Type.PrimitiveType;
import org.apache.iceberg.types.Types;
import org.eclipse.microprofile.config.Config;
@@ -31,12 +33,12 @@ public class IcebergUtil {
protected static final Logger LOGGER = LoggerFactory.getLogger(IcebergUtil.class);
protected static final ObjectMapper jsonObjectMapper = new ObjectMapper();

public static List<Types.NestedField> getIcebergSchema(JsonNode eventSchema) {
private static List<Types.NestedField> getIcebergSchema(JsonNode eventSchema) {
LOGGER.debug(eventSchema.toString());
return getIcebergSchema(eventSchema, "", 0);
}

public static PrimitiveType getIcebergFieldType(String fieldType) {
private static PrimitiveType getIcebergFieldType(String fieldType) {
switch (fieldType) {
case "int8":
case "int16":
@@ -63,7 +65,7 @@ public static PrimitiveType getIcebergFieldType(String fieldType) {
}
}

public static List<Types.NestedField> getIcebergSchema(JsonNode eventSchema, String schemaName, int columnId) {
private static List<Types.NestedField> getIcebergSchema(JsonNode eventSchema, String schemaName, int columnId) {
List<Types.NestedField> schemaColumns = new ArrayList<>();
String schemaType = eventSchema.get("type").textValue();
LOGGER.debug("Converting Schema of: {}::{}", schemaName, schemaType);
@@ -106,17 +108,13 @@ public static List<Types.NestedField> getIcebergSchema(JsonNode eventSchema, Str
return schemaColumns;
}

public static boolean hasSchema(JsonNode jsonNode) {
private static boolean hasSchema(JsonNode jsonNode) {
return jsonNode != null
&& jsonNode.has("schema")
&& jsonNode.get("schema").has("fields")
&& jsonNode.get("schema").get("fields").isArray();
}

public static GenericRecord getIcebergRecord(Schema schema, JsonNode data) {
return IcebergUtil.getIcebergRecord(schema.asStruct(), data);
}

public static GenericRecord getIcebergRecord(Types.StructType tableFields, JsonNode data) {
Map<String, Object> mappedResult = new HashMap<>();
LOGGER.debug("Processing nested field:{}", tableFields);
@@ -219,34 +217,6 @@ public static List<Types.NestedField> getIcebergFieldsFromEventSchema(byte[] eve
}
}

public static Schema getSchema(List<Types.NestedField> tableColumns,
List<Types.NestedField> keyColumns) {

Set<Integer> identifierFieldIds = new HashSet<>();

for (Types.NestedField ic : keyColumns) {
boolean found = false;

ListIterator<Types.NestedField> colsIterator = tableColumns.listIterator();
while (colsIterator.hasNext()) {
Types.NestedField tc = colsIterator.next();
if (Objects.equals(tc.name(), ic.name())) {
identifierFieldIds.add(tc.fieldId());
// set column as required its part of identifier filed
colsIterator.set(tc.asRequired());
found = true;
break;
}
}

if (!found) {
throw new ValidationException("Table Row identifier field `" + ic.name() + "` not found in table columns");
}

}

return new Schema(tableColumns, identifierFieldIds);
}

public static SortOrder getIdentifierFieldsAsSortOrder(Schema schema) {
SortOrder.Builder sob = SortOrder.builderFor(schema);
@@ -9,16 +9,12 @@
package io.debezium.server.iceberg.tableoperator;

import io.debezium.DebeziumException;
import io.debezium.engine.ChangeEvent;
import io.debezium.serde.DebeziumSerdes;
import io.debezium.server.iceberg.IcebergUtil;
import io.debezium.server.iceberg.IcebergChangeEvent;

import java.io.IOException;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Locale;
import java.util.UUID;
import java.util.*;

import com.fasterxml.jackson.databind.JsonNode;
import org.apache.iceberg.DataFile;
@@ -76,11 +72,11 @@ protected String unsupportedTypeMessage(Object object) {
return "Unexpected data type '" + type + "'";
}

protected ArrayList<Record> toIcebergRecords(Schema schema, ArrayList<ChangeEvent<Object, Object>> events) {
protected ArrayList<Record> toIcebergRecords(Schema schema, List<IcebergChangeEvent<Object, Object>> events) {

ArrayList<Record> icebergRecords = new ArrayList<>();
for (ChangeEvent<Object, Object> e : events) {
GenericRecord icebergRecord = IcebergUtil.getIcebergRecord(schema, valDeserializer.deserialize(e.destination(),
for (IcebergChangeEvent<Object, Object> e : events) {
GenericRecord icebergRecord = e.getIcebergRecord(schema, valDeserializer.deserialize(e.destination(),
getBytes(e.value())));
icebergRecords.add(icebergRecord);
}
@@ -8,9 +8,10 @@

package io.debezium.server.iceberg.tableoperator;

import io.debezium.engine.ChangeEvent;
import io.debezium.server.iceberg.IcebergChangeEvent;

import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;
import javax.enterprise.context.Dependent;
import javax.inject.Named;
@@ -28,7 +29,7 @@ public class IcebergTableOperatorAppend extends AbstractIcebergTableOperator {
private static final Logger LOGGER = LoggerFactory.getLogger(AbstractIcebergTableOperator.class);

@Override
public void addToTable(Table icebergTable, ArrayList<ChangeEvent<Object, Object>> events) {
public void addToTable(Table icebergTable, List<IcebergChangeEvent<Object, Object>> events) {

ArrayList<Record> icebergRecords = toIcebergRecords(icebergTable.schema(), events);
DataFile dataFile = getDataFile(icebergTable, icebergRecords);
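Reviewer note (illustrative, not part of this diff): the remainder of addToTable is collapsed in this view. A rough sketch of how an append-only operator typically commits the written data file with the standard Iceberg API (the project's exact commit code is not shown here):

// Sketch only: append the newly written data file and commit the snapshot.
icebergTable.newAppend()
    .appendFile(dataFile)
    .commit();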
@@ -8,8 +8,7 @@

package io.debezium.server.iceberg.tableoperator;

import io.debezium.engine.ChangeEvent;
import io.debezium.server.iceberg.IcebergUtil;
import io.debezium.server.iceberg.IcebergChangeEvent;

import java.io.IOException;
import java.time.Instant;
@@ -96,11 +95,11 @@ private Optional<DeleteFile> getDeleteFile(Table icebergTable, ArrayList<Record>
return Optional.of(edw.toDeleteFile());
}

private ArrayList<Record> toDeduppedIcebergRecords(Schema schema, ArrayList<ChangeEvent<Object, Object>> events) {
private ArrayList<Record> toDeduppedIcebergRecords(Schema schema, List<IcebergChangeEvent<Object, Object>> events) {
ConcurrentHashMap<Object, GenericRecord> icebergRecordsmap = new ConcurrentHashMap<>();

for (ChangeEvent<Object, Object> e : events) {
GenericRecord icebergRecord = IcebergUtil.getIcebergRecord(schema, valDeserializer.deserialize(e.destination(),
for (IcebergChangeEvent<Object, Object> e : events) {
GenericRecord icebergRecord = e.getIcebergRecord(schema, valDeserializer.deserialize(e.destination(),
getBytes(e.value())));

// only replace it if its newer
@@ -133,7 +132,7 @@ private int compareByTsThenOp(GenericRecord lhs, GenericRecord rhs) {
}

@Override
public void addToTable(Table icebergTable, ArrayList<ChangeEvent<Object, Object>> events) {
public void addToTable(Table icebergTable, List<IcebergChangeEvent<Object, Object>> events) {

if (icebergTable.sortOrder().isUnsorted()) {
LOGGER.info("Table don't have Pk defined upsert is not possible falling back to append!");