Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
170 changes: 95 additions & 75 deletions velox/connectors/hive/HiveDataSink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -94,30 +94,18 @@ std::vector<column_index_t> getPartitionChannels(

// Returns the column indices of non-partition data columns.
std::vector<column_index_t> getNonPartitionChannels(
const std::vector<column_index_t>& partitionChannels,
const column_index_t childrenSize) {
const std::shared_ptr<const HiveInsertTableHandle>& insertTableHandle) {
std::vector<column_index_t> dataChannels;
dataChannels.reserve(childrenSize - partitionChannels.size());

for (column_index_t i = 0; i < childrenSize; i++) {
if (std::find(partitionChannels.cbegin(), partitionChannels.cend(), i) ==
partitionChannels.cend()) {
for (auto i = 0; i < insertTableHandle->inputColumns().size(); i++) {
if (!insertTableHandle->inputColumns()[i]->isPartitionKey()) {
dataChannels.push_back(i);
}
}

return dataChannels;
}

// Builds the directory that files for a partition are written to. Returns
// 'tableDirectory' unchanged when no partition subdirectory is given;
// otherwise returns 'partitionSubdirectory' appended to 'tableDirectory' using
// filesystem path joining.
std::string makePartitionDirectory(
    const std::string& tableDirectory,
    const std::optional<std::string>& partitionSubdirectory) {
  if (!partitionSubdirectory) {
    return tableDirectory;
  }
  const fs::path tablePath{tableDirectory};
  return tablePath / *partitionSubdirectory;
}

std::string makeUuid() {
return boost::lexical_cast<std::string>(boost::uuids::random_generator()());
}
Expand Down Expand Up @@ -382,6 +370,18 @@ HiveDataSink::HiveDataSink(
? createBucketFunction(
*insertTableHandle->bucketProperty(),
inputType)
: nullptr,
getPartitionChannels(insertTableHandle),
getNonPartitionChannels(insertTableHandle),
!getPartitionChannels(insertTableHandle).empty()
? std::make_unique<PartitionIdGenerator>(
inputType,
getPartitionChannels(insertTableHandle),
hiveConfig->maxPartitionsPerWriters(
connectorQueryCtx->sessionProperties()),
connectorQueryCtx->memoryPool(),
hiveConfig->isPartitionPathAsLowerCase(
connectorQueryCtx->sessionProperties()))
: nullptr) {}

HiveDataSink::HiveDataSink(
Expand All @@ -391,7 +391,10 @@ HiveDataSink::HiveDataSink(
CommitStrategy commitStrategy,
const std::shared_ptr<const HiveConfig>& hiveConfig,
uint32_t bucketCount,
std::unique_ptr<core::PartitionFunction> bucketFunction)
std::unique_ptr<core::PartitionFunction> bucketFunction,
const std::vector<column_index_t>& partitionChannels,
const std::vector<column_index_t>& dataChannels,
std::unique_ptr<PartitionIdGenerator> partitionIdGenerator)
: inputType_(std::move(inputType)),
insertTableHandle_(std::move(insertTableHandle)),
connectorQueryCtx_(connectorQueryCtx),
Expand All @@ -400,19 +403,9 @@ HiveDataSink::HiveDataSink(
updateMode_(getUpdateMode()),
maxOpenWriters_(hiveConfig_->maxPartitionsPerWriters(
connectorQueryCtx->sessionProperties())),
partitionChannels_(getPartitionChannels(insertTableHandle_)),
partitionIdGenerator_(
!partitionChannels_.empty()
? std::make_unique<PartitionIdGenerator>(
inputType_,
partitionChannels_,
maxOpenWriters_,
connectorQueryCtx_->memoryPool(),
hiveConfig_->isPartitionPathAsLowerCase(
connectorQueryCtx->sessionProperties()))
: nullptr),
dataChannels_(
getNonPartitionChannels(partitionChannels_, inputType_->size())),
partitionChannels_(partitionChannels),
partitionIdGenerator_(std::move(partitionIdGenerator)),
dataChannels_(dataChannels),
bucketCount_(static_cast<int32_t>(bucketCount)),
bucketFunction_(std::move(bucketFunction)),
writerFactory_(
Expand Down Expand Up @@ -488,6 +481,8 @@ void HiveDataSink::appendData(RowVectorPtr input) {
// Compute partition and bucket numbers.
computePartitionAndBucketIds(input);

splitInputRowsAndEnsureWriters(input);

// All inputs belong to a single non-bucketed partition. The partition id
// must be zero.
if (!isBucketed() && partitionIdGenerator_->numPartitions() == 1) {
Expand All @@ -496,8 +491,6 @@ void HiveDataSink::appendData(RowVectorPtr input) {
return;
}

splitInputRowsAndEnsureWriters();

for (auto index = 0; index < writers_.size(); ++index) {
const vector_size_t partitionSize = partitionSizes_[index];
if (partitionSize == 0) {
Expand Down Expand Up @@ -782,6 +775,39 @@ uint32_t HiveDataSink::appendWriter(const HiveWriterId& id) {

setMemoryReclaimers(writerInfo_.back().get(), ioStats_.back().get());

auto options = createWriterOptions();

// Prevents the memory allocation during the writer creation.
WRITER_NON_RECLAIMABLE_SECTION_GUARD(writerInfo_.size() - 1);
auto writer = writerFactory_->createWriter(
dwio::common::FileSink::create(
writePath,
{
.bufferWrite = false,
.connectorProperties = hiveConfig_->config(),
.fileCreateConfig = hiveConfig_->writeFileCreateConfig(),
.pool = writerInfo_.back()->sinkPool.get(),
.metricLogger = dwio::common::MetricsLog::voidLog(),
.stats = ioStats_.back().get(),
.fileSystemStats = fileSystemStats_.get(),
}),
options);
writer = maybeCreateBucketSortWriter(std::move(writer));
writers_.emplace_back(std::move(writer));
addThreadLocalRuntimeStat(
fmt::format(
"{}WriterCount",
dwio::common::toString(insertTableHandle_->storageFormat())),
RuntimeCounter(1));
// Extends the buffer used for partition rows calculations.
reservePartitionBuffers();

writerIndexMap_.emplace(id, writers_.size() - 1);
return writerIndexMap_[id];
}

std::shared_ptr<dwio::common::WriterOptions> HiveDataSink::createWriterOptions()
const {
// Take the writer options provided by the user as a starting point, or
// allocate a new one.
auto options = insertTableHandle_->writerOptions();
Expand Down Expand Up @@ -832,36 +858,7 @@ uint32_t HiveDataSink::appendWriter(const HiveWriterId& id) {
options->adjustTimestampToTimezone =
connectorQueryCtx_->adjustTimestampToTimezone();
options->processConfigs(*hiveConfig_->config(), *connectorSessionProperties);

// Prevents the memory allocation during the writer creation.
WRITER_NON_RECLAIMABLE_SECTION_GUARD(writerInfo_.size() - 1);
auto writer = writerFactory_->createWriter(
dwio::common::FileSink::create(
writePath,
{
.bufferWrite = false,
.connectorProperties = hiveConfig_->config(),
.fileCreateConfig = hiveConfig_->writeFileCreateConfig(),
.pool = writerInfo_.back()->sinkPool.get(),
.metricLogger = dwio::common::MetricsLog::voidLog(),
.stats = ioStats_.back().get(),
.fileSystemStats = fileSystemStats_.get(),
}),
options);
writer = maybeCreateBucketSortWriter(std::move(writer));
writers_.emplace_back(std::move(writer));
addThreadLocalRuntimeStat(
fmt::format(
"{}WriterCount",
dwio::common::toString(insertTableHandle_->storageFormat())),
RuntimeCounter(1));
// Extends the buffer used for partition rows calculations.
partitionSizes_.emplace_back(0);
partitionRows_.emplace_back(nullptr);
rawPartitionRows_.emplace_back(nullptr);

writerIndexMap_.emplace(id, writers_.size() - 1);
return writerIndexMap_[id];
return options;
}

std::unique_ptr<facebook::velox::dwio::common::Writer>
Expand Down Expand Up @@ -891,6 +888,13 @@ HiveDataSink::maybeCreateBucketSortWriter(
sortWriterFinishTimeSliceLimitMs_);
}

// Appends one new slot to each of the three parallel per-partition vectors
// ('partitionSizes_', 'partitionRows_' and 'rawPartitionRows_') so that they
// stay the same length. The new slot starts with a row count of zero and no
// row-index buffer.
void HiveDataSink::reservePartitionBuffers() {
// Extends the per-partition bookkeeping by one entry; the row-index buffer
// itself is left null here.
partitionSizes_.emplace_back(0);
partitionRows_.emplace_back(nullptr);
rawPartitionRows_.emplace_back(nullptr);
}

HiveWriterId HiveDataSink::getWriterId(size_t row) const {
std::optional<int32_t> partitionId;
if (isPartitioned()) {
Expand All @@ -905,7 +909,26 @@ HiveWriterId HiveDataSink::getWriterId(size_t row) const {
return HiveWriterId{partitionId, bucketId};
}

void HiveDataSink::splitInputRowsAndEnsureWriters() {
// Records input row 'row' as belonging to the partition slot 'index'. If the
// slot has no row-index buffer yet, or its capacity is smaller than 'numRows'
// indices, a new buffer sized for 'numRows' indices is allocated from the
// connector query context's memory pool before the row is appended.
void HiveDataSink::updatePartitionRows(
    uint32_t index,
    vector_size_t numRows,
    vector_size_t row) {
  VELOX_DCHECK_LT(index, partitionSizes_.size());
  VELOX_DCHECK_EQ(partitionSizes_.size(), partitionRows_.size());
  VELOX_DCHECK_EQ(partitionRows_.size(), rawPartitionRows_.size());

  auto& rowsBuffer = partitionRows_[index];
  auto& rawRows = rawPartitionRows_[index];

  // The null check comes first so the capacity is only read from a valid
  // buffer.
  const bool needsAllocation = FOLLY_UNLIKELY(rowsBuffer == nullptr) ||
      (rowsBuffer->capacity() < numRows * sizeof(vector_size_t));
  if (needsAllocation) {
    rowsBuffer = allocateIndices(numRows, connectorQueryCtx_->memoryPool());
    rawRows = rowsBuffer->asMutable<vector_size_t>();
  }

  // Append the row index at the current end of this partition's list.
  auto& partitionSize = partitionSizes_[index];
  rawRows[partitionSize] = row;
  ++partitionSize;
}

void HiveDataSink::splitInputRowsAndEnsureWriters(
const RowVectorPtr& /* input */) {
VELOX_CHECK(isPartitioned() || isBucketed());
if (isBucketed() && isPartitioned()) {
VELOX_CHECK_EQ(bucketIds_.size(), partitionIds_.size());
Expand All @@ -918,19 +941,7 @@ void HiveDataSink::splitInputRowsAndEnsureWriters() {
for (auto row = 0; row < numRows; ++row) {
const auto id = getWriterId(row);
const uint32_t index = ensureWriter(id);

VELOX_DCHECK_LT(index, partitionSizes_.size());
VELOX_DCHECK_EQ(partitionSizes_.size(), partitionRows_.size());
VELOX_DCHECK_EQ(partitionRows_.size(), rawPartitionRows_.size());
if (FOLLY_UNLIKELY(partitionRows_[index] == nullptr) ||
(partitionRows_[index]->capacity() < numRows * sizeof(vector_size_t))) {
partitionRows_[index] =
allocateIndices(numRows, connectorQueryCtx_->memoryPool());
rawPartitionRows_[index] =
partitionRows_[index]->asMutable<vector_size_t>();
}
rawPartitionRows_[index][partitionSizes_[index]] = row;
++partitionSizes_[index];
updatePartitionRows(index, numRows, row);
}

for (uint32_t i = 0; i < partitionSizes_.size(); ++i) {
Expand All @@ -941,6 +952,15 @@ void HiveDataSink::splitInputRowsAndEnsureWriters() {
}
}

std::string HiveDataSink::makePartitionDirectory(
const std::string& tableDirectory,
const std::optional<std::string>& partitionSubdirectory) const {
if (partitionSubdirectory.has_value()) {
return fs::path(tableDirectory) / partitionSubdirectory.value();
}
return tableDirectory;
}

HiveWriterParameters HiveDataSink::getWriterParameters(
const std::optional<std::string>& partition,
std::optional<uint32_t> bucketId) const {
Expand Down
34 changes: 32 additions & 2 deletions velox/connectors/hive/HiveDataSink.h
Original file line number Diff line number Diff line change
Expand Up @@ -530,7 +530,10 @@ class HiveDataSink : public DataSink {
CommitStrategy commitStrategy,
const std::shared_ptr<const HiveConfig>& hiveConfig,
uint32_t bucketCount,
std::unique_ptr<core::PartitionFunction> bucketFunction);
std::unique_ptr<core::PartitionFunction> bucketFunction,
const std::vector<column_index_t>& partitionChannels,
const std::vector<column_index_t>& dataChannels,
std::unique_ptr<PartitionIdGenerator> partitionIdGenerator);

void appendData(RowVectorPtr input) override;

Expand Down Expand Up @@ -631,7 +634,7 @@ class HiveDataSink : public DataSink {
// to each corresponding (bucketed) partition based on the partition and
// bucket ids calculated by 'computePartitionAndBucketIds'. The function also
// ensures that there is a writer created for each (bucketed) partition.
void splitInputRowsAndEnsureWriters();
virtual void splitInputRowsAndEnsureWriters(const RowVectorPtr& input);

// Makes sure to create one writer for the given writer id. The function
// returns the corresponding index in 'writers_'.
Expand All @@ -641,10 +644,37 @@ class HiveDataSink : public DataSink {
// the newly created writer in 'writers_'.
uint32_t appendWriter(const HiveWriterId& id);

// Creates and configures WriterOptions based on file format.
// Sets up compression, schema, and other writer configuration based on the
// insert table handle and connector settings.
virtual std::shared_ptr<dwio::common::WriterOptions> createWriterOptions()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add comments for all the public functions.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks.

const;

std::unique_ptr<facebook::velox::dwio::common::Writer>
maybeCreateBucketSortWriter(
std::unique_ptr<facebook::velox::dwio::common::Writer> writer);

// Constructs the full partition directory path by combining the table
// directory with an optional partition subdirectory. If partitionSubdirectory
// is provided, returns tableDirectory/partitionSubdirectory; otherwise,
// returns tableDirectory unchanged (for non-partitioned tables).
std::string makePartitionDirectory(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

const std::string& tableDirectory,
const std::optional<std::string>& partitionSubdirectory) const;

// Records a row index for a specific partition. This method maintains the
// mapping of which input rows belong to which partition by storing row
// indices in partition-specific buffers. If the buffer for the partition
// doesn't exist or is too small, it allocates/reallocates the buffer to
// accommodate all rows.
void
updatePartitionRows(uint32_t index, vector_size_t numRows, vector_size_t row);

// Allocates buffer space for a new partition by extending the partition
// tracking vectors. This is called when a new writer is created for a
// partition to ensure there's space to track row indices for that partition.
void reservePartitionBuffers();

HiveWriterParameters getWriterParameters(
const std::optional<std::string>& partition,
std::optional<uint32_t> bucketId) const;
Expand Down
31 changes: 25 additions & 6 deletions velox/connectors/hive/PartitionIdGenerator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,10 @@ PartitionIdGenerator::PartitionIdGenerator(
uint32_t maxPartitions,
memory::MemoryPool* pool,
bool partitionPathAsLowerCase)
: pool_(pool),
partitionChannels_(std::move(partitionChannels)),
: partitionChannels_(std::move(partitionChannels)),
maxPartitions_(maxPartitions),
partitionPathAsLowerCase_(partitionPathAsLowerCase) {
partitionPathAsLowerCase_(partitionPathAsLowerCase),
pool_(pool) {
VELOX_USER_CHECK(
!partitionChannels_.empty(), "There must be at least one partition key.");
for (auto channel : partitionChannels_) {
Expand Down Expand Up @@ -60,11 +60,30 @@ PartitionIdGenerator::PartitionIdGenerator(
}
}

// Constructs a generator from the partition channel list alone, without an
// input row type. This overload only stores its arguments in the
// corresponding members and then validates them.
//
// 'partitionChannels' is taken by value and moved into the member.
// Raises a user error if 'partitionChannels' is empty.
PartitionIdGenerator::PartitionIdGenerator(
std::vector<column_index_t> partitionChannels,
uint32_t maxPartitions,
memory::MemoryPool* pool,
bool partitionPathAsLowerCase)
// NOTE(review): 'pool_' is listed last, presumably to match the member
// declaration order in the header — confirm before reordering.
: partitionChannels_(std::move(partitionChannels)),
maxPartitions_(maxPartitions),
partitionPathAsLowerCase_(partitionPathAsLowerCase),
pool_(pool) {
// The check reads the member, not the moved-from parameter.
VELOX_USER_CHECK(
!partitionChannels_.empty(), "There must be at least one partition key.");
}

// Sizes 'result' to hold one entry per row of 'input', then delegates to
// computeAndSavePartitionIds() to fill it in.
void PartitionIdGenerator::run(
    const RowVectorPtr& input,
    raw_vector<uint64_t>& result) {
  const auto numRows = input->size();
  result.resize(numRows);
  computeAndSavePartitionIds(input, result);
}

void PartitionIdGenerator::computeAndSavePartitionIds(
const RowVectorPtr& input,
raw_vector<uint64_t>& result) {
const auto numRows = input->size();
result.resize(numRows);

// Compute value IDs using VectorHashers and store these in 'result'.
computeValueIds(input, result);
Expand Down Expand Up @@ -96,7 +115,7 @@ void PartitionIdGenerator::run(
}
}

std::string PartitionIdGenerator::partitionName(uint64_t partitionId) const {
std::string PartitionIdGenerator::partitionName(uint32_t partitionId) const {
return FileUtils::makePartName(
HivePartitionUtil::extractPartitionKeyValues(
partitionValues_, partitionId),
Expand Down Expand Up @@ -170,7 +189,7 @@ void PartitionIdGenerator::updateValueToPartitionIdMapping() {
}

void PartitionIdGenerator::savePartitionValues(
uint64_t partitionId,
uint32_t partitionId,
const RowVectorPtr& input,
vector_size_t row) {
for (auto i = 0; i < partitionChannels_.size(); ++i) {
Expand Down
Loading
Loading