Support Partitioned tables #71

Merged: 7 commits, Dec 31, 2021
----------------------------------------------------------------------

@@ -22,7 +22,6 @@
import org.slf4j.LoggerFactory;

/**
*
* @author Ismail Simsek
*/
@Dependent
----------------------------------------------------------------------

@@ -15,7 +15,7 @@
import io.debezium.serde.DebeziumSerdes;
import io.debezium.server.BaseChangeConsumer;
import io.debezium.server.iceberg.batchsizewait.InterfaceBatchSizeWait;
import io.debezium.server.iceberg.tableoperator.InterfaceIcebergTableOperator;
import io.debezium.server.iceberg.tableoperator.IcebergTableOperator;
import io.debezium.util.Clock;
import io.debezium.util.Strings;
import io.debezium.util.Threads;
@@ -104,9 +104,7 @@ public class IcebergChangeConsumer extends BaseChangeConsumer implements Debeziu
InterfaceBatchSizeWait batchSizeWait;
Catalog icebergCatalog;
@Inject
@Any
Instance<InterfaceIcebergTableOperator> icebergTableOperatorInstances;
InterfaceIcebergTableOperator icebergTableOperator;
IcebergTableOperator icebergTableOperator;

@PostConstruct
void connect() {
@@ -127,9 +125,6 @@ void connect() {
batchSizeWait = IcebergUtil.selectInstance(batchSizeWaitInstances, batchSizeWaitName);
batchSizeWait.initizalize();

final String icebergTableOperatorName = upsert ? "IcebergTableOperatorUpsert" : "IcebergTableOperatorAppend";
icebergTableOperator = IcebergUtil.selectInstance(icebergTableOperatorInstances, icebergTableOperatorName);
icebergTableOperator.initialize();
// configure and set
valSerde.configure(Collections.emptyMap(), false);
valDeserializer = valSerde.deserializer();
@@ -169,7 +164,7 @@ public void handleBatch(List<ChangeEvent<Object, Object>> records, DebeziumEngin
"Set `debezium.format.value.schemas.enable` to true to create tables automatically!");
}
return IcebergUtil.createIcebergTable(icebergCatalog, tableIdentifier,
event.getValue().get(0).getSchema(), writeFormat);
event.getValue().get(0).getSchema(), writeFormat, !upsert);
});
//addToTable(icebergTable, event.getValue());
icebergTableOperator.addToTable(icebergTable, event.getValue());
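
To make the consumer-side change easier to read across the removed and added lines above: table creation now receives a partition flag derived from the consumer's existing `upsert` setting, so append-mode tables are created partitioned and upsert tables are not. A minimal sketch of the new call, using the field names visible in this diff (the write-format value is illustrative):

```java
// Sketch of the handleBatch() table-creation call after this PR.
Table icebergTable = IcebergUtil.createIcebergTable(
    icebergCatalog,
    tableIdentifier,
    event.getValue().get(0).getSchema(), // Iceberg schema derived from the first event
    writeFormat,                         // configured write format, e.g. "parquet"
    !upsert);                            // partition by day(__source_ts) only in append mode
```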
----------------------------------------------------------------------

@@ -10,6 +10,9 @@

import java.io.IOException;
import java.nio.ByteBuffer;
import java.time.Instant;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.util.*;

import com.fasterxml.jackson.databind.JsonNode;
@@ -53,8 +56,17 @@ public String destinationTable() {
return destination.replace(".", "_");
}

public GenericRecord getIcebergRecord(Schema schema) {
return getIcebergRecord(schema.asStruct(), value);
public GenericRecord asIcebergRecord(Schema schema) {
final GenericRecord record = asIcebergRecord(schema.asStruct(), value);

if (value != null && value.has("__source_ts_ms") && value.get("__source_ts_ms") != null) {
final long source_ts_ms = value.get("__source_ts_ms").longValue();
final OffsetDateTime odt = OffsetDateTime.ofInstant(Instant.ofEpochMilli(source_ts_ms), ZoneOffset.UTC);
record.setField("__source_ts", odt);
} else {
record.setField("__source_ts", null);
}
return record;
}

public String schemaHashCode() {
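
The new `__source_ts` column is filled from Debezium's `__source_ts_ms` metadata field (epoch milliseconds). Iceberg's generic records represent `timestamptz` as `java.time.OffsetDateTime`, so the value is converted explicitly to UTC. A self-contained sketch of the same conversion:

```java
import java.time.Instant;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;

public class SourceTsConversionExample {

  // Epoch milliseconds -> UTC OffsetDateTime, as asIcebergRecord() does for __source_ts.
  static OffsetDateTime toSourceTs(long sourceTsMs) {
    return OffsetDateTime.ofInstant(Instant.ofEpochMilli(sourceTsMs), ZoneOffset.UTC);
  }

  public static void main(String[] args) {
    System.out.println(toSourceTs(1640995200000L)); // prints 2022-01-01T00:00Z
  }
}
```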
@@ -67,13 +79,13 @@ public Schema getSchema() {
throw new RuntimeException("Failed to get event schema, event value is null, destination:" + this.destination);
}

final List<Types.NestedField> tableColumns = getValueFields();
final List<Types.NestedField> tableColumns = valueSchemaFields();

if (tableColumns.isEmpty()) {
throw new RuntimeException("Failed to get schema destination:" + this.destination);
}

final List<Types.NestedField> keyColumns = getKeyFields();
final List<Types.NestedField> keyColumns = KeySchemaFields();
Set<Integer> identifierFieldIds = new HashSet<>();

for (Types.NestedField ic : keyColumns) {
@@ -100,43 +112,43 @@ public Schema getSchema() {
return new Schema(tableColumns, identifierFieldIds);
}

private GenericRecord getIcebergRecord(Types.StructType tableFields, JsonNode data) {
Map<String, Object> mappedResult = new HashMap<>();
private GenericRecord asIcebergRecord(Types.StructType tableFields, JsonNode data) {
LOGGER.debug("Processing nested field:{}", tableFields);
GenericRecord record = GenericRecord.create(tableFields);

for (Types.NestedField field : tableFields.fields()) {
// Set value to null if the json event doesn't have the field
if (data == null || !data.has(field.name()) || data.get(field.name()) == null) {
mappedResult.put(field.name(), null);
record.setField(field.name(), null);
continue;
}
// get the value of the field from json event, map it to iceberg value
mappedResult.put(field.name(), jsonToGenericRecordVal(field, data.get(field.name())));
record.setField(field.name(), jsonValToIcebergVal(field, data.get(field.name())));
}

return GenericRecord.create(tableFields).copy(mappedResult);
return record;
}

//getIcebergFieldsFromEventSchema
private List<Types.NestedField> getKeyFields() {
private List<Types.NestedField> KeySchemaFields() {
if (keySchema != null && keySchema.has("fields") && keySchema.get("fields").isArray()) {
LOGGER.debug(keySchema.toString());
return getIcebergSchema(keySchema, "", 0);
return icebergSchema(keySchema, "", 0);
}
LOGGER.trace("Key schema not found!");
return new ArrayList<>();
}

private List<Types.NestedField> getValueFields() {
private List<Types.NestedField> valueSchemaFields() {
if (valueSchema != null && valueSchema.has("fields") && valueSchema.get("fields").isArray()) {
LOGGER.debug(valueSchema.toString());
return getIcebergSchema(valueSchema, "", 0);
return icebergSchema(valueSchema, "", 0, true);
}
LOGGER.trace("Event schema not found!");
return new ArrayList<>();
}

private Type.PrimitiveType getIcebergFieldType(String fieldType) {
private Type.PrimitiveType icebergFieldType(String fieldType) {
switch (fieldType) {
case "int8":
case "int16":
@@ -163,7 +175,12 @@ private Type.PrimitiveType getIcebergFieldType(String fieldType) {
}
}

private List<Types.NestedField> getIcebergSchema(JsonNode eventSchema, String schemaName, int columnId) {
private List<Types.NestedField> icebergSchema(JsonNode eventSchema, String schemaName, int columnId) {
return icebergSchema(eventSchema, schemaName, columnId, false);
}

private List<Types.NestedField> icebergSchema(JsonNode eventSchema, String schemaName, int columnId,
boolean addSourceTsField) {
List<Types.NestedField> schemaColumns = new ArrayList<>();
String schemaType = eventSchema.get("type").textValue();
LOGGER.debug("Converting Schema of: {}::{}", schemaName, schemaType);
@@ -180,7 +197,7 @@ private List<Types.NestedField> getIcebergSchema(JsonNode eventSchema, String sc
if (listItemType.equals("struct") || listItemType.equals("array") || listItemType.equals("map")) {
throw new RuntimeException("Complex Array types are not supported array[" + listItemType + "], field " + fieldName);
}
Type.PrimitiveType item = getIcebergFieldType(listItemType);
Type.PrimitiveType item = icebergFieldType(listItemType);
schemaColumns.add(Types.NestedField.optional(
columnId, fieldName, Types.ListType.ofOptional(++columnId, item)));
//throw new RuntimeException("'" + fieldName + "' has Array type, Array type not supported!");
@@ -194,20 +211,25 @@ private List<Types.NestedField> getIcebergSchema(JsonNode eventSchema, String sc
//break;
case "struct":
// create it as struct, nested type
List<Types.NestedField> subSchema = getIcebergSchema(jsonSchemaFieldNode, fieldName, columnId);
List<Types.NestedField> subSchema = icebergSchema(jsonSchemaFieldNode, fieldName, columnId);
schemaColumns.add(Types.NestedField.optional(columnId, fieldName, Types.StructType.of(subSchema)));
columnId += subSchema.size();
break;
default: //primitive types
schemaColumns.add(Types.NestedField.optional(columnId, fieldName, getIcebergFieldType(fieldType)));
schemaColumns.add(Types.NestedField.optional(columnId, fieldName, icebergFieldType(fieldType)));
break;
}
}

if (addSourceTsField) {
columnId++;
schemaColumns.add(Types.NestedField.optional(columnId, "__source_ts", Types.TimestampType.withZone()));
}
return schemaColumns;
}

private Object jsonToGenericRecordVal(Types.NestedField field,
JsonNode node) {
private Object jsonValToIcebergVal(Types.NestedField field,
JsonNode node) {
LOGGER.debug("Processing Field:{} Type:{}", field.name(), field.type());
final Object val;
switch (field.type().typeId()) {
@@ -247,7 +269,7 @@ private Object jsonToGenericRecordVal(Types.NestedField field,
case STRUCT:
// create it as struct, nested type
// recursive call to get nested data/record
val = getIcebergRecord(field.type().asStructType(), node);
val = asIcebergRecord(field.type().asStructType(), node);
break;
default:
// default to String type
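
Taken together, the schema-side changes above append one optional `__source_ts` column of type `timestamptz` to every derived value schema. A rough illustration of the resulting Iceberg schema for a simple two-column source table (field ids and names are made up for the example):

```java
import java.util.List;
import java.util.Set;

import org.apache.iceberg.Schema;
import org.apache.iceberg.types.Types;

public class DerivedSchemaExample {

  // Hypothetical source table (id, name) plus the __source_ts column added by this PR.
  static final Schema SCHEMA = new Schema(
      List.of(
          Types.NestedField.required(1, "id", Types.LongType.get()),
          Types.NestedField.optional(2, "name", Types.StringType.get()),
          Types.NestedField.optional(3, "__source_ts", Types.TimestampType.withZone())),
      Set.of(1)); // identifier field ids, assembled from the Debezium key schema
}
```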
----------------------------------------------------------------------

@@ -78,17 +78,16 @@ public class IcebergEventsChangeConsumer extends BaseChangeConsumer implements D
static final SortOrder TABLE_SORT_ORDER = SortOrder.builderFor(TABLE_SCHEMA)
.asc("event_sink_epoch_ms", NullOrder.NULLS_LAST)
.build();

@ConfigProperty(name = "debezium.sink.iceberg." + CatalogProperties.WAREHOUSE_LOCATION)
String warehouseLocation;

private static final Logger LOGGER = LoggerFactory.getLogger(IcebergEventsChangeConsumer.class);
private static final String PROP_PREFIX = "debezium.sink.iceberg.";
final Configuration hadoopConf = new Configuration();
final Map<String, String> icebergProperties = new ConcurrentHashMap<>();
@ConfigProperty(name = "debezium.sink.iceberg." + CatalogProperties.WAREHOUSE_LOCATION)
String warehouseLocation;
@ConfigProperty(name = "debezium.format.value", defaultValue = "json")
String valueFormat;
@ConfigProperty(name = "debezium.format.key", defaultValue = "json")
String keyFormat;
final Configuration hadoopConf = new Configuration();
@ConfigProperty(name = "debezium.sink.iceberg.fs.defaultFS")
String defaultFs;
@ConfigProperty(name = "debezium.sink.iceberg.table-namespace", defaultValue = "default")
@@ -97,13 +96,10 @@ public class IcebergEventsChangeConsumer extends BaseChangeConsumer implements D
String catalogName;
@ConfigProperty(name = "debezium.sink.batch.batch-size-wait", defaultValue = "NoBatchSizeWait")
String batchSizeWaitName;

@Inject
@Any
Instance<InterfaceBatchSizeWait> batchSizeWaitInstances;
InterfaceBatchSizeWait batchSizeWait;

final Map<String, String> icebergProperties = new ConcurrentHashMap<>();
Catalog icebergCatalog;
Table eventTable;

@@ -112,11 +108,11 @@ public class IcebergEventsChangeConsumer extends BaseChangeConsumer implements D
void connect() {
if (!valueFormat.equalsIgnoreCase(Json.class.getSimpleName().toLowerCase())) {
throw new DebeziumException("debezium.format.value={" + valueFormat + "} not supported, " +
"Supported (debezium.format.value=*) formats are {json,}!");
"Supported (debezium.format.value=*) formats are {json,}!");
}
if (!keyFormat.equalsIgnoreCase(Json.class.getSimpleName().toLowerCase())) {
throw new DebeziumException("debezium.format.key={" + valueFormat + "} not supported, " +
"Supported (debezium.format.key=*) formats are {json,}!");
"Supported (debezium.format.key=*) formats are {json,}!");
}

TableIdentifier tableIdentifier = TableIdentifier.of(Namespace.of(namespace), "debezium_events");
----------------------------------------------------------------------

@@ -18,17 +18,16 @@
import javax.enterprise.inject.literal.NamedLiteral;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.iceberg.Schema;
import org.apache.iceberg.SortOrder;
import org.apache.iceberg.Table;
import org.apache.iceberg.*;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.data.GenericAppenderFactory;
import org.apache.iceberg.exceptions.NoSuchTableException;
import org.apache.iceberg.relocated.com.google.common.primitives.Ints;
import org.eclipse.microprofile.config.Config;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT;
import static org.apache.iceberg.TableProperties.FORMAT_VERSION;
import static org.apache.iceberg.TableProperties.*;

/**
* @author Ismail Simsek
@@ -64,15 +63,23 @@ public static <T> T selectInstance(Instance<T> instances, String name) {
}

public static Table createIcebergTable(Catalog icebergCatalog, TableIdentifier tableIdentifier,
Schema schema, String writeFormat) {
Schema schema, String writeFormat, boolean partition) {

LOGGER.warn("Creating table:'{}'\nschema:{}\nrowIdentifier:{}", tableIdentifier, schema,
schema.identifierFieldNames());

PartitionSpec ps;
if (partition) {
ps = PartitionSpec.builderFor(schema).day("__source_ts").build();
} else {
ps = PartitionSpec.builderFor(schema).build();
}

return icebergCatalog.buildTable(tableIdentifier, schema)
.withProperty(FORMAT_VERSION, "2")
.withProperty(DEFAULT_FILE_FORMAT, writeFormat.toLowerCase(Locale.ENGLISH))
.withSortOrder(IcebergUtil.getIdentifierFieldsAsSortOrder(schema))
.withPartitionSpec(ps)
.create();
}
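
The new `partition` flag selects between two partition specs: a daily partition on `__source_ts` for append-mode tables, or an unpartitioned spec otherwise. A standalone sketch of both, assuming a schema that already contains the `__source_ts` `timestamptz` column:

```java
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.types.Types;

public class PartitionSpecExample {

  static final Schema SCHEMA = new Schema(
      Types.NestedField.required(1, "id", Types.LongType.get()),
      Types.NestedField.optional(2, "__source_ts", Types.TimestampType.withZone()));

  // Append mode: data files are grouped by the day of the source timestamp.
  static final PartitionSpec DAILY =
      PartitionSpec.builderFor(SCHEMA).day("__source_ts").build();

  // Upsert mode: the table stays unpartitioned (equivalent to PartitionSpec.unpartitioned()).
  static final PartitionSpec UNPARTITIONED =
      PartitionSpec.builderFor(SCHEMA).build();
}
```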

@@ -95,4 +102,18 @@ public static Optional<Table> loadIcebergTable(Catalog icebergCatalog, TableIden
}
}

public static FileFormat getTableFileFormat(Table icebergTable) {
String formatAsString = icebergTable.properties().getOrDefault(DEFAULT_FILE_FORMAT, DEFAULT_FILE_FORMAT_DEFAULT);
return FileFormat.valueOf(formatAsString.toUpperCase(Locale.ROOT));
}

public static GenericAppenderFactory getTableAppender(Table icebergTable) {
return new GenericAppenderFactory(
icebergTable.schema(),
icebergTable.spec(),
Ints.toArray(icebergTable.schema().identifierFieldIds()),
icebergTable.schema(),
null);
}

}
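
The two new helpers bundle what a table operator needs when writing files: the table's configured file format (falling back to Iceberg's default) and a `GenericAppenderFactory` initialized with the table schema, partition spec, and identifier-field ids used for equality deletes. A hedged usage sketch, not the PR's actual operator code, assuming `IcebergUtil` is on the classpath and a `Record` matching the table schema is available:

```java
import java.io.IOException;
import java.util.UUID;

import org.apache.iceberg.FileFormat;
import org.apache.iceberg.Table;
import org.apache.iceberg.data.GenericAppenderFactory;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.io.FileAppender;
import org.apache.iceberg.io.OutputFile;

public class TableAppenderUsageExample {

  // Write one record to a brand-new data file; committing the file to the table is omitted.
  static void writeOne(Table table, Record record) throws IOException {
    FileFormat format = IcebergUtil.getTableFileFormat(table);           // e.g. PARQUET
    GenericAppenderFactory appenderFactory = IcebergUtil.getTableAppender(table);

    String fileName = format.addExtension(UUID.randomUUID().toString());
    OutputFile outputFile = table.io().newOutputFile(
        table.locationProvider().newDataLocation(fileName));

    try (FileAppender<Record> appender = appenderFactory.newAppender(outputFile, format)) {
      appender.add(record); // record must match table.schema()
    }
    // A real operator would wrap the closed file in a DataFile and commit it in a transaction.
  }
}
```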
----------------------------------------------------------------------

@@ -28,16 +28,13 @@
@Deprecated
public class DynamicBatchSizeWait implements InterfaceBatchSizeWait {
protected static final Logger LOGGER = LoggerFactory.getLogger(DynamicBatchSizeWait.class);

final LinkedList<Integer> batchSizeHistory = new LinkedList<>();
final LinkedList<Integer> sleepMsHistory = new LinkedList<>();
@ConfigProperty(name = "debezium.source.max.batch.size", defaultValue = DEFAULT_MAX_BATCH_SIZE + "")
Integer maxBatchSize;

@ConfigProperty(name = "debezium.sink.batch.batch-size-wait.max-wait-ms", defaultValue = "300000")
Integer maxWaitMs;

final LinkedList<Integer> batchSizeHistory = new LinkedList<>();
final LinkedList<Integer> sleepMsHistory = new LinkedList<>();

public DynamicBatchSizeWait() {
batchSizeHistory.add(1);
batchSizeHistory.add(1);
@@ -82,7 +79,7 @@ else if ((getAverage(batchSizeHistory) / maxBatchSize) >= 0.90) {
sleepMsHistory.removeFirst();

LOGGER.debug("Calculating Wait delay\n" +
"max.batch.size={}\npoll.interval.ms={}\nbatchSizeHistory{}\nsleepMsHistory{}\nval{}",
"max.batch.size={}\npoll.interval.ms={}\nbatchSizeHistory{}\nsleepMsHistory{}\nval{}",
maxBatchSize, maxWaitMs, batchSizeHistory, sleepMsHistory, sleepMsHistory.getLast());

return sleepMsHistory.getLast();
----------------------------------------------------------------------

@@ -18,7 +18,7 @@ public interface InterfaceBatchSizeWait {
default void initizalize() {
}

default void waitMs(Integer numRecordsProcessed, Integer processingTimeMs) throws InterruptedException{
default void waitMs(Integer numRecordsProcessed, Integer processingTimeMs) throws InterruptedException {
}

}