Skip to content

Commit

Permalink
Minor improve naming
Browse files Browse the repository at this point in the history
  • Loading branch information
ismailsimsek committed Oct 27, 2021
1 parent 26c482d commit 3f58964
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.iceberg.data.Record;
import org.apache.iceberg.io.DataWriter;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.relocated.com.google.common.primitives.Ints;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.slf4j.Logger;
Expand Down Expand Up @@ -87,16 +88,28 @@ protected ArrayList<Record> toIcebergRecords(Schema schema, ArrayList<ChangeEven
return icebergRecords;
}

protected DataFile getDataFile(Table icebergTable, ArrayList<Record> icebergRecords) {

FileFormat getFileFormat(Table icebergTable){
String formatAsString = icebergTable.properties().getOrDefault(DEFAULT_FILE_FORMAT, DEFAULT_FILE_FORMAT_DEFAULT);
FileFormat fileFormat = FileFormat.valueOf(formatAsString.toUpperCase(Locale.ROOT));
return FileFormat.valueOf(formatAsString.toUpperCase(Locale.ROOT));
}

GenericAppenderFactory getAppender(Table icebergTable) {
return new GenericAppenderFactory(
icebergTable.schema(),
icebergTable.spec(),
Ints.toArray(icebergTable.schema().identifierFieldIds()),
icebergTable.schema(),
null);
}

protected DataFile getDataFile(Table icebergTable, ArrayList<Record> icebergRecords) {

FileFormat fileFormat = getFileFormat(icebergTable);
GenericAppenderFactory appender = getAppender(icebergTable);
final String fileName = UUID.randomUUID() + "-" + Instant.now().toEpochMilli() + "." + fileFormat.name();
OutputFile out = icebergTable.io().newOutputFile(icebergTable.locationProvider().newDataLocation(fileName));

GenericAppenderFactory apender = new GenericAppenderFactory(icebergTable.schema(), icebergTable.spec());
DataWriter<Record> dw = apender.newDataWriter(icebergTable.encryption().encrypt(out), fileFormat, null);
DataWriter<Record> dw = appender.newDataWriter(icebergTable.encryption().encrypt(out), fileFormat, null);

icebergRecords.stream().filter(this.filterEvents()).forEach(dw::add);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,10 @@

import java.io.IOException;
import java.time.Instant;
import java.util.*;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Predicate;
import java.util.stream.Collectors;
Expand All @@ -29,11 +32,9 @@
import org.apache.iceberg.deletes.EqualityDeleteWriter;
import org.apache.iceberg.encryption.EncryptedOutputFile;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.relocated.com.google.common.primitives.Ints;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.apache.iceberg.TableProperties.*;

@Dependent
@Named("IcebergTableOperatorUpsert")
Expand Down Expand Up @@ -61,20 +62,13 @@ public void initialize() {

private Optional<DeleteFile> getDeleteFile(Table icebergTable, ArrayList<Record> icebergRecords) {

String formatAsString = icebergTable.properties().getOrDefault(DEFAULT_FILE_FORMAT, DEFAULT_FILE_FORMAT_DEFAULT);
FileFormat fileFormat = FileFormat.valueOf(formatAsString.toUpperCase(Locale.ROOT));

FileFormat fileFormat = getFileFormat(icebergTable);
GenericAppenderFactory appender = getAppender(icebergTable);
final String fileName = "del-" + UUID.randomUUID() + "-" + Instant.now().toEpochMilli() + "." + fileFormat.name();
OutputFile out = icebergTable.io().newOutputFile(icebergTable.locationProvider().newDataLocation(fileName));
EncryptedOutputFile eout = icebergTable.encryption().encrypt(out);

GenericAppenderFactory apender = new GenericAppenderFactory(
icebergTable.schema(),
icebergTable.spec(),
Ints.toArray(icebergTable.schema().identifierFieldIds()),
icebergTable.schema(),
null);
EqualityDeleteWriter<Record> edw = apender.newEqDeleteWriter(eout, fileFormat, null);
EqualityDeleteWriter<Record> edw = appender.newEqDeleteWriter(eout, fileFormat, null);

// anything is not an insert.
// upsertKeepDeletes = false, which means delete deletes
Expand Down

0 comments on commit 3f58964

Please sign in to comment.