Skip to content

Commit 102596e

Browse files
committed
Support iceberg partition transform
1 parent e078023 commit 102596e

25 files changed

+3288
-142
lines changed

velox/connectors/hive/HiveDataSink.cpp

Lines changed: 95 additions & 75 deletions
Original file line numberDiff line numberDiff line change
@@ -94,30 +94,18 @@ std::vector<column_index_t> getPartitionChannels(
9494

9595
// Returns the column indices of non-partition data columns.
9696
std::vector<column_index_t> getNonPartitionChannels(
97-
const std::vector<column_index_t>& partitionChannels,
98-
const column_index_t childrenSize) {
97+
const std::shared_ptr<const HiveInsertTableHandle>& insertTableHandle) {
9998
std::vector<column_index_t> dataChannels;
100-
dataChannels.reserve(childrenSize - partitionChannels.size());
10199

102-
for (column_index_t i = 0; i < childrenSize; i++) {
103-
if (std::find(partitionChannels.cbegin(), partitionChannels.cend(), i) ==
104-
partitionChannels.cend()) {
100+
for (auto i = 0; i < insertTableHandle->inputColumns().size(); i++) {
101+
if (!insertTableHandle->inputColumns()[i]->isPartitionKey()) {
105102
dataChannels.push_back(i);
106103
}
107104
}
108105

109106
return dataChannels;
110107
}
111108

112-
std::string makePartitionDirectory(
113-
const std::string& tableDirectory,
114-
const std::optional<std::string>& partitionSubdirectory) {
115-
if (partitionSubdirectory.has_value()) {
116-
return fs::path(tableDirectory) / partitionSubdirectory.value();
117-
}
118-
return tableDirectory;
119-
}
120-
121109
std::string makeUuid() {
122110
return boost::lexical_cast<std::string>(boost::uuids::random_generator()());
123111
}
@@ -382,6 +370,18 @@ HiveDataSink::HiveDataSink(
382370
? createBucketFunction(
383371
*insertTableHandle->bucketProperty(),
384372
inputType)
373+
: nullptr,
374+
getPartitionChannels(insertTableHandle),
375+
getNonPartitionChannels(insertTableHandle),
376+
!getPartitionChannels(insertTableHandle).empty()
377+
? std::make_unique<PartitionIdGenerator>(
378+
inputType,
379+
getPartitionChannels(insertTableHandle),
380+
hiveConfig->maxPartitionsPerWriters(
381+
connectorQueryCtx->sessionProperties()),
382+
connectorQueryCtx->memoryPool(),
383+
hiveConfig->isPartitionPathAsLowerCase(
384+
connectorQueryCtx->sessionProperties()))
385385
: nullptr) {}
386386

387387
HiveDataSink::HiveDataSink(
@@ -391,7 +391,10 @@ HiveDataSink::HiveDataSink(
391391
CommitStrategy commitStrategy,
392392
const std::shared_ptr<const HiveConfig>& hiveConfig,
393393
uint32_t bucketCount,
394-
std::unique_ptr<core::PartitionFunction> bucketFunction)
394+
std::unique_ptr<core::PartitionFunction> bucketFunction,
395+
const std::vector<column_index_t>& partitionChannels,
396+
const std::vector<column_index_t>& dataChannels,
397+
std::unique_ptr<PartitionIdGenerator> partitionIdGenerator)
395398
: inputType_(std::move(inputType)),
396399
insertTableHandle_(std::move(insertTableHandle)),
397400
connectorQueryCtx_(connectorQueryCtx),
@@ -400,19 +403,9 @@ HiveDataSink::HiveDataSink(
400403
updateMode_(getUpdateMode()),
401404
maxOpenWriters_(hiveConfig_->maxPartitionsPerWriters(
402405
connectorQueryCtx->sessionProperties())),
403-
partitionChannels_(getPartitionChannels(insertTableHandle_)),
404-
partitionIdGenerator_(
405-
!partitionChannels_.empty()
406-
? std::make_unique<PartitionIdGenerator>(
407-
inputType_,
408-
partitionChannels_,
409-
maxOpenWriters_,
410-
connectorQueryCtx_->memoryPool(),
411-
hiveConfig_->isPartitionPathAsLowerCase(
412-
connectorQueryCtx->sessionProperties()))
413-
: nullptr),
414-
dataChannels_(
415-
getNonPartitionChannels(partitionChannels_, inputType_->size())),
406+
partitionChannels_(partitionChannels),
407+
partitionIdGenerator_(std::move(partitionIdGenerator)),
408+
dataChannels_(dataChannels),
416409
bucketCount_(static_cast<int32_t>(bucketCount)),
417410
bucketFunction_(std::move(bucketFunction)),
418411
writerFactory_(
@@ -488,6 +481,8 @@ void HiveDataSink::appendData(RowVectorPtr input) {
488481
// Compute partition and bucket numbers.
489482
computePartitionAndBucketIds(input);
490483

484+
splitInputRowsAndEnsureWriters(input);
485+
491486
// All inputs belong to a single non-bucketed partition. The partition id
492487
// must be zero.
493488
if (!isBucketed() && partitionIdGenerator_->numPartitions() == 1) {
@@ -496,8 +491,6 @@ void HiveDataSink::appendData(RowVectorPtr input) {
496491
return;
497492
}
498493

499-
splitInputRowsAndEnsureWriters();
500-
501494
for (auto index = 0; index < writers_.size(); ++index) {
502495
const vector_size_t partitionSize = partitionSizes_[index];
503496
if (partitionSize == 0) {
@@ -782,6 +775,39 @@ uint32_t HiveDataSink::appendWriter(const HiveWriterId& id) {
782775

783776
setMemoryReclaimers(writerInfo_.back().get(), ioStats_.back().get());
784777

778+
auto options = createWriterOptions();
779+
780+
// Prevents the memory allocation during the writer creation.
781+
WRITER_NON_RECLAIMABLE_SECTION_GUARD(writerInfo_.size() - 1);
782+
auto writer = writerFactory_->createWriter(
783+
dwio::common::FileSink::create(
784+
writePath,
785+
{
786+
.bufferWrite = false,
787+
.connectorProperties = hiveConfig_->config(),
788+
.fileCreateConfig = hiveConfig_->writeFileCreateConfig(),
789+
.pool = writerInfo_.back()->sinkPool.get(),
790+
.metricLogger = dwio::common::MetricsLog::voidLog(),
791+
.stats = ioStats_.back().get(),
792+
.fileSystemStats = fileSystemStats_.get(),
793+
}),
794+
options);
795+
writer = maybeCreateBucketSortWriter(std::move(writer));
796+
writers_.emplace_back(std::move(writer));
797+
addThreadLocalRuntimeStat(
798+
fmt::format(
799+
"{}WriterCount",
800+
dwio::common::toString(insertTableHandle_->storageFormat())),
801+
RuntimeCounter(1));
802+
// Extends the buffer used for partition rows calculations.
803+
reservePartitionBuffers();
804+
805+
writerIndexMap_.emplace(id, writers_.size() - 1);
806+
return writerIndexMap_[id];
807+
}
808+
809+
std::shared_ptr<dwio::common::WriterOptions> HiveDataSink::createWriterOptions()
810+
const {
785811
// Take the writer options provided by the user as a starting point, or
786812
// allocate a new one.
787813
auto options = insertTableHandle_->writerOptions();
@@ -832,36 +858,7 @@ uint32_t HiveDataSink::appendWriter(const HiveWriterId& id) {
832858
options->adjustTimestampToTimezone =
833859
connectorQueryCtx_->adjustTimestampToTimezone();
834860
options->processConfigs(*hiveConfig_->config(), *connectorSessionProperties);
835-
836-
// Prevents the memory allocation during the writer creation.
837-
WRITER_NON_RECLAIMABLE_SECTION_GUARD(writerInfo_.size() - 1);
838-
auto writer = writerFactory_->createWriter(
839-
dwio::common::FileSink::create(
840-
writePath,
841-
{
842-
.bufferWrite = false,
843-
.connectorProperties = hiveConfig_->config(),
844-
.fileCreateConfig = hiveConfig_->writeFileCreateConfig(),
845-
.pool = writerInfo_.back()->sinkPool.get(),
846-
.metricLogger = dwio::common::MetricsLog::voidLog(),
847-
.stats = ioStats_.back().get(),
848-
.fileSystemStats = fileSystemStats_.get(),
849-
}),
850-
options);
851-
writer = maybeCreateBucketSortWriter(std::move(writer));
852-
writers_.emplace_back(std::move(writer));
853-
addThreadLocalRuntimeStat(
854-
fmt::format(
855-
"{}WriterCount",
856-
dwio::common::toString(insertTableHandle_->storageFormat())),
857-
RuntimeCounter(1));
858-
// Extends the buffer used for partition rows calculations.
859-
partitionSizes_.emplace_back(0);
860-
partitionRows_.emplace_back(nullptr);
861-
rawPartitionRows_.emplace_back(nullptr);
862-
863-
writerIndexMap_.emplace(id, writers_.size() - 1);
864-
return writerIndexMap_[id];
861+
return options;
865862
}
866863

867864
std::unique_ptr<facebook::velox::dwio::common::Writer>
@@ -891,6 +888,13 @@ HiveDataSink::maybeCreateBucketSortWriter(
891888
sortWriterFinishTimeSliceLimitMs_);
892889
}
893890

891+
void HiveDataSink::reservePartitionBuffers() {
892+
// Extends the buffer used for partition rows calculations.
893+
partitionSizes_.emplace_back(0);
894+
partitionRows_.emplace_back(nullptr);
895+
rawPartitionRows_.emplace_back(nullptr);
896+
}
897+
894898
HiveWriterId HiveDataSink::getWriterId(size_t row) const {
895899
std::optional<int32_t> partitionId;
896900
if (isPartitioned()) {
@@ -905,7 +909,26 @@ HiveWriterId HiveDataSink::getWriterId(size_t row) const {
905909
return HiveWriterId{partitionId, bucketId};
906910
}
907911

908-
void HiveDataSink::splitInputRowsAndEnsureWriters() {
912+
void HiveDataSink::updatePartitionRows(
913+
uint32_t index,
914+
vector_size_t numRows,
915+
vector_size_t row) {
916+
VELOX_DCHECK_LT(index, partitionSizes_.size());
917+
VELOX_DCHECK_EQ(partitionSizes_.size(), partitionRows_.size());
918+
VELOX_DCHECK_EQ(partitionRows_.size(), rawPartitionRows_.size());
919+
if (FOLLY_UNLIKELY(partitionRows_[index] == nullptr) ||
920+
(partitionRows_[index]->capacity() < numRows * sizeof(vector_size_t))) {
921+
partitionRows_[index] =
922+
allocateIndices(numRows, connectorQueryCtx_->memoryPool());
923+
rawPartitionRows_[index] =
924+
partitionRows_[index]->asMutable<vector_size_t>();
925+
}
926+
rawPartitionRows_[index][partitionSizes_[index]] = row;
927+
++partitionSizes_[index];
928+
}
929+
930+
void HiveDataSink::splitInputRowsAndEnsureWriters(
931+
const RowVectorPtr& /* input */) {
909932
VELOX_CHECK(isPartitioned() || isBucketed());
910933
if (isBucketed() && isPartitioned()) {
911934
VELOX_CHECK_EQ(bucketIds_.size(), partitionIds_.size());
@@ -918,19 +941,7 @@ void HiveDataSink::splitInputRowsAndEnsureWriters() {
918941
for (auto row = 0; row < numRows; ++row) {
919942
const auto id = getWriterId(row);
920943
const uint32_t index = ensureWriter(id);
921-
922-
VELOX_DCHECK_LT(index, partitionSizes_.size());
923-
VELOX_DCHECK_EQ(partitionSizes_.size(), partitionRows_.size());
924-
VELOX_DCHECK_EQ(partitionRows_.size(), rawPartitionRows_.size());
925-
if (FOLLY_UNLIKELY(partitionRows_[index] == nullptr) ||
926-
(partitionRows_[index]->capacity() < numRows * sizeof(vector_size_t))) {
927-
partitionRows_[index] =
928-
allocateIndices(numRows, connectorQueryCtx_->memoryPool());
929-
rawPartitionRows_[index] =
930-
partitionRows_[index]->asMutable<vector_size_t>();
931-
}
932-
rawPartitionRows_[index][partitionSizes_[index]] = row;
933-
++partitionSizes_[index];
944+
updatePartitionRows(index, numRows, row);
934945
}
935946

936947
for (uint32_t i = 0; i < partitionSizes_.size(); ++i) {
@@ -941,6 +952,15 @@ void HiveDataSink::splitInputRowsAndEnsureWriters() {
941952
}
942953
}
943954

955+
std::string HiveDataSink::makePartitionDirectory(
956+
const std::string& tableDirectory,
957+
const std::optional<std::string>& partitionSubdirectory) const {
958+
if (partitionSubdirectory.has_value()) {
959+
return fs::path(tableDirectory) / partitionSubdirectory.value();
960+
}
961+
return tableDirectory;
962+
}
963+
944964
HiveWriterParameters HiveDataSink::getWriterParameters(
945965
const std::optional<std::string>& partition,
946966
std::optional<uint32_t> bucketId) const {

velox/connectors/hive/HiveDataSink.h

Lines changed: 32 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -530,7 +530,10 @@ class HiveDataSink : public DataSink {
530530
CommitStrategy commitStrategy,
531531
const std::shared_ptr<const HiveConfig>& hiveConfig,
532532
uint32_t bucketCount,
533-
std::unique_ptr<core::PartitionFunction> bucketFunction);
533+
std::unique_ptr<core::PartitionFunction> bucketFunction,
534+
const std::vector<column_index_t>& partitionChannels,
535+
const std::vector<column_index_t>& dataChannels,
536+
std::unique_ptr<PartitionIdGenerator> partitionIdGenerator);
534537

535538
void appendData(RowVectorPtr input) override;
536539

@@ -631,7 +634,7 @@ class HiveDataSink : public DataSink {
631634
// to each corresponding (bucketed) partition based on the partition and
632635
// bucket ids calculated by 'computePartitionAndBucketIds'. The function also
633636
// ensures that there is a writer created for each (bucketed) partition.
634-
void splitInputRowsAndEnsureWriters();
637+
virtual void splitInputRowsAndEnsureWriters(const RowVectorPtr& input);
635638

636639
// Makes sure to create one writer for the given writer id. The function
637640
// returns the corresponding index in 'writers_'.
@@ -641,10 +644,37 @@ class HiveDataSink : public DataSink {
641644
// the newly created writer in 'writers_'.
642645
uint32_t appendWriter(const HiveWriterId& id);
643646

647+
// Creates and configures WriterOptions based on file format.
648+
// Sets up compression, schema, and other writer configuration based on the
649+
// insert table handle and connector settings.
650+
virtual std::shared_ptr<dwio::common::WriterOptions> createWriterOptions()
651+
const;
652+
644653
std::unique_ptr<facebook::velox::dwio::common::Writer>
645654
maybeCreateBucketSortWriter(
646655
std::unique_ptr<facebook::velox::dwio::common::Writer> writer);
647656

657+
// Constructs the full partition directory path by combining the table
658+
// directory with an optional partition subdirectory. If partitionSubdirectory
659+
// is provided, returns tableDirectory/partitionSubdirectory; otherwise,
660+
// returns tableDirectory unchanged (for non-partitioned tables).
661+
std::string makePartitionDirectory(
662+
const std::string& tableDirectory,
663+
const std::optional<std::string>& partitionSubdirectory) const;
664+
665+
// Records a row index for a specific partition. This method maintains the
666+
// mapping of which input rows belong to which partition by storing row
667+
// indices in partition-specific buffers. If the buffer for the partition
668+
// doesn't exist or is too small, it allocates/reallocates the buffer to
669+
// accommodate all rows.
670+
void
671+
updatePartitionRows(uint32_t index, vector_size_t numRows, vector_size_t row);
672+
673+
// Appends an empty tracking slot (row count 0, null row-index buffer) for a
674+
// new partition to the partition tracking vectors. This is called when a new
675+
// writer is created; updatePartitionRows allocates the buffer on first use.
676+
void reservePartitionBuffers();
677+
648678
HiveWriterParameters getWriterParameters(
649679
const std::optional<std::string>& partition,
650680
std::optional<uint32_t> bucketId) const;

velox/connectors/hive/PartitionIdGenerator.cpp

Lines changed: 25 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -29,10 +29,10 @@ PartitionIdGenerator::PartitionIdGenerator(
2929
uint32_t maxPartitions,
3030
memory::MemoryPool* pool,
3131
bool partitionPathAsLowerCase)
32-
: pool_(pool),
33-
partitionChannels_(std::move(partitionChannels)),
32+
: partitionChannels_(std::move(partitionChannels)),
3433
maxPartitions_(maxPartitions),
35-
partitionPathAsLowerCase_(partitionPathAsLowerCase) {
34+
partitionPathAsLowerCase_(partitionPathAsLowerCase),
35+
pool_(pool) {
3636
VELOX_USER_CHECK(
3737
!partitionChannels_.empty(), "There must be at least one partition key.");
3838
for (auto channel : partitionChannels_) {
@@ -60,11 +60,30 @@ PartitionIdGenerator::PartitionIdGenerator(
6060
}
6161
}
6262

63+
PartitionIdGenerator::PartitionIdGenerator(
64+
std::vector<column_index_t> partitionChannels,
65+
uint32_t maxPartitions,
66+
memory::MemoryPool* pool,
67+
bool partitionPathAsLowerCase)
68+
: partitionChannels_(std::move(partitionChannels)),
69+
maxPartitions_(maxPartitions),
70+
partitionPathAsLowerCase_(partitionPathAsLowerCase),
71+
pool_(pool) {
72+
VELOX_USER_CHECK(
73+
!partitionChannels_.empty(), "There must be at least one partition key.");
74+
}
75+
6376
void PartitionIdGenerator::run(
6477
const RowVectorPtr& input,
6578
raw_vector<uint64_t>& result) {
79+
result.resize(input->size());
80+
computeAndSavePartitionIds(input, result);
81+
}
82+
83+
void PartitionIdGenerator::computeAndSavePartitionIds(
84+
const RowVectorPtr& input,
85+
raw_vector<uint64_t>& result) {
6686
const auto numRows = input->size();
67-
result.resize(numRows);
6887

6988
// Compute value IDs using VectorHashers and store these in 'result'.
7089
computeValueIds(input, result);
@@ -96,7 +115,7 @@ void PartitionIdGenerator::run(
96115
}
97116
}
98117

99-
std::string PartitionIdGenerator::partitionName(uint64_t partitionId) const {
118+
std::string PartitionIdGenerator::partitionName(uint32_t partitionId) const {
100119
return FileUtils::makePartName(
101120
HivePartitionUtil::extractPartitionKeyValues(
102121
partitionValues_, partitionId),
@@ -170,7 +189,7 @@ void PartitionIdGenerator::updateValueToPartitionIdMapping() {
170189
}
171190

172191
void PartitionIdGenerator::savePartitionValues(
173-
uint64_t partitionId,
192+
uint32_t partitionId,
174193
const RowVectorPtr& input,
175194
vector_size_t row) {
176195
for (auto i = 0; i < partitionChannels_.size(); ++i) {

0 commit comments

Comments
 (0)