diff --git a/bolt/common/base/SpillConfig.cpp b/bolt/common/base/SpillConfig.cpp index ffa493bf..ae8f90cb 100644 --- a/bolt/common/base/SpillConfig.cpp +++ b/bolt/common/base/SpillConfig.cpp @@ -31,9 +31,11 @@ #include "bolt/common/base/SpillConfig.h" #include +#include #include "bolt/common/base/Exceptions.h" #include "bolt/common/base/SuccinctPrinter.h" +#include "bolt/common/compression/Compression.h" namespace bytedance::bolt::common { @@ -225,6 +227,92 @@ std::string SpillConfig::toString() const { jitEnabled); } +folly::dynamic SpillConfig::serialize() const { + folly::dynamic obj = folly::dynamic::object; + obj["name"] = "SpillConfig"; + obj["fileNamePrefix"] = fileNamePrefix; + obj["maxFileSize"] = static_cast(maxFileSize); + obj["spillUringEnabled"] = spillUringEnabled; + obj["writeBufferSize"] = static_cast(writeBufferSize); + obj["minSpillableReservationPct"] = minSpillableReservationPct; + obj["spillableReservationGrowthPct"] = spillableReservationGrowthPct; + obj["startPartitionBit"] = static_cast(startPartitionBit); + obj["joinPartitionBits"] = static_cast(joinPartitionBits); + obj["joinRepartitionBits"] = static_cast(joinRepartitionBits); + obj["maxSpillLevel"] = maxSpillLevel; + obj["maxSpillRunRows"] = static_cast(maxSpillRunRows); + obj["writerFlushThresholdSize"] = + static_cast(writerFlushThresholdSize); + obj["testSpillPct"] = testSpillPct; + obj["compressionKind"] = static_cast(compressionKind); + obj["fileCreateConfig"] = fileCreateConfig; + obj["rowBasedSpillMode"] = static_cast(rowBasedSpillMode); + obj["singlePartitionSerdeKind"] = singlePartitionSerdeKind; + obj["spillPartitionsAdaptiveThreshold"] = + static_cast(spillPartitionsAdaptiveThreshold); + obj["jitEnabled"] = jitEnabled; + obj["needSetNextEqual"] = needSetNextEqual; + obj["aggBypassHTEqualNum"] = static_cast(aggBypassHTEqualNum); + // getSpillDirPathCb, updateAndCheckSpillLimitCb, and executor are omitted; + // they must be re-injected by the host after deserialization. + return obj; +} + +std::shared_ptr SpillConfig::deserialize( + const folly::dynamic& obj) { + auto spillConfig = std::make_shared(); + spillConfig->fileNamePrefix = obj["fileNamePrefix"].asString(); + spillConfig->maxFileSize = static_cast(obj["maxFileSize"].asInt()); + spillConfig->spillUringEnabled = obj["spillUringEnabled"].asBool(); + spillConfig->writeBufferSize = + static_cast(obj["writeBufferSize"].asInt()); + spillConfig->minSpillableReservationPct = + static_cast(obj["minSpillableReservationPct"].asInt()); + spillConfig->spillableReservationGrowthPct = + static_cast(obj["spillableReservationGrowthPct"].asInt()); + spillConfig->startPartitionBit = + static_cast(obj["startPartitionBit"].asInt()); + spillConfig->joinPartitionBits = + static_cast(obj["joinPartitionBits"].asInt()); + spillConfig->joinRepartitionBits = + static_cast(obj["joinRepartitionBits"].asInt()); + spillConfig->maxSpillLevel = + static_cast(obj["maxSpillLevel"].asInt()); + spillConfig->maxSpillRunRows = + static_cast(obj["maxSpillRunRows"].asInt()); + spillConfig->writerFlushThresholdSize = + static_cast(obj["writerFlushThresholdSize"].asInt()); + spillConfig->testSpillPct = static_cast(obj["testSpillPct"].asInt()); + spillConfig->compressionKind = + static_cast(obj["compressionKind"].asInt()); + spillConfig->fileCreateConfig = obj["fileCreateConfig"].asString(); + spillConfig->rowBasedSpillMode = + static_cast(obj["rowBasedSpillMode"].asInt()); + spillConfig->singlePartitionSerdeKind = + obj["singlePartitionSerdeKind"].asString(); + spillConfig->spillPartitionsAdaptiveThreshold = + static_cast(obj["spillPartitionsAdaptiveThreshold"].asInt()); + spillConfig->jitEnabled = obj["jitEnabled"].asBool(); + spillConfig->needSetNextEqual = obj["needSetNextEqual"].asBool(); + spillConfig->aggBypassHTEqualNum = + static_cast(obj["aggBypassHTEqualNum"].asInt()); + // getSpillDirPathCb, updateAndCheckSpillLimitCb, and executor remain + // default-initialized (null); callers must re-inject them as needed. + return spillConfig; +} + +void SpillConfig::registerSerDe() { + auto& registry = bolt::DeserializationRegistryForSharedPtr(); + registry.Register("SpillConfig", SpillConfig::deserialize); +} + +namespace { +const bool kSpillConfigSerdeRegistered = []() { + SpillConfig::registerSerDe(); + return true; +}(); +} // namespace + SpillConfig& SpillConfig::setJITenableForSpill(bool enabled) noexcept { jitEnabled = enabled; return *this; diff --git a/bolt/common/base/SpillConfig.h b/bolt/common/base/SpillConfig.h index ad59c1e8..44e0f58b 100644 --- a/bolt/common/base/SpillConfig.h +++ b/bolt/common/base/SpillConfig.h @@ -33,9 +33,11 @@ #include #include +#include #include #include #include "bolt/common/compression/Compression.h" +#include "bolt/common/serialization/Serializable.h" #include "bolt/vector/VectorStream.h" namespace bytedance::bolt::common { @@ -73,7 +75,7 @@ enum class RowBasedSpillMode { RowBasedSpillMode strToRowBasedSpillMode(const std::string& str); /// Specifies the config for spilling. -struct SpillConfig { +struct SpillConfig : public bytedance::bolt::ISerializable { SpillConfig() = default; SpillConfig( GetSpillDirectoryPathCB _getSpillDirPathCb, @@ -112,6 +114,16 @@ struct SpillConfig { std::string toString() const; + /// Serializes all value fields. Callbacks and executor are omitted and must + /// be re-injected by the host after deserialization. + folly::dynamic serialize() const override; + + /// Reconstructs a SpillConfig from a serialized object. Callbacks and + /// executor are left at their default (null/empty) values. + static std::shared_ptr deserialize(const folly::dynamic& obj); + + static void registerSerDe(); + /// A callback function that returns the spill directory path. Implementations /// can use it to ensure the path exists before returning. GetSpillDirectoryPathCB getSpillDirPathCb; diff --git a/bolt/common/base/tests/SpillConfigTest.cpp b/bolt/common/base/tests/SpillConfigTest.cpp index 4e82b835..d4da2097 100644 --- a/bolt/common/base/tests/SpillConfigTest.cpp +++ b/bolt/common/base/tests/SpillConfigTest.cpp @@ -31,7 +31,9 @@ #include "bolt/common/base/SpillConfig.h" #include #include "bolt/common/base/tests/GTestUtils.h" +#include "bolt/common/serialization/Serializable.h" #include "bolt/exec/HashBitRange.h" +using namespace bytedance::bolt; using namespace bytedance::bolt::common; using namespace bytedance::bolt::exec; @@ -159,6 +161,148 @@ TEST(SpillConfig, spillLevelLimit) { } } +// ---- Serialization helpers -------------------------------------------------- + +namespace { + +// Returns a SpillConfig with every serializable field set to a non-default +// value, making it easy to detect fields that were silently dropped. +SpillConfig makeFullConfig() { + SpillConfig cfg; + cfg.fileNamePrefix = "test_spill"; + cfg.maxFileSize = 512UL * 1024 * 1024; + cfg.spillUringEnabled = true; + cfg.writeBufferSize = 8UL * 1024 * 1024; + cfg.minSpillableReservationPct = 10; + cfg.spillableReservationGrowthPct = 25; + cfg.startPartitionBit = 29; + cfg.joinPartitionBits = 4; + cfg.joinRepartitionBits = 2; + cfg.maxSpillLevel = 3; + cfg.maxSpillRunRows = 100'000; + cfg.writerFlushThresholdSize = 64UL * 1024 * 1024; + cfg.testSpillPct = 5; + cfg.compressionKind = CompressionKind_ZSTD; + cfg.fileCreateConfig = R"({"option":"value"})"; + cfg.rowBasedSpillMode = RowBasedSpillMode::RAW; + cfg.singlePartitionSerdeKind = "Arrow"; + cfg.spillPartitionsAdaptiveThreshold = 64; + cfg.jitEnabled = false; + cfg.needSetNextEqual = true; + cfg.aggBypassHTEqualNum = 42; + return cfg; +} + +void assertFieldsEqual(const SpillConfig& e, const SpillConfig& a) { + EXPECT_EQ(e.fileNamePrefix, a.fileNamePrefix); + EXPECT_EQ(e.maxFileSize, a.maxFileSize); + EXPECT_EQ(e.spillUringEnabled, a.spillUringEnabled); + EXPECT_EQ(e.writeBufferSize, a.writeBufferSize); + EXPECT_EQ(e.minSpillableReservationPct, a.minSpillableReservationPct); + EXPECT_EQ(e.spillableReservationGrowthPct, a.spillableReservationGrowthPct); + EXPECT_EQ(e.startPartitionBit, a.startPartitionBit); + EXPECT_EQ(e.joinPartitionBits, a.joinPartitionBits); + EXPECT_EQ(e.joinRepartitionBits, a.joinRepartitionBits); + EXPECT_EQ(e.maxSpillLevel, a.maxSpillLevel); + EXPECT_EQ(e.maxSpillRunRows, a.maxSpillRunRows); + EXPECT_EQ(e.writerFlushThresholdSize, a.writerFlushThresholdSize); + EXPECT_EQ(e.testSpillPct, a.testSpillPct); + EXPECT_EQ(e.compressionKind, a.compressionKind); + EXPECT_EQ(e.fileCreateConfig, a.fileCreateConfig); + EXPECT_EQ(e.rowBasedSpillMode, a.rowBasedSpillMode); + EXPECT_EQ(e.singlePartitionSerdeKind, a.singlePartitionSerdeKind); + EXPECT_EQ( + e.spillPartitionsAdaptiveThreshold, a.spillPartitionsAdaptiveThreshold); + EXPECT_EQ(e.jitEnabled, a.jitEnabled); + EXPECT_EQ(e.needSetNextEqual, a.needSetNextEqual); + EXPECT_EQ(e.aggBypassHTEqualNum, a.aggBypassHTEqualNum); +} + +} // namespace + +// ---- Serialization tests ---------------------------------------------------- + +TEST(SpillConfig, serializeContainsNameField) { + folly::dynamic dyn = makeFullConfig().serialize(); + ASSERT_TRUE(dyn.isObject()); + ASSERT_EQ("SpillConfig", dyn["name"].asString()); +} + +TEST(SpillConfig, roundTripAllFields) { + SpillConfig cfg = makeFullConfig(); + auto rt = SpillConfig::deserialize(cfg.serialize()); + ASSERT_NE(nullptr, rt); + assertFieldsEqual(cfg, *rt); +} + +TEST(SpillConfig, callbacksAndExecutorNullAfterRoundTrip) { + SpillConfig cfg = makeFullConfig(); + cfg.getSpillDirPathCb = []() -> const std::string& { + static const std::string path = "/tmp/spill"; + return path; + }; + cfg.updateAndCheckSpillLimitCb = [](uint64_t) {}; + + auto rt = SpillConfig::deserialize(cfg.serialize()); + ASSERT_NE(nullptr, rt); + EXPECT_FALSE(static_cast(rt->getSpillDirPathCb)); + EXPECT_FALSE(static_cast(rt->updateAndCheckSpillLimitCb)); + EXPECT_EQ(nullptr, rt->executor); +} + +TEST(SpillConfig, roundTripViaISerializableRegistry) { + SpillConfig cfg = makeFullConfig(); + auto rt = ISerializable::deserialize(cfg.serialize()); + ASSERT_NE(nullptr, rt); + assertFieldsEqual(cfg, *rt); +} + +TEST(SpillConfig, roundTripDefaultConstructed) { + SpillConfig cfg; + auto rt = SpillConfig::deserialize(cfg.serialize()); + ASSERT_NE(nullptr, rt); + assertFieldsEqual(cfg, *rt); +} + +TEST(SpillConfig, roundTripRowBasedSpillModes) { + for (auto mode : + {RowBasedSpillMode::DISABLE, + RowBasedSpillMode::RAW, + RowBasedSpillMode::COMPRESSION}) { + SpillConfig cfg = makeFullConfig(); + cfg.rowBasedSpillMode = mode; + auto rt = SpillConfig::deserialize(cfg.serialize()); + ASSERT_NE(nullptr, rt); + EXPECT_EQ(mode, rt->rowBasedSpillMode); + } +} + +TEST(SpillConfig, roundTripCompressionKinds) { + for (auto kind : + {CompressionKind_NONE, + CompressionKind_ZLIB, + CompressionKind_SNAPPY, + CompressionKind_ZSTD, + CompressionKind_LZ4, + CompressionKind_GZIP}) { + SpillConfig cfg = makeFullConfig(); + cfg.compressionKind = kind; + auto rt = SpillConfig::deserialize(cfg.serialize()); + ASSERT_NE(nullptr, rt); + EXPECT_EQ(kind, rt->compressionKind); + } +} + +TEST(SpillConfig, maxSpillLevelUnlimited) { + SpillConfig cfg = makeFullConfig(); + cfg.maxSpillLevel = -1; // -1 means no limit + auto rt = SpillConfig::deserialize(cfg.serialize()); + ASSERT_NE(nullptr, rt); + EXPECT_EQ(-1, rt->maxSpillLevel); +} + +// ---- spillableReservationPercentages ---------------------------------------- + TEST(SpillConfig, spillableReservationPercentages) { struct { uint32_t growthPct; diff --git a/bolt/dwio/common/Options.cpp b/bolt/dwio/common/Options.cpp index 779ddc93..95526886 100644 --- a/bolt/dwio/common/Options.cpp +++ b/bolt/dwio/common/Options.cpp @@ -30,6 +30,7 @@ #include "bolt/dwio/common/Options.h" #include +#include "bolt/type/Type.h" namespace bytedance::bolt::dwio::common { FileFormat toFileFormat(std::string s) { @@ -98,4 +99,103 @@ ColumnReaderOptions makeColumnReaderOptions(const ReaderOptions& options) { return columnReaderOptions; } +// We do *not* serialize pool, nonReclaimableSection, or the callbacks / +// executor inside spillConfig—they must be re‐injected by the host. +folly::dynamic WriterOptions::serialize() const { + folly::dynamic obj = folly::dynamic::object; + + if (schema) { + obj["schema"] = schema->serialize(); + } + + if (compressionKind) { + obj["compressionKind"] = static_cast(*compressionKind); + } + + if (!serdeParameters.empty()) { + folly::dynamic mapObj = folly::dynamic::object; + for (const auto& [k, v] : serdeParameters) { + mapObj[k] = v; + } + obj["serdeParameters"] = std::move(mapObj); + } + + if (maxStripeSize) { + obj["maxStripeSize"] = static_cast(*maxStripeSize); + } + + if (arrowBridgeTimestampUnit) { + obj["arrowBridgeTimestampUnit"] = + static_cast(*arrowBridgeTimestampUnit); + } + + if (zlibCompressionLevel) { + obj["zlibCompressionLevel"] = static_cast(*zlibCompressionLevel); + } + + // spillConfig (value fields only; callbacks/executor re-injected by host) + if (spillConfig) { + obj["spillConfig"] = spillConfig->serialize(); + } + + return obj; +} + +// pool, nonReclaimableSection, and spillConfig callbacks/executor remain at +// default and must be re-injected by the host. +std::shared_ptr WriterOptions::deserialize( + const folly::dynamic& obj) { + auto opts = std::make_shared(); + + // 1) schema + if (auto p = obj.get_ptr("schema")) { + opts->schema = ISerializable::deserialize(*p); + } + + // 2) compressionKind + if (auto p = obj.get_ptr("compressionKind")) { + opts->compressionKind = + static_cast(p->asInt()); + } + + // 3) serdeParameters + if (auto p = obj.get_ptr("serdeParameters")) { + opts->serdeParameters.clear(); + for (auto& kv : p->items()) { + opts->serdeParameters.emplace(kv.first.asString(), kv.second.asString()); + } + } + + // 4) maxStripeSize + if (auto p = obj.get_ptr("maxStripeSize")) { + opts->maxStripeSize = static_cast(p->asInt()); + } + + // 5) arrowBridgeTimestampUnit + if (auto p = obj.get_ptr("arrowBridgeTimestampUnit")) { + opts->arrowBridgeTimestampUnit = static_cast(p->asInt()); + } + + // 6) zlibCompressionLevel + if (auto p = obj.get_ptr("zlibCompressionLevel")) { + opts->zlibCompressionLevel = static_cast(p->asInt()); + } + + // 7) spillConfig + if (auto p = obj.get_ptr("spillConfig")) { + opts->ownedSpillConfig = + ISerializable::deserialize(*p); + opts->spillConfig = opts->ownedSpillConfig.get(); + } + + return opts; +} + +void WriterOptions::registerSerDe() { + bolt::Type::registerSerDe(); + bolt::common::SpillConfig::registerSerDe(); + auto& registry = DeserializationRegistryForSharedPtr(); + registry.Register("WriterOptions", WriterOptions::deserialize); +} + } // namespace bytedance::bolt::dwio::common diff --git a/bolt/dwio/common/Options.h b/bolt/dwio/common/Options.h index a11217f1..13af360c 100644 --- a/bolt/dwio/common/Options.h +++ b/bolt/dwio/common/Options.h @@ -725,10 +725,12 @@ class ReaderOptions : public io::ReaderOptions { } }; -struct WriterOptions { +struct WriterOptions : public ISerializable { TypePtr schema; - bolt::memory::MemoryPool* memoryPool; + bolt::memory::MemoryPool* memoryPool{nullptr}; const bolt::common::SpillConfig* spillConfig{nullptr}; + // Owns the SpillConfig when it was produced by deserialize(). + std::shared_ptr ownedSpillConfig; tsan_atomic* nonReclaimableSection{nullptr}; std::optional compressionKind; std::optional maxStripeSize{std::nullopt}; @@ -743,6 +745,10 @@ struct WriterOptions { virtual void processHiveConnectorConfigs(const config::ConfigBase&) {} virtual ~WriterOptions() = default; + + folly::dynamic serialize() const override; + static std::shared_ptr deserialize(const folly::dynamic& obj); + static void registerSerDe(); }; struct ColumnReaderOptions { diff --git a/bolt/dwio/common/tests/CMakeLists.txt b/bolt/dwio/common/tests/CMakeLists.txt index 69da6f8d..e09917f0 100644 --- a/bolt/dwio/common/tests/CMakeLists.txt +++ b/bolt/dwio/common/tests/CMakeLists.txt @@ -38,6 +38,7 @@ add_executable( ExecutorBarrierTest.cpp LocalFileSinkTest.cpp LoggedExceptionTest.cpp + OptionsTests.cpp ParallelForTest.cpp RangeTests.cpp ReaderTest.cpp diff --git a/bolt/dwio/common/tests/OptionsTests.cpp b/bolt/dwio/common/tests/OptionsTests.cpp index 7a672805..9d2c5403 100644 --- a/bolt/dwio/common/tests/OptionsTests.cpp +++ b/bolt/dwio/common/tests/OptionsTests.cpp @@ -29,11 +29,23 @@ */ #include + +#include "bolt/common/base/SpillConfig.h" #include "bolt/dwio/common/Options.h" +#include "bolt/type/Type.h" using namespace ::testing; +using namespace bytedance::bolt; +using namespace bytedance::bolt::common; using namespace bytedance::bolt::dwio::common; +class WriterOptionsSerDeTest : public ::testing::Test { + protected: + static void SetUpTestSuite() { + WriterOptions::registerSerDe(); + } +}; + TEST(OptionsTests, defaultAppendRowNumberColumnTest) { // appendRowNumberColumn flag should be false by default RowReaderOptions rowReaderOptions; @@ -55,3 +67,132 @@ TEST(OptionsTests, testAppendRowNumberColumnInCopy) { RowReaderOptions rowReaderOptionsSecondCopy{rowReaderOptions}; ASSERT_EQ(true, rowReaderOptionsSecondCopy.getAppendRowNumberColumn()); } + +TEST_F(WriterOptionsSerDeTest, writerOptionsSerializeDeserializeRoundTrip) { + WriterOptions opts; + + // 1) schema (use a simple type so serialization is stable) + opts.schema = ROW({"c1", "c2"}, {INTEGER(), VARCHAR()}); + + // 2) compression kind + opts.compressionKind = common::CompressionKind_ZSTD; + + // 3) serde parameters + opts.serdeParameters["k1"] = "v1"; + opts.serdeParameters["k2"] = "v2"; + + // 4) maxStripeSize + opts.maxStripeSize = 128UL * 1024 * 1024; + + // 5) arrowBridgeTimestampUnit + opts.arrowBridgeTimestampUnit = 3; + + // 6) zlibCompressionLevel + opts.zlibCompressionLevel = 6; + + // ---- serialize ---- + folly::dynamic dyn = opts.serialize(); + + // ---- deserialize ---- + auto roundTrip = WriterOptions::deserialize(dyn); + + // ---- verify ---- + + ASSERT_TRUE(roundTrip->schema != nullptr); + ASSERT_EQ(opts.schema->toString(), roundTrip->schema->toString()); + + ASSERT_TRUE(roundTrip->compressionKind.has_value()); + ASSERT_EQ(opts.compressionKind.value(), roundTrip->compressionKind.value()); + + ASSERT_EQ(opts.serdeParameters.size(), roundTrip->serdeParameters.size()); + ASSERT_EQ(opts.serdeParameters.at("k1"), roundTrip->serdeParameters.at("k1")); + ASSERT_EQ(opts.serdeParameters.at("k2"), roundTrip->serdeParameters.at("k2")); + + ASSERT_TRUE(roundTrip->maxStripeSize.has_value()); + ASSERT_EQ(opts.maxStripeSize.value(), roundTrip->maxStripeSize.value()); + + ASSERT_TRUE(roundTrip->arrowBridgeTimestampUnit.has_value()); + ASSERT_EQ( + opts.arrowBridgeTimestampUnit.value(), + roundTrip->arrowBridgeTimestampUnit.value()); + + ASSERT_TRUE(roundTrip->zlibCompressionLevel.has_value()); + ASSERT_EQ( + opts.zlibCompressionLevel.value(), + roundTrip->zlibCompressionLevel.value()); +} + +TEST_F(WriterOptionsSerDeTest, writerOptionsDefaultsRoundTrip) { + WriterOptions opts; + + folly::dynamic dyn = opts.serialize(); + auto roundTrip = WriterOptions::deserialize(dyn); + + ASSERT_EQ(nullptr, roundTrip->schema); + ASSERT_FALSE(roundTrip->compressionKind.has_value()); + ASSERT_TRUE(roundTrip->serdeParameters.empty()); + ASSERT_FALSE(roundTrip->maxStripeSize.has_value()); + ASSERT_FALSE(roundTrip->arrowBridgeTimestampUnit.has_value()); + ASSERT_FALSE(roundTrip->zlibCompressionLevel.has_value()); + ASSERT_EQ(nullptr, roundTrip->spillConfig); + ASSERT_EQ(nullptr, roundTrip->ownedSpillConfig.get()); +} + +TEST_F(WriterOptionsSerDeTest, writerOptionsWithSpillConfigRoundTrip) { + SpillConfig cfg; + cfg.fileNamePrefix = "writer_spill"; + cfg.maxFileSize = 256UL * 1024 * 1024; + cfg.spillUringEnabled = true; + cfg.writeBufferSize = 8UL * 1024 * 1024; + cfg.minSpillableReservationPct = 5; + cfg.spillableReservationGrowthPct = 15; + cfg.startPartitionBit = 29; + cfg.joinPartitionBits = 4; + cfg.joinRepartitionBits = 4; + cfg.maxSpillLevel = -1; + cfg.maxSpillRunRows = 0; + cfg.writerFlushThresholdSize = 32UL * 1024 * 1024; + cfg.testSpillPct = 0; + cfg.compressionKind = CompressionKind_NONE; + cfg.rowBasedSpillMode = RowBasedSpillMode::RAW; + cfg.jitEnabled = false; + + WriterOptions opts; + opts.spillConfig = &cfg; + + auto rt = WriterOptions::deserialize(opts.serialize()); + + ASSERT_NE(nullptr, rt->spillConfig); + ASSERT_NE(nullptr, rt->ownedSpillConfig.get()); + ASSERT_EQ(rt->spillConfig, rt->ownedSpillConfig.get()); + ASSERT_EQ(cfg.fileNamePrefix, rt->spillConfig->fileNamePrefix); + ASSERT_EQ(cfg.maxFileSize, rt->spillConfig->maxFileSize); + ASSERT_EQ(cfg.spillUringEnabled, rt->spillConfig->spillUringEnabled); + ASSERT_EQ(cfg.rowBasedSpillMode, rt->spillConfig->rowBasedSpillMode); + ASSERT_EQ(cfg.jitEnabled, rt->spillConfig->jitEnabled); +} + +TEST_F(WriterOptionsSerDeTest, writerOptionsNoSpillConfigRoundTrip) { + WriterOptions opts; + ASSERT_EQ(nullptr, opts.spillConfig); + + auto rt = WriterOptions::deserialize(opts.serialize()); + + ASSERT_EQ(nullptr, rt->spillConfig); + ASSERT_EQ(nullptr, rt->ownedSpillConfig.get()); +} + +TEST_F(WriterOptionsSerDeTest, writerOptionsNoSchemaRoundTrip) { + WriterOptions opts; + opts.compressionKind = common::CompressionKind_ZLIB; + opts.serdeParameters["key"] = "value"; + + folly::dynamic dyn = opts.serialize(); + auto roundTrip = WriterOptions::deserialize(dyn); + + ASSERT_EQ(nullptr, roundTrip->schema); + ASSERT_TRUE(roundTrip->compressionKind.has_value()); + ASSERT_EQ(common::CompressionKind_ZLIB, roundTrip->compressionKind.value()); + ASSERT_EQ(1u, roundTrip->serdeParameters.size()); + ASSERT_EQ("value", roundTrip->serdeParameters.at("key")); +}