13 changes: 7 additions & 6 deletions dev/deps/spark-deps-hadoop-2.7
@@ -17,6 +17,7 @@ arpack_combined_all-0.1.jar
arrow-format-0.12.0.jar
arrow-memory-0.12.0.jar
arrow-vector-0.12.0.jar
audience-annotations-0.7.0.jar
@dongjoon-hyun (Member) commented on Feb 8, 2019:
Is it required in Spark? If not, can we exclude this in Parquet 1.11.0 RC4?

Contributor (Author) replied:
@dongjoon-hyun It is a new dependency for Parquet, used to annotate which APIs are public and which are private. I don't think we can exclude it from Parquet, since it is used there. The annotations are Yetus audience annotations with a runtime retention policy (I think a class retention policy would be more adequate), so I'm afraid they have to be present at runtime.
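
For context on the retention point above, here is a minimal, self-contained sketch (not part of this PR) of why retention policy matters. The @PublicApi annotation below is a hypothetical stand-in for the Yetus audience annotations (org.apache.yetus.audience), which the comment above says use runtime retention.

import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;

public class RetentionSketch {

  // Hypothetical stand-in for a Yetus audience annotation with runtime retention.
  @Retention(RetentionPolicy.RUNTIME)
  @interface PublicApi {}

  @PublicApi
  static class AnnotatedParquetClass {}

  public static void main(String[] args) {
    // With RUNTIME retention the annotation is visible to reflection, so callers can
    // inspect it at run time, which is presumably why the jar now shows up in the
    // runtime dependency manifests above. With CLASS retention the annotation would
    // stay in the bytecode only and this check would print false.
    System.out.println(AnnotatedParquetClass.class.isAnnotationPresent(PublicApi.class));
  }
}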

automaton-1.11-8.jar
avro-1.8.2.jar
avro-ipc-1.8.2.jar
@@ -162,13 +163,13 @@ orc-shims-1.5.5.jar
oro-2.0.8.jar
osgi-resource-locator-1.0.1.jar
paranamer-2.8.jar
parquet-column-1.10.1.jar
parquet-common-1.10.1.jar
parquet-encoding-1.10.1.jar
parquet-format-2.4.0.jar
parquet-hadoop-1.10.1.jar
parquet-column-1.11.0.jar
parquet-common-1.11.0.jar
parquet-encoding-1.11.0.jar
parquet-format-structures-1.11.0.jar
parquet-hadoop-1.11.0.jar
parquet-hadoop-bundle-1.6.0.jar
parquet-jackson-1.10.1.jar
parquet-jackson-1.11.0.jar
protobuf-java-2.5.0.jar
py4j-0.10.8.1.jar
pyrolite-4.13.jar
12 changes: 6 additions & 6 deletions dev/deps/spark-deps-hadoop-3.2
@@ -181,13 +181,13 @@ orc-shims-1.5.5.jar
oro-2.0.8.jar
osgi-resource-locator-1.0.1.jar
paranamer-2.8.jar
parquet-column-1.10.1.jar
parquet-common-1.10.1.jar
parquet-encoding-1.10.1.jar
parquet-format-2.4.0.jar
parquet-hadoop-1.10.1.jar
parquet-column-1.11.0.jar
parquet-common-1.11.0.jar
parquet-encoding-1.11.0.jar
parquet-format-structures-1.11.0.jar
parquet-hadoop-1.11.0.jar
parquet-hadoop-bundle-1.6.0.jar
parquet-jackson-1.10.1.jar
parquet-jackson-1.11.0.jar
protobuf-java-2.5.0.jar
py4j-0.10.8.1.jar
pyrolite-4.13.jar
3 changes: 2 additions & 1 deletion dev/run-tests.py
@@ -294,7 +294,8 @@ def get_hadoop_profiles(hadoop_version):
def build_spark_maven(hadoop_version):
# Enable all of the profiles for the build:
build_profiles = get_hadoop_profiles(hadoop_version) + modules.root.build_profile_flags
mvn_goals = ["clean", "package", "-DskipTests"]
mvn_goals = ["dependency:purge-local-repository", "-Dinclude=org.apache.parquet",
"clean", "package", "-DskipTests"]
profiles_and_goals = build_profiles + mvn_goals

print("[info] Building Spark (w/Hive 1.2.1) using Maven with these arguments: ",
16 changes: 15 additions & 1 deletion pom.xml
@@ -133,7 +133,7 @@
<!-- note that this should be compatible with Kafka brokers version 0.10 and up -->
<kafka.version>2.2.0</kafka.version>
<derby.version>10.12.1.1</derby.version>
<parquet.version>1.10.1</parquet.version>
<parquet.version>1.11.0</parquet.version>
<orc.version>1.5.5</orc.version>
<orc.classifier>nohive</orc.classifier>
<hive.parquet.group>com.twitter</hive.parquet.group>
@@ -245,6 +245,20 @@
<enabled>false</enabled>
</snapshots>
</repository>
<repository>
<id>parquet-1.11.0</id>
<!-- This is a temporary repository to use parquet-mr 1.11.0 release candidate -->
<name>Parquet 1.11.0 RC4</name>
<url>https://repository.apache.org/content/groups/staging/</url>
<releases>
<enabled>true</enabled>
<updatePolicy>always</updatePolicy>
</releases>
<snapshots>
<enabled>true</enabled>
<updatePolicy>always</updatePolicy>
</snapshots>
</repository>
</repositories>
<pluginRepositories>
<pluginRepository>
VectorizedColumnReader.java
@@ -30,7 +30,8 @@
import org.apache.parquet.column.page.*;
import org.apache.parquet.column.values.ValuesReader;
import org.apache.parquet.io.api.Binary;
import org.apache.parquet.schema.OriginalType;
import org.apache.parquet.schema.LogicalTypeAnnotation;
import org.apache.parquet.schema.LogicalTypeAnnotation.TimestampLogicalTypeAnnotation;
import org.apache.parquet.schema.PrimitiveType;

import org.apache.spark.sql.catalyst.util.DateTimeUtils;
@@ -40,6 +41,8 @@
import org.apache.spark.sql.types.DecimalType;

import static org.apache.parquet.column.ValuesType.REPETITION_LEVEL;
import static org.apache.parquet.schema.LogicalTypeAnnotation.TimeUnit.MICROS;
import static org.apache.parquet.schema.LogicalTypeAnnotation.TimeUnit.MILLIS;
import static org.apache.spark.sql.execution.datasources.parquet.SpecificParquetRecordReaderBase.ValuesReaderIntIterator;
import static org.apache.spark.sql.execution.datasources.parquet.SpecificParquetRecordReaderBase.createRLEIterator;

@@ -96,20 +99,23 @@ public class VectorizedColumnReader {

private final PageReader pageReader;
private final ColumnDescriptor descriptor;
private final OriginalType originalType;
private final LogicalTypeAnnotation logicalTypeAnnotation;
// The timezone conversion to apply to int96 timestamps. Null if no conversion.
private final TimeZone convertTz;
private final TimeZone sessionLocalTz;
private static final TimeZone UTC = DateTimeUtils.TimeZoneUTC();

public VectorizedColumnReader(
ColumnDescriptor descriptor,
OriginalType originalType,
LogicalTypeAnnotation logicalTypeAnnotation,
PageReader pageReader,
TimeZone convertTz) throws IOException {
TimeZone convertTz,
TimeZone sessionLocalTz) throws IOException {
this.descriptor = descriptor;
this.pageReader = pageReader;
this.convertTz = convertTz;
this.originalType = originalType;
this.sessionLocalTz = sessionLocalTz;
this.logicalTypeAnnotation = logicalTypeAnnotation;
this.maxDefLevel = descriptor.getMaxDefinitionLevel();

DictionaryPage dictionaryPage = pageReader.readDictionaryPage();
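
As a reading aid for the switch from OriginalType to LogicalTypeAnnotation in the hunk above, here is a small standalone sketch (not part of the PR) against the parquet-mr 1.11 schema API. The column name and the builder calls are illustrative and assume the 1.11.0 Types/LogicalTypeAnnotation builder methods.

import org.apache.parquet.schema.LogicalTypeAnnotation;
import org.apache.parquet.schema.LogicalTypeAnnotation.TimestampLogicalTypeAnnotation;
import org.apache.parquet.schema.PrimitiveType;
import org.apache.parquet.schema.Types;

public class LogicalTypeSketch {
  public static void main(String[] args) {
    // Build an INT64 column annotated as TIMESTAMP(MICROS, isAdjustedToUTC = false),
    // i.e. the case the new sessionLocalTz handling below is concerned with.
    PrimitiveType ts = Types.required(PrimitiveType.PrimitiveTypeName.INT64)
        .as(LogicalTypeAnnotation.timestampType(false, LogicalTypeAnnotation.TimeUnit.MICROS))
        .named("ts");

    LogicalTypeAnnotation annotation = ts.getLogicalTypeAnnotation();
    if (annotation instanceof TimestampLogicalTypeAnnotation) {
      TimestampLogicalTypeAnnotation tsAnnotation = (TimestampLogicalTypeAnnotation) annotation;
      // The old enum only distinguished TIMESTAMP_MILLIS / TIMESTAMP_MICROS; the annotation
      // also carries the unit and the isAdjustedToUTC flag used by the reader changes below.
      System.out.println(tsAnnotation.getUnit() + " adjustedToUTC=" + tsAnnotation.isAdjustedToUTC());
    }
  }
}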
@@ -179,7 +185,7 @@ void readBatch(int total, WritableColumnVector column) throws IOException {
if (column.hasDictionary() || (rowId == 0 &&
(typeName == PrimitiveType.PrimitiveTypeName.INT32 ||
(typeName == PrimitiveType.PrimitiveTypeName.INT64 &&
originalType != OriginalType.TIMESTAMP_MILLIS) ||
!(logicalTypeAnnotation instanceof TimestampLogicalTypeAnnotation)) ||
typeName == PrimitiveType.PrimitiveTypeName.FLOAT ||
typeName == PrimitiveType.PrimitiveTypeName.DOUBLE ||
typeName == PrimitiveType.PrimitiveTypeName.BINARY))) {
@@ -287,17 +293,24 @@ private void decodeDictionaryIds(
case INT64:
if (column.dataType() == DataTypes.LongType ||
DecimalType.is64BitDecimalType(column.dataType()) ||
originalType == OriginalType.TIMESTAMP_MICROS) {
isTimestampWithUnit(logicalTypeAnnotation, MICROS)) {
for (int i = rowId; i < rowId + num; ++i) {
if (!column.isNullAt(i)) {
column.putLong(i, dictionary.decodeToLong(dictionaryIds.getDictId(i)));
long time = dictionary.decodeToLong(dictionaryIds.getDictId(i));
if (needsTimezoneAdjustment()) {
time = DateTimeUtils.convertTz(time, sessionLocalTz, UTC);
}
column.putLong(i, time);
}
}
} else if (originalType == OriginalType.TIMESTAMP_MILLIS) {
} else if (isTimestampWithUnit(logicalTypeAnnotation, MILLIS)) {
for (int i = rowId; i < rowId + num; ++i) {
if (!column.isNullAt(i)) {
column.putLong(i,
DateTimeUtils.fromMillis(dictionary.decodeToLong(dictionaryIds.getDictId(i))));
long time = DateTimeUtils.fromMillis(dictionary.decodeToLong(dictionaryIds.getDictId(i)));
if (needsTimezoneAdjustment()) {
time = DateTimeUtils.convertTz(time, sessionLocalTz, UTC);
}
column.putLong(i, time);
}
}
} else {
@@ -425,13 +438,29 @@ private void readLongBatch(int rowId, int num, WritableColumnVector column) thro
// This is where we implement support for the valid type conversions.
if (column.dataType() == DataTypes.LongType ||
DecimalType.is64BitDecimalType(column.dataType()) ||
originalType == OriginalType.TIMESTAMP_MICROS) {
defColumn.readLongs(
num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn);
} else if (originalType == OriginalType.TIMESTAMP_MILLIS) {
isTimestampWithUnit(logicalTypeAnnotation, MICROS)) {
if (needsTimezoneAdjustment()) {
for (int i = 0; i < num; i++) {
if (defColumn.readInteger() == maxDefLevel) {
long timestamp = dataColumn.readLong();
timestamp = DateTimeUtils.convertTz(timestamp, sessionLocalTz, UTC);
column.putLong(rowId + i, timestamp);
} else {
column.putNull(rowId + i);
}
}
} else {
defColumn.readLongs(
num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn);
}
} else if (isTimestampWithUnit(logicalTypeAnnotation, MILLIS)) {
for (int i = 0; i < num; i++) {
if (defColumn.readInteger() == maxDefLevel) {
column.putLong(rowId + i, DateTimeUtils.fromMillis(dataColumn.readLong()));
long timestamp = DateTimeUtils.fromMillis(dataColumn.readLong());
if (needsTimezoneAdjustment()) {
timestamp = DateTimeUtils.convertTz(timestamp, sessionLocalTz, UTC);
}
column.putLong(rowId + i, timestamp);
} else {
column.putNull(rowId + i);
}
@@ -441,6 +470,18 @@ private void readLongBatch(int rowId, int num, WritableColumnVector column) thro
}
}

private boolean needsTimezoneAdjustment() {
return logicalTypeAnnotation instanceof TimestampLogicalTypeAnnotation &&
!((TimestampLogicalTypeAnnotation) logicalTypeAnnotation).isAdjustedToUTC();
}

private boolean isTimestampWithUnit(
LogicalTypeAnnotation logicalTypeAnnotation,
LogicalTypeAnnotation.TimeUnit timeUnit) {
return (logicalTypeAnnotation instanceof TimestampLogicalTypeAnnotation) &&
((TimestampLogicalTypeAnnotation) logicalTypeAnnotation).getUnit() == timeUnit;
}

private void readFloatBatch(int rowId, int num, WritableColumnVector column) throws IOException {
// This is where we implement support for the valid type conversions.
// TODO: support implicit cast to double?
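
For intuition about the new needsTimezoneAdjustment() path above, here is a plain java.time sketch (not from the PR) of what shifting a non-UTC-adjusted timestamp from the session zone to UTC amounts to. The zone and wall-clock value are made up, and the equivalence to DateTimeUtils.convertTz(time, sessionLocalTz, UTC) is my reading of the diff.

import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZoneOffset;

public class TimezoneAdjustmentSketch {
  public static void main(String[] args) {
    ZoneId sessionLocalTz = ZoneId.of("America/Los_Angeles"); // illustrative session zone

    // A TIMESTAMP with isAdjustedToUTC = false stores wall-clock time; a writer in the
    // session zone would have stored 2019-02-08T12:00 as microseconds computed against UTC.
    LocalDateTime wallClock = LocalDateTime.of(2019, 2, 8, 12, 0, 0);
    long storedMicros = wallClock.toInstant(ZoneOffset.UTC).getEpochSecond() * 1_000_000L;

    // Spark's TimestampType wants the microseconds of the actual instant, i.e. the same
    // wall clock interpreted in the session zone; that is the shift the reader changes
    // above appear to perform via convertTz(time, sessionLocalTz, UTC).
    long adjustedMicros = wallClock.atZone(sessionLocalTz).toInstant().getEpochSecond() * 1_000_000L;

    System.out.println("stored=" + storedMicros + " adjusted=" + adjustedMicros);
  }
}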
VectorizedParquetRecordReader.java
@@ -88,6 +88,8 @@ public class VectorizedParquetRecordReader extends SpecificParquetRecordReaderBa
*/
private TimeZone convertTz = null;

private TimeZone sessionLocalTz = null;

/**
* columnBatch object that is used for batch decoding. This is created on first use and triggers
* batched decoding. It is not valid to interleave calls to the batched interface with the row
@@ -116,8 +118,9 @@ public class VectorizedParquetRecordReader extends SpecificParquetRecordReaderBa
*/
private final MemoryMode MEMORY_MODE;

public VectorizedParquetRecordReader(TimeZone convertTz, boolean useOffHeap, int capacity) {
public VectorizedParquetRecordReader(TimeZone convertTz, TimeZone sessionLocalTz, boolean useOffHeap, int capacity) {
this.convertTz = convertTz;
this.sessionLocalTz = sessionLocalTz;
MEMORY_MODE = useOffHeap ? MemoryMode.OFF_HEAP : MemoryMode.ON_HEAP;
this.capacity = capacity;
}
@@ -308,8 +311,8 @@ private void checkEndOfRowGroup() throws IOException {
columnReaders = new VectorizedColumnReader[columns.size()];
for (int i = 0; i < columns.size(); ++i) {
if (missingColumns[i]) continue;
columnReaders[i] = new VectorizedColumnReader(columns.get(i), types.get(i).getOriginalType(),
pages.getPageReader(columns.get(i)), convertTz);
columnReaders[i] = new VectorizedColumnReader(columns.get(i), types.get(i).getLogicalTypeAnnotation(),
pages.getPageReader(columns.get(i)), convertTz, sessionLocalTz);
}
totalCountLoadedSoFar += pages.getRowCount();
}
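
For reference, a minimal construction sketch (not from the PR) showing how a caller would pass the new sessionLocalTz argument; the time zone and capacity values are illustrative, and the constructor signature is taken from the hunk above.

import java.util.TimeZone;

import org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader;

public class ReaderConstructionSketch {
  public static void main(String[] args) throws Exception {
    // convertTz: the pre-existing int96 conversion zone (null means no conversion).
    TimeZone convertTz = null;
    // sessionLocalTz: the new argument, expected to come from spark.sql.session.timeZone.
    TimeZone sessionLocalTz = TimeZone.getTimeZone("Europe/Budapest"); // illustrative zone
    VectorizedParquetRecordReader reader = new VectorizedParquetRecordReader(
        convertTz, sessionLocalTz, /* useOffHeap */ false, /* capacity */ 4096);
    // initialize(...), nextBatch() and close() would follow as before; only the
    // construction changes with this PR.
    System.out.println("created " + reader);
  }
}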
ParquetFileFormat.scala
@@ -363,14 +363,16 @@ class ParquetFileFormat
null)

val sharedConf = broadcastedHadoopConf.value.value
val sessionLocalTz = DateTimeUtils.getTimeZone(
sharedConf.get(SQLConf.SESSION_LOCAL_TIMEZONE.key))

lazy val footerFileMetaData =
ParquetFileReader.readFooter(sharedConf, filePath, SKIP_ROW_GROUPS).getFileMetaData
// Try to push down filters when filter push-down is enabled.
val pushed = if (enableParquetFilterPushDown) {
val parquetSchema = footerFileMetaData.getSchema
val parquetFilters = new ParquetFilters(pushDownDate, pushDownTimestamp, pushDownDecimal,
pushDownStringStartWith, pushDownInFilterThreshold, isCaseSensitive)
pushDownStringStartWith, pushDownInFilterThreshold, isCaseSensitive, sessionLocalTz)
filters
// Collects all converted Parquet filter predicates. Notice that not all predicates can be
// converted (`ParquetFilters.createFilter` returns an `Option`). That's why a `flatMap`
@@ -408,7 +410,10 @@ class ParquetFileFormat
val taskContext = Option(TaskContext.get())
if (enableVectorizedReader) {
val vectorizedReader = new VectorizedParquetRecordReader(
convertTz.orNull, enableOffHeapColumnVector && taskContext.isDefined, capacity)
convertTz.orNull,
sessionLocalTz,
enableOffHeapColumnVector && taskContext.isDefined,
capacity)
val iter = new RecordReaderIterator(vectorizedReader)
// SPARK-23457 Register a task completion lister before `initialization`.
taskContext.foreach(_.addTaskCompletionListener[Unit](_ => iter.close()))
@@ -426,9 +431,10 @@ class ParquetFileFormat
// ParquetRecordReader returns UnsafeRow
val reader = if (pushed.isDefined && enableRecordFilter) {
val parquetFilter = FilterCompat.get(pushed.get, null)
new ParquetRecordReader[UnsafeRow](new ParquetReadSupport(convertTz), parquetFilter)
new ParquetRecordReader[UnsafeRow](new ParquetReadSupport(convertTz, sessionLocalTz),
parquetFilter)
} else {
new ParquetRecordReader[UnsafeRow](new ParquetReadSupport(convertTz))
new ParquetRecordReader[UnsafeRow](new ParquetReadSupport(convertTz, sessionLocalTz))
}
val iter = new RecordReaderIterator(reader)
// SPARK-23457 Register a task completion lister before `initialization`.
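
Since the session zone above is read from SQLConf.SESSION_LOCAL_TIMEZONE, the user-facing knob is spark.sql.session.timeZone. Below is a usage sketch (not from the PR) of the configuration that drives sessionLocalTz; the zone and the input path are made up.

import org.apache.spark.sql.SparkSession;

public class SessionTimezoneExample {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder()
        .master("local[*]")
        .appName("parquet-session-tz")
        // Drives the sessionLocalTz handed to ParquetReadSupport and the vectorized reader above.
        .config("spark.sql.session.timeZone", "Europe/Budapest") // illustrative zone
        .getOrCreate();

    // Timestamps written with isAdjustedToUTC = false would be shifted from this zone to UTC
    // by the reader changes in this PR; the path below is hypothetical.
    spark.read().parquet("/tmp/timestamps_parquet").show();

    spark.stop();
  }
}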