diff --git a/velox/dwio/parquet/tests/writer/ParquetWriterTest.cpp b/velox/dwio/parquet/tests/writer/ParquetWriterTest.cpp index d5bc8529cd8..6c9a930972e 100644 --- a/velox/dwio/parquet/tests/writer/ParquetWriterTest.cpp +++ b/velox/dwio/parquet/tests/writer/ParquetWriterTest.cpp @@ -150,6 +150,36 @@ DEBUG_ONLY_TEST_F(ParquetWriterTest, unitFromWriterOptions) { std::dynamic_pointer_cast<::arrow::TimestampType>( arrowSchema->field(0)->type()); ASSERT_EQ(tsType->unit(), ::arrow::TimeUnit::MICRO); + ASSERT_EQ(tsType->timezone(), "America/Los_Angeles"); + }))); + + const auto data = makeRowVector({makeFlatVector( + 10'000, [](auto row) { return Timestamp(row, row); })}); + parquet::WriterOptions writerOptions; + writerOptions.memoryPool = leafPool_.get(); + writerOptions.parquetWriteTimestampUnit = TimestampUnit::kMicro; + writerOptions.parquetWriteTimestampTimeZone = "America/Los_Angeles"; + + // Create an in-memory writer. + auto sink = std::make_unique( + 200 * 1024 * 1024, + dwio::common::FileSink::Options{.pool = leafPool_.get()}); + auto writer = std::make_unique( + std::move(sink), writerOptions, rootPool_, ROW({"c0"}, {TIMESTAMP()})); + writer->write(data); + writer->close(); +}; + +TEST_F(ParquetWriterTest, parquetWriteTimestampTimeZoneWithDefault) { + SCOPED_TESTVALUE_SET( + "facebook::velox::parquet::Writer::write", + std::function( + ([&](const ::arrow::Schema* arrowSchema) { + const auto tsType = + std::dynamic_pointer_cast<::arrow::TimestampType>( + arrowSchema->field(0)->type()); + ASSERT_EQ(tsType->unit(), ::arrow::TimeUnit::MICRO); + ASSERT_EQ(tsType->timezone(), ""); }))); const auto data = makeRowVector({makeFlatVector( diff --git a/velox/dwio/parquet/writer/Writer.cpp b/velox/dwio/parquet/writer/Writer.cpp index d508b53356f..baffa40cec9 100644 --- a/velox/dwio/parquet/writer/Writer.cpp +++ b/velox/dwio/parquet/writer/Writer.cpp @@ -19,6 +19,7 @@ #include #include #include "velox/common/testutil/TestValue.h" +#include "velox/core/QueryConfig.h" #include "velox/dwio/parquet/writer/arrow/Properties.h" #include "velox/dwio/parquet/writer/arrow/Writer.h" #include "velox/exec/MemoryReclaimer.h" @@ -237,6 +238,7 @@ Writer::Writer( } options_.timestampUnit = options.parquetWriteTimestampUnit.value_or(TimestampUnit::kNano); + options_.timestampTimeZone = options.parquetWriteTimestampTimeZone; arrowContext_->properties = getArrowParquetWriterOptions(options, flushPolicy_); setMemoryReclaimers(); @@ -421,6 +423,15 @@ std::optional getTimestampUnit( return std::nullopt; } +std::optional getTimestampTimeZone( + const Config& config, + const char* configKey) { + if (const auto timezone = config.get(configKey)) { + return timezone.value(); + } + return std::nullopt; +} + } // namespace void WriterOptions::processSessionConfigs(const Config& config) { @@ -428,6 +439,11 @@ void WriterOptions::processSessionConfigs(const Config& config) { parquetWriteTimestampUnit = getTimestampUnit(config, kParquetSessionWriteTimestampUnit); } + + if (!parquetWriteTimestampTimeZone) { + parquetWriteTimestampTimeZone = + getTimestampTimeZone(config, core::QueryConfig::kSessionTimezone); + } } void WriterOptions::processHiveConnectorConfigs(const Config& config) { @@ -435,6 +451,11 @@ void WriterOptions::processHiveConnectorConfigs(const Config& config) { parquetWriteTimestampUnit = getTimestampUnit(config, kParquetHiveConnectorWriteTimestampUnit); } + + if (!parquetWriteTimestampTimeZone) { + parquetWriteTimestampTimeZone = + getTimestampTimeZone(config, core::QueryConfig::kSessionTimezone); + } } std::unique_ptr ParquetWriterFactory::createWriter( diff --git a/velox/dwio/parquet/writer/Writer.h b/velox/dwio/parquet/writer/Writer.h index 6901c2dc3be..261fd9016f7 100644 --- a/velox/dwio/parquet/writer/Writer.h +++ b/velox/dwio/parquet/writer/Writer.h @@ -109,6 +109,8 @@ struct WriterOptions : public dwio::common::WriterOptions { /// Timestamp unit for Parquet write through Arrow bridge. /// Default if not specified: TimestampUnit::kNano (9). std::optional parquetWriteTimestampUnit; + /// Timestamp time zone for Parquet write through Arrow bridge. + std::optional parquetWriteTimestampTimeZone; bool writeInt96AsTimestamp = false; // Parsing session and hive configs. diff --git a/velox/vector/arrow/Bridge.cpp b/velox/vector/arrow/Bridge.cpp index 8d9fa46ca9c..07fe130b689 100644 --- a/velox/vector/arrow/Bridge.cpp +++ b/velox/vector/arrow/Bridge.cpp @@ -124,7 +124,8 @@ struct VeloxToArrowSchemaBridgeHolder { std::unique_ptr dictionary; - // Buffer required to generate a decimal format. + // Buffer required to generate a decimal format or timestamp with timezone + // format. std::string formatBuffer; void setChildAtIndex( @@ -212,6 +213,33 @@ static void releaseArrowSchema(ArrowSchema* arrowSchema) { arrowSchema->private_data = nullptr; } +const char* exportArrowFormatTimestampStr( + const ArrowOptions& options, + std::string& formatBuffer) { + switch (options.timestampUnit) { + case TimestampUnit::kSecond: + formatBuffer = "tss:"; + break; + case TimestampUnit::kMilli: + formatBuffer = "tsm:"; + break; + case TimestampUnit::kMicro: + formatBuffer = "tsu:"; + break; + case TimestampUnit::kNano: + formatBuffer = "tsn:"; + break; + default: + VELOX_UNREACHABLE(); + } + + if (options.timestampTimeZone.has_value()) { + formatBuffer += options.timestampTimeZone.value(); + } + + return formatBuffer.c_str(); +} + // Returns the Arrow C data interface format type for a given Velox type. const char* exportArrowFormatStr( const TypePtr& type, @@ -255,18 +283,7 @@ const char* exportArrowFormatStr( case TypeKind::UNKNOWN: return "n"; // NullType case TypeKind::TIMESTAMP: - switch (options.timestampUnit) { - case TimestampUnit::kSecond: - return "tss:"; - case TimestampUnit::kMilli: - return "tsm:"; - case TimestampUnit::kMicro: - return "tsu:"; - case TimestampUnit::kNano: - return "tsn:"; - default: - VELOX_UNREACHABLE(); - } + return exportArrowFormatTimestampStr(options, formatBuffer); // Complex/nested types. case TypeKind::ARRAY: static_assert(sizeof(vector_size_t) == 4); diff --git a/velox/vector/arrow/Bridge.h b/velox/vector/arrow/Bridge.h index 7d93a809881..f29c099ba88 100644 --- a/velox/vector/arrow/Bridge.h +++ b/velox/vector/arrow/Bridge.h @@ -36,6 +36,7 @@ struct ArrowOptions { bool flattenDictionary{false}; bool flattenConstant{false}; TimestampUnit timestampUnit = TimestampUnit::kNano; + std::optional timestampTimeZone{std::nullopt}; }; namespace facebook::velox { diff --git a/velox/vector/arrow/tests/ArrowBridgeSchemaTest.cpp b/velox/vector/arrow/tests/ArrowBridgeSchemaTest.cpp index 0837c96a09a..7734c411763 100644 --- a/velox/vector/arrow/tests/ArrowBridgeSchemaTest.cpp +++ b/velox/vector/arrow/tests/ArrowBridgeSchemaTest.cpp @@ -192,6 +192,7 @@ TEST_F(ArrowBridgeSchemaExportTest, scalar) { testScalarType(VARCHAR(), "u"); testScalarType(VARBINARY(), "z"); + // Test default timezone options_.timestampUnit = TimestampUnit::kSecond; testScalarType(TIMESTAMP(), "tss:"); options_.timestampUnit = TimestampUnit::kMilli; @@ -201,6 +202,17 @@ TEST_F(ArrowBridgeSchemaExportTest, scalar) { options_.timestampUnit = TimestampUnit::kNano; testScalarType(TIMESTAMP(), "tsn:"); + // Test specific timezone + options_.timestampTimeZone = "+01:0"; + options_.timestampUnit = TimestampUnit::kSecond; + testScalarType(TIMESTAMP(), "tss:+01:0"); + options_.timestampUnit = TimestampUnit::kMilli; + testScalarType(TIMESTAMP(), "tsm:+01:0"); + options_.timestampUnit = TimestampUnit::kMicro; + testScalarType(TIMESTAMP(), "tsu:+01:0"); + options_.timestampUnit = TimestampUnit::kNano; + testScalarType(TIMESTAMP(), "tsn:+01:0"); + testScalarType(DATE(), "tdD"); testScalarType(INTERVAL_YEAR_MONTH(), "tiM");