Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,84 @@
*/
#include "presto_cpp/main/operators/ShuffleRead.h"
#include "velox/exec/Exchange.h"
#include "velox/row/CompactRow.h"
#include "velox/serializers/CompactRowSerializer.h"
#include "velox/serializers/RowSerializer.h"

using namespace facebook::velox::exec;
using namespace facebook::velox;
using facebook::velox::serializer::RowIteratorImpl;

namespace facebook::presto::operators {
velox::core::PlanNodeId deserializePlanNodeId(const folly::dynamic& obj) {
return obj["id"].asString();
}

namespace {
std::unique_ptr<RowIterator> shuffleRowIteratorFactory(
ByteInputStream* source,
const VectorSerde::Options* /*unused*/) {
return std::make_unique<RowIteratorImpl>(source, source->size());
}

class ShuffleVectorSerde : public VectorSerde {
public:
ShuffleVectorSerde() : VectorSerde(VectorSerde::Kind::kCompactRow) {}

void estimateSerializedSize(
const BaseVector* /* vector */,
const folly::Range<const IndexRange*>& /* ranges */,
vector_size_t** /* sizes */) override {
// Not used.
VELOX_UNREACHABLE();
}

std::unique_ptr<IterativeVectorSerializer> createIterativeSerializer(
RowTypePtr /* type */,
int32_t /* numRows */,
StreamArena* /* streamArena */,
const Options* /* options */) override {
// Not used.
VELOX_UNREACHABLE();
}

void deserialize(
ByteInputStream* source,
velox::memory::MemoryPool* pool,
RowTypePtr type,
RowVectorPtr* result,
const Options* /* options */) override {
VELOX_UNSUPPORTED("ShuffleVectorSerde::deserialize is not supported");
}

void deserialize(
ByteInputStream* source,
std::unique_ptr<RowIterator>& sourceRowIterator,
uint64_t maxRows,
RowTypePtr type,
RowVectorPtr* result,
velox::memory::MemoryPool* pool,
const Options* options) override {
std::vector<std::string_view> serializedRows;
std::vector<std::unique_ptr<std::string>> serializedBuffers;
velox::serializer::RowDeserializer<std::string_view>::deserialize(
source,
maxRows,
sourceRowIterator,
serializedRows,
serializedBuffers,
shuffleRowIteratorFactory,
options);

if (serializedRows.empty()) {
*result = BaseVector::create<RowVector>(type, 0, pool);
return;
}

*result = row::CompactRow::deserialize(serializedRows, type, pool);
}
};

class ShuffleReadOperator : public Exchange {
public:
ShuffleReadOperator(
Expand All @@ -40,15 +107,15 @@ class ShuffleReadOperator : public Exchange {
velox::VectorSerde::Kind::kCompactRow),
exchangeClient,
"ShuffleRead"),
serde_(std::make_unique<velox::serializer::CompactRowVectorSerde>()) {}
serde_(std::make_unique<ShuffleVectorSerde>()) {}

protected:
VectorSerde* getSerde() override {
return serde_.get();
}

private:
std::unique_ptr<velox::serializer::CompactRowVectorSerde> serde_;
std::unique_ptr<ShuffleVectorSerde> serde_;
};
} // namespace

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,24 +47,13 @@ UnsafeRowExchangeSource::request(
} else {
totalBytes = buffer->size();
VELOX_CHECK_LE(totalBytes, std::numeric_limits<int32_t>::max());

++numBatches_;
velox::serializer::detail::RowGroupHeader rowHeader{
.uncompressedSize = static_cast<int32_t>(totalBytes),
.compressedSize = static_cast<int32_t>(totalBytes),
.compressed = false};
auto headBuffer = std::make_shared<std::string>(
velox::serializer::detail::RowGroupHeader::size(), '0');
rowHeader.write(const_cast<char*>(headBuffer->data()));

auto ioBuf = folly::IOBuf::wrapBuffer(
headBuffer->data(), headBuffer->size());
ioBuf->appendToChain(
folly::IOBuf::wrapBuffer(buffer->as<char>(), buffer->size()));
buffer->as<char>(), buffer->size());
queue_->enqueueLocked(
std::make_unique<velox::exec::SerializedPage>(
std::move(ioBuf),
[buffer, headBuffer](auto& /*unused*/) {}),
[buffer](auto& /*unused*/) {}),
promises);
}
}
Expand Down
2 changes: 1 addition & 1 deletion presto-native-execution/velox
Submodule velox updated 32 files
+1 −0 .github/workflows/linux-build-base.yml
+1 −1 CMake/resolve_dependency_modules/README.md
+2 −2 CMake/resolve_dependency_modules/simdjson.cmake
+1 −1 CMakeLists.txt
+1 −1 scripts/setup-versions.sh
+10 −14 velox/buffer/Buffer.h
+6 −6 velox/common/caching/AsyncDataCache.cpp
+13 −11 velox/common/caching/AsyncDataCache.h
+1 −8 velox/exec/Exchange.cpp
+1 −1 velox/exec/fuzzer/PrestoQueryRunner.cpp
+1 −1 velox/exec/fuzzer/PrestoQueryRunnerIntermediateTypeTransforms.cpp
+67 −68 velox/exec/tests/PrestoQueryRunnerTimeTransformTest.cpp
+75 −23 velox/expression/CastExpr.cpp
+6 −0 velox/expression/CastExpr.h
+3 −0 velox/expression/fuzzer/SpecialFormSignatureGenerator.cpp
+605 −0 velox/expression/tests/CastExprTest.cpp
+28 −6 velox/functions/prestosql/DateTimeFunctions.h
+13 −11 velox/functions/prestosql/benchmarks/JsonExprBenchmark.cpp
+8 −24 velox/functions/prestosql/json/JsonExtractor.cpp
+32 −43 velox/functions/prestosql/json/JsonExtractor.h
+45 −40 velox/functions/prestosql/json/tests/JsonExtractorTest.cpp
+12 −1 velox/functions/prestosql/registration/DateTimeFunctionsRegistration.cpp
+93 −49 velox/functions/prestosql/tests/DateTimeFunctionsTest.cpp
+3 −1 velox/functions/prestosql/tests/JsonFunctionsTest.cpp
+2 −2 velox/functions/prestosql/types/parser/TypeParser.ll
+2 −3 velox/functions/prestosql/types/parser/TypeParser.yy
+34 −13 velox/functions/prestosql/types/parser/tests/TypeParserTest.cpp
+26 −11 velox/serializers/CompactRowSerializer.cpp
+79 −0 velox/serializers/RowSerializer.h
+26 −11 velox/serializers/UnsafeRowSerializer.cpp
+192 −0 velox/type/Type.cpp
+17 −0 velox/type/Type.h
Loading