
Support writing partitioned tables (#71)
ismailsimsek committed Dec 31, 2021
1 parent 98cf398 commit 0558cbb
Showing 27 changed files with 720 additions and 607 deletions.
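In short: the commit adds an optional __source_ts timestamp-with-zone column (filled from Debezium's __source_ts_ms field) to every value schema it builds, and new tables are created day-partitioned on that column unless the consumer runs in upsert mode. Below is a minimal, hedged sketch of the resulting partition layout; the two-column schema is a stand-in, since the real schema is derived from the Debezium event at runtime.

import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.types.Types;

public class PartitionedTableSketch {
  public static void main(String[] args) {
    // stand-in schema; the commit appends "__source_ts" to the schema it derives from the event
    Schema schema = new Schema(
        Types.NestedField.required(1, "id", Types.LongType.get()),
        Types.NestedField.optional(2, "__source_ts", Types.TimestampType.withZone()));
    // same builder call the commit adds to IcebergUtil.createIcebergTable when partitioning is enabled
    PartitionSpec spec = PartitionSpec.builderFor(schema).day("__source_ts").build();
    System.out.println(spec); // single partition field, named "__source_ts_day" by default
  }
}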
@@ -22,7 +22,6 @@
import org.slf4j.LoggerFactory;

/**
*
* @author Ismail Simsek
*/
@Dependent
@@ -15,7 +15,7 @@
import io.debezium.serde.DebeziumSerdes;
import io.debezium.server.BaseChangeConsumer;
import io.debezium.server.iceberg.batchsizewait.InterfaceBatchSizeWait;
import io.debezium.server.iceberg.tableoperator.InterfaceIcebergTableOperator;
import io.debezium.server.iceberg.tableoperator.IcebergTableOperator;
import io.debezium.util.Clock;
import io.debezium.util.Strings;
import io.debezium.util.Threads;
@@ -104,9 +104,7 @@ public class IcebergChangeConsumer extends BaseChangeConsumer implements Debeziu
InterfaceBatchSizeWait batchSizeWait;
Catalog icebergCatalog;
@Inject
@Any
Instance<InterfaceIcebergTableOperator> icebergTableOperatorInstances;
InterfaceIcebergTableOperator icebergTableOperator;
IcebergTableOperator icebergTableOperator;

@PostConstruct
void connect() {
@@ -127,9 +125,6 @@ void connect() {
batchSizeWait = IcebergUtil.selectInstance(batchSizeWaitInstances, batchSizeWaitName);
batchSizeWait.initizalize();

final String icebergTableOperatorName = upsert ? "IcebergTableOperatorUpsert" : "IcebergTableOperatorAppend";
icebergTableOperator = IcebergUtil.selectInstance(icebergTableOperatorInstances, icebergTableOperatorName);
icebergTableOperator.initialize();
// configure and set
valSerde.configure(Collections.emptyMap(), false);
valDeserializer = valSerde.deserializer();
@@ -169,7 +164,7 @@ public void handleBatch(List<ChangeEvent<Object, Object>> records, DebeziumEngin
"Set `debezium.format.value.schemas.enable` to true to create tables automatically!");
}
return IcebergUtil.createIcebergTable(icebergCatalog, tableIdentifier,
event.getValue().get(0).getSchema(), writeFormat);
event.getValue().get(0).getSchema(), writeFormat, !upsert);
});
//addToTable(icebergTable, event.getValue());
icebergTableOperator.addToTable(icebergTable, event.getValue());
@@ -10,6 +10,9 @@

import java.io.IOException;
import java.nio.ByteBuffer;
import java.time.Instant;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.util.*;

import com.fasterxml.jackson.databind.JsonNode;
@@ -53,8 +56,17 @@ public String destinationTable() {
return destination.replace(".", "_");
}

public GenericRecord getIcebergRecord(Schema schema) {
return getIcebergRecord(schema.asStruct(), value);
public GenericRecord asIcebergRecord(Schema schema) {
final GenericRecord record = asIcebergRecord(schema.asStruct(), value);

if (value != null && value.has("__source_ts_ms") && value.get("__source_ts_ms") != null) {
final long source_ts_ms = value.get("__source_ts_ms").longValue();
final OffsetDateTime odt = OffsetDateTime.ofInstant(Instant.ofEpochMilli(source_ts_ms), ZoneOffset.UTC);
record.setField("__source_ts", odt);
} else {
record.setField("__source_ts", null);
}
return record;
}

public String schemaHashCode() {
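The asIcebergRecord hunk above converts Debezium's __source_ts_ms epoch-millisecond value into a UTC OffsetDateTime, which is how Iceberg's Java API represents timestamp-with-zone values, and stores it in the new __source_ts field. A self-contained sketch of that conversion, using an assumed literal instead of a real change event:

import java.time.Instant;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;

import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.types.Types;

public class SourceTsConversionSketch {
  public static void main(String[] args) {
    // assumed sample value; in the consumer this comes from value.get("__source_ts_ms")
    long sourceTsMs = 1640995200000L; // 2022-01-01T00:00:00Z
    OffsetDateTime odt = OffsetDateTime.ofInstant(Instant.ofEpochMilli(sourceTsMs), ZoneOffset.UTC);

    Types.StructType struct = Types.StructType.of(
        Types.NestedField.optional(1, "__source_ts", Types.TimestampType.withZone()));
    GenericRecord record = GenericRecord.create(struct);
    record.setField("__source_ts", odt);
    System.out.println(record); // record now carries the UTC timestamp used for day partitioning
  }
}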
@@ -67,13 +79,13 @@ public Schema getSchema() {
throw new RuntimeException("Failed to get event schema, event value is null, destination:" + this.destination);
}

final List<Types.NestedField> tableColumns = getValueFields();
final List<Types.NestedField> tableColumns = valueSchemaFields();

if (tableColumns.isEmpty()) {
throw new RuntimeException("Failed to get schema destination:" + this.destination);
}

final List<Types.NestedField> keyColumns = getKeyFields();
final List<Types.NestedField> keyColumns = KeySchemaFields();
Set<Integer> identifierFieldIds = new HashSet<>();

for (Types.NestedField ic : keyColumns) {
@@ -100,43 +112,43 @@ public Schema getSchema() {
return new Schema(tableColumns, identifierFieldIds);
}

private GenericRecord getIcebergRecord(Types.StructType tableFields, JsonNode data) {
Map<String, Object> mappedResult = new HashMap<>();
private GenericRecord asIcebergRecord(Types.StructType tableFields, JsonNode data) {
LOGGER.debug("Processing nested field:{}", tableFields);
GenericRecord record = GenericRecord.create(tableFields);

for (Types.NestedField field : tableFields.fields()) {
// Set value to null if json event don't have the field
if (data == null || !data.has(field.name()) || data.get(field.name()) == null) {
mappedResult.put(field.name(), null);
record.setField(field.name(), null);
continue;
}
// get the value of the field from json event, map it to iceberg value
mappedResult.put(field.name(), jsonToGenericRecordVal(field, data.get(field.name())));
record.setField(field.name(), jsonValToIcebergVal(field, data.get(field.name())));
}

return GenericRecord.create(tableFields).copy(mappedResult);
return record;
}

//getIcebergFieldsFromEventSchema
private List<Types.NestedField> getKeyFields() {
private List<Types.NestedField> KeySchemaFields() {
if (keySchema != null && keySchema.has("fields") && keySchema.get("fields").isArray()) {
LOGGER.debug(keySchema.toString());
return getIcebergSchema(keySchema, "", 0);
return icebergSchema(keySchema, "", 0);
}
LOGGER.trace("Key schema not found!");
return new ArrayList<>();
}

private List<Types.NestedField> getValueFields() {
private List<Types.NestedField> valueSchemaFields() {
if (valueSchema != null && valueSchema.has("fields") && valueSchema.get("fields").isArray()) {
LOGGER.debug(valueSchema.toString());
return getIcebergSchema(valueSchema, "", 0);
return icebergSchema(valueSchema, "", 0, true);
}
LOGGER.trace("Event schema not found!");
return new ArrayList<>();
}

private Type.PrimitiveType getIcebergFieldType(String fieldType) {
private Type.PrimitiveType icebergFieldType(String fieldType) {
switch (fieldType) {
case "int8":
case "int16":
@@ -163,7 +175,12 @@ private Type.PrimitiveType getIcebergFieldType(String fieldType) {
}
}

private List<Types.NestedField> getIcebergSchema(JsonNode eventSchema, String schemaName, int columnId) {
private List<Types.NestedField> icebergSchema(JsonNode eventSchema, String schemaName, int columnId) {
return icebergSchema(eventSchema, schemaName, columnId, false);
}

private List<Types.NestedField> icebergSchema(JsonNode eventSchema, String schemaName, int columnId,
boolean addSourceTsField) {
List<Types.NestedField> schemaColumns = new ArrayList<>();
String schemaType = eventSchema.get("type").textValue();
LOGGER.debug("Converting Schema of: {}::{}", schemaName, schemaType);
@@ -180,7 +197,7 @@ private List<Types.NestedField> getIcebergSchema(JsonNode eventSchema, String sc
if (listItemType.equals("struct") || listItemType.equals("array") || listItemType.equals("map")) {
throw new RuntimeException("Complex Array types are not supported array[" + listItemType + "], field " + fieldName);
}
Type.PrimitiveType item = getIcebergFieldType(listItemType);
Type.PrimitiveType item = icebergFieldType(listItemType);
schemaColumns.add(Types.NestedField.optional(
columnId, fieldName, Types.ListType.ofOptional(++columnId, item)));
//throw new RuntimeException("'" + fieldName + "' has Array type, Array type not supported!");
@@ -194,20 +211,25 @@ private List<Types.NestedField> getIcebergSchema(JsonNode eventSchema, String sc
//break;
case "struct":
// create it as struct, nested type
List<Types.NestedField> subSchema = getIcebergSchema(jsonSchemaFieldNode, fieldName, columnId);
List<Types.NestedField> subSchema = icebergSchema(jsonSchemaFieldNode, fieldName, columnId);
schemaColumns.add(Types.NestedField.optional(columnId, fieldName, Types.StructType.of(subSchema)));
columnId += subSchema.size();
break;
default: //primitive types
schemaColumns.add(Types.NestedField.optional(columnId, fieldName, getIcebergFieldType(fieldType)));
schemaColumns.add(Types.NestedField.optional(columnId, fieldName, icebergFieldType(fieldType)));
break;
}
}

if (addSourceTsField) {
columnId++;
schemaColumns.add(Types.NestedField.optional(columnId, "__source_ts", Types.TimestampType.withZone()));
}
return schemaColumns;
}

private Object jsonToGenericRecordVal(Types.NestedField field,
JsonNode node) {
private Object jsonValToIcebergVal(Types.NestedField field,
JsonNode node) {
LOGGER.debug("Processing Field:{} Type:{}", field.name(), field.type());
final Object val;
switch (field.type().typeId()) {
@@ -247,7 +269,7 @@ private Object jsonToGenericRecordVal(Types.NestedField field,
case STRUCT:
// create it as struct, nested type
// recursive call to get nested data/record
val = getIcebergRecord(field.type().asStructType(), node);
val = asIcebergRecord(field.type().asStructType(), node);
break;
default:
// default to String type
@@ -78,17 +78,16 @@ public class IcebergEventsChangeConsumer extends BaseChangeConsumer implements D
static final SortOrder TABLE_SORT_ORDER = SortOrder.builderFor(TABLE_SCHEMA)
.asc("event_sink_epoch_ms", NullOrder.NULLS_LAST)
.build();

@ConfigProperty(name = "debezium.sink.iceberg." + CatalogProperties.WAREHOUSE_LOCATION)
String warehouseLocation;

private static final Logger LOGGER = LoggerFactory.getLogger(IcebergEventsChangeConsumer.class);
private static final String PROP_PREFIX = "debezium.sink.iceberg.";
final Configuration hadoopConf = new Configuration();
final Map<String, String> icebergProperties = new ConcurrentHashMap<>();
@ConfigProperty(name = "debezium.sink.iceberg." + CatalogProperties.WAREHOUSE_LOCATION)
String warehouseLocation;
@ConfigProperty(name = "debezium.format.value", defaultValue = "json")
String valueFormat;
@ConfigProperty(name = "debezium.format.key", defaultValue = "json")
String keyFormat;
final Configuration hadoopConf = new Configuration();
@ConfigProperty(name = "debezium.sink.iceberg.fs.defaultFS")
String defaultFs;
@ConfigProperty(name = "debezium.sink.iceberg.table-namespace", defaultValue = "default")
@@ -97,13 +96,10 @@ public class IcebergEventsChangeConsumer extends BaseChangeConsumer implements D
String catalogName;
@ConfigProperty(name = "debezium.sink.batch.batch-size-wait", defaultValue = "NoBatchSizeWait")
String batchSizeWaitName;

@Inject
@Any
Instance<InterfaceBatchSizeWait> batchSizeWaitInstances;
InterfaceBatchSizeWait batchSizeWait;

final Map<String, String> icebergProperties = new ConcurrentHashMap<>();
Catalog icebergCatalog;
Table eventTable;

@@ -112,11 +108,11 @@ public class IcebergEventsChangeConsumer extends BaseChangeConsumer implements D
void connect() {
if (!valueFormat.equalsIgnoreCase(Json.class.getSimpleName().toLowerCase())) {
throw new DebeziumException("debezium.format.value={" + valueFormat + "} not supported, " +
"Supported (debezium.format.value=*) formats are {json,}!");
"Supported (debezium.format.value=*) formats are {json,}!");
}
if (!keyFormat.equalsIgnoreCase(Json.class.getSimpleName().toLowerCase())) {
throw new DebeziumException("debezium.format.key={" + valueFormat + "} not supported, " +
"Supported (debezium.format.key=*) formats are {json,}!");
"Supported (debezium.format.key=*) formats are {json,}!");
}

TableIdentifier tableIdentifier = TableIdentifier.of(Namespace.of(namespace), "debezium_events");
@@ -18,17 +18,16 @@
import javax.enterprise.inject.literal.NamedLiteral;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.iceberg.Schema;
import org.apache.iceberg.SortOrder;
import org.apache.iceberg.Table;
import org.apache.iceberg.*;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.data.GenericAppenderFactory;
import org.apache.iceberg.exceptions.NoSuchTableException;
import org.apache.iceberg.relocated.com.google.common.primitives.Ints;
import org.eclipse.microprofile.config.Config;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT;
import static org.apache.iceberg.TableProperties.FORMAT_VERSION;
import static org.apache.iceberg.TableProperties.*;

/**
* @author Ismail Simsek
@@ -64,15 +63,23 @@ public static <T> T selectInstance(Instance<T> instances, String name) {
}

public static Table createIcebergTable(Catalog icebergCatalog, TableIdentifier tableIdentifier,
Schema schema, String writeFormat) {
Schema schema, String writeFormat, boolean partition) {

LOGGER.warn("Creating table:'{}'\nschema:{}\nrowIdentifier:{}", tableIdentifier, schema,
schema.identifierFieldNames());

PartitionSpec ps;
if (partition) {
ps = PartitionSpec.builderFor(schema).day("__source_ts").build();
} else {
ps = PartitionSpec.builderFor(schema).build();
}

return icebergCatalog.buildTable(tableIdentifier, schema)
.withProperty(FORMAT_VERSION, "2")
.withProperty(DEFAULT_FILE_FORMAT, writeFormat.toLowerCase(Locale.ENGLISH))
.withSortOrder(IcebergUtil.getIdentifierFieldsAsSortOrder(schema))
.withPartitionSpec(ps)
.create();
}

@@ -95,4 +102,18 @@ public static Optional<Table> loadIcebergTable(Catalog icebergCatalog, TableIden
}
}

public static FileFormat getTableFileFormat(Table icebergTable) {
String formatAsString = icebergTable.properties().getOrDefault(DEFAULT_FILE_FORMAT, DEFAULT_FILE_FORMAT_DEFAULT);
return FileFormat.valueOf(formatAsString.toUpperCase(Locale.ROOT));
}

public static GenericAppenderFactory getTableAppender(Table icebergTable) {
return new GenericAppenderFactory(
icebergTable.schema(),
icebergTable.spec(),
Ints.toArray(icebergTable.schema().identifierFieldIds()),
icebergTable.schema(),
null);
}

}
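The two helpers added at the end of IcebergUtil centralize per-table writer setup: one resolves the file format from the table's write.format.default property (falling back to Iceberg's default, parquet), the other builds a GenericAppenderFactory carrying the schema, partition spec, and identifier field ids. A hedged usage sketch follows; the Table handle is assumed to come from the catalog, the io.debezium.server.iceberg package for IcebergUtil is inferred from the project imports above, and how the table operator actually consumes these objects is not shown in this diff.

import org.apache.iceberg.FileFormat;
import org.apache.iceberg.Table;
import org.apache.iceberg.data.GenericAppenderFactory;

import io.debezium.server.iceberg.IcebergUtil; // assumed package, matching the project imports

public class WriterSetupSketch {
  static void describeWriters(Table icebergTable) {
    // "write.format.default" table property, defaulting to parquet when unset
    FileFormat format = IcebergUtil.getTableFileFormat(icebergTable);

    // appender factory with the table's identifier field ids, presumably used by the
    // table operator to write data files (and equality deletes in upsert mode)
    GenericAppenderFactory appenderFactory = IcebergUtil.getTableAppender(icebergTable);

    System.out.println(format + " / " + appenderFactory);
  }
}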
@@ -28,16 +28,13 @@
@Deprecated
public class DynamicBatchSizeWait implements InterfaceBatchSizeWait {
protected static final Logger LOGGER = LoggerFactory.getLogger(DynamicBatchSizeWait.class);

final LinkedList<Integer> batchSizeHistory = new LinkedList<>();
final LinkedList<Integer> sleepMsHistory = new LinkedList<>();
@ConfigProperty(name = "debezium.source.max.batch.size", defaultValue = DEFAULT_MAX_BATCH_SIZE + "")
Integer maxBatchSize;

@ConfigProperty(name = "debezium.sink.batch.batch-size-wait.max-wait-ms", defaultValue = "300000")
Integer maxWaitMs;

final LinkedList<Integer> batchSizeHistory = new LinkedList<>();
final LinkedList<Integer> sleepMsHistory = new LinkedList<>();

public DynamicBatchSizeWait() {
batchSizeHistory.add(1);
batchSizeHistory.add(1);
Expand Down Expand Up @@ -82,7 +79,7 @@ else if ((getAverage(batchSizeHistory) / maxBatchSize) >= 0.90) {
sleepMsHistory.removeFirst();

LOGGER.debug("Calculating Wait delay\n" +
"max.batch.size={}\npoll.interval.ms={}\nbatchSizeHistory{}\nsleepMsHistory{}\nval{}",
"max.batch.size={}\npoll.interval.ms={}\nbatchSizeHistory{}\nsleepMsHistory{}\nval{}",
maxBatchSize, maxWaitMs, batchSizeHistory, sleepMsHistory, sleepMsHistory.getLast());

return sleepMsHistory.getLast();
@@ -18,7 +18,7 @@ public interface InterfaceBatchSizeWait {
default void initizalize() {
}

default void waitMs(Integer numRecordsProcessed, Integer processingTimeMs) throws InterruptedException{
default void waitMs(Integer numRecordsProcessed, Integer processingTimeMs) throws InterruptedException {
}

}
