Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
88 changes: 88 additions & 0 deletions bolt/common/base/SpillConfig.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,11 @@
#include "bolt/common/base/SpillConfig.h"

#include <fmt/format.h>
#include <folly/dynamic.h>

#include "bolt/common/base/Exceptions.h"
#include "bolt/common/base/SuccinctPrinter.h"
#include "bolt/common/compression/Compression.h"

namespace bytedance::bolt::common {

Expand Down Expand Up @@ -225,6 +227,92 @@ std::string SpillConfig::toString() const {
jitEnabled);
}

folly::dynamic SpillConfig::serialize() const {
folly::dynamic obj = folly::dynamic::object;
obj["name"] = "SpillConfig";
obj["fileNamePrefix"] = fileNamePrefix;
obj["maxFileSize"] = static_cast<int64_t>(maxFileSize);
obj["spillUringEnabled"] = spillUringEnabled;
obj["writeBufferSize"] = static_cast<int64_t>(writeBufferSize);
obj["minSpillableReservationPct"] = minSpillableReservationPct;
obj["spillableReservationGrowthPct"] = spillableReservationGrowthPct;
obj["startPartitionBit"] = static_cast<int64_t>(startPartitionBit);
obj["joinPartitionBits"] = static_cast<int64_t>(joinPartitionBits);
obj["joinRepartitionBits"] = static_cast<int64_t>(joinRepartitionBits);
obj["maxSpillLevel"] = maxSpillLevel;
obj["maxSpillRunRows"] = static_cast<int64_t>(maxSpillRunRows);
obj["writerFlushThresholdSize"] =
static_cast<int64_t>(writerFlushThresholdSize);
obj["testSpillPct"] = testSpillPct;
obj["compressionKind"] = static_cast<int>(compressionKind);
obj["fileCreateConfig"] = fileCreateConfig;
obj["rowBasedSpillMode"] = static_cast<int>(rowBasedSpillMode);
obj["singlePartitionSerdeKind"] = singlePartitionSerdeKind;
obj["spillPartitionsAdaptiveThreshold"] =
static_cast<int64_t>(spillPartitionsAdaptiveThreshold);
obj["jitEnabled"] = jitEnabled;
obj["needSetNextEqual"] = needSetNextEqual;
obj["aggBypassHTEqualNum"] = static_cast<int64_t>(aggBypassHTEqualNum);
// getSpillDirPathCb, updateAndCheckSpillLimitCb, and executor are omitted;
// they must be re-injected by the host after deserialization.
return obj;
}

std::shared_ptr<SpillConfig> SpillConfig::deserialize(
const folly::dynamic& obj) {
auto spillConfig = std::make_shared<SpillConfig>();
spillConfig->fileNamePrefix = obj["fileNamePrefix"].asString();
spillConfig->maxFileSize = static_cast<uint64_t>(obj["maxFileSize"].asInt());
spillConfig->spillUringEnabled = obj["spillUringEnabled"].asBool();
spillConfig->writeBufferSize =
static_cast<uint64_t>(obj["writeBufferSize"].asInt());
spillConfig->minSpillableReservationPct =
static_cast<int32_t>(obj["minSpillableReservationPct"].asInt());
spillConfig->spillableReservationGrowthPct =
static_cast<int32_t>(obj["spillableReservationGrowthPct"].asInt());
spillConfig->startPartitionBit =
static_cast<uint8_t>(obj["startPartitionBit"].asInt());
spillConfig->joinPartitionBits =
static_cast<uint8_t>(obj["joinPartitionBits"].asInt());
spillConfig->joinRepartitionBits =
static_cast<uint8_t>(obj["joinRepartitionBits"].asInt());
spillConfig->maxSpillLevel =
static_cast<int32_t>(obj["maxSpillLevel"].asInt());
spillConfig->maxSpillRunRows =
static_cast<uint64_t>(obj["maxSpillRunRows"].asInt());
spillConfig->writerFlushThresholdSize =
static_cast<uint64_t>(obj["writerFlushThresholdSize"].asInt());
spillConfig->testSpillPct = static_cast<int32_t>(obj["testSpillPct"].asInt());
spillConfig->compressionKind =
static_cast<common::CompressionKind>(obj["compressionKind"].asInt());
spillConfig->fileCreateConfig = obj["fileCreateConfig"].asString();
spillConfig->rowBasedSpillMode =
static_cast<RowBasedSpillMode>(obj["rowBasedSpillMode"].asInt());
spillConfig->singlePartitionSerdeKind =
obj["singlePartitionSerdeKind"].asString();
spillConfig->spillPartitionsAdaptiveThreshold =
static_cast<uint32_t>(obj["spillPartitionsAdaptiveThreshold"].asInt());
spillConfig->jitEnabled = obj["jitEnabled"].asBool();
spillConfig->needSetNextEqual = obj["needSetNextEqual"].asBool();
spillConfig->aggBypassHTEqualNum =
static_cast<size_t>(obj["aggBypassHTEqualNum"].asInt());
// getSpillDirPathCb, updateAndCheckSpillLimitCb, and executor remain
// default-initialized (null); callers must re-inject them as needed.
return spillConfig;
}

void SpillConfig::registerSerDe() {
auto& registry = bolt::DeserializationRegistryForSharedPtr();
registry.Register("SpillConfig", SpillConfig::deserialize);
}

namespace {
const bool kSpillConfigSerdeRegistered = []() {
SpillConfig::registerSerDe();
return true;
}();
} // namespace

SpillConfig& SpillConfig::setJITenableForSpill(bool enabled) noexcept {
jitEnabled = enabled;
return *this;
Expand Down
14 changes: 13 additions & 1 deletion bolt/common/base/SpillConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,11 @@
#include <stdint.h>
#include <string.h>

#include <folly/dynamic.h>
#include <folly/executors/CPUThreadPoolExecutor.h>
#include <optional>
#include "bolt/common/compression/Compression.h"
#include "bolt/common/serialization/Serializable.h"
#include "bolt/vector/VectorStream.h"
namespace bytedance::bolt::common {

Expand Down Expand Up @@ -73,7 +75,7 @@ enum class RowBasedSpillMode {
RowBasedSpillMode strToRowBasedSpillMode(const std::string& str);

/// Specifies the config for spilling.
struct SpillConfig {
struct SpillConfig : public bytedance::bolt::ISerializable {
SpillConfig() = default;
SpillConfig(
GetSpillDirectoryPathCB _getSpillDirPathCb,
Expand Down Expand Up @@ -112,6 +114,16 @@ struct SpillConfig {

std::string toString() const;

/// Serializes all value fields. Callbacks and executor are omitted and must
/// be re-injected by the host after deserialization.
folly::dynamic serialize() const override;

/// Reconstructs a SpillConfig from a serialized object. Callbacks and
/// executor are left at their default (null/empty) values.
static std::shared_ptr<SpillConfig> deserialize(const folly::dynamic& obj);

static void registerSerDe();

/// A callback function that returns the spill directory path. Implementations
/// can use it to ensure the path exists before returning.
GetSpillDirectoryPathCB getSpillDirPathCb;
Expand Down
144 changes: 144 additions & 0 deletions bolt/common/base/tests/SpillConfigTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,9 @@
#include "bolt/common/base/SpillConfig.h"
#include <gtest/gtest.h>
#include "bolt/common/base/tests/GTestUtils.h"
#include "bolt/common/serialization/Serializable.h"
#include "bolt/exec/HashBitRange.h"
using namespace bytedance::bolt;
using namespace bytedance::bolt::common;
using namespace bytedance::bolt::exec;

Expand Down Expand Up @@ -159,6 +161,148 @@ TEST(SpillConfig, spillLevelLimit) {
}
}

// ---- Serialization helpers --------------------------------------------------

namespace {

// Returns a SpillConfig with every serializable field set to a non-default
// value, making it easy to detect fields that were silently dropped.
SpillConfig makeFullConfig() {
SpillConfig cfg;
cfg.fileNamePrefix = "test_spill";
cfg.maxFileSize = 512UL * 1024 * 1024;
cfg.spillUringEnabled = true;
cfg.writeBufferSize = 8UL * 1024 * 1024;
cfg.minSpillableReservationPct = 10;
cfg.spillableReservationGrowthPct = 25;
cfg.startPartitionBit = 29;
cfg.joinPartitionBits = 4;
cfg.joinRepartitionBits = 2;
cfg.maxSpillLevel = 3;
cfg.maxSpillRunRows = 100'000;
cfg.writerFlushThresholdSize = 64UL * 1024 * 1024;
cfg.testSpillPct = 5;
cfg.compressionKind = CompressionKind_ZSTD;
cfg.fileCreateConfig = R"({"option":"value"})";
cfg.rowBasedSpillMode = RowBasedSpillMode::RAW;
cfg.singlePartitionSerdeKind = "Arrow";
cfg.spillPartitionsAdaptiveThreshold = 64;
cfg.jitEnabled = false;
cfg.needSetNextEqual = true;
cfg.aggBypassHTEqualNum = 42;
return cfg;
}

void assertFieldsEqual(const SpillConfig& e, const SpillConfig& a) {
EXPECT_EQ(e.fileNamePrefix, a.fileNamePrefix);
EXPECT_EQ(e.maxFileSize, a.maxFileSize);
EXPECT_EQ(e.spillUringEnabled, a.spillUringEnabled);
EXPECT_EQ(e.writeBufferSize, a.writeBufferSize);
EXPECT_EQ(e.minSpillableReservationPct, a.minSpillableReservationPct);
EXPECT_EQ(e.spillableReservationGrowthPct, a.spillableReservationGrowthPct);
EXPECT_EQ(e.startPartitionBit, a.startPartitionBit);
EXPECT_EQ(e.joinPartitionBits, a.joinPartitionBits);
EXPECT_EQ(e.joinRepartitionBits, a.joinRepartitionBits);
EXPECT_EQ(e.maxSpillLevel, a.maxSpillLevel);
EXPECT_EQ(e.maxSpillRunRows, a.maxSpillRunRows);
EXPECT_EQ(e.writerFlushThresholdSize, a.writerFlushThresholdSize);
EXPECT_EQ(e.testSpillPct, a.testSpillPct);
EXPECT_EQ(e.compressionKind, a.compressionKind);
EXPECT_EQ(e.fileCreateConfig, a.fileCreateConfig);
EXPECT_EQ(e.rowBasedSpillMode, a.rowBasedSpillMode);
EXPECT_EQ(e.singlePartitionSerdeKind, a.singlePartitionSerdeKind);
EXPECT_EQ(
e.spillPartitionsAdaptiveThreshold, a.spillPartitionsAdaptiveThreshold);
EXPECT_EQ(e.jitEnabled, a.jitEnabled);
EXPECT_EQ(e.needSetNextEqual, a.needSetNextEqual);
EXPECT_EQ(e.aggBypassHTEqualNum, a.aggBypassHTEqualNum);
}

} // namespace

// ---- Serialization tests ----------------------------------------------------

TEST(SpillConfig, serializeContainsNameField) {
folly::dynamic dyn = makeFullConfig().serialize();
ASSERT_TRUE(dyn.isObject());
ASSERT_EQ("SpillConfig", dyn["name"].asString());
}

TEST(SpillConfig, roundTripAllFields) {
SpillConfig cfg = makeFullConfig();
auto rt = SpillConfig::deserialize(cfg.serialize());
ASSERT_NE(nullptr, rt);
assertFieldsEqual(cfg, *rt);
}

TEST(SpillConfig, callbacksAndExecutorNullAfterRoundTrip) {
SpillConfig cfg = makeFullConfig();
cfg.getSpillDirPathCb = []() -> const std::string& {
static const std::string path = "/tmp/spill";
return path;
};
cfg.updateAndCheckSpillLimitCb = [](uint64_t) {};

auto rt = SpillConfig::deserialize(cfg.serialize());
ASSERT_NE(nullptr, rt);
EXPECT_FALSE(static_cast<bool>(rt->getSpillDirPathCb));
EXPECT_FALSE(static_cast<bool>(rt->updateAndCheckSpillLimitCb));
EXPECT_EQ(nullptr, rt->executor);
}

TEST(SpillConfig, roundTripViaISerializableRegistry) {
SpillConfig cfg = makeFullConfig();
auto rt = ISerializable::deserialize<SpillConfig>(cfg.serialize());
ASSERT_NE(nullptr, rt);
assertFieldsEqual(cfg, *rt);
}

TEST(SpillConfig, roundTripDefaultConstructed) {
SpillConfig cfg;
auto rt = SpillConfig::deserialize(cfg.serialize());
ASSERT_NE(nullptr, rt);
assertFieldsEqual(cfg, *rt);
}

TEST(SpillConfig, roundTripRowBasedSpillModes) {
for (auto mode :
{RowBasedSpillMode::DISABLE,
RowBasedSpillMode::RAW,
RowBasedSpillMode::COMPRESSION}) {
SpillConfig cfg = makeFullConfig();
cfg.rowBasedSpillMode = mode;
auto rt = SpillConfig::deserialize(cfg.serialize());
ASSERT_NE(nullptr, rt);
EXPECT_EQ(mode, rt->rowBasedSpillMode);
}
}

TEST(SpillConfig, roundTripCompressionKinds) {
for (auto kind :
{CompressionKind_NONE,
CompressionKind_ZLIB,
CompressionKind_SNAPPY,
CompressionKind_ZSTD,
CompressionKind_LZ4,
CompressionKind_GZIP}) {
SpillConfig cfg = makeFullConfig();
cfg.compressionKind = kind;
auto rt = SpillConfig::deserialize(cfg.serialize());
ASSERT_NE(nullptr, rt);
EXPECT_EQ(kind, rt->compressionKind);
}
}

TEST(SpillConfig, maxSpillLevelUnlimited) {
SpillConfig cfg = makeFullConfig();
cfg.maxSpillLevel = -1; // -1 means no limit
auto rt = SpillConfig::deserialize(cfg.serialize());
ASSERT_NE(nullptr, rt);
EXPECT_EQ(-1, rt->maxSpillLevel);
}

// ---- spillableReservationPercentages ----------------------------------------

TEST(SpillConfig, spillableReservationPercentages) {
struct {
uint32_t growthPct;
Expand Down
Loading