Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
82 changes: 82 additions & 0 deletions velox/dwio/common/Options.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/

#include "velox/dwio/common/Options.h"
#include "velox/common/compression/Compression.h"

namespace facebook::velox::dwio::common {

Expand Down Expand Up @@ -77,4 +78,85 @@ ColumnReaderOptions makeColumnReaderOptions(const ReaderOptions& options) {
return columnReaderOptions;
}

folly::dynamic WriterOptions::serialize() const {
folly::dynamic obj = folly::dynamic::object;
obj["name"] = "WriterOptions";

// 1) Schema
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ordinal numbers like this are fragile as they do not survice refactoring. Please, drop.

Also, these comments seem redundant as they just repeat the code. Consider dropping altogether.

if (schema) {
obj["schema"] = schema->serialize();
}

// 2) compressionKind
if (compressionKind) {
obj["compressionKind"] = static_cast<int>(*compressionKind);
}

// 3) serdeParameters
if (!serdeParameters.empty()) {
folly::dynamic mapObj = folly::dynamic::object;
for (auto& [k, v] : serdeParameters) {
mapObj[k] = v;
}
obj["serdeParameters"] = std::move(mapObj);
}

// 4) sessionTimezoneName
if (!sessionTimezoneName.empty()) {
obj["sessionTimezoneName"] = sessionTimezoneName;
}

// 5) adjustTimestampToTimezone
obj["adjustTimestampToTimezone"] = adjustTimestampToTimezone;

// We do *not* serialize pool, spillConfig, nonReclaimableSection,
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would you move this comment to the header file? I assume the caller of 'create' would need to be aware of these.

// or the factory functions—they must be re‐injected by the host.

return obj;
}

std::shared_ptr<WriterOptions> WriterOptions::deserialize(
const folly::dynamic& obj) {
auto opts = std::make_shared<WriterOptions>();

if (auto p = obj.get_ptr("schema")) {
opts->schema = ISerializable::deserialize<velox::Type>(*p);
}

if (auto p = obj.get_ptr("compressionKind")) {
opts->compressionKind =
static_cast<velox::common::CompressionKind>(p->asInt());
}

if (auto p = obj.get_ptr("serdeParameters")) {
opts->serdeParameters.clear();
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is redundant.

for (auto& kv : p->items()) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

for (const auto& [key, value] : p->items())

opts->serdeParameters.emplace(kv.first.asString(), kv.second.asString());
}
}

if (auto p = obj.get_ptr("sessionTimezoneName")) {
opts->sessionTimezoneName = p->asString();
}

if (auto p = obj.get_ptr("adjustTimestampToTimezone")) {
opts->adjustTimestampToTimezone = p->asBool();
}

// TODO: Finish spillConfig. We currently do not serialize it.
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TODO: Finish spillConfig.

What does that mean? Would you elaborate? Can you address this TODO now?

// pool, nonReclaimableSection, factories remain at default
return opts;
}

void WriterOptions::registerSerDe() {
auto& registry = DeserializationRegistryForSharedPtr();
registry.Register("WriterOptions", WriterOptions::deserialize);
}

// force registration at load‐time
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's remove that and require explicit registration.

static bool _writerOptionsSerdeRegistered = []() {
WriterOptions::registerSerDe();
return true;
}();

} // namespace facebook::velox::dwio::common
6 changes: 5 additions & 1 deletion velox/dwio/common/Options.h
Original file line number Diff line number Diff line change
Expand Up @@ -693,7 +693,7 @@ class ReaderOptions : public io::ReaderOptions {
bool allowEmptyFile_{false};
};

struct WriterOptions {
struct WriterOptions : public ISerializable {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see that WriterOptions has a virtual method (processConfigs), which means that it has derived classes, which may have additional state. Serde needs to take this into account.

TypePtr schema{nullptr};
velox::memory::MemoryPool* memoryPool{nullptr};
const velox::common::SpillConfig* spillConfig{nullptr};
Expand Down Expand Up @@ -722,6 +722,10 @@ struct WriterOptions {
const config::ConfigBase& session) {}

virtual ~WriterOptions() = default;

folly::dynamic serialize() const override;
static std::shared_ptr<WriterOptions> deserialize(const folly::dynamic& obj);
static void registerSerDe();
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we add this to HiveConnector::registerSerDe?

};

// Options for creating a column reader.
Expand Down
109 changes: 109 additions & 0 deletions velox/dwio/common/tests/OptionsTests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,13 @@
* limitations under the License.
*/
#include <gtest/gtest.h>

// #include "velox/common/compression/Compression.h"
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please, remove.

#include "velox/dwio/common/Options.h"

using namespace ::testing;
using namespace facebook::velox;
using namespace facebook::velox::common;
using namespace facebook::velox::dwio::common;

TEST(OptionsTests, defaultRowNumberColumnInfoTest) {
Expand Down Expand Up @@ -51,3 +55,108 @@ TEST(OptionsTests, testRowNumberColumnInfoInCopy) {
ASSERT_EQ(rowNumberColumnInfo.insertPosition, rowNumberColumn.insertPosition);
ASSERT_EQ(rowNumberColumnInfo.name, rowNumberColumn.name);
}

TEST(OptionsTests, WriterOptionsSerdeTestRoundTripWithAllFields) {
Type::registerSerDe();
WriterOptions::registerSerDe();

auto opts = std::make_shared<WriterOptions>();

// Schema: row<a:bigint>
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Drop redundant comments.

TypePtr schema = ROW({{"a", BIGINT()}});
opts->schema = schema;
opts->compressionKind = CompressionKind::CompressionKind_ZSTD;
opts->serdeParameters = {{"k1", "v1"}, {"k2", "v2"}};
opts->sessionTimezoneName = "America/Los_Angeles";
opts->adjustTimestampToTimezone = true;

// Note: We intentionally do NOT set memoryPool, nonReclaimableSection, or the
// factory callables because those are not serialized by design.

// Serialize
folly::dynamic serialized = opts->serialize();

// Basic shape checks on serialized output
ASSERT_TRUE(serialized.isObject());
// Always present:
ASSERT_TRUE(serialized.count("adjustTimestampToTimezone") == 1);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ASSERT_EQ

However, let's not hard-code serde format in the test. Rather, let's create round-trip test.

// Populated in this test:
EXPECT_TRUE(serialized.count("schema") == 1);
EXPECT_TRUE(serialized.count("compressionKind") == 1);
EXPECT_TRUE(serialized.count("serdeParameters") == 1);
EXPECT_TRUE(serialized.count("sessionTimezoneName") == 1);
// Not serialized:
EXPECT_EQ(serialized.count("memoryPool"), 0);
EXPECT_EQ(serialized.count("spillConfig"), 0);
EXPECT_EQ(serialized.count("nonReclaimableSection"), 0);
EXPECT_EQ(serialized.count("memoryReclaimerFactory"), 0);
EXPECT_EQ(serialized.count("flushPolicyFactory"), 0);

// Deserialize
auto roundTripped = ISerializable::deserialize<WriterOptions>(serialized);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

roundTripped -> copy

ASSERT_NE(roundTripped, nullptr);

// Validate schema equality (compare types by string or deep equals)
ASSERT_TRUE(roundTripped->schema != nullptr);
EXPECT_EQ(roundTripped->schema->toString(), schema->toString());

// Validate compression kind
ASSERT_TRUE(roundTripped->compressionKind.has_value());
EXPECT_EQ(
*roundTripped->compressionKind, CompressionKind::CompressionKind_ZSTD);

// Validate serde parameters
EXPECT_EQ(roundTripped->serdeParameters.size(), 2);
EXPECT_EQ(roundTripped->serdeParameters.at("k1"), "v1");
EXPECT_EQ(roundTripped->serdeParameters.at("k2"), "v2");

// Validate timezone + adjust flag
EXPECT_EQ(roundTripped->sessionTimezoneName, "America/Los_Angeles");
EXPECT_TRUE(roundTripped->adjustTimestampToTimezone);

// Validate that non-serialized fields remain default/null
EXPECT_EQ(roundTripped->memoryPool, nullptr);
EXPECT_EQ(roundTripped->spillConfig, nullptr);
EXPECT_EQ(roundTripped->nonReclaimableSection, nullptr);

// Factories should be default-initialized (callables present but return
// nullptr / no-op)
ASSERT_TRUE(static_cast<bool>(roundTripped->memoryReclaimerFactory));
EXPECT_EQ(roundTripped->memoryReclaimerFactory(), nullptr);
// flushPolicyFactory is default-constructed empty unless you set it;
// implementation may leave it empty by default — check it doesn't crash when
// inspected. We only assert it is empty here (adjust if your constructor sets
// one).
EXPECT_FALSE(static_cast<bool>(roundTripped->flushPolicyFactory));
}

TEST(WriterOptionsSerdeTest, DefaultsAndUnknownFieldsAreSafe) {
Type::registerSerDe();
WriterOptions::registerSerDe();

// Minimal object: only adjustTimestampToTimezone is always emitted by
// serialize(). Here we simulate missing keys and an extra unknown key to
// ensure robustness.
folly::dynamic minimal = folly::dynamic::object;
minimal["adjustTimestampToTimezone"] = false;
minimal["unknownExtraKey"] = "ignored";

auto opts = WriterOptions::deserialize(minimal);
ASSERT_NE(opts, nullptr);

// Absent -> defaults
EXPECT_TRUE(opts->schema == nullptr);
EXPECT_FALSE(opts->compressionKind.has_value());
EXPECT_TRUE(opts->serdeParameters.empty());
EXPECT_TRUE(opts->sessionTimezoneName.empty());
EXPECT_FALSE(opts->adjustTimestampToTimezone); // false from our dynamic

// Non-serialized pointers/factories remain default/null
EXPECT_EQ(opts->memoryPool, nullptr);
EXPECT_EQ(opts->spillConfig, nullptr);
EXPECT_EQ(opts->nonReclaimableSection, nullptr);
// factory callables default as in your implementation
ASSERT_TRUE(static_cast<bool>(opts->memoryReclaimerFactory));
EXPECT_EQ(opts->memoryReclaimerFactory(), nullptr);
EXPECT_FALSE(static_cast<bool>(opts->flushPolicyFactory));
}
Loading