@@ -169,8 +169,7 @@ object DateTimeUtils {
*/
def fromJulianDay(days: Int, nanos: Long): Long = {
// use Long to avoid rounding errors
val micros = (days - JULIAN_DAY_OF_EPOCH).toLong * MICROS_PER_DAY + nanos / NANOS_PER_MICROS
rebaseJulianToGregorianMicros(micros)
(days - JULIAN_DAY_OF_EPOCH).toLong * MICROS_PER_DAY + nanos / NANOS_PER_MICROS
}

/**
@@ -179,7 +178,7 @@ object DateTimeUtils {
* Note: support timestamp since 4717 BC (without negative nanoseconds, compatible with Hive).
*/
def toJulianDay(micros: Long): (Int, Long) = {
val julianUs = rebaseGregorianToJulianMicros(micros) + JULIAN_DAY_OF_EPOCH * MICROS_PER_DAY
val julianUs = micros + JULIAN_DAY_OF_EPOCH * MICROS_PER_DAY
val days = julianUs / MICROS_PER_DAY
val us = julianUs % MICROS_PER_DAY
(days.toInt, MICROSECONDS.toNanos(us))
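With the rebase calls removed, fromJulianDay and toJulianDay now operate purely on microseconds in the hybrid Julian calendar, and callers that need Proleptic Gregorian values compose the RebaseDateTime conversions explicitly (as the updated DateTimeUtilsSuite below shows). A minimal sketch of that round trip; the wrapper names are illustrative and not part of the PR:

```scala
import org.apache.spark.sql.catalyst.util.DateTimeUtils.{fromJulianDay, toJulianDay}
import org.apache.spark.sql.catalyst.util.RebaseDateTime.{rebaseGregorianToJulianMicros, rebaseJulianToGregorianMicros}

// Illustrative wrappers: pair the Julian-day conversion with an explicit calendar
// rebase, which is what callers are expected to do after this change.
def gregorianMicrosToJulianDay(micros: Long): (Int, Long) =
  toJulianDay(rebaseGregorianToJulianMicros(micros))

def julianDayToGregorianMicros(days: Int, nanos: Long): Long =
  rebaseJulianToGregorianMicros(fromJulianDay(days, nanos))

// Round trip through the epoch: 1970-01-01T00:00:00Z maps to JULIAN_DAY_OF_EPOCH and back to 0L.
val (d, ns) = gregorianMicrosToJulianDay(0L)
assert(julianDayToGregorianMicros(d, ns) == 0L)
```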
@@ -2640,6 +2640,20 @@ object SQLConf {
.checkValues(LegacyBehaviorPolicy.values.map(_.toString))
.createWithDefault(LegacyBehaviorPolicy.EXCEPTION.toString)

val LEGACY_PARQUET_INT96_REBASE_MODE_IN_WRITE =
buildConf("spark.sql.legacy.parquet.int96RebaseModeInWrite")
.internal()
.doc("When LEGACY, which is the default, Spark will rebase INT96 timestamps from " +
"Proleptic Gregorian calendar to the legacy hybrid (Julian + Gregorian) calendar when " +
"writing Parquet files. When CORRECTED, Spark will not do rebase and write the timestamps" +
" as it is. When EXCEPTION, Spark will fail the writing if it sees ancient timestamps " +
"that are ambiguous between the two calendars.")
.version("3.1.0")
.stringConf
.transform(_.toUpperCase(Locale.ROOT))
.checkValues(LegacyBehaviorPolicy.values.map(_.toString))
.createWithDefault(LegacyBehaviorPolicy.LEGACY.toString)
Contributor: Could the default be made LegacyBehaviorPolicy.EXCEPTION instead? Could also do this in a follow-up PR if this is controversial.

Member Author: I am not sure that we can do that in the minor release 3.1. This can break existing apps. @cloud-fan @HyukjinKwon WDYT?

Contributor: It's a breaking change, but probably is acceptable as it doesn't lead to silent results changing. We need an item in the migration guide though.

Member Author: Here is the PR #30121. Will see how many tests this will affect.

val LEGACY_PARQUET_REBASE_MODE_IN_READ =
buildConf("spark.sql.legacy.parquet.datetimeRebaseModeInRead")
.internal()
@@ -2655,6 +2669,21 @@ object SQLConf {
.checkValues(LegacyBehaviorPolicy.values.map(_.toString))
.createWithDefault(LegacyBehaviorPolicy.EXCEPTION.toString)

val LEGACY_PARQUET_INT96_REBASE_MODE_IN_READ =
buildConf("spark.sql.legacy.parquet.int96RebaseModeInRead")
.internal()
.doc("When LEGACY, which is the default, Spark will rebase INT96 timestamps from " +
"the legacy hybrid (Julian + Gregorian) calendar to Proleptic Gregorian calendar when " +
"reading Parquet files. When CORRECTED, Spark will not do rebase and read the timestamps " +
"as it is. When EXCEPTION, Spark will fail the reading if it sees ancient timestamps " +
"that are ambiguous between the two calendars. This config is only effective if the " +
"writer info (like Spark, Hive) of the Parquet files is unknown.")
.version("3.1.0")
.stringConf
.transform(_.toUpperCase(Locale.ROOT))
.checkValues(LegacyBehaviorPolicy.values.map(_.toString))
.createWithDefault(LegacyBehaviorPolicy.LEGACY.toString)

val LEGACY_AVRO_REBASE_MODE_IN_WRITE =
buildConf("spark.sql.legacy.avro.datetimeRebaseModeInWrite")
.internal()
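To make the three modes concrete, here is a hedged usage sketch. The two int96 config keys come from this PR; the outputTimestampType setting, the local-session setup, and the path are assumptions about the surrounding environment:

```scala
import java.sql.Timestamp
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._

// Store timestamps as INT96; with EXCEPTION mode, ancient values that are ambiguous
// between the Julian and Proleptic Gregorian calendars make the write fail with
// SparkUpgradeException instead of being silently rebased.
spark.conf.set("spark.sql.parquet.outputTimestampType", "INT96")
spark.conf.set("spark.sql.legacy.parquet.int96RebaseModeInWrite", "EXCEPTION")

val df = Seq(Timestamp.valueOf("1001-01-01 01:02:03")).toDF("ts")
// df.write.parquet("/tmp/int96_demo")   // would throw for the ancient value above

// Switching to LEGACY rebases the value to the hybrid calendar on write, which is
// what older readers that assume the hybrid calendar expect from INT96.
spark.conf.set("spark.sql.legacy.parquet.int96RebaseModeInWrite", "LEGACY")
df.write.mode("overwrite").parquet("/tmp/int96_demo")

// On read, the config only matters for files whose writer info is unknown;
// CORRECTED trusts such files to already hold Proleptic Gregorian values.
spark.conf.set("spark.sql.legacy.parquet.int96RebaseModeInRead", "CORRECTED")
spark.read.parquet("/tmp/int96_demo").show()
```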
@@ -31,6 +31,7 @@ import org.apache.spark.sql.catalyst.plans.SQLHelper
import org.apache.spark.sql.catalyst.util.DateTimeConstants._
import org.apache.spark.sql.catalyst.util.DateTimeTestUtils._
import org.apache.spark.sql.catalyst.util.DateTimeUtils._
import org.apache.spark.sql.catalyst.util.RebaseDateTime.rebaseJulianToGregorianMicros
import org.apache.spark.unsafe.types.{CalendarInterval, UTF8String}

class DateTimeUtilsSuite extends SparkFunSuite with Matchers with SQLHelper {
@@ -70,17 +71,17 @@ class DateTimeUtilsSuite extends SparkFunSuite with Matchers with SQLHelper {
}

test("us and julian day") {
val (d, ns) = toJulianDay(0)
val (d, ns) = toJulianDay(RebaseDateTime.rebaseGregorianToJulianMicros(0))
assert(d === JULIAN_DAY_OF_EPOCH)
assert(ns === 0)
assert(fromJulianDay(d, ns) == 0L)
assert(rebaseJulianToGregorianMicros(fromJulianDay(d, ns)) == 0L)

Seq(Timestamp.valueOf("2015-06-11 10:10:10.100"),
Timestamp.valueOf("2015-06-11 20:10:10.100"),
Timestamp.valueOf("1900-06-11 20:10:10.100")).foreach { t =>
val (d, ns) = toJulianDay(fromJavaTimestamp(t))
val (d, ns) = toJulianDay(RebaseDateTime.rebaseGregorianToJulianMicros(fromJavaTimestamp(t)))
assert(ns > 0)
val t1 = toJavaTimestamp(fromJulianDay(d, ns))
val t1 = toJavaTimestamp(rebaseJulianToGregorianMicros(fromJulianDay(d, ns)))
assert(t.equals(t1))
}
}
@@ -104,13 +104,15 @@ public class VectorizedColumnReader {
private final ZoneId convertTz;
private static final ZoneId UTC = ZoneOffset.UTC;
private final String datetimeRebaseMode;
private final String int96RebaseMode;

public VectorizedColumnReader(
ColumnDescriptor descriptor,
OriginalType originalType,
PageReader pageReader,
ZoneId convertTz,
String datetimeRebaseMode) throws IOException {
String datetimeRebaseMode,
String int96RebaseMode) throws IOException {
this.descriptor = descriptor;
this.pageReader = pageReader;
this.convertTz = convertTz;
@@ -136,6 +138,9 @@ public VectorizedColumnReader(
assert "LEGACY".equals(datetimeRebaseMode) || "EXCEPTION".equals(datetimeRebaseMode) ||
"CORRECTED".equals(datetimeRebaseMode);
this.datetimeRebaseMode = datetimeRebaseMode;
assert "LEGACY".equals(int96RebaseMode) || "EXCEPTION".equals(int96RebaseMode) ||
"CORRECTED".equals(int96RebaseMode);
this.int96RebaseMode = int96RebaseMode;
}

/**
@@ -189,10 +194,13 @@ static int rebaseDays(int julianDays, final boolean failIfRebase) {
}
}

static long rebaseMicros(long julianMicros, final boolean failIfRebase) {
private static long rebaseTimestamp(
long julianMicros,
final boolean failIfRebase,
final String format) {
if (failIfRebase) {
if (julianMicros < RebaseDateTime.lastSwitchJulianTs()) {
throw DataSourceUtils.newRebaseExceptionInRead("Parquet");
throw DataSourceUtils.newRebaseExceptionInRead(format);
} else {
return julianMicros;
}
@@ -201,6 +209,14 @@ static long rebaseMicros(long julianMicros, final boolean failIfRebase) {
}
}

static long rebaseMicros(long julianMicros, final boolean failIfRebase) {
return rebaseTimestamp(julianMicros, failIfRebase, "Parquet");
}

static long rebaseInt96(long julianMicros, final boolean failIfRebase) {
return rebaseTimestamp(julianMicros, failIfRebase, "Parquet INT96");
}

/**
* Reads `total` values from this columnReader into column.
*/
@@ -399,20 +415,44 @@ private void decodeDictionaryIds(
break;
case INT96:
if (column.dataType() == DataTypes.TimestampType) {
final boolean failIfRebase = "EXCEPTION".equals(int96RebaseMode);
if (!shouldConvertTimestamps()) {
for (int i = rowId; i < rowId + num; ++i) {
if (!column.isNullAt(i)) {
Binary v = dictionary.decodeToBinary(dictionaryIds.getDictId(i));
column.putLong(i, ParquetRowConverter.binaryToSQLTimestamp(v));
if ("CORRECTED".equals(int96RebaseMode)) {
for (int i = rowId; i < rowId + num; ++i) {
if (!column.isNullAt(i)) {
Binary v = dictionary.decodeToBinary(dictionaryIds.getDictId(i));
column.putLong(i, ParquetRowConverter.binaryToSQLTimestamp(v));
}
}
} else {
for (int i = rowId; i < rowId + num; ++i) {
if (!column.isNullAt(i)) {
Binary v = dictionary.decodeToBinary(dictionaryIds.getDictId(i));
long julianMicros = ParquetRowConverter.binaryToSQLTimestamp(v);
long gregorianMicros = rebaseInt96(julianMicros, failIfRebase);
column.putLong(i, gregorianMicros);
}
}
}
} else {
for (int i = rowId; i < rowId + num; ++i) {
if (!column.isNullAt(i)) {
Binary v = dictionary.decodeToBinary(dictionaryIds.getDictId(i));
long rawTime = ParquetRowConverter.binaryToSQLTimestamp(v);
long adjTime = DateTimeUtils.convertTz(rawTime, convertTz, UTC);
column.putLong(i, adjTime);
if ("CORRECTED".equals(int96RebaseMode)) {
for (int i = rowId; i < rowId + num; ++i) {
if (!column.isNullAt(i)) {
Binary v = dictionary.decodeToBinary(dictionaryIds.getDictId(i));
long gregorianMicros = ParquetRowConverter.binaryToSQLTimestamp(v);
long adjTime = DateTimeUtils.convertTz(gregorianMicros, convertTz, UTC);
column.putLong(i, adjTime);
}
}
} else {
for (int i = rowId; i < rowId + num; ++i) {
if (!column.isNullAt(i)) {
Binary v = dictionary.decodeToBinary(dictionaryIds.getDictId(i));
long julianMicros = ParquetRowConverter.binaryToSQLTimestamp(v);
long gregorianMicros = rebaseInt96(julianMicros, failIfRebase);
long adjTime = DateTimeUtils.convertTz(gregorianMicros, convertTz, UTC);
column.putLong(i, adjTime);
}
}
}
}
@@ -577,25 +617,53 @@ private void readBinaryBatch(int rowId, int num, WritableColumnVector column) th
|| DecimalType.isByteArrayDecimalType(column.dataType())) {
defColumn.readBinarys(num, column, rowId, maxDefLevel, data);
} else if (column.dataType() == DataTypes.TimestampType) {
final boolean failIfRebase = "EXCEPTION".equals(int96RebaseMode);
if (!shouldConvertTimestamps()) {
for (int i = 0; i < num; i++) {
if (defColumn.readInteger() == maxDefLevel) {
// Read 12 bytes for INT96
long rawTime = ParquetRowConverter.binaryToSQLTimestamp(data.readBinary(12));
column.putLong(rowId + i, rawTime);
} else {
column.putNull(rowId + i);
if ("CORRECTED".equals(int96RebaseMode)) {
for (int i = 0; i < num; i++) {
if (defColumn.readInteger() == maxDefLevel) {
// Read 12 bytes for INT96
long gregorianMicros = ParquetRowConverter.binaryToSQLTimestamp(data.readBinary(12));
column.putLong(rowId + i, gregorianMicros);
} else {
column.putNull(rowId + i);
}
}
} else {
for (int i = 0; i < num; i++) {
if (defColumn.readInteger() == maxDefLevel) {
// Read 12 bytes for INT96
long julianMicros = ParquetRowConverter.binaryToSQLTimestamp(data.readBinary(12));
long gregorianMicros = rebaseInt96(julianMicros, failIfRebase);
column.putLong(rowId + i, gregorianMicros);
} else {
column.putNull(rowId + i);
}
}
}
} else {
for (int i = 0; i < num; i++) {
if (defColumn.readInteger() == maxDefLevel) {
// Read 12 bytes for INT96
long rawTime = ParquetRowConverter.binaryToSQLTimestamp(data.readBinary(12));
long adjTime = DateTimeUtils.convertTz(rawTime, convertTz, UTC);
column.putLong(rowId + i, adjTime);
} else {
column.putNull(rowId + i);
if ("CORRECTED".equals(int96RebaseMode)) {
for (int i = 0; i < num; i++) {
if (defColumn.readInteger() == maxDefLevel) {
// Read 12 bytes for INT96
long gregorianMicros = ParquetRowConverter.binaryToSQLTimestamp(data.readBinary(12));
long adjTime = DateTimeUtils.convertTz(gregorianMicros, convertTz, UTC);
column.putLong(rowId + i, adjTime);
} else {
column.putNull(rowId + i);
}
}
} else {
for (int i = 0; i < num; i++) {
if (defColumn.readInteger() == maxDefLevel) {
// Read 12 bytes for INT96
long julianMicros = ParquetRowConverter.binaryToSQLTimestamp(data.readBinary(12));
long gregorianMicros = rebaseInt96(julianMicros, failIfRebase);
long adjTime = DateTimeUtils.convertTz(gregorianMicros, convertTz, UTC);
column.putLong(rowId + i, adjTime);
} else {
column.putNull(rowId + i);
}
}
}
}
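Both the dictionary-encoded and plain INT96 paths above reduce to the same per-value decision, factored into rebaseTimestamp/rebaseInt96 earlier in this file. A compact Scala restatement of that decision (the method name and the combined CORRECTED case are illustrative; in the Java reader the CORRECTED branch skips the rebase call entirely):

```scala
import org.apache.spark.sql.catalyst.util.RebaseDateTime
import org.apache.spark.sql.execution.datasources.DataSourceUtils

// julianMicros is the raw value decoded from the 12-byte INT96 field.
def convertInt96Micros(julianMicros: Long, int96RebaseMode: String): Long =
  int96RebaseMode match {
    // File is known (or configured) to already contain Proleptic Gregorian values.
    case "CORRECTED" => julianMicros
    // Fail on values older than the Julian/Gregorian switch, where the calendars disagree.
    case "EXCEPTION" if julianMicros < RebaseDateTime.lastSwitchJulianTs =>
      throw DataSourceUtils.newRebaseExceptionInRead("Parquet INT96")
    // Values at or after the switch are identical in both calendars.
    case "EXCEPTION" => julianMicros
    // Rebase from the hybrid Julian + Gregorian calendar to Proleptic Gregorian.
    case "LEGACY" => RebaseDateTime.rebaseJulianToGregorianMicros(julianMicros)
  }
```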
@@ -93,6 +93,11 @@ public class VectorizedParquetRecordReader extends SpecificParquetRecordReaderBa
*/
private final String datetimeRebaseMode;

/**
* The mode of rebasing INT96 timestamp from Julian to Proleptic Gregorian calendar.
*/
private final String int96RebaseMode;

/**
* columnBatch object that is used for batch decoding. This is created on first use and triggers
* batched decoding. It is not valid to interleave calls to the batched interface with the row
@@ -122,16 +127,21 @@ public class VectorizedParquetRecordReader extends SpecificParquetRecordReaderBa
private final MemoryMode MEMORY_MODE;

public VectorizedParquetRecordReader(
ZoneId convertTz, String datetimeRebaseMode, boolean useOffHeap, int capacity) {
ZoneId convertTz,
String datetimeRebaseMode,
String int96RebaseMode,
boolean useOffHeap,
int capacity) {
this.convertTz = convertTz;
this.datetimeRebaseMode = datetimeRebaseMode;
this.int96RebaseMode = int96RebaseMode;
MEMORY_MODE = useOffHeap ? MemoryMode.OFF_HEAP : MemoryMode.ON_HEAP;
this.capacity = capacity;
}

// For test only.
public VectorizedParquetRecordReader(boolean useOffHeap, int capacity) {
this(null, "CORRECTED", useOffHeap, capacity);
this(null, "CORRECTED", "LEGACY", useOffHeap, capacity);
}

/**
@@ -320,8 +330,13 @@ private void checkEndOfRowGroup() throws IOException {
columnReaders = new VectorizedColumnReader[columns.size()];
for (int i = 0; i < columns.size(); ++i) {
if (missingColumns[i]) continue;
columnReaders[i] = new VectorizedColumnReader(columns.get(i), types.get(i).getOriginalType(),
pages.getPageReader(columns.get(i)), convertTz, datetimeRebaseMode);
columnReaders[i] = new VectorizedColumnReader(
columns.get(i),
types.get(i).getOriginalType(),
pages.getPageReader(columns.get(i)),
convertTz,
datetimeRebaseMode,
int96RebaseMode);
}
totalCountLoadedSoFar += pages.getRowCount();
}
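For context, the Scala side that instantiates this reader (ParquetFileFormat and the partition reader factory, not shown in this excerpt) must now resolve both modes before construction. A hedged sketch of that call; the mode values here are hard-coded stand-ins for what DataSourceUtils derives from file metadata and the session config:

```scala
import java.time.ZoneId
import org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader

val convertTz: ZoneId = null          // no session/local timezone conversion needed
val datetimeRebaseMode = "EXCEPTION"  // DATE / TIMESTAMP_MICROS rebase mode
val int96RebaseMode = "LEGACY"        // INT96 rebase mode introduced by this PR

val vectorizedReader = new VectorizedParquetRecordReader(
  convertTz,
  datetimeRebaseMode,
  int96RebaseMode,
  /* useOffHeap = */ false,
  /* capacity = */ 4096)
```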
@@ -26,7 +26,7 @@ import org.json4s.NoTypeHints
import org.json4s.jackson.Serialization

import org.apache.spark.SparkUpgradeException
import org.apache.spark.sql.{SPARK_LEGACY_DATETIME, SPARK_VERSION_METADATA_KEY}
import org.apache.spark.sql.{SPARK_INT96_NO_REBASE, SPARK_LEGACY_DATETIME, SPARK_VERSION_METADATA_KEY}
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogUtils}
import org.apache.spark.sql.catalyst.util.RebaseDateTime
@@ -111,13 +111,26 @@ object DataSourceUtils {
}.getOrElse(LegacyBehaviorPolicy.withName(modeByConfig))
}

def newRebaseExceptionInRead(format: String): SparkUpgradeException = {
val config = if (format == "Parquet") {
SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_READ.key
} else if (format == "Avro") {
SQLConf.LEGACY_AVRO_REBASE_MODE_IN_READ.key
def int96RebaseMode(
lookupFileMeta: String => String,
modeByConfig: String): LegacyBehaviorPolicy.Value = {
if (Utils.isTesting && SQLConf.get.getConfString("spark.test.forceNoRebase", "") == "true") {
LegacyBehaviorPolicy.CORRECTED
} else if (lookupFileMeta(SPARK_INT96_NO_REBASE) != null) {
LegacyBehaviorPolicy.CORRECTED
} else if (lookupFileMeta(SPARK_VERSION_METADATA_KEY) != null) {
LegacyBehaviorPolicy.LEGACY
} else {
throw new IllegalStateException("unrecognized format " + format)
LegacyBehaviorPolicy.withName(modeByConfig)
}
}

def newRebaseExceptionInRead(format: String): SparkUpgradeException = {
val config = format match {
case "Parquet INT96" => SQLConf.LEGACY_PARQUET_INT96_REBASE_MODE_IN_READ.key
case "Parquet" => SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_READ.key
case "Avro" => SQLConf.LEGACY_AVRO_REBASE_MODE_IN_READ.key
case _ => throw new IllegalStateException("unrecognized format " + format)
}
new SparkUpgradeException("3.0", "reading dates before 1582-10-15 or timestamps before " +
s"1900-01-01T00:00:00Z from $format files can be ambiguous, as the files may be written by " +
@@ -129,12 +142,11 @@
}

def newRebaseExceptionInWrite(format: String): SparkUpgradeException = {
val config = if (format == "Parquet") {
SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_WRITE.key
} else if (format == "Avro") {
SQLConf.LEGACY_AVRO_REBASE_MODE_IN_WRITE.key
} else {
throw new IllegalStateException("unrecognized format " + format)
val config = format match {
case "Parquet INT96" => SQLConf.LEGACY_PARQUET_INT96_REBASE_MODE_IN_WRITE.key
case "Parquet" => SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_WRITE.key
case "Avro" => SQLConf.LEGACY_AVRO_REBASE_MODE_IN_WRITE.key
case _ => throw new IllegalStateException("unrecognized format " + format)
}
new SparkUpgradeException("3.0", "writing dates before 1582-10-15 or timestamps before " +
s"1900-01-01T00:00:00Z into $format files can be dangerous, as the files may be read by " +
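The read-side INT96 mode is resolved per file from the Parquet footer's key/value metadata: a file carrying SPARK_INT96_NO_REBASE is treated as CORRECTED, a file that only carries the Spark version key as LEGACY, and anything else falls back to the session config. A small sketch of driving that decision; footerMeta is an illustrative stand-in for the footer metadata, and DataSourceUtils lives in an internal package, so this shows the flow rather than a supported API:

```scala
import org.apache.spark.sql.execution.datasources.DataSourceUtils
import org.apache.spark.sql.internal.SQLConf

// Resolve the INT96 rebase mode for one Parquet file, given its footer key/value metadata.
def resolveInt96Mode(footerMeta: Map[String, String]) =
  DataSourceUtils.int96RebaseMode(
    lookupFileMeta = key => footerMeta.getOrElse(key, null),
    modeByConfig = SQLConf.get.getConf(SQLConf.LEGACY_PARQUET_INT96_REBASE_MODE_IN_READ))

// Per the function above: a file written with the SPARK_INT96_NO_REBASE marker resolves to
// CORRECTED, one with only the Spark version key to LEGACY, and unknown writers fall back to
// spark.sql.legacy.parquet.int96RebaseModeInRead.
```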