Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 20 additions & 25 deletions velox/dwio/parquet/tests/writer/ParquetWriterTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,16 @@
#include "velox/common/testutil/TestValue.h"
#include "velox/connectors/hive/HiveConnector.h"
#include "velox/core/QueryCtx.h"
#include "velox/dwio/parquet/RegisterParquetWriter.h"
#include "velox/dwio/parquet/tests/ParquetTestBase.h"
#include "velox/exec/tests/utils/AssertQueryBuilder.h"
#include "velox/exec/tests/utils/Cursor.h"
#include "velox/exec/tests/utils/PlanBuilder.h"
#include "velox/exec/tests/utils/QueryAssertions.h"
#include "velox/exec/tests/utils/TempDirectoryPath.h"

namespace {

using namespace facebook::velox;
using namespace facebook::velox::common;
using namespace facebook::velox::dwio::common;
Expand All @@ -44,6 +48,7 @@ class ParquetWriterTest : public ParquetTestBase {
->newConnector(
kHiveConnectorId, std::make_shared<core::MemConfig>());
connector::registerConnector(hiveConnector);
parquet::registerParquetWriterFactory();
}

std::unique_ptr<RowReader> createRowReaderWithSchema(
Expand Down Expand Up @@ -178,34 +183,24 @@ DEBUG_ONLY_TEST_F(ParquetWriterTest, unitFromHiveConfig) {
const auto data = makeRowVector({makeFlatVector<Timestamp>(
10'000, [](auto row) { return Timestamp(row, row); })});
const auto outputDirectory = TempDirectoryPath::create();
const auto plan =
PlanBuilder()
.values({data})
.tableWrite(
outputDirectory->getPath(), dwio::common::FileFormat::PARQUET)
.planNode();

CursorParameters params;
std::shared_ptr<folly::Executor> executor =
std::make_shared<folly::CPUThreadPoolExecutor>(
std::thread::hardware_concurrency());
std::shared_ptr<core::QueryCtx> queryCtx =
core::QueryCtx::create(executor.get());
std::unordered_map<std::string, std::string> session = {
{std::string(
connector::hive::HiveConfig::kParquetWriteTimestampUnitSession),
"6" /*kMicro*/}};
queryCtx->setConnectorSessionOverridesUnsafe(
kHiveConnectorId, std::move(session));
params.queryCtx = queryCtx;
params.planNode = plan;

auto addSplits = [&](exec::Task* task) {};
auto result = readCursor(params, addSplits);
ASSERT_TRUE(waitForTaskCompletion(result.first->task().get()));

auto writerOptions = std::make_shared<parquet::WriterOptions>();
writerOptions->parquetWriteTimestampUnit = TimestampUnit::kMicro;

const auto plan = PlanBuilder()
.values({data})
.tableWrite(
outputDirectory->getPath(),
dwio::common::FileFormat::PARQUET,
{},
writerOptions)
.planNode();
AssertQueryBuilder(plan).copyResults(pool_.get());
}
#endif

} // namespace

int main(int argc, char** argv) {
testing::InitGoogleTest(&argc, argv);
folly::Init init{&argc, &argv, false};
Expand Down
53 changes: 37 additions & 16 deletions velox/exec/tests/utils/PlanBuilder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,6 @@ namespace facebook::velox::exec::test {

namespace {

// TODO Avoid duplication.
static const std::string kHiveConnectorId = "test-hive";
static const std::string kTpchConnectorId = "test-tpch";

core::TypedExprPtr parseExpr(
const std::string& text,
const RowTypePtr& rowType,
Expand Down Expand Up @@ -132,7 +128,7 @@ PlanBuilder& PlanBuilder::tpchTableScan(
return TableScanBuilder(*this)
.outputType(rowType)
.tableHandle(std::make_shared<connector::tpch::TpchTableHandle>(
kTpchConnectorId, table, scaleFactor))
std::string(kTpchDefaultConnectorId), table, scaleFactor))
.assignments(assignmentsMap)
.endTableScan();
}
Expand Down Expand Up @@ -344,17 +340,38 @@ PlanBuilder& PlanBuilder::filter(const std::string& filter) {
PlanBuilder& PlanBuilder::tableWrite(
const std::string& outputDirectoryPath,
const dwio::common::FileFormat fileFormat,
const std::vector<std::string>& aggregates) {
return tableWrite(outputDirectoryPath, {}, 0, {}, {}, fileFormat, aggregates);
const std::vector<std::string>& aggregates,
const std::shared_ptr<dwio::common::WriterOptions>& options) {
return tableWrite(
outputDirectoryPath,
{},
0,
{},
{},
fileFormat,
aggregates,
kHiveDefaultConnectorId,
{},
options);
}

PlanBuilder& PlanBuilder::tableWrite(
const std::string& outputDirectoryPath,
const std::vector<std::string>& partitionBy,
const dwio::common::FileFormat fileFormat,
const std::vector<std::string>& aggregates) {
const std::vector<std::string>& aggregates,
const std::shared_ptr<dwio::common::WriterOptions>& options) {
return tableWrite(
outputDirectoryPath, partitionBy, 0, {}, {}, fileFormat, aggregates);
outputDirectoryPath,
partitionBy,
0,
{},
{},
fileFormat,
aggregates,
kHiveDefaultConnectorId,
{},
options);
}

PlanBuilder& PlanBuilder::tableWrite(
Expand All @@ -363,15 +380,19 @@ PlanBuilder& PlanBuilder::tableWrite(
int32_t bucketCount,
const std::vector<std::string>& bucketedBy,
const dwio::common::FileFormat fileFormat,
const std::vector<std::string>& aggregates) {
const std::vector<std::string>& aggregates,
const std::shared_ptr<dwio::common::WriterOptions>& options) {
return tableWrite(
outputDirectoryPath,
partitionBy,
bucketCount,
bucketedBy,
{},
fileFormat,
aggregates);
aggregates,
kHiveDefaultConnectorId,
{},
options);
}

PlanBuilder& PlanBuilder::tableWrite(
Expand All @@ -382,9 +403,9 @@ PlanBuilder& PlanBuilder::tableWrite(
const std::vector<std::shared_ptr<const HiveSortingColumn>>& sortBy,
const dwio::common::FileFormat fileFormat,
const std::vector<std::string>& aggregates,
const std::string& connectorId,
const std::string_view& connectorId,
const std::unordered_map<std::string, std::string>& serdeParameters,
const std::shared_ptr<dwio::common::WriterOptions>& writerOptions) {
const std::shared_ptr<dwio::common::WriterOptions>& options) {
VELOX_CHECK_NOT_NULL(planNode_, "TableWrite cannot be the source node");
auto rowType = planNode_->outputType();

Expand Down Expand Up @@ -420,10 +441,10 @@ PlanBuilder& PlanBuilder::tableWrite(
bucketProperty,
common::CompressionKind_NONE,
serdeParameters,
writerOptions);
options);

auto insertHandle =
std::make_shared<core::InsertTableHandle>(connectorId, hiveHandle);
auto insertHandle = std::make_shared<core::InsertTableHandle>(
std::string(connectorId), hiveHandle);

std::shared_ptr<core::AggregationNode> aggregationNode;
if (!aggregates.empty()) {
Expand Down
22 changes: 15 additions & 7 deletions velox/exec/tests/utils/PlanBuilder.h
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,9 @@ class PlanBuilder {
explicit PlanBuilder(memory::MemoryPool* pool = nullptr)
: PlanBuilder(std::make_shared<core::PlanNodeIdGenerator>(), pool) {}

static constexpr const std::string_view kHiveDefaultConnectorId{"test-hive"};
static constexpr const std::string_view kTpchDefaultConnectorId{"test-tpch"};

/// Add a TableScanNode to scan a Hive table.
///
/// @param outputType List of column names and types to read from the table.
Expand Down Expand Up @@ -272,7 +275,7 @@ class PlanBuilder {

PlanBuilder& planBuilder_;
std::string tableName_{"hive_table"};
std::string connectorId_{"test-hive"};
std::string connectorId_{kHiveDefaultConnectorId};
RowTypePtr outputType_;
std::vector<std::string> subfieldFilters_;
std::string remainingFilter_;
Expand Down Expand Up @@ -378,6 +381,7 @@ class PlanBuilder {
/// @param outputDirectoryPath Path to a directory to write data to.
/// @param fileFormat File format to use for the written data.
/// @param aggregates Aggregations for column statistics collection during
/// @param polymorphic options object to be passed to the writer.
/// write, supported aggregation types vary for different column types.
/// For example:
/// Boolean: count, countIf.
Expand All @@ -388,7 +392,8 @@ class PlanBuilder {
const std::string& outputDirectoryPath,
const dwio::common::FileFormat fileFormat =
dwio::common::FileFormat::DWRF,
const std::vector<std::string>& aggregates = {});
const std::vector<std::string>& aggregates = {},
const std::shared_ptr<dwio::common::WriterOptions>& options = nullptr);

/// Adds a TableWriteNode to write all input columns into a partitioned Hive
/// table without compression.
Expand All @@ -398,12 +403,14 @@ class PlanBuilder {
/// @param fileFormat File format to use for the written data.
/// @param aggregates Aggregations for column statistics collection during
/// write.
/// @param polymorphic options object to be passed to the writer.
PlanBuilder& tableWrite(
const std::string& outputDirectoryPath,
const std::vector<std::string>& partitionBy,
const dwio::common::FileFormat fileFormat =
dwio::common::FileFormat::DWRF,
const std::vector<std::string>& aggregates = {});
const std::vector<std::string>& aggregates = {},
const std::shared_ptr<dwio::common::WriterOptions>& options = nullptr);

/// Adds a TableWriteNode to write all input columns into a non-sorted
/// bucketed Hive table without compression.
Expand All @@ -415,14 +422,16 @@ class PlanBuilder {
/// @param fileFormat File format to use for the written data.
/// @param aggregates Aggregations for column statistics collection during
/// write.
/// @param polymorphic options object to be passed to the writer.
PlanBuilder& tableWrite(
const std::string& outputDirectoryPath,
const std::vector<std::string>& partitionBy,
int32_t bucketCount,
const std::vector<std::string>& bucketedBy,
const dwio::common::FileFormat fileFormat =
dwio::common::FileFormat::DWRF,
const std::vector<std::string>& aggregates = {});
const std::vector<std::string>& aggregates = {},
const std::shared_ptr<dwio::common::WriterOptions>& options = nullptr);

/// Adds a TableWriteNode to write all input columns into a sorted bucket Hive
/// table without compression.
Expand All @@ -448,10 +457,9 @@ class PlanBuilder {
const dwio::common::FileFormat fileFormat =
dwio::common::FileFormat::DWRF,
const std::vector<std::string>& aggregates = {},
const std::string& connectorId = "test-hive",
const std::string_view& connectorId = kHiveDefaultConnectorId,
const std::unordered_map<std::string, std::string>& serdeParameters = {},
const std::shared_ptr<dwio::common::WriterOptions>& writerOptions =
nullptr);
const std::shared_ptr<dwio::common::WriterOptions>& options = nullptr);

/// Add a TableWriteMergeNode.
PlanBuilder& tableWriteMerge(
Expand Down