@@ -292,6 +292,10 @@ private[sql] class ParquetRelation(
val assumeBinaryIsString = sqlContext.conf.isParquetBinaryAsString
val assumeInt96IsTimestamp = sqlContext.conf.isParquetINT96AsTimestamp

// When schema merging is enabled and the column referenced by a filter does not exist
// in some of the files, Parquet throws an exception. This is a known Parquet issue (PARQUET-389).
val safeParquetFilterPushDown = !shouldMergeSchemas && parquetFilterPushDown

// Parquet row group size. We will use this value as the value for
// mapreduce.input.fileinputformat.split.minsize and mapred.min.split.size if the value
// of these flags are smaller than the parquet row group size.
@@ -305,7 +309,7 @@ private[sql] class ParquetRelation(
dataSchema,
parquetBlockSize,
useMetadataCache,
parquetFilterPushDown,
safeParquetFilterPushDown,
assumeBinaryIsString,
assumeInt96IsTimestamp) _

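For context, the failure this guards against can be reproduced with a minimal spark-shell session. This is a hypothetical sketch assuming the Spark 1.5.x SQLContext API; the /tmp paths and the data are made up:

    // Enable both filter pushdown and schema merging.
    sqlContext.setConf("spark.sql.parquet.filterPushdown", "true")
    sqlContext.setConf("spark.sql.parquet.mergeSchema", "true")

    import sqlContext.implicits._

    // Two Parquet files whose schemas only partially overlap:
    // column "c" exists only in the second one.
    (1 to 3).map(i => (i, i.toString)).toDF("a", "b").write.parquet("/tmp/t/p1")
    (1 to 3).map(i => (i, i.toString)).toDF("c", "b").write.parquet("/tmp/t/p2")

    // Without this patch, pushing "c = 1" down to the first file (which has
    // no column "c") makes parquet-mr throw (PARQUET-389); with the patch,
    // pushdown is disabled whenever schemas are merged, so this succeeds.
    sqlContext.read.parquet("/tmp/t/p1", "/tmp/t/p2").filter("c = 1").show()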
@@ -314,4 +314,24 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContext
}
}
}

test("SPARK-11103: Filter applied on merged Parquet schema with new column fails") {
import testImplicits._

withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> "true",
SQLConf.PARQUET_SCHEMA_MERGING_ENABLED.key -> "true") {
withTempPath { dir =>
var pathOne = s"${dir.getCanonicalPath}/table1"
(1 to 3).map(i => (i, i.toString)).toDF("a", "b").write.parquet(pathOne)
var pathTwo = s"${dir.getCanonicalPath}/table2"
(1 to 3).map(i => (i, i.toString)).toDF("c", "b").write.parquet(pathTwo)
Contributor:
Several nits here, but I'm going to merge this one first since 1.5.2rc2 is being cut soon.

  • Please use val instead of var here.

  • To construct the test DataFrame, the following is preferable for readability:

    sqlContext.range(3).select('id as 'c, 'id cast StringType as 'b)

    or

    sqlContext.range(3).selectExpr("id AS c", "CAST(id AS STRING) AS b")

Member Author:
var was used by mistake... Thanks for the comments!


// If the "c = 1" filter gets pushed down, this query throws an exception raised
// by Parquet itself. This is a known Parquet issue (PARQUET-389).
checkAnswer(
sqlContext.read.parquet(pathOne, pathTwo).filter("c = 1"),
Contributor:
It would be great if we could specify the columns explicitly for this kind of case, because the ordering of the columns can change.

Member Author:
I see. I just wonder whether the inconsistent order is a separate issue. I think users might find it weird if they run the same script with SELECT * (using schema merging) but the column order of the results is sometimes different.

Could I open an issue for this if you think it is a separate issue?

Contributor:
@HyukjinKwon It would be weird if the column ordering of sqlContext.read.parquet(pathOne, pathTwo) were not deterministic. Can you try it out and see whether that is the case?

Member Author:
I will try to check this. Thanks.

Member Author:
@yhuai I investigated this, and the order is not guaranteed.

This is because of FileStatusCache in HadoopFsRelation (which ParquetRelation extends, as you know). When FileStatusCache.listLeafFiles() is called, it returns a Set[FileStatus], which loses the order of the underlying Array[FileStatus].

So, after retrieving the list of leaf files, including _metadata and _common_metadata, ParquetRelation.mergeSchemasInParallel() merges (separately, and if necessary) the sets of _metadata, _common_metadata, and part-files. This can end up with a different column order, led by the columns of whichever file happens to come first, columns that the other files may not have.

I think this can be resolved by using LinkedHashSet; see the sketch after this comment.

I will open an issue for this, and I would like to work on it if it is really an issue.

Filed here: https://issues.apache.org/jira/browse/SPARK-11500
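
A standalone sketch of the collection behavior in question (illustrative only, not the actual HadoopFsRelation code; the file names are made up):

    import scala.collection.mutable

    object LeafFileOrderDemo extends App {
      // Insertion order of the listed leaf files.
      val files = Seq("part-00000", "part-00001", "part-00002", "part-00003")

      // A plain HashSet iterates in hash order, so the insertion order
      // is not preserved in general.
      val hashed = mutable.HashSet(files: _*)
      println(hashed.toSeq) // order depends on hash codes

      // A LinkedHashSet iterates in insertion order, which would keep the
      // file listing, and hence the merged column order, deterministic.
      val linked = mutable.LinkedHashSet(files: _*)
      println(linked.toSeq) // List(part-00000, part-00001, part-00002, part-00003)
    }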

(1 to 1).map(i => Row(i, i.toString, null)))
}
}
}
}