Commits
27 commits
fb5df20  Timezone adjustment (nkollar, Sep 19, 2018)
f90d63e  Generate file with epoch (called rawValue) value as id (nkollar, Oct 2, 2018)
82a06bf  Fix predicate pushdown - equals (nkollar, Oct 9, 2018)
1b67c87  refactor (nkollar, Oct 10, 2018)
dc8c3bb  Parquet pushdown - without lt and lteq predicates (nkollar, Oct 11, 2018)
73e431b  Fix lt and lteq predicates with timestamps (nkollar, Oct 12, 2018)
b6fe694  Fix style issues, remove whitespaces (nkollar, Oct 12, 2018)
5fa5769  Fix failing tests (nkollar, Feb 1, 2019)
cb6f06c  Upgrade to Parquet 1.11.0 release candidate (nkollar, Feb 1, 2019)
4514386  Fix name (nkollar, Feb 1, 2019)
d93ecb8  Address code review comments (nkollar, Feb 4, 2019)
21af765  Add staging repository to use parquet-mr 1.11.0 RC3 (nkollar, Feb 8, 2019)
8d4b06c  Update manifest (nkollar, Feb 8, 2019)
fbe3039  Fix failing tests, address code review comments (nkollar, Feb 12, 2019)
fb708c3  Fix failing ParquetPartitionDiscoverySuite test (nkollar, Feb 18, 2019)
4deff14  Use direct snapshot url (nkollar, Feb 20, 2019)
afc7564  Change updatePolicy to always (nkollar, Feb 28, 2019)
2998df2  remove duplicated repository (nkollar, Mar 4, 2019)
440a9b3  Use 1.12.0-SNAPSHOT (nkollar, Mar 5, 2019)
6e95803  update deps file (nkollar, Mar 7, 2019)
9a39876  fix manifest file (nkollar, Mar 25, 2019)
ca9bdc3  purge Parquet artifacts from local repository (nkollar, Mar 25, 2019)
f8ecac1  line too long (nkollar, Mar 25, 2019)
c9a50e4  fix python style error (nkollar, Mar 25, 2019)
9dbf152  Remove snapshot repo (nandorKollar, Jan 3, 2020)
9fe073e  Merge remote-tracking branch 'upstream/master' into parquet_logical (h-vetinari, Feb 28, 2021)
06aaa64  resolve merge conflicts, enough to compile at least (h-vetinari, Feb 28, 2021)
dev/run-tests.py (3 changes: 2 additions & 1 deletion)
@@ -343,7 +343,8 @@ def get_hive_profiles(hive_version):
def build_spark_maven(extra_profiles):
# Enable all of the profiles for the build:
build_profiles = extra_profiles + modules.root.build_profile_flags
mvn_goals = ["clean", "package", "-DskipTests"]
mvn_goals = ["dependency:purge-local-repository", "-Dinclude=org.apache.parquet",
"clean", "package", "-DskipTests"]
profiles_and_goals = build_profiles + mvn_goals

print("[info] Building Spark using Maven with these arguments: ", " ".join(profiles_and_goals))
DateTimeUtils.scala
@@ -47,6 +47,8 @@ object DateTimeUtils {
final val JULIAN_DAY_OF_EPOCH = 2440588

final val TimeZoneUTC = TimeZone.getTimeZone("UTC")
+ // for why ".normalized", see https://stackoverflow.com/a/39507023/2965879
+ final val ZoneIdUTC = ZoneId.of("UTC").normalized()

val TIMEZONE_OPTION = "timeZone"

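Aside (not part of this diff): a minimal standalone Java sketch of why the ".normalized()" call above matters. ZoneId.of("UTC") is a region-based id, so it does not compare equal to ZoneOffset.UTC, while the normalized form is the fixed offset and does. The class name below is illustrative only.

import java.time.ZoneId;
import java.time.ZoneOffset;

// Standalone demo, not Spark code.
public class ZoneIdNormalizationDemo {
  public static void main(String[] args) {
    ZoneId region = ZoneId.of("UTC");        // region-based id "UTC"
    ZoneId normalized = region.normalized(); // fixed-offset id, equal to ZoneOffset.UTC

    System.out.println(region.equals(ZoneOffset.UTC));     // false ("UTC" vs "Z")
    System.out.println(normalized.equals(ZoneOffset.UTC)); // true
  }
}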
VectorizedColumnReader.java
@@ -32,6 +32,7 @@
import org.apache.parquet.column.values.ValuesReader;
import org.apache.parquet.io.api.Binary;
import org.apache.parquet.schema.DecimalMetadata;
+ import org.apache.parquet.schema.LogicalTypeAnnotation;
import org.apache.parquet.schema.OriginalType;
import org.apache.parquet.schema.PrimitiveType;

@@ -46,6 +47,10 @@
import org.apache.spark.sql.types.DecimalType;

import static org.apache.parquet.column.ValuesType.REPETITION_LEVEL;
+ import static org.apache.parquet.schema.LogicalTypeAnnotation.DateLogicalTypeAnnotation;
+ import static org.apache.parquet.schema.LogicalTypeAnnotation.TimestampLogicalTypeAnnotation;
+ import static org.apache.parquet.schema.LogicalTypeAnnotation.TimeUnit.MICROS;
+ import static org.apache.parquet.schema.LogicalTypeAnnotation.TimeUnit.MILLIS;
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32;
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT64;
import static org.apache.spark.sql.execution.datasources.parquet.SpecificParquetRecordReaderBase.ValuesReaderIntIterator;
@@ -104,7 +109,7 @@ public class VectorizedColumnReader {

private final PageReader pageReader;
private final ColumnDescriptor descriptor;
- private final OriginalType originalType;
+ private final LogicalTypeAnnotation logicalTypeAnnotation;
// The timezone conversion to apply to int96 timestamps. Null if no conversion.
private final ZoneId convertTz;
private static final ZoneId UTC = ZoneOffset.UTC;
@@ -136,15 +141,15 @@ private boolean canReadAsBinaryDecimal(DataType dt) {

public VectorizedColumnReader(
ColumnDescriptor descriptor,
- OriginalType originalType,
+ LogicalTypeAnnotation logicalTypeAnnotation,
PageReader pageReader,
ZoneId convertTz,
String datetimeRebaseMode,
String int96RebaseMode) throws IOException {
this.descriptor = descriptor;
this.pageReader = pageReader;
this.convertTz = convertTz;
- this.originalType = originalType;
+ this.logicalTypeAnnotation = logicalTypeAnnotation;
this.maxDefLevel = descriptor.getMaxDefinitionLevel();

DictionaryPage dictionaryPage = pageReader.readDictionaryPage();
@@ -192,13 +197,14 @@ private boolean isLazyDecodingSupported(PrimitiveType.PrimitiveTypeName typeName) {
boolean isSupported = false;
switch (typeName) {
case INT32:
- isSupported = originalType != OriginalType.DATE || "CORRECTED".equals(datetimeRebaseMode);
+ isSupported = (!(logicalTypeAnnotation instanceof DateLogicalTypeAnnotation)
+ || "CORRECTED".equals(datetimeRebaseMode));
break;
case INT64:
- if (originalType == OriginalType.TIMESTAMP_MICROS) {
+ if (isTimestampWithUnit(logicalTypeAnnotation, MICROS)) {
isSupported = "CORRECTED".equals(datetimeRebaseMode);
} else {
- isSupported = originalType != OriginalType.TIMESTAMP_MILLIS;
+ isSupported = !(logicalTypeAnnotation instanceof TimestampLogicalTypeAnnotation);
}
break;
case FLOAT:
@@ -278,6 +284,7 @@ void readBatch(int total, WritableColumnVector column) throws IOException {
// Column vector supports lazy decoding of dictionary values so just set the dictionary.
// We can't do this if rowId != 0 AND the column doesn't have a dictionary (i.e. some
// non-dictionary encoded values have already been added).
+ // TODO: replace OriginalType with something from LogicalTypeAnnotation
PrimitiveType primitiveType = descriptor.getPrimitiveType();
if (primitiveType.getOriginalType() == OriginalType.DECIMAL &&
primitiveType.getDecimalMetadata().getPrecision() <= Decimal.MAX_INT_DIGITS() &&
@@ -398,14 +405,22 @@ private void decodeDictionaryIds(
case INT64:
if (column.dataType() == DataTypes.LongType ||
canReadAsLongDecimal(column.dataType()) ||
(originalType == OriginalType.TIMESTAMP_MICROS &&
(isTimestampWithUnit(logicalTypeAnnotation, MICROS) &&
"CORRECTED".equals(datetimeRebaseMode))) {
for (int i = rowId; i < rowId + num; ++i) {
if (!column.isNullAt(i)) {
column.putLong(i, dictionary.decodeToLong(dictionaryIds.getDictId(i)));
}
}
- } else if (originalType == OriginalType.TIMESTAMP_MILLIS) {
+ } else if (isTimestampWithUnit(logicalTypeAnnotation, MICROS)) {
+ final boolean failIfRebase = "EXCEPTION".equals(datetimeRebaseMode);
+ for (int i = rowId; i < rowId + num; ++i) {
+ if (!column.isNullAt(i)) {
+ long julianMicros = dictionary.decodeToLong(dictionaryIds.getDictId(i));
+ column.putLong(i, rebaseMicros(julianMicros, failIfRebase));
+ }
+ }
+ } else if (isTimestampWithUnit(logicalTypeAnnotation, MILLIS)) {
if ("CORRECTED".equals(datetimeRebaseMode)) {
for (int i = rowId; i < rowId + num; ++i) {
if (!column.isNullAt(i)) {
@@ -423,14 +438,6 @@ private void decodeDictionaryIds(
}
}
}
- } else if (originalType == OriginalType.TIMESTAMP_MICROS) {
- final boolean failIfRebase = "EXCEPTION".equals(datetimeRebaseMode);
- for (int i = rowId; i < rowId + num; ++i) {
- if (!column.isNullAt(i)) {
- long julianMicros = dictionary.decodeToLong(dictionaryIds.getDictId(i));
- column.putLong(i, rebaseMicros(julianMicros, failIfRebase));
- }
- }
} else {
throw constructConvertNotSupportedException(descriptor, column);
}
@@ -592,7 +599,7 @@ private void readLongBatch(int rowId, int num, WritableColumnVector column) throws IOException {
defColumn.readLongs(
num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn,
DecimalType.is32BitDecimalType(column.dataType()));
- } else if (originalType == OriginalType.TIMESTAMP_MICROS) {
+ } else if (isTimestampWithUnit(logicalTypeAnnotation, MICROS)) {
if ("CORRECTED".equals(datetimeRebaseMode)) {
defColumn.readLongs(
num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn, false);
@@ -601,7 +608,7 @@ private void readLongBatch(int rowId, int num, WritableColumnVector column) throws IOException {
defColumn.readLongsWithRebase(
num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn, failIfRebase);
}
- } else if (originalType == OriginalType.TIMESTAMP_MILLIS) {
+ } else if (isTimestampWithUnit(logicalTypeAnnotation, MILLIS)) {
if ("CORRECTED".equals(datetimeRebaseMode)) {
for (int i = 0; i < num; i++) {
if (defColumn.readInteger() == maxDefLevel) {
@@ -626,6 +633,13 @@ private void readLongBatch(int rowId, int num, WritableColumnVector column) throws IOException {
}
}

+ private boolean isTimestampWithUnit(
+ LogicalTypeAnnotation logicalTypeAnnotation,
+ LogicalTypeAnnotation.TimeUnit timeUnit) {
+ return (logicalTypeAnnotation instanceof TimestampLogicalTypeAnnotation) &&
+ ((TimestampLogicalTypeAnnotation) logicalTypeAnnotation).getUnit() == timeUnit;
+ }
+
private void readFloatBatch(int rowId, int num, WritableColumnVector column) throws IOException {
// This is where we implement support for the valid type conversions.
// TODO: support implicit cast to double?
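Aside (not part of this diff): the isTimestampWithUnit helper added above uses instanceof plus a cast. parquet-mr 1.11 also exposes a visitor API on LogicalTypeAnnotation; a hypothetical equivalent written against that API could look roughly like the sketch below (class name is illustrative only, not PR code).

import java.util.Optional;
import org.apache.parquet.schema.LogicalTypeAnnotation;
import org.apache.parquet.schema.LogicalTypeAnnotation.LogicalTypeAnnotationVisitor;
import org.apache.parquet.schema.LogicalTypeAnnotation.TimestampLogicalTypeAnnotation;
import org.apache.parquet.schema.LogicalTypeAnnotation.TimeUnit;

final class TimestampUnitCheck {
  // Hypothetical helper (not PR code): same result as isTimestampWithUnit above,
  // dispatched through the LogicalTypeAnnotation visitor instead of instanceof.
  static boolean isTimestampWithUnit(LogicalTypeAnnotation annotation, TimeUnit unit) {
    if (annotation == null) {
      return false;
    }
    return annotation.accept(new LogicalTypeAnnotationVisitor<Boolean>() {
      @Override
      public Optional<Boolean> visit(TimestampLogicalTypeAnnotation timestamp) {
        return Optional.of(timestamp.getUnit() == unit);
      }
    }).orElse(false); // any non-timestamp annotation falls through to Optional.empty()
  }
}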
VectorizedParquetRecordReader.java
@@ -332,7 +332,7 @@ private void checkEndOfRowGroup() throws IOException {
if (missingColumns[i]) continue;
columnReaders[i] = new VectorizedColumnReader(
columns.get(i),
- types.get(i).getOriginalType(),
+ types.get(i).getLogicalTypeAnnotation(),
pages.getPageReader(columns.get(i)),
convertTz,
datetimeRebaseMode,
ParquetFileFormat.scala
@@ -272,20 +272,6 @@ class ParquetFileFormat

lazy val footerFileMetaData =
ParquetFileReader.readFooter(sharedConf, filePath, SKIP_ROW_GROUPS).getFileMetaData
- // Try to push down filters when filter push-down is enabled.
- val pushed = if (enableParquetFilterPushDown) {
- val parquetSchema = footerFileMetaData.getSchema
- val parquetFilters = new ParquetFilters(parquetSchema, pushDownDate, pushDownTimestamp,
- pushDownDecimal, pushDownStringStartWith, pushDownInFilterThreshold, isCaseSensitive)
- filters
- // Collects all converted Parquet filter predicates. Notice that not all predicates can be
- // converted (`ParquetFilters.createFilter` returns an `Option`). That's why a `flatMap`
- // is used here.
- .flatMap(parquetFilters.createFilter(_))
- .reduceOption(FilterApi.and)
- } else {
- None
- }

// PARQUET_INT96_TIMESTAMP_CONVERSION says to apply timezone conversions to int96 timestamps'
// *only* if the file was created by something other than "parquet-mr", so check the actual
@@ -302,6 +288,22 @@ class ParquetFileFormat
None
}

+ // Try to push down filters when filter push-down is enabled.
+ val pushed = if (enableParquetFilterPushDown) {
+ val parquetSchema = footerFileMetaData.getSchema
+ val parquetFilters = new ParquetFilters(parquetSchema, pushDownDate, pushDownTimestamp,
+ pushDownDecimal, pushDownStringStartWith, pushDownInFilterThreshold, isCaseSensitive,
+ convertTz.orNull)
+ filters
+ // Collects all converted Parquet filter predicates. Notice that not all predicates can be
+ // converted (`ParquetFilters.createFilter` returns an `Option`). That's why a `flatMap`
+ // is used here.
+ .flatMap(parquetFilters.createFilter(_))
+ .reduceOption(FilterApi.and)
+ } else {
+ None
+ }
+
val datetimeRebaseMode = DataSourceUtils.datetimeRebaseMode(
footerFileMetaData.getKeyValueMetaData.get,
datetimeRebaseModeInRead)
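Aside (not part of this diff): the relocated comment above explains that ParquetFilters.createFilter returns an Option, so predicates that cannot be expressed as Parquet filters are dropped before the survivors are ANDed together. A rough standalone Java mirror of that flatMap/reduceOption pattern, for illustration only (class and method names are hypothetical):

import java.util.List;
import java.util.Optional;
import org.apache.parquet.filter2.predicate.FilterApi;
import org.apache.parquet.filter2.predicate.FilterPredicate;

final class PushedFilterSketch {
  // Hypothetical illustration (not PR code): keep only the predicates that could be
  // converted, then AND them together; an empty result means nothing is pushed down.
  static Optional<FilterPredicate> combine(List<Optional<FilterPredicate>> converted) {
    return converted.stream()
        .filter(Optional::isPresent)
        .map(Optional::get)
        .reduce(FilterApi::and);
  }
}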