Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ public final class DeleteFile
private final List<Integer> equalityFieldIds;
private final Map<Integer, byte[]> lowerBounds;
private final Map<Integer, byte[]> upperBounds;
private final long dataSequenceNumber;

public static DeleteFile fromIceberg(org.apache.iceberg.DeleteFile deleteFile)
{
Expand All @@ -50,6 +51,8 @@ public static DeleteFile fromIceberg(org.apache.iceberg.DeleteFile deleteFile)
Map<Integer, byte[]> upperBounds = firstNonNull(deleteFile.upperBounds(), ImmutableMap.<Integer, ByteBuffer>of())
.entrySet().stream().collect(toImmutableMap(Map.Entry::getKey, entry -> entry.getValue().array().clone()));

long dataSequenceNumber = deleteFile.dataSequenceNumber() != null ? deleteFile.dataSequenceNumber() : 0L;

return new DeleteFile(
fromIcebergFileContent(deleteFile.content()),
deleteFile.path().toString(),
Expand All @@ -58,7 +61,8 @@ public static DeleteFile fromIceberg(org.apache.iceberg.DeleteFile deleteFile)
deleteFile.fileSizeInBytes(),
Optional.ofNullable(deleteFile.equalityFieldIds()).orElseGet(ImmutableList::of),
lowerBounds,
upperBounds);
upperBounds,
dataSequenceNumber);
}

@JsonCreator
Expand All @@ -70,7 +74,8 @@ public DeleteFile(
@JsonProperty("fileSizeInBytes") long fileSizeInBytes,
@JsonProperty("equalityFieldIds") List<Integer> equalityFieldIds,
@JsonProperty("lowerBounds") Map<Integer, byte[]> lowerBounds,
@JsonProperty("upperBounds") Map<Integer, byte[]> upperBounds)
@JsonProperty("upperBounds") Map<Integer, byte[]> upperBounds,
@JsonProperty("dataSequenceNumber") long dataSequenceNumber)
{
this.content = requireNonNull(content, "content is null");
this.path = requireNonNull(path, "path is null");
Expand All @@ -80,6 +85,7 @@ public DeleteFile(
this.equalityFieldIds = ImmutableList.copyOf(requireNonNull(equalityFieldIds, "equalityFieldIds is null"));
this.lowerBounds = ImmutableMap.copyOf(requireNonNull(lowerBounds, "lowerBounds is null"));
this.upperBounds = ImmutableMap.copyOf(requireNonNull(upperBounds, "upperBounds is null"));
this.dataSequenceNumber = dataSequenceNumber;
}

@JsonProperty
Expand Down Expand Up @@ -130,12 +136,19 @@ public Map<Integer, byte[]> getUpperBounds()
return upperBounds;
}

@JsonProperty
public long getDataSequenceNumber()
{
return dataSequenceNumber;
}

@Override
public String toString()
{
return toStringHelper(this)
.addValue(path)
.add("records", recordCount)
.add("dataSequenceNumber", dataSequenceNumber)
.toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ velox::connector::hive::iceberg::FileContent toVeloxFileContent(
return velox::connector::hive::iceberg::FileContent::kData;
} else if (content == protocol::iceberg::FileContent::POSITION_DELETES) {
return velox::connector::hive::iceberg::FileContent::kPositionalDeletes;
} else if (content == protocol::iceberg::FileContent::EQUALITY_DELETES) {
return velox::connector::hive::iceberg::FileContent::kEqualityDeletes;
}
VELOX_UNSUPPORTED("Unsupported file content: {}", fmt::underlying(content));
}
Expand Down Expand Up @@ -176,6 +178,9 @@ IcebergPrestoToVeloxConnector::toVeloxSplit(
VELOX_CHECK_NOT_NULL(
icebergSplit, "Unexpected split type {}", connectorSplit->_type);

const int64_t dataSequenceNumber =
icebergSplit->dataSequenceNumber; // NOLINT(facebook-bugprone-unchecked-pointer-access)

std::unordered_map<std::string, std::optional<std::string>> partitionKeys;
for (const auto& entry : icebergSplit->partitionKeys) {
partitionKeys.emplace(
Expand Down Expand Up @@ -205,14 +210,16 @@ IcebergPrestoToVeloxConnector::toVeloxSplit(
deleteFile.fileSizeInBytes,
std::vector(deleteFile.equalityFieldIds),
lowerBounds,
upperBounds);
upperBounds,
deleteFile.dataSequenceNumber);

deletes.emplace_back(icebergDeleteFile);
}


std::unordered_map<std::string, std::string> infoColumns = {
{"$data_sequence_number",
std::to_string(icebergSplit->dataSequenceNumber)},
std::to_string(dataSequenceNumber)},
{"$path", icebergSplit->path}};

return std::make_unique<velox::connector::hive::iceberg::HiveIcebergSplit>(
Expand All @@ -227,7 +234,9 @@ IcebergPrestoToVeloxConnector::toVeloxSplit(
nullptr,
splitContext->cacheable,
deletes,
infoColumns);
infoColumns,
std::nullopt,
dataSequenceNumber);
}

std::unique_ptr<velox::connector::ColumnHandle>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -371,6 +371,13 @@ void to_json(json& j, const DeleteFile& p) {
"DeleteFile",
"Map<Integer, String>",
"upperBounds");
to_json_key(
j,
"dataSequenceNumber",
p.dataSequenceNumber,
"DeleteFile",
"int64_t",
"dataSequenceNumber");
}

void from_json(const json& j, DeleteFile& p) {
Expand Down Expand Up @@ -408,6 +415,13 @@ void from_json(const json& j, DeleteFile& p) {
"DeleteFile",
"Map<Integer, String>",
"upperBounds");
from_json_key(
j,
"dataSequenceNumber",
p.dataSequenceNumber,
"DeleteFile",
"int64_t",
"dataSequenceNumber");
}
} // namespace facebook::presto::protocol::iceberg
namespace facebook::presto::protocol::iceberg {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,11 @@ void to_json(json& j, const ChangelogSplitInfo& p);
void from_json(const json& j, ChangelogSplitInfo& p);
} // namespace facebook::presto::protocol::iceberg
namespace facebook::presto::protocol::iceberg {
enum class FileContent { DATA, POSITION_DELETES, EQUALITY_DELETES };
enum class FileContent {
DATA,
POSITION_DELETES,
EQUALITY_DELETES,
};
extern void to_json(json& j, const FileContent& e);
extern void from_json(const json& j, FileContent& e);
} // namespace facebook::presto::protocol::iceberg
Expand All @@ -97,6 +101,7 @@ struct DeleteFile {
List<Integer> equalityFieldIds = {};
Map<Integer, String> lowerBounds = {};
Map<Integer, String> upperBounds = {};
int64_t dataSequenceNumber = {};
};
void to_json(json& j, const DeleteFile& p);
void from_json(const json& j, DeleteFile& p);
Expand Down
Loading