Skip to content

Commit

Permalink
Minor add code comments (#95)
Browse files Browse the repository at this point in the history
  • Loading branch information
ismailsimsek committed Aug 14, 2022
1 parent b1f0cae commit acdea18
Showing 1 changed file with 27 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,32 +29,56 @@ public class IcebergTableWriterFactory {
@ConfigProperty(name = "debezium.sink.iceberg.upsert-keep-deletes", defaultValue = "true")
boolean upsertKeepDeletes;

private static int partitionId() {
return Integer.parseInt(dtFormater.format(Instant.now()));
}

public BaseTaskWriter<Record> create(Table icebergTable) {

// file format of the table parquet, orc ...
FileFormat format = IcebergUtil.getTableFileFormat(icebergTable);
// appender factory
GenericAppenderFactory appenderFactory = IcebergUtil.getTableAppender(icebergTable);
int partitionId = Integer.parseInt(dtFormater.format(Instant.now()));
OutputFileFactory fileFactory = OutputFileFactory.builderFor(icebergTable, partitionId, 1L)
OutputFileFactory fileFactory = OutputFileFactory.builderFor(icebergTable, partitionId(), 1L)
.defaultSpec(icebergTable.spec()).format(format).build();
// equality Field Ids
List<Integer> equalityFieldIds = new ArrayList<>(icebergTable.schema().identifierFieldIds());

BaseTaskWriter<Record> writer;

// 1. TABLE DONT HAVE identifierFieldIds
// 2. OR RUNNING APPEND MODE
// THEN USE APPEND WRITERS
if (icebergTable.schema().identifierFieldIds().isEmpty() || !upsert) {

if (upsert) {
// running in upsert mode but table dont have identifierFieldIds to do delete!
LOGGER.info("Table don't have Pk defined upsert is not possible falling back to append!");
}

if (icebergTable.spec().isUnpartitioned()) {
// table is un partitioned use un partitioned append writer
writer = new UnpartitionedWriter<>(
icebergTable.spec(), format, appenderFactory, fileFactory, icebergTable.io(), Long.MAX_VALUE);

} else {
// table is partitioned use partitioned append writer
writer = new PartitionedAppendWriter(
icebergTable.spec(), format, appenderFactory, fileFactory, icebergTable.io(), Long.MAX_VALUE, icebergTable.schema());
}
} else if (icebergTable.spec().isUnpartitioned()) {

}

// ITS UPSERT MODE
// AND TABLE HAS identifierFieldIds
// USE DELTA WRITERS
else if (icebergTable.spec().isUnpartitioned()) {
// running with upsert mode + un partitioned table
writer = new UnpartitionedDeltaWriter(icebergTable.spec(), format, appenderFactory, fileFactory,
icebergTable.io(),
Long.MAX_VALUE, icebergTable.schema(), equalityFieldIds, true, upsertKeepDeletes);
} else {
// running with upsert mode + partitioned table
writer = new PartitionedDeltaWriter(icebergTable.spec(), format, appenderFactory, fileFactory,
icebergTable.io(),
Long.MAX_VALUE, icebergTable.schema(), equalityFieldIds, true, upsertKeepDeletes);
Expand Down

0 comments on commit acdea18

Please sign in to comment.