SpecificParquetRecordReaderBase.java
@@ -147,7 +147,8 @@ public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptCont
this.sparkSchema = StructType$.MODULE$.fromString(sparkRequestedSchemaString);
this.reader = new ParquetFileReader(
configuration, footer.getFileMetaData(), file, blocks, requestedSchema.getColumns());
- for (BlockMetaData block : blocks) {
+ // use the blocks from the reader in case some do not match filters and will not be read
Member:
Could you be more specific by mentioning the corresponding Parquet JIRA issue or versions (1.10.0)?

Contributor Author:
I'm not sure what you mean here. This is a problem entirely in Spark because Spark is reaching into Parquet internals for its vectorized support. There's no Parquet issue to reference.

Member (@dongjoon-hyun, May 13, 2018):
What I mean is this patch is logically okay, but only valid for master branch, Spark 2.4 with Parquet 1.10.0. For example, the test case will pass in branch-2.3 without this patch because it uses Parquet 1.8.X. As you mentioned, it would be great if we had included this patch in #21070.

Contributor Author:
Actually, it is fine, and more correct, for this to be ported to older versions. I doubt it will be, though, because it is unnecessary.

Member (@dongjoon-hyun, May 22, 2018):
It looks correct to me, too. However, this comment isn't clear.

  • If the comment is correct only in Parquet 1.10.0, please fix the comment.
  • If the comment is correct in general, the failure should occur in Apache Spark 2.3.X (with old Parquet). Why don't we fix that in 2.3.1? This was my original suggestion.

Contributor Author:
Yes, we will need to backport this to the 2.3.x line. No rush to make it for 2.3.1 though, since dictionary filtering is off by default and this isn't a correctness problem.

+ for (BlockMetaData block : reader.getRowGroups()) {
this.totalRowCount += block.getRowCount();
}
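For context on the change above, here is a hedged sketch (not part of the patch) of why counting rows from the footer's blocks and counting rows from the reader can diverge once row-group filters run. It assumes parquet-mr 1.10.x, reuses the variables from the surrounding initialize() method, and the helper variable names are made up for illustration.

    import org.apache.parquet.hadoop.ParquetFileReader;
    import org.apache.parquet.hadoop.metadata.BlockMetaData;

    // Fragment of initialize(): the reader applies row-group filters (including
    // dictionary filtering) when it is constructed, so its row groups can be a
    // strict subset of the footer's blocks.
    ParquetFileReader reader = new ParquetFileReader(
        configuration, footer.getFileMetaData(), file, blocks, requestedSchema.getColumns());

    long footerRowCount = 0L;
    for (BlockMetaData block : blocks) {                  // every row group in the footer
      footerRowCount += block.getRowCount();
    }

    long readableRowCount = 0L;
    for (BlockMetaData block : reader.getRowGroups()) {   // only row groups that passed the filters
      readableRowCount += block.getRowCount();
    }

    // When dictionary filtering drops a row group, footerRowCount > readableRowCount.
    // Using footerRowCount as totalRowCount makes the record reader wait for rows that
    // will never be produced, e.g.:
    // java.io.IOException: expecting more rows but reached last block. Read 0 out of 50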

@@ -225,7 +226,8 @@ protected void initialize(String path, List<String> columns) throws IOException
this.sparkSchema = new ParquetToSparkSchemaConverter(config).convert(requestedSchema);
this.reader = new ParquetFileReader(
config, footer.getFileMetaData(), file, blocks, requestedSchema.getColumns());
- for (BlockMetaData block : blocks) {
+ // use the blocks from the reader in case some do not match filters and will not be read
+ for (BlockMetaData block : reader.getRowGroups()) {
Contributor:
I think this is an existing issue; does your test case fail on Spark 2.3 too?

Contributor Author:
Dictionary filtering is off by default in 1.8.x. It was enabled after we built confidence in its correctness in 1.9.x.

We should backport this fix to 2.3.x also, but the only downside to not having it is that dictionary filtering will throw an exception when it is enabled. So the feature just isn't available.

this.totalRowCount += block.getRowCount();
}
}
ParquetQuerySuite.scala
@@ -879,6 +879,18 @@ class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext
}
}
}

test("SPARK-24230: filter row group using dictionary") {
withSQLConf(("parquet.filter.dictionary.enabled", "true")) {
Member:
Is this a valid way to control this configuration? It seems to pass with false, too.

Contributor Author (@rdblue, May 13, 2018):
This is the default, so it is possible that it isn't getting passed to Parquet correctly. I can debug it at some point to find out why it passes with false. I did make sure that the test case fails without the fix, so we know it should be correctly using dictionary filtering. Well, that or there were other cases that had the same problem and this hits one of those.

Member:
If there is anyone who can lead this correctly, it's you, @rdblue. :)

I know this is the default in Parquet. Whether or not the SpecificParquetRecordReaderBase.java patch is applied, we should not add a no-op conf setting like this withSQLConf call; it is misleading for the whole Spark community going forward. Please debug it and add a correct test case here.

Contributor Author:
I don't think this is misleading. The dictionary filter needs to be on and there's no guarantee from Parquet that the default will continue to be true.

Member:
That was my point, too. This configuration is needed, but this code doesn't actually do anything to set it. To use this configuration correctly, we need to fix that first; we should not have no-op code like this.

Member:
We are unable to pass the Parquet-specific parameter through withSQLConf. The snippet below shows how to pass the Parquet option instead.

    withTable("t1") {
      spark.createDataFrame((0 until 100).map(i => ((i * 2) % 20, s"data-$i"))).write
        .option("parquet.filter.dictionary.enabled", false).saveAsTable("t1")
      checkAnswer(sql("SELECT _2 FROM t1 WHERE t1._1 = 5"), Seq.empty)
    }

Could you help investigate why we still hit the error [without the fix] when we set parquet.filter.dictionary.enabled to false?

Contributor:
Using SQLConf is OK; Spark puts all SQL configs into the Hadoop conf, which is accessed by the Parquet writer on the executor side.

I'm also curious why turning parquet.filter.dictionary.enabled off doesn't avoid this bug.
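As a hedged illustration of that propagation (an editorial sketch, not code from this PR): inside the org.apache.spark.sql package, where SparkSession.sessionState is visible, one could check that a SQL conf entry ends up in the Hadoop conf handed to Parquet, assuming SessionState.newHadoopConf copies SQL conf entries as described above.

    // Sketch only; `sessionState` is private[sql], so this compiles only from code in
    // org.apache.spark.sql (for example a test suite such as ParquetQuerySuite).
    spark.conf.set("parquet.filter.dictionary.enabled", "false")
    val hadoopConf = spark.sessionState.newHadoopConf()
    // If SQL confs are copied into the Hadoop conf as described, Parquet readers and
    // writers created with this conf will observe the value.
    assert(hadoopConf.get("parquet.filter.dictionary.enabled") == "false")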

Contributor Author:
Looks like the problem is a bug in Parquet. It is using the stats property instead of the dictionary property. This is minor because there is almost no reason to turn either one off now that we've built more confidence in the filters.

+     // create a table with values from 0, 2, ..., 18 that will be dictionary-encoded
+     withParquetTable((0 until 100).map(i => ((i * 2) % 20, s"data-$i")), "t") {
+       // search for a key that is not present so the dictionary filter eliminates all row groups
+       // Fails without SPARK-24230:
+       // java.io.IOException: expecting more rows but reached last block. Read 0 out of 50
+       checkAnswer(sql("SELECT _2 FROM t WHERE t._1 = 5"), Seq.empty)
+     }
+   }
+ }
}

object TestingUDT {