Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 24 additions & 11 deletions velox/connectors/hive/HiveConnector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include "velox/connectors/hive/HiveDataSink.h"
#include "velox/connectors/hive/HiveDataSource.h"
#include "velox/connectors/hive/HivePartitionFunction.h"
#include "velox/connectors/hive/iceberg/IcebergDataSink.h"

#include <boost/lexical_cast.hpp>
#include <memory>
Expand Down Expand Up @@ -87,17 +88,29 @@ std::unique_ptr<DataSink> HiveConnector::createDataSink(
ConnectorInsertTableHandlePtr connectorInsertTableHandle,
ConnectorQueryCtx* connectorQueryCtx,
CommitStrategy commitStrategy) {
auto hiveInsertHandle =
std::dynamic_pointer_cast<const HiveInsertTableHandle>(
connectorInsertTableHandle);
VELOX_CHECK_NOT_NULL(
hiveInsertHandle, "Hive connector expecting hive write handle!");
return std::make_unique<HiveDataSink>(
inputType,
hiveInsertHandle,
connectorQueryCtx,
commitStrategy,
hiveConfig_);
if (auto icebergInsertHandle =
std::dynamic_pointer_cast<const iceberg::IcebergInsertTableHandle>(
connectorInsertTableHandle)) {
return std::make_unique<iceberg::IcebergDataSink>(
inputType,
icebergInsertHandle,
connectorQueryCtx,
commitStrategy,
hiveConfig_);
} else {
auto hiveInsertHandle =
std::dynamic_pointer_cast<const HiveInsertTableHandle>(
connectorInsertTableHandle);

VELOX_CHECK_NOT_NULL(
hiveInsertHandle, "Hive connector expecting hive write handle!");
return std::make_unique<HiveDataSink>(
inputType,
hiveInsertHandle,
connectorQueryCtx,
commitStrategy,
hiveConfig_);
}
}

std::unique_ptr<core::PartitionFunction> HivePartitionFunctionSpec::create(
Expand Down
9 changes: 9 additions & 0 deletions velox/connectors/hive/HiveConnectorUtil.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@
#include "velox/expression/Expr.h"
#include "velox/expression/ExprToSubfieldFilter.h"

#include <boost/lexical_cast.hpp>
#include <boost/uuid/uuid_generators.hpp>
#include <boost/uuid/uuid_io.hpp>

namespace facebook::velox::connector::hive {
namespace {

Expand Down Expand Up @@ -925,4 +929,9 @@ core::TypedExprPtr extractFiltersFromRemainingFilter(
}
return expr;
}

std::string makeUuid() {
return boost::lexical_cast<std::string>(boost::uuids::random_generator()());
}

} // namespace facebook::velox::connector::hive
2 changes: 2 additions & 0 deletions velox/connectors/hive/HiveConnectorUtil.h
Original file line number Diff line number Diff line change
Expand Up @@ -114,4 +114,6 @@ core::TypedExprPtr extractFiltersFromRemainingFilter(
common::SubfieldFilters& filters,
double& sampleRate);

std::string makeUuid();

} // namespace facebook::velox::connector::hive
139 changes: 80 additions & 59 deletions velox/connectors/hive/HiveDataSink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,6 @@
#include "velox/exec/OperatorUtils.h"
#include "velox/exec/SortBuffer.h"

#include <boost/lexical_cast.hpp>
#include <boost/uuid/uuid_generators.hpp>
#include <boost/uuid/uuid_io.hpp>

using facebook::velox::common::testutil::TestValue;

namespace facebook::velox::connector::hive {
Expand Down Expand Up @@ -95,14 +91,12 @@ std::vector<column_index_t> getPartitionChannels(

// Returns the column indices of non-partition data columns.
std::vector<column_index_t> getNonPartitionChannels(
const std::vector<column_index_t>& partitionChannels,
const column_index_t childrenSize) {
const std::shared_ptr<const HiveInsertTableHandle>& insertTableHandle) {
std::vector<column_index_t> dataChannels;
dataChannels.reserve(childrenSize - partitionChannels.size());

for (column_index_t i = 0; i < childrenSize; i++) {
if (std::find(partitionChannels.cbegin(), partitionChannels.cend(), i) ==
partitionChannels.cend()) {
for (column_index_t i = 0; i < insertTableHandle->inputColumns().size();
i++) {
if (!insertTableHandle->inputColumns()[i]->isPartitionKey()) {
dataChannels.push_back(i);
}
}
Expand All @@ -119,10 +113,6 @@ std::string makePartitionDirectory(
return tableDirectory;
}

std::string makeUuid() {
return boost::lexical_cast<std::string>(boost::uuids::random_generator()());
}

std::unordered_map<LocationHandle::TableType, std::string> tableTypeNames() {
return {
{LocationHandle::TableType::kNew, "kNew"},
Expand Down Expand Up @@ -383,7 +373,8 @@ HiveDataSink::HiveDataSink(
? createBucketFunction(
*insertTableHandle->bucketProperty(),
inputType)
: nullptr) {}
: nullptr,
getNonPartitionChannels(insertTableHandle)) {}

HiveDataSink::HiveDataSink(
RowTypePtr inputType,
Expand All @@ -392,7 +383,8 @@ HiveDataSink::HiveDataSink(
CommitStrategy commitStrategy,
const std::shared_ptr<const HiveConfig>& hiveConfig,
uint32_t bucketCount,
std::unique_ptr<core::PartitionFunction> bucketFunction)
std::unique_ptr<core::PartitionFunction> bucketFunction,
const std::vector<column_index_t>& dataChannels)
: inputType_(std::move(inputType)),
insertTableHandle_(std::move(insertTableHandle)),
connectorQueryCtx_(connectorQueryCtx),
Expand All @@ -412,8 +404,7 @@ HiveDataSink::HiveDataSink(
hiveConfig_->isPartitionPathAsLowerCase(
connectorQueryCtx->sessionProperties()))
: nullptr),
dataChannels_(
getNonPartitionChannels(partitionChannels_, inputType_->size())),
dataChannels_(dataChannels),
bucketCount_(static_cast<int32_t>(bucketCount)),
bucketFunction_(std::move(bucketFunction)),
writerFactory_(
Expand Down Expand Up @@ -489,6 +480,8 @@ void HiveDataSink::appendData(RowVectorPtr input) {
// Compute partition and bucket numbers.
computePartitionAndBucketIds(input);

splitInputRowsAndEnsureWriters(input);

// All inputs belong to a single non-bucketed partition. The partition id
// must be zero.
if (!isBucketed() && partitionIdGenerator_->numPartitions() == 1) {
Expand All @@ -497,8 +490,6 @@ void HiveDataSink::appendData(RowVectorPtr input) {
return;
}

splitInputRowsAndEnsureWriters();

for (auto index = 0; index < writers_.size(); ++index) {
const vector_size_t partitionSize = partitionSizes_[index];
if (partitionSize == 0) {
Expand Down Expand Up @@ -683,30 +674,33 @@ bool HiveDataSink::finish() {
std::vector<std::string> HiveDataSink::close() {
setState(State::kClosed);
closeInternal();
return commitMessage();
}

std::vector<std::string> HiveDataSink::commitMessage() const {
std::vector<std::string> partitionUpdates;
partitionUpdates.reserve(writerInfo_.size());
for (int i = 0; i < writerInfo_.size(); ++i) {
const auto& info = writerInfo_.at(i);
VELOX_CHECK_NOT_NULL(info);
// clang-format off
auto partitionUpdateJson = folly::toJson(
folly::dynamic::object
("name", info->writerParameters.partitionName().value_or(""))
("updateMode",
HiveWriterParameters::updateModeToString(
info->writerParameters.updateMode()))
("writePath", info->writerParameters.writeDirectory())
("targetPath", info->writerParameters.targetDirectory())
("fileWriteInfos", folly::dynamic::array(
folly::dynamic::object
("writeFileName", info->writerParameters.writeFileName())
("targetFileName", info->writerParameters.targetFileName())
("fileSize", ioStats_.at(i)->rawBytesWritten())))
("rowCount", info->numWrittenRows)
("inMemoryDataSizeInBytes", info->inputSizeInBytes)
("onDiskDataSizeInBytes", ioStats_.at(i)->rawBytesWritten())
("containsNumberedFileNames", true));
auto partitionUpdateJson = folly::toJson(
folly::dynamic::object
("name", info->writerParameters.partitionName().value_or(""))
("updateMode",
HiveWriterParameters::updateModeToString(
info->writerParameters.updateMode()))
("writePath", info->writerParameters.writeDirectory())
("targetPath", info->writerParameters.targetDirectory())
("fileWriteInfos", folly::dynamic::array(
folly::dynamic::object
("writeFileName", info->writerParameters.writeFileName())
("targetFileName", info->writerParameters.targetFileName())
("fileSize", ioStats_.at(i)->rawBytesWritten())))
("rowCount", info->numWrittenRows)
("inMemoryDataSizeInBytes", info->inputSizeInBytes)
("onDiskDataSizeInBytes", ioStats_.at(i)->rawBytesWritten())
("containsNumberedFileNames", true));
// clang-format on
partitionUpdates.push_back(partitionUpdateJson);
}
Expand Down Expand Up @@ -753,11 +747,7 @@ uint32_t HiveDataSink::appendWriter(const HiveWriterId& id) {
VELOX_CHECK_EQ(writers_.size(), writerInfo_.size());
VELOX_CHECK_EQ(writerIndexMap_.size(), writerInfo_.size());

std::optional<std::string> partitionName;
if (isPartitioned()) {
partitionName =
partitionIdGenerator_->partitionName(id.partitionId.value());
}
std::optional<std::string> partitionName = getPartitionName(id);

// Without explicitly setting flush policy, the default memory based flush
// policy is used.
Expand Down Expand Up @@ -846,15 +836,23 @@ uint32_t HiveDataSink::appendWriter(const HiveWriterId& id) {
options);
writer = maybeCreateBucketSortWriter(std::move(writer));
writers_.emplace_back(std::move(writer));
// Extends the buffer used for partition rows calculations.
partitionSizes_.emplace_back(0);
partitionRows_.emplace_back(nullptr);
rawPartitionRows_.emplace_back(nullptr);

extendBuffersForPartitionedTables();

writerIndexMap_.emplace(id, writers_.size() - 1);
return writerIndexMap_[id];
}

// Resolves the partition name for the given writer id. Returns std::nullopt
// when the table is not partitioned; otherwise asks the partition id
// generator for the name of the writer's partition.
std::optional<std::string> HiveDataSink::getPartitionName(
    const HiveWriterId& id) const {
  if (!isPartitioned()) {
    return std::nullopt;
  }
  return partitionIdGenerator_->partitionName(id.partitionId.value());
}

std::unique_ptr<facebook::velox::dwio::common::Writer>
HiveDataSink::maybeCreateBucketSortWriter(
std::unique_ptr<facebook::velox::dwio::common::Writer> writer) {
Expand Down Expand Up @@ -882,6 +880,13 @@ HiveDataSink::maybeCreateBucketSortWriter(
sortWriterFinishTimeSliceLimitMs_);
}

// Grows the per-writer partition bookkeeping buffers by one slot so they stay
// aligned with a newly appended writer: a zero row count, and null (lazily
// allocated) row-index buffers.
void HiveDataSink::extendBuffersForPartitionedTables() {
  partitionSizes_.push_back(0);
  partitionRows_.push_back(nullptr);
  rawPartitionRows_.push_back(nullptr);
}

HiveWriterId HiveDataSink::getWriterId(size_t row) const {
std::optional<int32_t> partitionId;
if (isPartitioned()) {
Expand All @@ -896,7 +901,25 @@ HiveWriterId HiveDataSink::getWriterId(size_t row) const {
return HiveWriterId{partitionId, bucketId};
}

void HiveDataSink::splitInputRowsAndEnsureWriters() {
// Records that input row 'row' belongs to the writer at position 'index',
// appending it to that writer's row-index buffer and bumping the writer's
// pending row count.
//
// 'numRows' is the row count of the current input batch; the buffer is sized
// to hold every row of the batch in the worst case (all rows in one writer).
void HiveDataSink::updatePartitionRows(
    uint32_t index,
    vector_size_t numRows,
    vector_size_t row) {
  VELOX_DCHECK_LT(index, partitionSizes_.size());
  VELOX_DCHECK_EQ(partitionSizes_.size(), partitionRows_.size());
  VELOX_DCHECK_EQ(partitionRows_.size(), rawPartitionRows_.size());
  // Lazily allocate (or grow) the writer's row-index buffer.
  // NOTE(review): reallocating here discards any previously stored entries —
  // this appears to assume growth only happens while partitionSizes_[index]
  // is 0 (i.e. at the start of a batch); confirm against the caller's reset
  // of partitionSizes_ between batches.
  if (FOLLY_UNLIKELY(partitionRows_[index] == nullptr) ||
      (partitionRows_[index]->capacity() < numRows * sizeof(vector_size_t))) {
    partitionRows_[index] =
        allocateIndices(numRows, connectorQueryCtx_->memoryPool());
    rawPartitionRows_[index] =
        partitionRows_[index]->asMutable<vector_size_t>();
  }
  rawPartitionRows_[index][partitionSizes_[index]] = row;
  ++partitionSizes_[index];
}

void HiveDataSink::splitInputRowsAndEnsureWriters(RowVectorPtr /* input */) {
VELOX_CHECK(isPartitioned() || isBucketed());
if (isBucketed() && isPartitioned()) {
VELOX_CHECK_EQ(bucketIds_.size(), partitionIds_.size());
Expand All @@ -910,18 +933,7 @@ void HiveDataSink::splitInputRowsAndEnsureWriters() {
const auto id = getWriterId(row);
const uint32_t index = ensureWriter(id);

VELOX_DCHECK_LT(index, partitionSizes_.size());
VELOX_DCHECK_EQ(partitionSizes_.size(), partitionRows_.size());
VELOX_DCHECK_EQ(partitionRows_.size(), rawPartitionRows_.size());
if (FOLLY_UNLIKELY(partitionRows_[index] == nullptr) ||
(partitionRows_[index]->capacity() < numRows * sizeof(vector_size_t))) {
partitionRows_[index] =
allocateIndices(numRows, connectorQueryCtx_->memoryPool());
rawPartitionRows_[index] =
partitionRows_[index]->asMutable<vector_size_t>();
}
rawPartitionRows_[index][partitionSizes_[index]] = row;
++partitionSizes_[index];
updatePartitionRows(index, numRows, row);
}

for (uint32_t i = 0; i < partitionSizes_.size(); ++i) {
Expand All @@ -932,6 +944,15 @@ void HiveDataSink::splitInputRowsAndEnsureWriters() {
}
}

// Builds the directory path for a partition's output files: the table
// directory joined with the partition subdirectory when one is given,
// otherwise the table directory itself.
std::string HiveDataSink::makePartitionDirectory(
    const std::string& tableDirectory,
    const std::optional<std::string>& partitionSubdirectory) const {
  if (!partitionSubdirectory.has_value()) {
    return tableDirectory;
  }
  return fs::path(tableDirectory) / partitionSubdirectory.value();
}

HiveWriterParameters HiveDataSink::getWriterParameters(
const std::optional<std::string>& partition,
std::optional<uint32_t> bucketId) const {
Expand Down
Loading
Loading