Upgrade iceberg to 0.13.2 #88

Merged 3 commits on Jun 14, 2022
1 change: 0 additions & 1 deletion debezium-server-iceberg-sink/pom.xml
@@ -53,7 +53,6 @@
<artifactId>iceberg-spark-runtime-3.2_2.12</artifactId>
<version>${version.iceberg}</version>
</dependency>
-
<!-- AWS -->
<dependency>
<groupId>software.amazon.awssdk</groupId>
@@ -33,7 +33,7 @@ public class IcebergChangeEvent {
protected final String destination;
protected final JsonNode value;
protected final JsonNode key;
- JsonSchema jsonSchema;
+ final JsonSchema jsonSchema;

public IcebergChangeEvent(String destination,
JsonNode value,
@@ -18,6 +18,7 @@ abstract class BaseDeltaTaskWriter extends BaseTaskWriter<Record> {
private final Schema schema;
private final Schema deleteSchema;
private final InternalRecordWrapper wrapper;
+ private final InternalRecordWrapper keyWrapper;
private final boolean upsert;
private final boolean upsertKeepDeletes;

@@ -35,6 +36,7 @@ abstract class BaseDeltaTaskWriter extends BaseTaskWriter<Record> {
this.schema = schema;
this.deleteSchema = TypeUtil.select(schema, Sets.newHashSet(equalityFieldIds));
this.wrapper = new InternalRecordWrapper(schema.asStruct());
+ this.keyWrapper = new InternalRecordWrapper(deleteSchema.asStruct());
this.upsert = upsert;
this.upsertKeepDeletes = upsertKeepDeletes;
}
@@ -50,7 +52,6 @@ public void write(Record row) throws IOException {
RowDataDeltaWriter writer = route(row);
if (upsert && !row.getField("__op").equals("c")) { // anything that is not an insert is treated as an upsert
writer.delete(row);
- //System.out.println("->" + row);
}
// if it's a deleted row and upsertKeepDeletes = true, add the deleted record to the target table,
// otherwise deleted records are removed from the target table
@@ -71,5 +72,10 @@ public class RowDataDeltaWriter extends BaseEqualityDeltaWriter {
protected StructLike asStructLike(Record data) {
return wrapper.wrap(data);
}

+ @Override
+ protected StructLike asStructLikeKey(Record data) {
+   return keyWrapper.wrap(data);
+ }
}
}
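
The keyWrapper introduced above backs the new asStructLikeKey(...) override; BaseEqualityDeltaWriter uses it to wrap equality-delete keys, which are projected to the delete schema rather than the full row schema. A minimal, runnable sketch of the idea (the schema, field names, and class are invented for illustration, not taken from this PR):

import java.util.Set;

import org.apache.iceberg.Schema;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.data.InternalRecordWrapper;
import org.apache.iceberg.types.TypeUtil;
import org.apache.iceberg.types.Types;

public class KeyWrapperSketch {
  public static void main(String[] args) {
    Schema schema = new Schema(
        Types.NestedField.required(1, "id", Types.LongType.get()),
        Types.NestedField.optional(2, "name", Types.StringType.get()));

    // The delete schema is the projection of the equality-delete fields (here: id),
    // mirroring TypeUtil.select(schema, equalityFieldIds) in the writer above.
    Schema deleteSchema = TypeUtil.select(schema, Set.of(1));

    // Rows are wrapped with the full schema, keys with the delete schema:
    InternalRecordWrapper keyWrapper = new InternalRecordWrapper(deleteSchema.asStruct());

    GenericRecord key = GenericRecord.create(deleteSchema);
    key.setField("id", 42L);

    // This is what the new asStructLikeKey(...) override returns for a delete key.
    StructLike wrappedKey = keyWrapper.wrap(key);
    System.out.println(wrappedKey.get(0, Long.class)); // 42
  }
}
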
@@ -78,7 +78,19 @@ private List<IcebergChangeEvent> deduplicateBatch(List<IcebergChangeEvent> events) {
return new ArrayList<>(icebergRecordsmap.values());
}

-
+ /**
+  * Compares two change events by source timestamp, then by operation; used to deduplicate events within a
+  * given batch.
+  * <p>
+  * For example, a record can be changed multiple times in the source: an insert followed by an update and a
+  * delete. In that case only the last change event for the row should be kept.
+  * <p>
+  * It's used when the `upsert` feature is enabled (the consumer is operating in non-append mode), which means
+  * duplicate records must not be added to the target table.
+  *
+  * @param lhs the value of the first change event
+  * @param rhs the value of the second change event
+  * @return the comparison result
+  */
private int compareByTsThenOp(JsonNode lhs, JsonNode rhs) {

int result = Long.compare(lhs.get(sourceTsMsColumn).asLong(0), rhs.get(sourceTsMsColumn).asLong(0));
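
For illustration, here is a self-contained sketch of the timestamp-then-operation deduplication described above. The column names and the operation precedence (c < r < u < d) are assumptions made for the example, not values taken from this PR:

import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

public class DedupSketch {
  private static final String OPS = "crud"; // assumed precedence: c < r < u < d

  static int compareByTsThenOp(JsonNode lhs, JsonNode rhs) {
    int result = Long.compare(lhs.get("__source_ts_ms").asLong(0), rhs.get("__source_ts_ms").asLong(0));
    if (result == 0) {
      // same timestamp: fall back to operation precedence
      result = Integer.compare(OPS.indexOf(lhs.get("__op").asText()), OPS.indexOf(rhs.get("__op").asText()));
    }
    return result;
  }

  public static void main(String[] args) throws Exception {
    ObjectMapper mapper = new ObjectMapper();
    List<JsonNode> batch = List.of(
        mapper.readTree("{\"id\":1,\"__op\":\"c\",\"__source_ts_ms\":100}"),
        mapper.readTree("{\"id\":1,\"__op\":\"u\",\"__source_ts_ms\":200}"),
        mapper.readTree("{\"id\":1,\"__op\":\"d\",\"__source_ts_ms\":200}"));

    // keep only the last change event per row key
    Map<JsonNode, JsonNode> latest = batch.stream().collect(Collectors.toMap(
        e -> e.get("id"), Function.identity(),
        (lhs, rhs) -> compareByTsThenOp(lhs, rhs) >= 0 ? lhs : rhs));
    System.out.println(latest.values()); // the delete wins: ts=200 and d > u
  }
}
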
@@ -94,6 +106,15 @@ private int compareByTsThenOp(JsonNode lhs, JsonNode rhs) {
return result;
}

+ /**
+  * If the given schema contains new fields compared to the target table schema, adds the new fields to the
+  * target Iceberg table.
+  * <p>
+  * It's used when the allow-field-addition feature is enabled.
+  *
+  * @param icebergTable the target Iceberg table
+  * @param newSchema the schema of the incoming change events
+  */
private void applyFieldAddition(Table icebergTable, Schema newSchema) {

UpdateSchema us = icebergTable.updateSchema().
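
The chained call started above uses Iceberg's UpdateSchema API. A runnable sketch of adding new fields by name with unionByNameWith (assuming that method is the mechanism used; the table location and schemas are invented for the example):

import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.hadoop.HadoopTables;
import org.apache.iceberg.types.Types;

public class FieldAdditionSketch {
  public static void main(String[] args) {
    Schema base = new Schema(Types.NestedField.required(1, "id", Types.LongType.get()));
    Table table = new HadoopTables(new Configuration()).create(base, "/tmp/field_addition_demo");

    // An incoming CDC event schema carrying one extra column:
    Schema incoming = new Schema(
        Types.NestedField.required(1, "id", Types.LongType.get()),
        Types.NestedField.optional(2, "name", Types.StringType.get()));

    // unionByNameWith adds columns that exist in `incoming` but not in the table schema.
    table.updateSchema().unionByNameWith(incoming).commit();
    System.out.println(table.schema()); // now contains both id and name
  }
}
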
@@ -108,6 +129,18 @@ private void applyFieldAddition(Table icebergTable, Schema newSchema) {
}
}

+ /**
+  * Adds a list of events to the Iceberg table.
+  * <p>
+  * If field addition is enabled, it first groups the change events by their schema, then adds any new fields to
+  * the Iceberg table, and finally adds the data to the table.
+  * <p>
+  * New fields are detected using the CDC event schema; since events are grouped by schema, a single event is
+  * enough to determine the schema for the whole group.
+  *
+  * @param icebergTable the target Iceberg table
+  * @param events the change events to add
+  */
public void addToTable(Table icebergTable, List<IcebergChangeEvent> events) {

// when operation mode is not upsert deduplicate the events to avoid inserting duplicate row
@@ -134,6 +167,12 @@ public void addToTable(Table icebergTable, List<IcebergChangeEvent> events) {

}

+ /**
+  * Adds a list of change events to the Iceberg table. All events must share the same schema.
+  *
+  * @param icebergTable the target Iceberg table
+  * @param events the change events to add
+  */
private void addToTablePerSchema(Table icebergTable, List<IcebergChangeEvent> events) {
// Initialize a task writer to write both INSERT and equality DELETE.
BaseTaskWriter<Record> writer = writerFactory.create(icebergTable);
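
The remainder of the method is collapsed above. For orientation, here is a hedged sketch of the standard Iceberg flow for committing such a writer's output, using the generic WriteResult/RowDelta API rather than the project's exact code:

import java.io.IOException;
import java.util.Arrays;

import org.apache.iceberg.RowDelta;
import org.apache.iceberg.Table;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.io.BaseTaskWriter;
import org.apache.iceberg.io.WriteResult;

class CommitSketch {
  static void writeAndCommit(Table icebergTable, BaseTaskWriter<Record> writer,
                             Iterable<Record> records) throws IOException {
    for (Record record : records) {
      writer.write(record); // buffers inserts and equality deletes
    }
    WriteResult result = writer.complete(); // closes open files, returns data/delete files

    // commit data files and equality-delete files in a single atomic operation
    RowDelta rowDelta = icebergTable.newRowDelta();
    Arrays.stream(result.dataFiles()).forEach(rowDelta::addRows);
    Arrays.stream(result.deleteFiles()).forEach(rowDelta::addDeletes);
    rowDelta.commit();
  }
}
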
@@ -13,7 +13,7 @@

public class PartitionedAppendWriter extends PartitionedWriter<Record> {
private final PartitionKey partitionKey;
- InternalRecordWrapper wrapper;
+ final InternalRecordWrapper wrapper;

public PartitionedAppendWriter(PartitionSpec spec, FileFormat format,
FileAppenderFactory<Record> appenderFactory,
2 changes: 1 addition & 1 deletion pom.xml
@@ -30,7 +30,7 @@
<version.groovy>3.0.9</version.groovy>
<version.assembly.plugin>3.3.0</version.assembly.plugin>
<version.jackson>2.12.6</version.jackson>
- <version.iceberg>0.13.1</version.iceberg>
+ <version.iceberg>0.13.2</version.iceberg>
<version.spark>3.2.1</version.spark>
<version.hadoop>3.3.1</version.hadoop>
<version.awssdk>2.17.120</version.awssdk>