Minor improvements (#19)

* Minor improvements

ismailsimsek committed Aug 15, 2021
1 parent 00a9b68 commit bcee6a6
Showing 14 changed files with 99 additions and 105 deletions.
--- changed file 1 ---
@@ -22,7 +22,6 @@
import org.slf4j.LoggerFactory;

/**
* Optimizes batch size around 85%-90% of max,batch.size using dynamically calculated sleep(ms)
*
* @author Ismail Simsek
*/
--- changed file 2 ---
@@ -25,7 +25,6 @@
import org.slf4j.LoggerFactory;

/**
* Implementation of the consumer that delivers the messages into Amazon S3 destination.
*
* @author Ismail Simsek
*/
@@ -123,7 +122,7 @@ public Table create(Catalog icebergCatalog, TableIdentifier tableIdentifier) thr
return tb.create();
}

return null;
throw new Exception("Failed to create table "+ tableIdentifier);
}

}
--- changed file 3 ---
@@ -55,7 +55,7 @@
import org.slf4j.LoggerFactory;

/**
* Implementation of the consumer that delivers the messages into Amazon S3 destination.
* Implementation of the consumer that delivers the messages to iceberg tables.
*
* @author Ismail Simsek
*/
@@ -65,6 +65,7 @@ public class IcebergChangeConsumer extends BaseChangeConsumer implements Debeziu

private static final Logger LOGGER = LoggerFactory.getLogger(IcebergChangeConsumer.class);
private static final String PROP_PREFIX = "debezium.sink.iceberg.";
static ImmutableMap<String, Integer> cdcOperations = ImmutableMap.of("c", 1, "r", 2, "u", 3, "d", 4);
@ConfigProperty(name = "debezium.format.value", defaultValue = "json")
String valueFormat;
@ConfigProperty(name = "debezium.format.key", defaultValue = "json")
@@ -81,26 +82,20 @@ public class IcebergChangeConsumer extends BaseChangeConsumer implements Debeziu
String namespace;
@ConfigProperty(name = "debezium.sink.iceberg.catalog-name", defaultValue = "default")
String catalogName;

@ConfigProperty(name = "debezium.sink.iceberg.upsert", defaultValue = "true")
boolean upsertData;
boolean upsert;
@ConfigProperty(name = "debezium.sink.iceberg.upsert-keep-deletes", defaultValue = "true")
boolean upsertDataKeepDeletes;

boolean upsertKeepDeletes;
@ConfigProperty(name = "debezium.sink.iceberg.upsert-op-column", defaultValue = "__op")
String opColumn;
@ConfigProperty(name = "debezium.sink.iceberg.upsert-source-ts-ms-column", defaultValue = "__source_ts_ms")
String sourceTsMsColumn;

@ConfigProperty(name = "debezium.sink.batch.batch-size-wait", defaultValue = "NoBatchSizeWait")
String batchSizeWaitName;

@Inject
@Any
Instance<InterfaceBatchSizeWait> batchSizeWaitInstances;
InterfaceBatchSizeWait batchSizeWait;
static ImmutableMap<String, Integer> cdcOperations = ImmutableMap.of("c", 1, "r", 2, "u", 3, "d", 4);

Configuration hadoopConf = new Configuration();
Catalog icebergCatalog;
Map<String, String> icebergProperties = new ConcurrentHashMap<>();
@@ -142,14 +137,21 @@ public String map(String destination) {
}

private Table createIcebergTable(TableIdentifier tableIdentifier, ChangeEvent<Object, Object> event) throws Exception {
if (eventSchemaEnabled && event.value() != null) {
DebeziumToIcebergTable eventSchema = event.key() == null
? new DebeziumToIcebergTable(getBytes(event.value()))
: new DebeziumToIcebergTable(getBytes(event.value()), getBytes(event.key()));

return eventSchema.create(icebergCatalog, tableIdentifier);
if (!eventSchemaEnabled) {
throw new Exception("Table '" + tableIdentifier + "' not found! " +
"Set `debezium.format.value.schemas.enable` to true to create tables automatically!");
}

if (event.value() == null) {
throw new Exception("Failed to get event schema for table '" + tableIdentifier + "' event value is null");
}
return null;

DebeziumToIcebergTable eventSchema = event.key() == null
? new DebeziumToIcebergTable(getBytes(event.value()))
: new DebeziumToIcebergTable(getBytes(event.value()), getBytes(event.key()));

return eventSchema.create(icebergCatalog, tableIdentifier);
}

@Override
@@ -170,7 +172,7 @@ public void handleBatch(List<ChangeEvent<Object, Object>> records, DebeziumEngin
// load iceberg table
icebergTable = icebergCatalog.loadTable(tableIdentifier);
} catch (org.apache.iceberg.exceptions.NoSuchTableException e) {
// get schema fom an event and create iceberg table
// get schema from an event and create iceberg table
try {
icebergTable = createIcebergTable(tableIdentifier, event.getValue().get(0));
} catch (Exception e2) {
@@ -195,10 +197,12 @@ public void handleBatch(List<ChangeEvent<Object, Object>> records, DebeziumEngin
public int compareByTsThenOp(GenericRecord lhs, GenericRecord rhs) {
if (lhs.getField(sourceTsMsColumn).equals(rhs.getField(sourceTsMsColumn))) {
// return (x < y) ? -1 : ((x == y) ? 0 : 1);
return Integer.compare(
cdcOperations.getOrDefault(lhs.getField(opColumn), -1),
cdcOperations.getOrDefault(rhs.getField(opColumn), -1)
);
return
cdcOperations.getOrDefault(lhs.getField(opColumn), -1)
.compareTo(
cdcOperations.getOrDefault(rhs.getField(opColumn), -1)
)
;
} else {
return Long.compare((Long) lhs.getField(sourceTsMsColumn), (Long) rhs.getField(sourceTsMsColumn));
}
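
For context, a minimal, self-contained sketch (not part of this commit) of the ordering rule that compareByTsThenOp and the new cdcOperations field implement: events are ordered by __source_ts_ms first, and ties are broken by operation priority, so the highest-priority operation survives deduplication. The class and method names below are illustrative only.

import java.util.Map;

public class CdcOrderSketch {
  // Same priorities as the cdcOperations map introduced in this commit.
  static final Map<String, Integer> CDC_OPERATIONS = Map.of("c", 1, "r", 2, "u", 3, "d", 4);

  // Mirrors compareByTsThenOp: order by source timestamp, break ties by operation priority.
  static int compare(long lhsTs, String lhsOp, long rhsTs, String rhsOp) {
    if (lhsTs == rhsTs) {
      return Integer.compare(CDC_OPERATIONS.getOrDefault(lhsOp, -1),
          CDC_OPERATIONS.getOrDefault(rhsOp, -1));
    }
    return Long.compare(lhsTs, rhsTs);
  }

  public static void main(String[] args) {
    // Same timestamp: the delete ("d", priority 4) sorts after the update ("u", priority 3),
    // so deduplication keeps the delete as the record's final state.
    System.out.println(compare(1_000L, "u", 1_000L, "d")); // negative value, "u" sorts first
    // Different timestamps: the later event wins regardless of operation.
    System.out.println(compare(2_000L, "c", 1_000L, "d")); // positive value, first record is newer
  }
}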
@@ -241,9 +245,9 @@ private ArrayList<Record> toDeduppedIcebergRecords(Schema schema, ArrayList<Chan

private void addToTable(Table icebergTable, ArrayList<ChangeEvent<Object, Object>> events) throws InterruptedException {

if (!upsertData || icebergTable.sortOrder().isUnsorted()) {
if (!upsert || icebergTable.sortOrder().isUnsorted()) {

if (upsertData && icebergTable.sortOrder().isUnsorted()) {
if (upsert && icebergTable.sortOrder().isUnsorted()) {
LOGGER.info("Table don't have Pk defined upsert is not possible falling back to append!");
}

@@ -258,7 +262,7 @@ private void addToTable(Table icebergTable, ArrayList<ChangeEvent<Object, Object
// DO UPSERT >>> DELETE + INSERT
ArrayList<Record> icebergRecords = toDeduppedIcebergRecords(icebergTable.schema(), events);
DataFile dataFile = getDataFile(icebergTable, icebergRecords);
DeleteFile deleteDataFile = getDeleteDataFile(icebergTable, icebergRecords);
DeleteFile deleteDataFile = getDeleteFile(icebergTable, icebergRecords);
LOGGER.debug("Committing new file as Upsert (has deletes:{}) '{}' !", deleteDataFile != null, dataFile.path());
RowDelta c = icebergTable
.newRowDelta()
@@ -271,11 +275,11 @@ private void addToTable(Table icebergTable, ArrayList<ChangeEvent<Object, Object

c.commit();
}
LOGGER.info("Committed {} events to table! {}", events.size(),icebergTable.location());
LOGGER.info("Committed {} events to table! {}", events.size(), icebergTable.location());

}

private DeleteFile getDeleteDataFile(Table icebergTable, ArrayList<Record> icebergRecords) throws InterruptedException {
private DeleteFile getDeleteFile(Table icebergTable, ArrayList<Record> icebergRecords) throws InterruptedException {

final String fileName = "del-" + UUID.randomUUID() + "-" + Instant.now().toEpochMilli() + "." + FileFormat.PARQUET;
OutputFile out = icebergTable.io().newOutputFile(icebergTable.locationProvider().newDataLocation(fileName));
@@ -284,13 +288,13 @@ private DeleteFile getDeleteDataFile(Table icebergTable, ArrayList<Record> icebe
EqualityDeleteWriter<Record> deleteWriter;

// anything is not an insert.
// upsertDataKeepDeletes = false, which means delete deletes
// upsertKeepDeletes = false, which means delete deletes
List<Record> deleteRows = icebergRecords.stream()
.filter(r ->
// anything is not an insert.
!r.getField(opColumn).equals("c")
// upsertDataKeepDeletes = false and its deleted record, which means delete deletes
// || !(upsertDataKeepDeletes && r.getField(opColumn).equals("d"))
// upsertKeepDeletes = false and its deleted record, which means delete deletes
// || !(upsertKeepDeletes && r.getField(opColumn).equals("d"))
).collect(Collectors.toList());

if (deleteRows.size() == 0) {
@@ -346,11 +350,11 @@ private DataFile getDataFile(Table icebergTable, ArrayList<Record> icebergRecord
List<Record> newRecords = icebergRecords.stream()
.filter(r ->
// if its append OR upsert with keep deletes then add all the records to data file
!upsertData
!upsert
// if table has no PK - which means fall back to append
|| icebergTable.sortOrder().isUnsorted()
// if its upsert and upsertKeepDeletes = true
|| upsertDataKeepDeletes
|| upsertKeepDeletes
// if nothing above then exclude deletes
|| !(r.getField(opColumn).equals("d")))
.collect(Collectors.toList());
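
To summarize the renamed flags, the two filters in this file split each batch as follows: the data file keeps a record unless it is a delete in strict upsert mode, and the equality-delete file receives everything that is not an insert. A small sketch of that decision, assuming the flag and op-column semantics shown in this diff (the class itself is hypothetical):

public class UpsertFilterSketch {

  // A record is written to the data file unless it is a delete ("d") in strict upsert mode.
  static boolean goesToDataFile(boolean upsert, boolean tableHasNoPk, boolean upsertKeepDeletes, String op) {
    return !upsert || tableHasNoPk || upsertKeepDeletes || !"d".equals(op);
  }

  // Anything that is not an insert ("c") produces an equality delete for its key.
  static boolean goesToEqualityDeleteFile(String op) {
    return !"c".equals(op);
  }

  public static void main(String[] args) {
    // Upsert mode, table has a primary key, upsert-keep-deletes = false:
    // a delete event is dropped from the data file but still emits an equality delete.
    System.out.println(goesToDataFile(true, false, false, "d"));   // false
    System.out.println(goesToEqualityDeleteFile("d"));             // true
    // An update is rewritten as an equality delete for the old row plus a new data row.
    System.out.println(goesToDataFile(true, false, false, "u"));   // true
    System.out.println(goesToEqualityDeleteFile("u"));             // true
  }
}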
--- changed file 4 ---
@@ -56,7 +56,7 @@
import static org.apache.iceberg.types.Types.NestedField.required;

/**
* Implementation of the consumer that delivers the messages into Amazon S3 destination.
* Implementation of the consumer that delivers the messages to iceberg table.
*
* @author Ismail Simsek
*/
@@ -111,10 +111,12 @@ public class IcebergEventsChangeConsumer extends BaseChangeConsumer implements D
@PostConstruct
void connect() throws InterruptedException {
if (!valueFormat.equalsIgnoreCase(Json.class.getSimpleName().toLowerCase())) {
throw new InterruptedException("debezium.format.value={" + valueFormat + "} not supported! Supported (debezium.format.value=*) formats are {json,}!");
throw new InterruptedException("debezium.format.value={" + valueFormat + "} not supported, " +
"Supported (debezium.format.value=*) formats are {json,}!");
}
if (!keyFormat.equalsIgnoreCase(Json.class.getSimpleName().toLowerCase())) {
throw new InterruptedException("debezium.format.key={" + valueFormat + "} not supported! Supported (debezium.format.key=*) formats are {json,}!");
throw new InterruptedException("debezium.format.key={" + valueFormat + "} not supported, " +
"Supported (debezium.format.key=*) formats are {json,}!");
}

TableIdentifier tableIdentifier = TableIdentifier.of(Namespace.of(namespace), "debezium_events");
@@ -232,11 +234,11 @@ private void commitBatch(String destination, OffsetDateTime batchTime, ArrayList
.withPartition(pk)
.build();

LOGGER.debug("Appending new file '{}' !", dataFile.path());
LOGGER.debug("Appending new file '{}'", dataFile.path());
eventTable.newAppend()
.appendFile(dataFile)
.commit();
LOGGER.info("Committed {} events to table! {}", icebergRecords.size(), eventTable.location());
LOGGER.info("Committed {} events to table {}", icebergRecords.size(), eventTable.location());
}

}
--- changed file 5 ---
@@ -17,7 +17,6 @@

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.Schema;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.types.Types;
@@ -26,8 +25,6 @@
import org.slf4j.LoggerFactory;

/**
* Implementation of the consumer that delivers the messages into Amazon S3 destination.
*
* @author Ismail Simsek
*/
public class IcebergUtil {
@@ -110,72 +107,74 @@ public static GenericRecord getIcebergRecord(Schema schema, JsonNode data) throw
return IcebergUtil.getIcebergRecord(schema.asStruct(), data);
}

public static GenericRecord getIcebergRecord(Types.StructType nestedField, JsonNode data) throws InterruptedException {
public static GenericRecord getIcebergRecord(Types.StructType tableFields, JsonNode data) throws InterruptedException {
Map<String, Object> mappedResult = new HashMap<>();
LOGGER.debug("Processing nested field : " + nestedField);
LOGGER.debug("Processing nested field:{}", tableFields);

for (Types.NestedField field : nestedField.fields()) {
for (Types.NestedField field : tableFields.fields()) {
if (data == null || !data.has(field.name()) || data.get(field.name()) == null) {
mappedResult.put(field.name(), null);
continue;
}
jsonToGenericRecord(mappedResult, field, data.get(field.name()));
mappedResult.put(field.name(), jsonToGenericRecordVal(field, data.get(field.name())));
}
return GenericRecord.create(nestedField).copy(mappedResult);
}

private static void jsonToGenericRecord(Map<String, Object> mappedResult, Types.NestedField field,
JsonNode node) throws InterruptedException {
LOGGER.debug("Processing Field:" + field.name() + " Type:" + field.type());
return GenericRecord.create(tableFields).copy(mappedResult);
}

private static Object jsonToGenericRecordVal(Types.NestedField field,
JsonNode node) throws InterruptedException {
LOGGER.debug("Processing Field:{} Type:{}", field.name(), field.type());
final Object val;
switch (field.type().typeId()) {
case INTEGER: // int 4 bytes
mappedResult.put(field.name(), node.isNull() ? null : node.asInt());
val = node.isNull() ? null : node.asInt();
break;
case LONG: // long 8 bytes
mappedResult.put(field.name(), node.isNull() ? null : node.asLong());
val = node.isNull() ? null : node.asLong();
break;
case FLOAT: // float is represented in 32 bits,
mappedResult.put(field.name(), node.isNull() ? null : node.floatValue());
val = node.isNull() ? null : node.floatValue();
break;
case DOUBLE: // double is represented in 64 bits
mappedResult.put(field.name(), node.isNull() ? null : node.asDouble());
val = node.isNull() ? null : node.asDouble();
break;
case BOOLEAN:
mappedResult.put(field.name(), node.isNull() ? null : node.asBoolean());
val = node.isNull() ? null : node.asBoolean();
break;
case STRING:
mappedResult.put(field.name(), node.asText(null));
val = node.asText(null);
break;
case BINARY:
try {
mappedResult.put(field.name(), node.isNull() ? null : ByteBuffer.wrap(node.binaryValue()));
val = node.isNull() ? null : ByteBuffer.wrap(node.binaryValue());
} catch (IOException e) {
LOGGER.error("Failed converting '" + field.name() + "' binary value to iceberg record", e);
throw new InterruptedException("Failed Processing Event!" + e.getMessage());
}
break;
case LIST:
mappedResult.put(field.name(), jsonObjectMapper.convertValue(node, List.class));
val = jsonObjectMapper.convertValue(node, List.class);
break;
case MAP:
mappedResult.put(field.name(), jsonObjectMapper.convertValue(node, Map.class));
val = jsonObjectMapper.convertValue(node, Map.class);
break;
case STRUCT:
throw new RuntimeException("Cannot process recursive records!");
// Disabled because cannot recursively process StructType/NestedField
// // recursive call to get nested data/record
// Types.StructType nestedField = Types.StructType.of(field);
// GenericRecord r = getIcebergRecord(schema, nestedField, node.get(field.name()));
// mappedResult.put(field.name(), r);
// return r);
// // throw new RuntimeException("Cannot process recursive record!");
// break;
default:
// default to String type
mappedResult.put(field.name(), node.asText(null));
val = node.asText(null);
break;
}

return val;
}

public static Map<String, String> getConfigSubset(Config config, String prefix) {
@@ -191,13 +190,5 @@ public static Map<String, String> getConfigSubset(Config config, String prefix)
return ret;
}

public static Map<String, String> getConfigurationAsMap(Configuration conf) {
Map<String, String> config = new HashMap<String, String>();
for (Map.Entry<String, String> entry : conf) {
config.put(entry.getKey(), entry.getValue());
}
return config;
}


}
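
As a usage note, the refactor above makes jsonToGenericRecordVal return each converted value instead of mutating a shared map, and getIcebergRecord assembles the final record. A minimal end-to-end sketch, assuming the public getIcebergRecord(Schema, JsonNode) signature shown in this diff; the schema, values, and class name are made up for illustration.

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.iceberg.Schema;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.types.Types;

// Assumes this sketch lives in the same package as IcebergUtil.
public class IcebergRecordSketch {
  public static void main(String[] args) throws Exception {
    // A two-column Iceberg schema and a matching JSON event.
    Schema schema = new Schema(
        Types.NestedField.required(1, "id", Types.LongType.get()),
        Types.NestedField.optional(2, "name", Types.StringType.get()));
    JsonNode data = new ObjectMapper().readTree("{\"id\": 42, \"name\": \"debezium\"}");

    // Each field is converted by jsonToGenericRecordVal and collected into one record.
    GenericRecord record = IcebergUtil.getIcebergRecord(schema, data);
    System.out.println(record);
  }
}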
--- changed file 6 ---
@@ -19,7 +19,7 @@
import static io.debezium.config.CommonConnectorConfig.DEFAULT_MAX_BATCH_SIZE;

/**
* Optimizes batch size around 85%-90% of max,batch.size using dynamically calculated sleep(ms)
* Optimizes batch size around 85%-90% of max,batch.size by dynamically calculating sleep(ms)
*
* @author Ismail Simsek
*/
--- changed file 7 ---
@@ -9,7 +9,7 @@
package io.debezium.server.iceberg.batchsizewait;

/**
* Implementation of the consumer that delivers the messages into Amazon S3 destination.
* Implementation of the consumer that delivers the messages to iceberg tables.
*
* @author Ismail Simsek
*/
@@ -18,6 +18,7 @@ public interface InterfaceBatchSizeWait {
default void initizalize() {
}

void waitMs(Integer numRecordsProcessed, Integer processingTimeMs) throws InterruptedException;
default void waitMs(Integer numRecordsProcessed, Integer processingTimeMs) throws InterruptedException{
}

}
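
With waitMs now a default no-op, an implementation of InterfaceBatchSizeWait only has to override the behavior it needs. A minimal sketch (not part of this commit), assuming the CDI-style bean wiring used by the batch-size-wait implementations in this project and that the class sits in the same package as the interface; the class name, bean name, and sleep value are hypothetical.

import javax.enterprise.context.Dependent;
import javax.inject.Named;

@Dependent
@Named("FixedSleepBatchSizeWait") // hypothetical bean name, selected via debezium.sink.batch.batch-size-wait
public class FixedSleepBatchSizeWait implements InterfaceBatchSizeWait {

  @Override
  public void waitMs(Integer numRecordsProcessed, Integer processingTimeMs) throws InterruptedException {
    // A real implementation would derive the pause from batch size and processing time;
    // this fixed sleep is purely illustrative.
    Thread.sleep(250);
  }
}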
(diffs for the remaining changed files are not shown)
