Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 30 additions & 0 deletions velox/dwio/parquet/tests/writer/ParquetWriterTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,36 @@ DEBUG_ONLY_TEST_F(ParquetWriterTest, unitFromWriterOptions) {
std::dynamic_pointer_cast<::arrow::TimestampType>(
arrowSchema->field(0)->type());
ASSERT_EQ(tsType->unit(), ::arrow::TimeUnit::MICRO);
ASSERT_EQ(tsType->timezone(), "America/Los_Angeles");
})));

const auto data = makeRowVector({makeFlatVector<Timestamp>(
10'000, [](auto row) { return Timestamp(row, row); })});
parquet::WriterOptions writerOptions;
writerOptions.memoryPool = leafPool_.get();
writerOptions.parquetWriteTimestampUnit = TimestampUnit::kMicro;
writerOptions.parquetWriteTimestampTimeZone = "America/Los_Angeles";

// Create an in-memory writer.
auto sink = std::make_unique<MemorySink>(
200 * 1024 * 1024,
dwio::common::FileSink::Options{.pool = leafPool_.get()});
auto writer = std::make_unique<parquet::Writer>(
std::move(sink), writerOptions, rootPool_, ROW({"c0"}, {TIMESTAMP()}));
writer->write(data);
writer->close();
};

TEST_F(ParquetWriterTest, parquetWriteTimestampTimeZoneWithDefault) {
SCOPED_TESTVALUE_SET(
"facebook::velox::parquet::Writer::write",
std::function<void(const ::arrow::Schema*)>(
([&](const ::arrow::Schema* arrowSchema) {
const auto tsType =
std::dynamic_pointer_cast<::arrow::TimestampType>(
arrowSchema->field(0)->type());
ASSERT_EQ(tsType->unit(), ::arrow::TimeUnit::MICRO);
ASSERT_EQ(tsType->timezone(), "");
})));

const auto data = makeRowVector({makeFlatVector<Timestamp>(
Expand Down
21 changes: 21 additions & 0 deletions velox/dwio/parquet/writer/Writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include <arrow/io/interfaces.h>
#include <arrow/table.h>
#include "velox/common/testutil/TestValue.h"
#include "velox/core/QueryConfig.h"
#include "velox/dwio/parquet/writer/arrow/Properties.h"
#include "velox/dwio/parquet/writer/arrow/Writer.h"
#include "velox/exec/MemoryReclaimer.h"
Expand Down Expand Up @@ -237,6 +238,7 @@ Writer::Writer(
}
options_.timestampUnit =
options.parquetWriteTimestampUnit.value_or(TimestampUnit::kNano);
options_.timestampTimeZone = options.parquetWriteTimestampTimeZone;
arrowContext_->properties =
getArrowParquetWriterOptions(options, flushPolicy_);
setMemoryReclaimers();
Expand Down Expand Up @@ -421,20 +423,39 @@ std::optional<TimestampUnit> getTimestampUnit(
return std::nullopt;
}

// Looks up 'configKey' in 'config' as a string and returns it as the
// timestamp time zone. Returns std::nullopt when the lookup yields no value.
// The value is passed through as-is; no validation of the time zone name is
// performed here.
std::optional<std::string> getTimestampTimeZone(
    const Config& config,
    const char* configKey) {
  const auto zoneName = config.get<std::string>(configKey);
  if (!zoneName.has_value()) {
    return std::nullopt;
  }
  return zoneName.value();
}

} // namespace

// Fills in writer options that have not been set yet from the session
// 'config'. Options that already hold a value are left untouched.
void WriterOptions::processSessionConfigs(const Config& config) {
  // Timestamp unit: read from the Parquet session property only if unset.
  if (!parquetWriteTimestampUnit.has_value()) {
    const auto sessionUnit =
        getTimestampUnit(config, kParquetSessionWriteTimestampUnit);
    parquetWriteTimestampUnit = sessionUnit;
  }

  // Timestamp time zone: read from the session time zone key only if unset.
  if (!parquetWriteTimestampTimeZone.has_value()) {
    const auto sessionZone =
        getTimestampTimeZone(config, core::QueryConfig::kSessionTimezone);
    parquetWriteTimestampTimeZone = sessionZone;
  }
}

// Fills in writer options that have not been set yet from the Hive connector
// 'config'. Options that already hold a value are left untouched.
void WriterOptions::processHiveConnectorConfigs(const Config& config) {
  // Timestamp unit: read from the Hive connector property only if unset.
  if (!parquetWriteTimestampUnit.has_value()) {
    const auto connectorUnit =
        getTimestampUnit(config, kParquetHiveConnectorWriteTimestampUnit);
    parquetWriteTimestampUnit = connectorUnit;
  }

  // Timestamp time zone: only if unset.
  // NOTE(review): this looks up core::QueryConfig::kSessionTimezone, the same
  // key processSessionConfigs uses - confirm that the connector config is
  // expected to carry this key.
  if (!parquetWriteTimestampTimeZone.has_value()) {
    const auto connectorZone =
        getTimestampTimeZone(config, core::QueryConfig::kSessionTimezone);
    parquetWriteTimestampTimeZone = connectorZone;
  }
}

std::unique_ptr<dwio::common::Writer> ParquetWriterFactory::createWriter(
Expand Down
2 changes: 2 additions & 0 deletions velox/dwio/parquet/writer/Writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,8 @@ struct WriterOptions : public dwio::common::WriterOptions {
/// Timestamp unit for Parquet write through Arrow bridge.
/// Default if not specified: TimestampUnit::kNano (9).
std::optional<TimestampUnit> parquetWriteTimestampUnit;
/// Timestamp time zone for Parquet write through Arrow bridge.
std::optional<std::string> parquetWriteTimestampTimeZone;
bool writeInt96AsTimestamp = false;

// Parsing session and hive configs.
Expand Down
43 changes: 30 additions & 13 deletions velox/vector/arrow/Bridge.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,8 @@ struct VeloxToArrowSchemaBridgeHolder {

std::unique_ptr<ArrowSchema> dictionary;

// Buffer required to generate a decimal format.
// Buffer required to generate a decimal format or timestamp with timezone
// format.
std::string formatBuffer;

void setChildAtIndex(
Expand Down Expand Up @@ -212,6 +213,33 @@ static void releaseArrowSchema(ArrowSchema* arrowSchema) {
arrowSchema->private_data = nullptr;
}

// Builds the Arrow C data interface format string for a timestamp type.
//
// The result is the unit prefix selected by 'options.timestampUnit'
// ("tss:", "tsm:", "tsu:" or "tsn:") followed by 'options.timestampTimeZone'
// when one is set; without a time zone the prefix alone is produced.
//
// The string is stored in 'formatBuffer' (any previous content is
// overwritten) and the returned pointer refers to that buffer's storage, so
// it remains usable only as long as 'formatBuffer' is alive and unmodified.
const char* exportArrowFormatTimestampStr(
    const ArrowOptions& options,
    std::string& formatBuffer) {
  // Pick the unit prefix first; the buffer is only touched once the unit is
  // known to be valid.
  const char* unitPrefix = nullptr;
  switch (options.timestampUnit) {
    case TimestampUnit::kSecond:
      unitPrefix = "tss:";
      break;
    case TimestampUnit::kMilli:
      unitPrefix = "tsm:";
      break;
    case TimestampUnit::kMicro:
      unitPrefix = "tsu:";
      break;
    case TimestampUnit::kNano:
      unitPrefix = "tsn:";
      break;
    default:
      VELOX_UNREACHABLE();
  }

  formatBuffer.assign(unitPrefix);

  // Append the time zone after the colon when one was requested.
  const auto& zone = options.timestampTimeZone;
  if (zone.has_value()) {
    formatBuffer.append(zone.value());
  }

  return formatBuffer.c_str();
}

// Returns the Arrow C data interface format type for a given Velox type.
const char* exportArrowFormatStr(
const TypePtr& type,
Expand Down Expand Up @@ -255,18 +283,7 @@ const char* exportArrowFormatStr(
case TypeKind::UNKNOWN:
return "n"; // NullType
case TypeKind::TIMESTAMP:
switch (options.timestampUnit) {
case TimestampUnit::kSecond:
return "tss:";
case TimestampUnit::kMilli:
return "tsm:";
case TimestampUnit::kMicro:
return "tsu:";
case TimestampUnit::kNano:
return "tsn:";
default:
VELOX_UNREACHABLE();
}
return exportArrowFormatTimestampStr(options, formatBuffer);
// Complex/nested types.
case TypeKind::ARRAY:
static_assert(sizeof(vector_size_t) == 4);
Expand Down
1 change: 1 addition & 0 deletions velox/vector/arrow/Bridge.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ struct ArrowOptions {
bool flattenDictionary{false};
bool flattenConstant{false};
TimestampUnit timestampUnit = TimestampUnit::kNano;
std::optional<std::string> timestampTimeZone{std::nullopt};
};

namespace facebook::velox {
Expand Down
12 changes: 12 additions & 0 deletions velox/vector/arrow/tests/ArrowBridgeSchemaTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,7 @@ TEST_F(ArrowBridgeSchemaExportTest, scalar) {
testScalarType(VARCHAR(), "u");
testScalarType(VARBINARY(), "z");

// Test default timezone
options_.timestampUnit = TimestampUnit::kSecond;
testScalarType(TIMESTAMP(), "tss:");
options_.timestampUnit = TimestampUnit::kMilli;
Expand All @@ -201,6 +202,17 @@ TEST_F(ArrowBridgeSchemaExportTest, scalar) {
options_.timestampUnit = TimestampUnit::kNano;
testScalarType(TIMESTAMP(), "tsn:");

// Test specific timezone
options_.timestampTimeZone = "+01:0";
options_.timestampUnit = TimestampUnit::kSecond;
testScalarType(TIMESTAMP(), "tss:+01:0");
options_.timestampUnit = TimestampUnit::kMilli;
testScalarType(TIMESTAMP(), "tsm:+01:0");
options_.timestampUnit = TimestampUnit::kMicro;
testScalarType(TIMESTAMP(), "tsu:+01:0");
options_.timestampUnit = TimestampUnit::kNano;
testScalarType(TIMESTAMP(), "tsn:+01:0");

testScalarType(DATE(), "tdD");
testScalarType(INTERVAL_YEAR_MONTH(), "tiM");

Expand Down