Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions presto-native-execution/presto_cpp/main/PrestoServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@
#include "velox/dwio/parquet/RegisterParquetWriter.h"
#include "velox/dwio/text/RegisterTextWriter.h"
#include "velox/exec/OutputBufferManager.h"
#include "velox/exec/TraceUtil.h"
#include "velox/functions/prestosql/aggregates/RegisterAggregateFunctions.h"
#include "velox/functions/prestosql/registration/RegistrationFunctions.h"
#include "velox/functions/prestosql/window/WindowFunctionsRegistration.h"
Expand Down Expand Up @@ -437,6 +438,7 @@ void PrestoServer::run() {
registerRemoteFunctions();
registerVectorSerdes();
registerPrestoPlanNodeSerDe();
registerTraceNodeFactories();
registerDynamicFunctions();

facebook::velox::exec::ExchangeSource::registerFactory(
Expand Down Expand Up @@ -1805,4 +1807,29 @@ void PrestoServer::reportNodeStats(proxygen::ResponseHandler* downstream) {

http::sendOkResponse(downstream, json(nodeStats));
}

void PrestoServer::registerTraceNodeFactories() {
// Register trace node factory for PartitionAndSerialize operator
velox::exec::trace::registerTraceNodeFactory(
"PartitionAndSerialize",
[](const velox::core::PlanNode* traceNode,
const velox::core::PlanNodeId& nodeId) -> velox::core::PlanNodePtr {
if (const auto* partitionAndSerializeNode =
dynamic_cast<const operators::PartitionAndSerializeNode*>(
traceNode)) {
return std::make_shared<operators::PartitionAndSerializeNode>(
nodeId,
partitionAndSerializeNode->keys(),
partitionAndSerializeNode->numPartitions(),
partitionAndSerializeNode->serializedRowType(),
std::make_shared<velox::exec::trace::DummySourceNode>(
partitionAndSerializeNode->sources().front()->outputType()),
partitionAndSerializeNode->isReplicateNullsAndAny(),
partitionAndSerializeNode->partitionFunctionFactory(),
partitionAndSerializeNode->sortingOrders(),
partitionAndSerializeNode->sortingKeys());
}
return nullptr;
});
}
} // namespace facebook::presto
2 changes: 2 additions & 0 deletions presto-native-execution/presto_cpp/main/PrestoServer.h
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,8 @@ class PrestoServer {

virtual void registerMemoryArbitrators();

virtual void registerTraceNodeFactories();

/// Invoked after creating global (singleton) config objects (SystemConfig and
/// NodeConfig) and before loading their properties from the file.
/// In the implementation any extra config properties can be registered.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -838,6 +838,98 @@ class ShuffleTest : public exec::test::OperatorTestBase {
fileSystem->remove(file);
}
}

void runPartitionAndSerializeSerdeTest(
const RowVectorPtr& data,
size_t numPartitions,
const std::optional<std::vector<std::string>>& serdeLayout =
std::nullopt) {
TestShuffleWriter::reset();

auto shuffleInfo = testShuffleInfo(numPartitions, 1 << 20);
TestShuffleWriter::createWriter(shuffleInfo, pool());

auto plan = exec::test::PlanBuilder()
.values({data}, true)
.addNode(addPartitionAndSerializeNode(
numPartitions,
false,
serdeLayout.value_or(std::vector<std::string>{})))
.localPartition(std::vector<std::string>{})
.addNode(addShuffleWriteNode(
numPartitions,
std::string(TestShuffleFactory::kShuffleName),
shuffleInfo))
.planNode();

exec::CursorParameters params;
params.planNode = plan;
params.maxDrivers = 1;

auto [taskCursor, results] = exec::test::readCursor(params);
ASSERT_EQ(results.size(), 0);

auto shuffleWriter = TestShuffleWriter::getInstance();
ASSERT_NE(shuffleWriter, nullptr);

auto readyPartitions = shuffleWriter->readyPartitions();
ASSERT_NE(readyPartitions, nullptr);

size_t totalRows = 0;
for (size_t partitionIdx = 0; partitionIdx < numPartitions;
++partitionIdx) {
for (const auto& batch : (*readyPartitions)[partitionIdx]) {
totalRows += batch->rows.size();
}
}
ASSERT_EQ(totalRows, data->size());

auto expectedType = serdeLayout.has_value()
? createSerdeLayoutType(asRowType(data->type()), serdeLayout.value())
: asRowType(data->type());

std::vector<RowVectorPtr> deserializedData;
for (size_t partitionIdx = 0; partitionIdx < numPartitions;
++partitionIdx) {
for (const auto& batch : (*readyPartitions)[partitionIdx]) {
auto deserialized = std::dynamic_pointer_cast<RowVector>(
row::CompactRow::deserialize(batch->rows, expectedType, pool()));
if (deserialized != nullptr && deserialized->size() > 0) {
deserializedData.push_back(deserialized);
}
}
}

auto expected = serdeLayout.has_value()
? reorderColumns(data, serdeLayout.value())
: data;
velox::exec::test::assertEqualResults({expected}, deserializedData);
}

private:
RowTypePtr createSerdeLayoutType(
const RowTypePtr& originalType,
const std::vector<std::string>& layout) {
std::vector<std::string> names;
std::vector<TypePtr> types;
for (const auto& name : layout) {
auto idx = originalType->getChildIdx(name);
names.push_back(name);
types.push_back(originalType->childAt(idx));
}
return ROW(std::move(names), std::move(types));
}

RowVectorPtr reorderColumns(
const RowVectorPtr& data,
const std::vector<std::string>& newLayout) {
auto rowType = asRowType(data->type());
std::vector<VectorPtr> columns;
for (const auto& name : newLayout) {
columns.push_back(data->childAt(rowType->getChildIdx(name)));
}
return makeRowVector(columns);
}
};

TEST_F(ShuffleTest, operators) {
Expand Down Expand Up @@ -1583,6 +1675,23 @@ TEST_F(ShuffleTest, shuffleReadRuntimeStats) {
ASSERT_EQ(velox::RuntimeCounter::Unit::kNone, numBatchesStat.unit);
}
}

TEST_F(ShuffleTest, partitionAndSerializeEndToEnd) {
auto data = makeRowVector({
makeFlatVector<int32_t>({1, 2, 3, 4, 5, 6}),
makeFlatVector<int64_t>({10, 20, 30, 40, 50, 60}),
});
runPartitionAndSerializeSerdeTest(data, 4);

data = makeRowVector({
makeFlatVector<int32_t>({1, 2, 3, 4}),
makeFlatVector<int64_t>({10, 20, 30, 40}),
makeFlatVector<std::string>({"a", "b", "c", "d"}),
});

runPartitionAndSerializeSerdeTest(data, 2, {{"c2", "c0"}});
}

} // namespace facebook::presto::operators::test

int main(int argc, char** argv) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*/
#include "presto_cpp/main/tool/trace/PartitionAndSerializeReplayer.h"

#include "presto_cpp/main/operators/PartitionAndSerialize.h"
#include "velox/tool/trace/TraceReplayTaskRunner.h"

using namespace facebook::velox;
using namespace facebook::velox::exec;
using namespace facebook::velox::exec::test;
using namespace facebook::presto;

namespace facebook::velox::tool::trace {

PartitionAndSerializeReplayer::PartitionAndSerializeReplayer(
const std::string& traceDir,
const std::string& queryId,
const std::string& taskId,
const std::string& nodeId,
const std::string& nodeName,
const std::string& driverIds,
uint64_t queryCapacity,
folly::Executor* executor)
: OperatorReplayerBase(
traceDir,
queryId,
taskId,
nodeId,
nodeName,
driverIds,
queryCapacity,
executor) {}

RowVectorPtr PartitionAndSerializeReplayer::run(bool copyResults) {
TraceReplayTaskRunner traceTaskRunner(createPlan(), createQueryCtx());
auto [task, result] =
traceTaskRunner.maxDrivers(driverIds_.size()).run(copyResults);
printStats(task);
return result;
}

core::PlanNodePtr PartitionAndSerializeReplayer::createPlanNode(
const core::PlanNode* node,
const core::PlanNodeId& nodeId,
const core::PlanNodePtr& source) const {
const auto partitionAndSerializeNode =
dynamic_cast<const presto::operators::PartitionAndSerializeNode*>(node);
VELOX_CHECK_NOT_NULL(partitionAndSerializeNode);

return std::make_shared<presto::operators::PartitionAndSerializeNode>(
nodeId,
partitionAndSerializeNode->keys(),
partitionAndSerializeNode->numPartitions(),
partitionAndSerializeNode->serializedRowType(),
source,
partitionAndSerializeNode->isReplicateNullsAndAny(),
partitionAndSerializeNode->partitionFunctionFactory(),
partitionAndSerializeNode->sortingOrders(),
partitionAndSerializeNode->sortingKeys());
}

} // namespace facebook::velox::tool::trace
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*/
#pragma once

#include "velox/core/PlanNode.h"
#include "velox/tool/trace/OperatorReplayerBase.h"

namespace facebook::velox::tool::trace {

/// The replayer to replay the traced 'PartitionAndSerialize' operators.
class PartitionAndSerializeReplayer final : public OperatorReplayerBase {
public:
PartitionAndSerializeReplayer(
const std::string& traceDir,
const std::string& queryId,
const std::string& taskId,
const std::string& nodeId,
const std::string& nodeName,
const std::string& driverIds,
uint64_t queryCapacity,
folly::Executor* executor);

RowVectorPtr run(bool copyResults = true) override;

private:
core::PlanNodePtr createPlanNode(
const core::PlanNode* node,
const core::PlanNodeId& nodeId,
const core::PlanNodePtr& source) const override;
};

} // namespace facebook::velox::tool::trace
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include "velox/tool/trace/TraceReplayRunner.h"

#include <folly/init/Init.h>
#include "presto_cpp/main/operators/PartitionAndSerialize.h"
#include "presto_cpp/main/tool/trace/PartitionAndSerializeReplayer.h"
#include "presto_cpp/main/types/PrestoToVeloxQueryPlan.h"

using namespace facebook::velox;
using namespace facebook::presto;

namespace {
/// Custom trace replay runner for Presto operators.
/// This runner extends the base Velox TraceReplayRunner to support:
/// - Presto-specific operators (e.g., PartitionAndSerialize)
/// - Presto plan node serialization/deserialization
class PrestoTraceReplayRunner
: public facebook::velox::tool::trace::TraceReplayRunner {
public:
void init() override {
// Register Presto plan node SerDe for reading traced plan nodes
registerPrestoPlanNodeSerDe();

// Register custom Presto operators to execute during replay
exec::Operator::registerOperator(
std::make_unique<operators::PartitionAndSerializeTranslator>());

// Call base init to complete initialization
TraceReplayRunner::init();
}

private:
std::unique_ptr<tool::trace::OperatorReplayerBase> createReplayer()
const override {
const auto nodeName = taskTraceMetadataReader_->nodeName(FLAGS_node_id);
const auto queryCapacityBytes = (1ULL * FLAGS_query_memory_capacity_mb)
<< 20;

if (nodeName == "PartitionAndSerialize") {
return std::make_unique<tool::trace::PartitionAndSerializeReplayer>(
FLAGS_root_dir,
FLAGS_query_id,
FLAGS_task_id,
FLAGS_node_id,
nodeName,
FLAGS_driver_ids,
queryCapacityBytes,
cpuExecutor_.get());
}

// Fall back to base class for standard Velox operators
return TraceReplayRunner::createReplayer();
}
};
} // namespace

int main(int argc, char** argv) {
folly::Init init(&argc, &argv);
PrestoTraceReplayRunner runner;
runner.init();
runner.run();
return 0;
}
Loading
Loading