Adapt iceberg 12 changes (#18)
* create table with identifierFields
ismailsimsek committed Aug 5, 2021
1 parent ac37634 commit 89fc05a
Showing 9 changed files with 103 additions and 89 deletions.
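The gist of the change: instead of creating a v1 table and upgrading it afterwards, the consumer now builds the Iceberg schema with the Debezium key columns registered as identifier fields, creates the table directly with format-version 2, and sorts it by those same columns. Below is a minimal sketch of that flow against the Iceberg 0.12 catalog API; the catalog, table identifier, and columns are illustrative placeholders, not code from this repository.

import java.util.List;

import org.apache.iceberg.NullOrder;
import org.apache.iceberg.Schema;
import org.apache.iceberg.SortOrder;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
import org.apache.iceberg.types.Types;

public class CreateV2TableSketch {

  public static Table createV2Table(Catalog catalog, TableIdentifier tableId) {
    // Columns as they would be extracted from the Debezium event value schema;
    // "id" stands in for the column coming from the event key.
    List<Types.NestedField> columns = List.of(
        Types.NestedField.required(1, "id", Types.LongType.get()),
        Types.NestedField.optional(2, "name", Types.StringType.get()));

    // The schema carries the identifier field ids (row identity, spec v2).
    Schema schema = new Schema(columns, ImmutableSet.of(1));

    // Sort by the identifier columns, nulls first, as create() does in the diff below.
    SortOrder sortOrder = SortOrder.builderFor(schema)
        .asc("id", NullOrder.NULLS_FIRST)
        .build();

    // Create the table directly as format version 2 instead of upgrading it afterwards.
    return catalog.buildTable(tableId, schema)
        .withProperty("format-version", "2")
        .withSortOrder(sortOrder)
        .create();
  }
}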
@@ -9,11 +9,17 @@
package io.debezium.server.iceberg;

import java.io.IOException;
import java.util.*;

import com.fasterxml.jackson.databind.JsonNode;
import org.apache.iceberg.*;
import org.apache.iceberg.NullOrder;
import org.apache.iceberg.Schema;
import org.apache.iceberg.SortOrder;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
import org.apache.iceberg.types.Types;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -26,20 +26,19 @@
public class DebeziumToIcebergTable {
protected static final Logger LOGGER = LoggerFactory.getLogger(DebeziumToIcebergTable.class);

private final Schema tableSchema;
private final Schema tableRowIdentifierSchema;
private final List<Types.NestedField> tableColumns;
private final List<Types.NestedField> tableRowIdentifierColumns;

public DebeziumToIcebergTable(byte[] eventKey, byte[] eventVal) throws IOException {
tableSchema = extractSchema(eventVal);
tableRowIdentifierSchema = extractSchema(eventKey);
public DebeziumToIcebergTable(byte[] eventVal, byte[] eventKey) throws IOException {
tableColumns = extractSchema(eventVal);
tableRowIdentifierColumns = (eventKey == null) ? null : extractSchema(eventKey);
}

public DebeziumToIcebergTable(byte[] eventVal) throws IOException {
tableSchema = extractSchema(eventVal);
tableRowIdentifierSchema = null;
this(eventVal, null);
}

private Schema extractSchema(byte[] eventVal) throws IOException {
private List<Types.NestedField> extractSchema(byte[] eventVal) throws IOException {

JsonNode jsonEvent = IcebergUtil.jsonObjectMapper.readTree(eventVal);

@@ -51,53 +51,74 @@ private Schema extractSchema(byte[] eventVal) throws IOException {
return null;
}

public Schema getTableSchema() {
return tableSchema;
public boolean hasSchema() {
return tableColumns != null;
}

public Schema getTableRowIdentifierSchema() {
return tableRowIdentifierSchema;
}
private SortOrder getSortOrder(Schema schema) {
SortOrder so = SortOrder.unsorted();

private Schema getIcebergSchema(JsonNode eventSchema) {
return IcebergUtil.getIcebergSchema(eventSchema);
}
if (this.tableRowIdentifierColumns != null) {
SortOrder.Builder sob = SortOrder.builderFor(schema);
for (Types.NestedField coll : tableRowIdentifierColumns) {
sob = sob.asc(coll.name(), NullOrder.NULLS_FIRST);
}
so = sob.build();
}

public boolean hasSchema() {
return tableSchema != null;
return so;
}

public Table create(Catalog icebergCatalog, TableIdentifier tableIdentifier) {
private Set<Integer> getRowIdentifierFieldIds() throws Exception {

if (this.hasSchema()) {
Catalog.TableBuilder tb = icebergCatalog.buildTable(tableIdentifier, this.tableSchema);
if (this.tableRowIdentifierColumns == null) {
return ImmutableSet.of();
}

if (this.tableRowIdentifierSchema != null) {
SortOrder.Builder sob = SortOrder.builderFor(tableSchema);
for (Types.NestedField coll : tableRowIdentifierSchema.columns()) {
sob = sob.asc(coll.name(), NullOrder.NULLS_FIRST);
Set<Integer> identifierFieldIds = new HashSet<>();

ListIterator<Types.NestedField> idIterator = this.tableRowIdentifierColumns.listIterator();
while (idIterator.hasNext()) {
Types.NestedField ic = idIterator.next();
boolean found = false;

ListIterator<Types.NestedField> colsIterator = this.tableColumns.listIterator();
while (colsIterator.hasNext()) {
Types.NestedField tc = colsIterator.next();
if (Objects.equals(tc.name(), ic.name())) {
identifierFieldIds.add(tc.fieldId());
// mark the column as required since it is part of the identifier fields
colsIterator.set(tc.asRequired());
found = true;
break;
}
tb.withSortOrder(sob.build());
// "@TODO waiting spec v2 // use as PK / RowKeyIdentifier
}

LOGGER.warn("Creating table:'{}'\nschema:{}\nrowIdentifier:{}", tableIdentifier, tableSchema,
tableRowIdentifierSchema);
Table table = tb.create();
// @TODO remove once spec v2 released
return upgradeToFormatVersion2(table);
if (!found) {
throw new ValidationException("Table Row identifier field `" + ic.name() + "` not found in table columns");
}

}

return null;
return identifierFieldIds;
}

// @TODO remove once spec v2 released! upgrading table to V2
public Table upgradeToFormatVersion2(Table icebergTable) {
TableOperations ops = ((BaseTable) icebergTable).operations();
TableMetadata meta = ops.current();
ops.commit(ops.current(), meta.upgradeToFormatVersion(2));
icebergTable.refresh();
return icebergTable;
public Table create(Catalog icebergCatalog, TableIdentifier tableIdentifier) throws Exception {

Schema schema = new Schema(this.tableColumns, getRowIdentifierFieldIds());

if (this.hasSchema()) {
Catalog.TableBuilder tb = icebergCatalog.buildTable(tableIdentifier, schema)
.withProperty("format-version", "2")
.withSortOrder(getSortOrder(schema));

LOGGER.warn("Creating table:'{}'\nschema:{}\nrowIdentifier:{}", tableIdentifier, schema,
schema.identifierFieldNames());

return tb.create();
}

return null;
}

}
@@ -33,6 +33,8 @@

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.primitives.Ints;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.*;
import org.apache.iceberg.catalog.Catalog;
Expand All @@ -45,7 +47,6 @@
import org.apache.iceberg.io.FileAppender;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.parquet.Parquet;
import org.apache.iceberg.util.ArrayUtil;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.eclipse.microprofile.config.ConfigProvider;
@@ -140,17 +141,13 @@ public String map(String destination) {
return destination.replace(".", "_");
}

private Table createIcebergTable(TableIdentifier tableIdentifier, ChangeEvent<Object, Object> event) {
private Table createIcebergTable(TableIdentifier tableIdentifier, ChangeEvent<Object, Object> event) throws Exception {
if (eventSchemaEnabled && event.value() != null) {
try {
DebeziumToIcebergTable eventSchema = event.key() == null
? new DebeziumToIcebergTable(getBytes(event.value()))
: new DebeziumToIcebergTable(getBytes(event.key()), getBytes(event.value()));
DebeziumToIcebergTable eventSchema = event.key() == null
? new DebeziumToIcebergTable(getBytes(event.value()))
: new DebeziumToIcebergTable(getBytes(event.value()), getBytes(event.key()));

return eventSchema.create(icebergCatalog, tableIdentifier);
} catch (Exception e) {
LOGGER.warn("Failed creating iceberg table! {}", e.getMessage());
}
return eventSchema.create(icebergCatalog, tableIdentifier);
}
return null;
}
@@ -174,10 +171,11 @@ public void handleBatch(List<ChangeEvent<Object, Object>> records, DebeziumEngin
icebergTable = icebergCatalog.loadTable(tableIdentifier);
} catch (org.apache.iceberg.exceptions.NoSuchTableException e) {
// get schema from an event and create the iceberg table
icebergTable = createIcebergTable(tableIdentifier, event.getValue().get(0));
if (icebergTable == null) {
LOGGER.warn("Iceberg table '{}' not found! Ignoring received data for the table!", tableIdentifier);
continue;
try {
icebergTable = createIcebergTable(tableIdentifier, event.getValue().get(0));
} catch (Exception e2) {
e2.printStackTrace();
throw new InterruptedException("Failed to create iceberg table, " + e2.getMessage());
}
}
addToTable(icebergTable, event.getValue());
@@ -277,20 +275,11 @@ private void addToTable(Table icebergTable, ArrayList<ChangeEvent<Object, Object

}

private List<Integer> getEqualityFieldIds(Table icebergTable) {
List<Integer> fieldIds = new ArrayList<>();

for (SortField f : icebergTable.sortOrder().fields()) {
fieldIds.add(f.sourceId());
}
return fieldIds;
}

private DeleteFile getDeleteDataFile(Table icebergTable, ArrayList<Record> icebergRecords) throws InterruptedException {

final String fileName = "del-" + UUID.randomUUID() + "-" + Instant.now().toEpochMilli() + "." + FileFormat.PARQUET;
OutputFile out = icebergTable.io().newOutputFile(icebergTable.locationProvider().newDataLocation(fileName));
List<Integer> equalityDeleteFieldIds = getEqualityFieldIds(icebergTable);
Set<Integer> equalityDeleteFieldIds = icebergTable.schema().identifierFieldIds();

EqualityDeleteWriter<Record> deleteWriter;

@@ -316,11 +305,9 @@ private DeleteFile getDeleteDataFile(Table icebergTable, ArrayList<Record> icebe
.overwrite()
.rowSchema(icebergTable.sortOrder().schema())
.withSpec(icebergTable.spec())
.equalityFieldIds(equalityDeleteFieldIds)
//.withKeyMetadata() // ??
.equalityFieldIds(Lists.newArrayList(icebergTable.schema().identifierFieldIds()))
.metricsConfig(MetricsConfig.fromProperties(icebergTable.properties()))
// .withPartition() // ??
// @TODO add sort order v12 ??
.withSortOrder(icebergTable.sortOrder())
.setAll(icebergTable.properties())
.buildEqualityWriter()
;
@@ -337,13 +324,13 @@ private DeleteFile getDeleteDataFile(Table icebergTable, ArrayList<Record> icebe
// Equality delete files identify deleted rows in a collection of data files by one or more column values,
// and may optionally contain additional columns of the deleted row.
return FileMetadata.deleteFileBuilder(icebergTable.spec())
.ofEqualityDeletes(ArrayUtil.toIntArray(equalityDeleteFieldIds))
.ofEqualityDeletes(Ints.toArray(icebergTable.schema().identifierFieldIds()))
.withFormat(FileFormat.PARQUET)
.withPath(out.location())
.withFileSizeInBytes(deleteWriter.length())
//.withMetrics(deleteWriter.metrics()) //
.withRecordCount(deleteRows.size()) // it's a mandatory field! replace when upgrading to Iceberg 0.12
//.withSortOrder(icebergTable.sortOrder())
.withFileSizeInBytes(deleteWriter.length())
.withRecordCount(deleteRows.size())
.withSortOrder(icebergTable.sortOrder())
.build();
}
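As context for the delete-path change above: equality delete files are now keyed on the schema's identifier fields rather than on the sort-order columns. Below is a minimal sketch of registering such a delete file once it has been written, assuming Iceberg 0.12 and an already-written Parquet file; commitEqualityDelete, its parameters, and the newRowDelta() commit are illustrative, not code from this consumer.

import java.util.Set;

import com.google.common.primitives.Ints;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.FileMetadata;
import org.apache.iceberg.Table;

public class EqualityDeleteSketch {

  public static void commitEqualityDelete(Table table, String deleteFilePath,
                                          long fileSizeInBytes, long recordCount) {
    // Delete rows are matched on the columns declared as identifier fields in the schema.
    Set<Integer> equalityFieldIds = table.schema().identifierFieldIds();

    DeleteFile deleteFile = FileMetadata.deleteFileBuilder(table.spec())
        .ofEqualityDeletes(Ints.toArray(equalityFieldIds))
        .withFormat(FileFormat.PARQUET)
        .withPath(deleteFilePath)
        .withFileSizeInBytes(fileSizeInBytes)
        .withRecordCount(recordCount)
        .withSortOrder(table.sortOrder())
        .build();

    // Attach the delete file to the table in a row-delta commit.
    table.newRowDelta().addDeletes(deleteFile).commit();
  }
}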

@@ -34,12 +34,12 @@ public class IcebergUtil {
protected static final Logger LOGGER = LoggerFactory.getLogger(IcebergUtil.class);
protected static final ObjectMapper jsonObjectMapper = new ObjectMapper();

public static Schema getIcebergSchema(JsonNode eventSchema) {
public static List<Types.NestedField> getIcebergSchema(JsonNode eventSchema) {
LOGGER.debug(eventSchema.toString());
return getIcebergSchema(eventSchema, "", -1);
}

public static Schema getIcebergSchema(JsonNode eventSchema, String schemaName, int columnId) {
public static List<Types.NestedField> getIcebergSchema(JsonNode eventSchema, String schemaName, int columnId) {
List<Types.NestedField> schemaColumns = new ArrayList<>();
String schemaType = eventSchema.get("type").textValue();
LOGGER.debug("Converting Schema of: {}::{}", schemaName, schemaType);
@@ -83,8 +83,8 @@ public static Schema getIcebergSchema(JsonNode eventSchema, String schemaName, i
//schemaColumns.add(Types.NestedField.optional(columnId, fieldName, Types.StringType.get()));
//break;
case "struct":
throw new RuntimeException("Event schema containing nested data '" + fieldName + "' cannot process nested" +
" data!");
throw new RuntimeException("Field:'" + fieldName + "' has nested data type, " +
"nested data types are not supported by consumer");
// //recursive call
// Schema subSchema = SchemaUtil.getIcebergSchema(jsonSchemaFieldNode, fieldName, ++columnId);
// schemaColumns.add(Types.NestedField.optional(columnId, fieldName, Types.StructType.of(subSchema.columns())));
@@ -96,7 +96,7 @@ public static Schema getIcebergSchema(JsonNode eventSchema, String schemaName, i
break;
}
}
return new Schema(schemaColumns);
return schemaColumns;
}

public static boolean hasSchema(JsonNode jsonNode) {
@@ -46,7 +46,6 @@ public void testSimpleUpload() {
Awaitility.await().atMost(Duration.ofSeconds(60)).until(() -> {
try {
Dataset<Row> df = getTableData("testc.inventory.customers");
df.show(false);
return df.filter("id is not null").count() >= 4;
} catch (Exception e) {
return false;
@@ -59,7 +59,10 @@ public ConfigSource() {
config.put("debezium.source.database.server.name", "testc");
config.put("%postgresql.debezium.source.schema.whitelist", "inventory");
config.put("debezium.source.table.whitelist", "inventory.customers,inventory.orders,inventory.products," +
"inventory.geom,inventory.table_datatypes,inventory.test_date_table");
"inventory.table_datatypes,inventory.test_date_table");
config.put("%postgresql.debezium.source.database.whitelist", "inventory");
config.put("%mysql.debezium.source.table.whitelist", "inventory.customers");
config.put("debezium.source.include.schema.changes", "false");

config.put("quarkus.log.level", "INFO");
config.put("quarkus.log.category.\"org.apache.spark\".level", "WARN");
@@ -118,10 +118,10 @@ public void testDatatypes() throws Exception {
"'3f207ac6-5dba-11eb-ae93-0242ac130002'::UUID, 'aBC'::bytea" +
")";
SourcePostgresqlDB.runSQL(sql);
Awaitility.await().atMost(Duration.ofSeconds(120)).until(() -> {
Awaitility.await().atMost(Duration.ofSeconds(320)).until(() -> {
try {
Dataset<Row> df = getTableData("testc.inventory.table_datatypes");
df.show();
df.show(true);
return df.where("c_text is null AND c_varchar is null AND c_int is null " +
"AND c_date is null AND c_timestamp is null AND c_timestamptz is null " +
"AND c_float is null AND c_decimal is null AND c_numeric is null AND c_interval is null " +
@@ -11,14 +11,11 @@
import io.debezium.serde.DebeziumSerdes;
import io.debezium.util.Testing;

import java.io.IOException;
import java.util.Collections;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.iceberg.Schema;
import org.apache.iceberg.data.GenericRecord;
import org.apache.kafka.common.serialization.Serde;
import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.*;
@@ -30,9 +27,10 @@ class TestIcebergUtil {
@Test
public void testNestedJsonRecord() throws JsonProcessingException {
Exception exception = assertThrows(Exception.class, () -> IcebergUtil.getIcebergSchema(new ObjectMapper().readTree(serdeWithSchema).get("schema")));
assertEquals("Event schema containing nested data 'before' cannot process nested data!", exception.getMessage());
assertTrue(exception.getMessage().contains("nested data type"));
}

/*
@Test
public void testUnwrapJsonRecord() throws IOException, InterruptedException {
JsonNode event = new ObjectMapper().readTree(unwrapWithSchema).get("payload");
Expand All @@ -41,6 +39,7 @@ public void testUnwrapJsonRecord() throws IOException, InterruptedException {
assertEquals("orders", record.getField("__table").toString());
assertEquals(16850, record.getField("order_date"));
}
*/

@Test
public void valuePayloadWithSchemaAsJsonNode() {
@@ -33,7 +33,7 @@ class MaxBatchSizeWaitTest extends BaseSparkTest {


@Test
public void testPerformance() throws Exception {
public void testBatchsizeWait() throws Exception {
int iteration = 100;
PGCreateTestDataTable();
for (int i = 0; i <= iteration; i++) {
@@ -47,7 +47,7 @@
.sql("SELECT substring(input_file,94,60) as input_file, " +
"count(*) as batch_size FROM global_temp.test_date_table_batch_size group " +
"by 1");
df.show(false);
//df.show(false);
return df.filter("batch_size = " + maxBatchSize).count() >= 5;
} catch (Exception e) {
return false;
pom.xml: 2 changes (1 addition & 1 deletion)
@@ -43,7 +43,7 @@
<version.awssdk>2.16.88</version.awssdk>
<version.parquet>1.11.1</version.parquet>
<!-- Debezium -->
<version.debezium>1.6.0.Final</version.debezium>
<version.debezium>1.7.0.Alpha1</version.debezium>
<!-- Quarkus -->
<version.quarkus>2.0.0.Final</version.quarkus>
</properties>