Upgrade iceberg to 0.13.2 (#88)
* Upgrade iceberg 0.13.2

* Improve code comments
ismailsimsek committed Jun 14, 2022
1 parent 7882ce5 commit 63e1793
Showing 6 changed files with 50 additions and 6 deletions.
1 change: 0 additions & 1 deletion debezium-server-iceberg-sink/pom.xml
@@ -53,7 +53,6 @@
   <artifactId>iceberg-spark-runtime-3.2_2.12</artifactId>
   <version>${version.iceberg}</version>
 </dependency>
-
 <!-- AWS -->
 <dependency>
   <groupId>software.amazon.awssdk</groupId>
@@ -33,7 +33,7 @@ public class IcebergChangeEvent {
   protected final String destination;
   protected final JsonNode value;
   protected final JsonNode key;
-  JsonSchema jsonSchema;
+  final JsonSchema jsonSchema;
 
   public IcebergChangeEvent(String destination,
                             JsonNode value,
@@ -18,6 +18,7 @@ abstract class BaseDeltaTaskWriter extends BaseTaskWriter<Record> {
   private final Schema schema;
   private final Schema deleteSchema;
   private final InternalRecordWrapper wrapper;
+  private final InternalRecordWrapper keyWrapper;
   private final boolean upsert;
   private final boolean upsertKeepDeletes;
 
@@ -35,6 +36,7 @@ abstract class BaseDeltaTaskWriter extends BaseTaskWriter<Record> {
     this.schema = schema;
     this.deleteSchema = TypeUtil.select(schema, Sets.newHashSet(equalityFieldIds));
     this.wrapper = new InternalRecordWrapper(schema.asStruct());
+    this.keyWrapper = new InternalRecordWrapper(deleteSchema.asStruct());
     this.upsert = upsert;
     this.upsertKeepDeletes = upsertKeepDeletes;
   }
@@ -50,7 +52,6 @@ public void write(Record row) throws IOException {
     RowDataDeltaWriter writer = route(row);
     if (upsert && !row.getField("__op").equals("c")) { // anything which is not an insert is an upsert
       writer.delete(row);
-      //System.out.println("->" + row);
     }
     // if it's a deleted row and upsertKeepDeletes = true then add the deleted record to the target table,
     // else deleted records are removed from the target table
@@ -71,5 +72,10 @@ public class RowDataDeltaWriter extends BaseEqualityDeltaWriter {
     protected StructLike asStructLike(Record data) {
       return wrapper.wrap(data);
     }
+
+    @Override
+    protected StructLike asStructLikeKey(Record data) {
+      return keyWrapper.wrap(data);
+    }
   }
 }
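
For context, the new keyWrapper backs the asStructLikeKey override that BaseEqualityDeltaWriter expects as of Iceberg 0.13.2, so equality-delete lookups compare rows by their key projection rather than the full row. Below is a minimal standalone sketch of the delete-schema projection and key wrapping; the schema, field ids, and values are made up for illustration:

import java.util.Set;

import org.apache.iceberg.Schema;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.data.InternalRecordWrapper;
import org.apache.iceberg.types.TypeUtil;
import org.apache.iceberg.types.Types;

public class KeyWrapperSketch {
  public static void main(String[] args) {
    // A row schema where field id 1 ("id") is the equality field.
    Schema schema = new Schema(
        Types.NestedField.required(1, "id", Types.LongType.get()),
        Types.NestedField.optional(2, "name", Types.StringType.get()));

    // The same projection BaseDeltaTaskWriter builds for equality deletes.
    Schema deleteSchema = TypeUtil.select(schema, Set.of(1));

    GenericRecord row = GenericRecord.create(schema);
    row.setField("id", 42L);
    row.setField("name", "alice");

    // asStructLike wraps the row against the full schema; asStructLikeKey
    // wraps the same row against the delete schema's struct type.
    InternalRecordWrapper wrapper = new InternalRecordWrapper(schema.asStruct());
    InternalRecordWrapper keyWrapper = new InternalRecordWrapper(deleteSchema.asStruct());

    System.out.println(wrapper.wrap(row).get(0, Long.class));    // 42
    System.out.println(keyWrapper.wrap(row).get(0, Long.class)); // 42
  }
}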
@@ -78,7 +78,19 @@ private List<IcebergChangeEvent> deduplicateBatch(List<IcebergChangeEvent> events) {
     return new ArrayList<>(icebergRecordsmap.values());
   }
 
-
+  /**
+   * This is used to deduplicate events within a given batch.
+   * <p>
+   * For example, a record can be updated multiple times in the source: an insert followed by an update and a
+   * delete. In this case we need to pick only the last change event for the row.
+   * <p>
+   * It's used when the `upsert` feature is enabled (when the consumer is operating in non-append mode), which
+   * means it should not add duplicate records to the target table.
+   *
+   * @param lhs the value of the first change event
+   * @param rhs the value of the second change event
+   * @return the comparison result, ordering by source timestamp and then by operation
+   */
   private int compareByTsThenOp(JsonNode lhs, JsonNode rhs) {
 
     int result = Long.compare(lhs.get(sourceTsMsColumn).asLong(0), rhs.get(sourceTsMsColumn).asLong(0));
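
To make the comparator's role concrete, here is a small self-contained sketch of batch deduplication: events are keyed per row, and within a key the winner is chosen by timestamp first, then by operation priority. The Event record and OP_PRIORITY values are illustrative stand-ins, not the consumer's actual types:

import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class DedupSketch {
  // Illustrative event: key identifies the row, ts/op order its changes.
  record Event(String key, long ts, String op) {}

  // Illustrative precedence for events sharing a timestamp; the real
  // consumer keeps its own operation-priority mapping.
  static final Map<String, Integer> OP_PRIORITY = Map.of("c", 1, "r", 2, "u", 3, "d", 4);

  static List<Event> deduplicate(List<Event> batch) {
    Comparator<Event> byTsThenOp = Comparator
        .comparingLong(Event::ts)
        .thenComparing(e -> OP_PRIORITY.getOrDefault(e.op(), 0));

    // Keep, per key, the event that wins the ts-then-op comparison.
    Map<String, Event> latest = new LinkedHashMap<>();
    for (Event e : batch) {
      latest.merge(e.key(), e, (a, b) -> byTsThenOp.compare(a, b) > 0 ? a : b);
    }
    return List.copyOf(latest.values());
  }

  public static void main(String[] args) {
    List<Event> batch = List.of(
        new Event("id=1", 100, "c"),
        new Event("id=1", 200, "u"),
        new Event("id=1", 200, "d")); // same ts: op priority breaks the tie
    System.out.println(deduplicate(batch)); // only the delete survives
  }
}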
@@ -94,6 +106,15 @@ private int compareByTsThenOp(JsonNode lhs, JsonNode rhs) {
     return result;
   }
 
+  /**
+   * If the given schema contains new fields compared to the target table schema, adds the new fields to the
+   * target iceberg table.
+   * <p>
+   * It's used when the allow-field-addition feature is enabled.
+   *
+   * @param icebergTable the target iceberg table
+   * @param newSchema    the schema derived from the incoming change events
+   */
   private void applyFieldAddition(Table icebergTable, Schema newSchema) {
 
     UpdateSchema us = icebergTable.updateSchema().
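
The hunk cuts off mid-statement, but the comment describes standard Iceberg schema evolution. A hedged sketch of merging new fields into a table schema via UpdateSchema.unionByNameWith; whether the real method chains exactly these calls is not visible in this diff:

import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;

public class FieldAdditionSketch {
  // Merge any fields the table does not have yet into its schema; matching
  // is by name, so existing columns are left untouched.
  public static void addNewFields(Table icebergTable, Schema newSchema) {
    icebergTable.updateSchema()
        .unionByNameWith(newSchema)
        .commit();
  }
}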
@@ -108,6 +129,18 @@ private void applyFieldAddition(Table icebergTable, Schema newSchema) {
     }
   }
 
+  /**
+   * Adds a list of events to the iceberg table.
+   * <p>
+   * If field addition is enabled, it first groups the list of change events by their schema, then adds any new
+   * fields to the iceberg table, and then adds the data to the table.
+   * <p>
+   * New fields are detected using the CDC event schema; since events are grouped by their schema, a single
+   * event is enough to find out the schema for the whole list of events.
+   *
+   * @param icebergTable the target iceberg table
+   * @param events       the list of change events to add
+   */
   public void addToTable(Table icebergTable, List<IcebergChangeEvent> events) {
 
     // when operation mode is not upsert, deduplicate the events to avoid inserting duplicate rows
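
A small runnable sketch of the grouping idea described above, with a plain string fingerprint standing in for the event's JsonSchema:

import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class GroupBySchemaSketch {
  // Stand-in for IcebergChangeEvent; the real consumer groups by the
  // event's JsonSchema rather than a string fingerprint.
  record ChangeEvent(String schemaFingerprint, String payload) {}

  public static void main(String[] args) {
    List<ChangeEvent> events = List.of(
        new ChangeEvent("v1", "{\"id\":1}"),
        new ChangeEvent("v2", "{\"id\":2,\"name\":\"a\"}"), // schema adds a field
        new ChangeEvent("v2", "{\"id\":3,\"name\":\"b\"}"));

    Map<String, List<ChangeEvent>> bySchema = events.stream()
        .collect(Collectors.groupingBy(ChangeEvent::schemaFingerprint));

    bySchema.forEach((schema, group) ->
        // one event per group is enough to detect new fields; the whole
        // group can then be written with a single task writer
        System.out.printf("schema %s: %d event(s)%n", schema, group.size()));
  }
}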
@@ -134,6 +167,12 @@ public void addToTable(Table icebergTable, List<IcebergChangeEvent> events) {

   }
 
+  /**
+   * Adds a list of change events to the iceberg table. All the events must have the same schema.
+   *
+   * @param icebergTable the target iceberg table
+   * @param events       the list of change events, all sharing one schema
+   */
   private void addToTablePerSchema(Table icebergTable, List<IcebergChangeEvent> events) {
     // Initialize a task writer to write both INSERT and equality DELETE.
     BaseTaskWriter<Record> writer = writerFactory.create(icebergTable);
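
For context, a task writer like the one created here is typically driven through a write/complete/commit cycle; the commit path itself lies outside this hunk. A hedged sketch using Iceberg's RowDelta API:

import java.io.IOException;
import java.util.Arrays;

import org.apache.iceberg.RowDelta;
import org.apache.iceberg.Table;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.io.BaseTaskWriter;
import org.apache.iceberg.io.WriteResult;

public class CommitSketch {
  // Write every record, then commit the produced data files and
  // equality-delete files together as one RowDelta snapshot.
  static void writeAndCommit(Table table, BaseTaskWriter<Record> writer,
                             Iterable<Record> records) throws IOException {
    for (Record r : records) {
      writer.write(r);
    }
    WriteResult result = writer.complete();

    RowDelta rowDelta = table.newRowDelta();
    Arrays.stream(result.dataFiles()).forEach(rowDelta::addRows);
    Arrays.stream(result.deleteFiles()).forEach(rowDelta::addDeletes);
    rowDelta.commit();
  }
}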
@@ -13,7 +13,7 @@

 public class PartitionedAppendWriter extends PartitionedWriter<Record> {
   private final PartitionKey partitionKey;
-  InternalRecordWrapper wrapper;
+  final InternalRecordWrapper wrapper;
 
   public PartitionedAppendWriter(PartitionSpec spec, FileFormat format,
                                  FileAppenderFactory<Record> appenderFactory,
2 changes: 1 addition & 1 deletion pom.xml
@@ -30,7 +30,7 @@
     <version.groovy>3.0.9</version.groovy>
     <version.assembly.plugin>3.3.0</version.assembly.plugin>
     <version.jackson>2.12.6</version.jackson>
-    <version.iceberg>0.13.1</version.iceberg>
+    <version.iceberg>0.13.2</version.iceberg>
     <version.spark>3.2.1</version.spark>
     <version.hadoop>3.3.1</version.hadoop>
     <version.awssdk>2.17.120</version.awssdk>
