diff --git a/velox/connectors/CMakeLists.txt b/velox/connectors/CMakeLists.txt index 7caf455553f..84edc8252fb 100644 --- a/velox/connectors/CMakeLists.txt +++ b/velox/connectors/CMakeLists.txt @@ -11,7 +11,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -add_library(velox_connector Connector.cpp) +add_library(velox_connector Connector.cpp WriteProtocol.cpp) target_link_libraries(velox_connector velox_config velox_vector) diff --git a/velox/connectors/Connector.h b/velox/connectors/Connector.h index 412de006750..b5821e61d82 100644 --- a/velox/connectors/Connector.h +++ b/velox/connectors/Connector.h @@ -28,7 +28,7 @@ class Filter; } namespace facebook::velox::core { class ITypedExpr; -} +} // namespace facebook::velox::core namespace facebook::velox::exec { class ExprSet; } @@ -171,21 +171,25 @@ class ConnectorQueryCtx { public: ConnectorQueryCtx( memory::MemoryPool* pool, - Config* config, + const Config* FOLLY_NONNULL connectorConfig, ExpressionEvaluator* expressionEvaluator, - memory::MappedMemory* mappedMemory, - const std::string& scanId) + memory::MappedMemory* FOLLY_NONNULL mappedMemory, + const std::string& taskId, + const std::string& planNodeId, + int driverId) : pool_(pool), - config_(config), + config_(connectorConfig), expressionEvaluator_(expressionEvaluator), mappedMemory_(mappedMemory), - scanId_(scanId) {} + scanId_(fmt::format("{}.{}", taskId, planNodeId)), + taskId_(taskId), + driverId_(driverId) {} memory::MemoryPool* memoryPool() const { return pool_; } - Config* config() const { + const Config* config() const { return config_; } @@ -199,21 +203,30 @@ class ConnectorQueryCtx { return mappedMemory_; } - // Returns an id that allows sharing state between different threads - // of the same scan. This is typically a query id plus the scan's - // PlanNodeId. This is used for locating a scanTracker, which tracks - // the read density of columns for prefetch and other memory - // hierarchy purposes. + // This is a combination of task id and the scan's PlanNodeId. This is an id + // that allows sharing state between different threads of the same scan. This + // is used for locating a scanTracker, which tracks the read density of + // columns for prefetch and other memory hierarchy purposes. const std::string& scanId() const { return scanId_; } + const std::string& taskId() const { + return taskId_; + } + + int driverId() const { + return driverId_; + } + private: memory::MemoryPool* pool_; - Config* config_; + const Config* config_; ExpressionEvaluator* expressionEvaluator_; memory::MappedMemory* mappedMemory_; - std::string scanId_; + const std::string scanId_; + const std::string taskId_; + const int driverId_; }; class Connector { diff --git a/velox/connectors/WriteProtocol.cpp b/velox/connectors/WriteProtocol.cpp new file mode 100644 index 00000000000..0b37cb45198 --- /dev/null +++ b/velox/connectors/WriteProtocol.cpp @@ -0,0 +1,74 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "velox/connectors/WriteProtocol.h" +#include "velox/connectors/Connector.h" +#include "velox/vector/ConstantVector.h" + +#include + +namespace facebook::velox::connector { + +namespace { + +using RegisteredWriteProtocols = std::unordered_map< + WriteProtocol::CommitStrategy, + std::shared_ptr>; + +RegisteredWriteProtocols& registeredWriteProtocols() { + // Default write protocol registered + static RegisteredWriteProtocols protocols{ + {WriteProtocol::CommitStrategy::kNoCommit, + std::make_shared()}}; + return protocols; +} + +} // namespace + +// static +bool WriteProtocol::registerWriteProtocol( + WriteProtocol::CommitStrategy commitStrategy, + std::shared_ptr writeProtocol) { + return registeredWriteProtocols() + .insert_or_assign(commitStrategy, writeProtocol) + .second; +} + +// static +std::shared_ptr WriteProtocol::getWriteProtocol( + CommitStrategy commitStrategy) { + const auto iter = registeredWriteProtocols().find(commitStrategy); + // Fail if no WriteProtocol has been registered for the given CommitStrategy. + VELOX_CHECK( + iter != registeredWriteProtocols().end(), + "No write protocol found for commit strategy {}", + commitStrategyToString(commitStrategy)); + return iter->second; +} + +RowVectorPtr DefaultWriteProtocol::commit( + const WriteInfo& writeInfo, + velox::memory::MemoryPool* FOLLY_NONNULL pool) { + return std::make_shared( + pool, + ROW({"rowCount"}, {BIGINT()}), + BufferPtr(nullptr), + 1, + std::vector{std::make_shared>( + pool, 1, false, BIGINT(), writeInfo.numWrittenRows())}); +} + +} // namespace facebook::velox::connector diff --git a/velox/connectors/WriteProtocol.h b/velox/connectors/WriteProtocol.h new file mode 100644 index 00000000000..670609907e1 --- /dev/null +++ b/velox/connectors/WriteProtocol.h @@ -0,0 +1,123 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include "velox/vector/ComplexVector.h" + +namespace facebook::velox::connector { + +class ConnectorInsertTableHandle; +class ConnectorQueryCtx; + +/// Interface to provide key parameters for writers. Ex., write and commit +/// locations, append or overwrite, etc. +class WriterParameters { + public: + virtual ~WriterParameters() = default; +}; + +/// Interface that includes all info from write. It will be passed to commit() +/// of WriteProtocol. +class WriteInfo { + public: + virtual ~WriteInfo() = default; + + virtual vector_size_t numWrittenRows() const = 0; +}; + +/// Abstraction for write behaviors. Systems register WriteProtocols +/// by CommitStrategy. Writers call getWriteProtocol() to get the registered +/// instance of the WriteProtocol when needed. +class WriteProtocol { + public: + /// Represents the commit strategy of a write protocol. + enum class CommitStrategy { + kNoCommit, // No more commit actions are needed. + kTaskCommit // Task level commit is needed. + }; + + virtual ~WriteProtocol() {} + + /// Return the commit strategy of the write protocol. It will be the commit + /// strategy that the write protocol registers for. + virtual CommitStrategy commitStrategy() const = 0; + + /// Return a string encoding of the given commit strategy. + static std::string commitStrategyToString(CommitStrategy commitStrategy) { + switch (commitStrategy) { + case CommitStrategy::kNoCommit: + return "NO_COMMIT"; + case CommitStrategy::kTaskCommit: + return "TASK_COMMIT"; + default: + VELOX_UNREACHABLE(); + } + } + + /// Perform actions of commit. It would be called by the writers and could + /// return outputs that would be included in writer outputs. Return nullptr if + /// the commit action does not need to add output to the table writer output. + virtual RowVectorPtr commit( + const WriteInfo& writeInfo, + velox::memory::MemoryPool* FOLLY_NONNULL pool) { + return nullptr; + } + + /// Return parameters for writers. Ex., write and commit locations. Return + /// nullptr if the writer does not need parameters from the write protocol. + virtual std::shared_ptr getWriterParameters( + const std::shared_ptr& + tableHandle, + const velox::connector::ConnectorQueryCtx* FOLLY_NONNULL + connectorQueryCtx) const = 0; + + /// Register a WriteProtocol implementation for the given CommitStrategy. If + /// the CommitStrategy has already been registered, it will replace the old + /// WriteProtocol implementation with the new one and return false; otherwise + /// return true. + static bool registerWriteProtocol( + CommitStrategy commitStrategy, + std::shared_ptr writeProtocol); + + /// Return the instance of the WriteProtocol registered for + /// the given CommitStrategy. + static std::shared_ptr getWriteProtocol( + CommitStrategy commitStrategy); +}; + +class DefaultWriteProtocol : public WriteProtocol { + public: + ~DefaultWriteProtocol() override {} + + CommitStrategy commitStrategy() const override { + return CommitStrategy::kNoCommit; + } + + RowVectorPtr commit( + const WriteInfo& writeInfo, + velox::memory::MemoryPool* FOLLY_NONNULL pool) override; + + std::shared_ptr getWriterParameters( + const std::shared_ptr& + tableHandle, + const velox::connector::ConnectorQueryCtx* FOLLY_NONNULL + connectorQueryCtx) const override { + return std::make_shared(); + } +}; + +} // namespace facebook::velox::connector diff --git a/velox/connectors/hive/CMakeLists.txt b/velox/connectors/hive/CMakeLists.txt index 3b91864bc38..ac93927a94b 100644 --- a/velox/connectors/hive/CMakeLists.txt +++ b/velox/connectors/hive/CMakeLists.txt @@ -12,7 +12,8 @@ # See the License for the specific language governing permissions and # limitations under the License. -add_library(velox_hive_connector OBJECT HiveConnector.cpp FileHandle.cpp) +add_library(velox_hive_connector OBJECT HiveConnector.cpp FileHandle.cpp + HiveWriteProtocol.cpp) target_link_libraries(velox_hive_connector velox_connector velox_dwio_dwrf_reader velox_dwio_dwrf_writer velox_file) diff --git a/velox/connectors/hive/HiveConnector.cpp b/velox/connectors/hive/HiveConnector.cpp index 44eefaf8a41..c73a49486bf 100644 --- a/velox/connectors/hive/HiveConnector.cpp +++ b/velox/connectors/hive/HiveConnector.cpp @@ -16,6 +16,7 @@ #include "velox/connectors/hive/HiveConnector.h" +#include "velox/common/base/Fs.h" #include "velox/dwio/common/InputStream.h" #include "velox/dwio/common/ReaderFactory.h" #include "velox/expression/FieldReference.h" @@ -23,6 +24,10 @@ #include "velox/type/Type.h" #include "velox/type/Variant.h" +#include +#include +#include + #include using namespace facebook::velox::dwrf; @@ -80,29 +85,49 @@ std::string HiveTableHandle::toString() const { } HiveDataSink::HiveDataSink( - std::shared_ptr inputType, - const std::string& filePath, - velox::memory::MemoryPool* memoryPool) - : inputType_(inputType) { + RowTypePtr inputType, + std::shared_ptr insertTableHandle, + const ConnectorQueryCtx* FOLLY_NONNULL connectorQueryCtx) + : inputType_(std::move(inputType)), + insertTableHandle_(std::move(insertTableHandle)), + connectorQueryCtx_(connectorQueryCtx) { + if (!insertTableHandle_->isPartitioned()) { + writers_.emplace_back(createWriter()); + } +} + +void HiveDataSink::appendData(VectorPtr input) { + // For the time being the hive data sink supports one file. + // To extend it we can create a new writer for every + // partition. + if (writers_.empty()) { + writers_.emplace_back(createWriter()); + } + writers_[0]->write(input); +} + +void HiveDataSink::close() { + for (const auto& writer : writers_) { + writer->close(); + } +} + +std::unique_ptr HiveDataSink::createWriter() { auto config = std::make_shared(); // TODO: Wire up serde properties to writer configs. facebook::velox::dwrf::WriterOptions options; options.config = config; - options.schema = inputType; + options.schema = inputType_; // Without explicitly setting flush policy, the default memory based flush // policy is used. - auto sink = facebook::velox::dwio::common::DataSink::create(filePath); - writer_ = std::make_unique(options, std::move(sink), *memoryPool); -} - -void HiveDataSink::appendData(VectorPtr input) { - writer_->write(input); -} - -void HiveDataSink::close() { - writer_->close(); + auto fileName = + boost::lexical_cast(boost::uuids::random_generator()()); + auto sink = dwio::common::DataSink::create( + fs::path(insertTableHandle_->locationHandle()->writePath()) / fileName); + return std::make_unique( + options, std::move(sink), *connectorQueryCtx_->memoryPool()); } namespace { @@ -163,7 +188,7 @@ static void makeFieldSpecs( std::shared_ptr makeScanSpec( const SubfieldFilters& filters, - const std::shared_ptr& rowType) { + const RowTypePtr& rowType) { auto spec = std::make_shared("root"); makeFieldSpecs("", 0, rowType, spec.get()); @@ -186,7 +211,7 @@ std::shared_ptr makeScanSpec( } // namespace HiveDataSource::HiveDataSource( - const std::shared_ptr& outputType, + const RowTypePtr& outputType, const std::shared_ptr& tableHandle, const std::unordered_map< std::string, @@ -490,7 +515,7 @@ void HiveDataSource::addSplit(std::shared_ptr split) { std::shared_ptr cs; if (columnNames.empty()) { - static const std::shared_ptr kEmpty{ROW({}, {})}; + static const RowTypePtr kEmpty{ROW({}, {})}; cs = std::make_shared(kEmpty); } else { cs = std::make_shared(fileType, columnNames); diff --git a/velox/connectors/hive/HiveConnector.h b/velox/connectors/hive/HiveConnector.h index 8ed1ab3a3a4..36df6b7b8eb 100644 --- a/velox/connectors/hive/HiveConnector.h +++ b/velox/connectors/hive/HiveConnector.h @@ -51,6 +51,10 @@ class HiveColumnHandle : public ColumnHandle { return dataType_; } + bool isPartitionKey() const { + return columnType_ == ColumnType::kPartitionKey; + } + private: const std::string name_; const ColumnType columnType_; @@ -92,38 +96,122 @@ class HiveTableHandle : public ConnectorTableHandle { const core::TypedExprPtr remainingFilter_; }; +/// Location related properties of the Hive table to be written +class LocationHandle { + public: + enum class TableType { + kNew, // Write to a new table to be created. + kExisting, // Write to an existing table. + kTemporary, // Write to a temporary table. + }; + + enum class WriteMode { + // Write to a staging directory and then move to the target directory + // after write finishes. + kStageAndMoveToTargetDirectory, + // Directly write to the target directory to be created. + kDirectToTargetNewDirectory, + // Directly write to the existing target directory. + kDirectToTargetExistingDirectory, + }; + + LocationHandle( + std::string targetPath, + std::string writePath, + TableType tableType, + WriteMode writeMode) + : targetPath_(std::move(targetPath)), + writePath_(std::move(writePath)), + tableType_(tableType), + writeMode_(writeMode) {} + + const std::string& targetPath() const { + return targetPath_; + } + + const std::string& writePath() const { + return writePath_; + } + + TableType tableType() const { + return tableType_; + } + + WriteMode writeMode() const { + return writeMode_; + } + + private: + // Target directory path. + const std::string targetPath_; + // Staging directory path. + const std::string writePath_; + // Whether the table to be written is new, already existing or temporary. + const TableType tableType_; + // How the target path and directory path could be used. + const WriteMode writeMode_; +}; + /** * Represents a request for Hive write */ class HiveInsertTableHandle : public ConnectorInsertTableHandle { public: - explicit HiveInsertTableHandle(const std::string& filePath) - : filePath_(filePath) {} + HiveInsertTableHandle( + std::vector> inputColumns, + std::shared_ptr locationHandle) + : inputColumns_(std::move(inputColumns)), + locationHandle_(std::move(locationHandle)) {} - const std::string& filePath() const { - return filePath_; + virtual ~HiveInsertTableHandle() = default; + + const std::vector>& inputColumns() + const { + return inputColumns_; + } + + const std::shared_ptr& locationHandle() const { + return locationHandle_; + } + + bool isPartitioned() const { + return std::any_of( + inputColumns_.begin(), inputColumns_.end(), [](auto column) { + return column->isPartitionKey(); + }); + } + + bool isCreateTable() const { + return locationHandle_->tableType() == LocationHandle::TableType::kNew; } - virtual ~HiveInsertTableHandle() {} + bool isInsertTable() const { + return locationHandle_->tableType() == LocationHandle::TableType::kExisting; + } private: - const std::string filePath_; + const std::vector> inputColumns_; + const std::shared_ptr locationHandle_; }; class HiveDataSink : public DataSink { public: explicit HiveDataSink( - std::shared_ptr inputType, - const std::string& filePath, - velox::memory::MemoryPool* FOLLY_NONNULL memoryPool); + RowTypePtr inputType, + std::shared_ptr insertTableHandle, + const ConnectorQueryCtx* FOLLY_NONNULL connectorQueryCtx); void appendData(VectorPtr input) override; void close() override; private: - const std::shared_ptr inputType_; - std::unique_ptr writer_; + std::unique_ptr createWriter(); + + const RowTypePtr inputType_; + const std::shared_ptr insertTableHandle_; + const ConnectorQueryCtx* connectorQueryCtx_; + std::vector> writers_; }; class HiveConnector; @@ -131,7 +219,7 @@ class HiveConnector; class HiveDataSource : public DataSource { public: HiveDataSource( - const std::shared_ptr& outputType, + const RowTypePtr& outputType, const std::shared_ptr& tableHandle, const std::unordered_map< std::string, @@ -187,7 +275,7 @@ class HiveDataSource : public DataSource { /// Clear split_, reader_ and rowReader_ after split has been fully processed. void resetSplit(); - const std::shared_ptr outputType_; + const RowTypePtr outputType_; // Column handles for the partition key columns keyed on partition key column // name. std::unordered_map> @@ -203,7 +291,7 @@ class HiveDataSource : public DataSource { std::unique_ptr reader_; std::unique_ptr rowReader_; std::unique_ptr remainingFilterExprSet_; - std::shared_ptr readerOutputType_; + RowTypePtr readerOutputType_; bool emptySplit_; dwio::common::RuntimeStatistics runtimeStats_; @@ -223,6 +311,19 @@ class HiveDataSource : public DataSource { folly::Executor* FOLLY_NULLABLE executor_; }; +/// Hive connector configs +class HiveConfig { + public: + /// Can new data be inserted into existing partitions or existing + /// unpartitioned tables + static constexpr const char* kImmutablePartitions = + "hive.immutable-partitions"; + + static bool isImmutablePartitions(const Config* baseConfig) { + return baseConfig->get(kImmutablePartitions, true); + } +}; + class HiveConnector final : public Connector { public: explicit HiveConnector( @@ -235,7 +336,7 @@ class HiveConnector final : public Connector { } std::shared_ptr createDataSource( - const std::shared_ptr& outputType, + const RowTypePtr& outputType, const std::shared_ptr& tableHandle, const std::unordered_map< std::string, @@ -254,18 +355,15 @@ class HiveConnector final : public Connector { } std::shared_ptr createDataSink( - std::shared_ptr inputType, + RowTypePtr inputType, std::shared_ptr connectorInsertTableHandle, ConnectorQueryCtx* FOLLY_NONNULL connectorQueryCtx) override final { auto hiveInsertHandle = std::dynamic_pointer_cast( connectorInsertTableHandle); - VELOX_CHECK( - hiveInsertHandle != nullptr, - "Hive connector expecting hive write handle!"); + VELOX_CHECK_NOT_NULL( + hiveInsertHandle, "Hive connector expecting hive write handle!"); return std::make_shared( - inputType, - hiveInsertHandle->filePath(), - connectorQueryCtx->memoryPool()); + inputType, hiveInsertHandle, connectorQueryCtx); } folly::Executor* FOLLY_NULLABLE executor() { diff --git a/velox/connectors/hive/HiveWriteProtocol.cpp b/velox/connectors/hive/HiveWriteProtocol.cpp new file mode 100644 index 00000000000..b5e22d25065 --- /dev/null +++ b/velox/connectors/hive/HiveWriteProtocol.cpp @@ -0,0 +1,102 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "velox/connectors/hive/HiveWriteProtocol.h" +#include "velox/connectors/hive/HiveConnector.h" + +#include +#include +#include + +namespace facebook::velox::connector::hive { + +namespace { + +std::string makeUuid() { + return boost::lexical_cast(boost::uuids::random_generator()()); +} + +} // namespace + +std::shared_ptr +HiveNoCommitWriteProtocol::getWriterParameters( + const std::shared_ptr& tableWriteHandle, + const ConnectorQueryCtx* FOLLY_NONNULL connectorQueryCtx) const { + auto hiveTableWriteHandle = + std::dynamic_pointer_cast(tableWriteHandle); + VELOX_CHECK_NOT_NULL( + hiveTableWriteHandle, + "This write protocol cannot be used for non-Hive connector"); + VELOX_USER_CHECK( + !hiveTableWriteHandle->isPartitioned(), + "Getting write parameters for partitioned Hive tables is not implemented yet."); + VELOX_USER_CHECK( + hiveTableWriteHandle->isCreateTable() || + !HiveConfig::isImmutablePartitions(connectorQueryCtx->config()), + "Unpartitioned Hive tables are immutable"); + + auto targetFileName = fmt::format( + "{}_{}_{}", + connectorQueryCtx->taskId(), + connectorQueryCtx->driverId(), + makeUuid()); + + return std::make_shared( + hiveTableWriteHandle->isCreateTable() + ? HiveWriterParameters::UpdateMode::kNew + : HiveWriterParameters::UpdateMode::kAppend, + targetFileName, + hiveTableWriteHandle->locationHandle()->targetPath(), + targetFileName, + hiveTableWriteHandle->locationHandle()->writePath()); +} + +std::shared_ptr +HiveTaskCommitWriteProtocol::getWriterParameters( + const std::shared_ptr& tableWriteHandle, + const ConnectorQueryCtx* FOLLY_NONNULL connectorQueryCtx) const { + auto hiveTableWriteHandle = + std::dynamic_pointer_cast(tableWriteHandle); + VELOX_CHECK_NOT_NULL( + hiveTableWriteHandle, + "This write protocol cannot be used for non-Hive connector"); + VELOX_USER_CHECK( + !hiveTableWriteHandle->isPartitioned(), + "Getting write parameters for partitioned Hive tables is not implemented yet."); + VELOX_USER_CHECK( + hiveTableWriteHandle->isCreateTable() || + !HiveConfig::isImmutablePartitions(connectorQueryCtx->config()), + "Unpartitioned Hive tables are immutable"); + + auto targetFileName = fmt::format( + "{}_{}_{}", + connectorQueryCtx->taskId(), + connectorQueryCtx->driverId(), + 0); + auto writeFileName = + fmt::format(".tmp.velox.{}_{}", targetFileName, makeUuid()); + + return std::make_shared( + hiveTableWriteHandle->isCreateTable() + ? HiveWriterParameters::UpdateMode::kNew + : HiveWriterParameters::UpdateMode::kAppend, + targetFileName, + hiveTableWriteHandle->locationHandle()->targetPath(), + writeFileName, + hiveTableWriteHandle->locationHandle()->writePath()); +} + +} // namespace facebook::velox::connector::hive diff --git a/velox/connectors/hive/HiveWriteProtocol.h b/velox/connectors/hive/HiveWriteProtocol.h new file mode 100644 index 00000000000..cb328d4abb8 --- /dev/null +++ b/velox/connectors/hive/HiveWriteProtocol.h @@ -0,0 +1,145 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include "velox/connectors/WriteProtocol.h" + +namespace facebook::velox::connector::hive { + +/// Parameters for Hive writers. +class HiveWriterParameters : public WriterParameters { + public: + enum class UpdateMode { + kNew, // Write files to a new directory. + kAppend, // Append files to an existing directory. + kOverwrite, // Overwrite an existing directory. + }; + + /// @param updateMode Write the files to a new directory, or append to an + /// existing directory or overwrite an existing directory. + /// @param targetFileName The final name of a file after committing. + /// @param targetDirectory The final directory that a file should be in after + /// committing. + /// @param writeFileName The temporary name of the file that a running writer + /// writes to. If a running writer writes directory to the target file, set + /// writeFileName to targetFileName by default. + /// @param writeDirectory The temporary directory that a running writer writes + /// to. If a running writer writes directory to the target directory, set + /// writeDirectory to targetDirectory by default. + HiveWriterParameters( + UpdateMode updateMode, + std::string targetFileName, + std::string targetDirectory, + std::optional writeFileName = std::nullopt, + std::optional writeDirectory = std::nullopt) + : WriterParameters(), + updateMode_(updateMode), + targetFileName_(std::move(targetFileName)), + targetDirectory_(std::move(targetDirectory)), + writeFileName_(writeFileName.value_or(targetFileName_)), + writeDirectory_(writeDirectory.value_or(targetDirectory_)) {} + + UpdateMode updateMode() const { + return updateMode_; + } + + static std::string updateModeToString(UpdateMode updateMode) { + switch (updateMode) { + case UpdateMode::kNew: + return "NEW"; + case UpdateMode::kAppend: + return "APPEND"; + case UpdateMode::kOverwrite: + return "OVERWRITE"; + default: + VELOX_UNSUPPORTED("Unsupported update mode."); + } + } + + const std::string& targetFileName() const { + return targetFileName_; + } + + const std::string& writeFileName() const { + return writeFileName_; + } + + const std::string& targetDirectory() const { + return targetDirectory_; + } + + const std::string& writeDirectory() const { + return writeDirectory_; + } + + private: + const UpdateMode updateMode_; + const std::string targetFileName_; + const std::string targetDirectory_; + const std::string writeFileName_; + const std::string writeDirectory_; +}; + +/// WriteProtocol base implementation for Hive writes. WriterParameters have the +/// write file name the same as the target file name, so no commit is needed for +/// file move. +class HiveNoCommitWriteProtocol : public DefaultWriteProtocol { + public: + ~HiveNoCommitWriteProtocol() override {} + + CommitStrategy commitStrategy() const override { + return CommitStrategy::kNoCommit; + } + + std::shared_ptr getWriterParameters( + const std::shared_ptr& + tableHandle, + const velox::connector::ConnectorQueryCtx* FOLLY_NONNULL + connectorQueryCtx) const override; + + static bool registerProtocol() { + return registerWriteProtocol( + CommitStrategy::kNoCommit, + std::make_shared()); + } +}; + +/// WriteProtocol implementation for Hive writes. WriterParameters have the +/// write file name different from the target file name. So commit is needed to +/// move write file to the target location. +class HiveTaskCommitWriteProtocol : public DefaultWriteProtocol { + public: + ~HiveTaskCommitWriteProtocol() override {} + + CommitStrategy commitStrategy() const override { + return CommitStrategy::kTaskCommit; + } + + std::shared_ptr getWriterParameters( + const std::shared_ptr& + tableHandle, + const velox::connector::ConnectorQueryCtx* FOLLY_NONNULL + connectorQueryCtx) const override; + + static bool registerProtocol() { + return registerWriteProtocol( + CommitStrategy::kTaskCommit, + std::make_shared()); + } +}; + +} // namespace facebook::velox::connector::hive diff --git a/velox/connectors/hive/tests/CMakeLists.txt b/velox/connectors/hive/tests/CMakeLists.txt index ddf46ba568d..eb45a305998 100644 --- a/velox/connectors/hive/tests/CMakeLists.txt +++ b/velox/connectors/hive/tests/CMakeLists.txt @@ -11,8 +11,9 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -add_executable(velox_hive_connector_test HivePartitionFunctionTest.cpp - FileHandleTest.cpp) +add_executable( + velox_hive_connector_test HivePartitionFunctionTest.cpp FileHandleTest.cpp + HiveWriteProtocolTest.cpp) add_test(velox_hive_connector_test velox_hive_connector_test) target_link_libraries( diff --git a/velox/connectors/hive/tests/HiveWriteProtocolTest.cpp b/velox/connectors/hive/tests/HiveWriteProtocolTest.cpp new file mode 100644 index 00000000000..d7014abfb3e --- /dev/null +++ b/velox/connectors/hive/tests/HiveWriteProtocolTest.cpp @@ -0,0 +1,71 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "velox/connectors/hive/HiveWriteProtocol.h" +#include "velox/connectors/hive/HiveConnector.h" +#include "velox/core/QueryCtx.h" + +#include "gtest/gtest.h" + +using namespace facebook::velox; +using namespace facebook::velox::connector; +using namespace facebook::velox::connector::hive; +using namespace facebook::velox::core; + +TEST(HiveWriteProtocolTest, writerParameters) { + HiveNoCommitWriteProtocol::registerProtocol(); + HiveTaskCommitWriteProtocol::registerProtocol(); + + auto queryCtx = QueryCtx::createForTest(); + ConnectorQueryCtx connectorQueryCtx( + queryCtx->pool(), + queryCtx->getConnectorConfig(HiveConnectorFactory::kHiveConnectorName), + nullptr, + queryCtx->mappedMemory(), + "test_task_id", + "test_plan_node_id", + 0); + std::vector> inputColumns{ + std::make_shared( + "col", HiveColumnHandle::ColumnType::kRegular, BIGINT())}; + auto insertTableHandle = std::make_shared( + inputColumns, + std::make_shared( + "test_table_directory", + "test_table_directory", + LocationHandle::TableType::kNew, + LocationHandle::WriteMode::kDirectToTargetNewDirectory)); + + auto noCommitWriteProtocol = + WriteProtocol::getWriteProtocol(WriteProtocol::CommitStrategy::kNoCommit); + auto noCommitWriterParameters = + std::dynamic_pointer_cast( + noCommitWriteProtocol->getWriterParameters( + insertTableHandle, &connectorQueryCtx)); + ASSERT_EQ( + noCommitWriterParameters->writeFileName(), + noCommitWriterParameters->targetFileName()); + + auto taskCommitWriteProtocol = WriteProtocol::getWriteProtocol( + WriteProtocol::CommitStrategy::kTaskCommit); + auto taskCommitWriterParameters = + std::dynamic_pointer_cast( + taskCommitWriteProtocol->getWriterParameters( + insertTableHandle, &connectorQueryCtx)); + ASSERT_NE( + taskCommitWriterParameters->writeFileName(), + taskCommitWriterParameters->targetFileName()); +} diff --git a/velox/examples/ScanAndSort.cpp b/velox/examples/ScanAndSort.cpp index 0841fe4afe4..3988bc96db4 100644 --- a/velox/examples/ScanAndSort.cpp +++ b/velox/examples/ScanAndSort.cpp @@ -14,12 +14,14 @@ * limitations under the License. */ +#include "velox/common/base/Fs.h" #include "velox/common/file/FileSystems.h" #include "velox/common/memory/Memory.h" #include "velox/connectors/hive/HiveConnector.h" #include "velox/connectors/hive/HiveConnectorSplit.h" #include "velox/dwio/dwrf/reader/DwrfReader.h" #include "velox/exec/Task.h" +#include "velox/exec/tests/utils/HiveConnectorTestBase.h" #include "velox/exec/tests/utils/PlanBuilder.h" #include "velox/exec/tests/utils/TempDirectoryPath.h" #include "velox/type/Type.h" @@ -29,6 +31,7 @@ #include using namespace facebook::velox; +using exec::test::HiveConnectorTestBase; // This file contains a step-by-step minimal example of a workflow that: // @@ -73,13 +76,6 @@ int main(int argc, char** argv) { LOG(INFO) << rowVector->toString(i); } - // Create a temporary dir to store the local file created. Note that this - // directory is automatically removed when the `tempDir` object runs out of - // scope. - auto tempDir = exec::test::TempDirectoryPath::create(); - const std::string filePath = tempDir->path + "/file1.dwrf"; - LOG(INFO) << "Writing dwrf file to '" << filePath << "'."; - // In order to read and write data and files from storage, we need to use a // Connector. Let's instantiate and register a HiveConnector for this // example: @@ -100,6 +96,11 @@ int main(int argc, char** argv) { filesystems::registerLocalFileSystem(); dwrf::registerDwrfReaderFactory(); + // Create a temporary dir to store the local file created. Note that this + // directory is automatically removed when the `tempDir` object runs out of + // scope. + auto tempDir = exec::test::TempDirectoryPath::create(); + // Once we finalize setting up the Hive connector, let's define our query // plan. We use the helper `PlanBuilder` class to generate the query plan // for this example, but this is usually done programatically based on the @@ -117,8 +118,12 @@ int main(int argc, char** argv) { inputRowType->names(), std::make_shared( kHiveConnectorId, - std::make_shared( - filePath))) + HiveConnectorTestBase::makeHiveInsertTableHandle( + inputRowType->names(), + inputRowType->children(), + {}, + HiveConnectorTestBase::makeLocationHandle( + tempDir->path)))) .planFragment(); // Task is the top-level execution concept. A task needs a taskId (as a @@ -166,14 +171,17 @@ int main(int argc, char** argv) { // HiveConnectorSplit for each file, using the same HiveConnector id defined // above, the local file path (the "file:" prefix specifies which FileSystem // to use; local, in this case), and the file format (DWRF/ORC). - auto connectorSplit = std::make_shared( - kHiveConnectorId, "file:" + filePath, dwio::common::FileFormat::DWRF); - - // Wrap it in a `Split` object and add to the task. We need to specify to - // which operator we're adding the split (that's why we captured the - // TableScan's id above). Here we could pump subsequent split/files into the - // TableScan. - readTask->addSplit(scanNodeId, exec::Split{connectorSplit}); + for (auto& filePath : fs::directory_iterator(tempDir->path)) { + auto connectorSplit = std::make_shared( + kHiveConnectorId, + "file:" + filePath.path().string(), + dwio::common::FileFormat::DWRF); + // Wrap it in a `Split` object and add to the task. We need to specify to + // which operator we're adding the split (that's why we captured the + // TableScan's id above). Here we could pump subsequent split/files into the + // TableScan. + readTask->addSplit(scanNodeId, exec::Split{connectorSplit}); + } // Signal that no more splits will be added. After this point, calling next() // on the task will start the plan execution using the current thread. diff --git a/velox/exec/Operator.cpp b/velox/exec/Operator.cpp index ed9eb00649d..c93739cddb4 100644 --- a/velox/exec/Operator.cpp +++ b/velox/exec/Operator.cpp @@ -83,7 +83,9 @@ OperatorCtx::createConnectorQueryCtx( driverCtx_->task->queryCtx()->getConnectorConfig(connectorId), expressionEvaluator_.get(), driverCtx_->task->queryCtx()->mappedMemory(), - fmt::format("{}.{}", driverCtx_->task->taskId(), planNodeId)); + taskId(), + planNodeId, + driverCtx_->driverId); } std::optional OperatorCtx::makeSpillConfig( diff --git a/velox/exec/tests/TableWriteTest.cpp b/velox/exec/tests/TableWriteTest.cpp index cb442c84ea4..6e03e200e0a 100644 --- a/velox/exec/tests/TableWriteTest.cpp +++ b/velox/exec/tests/TableWriteTest.cpp @@ -17,10 +17,12 @@ #include "velox/dwio/common/DataSink.h" #include "velox/exec/tests/utils/HiveConnectorTestBase.h" #include "velox/exec/tests/utils/PlanBuilder.h" +#include "velox/exec/tests/utils/TempDirectoryPath.h" using namespace facebook::velox; using namespace facebook::velox::exec; using namespace facebook::velox::exec::test; +using namespace facebook::velox::connector; using namespace facebook::velox::connector::hive; class TableWriteTest : public HiveConnectorTestBase { @@ -33,6 +35,16 @@ class TableWriteTest : public HiveConnectorTestBase { return BaseVector::createConstant(value, size, pool_.get()); } + std::vector> + makeHiveConnectorSplits( + const std::shared_ptr& directoryPath) { + std::vector> splits; + for (auto& filePath : fs::directory_iterator(directoryPath->path)) { + splits.push_back(makeHiveConnectorSplit(filePath.path().string())); + } + return splits; + } + RowTypePtr rowType_{ ROW({"c0", "c1", "c2", "c3", "c4", "c5"}, {BIGINT(), INTEGER(), SMALLINT(), REAL(), DOUBLE(), VARCHAR()})}; @@ -46,7 +58,7 @@ TEST_F(TableWriteTest, scanFilterProjectWrite) { writeToFile(filePaths[i]->path, vectors[i]); } - auto outputFile = TempFilePath::create(); + auto outputDirectory = TempDirectoryPath::create(); createDuckDbTable(vectors); @@ -58,16 +70,19 @@ TEST_F(TableWriteTest, scanFilterProjectWrite) { std::vector columnNames = { "c0", "c1", "c1_plus_c2", "substr_c5"}; - auto plan = - planBuilder - .tableWrite( - columnNames, - std::make_shared( - kHiveConnectorId, - std::make_shared(outputFile->path)), - "rows") - .project({"rows"}) - .planNode(); + auto plan = planBuilder + .tableWrite( + columnNames, + std::make_shared( + kHiveConnectorId, + makeHiveInsertTableHandle( + columnNames, + rowType_->children(), + {}, + makeLocationHandle(outputDirectory->path))), + "rows") + .project({"rows"}) + .planNode(); assertQuery(plan, filePaths, "SELECT count(*) FROM tmp WHERE c0 <> 0"); @@ -79,7 +94,7 @@ TEST_F(TableWriteTest, scanFilterProjectWrite) { auto rowType = ROW(std::move(columnNames), std::move(types)); assertQuery( PlanBuilder().tableScan(rowType).planNode(), - {outputFile}, + makeHiveConnectorSplits(outputDirectory), "SELECT c0, c1, c1 + c2, substr(c5, 1, 1) FROM tmp WHERE c0 <> 0"); } @@ -92,23 +107,26 @@ TEST_F(TableWriteTest, renameAndReorderColumns) { writeToFile(filePaths[i]->path, vectors[i]); } - auto outputFile = TempFilePath::create(); + auto outputDirectory = TempDirectoryPath::create(); createDuckDbTable(vectors); auto tableRowType = ROW({"d", "c", "b"}, {VARCHAR(), DOUBLE(), INTEGER()}); - auto plan = - PlanBuilder() - .tableScan(rowType) - .tableWrite( - tableRowType, - {"x", "y", "z"}, - std::make_shared( - kHiveConnectorId, - std::make_shared(outputFile->path)), - "rows") - .project({"rows"}) - .planNode(); + auto plan = PlanBuilder() + .tableScan(rowType) + .tableWrite( + tableRowType, + {"x", "y", "z"}, + std::make_shared( + kHiveConnectorId, + makeHiveInsertTableHandle( + {"x", "y", "z"}, + tableRowType->children(), + {}, + makeLocationHandle(outputDirectory->path))), + "rows") + .project({"rows"}) + .planNode(); assertQuery(plan, filePaths, "SELECT count(*) FROM tmp"); @@ -116,7 +134,7 @@ TEST_F(TableWriteTest, renameAndReorderColumns) { PlanBuilder() .tableScan(ROW({"x", "y", "z"}, {VARCHAR(), DOUBLE(), INTEGER()})) .planNode(), - {outputFile}, + makeHiveConnectorSplits(outputDirectory), "SELECT d, c, b FROM tmp"); } @@ -128,20 +146,22 @@ TEST_F(TableWriteTest, directReadWrite) { writeToFile(filePaths[i]->path, vectors[i]); } - auto outputFile = TempFilePath::create(); + auto outputDirectory = TempDirectoryPath::create(); createDuckDbTable(vectors); - - auto plan = - PlanBuilder() - .tableScan(rowType_) - .tableWrite( - rowType_->names(), - std::make_shared( - kHiveConnectorId, - std::make_shared(outputFile->path)), - "rows") - .project({"rows"}) - .planNode(); + auto plan = PlanBuilder() + .tableScan(rowType_) + .tableWrite( + rowType_->names(), + std::make_shared( + kHiveConnectorId, + makeHiveInsertTableHandle( + rowType_->names(), + rowType_->children(), + {}, + makeLocationHandle(outputDirectory->path))), + "rows") + .project({"rows"}) + .planNode(); assertQuery(plan, filePaths, "SELECT count(*) FROM tmp"); @@ -151,7 +171,7 @@ TEST_F(TableWriteTest, directReadWrite) { assertQuery( PlanBuilder().tableScan(rowType_).planNode(), - {outputFile}, + makeHiveConnectorSplits(outputDirectory), "SELECT * FROM tmp"); } @@ -180,43 +200,47 @@ TEST_F(TableWriteTest, constantVectors) { createDuckDbTable({vector}); - auto outputFile = TempFilePath::create(); - auto op = - PlanBuilder() - .values({vector}) - .tableWrite( - rowType->names(), - std::make_shared( - kHiveConnectorId, - std::make_shared(outputFile->path)), - "rows") - .project({"rows"}) - .planNode(); + auto outputDirectory = TempDirectoryPath::create(); + auto op = PlanBuilder() + .values({vector}) + .tableWrite( + rowType->names(), + std::make_shared( + kHiveConnectorId, + makeHiveInsertTableHandle( + rowType_->names(), + rowType_->children(), + {}, + makeLocationHandle(outputDirectory->path))), + "rows") + .project({"rows"}) + .planNode(); assertQuery(op, fmt::format("SELECT {}", size)); assertQuery( PlanBuilder().tableScan(rowType).planNode(), - {outputFile}, + makeHiveConnectorSplits(outputDirectory), "SELECT * FROM tmp"); } // Test TableWriter create empty ORC or not based on the config TEST_F(TableWriteTest, writeEmptyFile) { - auto outputFile = TempFilePath::create(); - fs::remove(outputFile->path); - - auto plan = - PlanBuilder() - .tableScan(rowType_) - .filter("false") - .tableWrite( - rowType_->names(), - std::make_shared( - kHiveConnectorId, - std::make_shared(outputFile->path)), - "rows") - .planNode(); + auto outputDirectory = TempDirectoryPath::create(); + auto plan = PlanBuilder() + .tableScan(rowType_) + .filter("false") + .tableWrite( + rowType_->names(), + std::make_shared( + kHiveConnectorId, + makeHiveInsertTableHandle( + rowType_->names(), + rowType_->children(), + {}, + makeLocationHandle(outputDirectory->path))), + "rows") + .planNode(); auto execute = [](const std::shared_ptr& plan, std::shared_ptr queryCtx = @@ -228,11 +252,11 @@ TEST_F(TableWriteTest, writeEmptyFile) { }; execute(plan); - ASSERT_FALSE(fs::exists(outputFile->path)); + ASSERT_TRUE(fs::is_empty(outputDirectory->path)); auto queryCtx = core::QueryCtx::createForTest(); queryCtx->setConfigOverridesUnsafe( {{core::QueryConfig::kCreateEmptyFiles, "true"}}); execute(plan, queryCtx); - ASSERT_TRUE(fs::exists(outputFile->path)); + ASSERT_FALSE(fs::is_empty(outputDirectory->path)); } diff --git a/velox/exec/tests/utils/CMakeLists.txt b/velox/exec/tests/utils/CMakeLists.txt index aaa714449ba..46e8f6ba1f5 100644 --- a/velox/exec/tests/utils/CMakeLists.txt +++ b/velox/exec/tests/utils/CMakeLists.txt @@ -44,4 +44,5 @@ target_link_libraries( velox_tpch_connector velox_presto_serializer velox_functions_prestosql - velox_aggregates) + velox_aggregates + velox_vector_test_lib) diff --git a/velox/exec/tests/utils/HiveConnectorTestBase.cpp b/velox/exec/tests/utils/HiveConnectorTestBase.cpp index b4229bd33e6..f1f6b54ff7e 100644 --- a/velox/exec/tests/utils/HiveConnectorTestBase.cpp +++ b/velox/exec/tests/utils/HiveConnectorTestBase.cpp @@ -152,6 +152,38 @@ HiveConnectorTestBase::makeHiveConnectorSplit( .build(); } +// static +std::shared_ptr +HiveConnectorTestBase::makeHiveInsertTableHandle( + const std::vector& tableColumnNames, + const std::vector& tableColumnTypes, + const std::vector& partitionedBy, + std::shared_ptr locationHandle) { + std::vector> + columnHandles; + for (int i = 0; i < tableColumnNames.size(); ++i) { + if (std::find( + partitionedBy.cbegin(), + partitionedBy.cend(), + tableColumnNames.at(i)) != partitionedBy.cend()) { + columnHandles.push_back( + std::make_shared( + tableColumnNames.at(i), + connector::hive::HiveColumnHandle::ColumnType::kPartitionKey, + tableColumnTypes.at(i))); + } else { + columnHandles.push_back( + std::make_shared( + tableColumnNames.at(i), + connector::hive::HiveColumnHandle::ColumnType::kRegular, + tableColumnTypes.at(i))); + } + } + + return std::make_shared( + columnHandles, locationHandle); +} + std::shared_ptr HiveConnectorTestBase::regularColumn( const std::string& name, diff --git a/velox/exec/tests/utils/HiveConnectorTestBase.h b/velox/exec/tests/utils/HiveConnectorTestBase.h index 20e17851cb7..9fab92df033 100644 --- a/velox/exec/tests/utils/HiveConnectorTestBase.h +++ b/velox/exec/tests/utils/HiveConnectorTestBase.h @@ -86,6 +86,39 @@ class HiveConnectorTestBase : public OperatorTestBase { remainingFilter); } + /// @param targetDirectory Final directory of the target table after commit. + /// @param writeDirectory Write directory of the target table before commit. + /// @param tableType Whether to create a new table, insert into an existing + /// table, or write a temporary table. + /// @param writeMode How to write to the target directory. + static std::shared_ptr makeLocationHandle( + std::string targetDirectory, + std::optional writeDirectory = std::nullopt, + connector::hive::LocationHandle::TableType tableType = + connector::hive::LocationHandle::TableType::kNew, + connector::hive::LocationHandle::WriteMode writeMode = connector::hive:: + LocationHandle::WriteMode::kDirectToTargetNewDirectory) { + return std::make_shared( + targetDirectory, + writeDirectory.value_or(targetDirectory), + tableType, + writeMode); + } + + /// Build a HiveInsertTableHandle. + /// @param tableColumnNames Column names of the target table. Corresponding + /// type of tableColumnNames[i] is tableColumnTypes[i]. + /// @param tableColumnTypes Column types of the target table. Corresponding + /// name of tableColumnTypes[i] is tableColumnNames[i]. + /// @param partitionedBy A list of partition columns of the target table. + /// @param locationHandle Location handle for the table write. + static std::shared_ptr + makeHiveInsertTableHandle( + const std::vector& tableColumnNames, + const std::vector& tableColumnTypes, + const std::vector& partitionedBy, + std::shared_ptr locationHandle); + static std::shared_ptr regularColumn( const std::string& name, const TypePtr& type);