diff --git a/velox/dwio/common/Options.cpp b/velox/dwio/common/Options.cpp
index 33946f71159..9f9feb108fa 100644
--- a/velox/dwio/common/Options.cpp
+++ b/velox/dwio/common/Options.cpp
@@ -15,6 +15,7 @@
  */
 
 #include "velox/dwio/common/Options.h"
+#include "velox/common/compression/Compression.h"
 
 namespace facebook::velox::dwio::common {
 
@@ -77,4 +78,85 @@ ColumnReaderOptions makeColumnReaderOptions(const ReaderOptions& options) {
   return columnReaderOptions;
 }
 
+folly::dynamic WriterOptions::serialize() const {
+  folly::dynamic obj = folly::dynamic::object;
+  obj["name"] = "WriterOptions";
+
+  // 1) Schema
+  if (schema) {
+    obj["schema"] = schema->serialize();
+  }
+
+  // 2) compressionKind
+  if (compressionKind) {
+    obj["compressionKind"] = static_cast<int>(*compressionKind);
+  }
+
+  // 3) serdeParameters
+  if (!serdeParameters.empty()) {
+    folly::dynamic mapObj = folly::dynamic::object;
+    for (const auto& [k, v] : serdeParameters) {
+      mapObj[k] = v;
+    }
+    obj["serdeParameters"] = std::move(mapObj);
+  }
+
+  // 4) sessionTimezoneName
+  if (!sessionTimezoneName.empty()) {
+    obj["sessionTimezoneName"] = sessionTimezoneName;
+  }
+
+  // 5) adjustTimestampToTimezone
+  obj["adjustTimestampToTimezone"] = adjustTimestampToTimezone;
+
+  // We do *not* serialize memoryPool, spillConfig, nonReclaimableSection,
+  // or the factory functions—they must be re-injected by the host.
+
+  return obj;
+}
+
+std::shared_ptr<WriterOptions> WriterOptions::deserialize(
+    const folly::dynamic& obj) {
+  auto opts = std::make_shared<WriterOptions>();
+
+  if (auto p = obj.get_ptr("schema")) {
+    opts->schema = ISerializable::deserialize<Type>(*p);
+  }
+
+  if (auto p = obj.get_ptr("compressionKind")) {
+    opts->compressionKind =
+        static_cast<velox::common::CompressionKind>(p->asInt());
+  }
+
+  if (auto p = obj.get_ptr("serdeParameters")) {
+    opts->serdeParameters.clear();
+    for (const auto& kv : p->items()) {
+      opts->serdeParameters.emplace(kv.first.asString(), kv.second.asString());
+    }
+  }
+
+  if (auto p = obj.get_ptr("sessionTimezoneName")) {
+    opts->sessionTimezoneName = p->asString();
+  }
+
+  if (auto p = obj.get_ptr("adjustTimestampToTimezone")) {
+    opts->adjustTimestampToTimezone = p->asBool();
+  }
+
+  // TODO: Finish spillConfig. We currently do not serialize it.
+  // pool, nonReclaimableSection, factories remain at default
+  return opts;
+}
+
+void WriterOptions::registerSerDe() {
+  auto& registry = DeserializationRegistryForSharedPtr();
+  registry.Register("WriterOptions", WriterOptions::deserialize);
+}
+
+// force registration at load-time
+static bool _writerOptionsSerdeRegistered = []() {
+  WriterOptions::registerSerDe();
+  return true;
+}();
+
 } // namespace facebook::velox::dwio::common
diff --git a/velox/dwio/common/Options.h b/velox/dwio/common/Options.h
index 851f97bee9c..5f173f440c8 100644
--- a/velox/dwio/common/Options.h
+++ b/velox/dwio/common/Options.h
@@ -693,7 +693,7 @@ class ReaderOptions : public io::ReaderOptions {
   bool allowEmptyFile_{false};
 };
 
-struct WriterOptions {
+struct WriterOptions : public ISerializable {
   TypePtr schema{nullptr};
   velox::memory::MemoryPool* memoryPool{nullptr};
   const velox::common::SpillConfig* spillConfig{nullptr};
@@ -722,6 +722,19 @@ struct WriterOptions {
       const config::ConfigBase& session) {}
 
   virtual ~WriterOptions() = default;
+
+  /// Serializes schema, compressionKind, serdeParameters,
+  /// sessionTimezoneName and adjustTimestampToTimezone into a folly::dynamic
+  /// object. memoryPool, spillConfig, nonReclaimableSection and the factory
+  /// callables are not serialized.
+  folly::dynamic serialize() const override;
+
+  /// Builds a WriterOptions from a folly::dynamic object. Keys that are absent
+  /// leave the corresponding member at its default; unknown keys are ignored.
+  static std::shared_ptr<WriterOptions> deserialize(const folly::dynamic& obj);
+
+  /// Registers deserialize() under the name "WriterOptions".
+  static void registerSerDe();
 };
 
 // Options for creating a column reader.
diff --git a/velox/dwio/common/tests/OptionsTests.cpp b/velox/dwio/common/tests/OptionsTests.cpp
index 82335821c52..dd957d9e3d2 100644
--- a/velox/dwio/common/tests/OptionsTests.cpp
+++ b/velox/dwio/common/tests/OptionsTests.cpp
@@ -14,9 +14,13 @@
  * limitations under the License.
  */
 #include <gtest/gtest.h>
+
+// #include "velox/common/compression/Compression.h"
 #include "velox/dwio/common/Options.h"
 
 using namespace ::testing;
+using namespace facebook::velox;
+using namespace facebook::velox::common;
 using namespace facebook::velox::dwio::common;
 
 TEST(OptionsTests, defaultRowNumberColumnInfoTest) {
@@ -51,3 +55,108 @@ TEST(OptionsTests, testRowNumberColumnInfoInCopy) {
   ASSERT_EQ(rowNumberColumnInfo.insertPosition, rowNumberColumn.insertPosition);
   ASSERT_EQ(rowNumberColumnInfo.name, rowNumberColumn.name);
 }
+
+TEST(OptionsTests, WriterOptionsSerdeTestRoundTripWithAllFields) {
+  Type::registerSerDe();
+  WriterOptions::registerSerDe();
+
+  auto opts = std::make_shared<WriterOptions>();
+
+  // Schema: row<a:bigint>
+  TypePtr schema = ROW({{"a", BIGINT()}});
+  opts->schema = schema;
+  opts->compressionKind = CompressionKind::CompressionKind_ZSTD;
+  opts->serdeParameters = {{"k1", "v1"}, {"k2", "v2"}};
+  opts->sessionTimezoneName = "America/Los_Angeles";
+  opts->adjustTimestampToTimezone = true;
+
+  // Note: We intentionally do NOT set memoryPool, nonReclaimableSection, or the
+  // factory callables because those are not serialized by design.
+
+  // Serialize
+  folly::dynamic serialized = opts->serialize();
+
+  // Basic shape checks on serialized output
+  ASSERT_TRUE(serialized.isObject());
+  // Always present:
+  ASSERT_TRUE(serialized.count("adjustTimestampToTimezone") == 1);
+  // Populated in this test:
+  EXPECT_TRUE(serialized.count("schema") == 1);
+  EXPECT_TRUE(serialized.count("compressionKind") == 1);
+  EXPECT_TRUE(serialized.count("serdeParameters") == 1);
+  EXPECT_TRUE(serialized.count("sessionTimezoneName") == 1);
+  // Not serialized:
+  EXPECT_EQ(serialized.count("memoryPool"), 0);
+  EXPECT_EQ(serialized.count("spillConfig"), 0);
+  EXPECT_EQ(serialized.count("nonReclaimableSection"), 0);
+  EXPECT_EQ(serialized.count("memoryReclaimerFactory"), 0);
+  EXPECT_EQ(serialized.count("flushPolicyFactory"), 0);
+
+  // Deserialize
+  auto roundTripped = ISerializable::deserialize<WriterOptions>(serialized);
+  ASSERT_NE(roundTripped, nullptr);
+
+  // Validate schema equality (compare types by string or deep equals)
+  ASSERT_TRUE(roundTripped->schema != nullptr);
+  EXPECT_EQ(roundTripped->schema->toString(), schema->toString());
+
+  // Validate compression kind
+  ASSERT_TRUE(roundTripped->compressionKind.has_value());
+  EXPECT_EQ(
+      *roundTripped->compressionKind, CompressionKind::CompressionKind_ZSTD);
+
+  // Validate serde parameters
+  EXPECT_EQ(roundTripped->serdeParameters.size(), 2);
+  EXPECT_EQ(roundTripped->serdeParameters.at("k1"), "v1");
+  EXPECT_EQ(roundTripped->serdeParameters.at("k2"), "v2");
+
+  // Validate timezone + adjust flag
+  EXPECT_EQ(roundTripped->sessionTimezoneName, "America/Los_Angeles");
+  EXPECT_TRUE(roundTripped->adjustTimestampToTimezone);
+
+  // Validate that non-serialized fields remain default/null
+  EXPECT_EQ(roundTripped->memoryPool, nullptr);
+  EXPECT_EQ(roundTripped->spillConfig, nullptr);
+  EXPECT_EQ(roundTripped->nonReclaimableSection, nullptr);
+
+  // Factories should be default-initialized (callables present but return
+  // nullptr / no-op)
+  ASSERT_TRUE(static_cast<bool>(roundTripped->memoryReclaimerFactory));
+  EXPECT_EQ(roundTripped->memoryReclaimerFactory(), nullptr);
+  // flushPolicyFactory is default-constructed empty unless you set it;
+  // implementation may leave it empty by default — check it doesn't crash when
+  // inspected. We only assert it is empty here (adjust if your constructor sets
+  // one).
+  EXPECT_FALSE(static_cast<bool>(roundTripped->flushPolicyFactory));
+}
+
+TEST(WriterOptionsSerdeTest, DefaultsAndUnknownFieldsAreSafe) {
+  Type::registerSerDe();
+  WriterOptions::registerSerDe();
+
+  // Minimal object: only adjustTimestampToTimezone is always emitted by
+  // serialize(). Here we simulate missing keys and an extra unknown key to
+  // ensure robustness.
+  folly::dynamic minimal = folly::dynamic::object;
+  minimal["adjustTimestampToTimezone"] = false;
+  minimal["unknownExtraKey"] = "ignored";
+
+  auto opts = WriterOptions::deserialize(minimal);
+  ASSERT_NE(opts, nullptr);
+
+  // Absent -> defaults
+  EXPECT_TRUE(opts->schema == nullptr);
+  EXPECT_FALSE(opts->compressionKind.has_value());
+  EXPECT_TRUE(opts->serdeParameters.empty());
+  EXPECT_TRUE(opts->sessionTimezoneName.empty());
+  EXPECT_FALSE(opts->adjustTimestampToTimezone); // false from our dynamic
+
+  // Non-serialized pointers/factories remain default/null
+  EXPECT_EQ(opts->memoryPool, nullptr);
+  EXPECT_EQ(opts->spillConfig, nullptr);
+  EXPECT_EQ(opts->nonReclaimableSection, nullptr);
+  // factory callables default as in your implementation
+  ASSERT_TRUE(static_cast<bool>(opts->memoryReclaimerFactory));
+  EXPECT_EQ(opts->memoryReclaimerFactory(), nullptr);
+  EXPECT_FALSE(static_cast<bool>(opts->flushPolicyFactory));
+}