Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adapt iceberg 12 changes #18

Merged
merged 7 commits into from
Aug 5, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,17 @@
package io.debezium.server.iceberg;

import java.io.IOException;
import java.util.*;

import com.fasterxml.jackson.databind.JsonNode;
import org.apache.iceberg.*;
import org.apache.iceberg.NullOrder;
import org.apache.iceberg.Schema;
import org.apache.iceberg.SortOrder;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
import org.apache.iceberg.types.Types;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -26,20 +32,19 @@
public class DebeziumToIcebergTable {
protected static final Logger LOGGER = LoggerFactory.getLogger(DebeziumToIcebergTable.class);

private final Schema tableSchema;
private final Schema tableRowIdentifierSchema;
private final List<Types.NestedField> tableColumns;
private final List<Types.NestedField> tableRowIdentifierColumns;

public DebeziumToIcebergTable(byte[] eventKey, byte[] eventVal) throws IOException {
tableSchema = extractSchema(eventVal);
tableRowIdentifierSchema = extractSchema(eventKey);
public DebeziumToIcebergTable(byte[] eventVal, byte[] eventKey) throws IOException {
tableColumns = extractSchema(eventVal);
tableRowIdentifierColumns = (eventKey == null) ? null : extractSchema(eventKey);
}

public DebeziumToIcebergTable(byte[] eventVal) throws IOException {
tableSchema = extractSchema(eventVal);
tableRowIdentifierSchema = null;
this(eventVal, null);
}

private Schema extractSchema(byte[] eventVal) throws IOException {
private List<Types.NestedField> extractSchema(byte[] eventVal) throws IOException {

JsonNode jsonEvent = IcebergUtil.jsonObjectMapper.readTree(eventVal);

Expand All @@ -51,53 +56,74 @@ private Schema extractSchema(byte[] eventVal) throws IOException {
return null;
}

public Schema getTableSchema() {
return tableSchema;
public boolean hasSchema() {
return tableColumns != null;
}

public Schema getTableRowIdentifierSchema() {
return tableRowIdentifierSchema;
}
private SortOrder getSortOrder(Schema schema) {
SortOrder so = SortOrder.unsorted();

private Schema getIcebergSchema(JsonNode eventSchema) {
return IcebergUtil.getIcebergSchema(eventSchema);
}
if (this.tableRowIdentifierColumns != null) {
SortOrder.Builder sob = SortOrder.builderFor(schema);
for (Types.NestedField coll : tableRowIdentifierColumns) {
sob = sob.asc(coll.name(), NullOrder.NULLS_FIRST);
}
so = sob.build();
}

public boolean hasSchema() {
return tableSchema != null;
return so;
}

public Table create(Catalog icebergCatalog, TableIdentifier tableIdentifier) {
private Set<Integer> getRowIdentifierFieldIds() throws Exception {

if (this.hasSchema()) {
Catalog.TableBuilder tb = icebergCatalog.buildTable(tableIdentifier, this.tableSchema);
if (this.tableRowIdentifierColumns == null) {
return ImmutableSet.of();
}

if (this.tableRowIdentifierSchema != null) {
SortOrder.Builder sob = SortOrder.builderFor(tableSchema);
for (Types.NestedField coll : tableRowIdentifierSchema.columns()) {
sob = sob.asc(coll.name(), NullOrder.NULLS_FIRST);
Set<Integer> identifierFieldIds = new HashSet<>();

ListIterator<Types.NestedField> idIterator = this.tableRowIdentifierColumns.listIterator();
while (idIterator.hasNext()) {
Types.NestedField ic = idIterator.next();
boolean found = false;

ListIterator<Types.NestedField> colsIterator = this.tableColumns.listIterator();
while (colsIterator.hasNext()) {
Types.NestedField tc = colsIterator.next();
if (Objects.equals(tc.name(), ic.name())) {
identifierFieldIds.add(tc.fieldId());
// set columns as required its part of identifier filed
colsIterator.set(tc.asRequired());
found = true;
break;
}
tb.withSortOrder(sob.build());
// "@TODO waiting spec v2 // use as PK / RowKeyIdentifier
}

LOGGER.warn("Creating table:'{}'\nschema:{}\nrowIdentifier:{}", tableIdentifier, tableSchema,
tableRowIdentifierSchema);
Table table = tb.create();
// @TODO remove once spec v2 released
return upgradeToFormatVersion2(table);
if (!found) {
throw new ValidationException("Table Row identifier field `" + ic.name() + "` not found in table columns");
}

}

return null;
return identifierFieldIds;
}

// @TODO remove once spec v2 released! upgrading table to V2
public Table upgradeToFormatVersion2(Table icebergTable) {
TableOperations ops = ((BaseTable) icebergTable).operations();
TableMetadata meta = ops.current();
ops.commit(ops.current(), meta.upgradeToFormatVersion(2));
icebergTable.refresh();
return icebergTable;
public Table create(Catalog icebergCatalog, TableIdentifier tableIdentifier) throws Exception {

Schema schema = new Schema(this.tableColumns, getRowIdentifierFieldIds());

if (this.hasSchema()) {
Catalog.TableBuilder tb = icebergCatalog.buildTable(tableIdentifier, schema)
.withProperty("format-version", "2")
.withSortOrder(getSortOrder(schema));

LOGGER.warn("Creating table:'{}'\nschema:{}\nrowIdentifier:{}", tableIdentifier, schema,
schema.identifierFieldNames());

return tb.create();
}

return null;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.primitives.Ints;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.*;
import org.apache.iceberg.catalog.Catalog;
Expand All @@ -45,7 +47,6 @@
import org.apache.iceberg.io.FileAppender;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.parquet.Parquet;
import org.apache.iceberg.util.ArrayUtil;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.eclipse.microprofile.config.ConfigProvider;
Expand Down Expand Up @@ -140,17 +141,13 @@ public String map(String destination) {
return destination.replace(".", "_");
}

private Table createIcebergTable(TableIdentifier tableIdentifier, ChangeEvent<Object, Object> event) {
private Table createIcebergTable(TableIdentifier tableIdentifier, ChangeEvent<Object, Object> event) throws Exception {
if (eventSchemaEnabled && event.value() != null) {
try {
DebeziumToIcebergTable eventSchema = event.key() == null
? new DebeziumToIcebergTable(getBytes(event.value()))
: new DebeziumToIcebergTable(getBytes(event.key()), getBytes(event.value()));
DebeziumToIcebergTable eventSchema = event.key() == null
? new DebeziumToIcebergTable(getBytes(event.value()))
: new DebeziumToIcebergTable(getBytes(event.value()), getBytes(event.key()));

return eventSchema.create(icebergCatalog, tableIdentifier);
} catch (Exception e) {
LOGGER.warn("Failed creating iceberg table! {}", e.getMessage());
}
return eventSchema.create(icebergCatalog, tableIdentifier);
}
return null;
}
Expand All @@ -174,10 +171,11 @@ public void handleBatch(List<ChangeEvent<Object, Object>> records, DebeziumEngin
icebergTable = icebergCatalog.loadTable(tableIdentifier);
} catch (org.apache.iceberg.exceptions.NoSuchTableException e) {
// get schema fom an event and create iceberg table
icebergTable = createIcebergTable(tableIdentifier, event.getValue().get(0));
if (icebergTable == null) {
LOGGER.warn("Iceberg table '{}' not found! Ignoring received data for the table!", tableIdentifier);
continue;
try {
icebergTable = createIcebergTable(tableIdentifier, event.getValue().get(0));
} catch (Exception e2) {
e.printStackTrace();
throw new InterruptedException("Failed to create iceberg table, " + e2.getMessage());
}
}
addToTable(icebergTable, event.getValue());
Expand Down Expand Up @@ -277,20 +275,11 @@ private void addToTable(Table icebergTable, ArrayList<ChangeEvent<Object, Object

}

private List<Integer> getEqualityFieldIds(Table icebergTable) {
List<Integer> fieldIds = new ArrayList<>();

for (SortField f : icebergTable.sortOrder().fields()) {
fieldIds.add(f.sourceId());
}
return fieldIds;
}

private DeleteFile getDeleteDataFile(Table icebergTable, ArrayList<Record> icebergRecords) throws InterruptedException {

final String fileName = "del-" + UUID.randomUUID() + "-" + Instant.now().toEpochMilli() + "." + FileFormat.PARQUET;
OutputFile out = icebergTable.io().newOutputFile(icebergTable.locationProvider().newDataLocation(fileName));
List<Integer> equalityDeleteFieldIds = getEqualityFieldIds(icebergTable);
Set<Integer> equalityDeleteFieldIds = icebergTable.schema().identifierFieldIds();

EqualityDeleteWriter<Record> deleteWriter;

Expand All @@ -316,11 +305,9 @@ private DeleteFile getDeleteDataFile(Table icebergTable, ArrayList<Record> icebe
.overwrite()
.rowSchema(icebergTable.sortOrder().schema())
.withSpec(icebergTable.spec())
.equalityFieldIds(equalityDeleteFieldIds)
//.withKeyMetadata() // ??
.equalityFieldIds(Lists.newArrayList(icebergTable.schema().identifierFieldIds()))
.metricsConfig(MetricsConfig.fromProperties(icebergTable.properties()))
// .withPartition() // ??
// @TODO add sort order v12 ??
.withSortOrder(icebergTable.sortOrder())
.setAll(icebergTable.properties())
.buildEqualityWriter()
;
Expand All @@ -337,13 +324,13 @@ private DeleteFile getDeleteDataFile(Table icebergTable, ArrayList<Record> icebe
// Equality delete files identify deleted rows in a collection of data files by one or more column values,
// and may optionally contain additional columns of the deleted row.
return FileMetadata.deleteFileBuilder(icebergTable.spec())
.ofEqualityDeletes(ArrayUtil.toIntArray(equalityDeleteFieldIds))
.ofEqualityDeletes(Ints.toArray(icebergTable.schema().identifierFieldIds()))
.withFormat(FileFormat.PARQUET)
.withPath(out.location())
.withFileSizeInBytes(deleteWriter.length())
//.withMetrics(deleteWriter.metrics()) //
.withRecordCount(deleteRows.size()) // its mandatory field! replace when with iceberg V 0.12
//.withSortOrder(icebergTable.sortOrder())
.withFileSizeInBytes(deleteWriter.length())
.withRecordCount(deleteRows.size())
.withSortOrder(icebergTable.sortOrder())
.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,12 @@ public class IcebergUtil {
protected static final Logger LOGGER = LoggerFactory.getLogger(IcebergUtil.class);
protected static final ObjectMapper jsonObjectMapper = new ObjectMapper();

public static Schema getIcebergSchema(JsonNode eventSchema) {
public static List<Types.NestedField> getIcebergSchema(JsonNode eventSchema) {
LOGGER.debug(eventSchema.toString());
return getIcebergSchema(eventSchema, "", -1);
}

public static Schema getIcebergSchema(JsonNode eventSchema, String schemaName, int columnId) {
public static List<Types.NestedField> getIcebergSchema(JsonNode eventSchema, String schemaName, int columnId) {
List<Types.NestedField> schemaColumns = new ArrayList<>();
String schemaType = eventSchema.get("type").textValue();
LOGGER.debug("Converting Schema of: {}::{}", schemaName, schemaType);
Expand Down Expand Up @@ -83,8 +83,8 @@ public static Schema getIcebergSchema(JsonNode eventSchema, String schemaName, i
//schemaColumns.add(Types.NestedField.optional(columnId, fieldName, Types.StringType.get()));
//break;
case "struct":
throw new RuntimeException("Event schema containing nested data '" + fieldName + "' cannot process nested" +
" data!");
throw new RuntimeException("Field:'" + fieldName + "' has nested data type, " +
"nested data types are not supported by consumer");
// //recursive call
// Schema subSchema = SchemaUtil.getIcebergSchema(jsonSchemaFieldNode, fieldName, ++columnId);
// schemaColumns.add(Types.NestedField.optional(columnId, fieldName, Types.StructType.of(subSchema.columns())));
Expand All @@ -96,7 +96,7 @@ public static Schema getIcebergSchema(JsonNode eventSchema, String schemaName, i
break;
}
}
return new Schema(schemaColumns);
return schemaColumns;
}

public static boolean hasSchema(JsonNode jsonNode) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ public void testSimpleUpload() {
Awaitility.await().atMost(Duration.ofSeconds(60)).until(() -> {
try {
Dataset<Row> df = getTableData("testc.inventory.customers");
df.show(false);
return df.filter("id is not null").count() >= 4;
} catch (Exception e) {
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,10 @@ public ConfigSource() {
config.put("debezium.source.database.server.name", "testc");
config.put("%postgresql.debezium.source.schema.whitelist", "inventory");
config.put("debezium.source.table.whitelist", "inventory.customers,inventory.orders,inventory.products," +
"inventory.geom,inventory.table_datatypes,inventory.test_date_table");
"inventory.table_datatypes,inventory.test_date_table");
config.put("%postgresql.debezium.source.database.whitelist", "inventory");
config.put("%mysql.debezium.source.table.whitelist", "inventory.customers");
config.put("debezium.source.include.schema.changes", "false");

config.put("quarkus.log.level", "INFO");
config.put("quarkus.log.category.\"org.apache.spark\".level", "WARN");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,10 +118,10 @@ public void testDatatypes() throws Exception {
"'3f207ac6-5dba-11eb-ae93-0242ac130002'::UUID, 'aBC'::bytea" +
")";
SourcePostgresqlDB.runSQL(sql);
Awaitility.await().atMost(Duration.ofSeconds(120)).until(() -> {
Awaitility.await().atMost(Duration.ofSeconds(320)).until(() -> {
try {
Dataset<Row> df = getTableData("testc.inventory.table_datatypes");
df.show();
df.show(true);
return df.where("c_text is null AND c_varchar is null AND c_int is null " +
"AND c_date is null AND c_timestamp is null AND c_timestamptz is null " +
"AND c_float is null AND c_decimal is null AND c_numeric is null AND c_interval is null " +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,11 @@
import io.debezium.serde.DebeziumSerdes;
import io.debezium.util.Testing;

import java.io.IOException;
import java.util.Collections;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.iceberg.Schema;
import org.apache.iceberg.data.GenericRecord;
import org.apache.kafka.common.serialization.Serde;
import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.*;
Expand All @@ -30,9 +27,10 @@ class TestIcebergUtil {
@Test
public void testNestedJsonRecord() throws JsonProcessingException {
Exception exception = assertThrows(Exception.class, () -> IcebergUtil.getIcebergSchema(new ObjectMapper().readTree(serdeWithSchema).get("schema")));
assertEquals("Event schema containing nested data 'before' cannot process nested data!", exception.getMessage());
assertTrue(exception.getMessage().contains("nested data type"));
}

/*
@Test
public void testUnwrapJsonRecord() throws IOException, InterruptedException {
JsonNode event = new ObjectMapper().readTree(unwrapWithSchema).get("payload");
Expand All @@ -41,6 +39,7 @@ public void testUnwrapJsonRecord() throws IOException, InterruptedException {
assertEquals("orders", record.getField("__table").toString());
assertEquals(16850, record.getField("order_date"));
}
*/

@Test
public void valuePayloadWithSchemaAsJsonNode() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ class MaxBatchSizeWaitTest extends BaseSparkTest {


@Test
public void testPerformance() throws Exception {
public void testBatchsizeWait() throws Exception {
int iteration = 100;
PGCreateTestDataTable();
for (int i = 0; i <= iteration; i++) {
Expand All @@ -47,7 +47,7 @@ public void testPerformance() throws Exception {
.sql("SELECT substring(input_file,94,60) as input_file, " +
"count(*) as batch_size FROM global_temp.test_date_table_batch_size group " +
"by 1");
df.show(false);
//df.show(false);
return df.filter("batch_size = " + maxBatchSize).count() >= 5;
} catch (Exception e) {
return false;
Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@
<version.awssdk>2.16.88</version.awssdk>
<version.parquet>1.11.1</version.parquet>
<!-- Debezium -->
<version.debezium>1.6.0.Final</version.debezium>
<version.debezium>1.7.0.Alpha1</version.debezium>
<!-- Quarkus -->
<version.quarkus>2.0.0.Final</version.quarkus>
</properties>
Expand Down