Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/linux-build-base.yml
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ jobs:
env:
VELOX_DEPENDENCY_SOURCE: BUNDLED
ICU_SOURCE: SYSTEM
MAKEFLAGS: NUM_THREADS=8 MAX_HIGH_MEM_JOBS=4 MAX_LINK_JOBS=3
MAKEFLAGS: NUM_THREADS=8 MAX_HIGH_MEM_JOBS=4 MAX_LINK_JOBS=2
EXTRA_CMAKE_FLAGS: >-
-DCMAKE_LINK_LIBRARIES_STRATEGY=REORDER_FREELY
-DVELOX_ENABLE_PARQUET=ON
Expand Down
243 changes: 154 additions & 89 deletions velox/connectors/hive/HiveDataSink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -77,36 +77,23 @@ RowVectorPtr makeDataInput(
input->getNullCount());
}

// Returns the indices of the input columns that are partition keys, in
// ascending column order.
std::vector<column_index_t> getPartitionChannels(
    const std::shared_ptr<const HiveInsertTableHandle>& insertTableHandle) {
  std::vector<column_index_t> partitionChannels;
  const auto& columns = insertTableHandle->inputColumns();
  for (column_index_t channel = 0; channel < columns.size(); ++channel) {
    if (columns[channel]->isPartitionKey()) {
      partitionChannels.push_back(channel);
    }
  }
  return partitionChannels;
}

// Creates a PartitionIdGenerator for a partitioned table, or returns nullptr
// when the table has no partition keys.
//
// The generator is constructed with the session's maxPartitionsPerWriters
// limit and allocates from the connector query context's memory pool.
std::unique_ptr<PartitionIdGenerator> createPartitionIdGenerator(
    const RowTypePtr& inputType,
    const std::shared_ptr<const HiveInsertTableHandle>& insertTableHandle,
    const std::shared_ptr<const HiveConfig>& hiveConfig,
    const ConnectorQueryCtx* connectorQueryCtx) {
  // Bind by const reference to avoid copying the channel vector; lifetime
  // extension keeps this valid even if partitionChannels() returns by value.
  const auto& partitionChannels = insertTableHandle->partitionChannels();
  if (partitionChannels.empty()) {
    // Unpartitioned table: there are no partition ids to generate.
    return nullptr;
  }

  return std::make_unique<PartitionIdGenerator>(
      inputType,
      partitionChannels,
      hiveConfig->maxPartitionsPerWriters(
          connectorQueryCtx->sessionProperties()),
      connectorQueryCtx->memoryPool());
}

std::string makePartitionDirectory(
Expand Down Expand Up @@ -201,6 +188,28 @@ getBucketCount(const HiveBucketProperty* bucketProperty) {
return bucketProperty == nullptr ? 0 : bucketProperty->bucketCount();
}

// Returns the indices of the input columns that are partition keys, in
// ascending column order.
std::vector<column_index_t> computePartitionChannels(
    const std::vector<std::shared_ptr<const HiveColumnHandle>>& inputColumns) {
  std::vector<column_index_t> channels;
  // Use column_index_t rather than 'auto i = 0' (which deduces int): avoids a
  // signed/unsigned comparison against size() and matches the element type.
  for (column_index_t i = 0; i < inputColumns.size(); ++i) {
    if (inputColumns[i]->isPartitionKey()) {
      channels.push_back(i);
    }
  }
  return channels;
}

// Returns the indices of the input columns that are NOT partition keys
// (the plain data columns), in ascending column order.
std::vector<column_index_t> computeNonPartitionChannels(
    const std::vector<std::shared_ptr<const HiveColumnHandle>>& inputColumns) {
  std::vector<column_index_t> channels;
  // Use column_index_t rather than 'auto i = 0' (which deduces int): avoids a
  // signed/unsigned comparison against size() and matches the element type.
  for (column_index_t i = 0; i < inputColumns.size(); ++i) {
    if (!inputColumns[i]->isPartitionKey()) {
      channels.push_back(i);
    }
  }
  return channels;
}

} // namespace

const HiveWriterId& HiveWriterId::unpartitionedId() {
Expand Down Expand Up @@ -366,6 +375,52 @@ std::string HiveBucketProperty::toString() const {
return out.str();
}

// Constructs the insert-table handle for a Hive write. Partition and
// non-partition channel lists are derived once here from the input columns,
// so downstream consumers (e.g. HiveDataSink) need not recompute them.
HiveInsertTableHandle::HiveInsertTableHandle(
    std::vector<std::shared_ptr<const HiveColumnHandle>> inputColumns,
    std::shared_ptr<const LocationHandle> locationHandle,
    dwio::common::FileFormat storageFormat,
    std::shared_ptr<const HiveBucketProperty> bucketProperty,
    std::optional<common::CompressionKind> compressionKind,
    const std::unordered_map<std::string, std::string>& serdeParameters,
    const std::shared_ptr<dwio::common::WriterOptions>& writerOptions,
    // When this option is set the HiveDataSink will always write a file even
    // if there's no data. This is useful when the table is bucketed, but the
    // engine handles ensuring a 1 to 1 mapping from task to bucket.
    const bool ensureFiles,
    std::shared_ptr<const FileNameGenerator> fileNameGenerator)
    : inputColumns_(std::move(inputColumns)),
      locationHandle_(std::move(locationHandle)),
      storageFormat_(storageFormat),
      bucketProperty_(std::move(bucketProperty)),
      compressionKind_(compressionKind),
      serdeParameters_(serdeParameters),
      writerOptions_(writerOptions),
      ensureFiles_(ensureFiles),
      fileNameGenerator_(std::move(fileNameGenerator)),
      // These read inputColumns_ (the member, already populated above via
      // move). NOTE(review): members initialize in declaration order, not
      // initializer-list order — assumes inputColumns_ is declared before
      // these two in the header; confirm there.
      partitionChannels_(computePartitionChannels(inputColumns_)),
      nonPartitionChannels_(computeNonPartitionChannels(inputColumns_)) {
  // Reject the sentinel compression value; any other kind (or nullopt,
  // meaning "unspecified") is accepted here.
  if (compressionKind.has_value()) {
    VELOX_CHECK(
        compressionKind.value() != common::CompressionKind_MAX,
        "Unsupported compression type: CompressionKind_MAX");
  }

  if (ensureFiles_) {
    // If ensureFiles is set and either the bucketProperty is set or some
    // partition keys are in the data, there is not a 1:1 mapping from Task to
    // files so we can't proactively create writers.
    VELOX_CHECK(
        bucketProperty_ == nullptr || bucketProperty_->bucketCount() == 0,
        "ensureFiles is not supported with bucketing");

    for (const auto& inputColumn : inputColumns_) {
      VELOX_CHECK(
          !inputColumn->isPartitionKey(),
          "ensureFiles is not supported with partition keys in the data");
    }
  }
}

HiveDataSink::HiveDataSink(
RowTypePtr inputType,
std::shared_ptr<const HiveInsertTableHandle> insertTableHandle,
Expand All @@ -384,8 +439,13 @@ HiveDataSink::HiveDataSink(
*insertTableHandle->bucketProperty(),
inputType)
: nullptr,
getPartitionChannels(insertTableHandle),
nullptr) {}
insertTableHandle->partitionChannels(),
insertTableHandle->nonPartitionChannels(),
createPartitionIdGenerator(
inputType,
insertTableHandle,
hiveConfig,
connectorQueryCtx)) {}

HiveDataSink::HiveDataSink(
RowTypePtr inputType,
Expand All @@ -403,7 +463,8 @@ HiveDataSink::HiveDataSink(
hiveConfig,
bucketCount,
std::move(bucketFunction),
getPartitionChannels(insertTableHandle),
insertTableHandle->partitionChannels(),
insertTableHandle->nonPartitionChannels(),
nullptr) {}

HiveDataSink::HiveDataSink(
Expand All @@ -415,6 +476,7 @@ HiveDataSink::HiveDataSink(
uint32_t bucketCount,
std::unique_ptr<core::PartitionFunction> bucketFunction,
const std::vector<column_index_t>& partitionChannels,
const std::vector<column_index_t>& dataChannels,
std::unique_ptr<PartitionIdGenerator> partitionIdGenerator)
: inputType_(std::move(inputType)),
insertTableHandle_(std::move(insertTableHandle)),
Expand All @@ -425,17 +487,8 @@ HiveDataSink::HiveDataSink(
maxOpenWriters_(hiveConfig_->maxPartitionsPerWriters(
connectorQueryCtx->sessionProperties())),
partitionChannels_(partitionChannels),
partitionIdGenerator_(
partitionIdGenerator ? std::move(partitionIdGenerator)
: !partitionChannels_.empty()
? std::make_unique<PartitionIdGenerator>(
inputType_,
partitionChannels_,
maxOpenWriters_,
connectorQueryCtx_->memoryPool())
: nullptr),
dataChannels_(
getNonPartitionChannels(partitionChannels_, inputType_->size())),
partitionIdGenerator_(std::move(partitionIdGenerator)),
dataChannels_(dataChannels),
bucketCount_(static_cast<int32_t>(bucketCount)),
bucketFunction_(std::move(bucketFunction)),
writerFactory_(
Expand Down Expand Up @@ -773,39 +826,8 @@ uint32_t HiveDataSink::ensureWriter(const HiveWriterId& id) {
return appendWriter(id);
}

uint32_t HiveDataSink::appendWriter(const HiveWriterId& id) {
// Check max open writers.
VELOX_USER_CHECK_LE(
writers_.size(), maxOpenWriters_, "Exceeded open writer limit");
VELOX_CHECK_EQ(writers_.size(), writerInfo_.size());
VELOX_CHECK_EQ(writerIndexMap_.size(), writerInfo_.size());

std::optional<std::string> partitionName;
if (isPartitioned()) {
partitionName = getPartitionName(id.partitionId.value());
}

// Without explicitly setting flush policy, the default memory based flush
// policy is used.
auto writerParameters = getWriterParameters(partitionName, id.bucketId);
const auto writePath = fs::path(writerParameters.writeDirectory()) /
writerParameters.writeFileName();
auto writerPool = createWriterPool(id);
auto sinkPool = createSinkPool(writerPool);
std::shared_ptr<memory::MemoryPool> sortPool{nullptr};
if (sortWrite()) {
sortPool = createSortPool(writerPool);
}
writerInfo_.emplace_back(
std::make_shared<HiveWriterInfo>(
std::move(writerParameters),
std::move(writerPool),
std::move(sinkPool),
std::move(sortPool)));
ioStats_.emplace_back(std::make_unique<io::IoStatistics>());

setMemoryReclaimers(writerInfo_.back().get(), ioStats_.back().get());

std::shared_ptr<dwio::common::WriterOptions> HiveDataSink::createWriterOptions()
const {
// Take the writer options provided by the user as a starting point, or
// allocate a new one.
auto options = insertTableHandle_->writerOptions();
Expand Down Expand Up @@ -856,6 +878,43 @@ uint32_t HiveDataSink::appendWriter(const HiveWriterId& id) {
options->adjustTimestampToTimezone =
connectorQueryCtx_->adjustTimestampToTimezone();
options->processConfigs(*hiveConfig_->config(), *connectorSessionProperties);
return options;
}

uint32_t HiveDataSink::appendWriter(const HiveWriterId& id) {
// Check max open writers.
VELOX_USER_CHECK_LE(
writers_.size(), maxOpenWriters_, "Exceeded open writer limit");
VELOX_CHECK_EQ(writers_.size(), writerInfo_.size());
VELOX_CHECK_EQ(writerIndexMap_.size(), writerInfo_.size());

std::optional<std::string> partitionName;
if (isPartitioned()) {
partitionName = getPartitionName(id.partitionId.value());
}

// Without explicitly setting flush policy, the default memory based flush
// policy is used.
auto writerParameters = getWriterParameters(partitionName, id.bucketId);
const auto writePath = fs::path(writerParameters.writeDirectory()) /
writerParameters.writeFileName();
auto writerPool = createWriterPool(id);
auto sinkPool = createSinkPool(writerPool);
std::shared_ptr<memory::MemoryPool> sortPool{nullptr};
if (sortWrite()) {
sortPool = createSortPool(writerPool);
}
writerInfo_.emplace_back(
std::make_shared<HiveWriterInfo>(
std::move(writerParameters),
std::move(writerPool),
std::move(sinkPool),
std::move(sortPool)));
ioStats_.emplace_back(std::make_unique<io::IoStatistics>());

setMemoryReclaimers(writerInfo_.back().get(), ioStats_.back().get());

auto options = createWriterOptions();

// Prevents the memory allocation during the writer creation.
WRITER_NON_RECLAIMABLE_SECTION_GUARD(writerInfo_.size() - 1);
Expand Down Expand Up @@ -938,6 +997,24 @@ HiveWriterId HiveDataSink::getWriterId(size_t row) const {
return HiveWriterId{partitionId, bucketId};
}

// Records that input row 'row' belongs to the writer/partition at 'index'.
//
// @param index   Writer slot; must be < partitionSizes_.size().
// @param numRows Total rows in the current input batch; used to size the
//                per-partition row-index buffer so it is allocated at most
//                once per batch.
// @param row     The row number within the current input to record.
void HiveDataSink::updatePartitionRows(
    uint32_t index,
    vector_size_t numRows,
    vector_size_t row) {
  VELOX_DCHECK_LT(index, partitionSizes_.size());
  VELOX_DCHECK_EQ(partitionSizes_.size(), partitionRows_.size());
  VELOX_DCHECK_EQ(partitionRows_.size(), rawPartitionRows_.size());
  // Lazily allocate (or grow) the index buffer, sized for the whole batch.
  // NOTE(review): allocateIndices replaces the buffer without copying rows
  // already recorded; this presumes partitionSizes_[index] is 0 whenever the
  // capacity check triggers (i.e. sizes are reset per batch) — confirm in
  // the caller before relying on it.
  if (FOLLY_UNLIKELY(partitionRows_[index] == nullptr) ||
      (partitionRows_[index]->capacity() < numRows * sizeof(vector_size_t))) {
    partitionRows_[index] =
        allocateIndices(numRows, connectorQueryCtx_->memoryPool());
    rawPartitionRows_[index] =
        partitionRows_[index]->asMutable<vector_size_t>();
  }
  // Append 'row' at the current fill position for this partition.
  rawPartitionRows_[index][partitionSizes_[index]] = row;
  ++partitionSizes_[index];
}

void HiveDataSink::splitInputRowsAndEnsureWriters() {
VELOX_CHECK(isPartitioned() || isBucketed());
if (isBucketed() && isPartitioned()) {
Expand All @@ -951,19 +1028,7 @@ void HiveDataSink::splitInputRowsAndEnsureWriters() {
for (auto row = 0; row < numRows; ++row) {
const auto id = getWriterId(row);
const uint32_t index = ensureWriter(id);

VELOX_DCHECK_LT(index, partitionSizes_.size());
VELOX_DCHECK_EQ(partitionSizes_.size(), partitionRows_.size());
VELOX_DCHECK_EQ(partitionRows_.size(), rawPartitionRows_.size());
if (FOLLY_UNLIKELY(partitionRows_[index] == nullptr) ||
(partitionRows_[index]->capacity() < numRows * sizeof(vector_size_t))) {
partitionRows_[index] =
allocateIndices(numRows, connectorQueryCtx_->memoryPool());
rawPartitionRows_[index] =
partitionRows_[index]->asMutable<vector_size_t>();
}
rawPartitionRows_[index][partitionSizes_[index]] = row;
++partitionSizes_[index];
updatePartitionRows(index, numRows, row);
}

for (uint32_t i = 0; i < partitionSizes_.size(); ++i) {
Expand Down
Loading
Loading