Skip to content

Commit ff677ae

Browse files
committed
Add iceberg partition transforms. Co-authored-by: Chengcheng Jin <[email protected]>
1 parent 39c7a70 commit ff677ae

36 files changed

+4087
-253
lines changed

velox/connectors/hive/HiveDataSink.cpp

Lines changed: 44 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -374,7 +374,18 @@ HiveDataSink::HiveDataSink(
374374
*insertTableHandle->bucketProperty(),
375375
inputType)
376376
: nullptr,
377-
getNonPartitionChannels(insertTableHandle)) {}
377+
getPartitionChannels(insertTableHandle),
378+
getNonPartitionChannels(insertTableHandle),
379+
!getPartitionChannels(insertTableHandle).empty()
380+
? std::make_unique<PartitionIdGenerator>(
381+
inputType,
382+
getPartitionChannels(insertTableHandle),
383+
hiveConfig->maxPartitionsPerWriters(
384+
connectorQueryCtx->sessionProperties()),
385+
connectorQueryCtx->memoryPool(),
386+
hiveConfig->isPartitionPathAsLowerCase(
387+
connectorQueryCtx->sessionProperties()))
388+
: nullptr) {}
378389

379390
HiveDataSink::HiveDataSink(
380391
RowTypePtr inputType,
@@ -384,7 +395,9 @@ HiveDataSink::HiveDataSink(
384395
const std::shared_ptr<const HiveConfig>& hiveConfig,
385396
uint32_t bucketCount,
386397
std::unique_ptr<core::PartitionFunction> bucketFunction,
387-
const std::vector<column_index_t>& dataChannels)
398+
const std::vector<column_index_t>& partitionChannels,
399+
const std::vector<column_index_t>& dataChannels,
400+
std::unique_ptr<PartitionIdGenerator> partitionIdGenerator)
388401
: inputType_(std::move(inputType)),
389402
insertTableHandle_(std::move(insertTableHandle)),
390403
connectorQueryCtx_(connectorQueryCtx),
@@ -393,17 +406,8 @@ HiveDataSink::HiveDataSink(
393406
updateMode_(getUpdateMode()),
394407
maxOpenWriters_(hiveConfig_->maxPartitionsPerWriters(
395408
connectorQueryCtx->sessionProperties())),
396-
partitionChannels_(getPartitionChannels(insertTableHandle_)),
397-
partitionIdGenerator_(
398-
!partitionChannels_.empty()
399-
? std::make_unique<PartitionIdGenerator>(
400-
inputType_,
401-
partitionChannels_,
402-
maxOpenWriters_,
403-
connectorQueryCtx_->memoryPool(),
404-
hiveConfig_->isPartitionPathAsLowerCase(
405-
connectorQueryCtx->sessionProperties()))
406-
: nullptr),
409+
partitionChannels_(partitionChannels),
410+
partitionIdGenerator_(std::move(partitionIdGenerator)),
407411
dataChannels_(dataChannels),
408412
bucketCount_(static_cast<int32_t>(bucketCount)),
409413
bucketFunction_(std::move(bucketFunction)),
@@ -753,6 +757,32 @@ uint32_t HiveDataSink::appendWriter(const HiveWriterId& id) {
753757
ioStats_.emplace_back(std::make_shared<io::IoStatistics>());
754758
setMemoryReclaimers(writerInfo_.back().get(), ioStats_.back().get());
755759

760+
auto options = createWriterOptions();
761+
// Prevents the memory allocation during the writer creation.
762+
WRITER_NON_RECLAIMABLE_SECTION_GUARD(writerInfo_.size() - 1);
763+
auto writer = writerFactory_->createWriter(
764+
dwio::common::FileSink::create(
765+
writePath,
766+
{
767+
.bufferWrite = false,
768+
.connectorProperties = hiveConfig_->config(),
769+
.fileCreateConfig = hiveConfig_->writeFileCreateConfig(),
770+
.pool = writerInfo_.back()->sinkPool.get(),
771+
.metricLogger = dwio::common::MetricsLog::voidLog(),
772+
.stats = ioStats_.back().get(),
773+
}),
774+
options);
775+
writer = maybeCreateBucketSortWriter(std::move(writer));
776+
writers_.emplace_back(std::move(writer));
777+
778+
extendBuffersForPartitionedTables();
779+
780+
writerIndexMap_.emplace(id, writers_.size() - 1);
781+
return writerIndexMap_[id];
782+
}
783+
784+
std::shared_ptr<dwio::common::WriterOptions> HiveDataSink::createWriterOptions()
785+
const {
756786
// Take the writer options provided by the user as a starting point, or
757787
// allocate a new one.
758788
auto options = insertTableHandle_->writerOptions();
@@ -802,28 +832,7 @@ uint32_t HiveDataSink::appendWriter(const HiveWriterId& id) {
802832
options->adjustTimestampToTimezone =
803833
connectorQueryCtx_->adjustTimestampToTimezone();
804834
options->processConfigs(*hiveConfig_->config(), *connectorSessionProperties);
805-
806-
// Prevents the memory allocation during the writer creation.
807-
WRITER_NON_RECLAIMABLE_SECTION_GUARD(writerInfo_.size() - 1);
808-
auto writer = writerFactory_->createWriter(
809-
dwio::common::FileSink::create(
810-
writePath,
811-
{
812-
.bufferWrite = false,
813-
.connectorProperties = hiveConfig_->config(),
814-
.fileCreateConfig = hiveConfig_->writeFileCreateConfig(),
815-
.pool = writerInfo_.back()->sinkPool.get(),
816-
.metricLogger = dwio::common::MetricsLog::voidLog(),
817-
.stats = ioStats_.back().get(),
818-
}),
819-
options);
820-
writer = maybeCreateBucketSortWriter(std::move(writer));
821-
writers_.emplace_back(std::move(writer));
822-
823-
extendBuffersForPartitionedTables();
824-
825-
writerIndexMap_.emplace(id, writers_.size() - 1);
826-
return writerIndexMap_[id];
835+
return options;
827836
}
828837

829838
std::optional<std::string> HiveDataSink::getPartitionName(

velox/connectors/hive/HiveDataSink.h

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -529,7 +529,9 @@ class HiveDataSink : public DataSink {
529529
const std::shared_ptr<const HiveConfig>& hiveConfig,
530530
uint32_t bucketCount,
531531
std::unique_ptr<core::PartitionFunction> bucketFunction,
532-
const std::vector<column_index_t>& dataChannels);
532+
const std::vector<column_index_t>& partitionChannels,
533+
const std::vector<column_index_t>& dataChannels,
534+
std::unique_ptr<PartitionIdGenerator> partitionIdGenerator);
533535

534536
void appendData(RowVectorPtr input) override;
535537

@@ -633,6 +635,9 @@ class HiveDataSink : public DataSink {
633635
// the newly created writer in 'writers_'.
634636
uint32_t appendWriter(const HiveWriterId& id);
635637

638+
virtual std::shared_ptr<dwio::common::WriterOptions> createWriterOptions()
639+
const;
640+
636641
virtual std::optional<std::string> getPartitionName(
637642
const HiveWriterId& id) const;
638643

velox/connectors/hive/HivePartitionUtil.cpp

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -78,11 +78,10 @@ std::pair<std::string, std::string> makePartitionKeyValueString(
7878
const BaseVector* partitionVector,
7979
vector_size_t row,
8080
const std::string& name,
81-
bool isDate,
82-
const std::string& nullValueName) {
81+
bool isDate) {
8382
using T = typename TypeTraits<Kind>::NativeType;
8483
if (partitionVector->as<SimpleVector<T>>()->isNullAt(row)) {
85-
return std::make_pair(name, nullValueName);
84+
return std::make_pair(name, "");
8685
}
8786
if (isDate) {
8887
return std::make_pair(
@@ -100,8 +99,7 @@ std::pair<std::string, std::string> makePartitionKeyValueString(
10099

101100
std::vector<std::pair<std::string, std::string>> extractPartitionKeyValues(
102101
const RowVectorPtr& partitionsVector,
103-
vector_size_t row,
104-
const std::string& nullValueName) {
102+
vector_size_t row) {
105103
std::vector<std::pair<std::string, std::string>> partitionKeyValues;
106104
for (auto i = 0; i < partitionsVector->childrenSize(); i++) {
107105
partitionKeyValues.push_back(PARTITION_TYPE_DISPATCH(
@@ -110,8 +108,7 @@ std::vector<std::pair<std::string, std::string>> extractPartitionKeyValues(
110108
partitionsVector->childAt(i)->loadedVector(),
111109
row,
112110
asRowType(partitionsVector->type())->nameOf(i),
113-
partitionsVector->childAt(i)->type()->isDate(),
114-
nullValueName));
111+
partitionsVector->childAt(i)->type()->isDate()));
115112
}
116113
return partitionKeyValues;
117114
}

velox/connectors/hive/HivePartitionUtil.h

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@ namespace facebook::velox::connector::hive {
2121

2222
std::vector<std::pair<std::string, std::string>> extractPartitionKeyValues(
2323
const RowVectorPtr& partitionsVector,
24-
vector_size_t row,
25-
const std::string& nullValueName = "");
24+
vector_size_t row);
2625

2726
} // namespace facebook::velox::connector::hive

velox/connectors/hive/PartitionIdGenerator.cpp

Lines changed: 19 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -29,10 +29,10 @@ PartitionIdGenerator::PartitionIdGenerator(
2929
uint32_t maxPartitions,
3030
memory::MemoryPool* pool,
3131
bool partitionPathAsLowerCase)
32-
: pool_(pool),
33-
partitionChannels_(std::move(partitionChannels)),
32+
: partitionChannels_(std::move(partitionChannels)),
3433
maxPartitions_(maxPartitions),
35-
partitionPathAsLowerCase_(partitionPathAsLowerCase) {
34+
partitionPathAsLowerCase_(partitionPathAsLowerCase),
35+
pool_(pool) {
3636
VELOX_USER_CHECK(
3737
!partitionChannels_.empty(), "There must be at least one partition key.");
3838
for (auto channel : partitionChannels_) {
@@ -61,6 +61,19 @@ PartitionIdGenerator::PartitionIdGenerator(
6161
}
6262
}
6363

64+
PartitionIdGenerator::PartitionIdGenerator(
65+
std::vector<column_index_t> partitionChannels,
66+
uint32_t maxPartitions,
67+
memory::MemoryPool* pool,
68+
bool partitionPathAsLowerCase)
69+
: partitionChannels_(std::move(partitionChannels)),
70+
maxPartitions_(maxPartitions),
71+
partitionPathAsLowerCase_(partitionPathAsLowerCase),
72+
pool_(pool) {
73+
VELOX_USER_CHECK(
74+
!partitionChannels_.empty(), "There must be at least one partition key.");
75+
}
76+
6477
void PartitionIdGenerator::run(
6578
const RowVectorPtr& input,
6679
raw_vector<uint64_t>& result) {
@@ -97,11 +110,9 @@ void PartitionIdGenerator::run(
97110
}
98111
}
99112

100-
std::string PartitionIdGenerator::partitionName(
101-
uint64_t partitionId,
102-
const std::string& nullValueName) const {
113+
std::string PartitionIdGenerator::partitionName(uint64_t partitionId) const {
103114
return FileUtils::makePartName(
104-
extractPartitionKeyValues(partitionValues_, partitionId, nullValueName),
115+
extractPartitionKeyValues(partitionValues_, partitionId),
105116
partitionPathAsLowerCase_);
106117
}
107118

@@ -172,7 +183,7 @@ void PartitionIdGenerator::updateValueToPartitionIdMapping() {
172183
}
173184

174185
void PartitionIdGenerator::savePartitionValues(
175-
uint64_t partitionId,
186+
uint32_t partitionId,
176187
const RowVectorPtr& input,
177188
vector_size_t row) {
178189
for (auto i = 0; i < partitionChannels_.size(); ++i) {

velox/connectors/hive/PartitionIdGenerator.h

Lines changed: 30 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -38,10 +38,12 @@ class PartitionIdGenerator {
3838
memory::MemoryPool* pool,
3939
bool partitionPathAsLowerCase);
4040

41+
virtual ~PartitionIdGenerator() = default;
42+
4143
/// Generate sequential partition IDs for input vector.
4244
/// @param input Input RowVector.
4345
/// @param result Generated integer IDs indexed by input row number.
44-
void run(const RowVectorPtr& input, raw_vector<uint64_t>& result);
46+
virtual void run(const RowVectorPtr& input, raw_vector<uint64_t>& result);
4547

4648
/// Return the total number of distinct partitions processed so far.
4749
uint64_t numPartitions() const {
@@ -52,18 +54,38 @@ class PartitionIdGenerator {
5254
/// style. It is derived from the partitionValues_ at index partitionId.
5355
/// Partition keys appear in the order of partition columns in the table
5456
/// schema.
55-
std::string partitionName(
56-
uint64_t partitionId,
57-
const std::string& nullValueName = "") const;
57+
virtual std::string partitionName(uint64_t partitionId) const;
5858

59-
private:
60-
static constexpr const int32_t kHasherReservePct = 20;
59+
protected:
60+
PartitionIdGenerator(
61+
std::vector<column_index_t> partitionChannels,
62+
uint32_t maxPartitions,
63+
memory::MemoryPool* pool,
64+
bool partitionPathAsLowerCase);
6165

6266
// Computes value IDs using VectorHashers for all rows in 'input'.
6367
void computeValueIds(
6468
const RowVectorPtr& input,
6569
raw_vector<uint64_t>& valueIds);
6670

71+
const std::vector<column_index_t> partitionChannels_;
72+
73+
std::vector<std::unique_ptr<exec::VectorHasher>> hashers_;
74+
75+
// A vector holding unique partition key values. One row per partition. Row
76+
// numbers match partition IDs.
77+
RowVectorPtr partitionValues_;
78+
79+
const uint32_t maxPartitions_;
80+
81+
// A mapping from value ID produced by VectorHashers to a partition ID.
82+
std::unordered_map<uint64_t, uint64_t> partitionIds_;
83+
84+
const bool partitionPathAsLowerCase_;
85+
86+
private:
87+
static constexpr const int32_t kHasherReservePct = 20;
88+
6789
// In case of rehash (when value IDs produced by VectorHashers change), we
6890
// update value id for pre-existing partitions while keeping partition ids.
6991
// This method rebuilds 'partitionIds_' by re-calculating the value ids using
@@ -72,29 +94,15 @@ class PartitionIdGenerator {
7294

7395
// Copies partition values of 'row' from 'input' into 'partitionId' row in
7496
// 'partitionValues_'.
75-
void savePartitionValues(
76-
uint64_t partitionId,
97+
virtual void savePartitionValues(
98+
uint32_t partitionId,
7799
const RowVectorPtr& input,
78100
vector_size_t row);
79101

80102
memory::MemoryPool* const pool_;
81103

82-
const std::vector<column_index_t> partitionChannels_;
83-
84-
const uint32_t maxPartitions_;
85-
86-
const bool partitionPathAsLowerCase_;
87-
88-
std::vector<std::unique_ptr<exec::VectorHasher>> hashers_;
89104
bool hasMultiplierSet_ = false;
90105

91-
// A mapping from value ID produced by VectorHashers to a partition ID.
92-
std::unordered_map<uint64_t, uint64_t> partitionIds_;
93-
94-
// A vector holding unique partition key values. One row per partition. Row
95-
// numbers match partition IDs.
96-
RowVectorPtr partitionValues_;
97-
98106
// All rows are set valid to compute partition IDs for all input rows.
99107
SelectivityVector allRows_;
100108
};

velox/connectors/hive/SplitReader.cpp

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,18 @@ VectorPtr newConstantFromString(
5454
pool, size, false, type, std::move(days));
5555
}
5656

57+
if constexpr (std::is_same_v<T, int64_t> || std::is_same_v<T, int128_t>) {
58+
if (type->isDecimal()) {
59+
auto [precision, scale] = getDecimalPrecisionScale(*type);
60+
T result;
61+
const auto status = DecimalUtil::castFromString<T>(
62+
StringView(value.value()), precision, scale, result);
63+
VELOX_USER_CHECK(status.ok(), status.message());
64+
return std::make_shared<ConstantVector<T>>(
65+
pool, size, false, type, std::move(result));
66+
}
67+
}
68+
5769
if constexpr (std::is_same_v<T, StringView>) {
5870
return std::make_shared<ConstantVector<StringView>>(
5971
pool, size, false, type, StringView(value.value()));

velox/connectors/hive/SplitReader.h

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -160,18 +160,18 @@ class SplitReader {
160160
VectorPtr& output,
161161
const std::vector<BaseVector::CopyRange>& ranges);
162162

163+
void setPartitionValue(
164+
common::ScanSpec* spec,
165+
const std::string& partitionKey,
166+
const std::optional<std::string>& value) const;
167+
163168
private:
164169
/// Different table formats may have different meatadata columns.
165170
/// This function will be used to update the scanSpec for these columns.
166-
std::vector<TypePtr> adaptColumns(
171+
virtual std::vector<TypePtr> adaptColumns(
167172
const RowTypePtr& fileType,
168173
const std::shared_ptr<const velox::RowType>& tableSchema) const;
169174

170-
void setPartitionValue(
171-
common::ScanSpec* spec,
172-
const std::string& partitionKey,
173-
const std::optional<std::string>& value) const;
174-
175175
protected:
176176
std::shared_ptr<const HiveConnectorSplit> hiveSplit_;
177177
const std::shared_ptr<const HiveTableHandle> hiveTableHandle_;

velox/connectors/hive/iceberg/CMakeLists.txt

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,13 +14,23 @@
1414

1515
velox_add_library(
1616
velox_hive_iceberg_connector
17-
IcebergSplitReader.cpp
17+
IcebergDataSink.cpp
18+
IcebergPartitionIdGenerator.cpp
1819
IcebergSplit.cpp
20+
IcebergSplitReader.cpp
21+
Murmur3.cpp
22+
PartitionSpec.cpp
1923
PositionalDeleteFileReader.cpp
20-
IcebergDataSink.cpp)
24+
TransformFactory.cpp
25+
Transforms.cpp)
2126

2227
velox_link_libraries(velox_hive_iceberg_connector velox_connector Folly::folly)
2328

29+
if(VELOX_ENABLE_PARQUET)
30+
velox_link_libraries(velox_hive_iceberg_connector
31+
velox_dwio_arrow_parquet_writer)
32+
endif()
33+
2434
if(${VELOX_BUILD_TESTING})
2535
add_subdirectory(tests)
2636
endif()

0 commit comments

Comments
 (0)