
[SPARK-2119][SQL] Improved Parquet performance when reading off S3 #1370

Closed
liancheng wants to merge 4 commits into apache:master from liancheng:faster-parquet

Conversation

liancheng
Contributor

JIRA issue: [SPARK-2119](https://issues.apache.org/jira/browse/SPARK-2119)

Essentially, this PR fixes three issues to gain much better performance when reading large Parquet files off S3.

1. When reading the schema, fetch Parquet metadata from a part-file rather than the `_metadata` file (see the sketch after this list).

   The `_metadata` file contains the metadata of all row groups and can be very large if there are many row groups. Since schema information and row group metadata are coupled within a single Thrift object, we would have to read the whole `_metadata` file just to fetch the schema. On the other hand, the schema is replicated in the footer of every part-file, and footers are fairly small.

2. Add only the root directory of the Parquet file, rather than all the part-files, to the input paths.

   The HDFS API automatically filters out hidden files and underscore files (`_SUCCESS` & `_metadata`), so there is no need to filter out the part-files and add them individually to the input paths. What makes it much worse is that `FileInputFormat.listStatus()` calls `FileSystem.globStatus()` on each individual input path sequentially, and each call results in a blocking remote S3 HTTP request.

3. Worked around PARQUET-16.

   PARQUET-16 is essentially similar to the issue above: it results in lots of sequential `FileSystem.getFileStatus()` calls, which are in turn translated into a bunch of remote S3 HTTP requests.

   `FilteringParquetRowInputFormat` should be cleaned up once PARQUET-16 is fixed.
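To make the first two points concrete, below is a minimal Scala sketch (not the PR's actual implementation) that fetches the Parquet schema from the footer of a single part-file and registers only the root directory as an input path. It assumes the pre-Apache `parquet.hadoop` package of parquet-mr and the Hadoop `FileSystem`/`FileInputFormat` APIs used at the time; `findSinglePartFile`, `readSchemaFromPartFile`, and `configureInputPaths` are made-up names for illustration.

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
import parquet.hadoop.ParquetFileReader
import parquet.schema.MessageType

// Hypothetical helper: pick any visible part-file under the Parquet root directory
// with a single remote listing call.
def findSinglePartFile(fs: FileSystem, root: Path): Path =
  fs.listStatus(root)
    .map(_.getPath)
    .find { p =>
      val name = p.getName
      !name.startsWith("_") && !name.startsWith(".") // skip _metadata, _SUCCESS, hidden files
    }
    .getOrElse(sys.error(s"No part-file found under $root"))

// Point 1: read the footer of one small part-file instead of the potentially huge
// _metadata file; the schema is replicated in every part-file footer.
def readSchemaFromPartFile(conf: Configuration, root: Path): MessageType = {
  val fs = root.getFileSystem(conf)
  val footer = ParquetFileReader.readFooter(conf, findSinglePartFile(fs, root))
  footer.getFileMetaData.getSchema
}

// Point 2: register only the root directory; FileInputFormat expands it in one
// listing and skips hidden/underscore files, instead of issuing one globStatus()
// call (one blocking S3 HTTP request) per part-file.
def configureInputPaths(job: Job, root: Path): Unit =
  FileInputFormat.setInputPaths(job, root)
```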

Below are the micro-benchmark results. The dataset used is an S3 Parquet file consisting of 3,793 partitions, about 110MB per partition on average. The benchmark was done on a 9-node AWS cluster.

- Creating a Parquet `SchemaRDD` (the Parquet schema is fetched)

      val tweets = parquetFile(uri)

  - Before: 17.80s
  - After: 8.61s

- Fetching partition information

      tweets.getPartitions

  - Before: 700.87s
  - After: 21.47s

- Counting the whole file (both steps above are executed together)

      parquetFile(uri).count()

  - Before: ??? (not tested yet)
  - After: 53.26s

@SparkQA

SparkQA commented Jul 11, 2014

QA tests have started for PR 1370. This patch merges cleanly.
View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16555/consoleFull

@SparkQA

SparkQA commented Jul 11, 2014

QA results for PR 1370:
- This patch PASSES unit tests.
- This patch merges cleanly
- This patch adds no public classes

For more information see test output:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16555/consoleFull


// NOTE (lian): Parquet "_metadata" file can be very slow if the file consists of lots of row
// groups. Since Parquet schema is replicated among all row groups, we only need to touch a
Contributor
Are we making a new assumption here that all of the data has the same schema? I know we don't promise support for that now, but it would be nice to do in the future.

Contributor (Author)
Yes, we are making this assumption; I will add a comment here. (Also, checking schema consistency could be quite inefficient for a large Parquet file with lots of row groups.)
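For context only (this PR does not do it), a naive consistency check would have to open every part-file footer and compare schemas, which is exactly the kind of per-file remote access the PR tries to avoid. A minimal sketch of that idea, assuming the `parquet.hadoop` API and a hypothetical `schemasAreConsistent` helper:

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import parquet.hadoop.ParquetFileReader

// Hypothetical sketch: each readFooter() call below is a separate remote request,
// so on S3 this scales linearly with the number of part-files.
def schemasAreConsistent(conf: Configuration, partFiles: Seq[Path]): Boolean = {
  val schemas = partFiles.map { file =>
    ParquetFileReader.readFooter(conf, file).getFileMetaData.getSchema
  }
  schemas.distinct.size <= 1
}
```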

@SparkQA

SparkQA commented Jul 16, 2014

QA tests have started for PR 1370. This patch merges cleanly.
View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16704/consoleFull

@SparkQA
Copy link

SparkQA commented Jul 16, 2014

QA results for PR 1370:
- This patch PASSES unit tests.
- This patch merges cleanly
- This patch adds no public classes

For more information see test output:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16704/consoleFull

@marmbrus
Contributor

Thanks! I've merged this into master.

@asfgit asfgit closed this in efc452a Jul 16, 2014
asfgit pushed a commit that referenced this pull request Aug 27, 2014
[SPARK-3138][SQL] sqlContext.parquetFile should be able to take a single file as parameter

```if (!fs.getFileStatus(path).isDir) throw Exception``` makes no sense after this commit #1370.

Be careful if someone is working on SPARK-2551: make sure the new change passes the test case ```test("Read a parquet file instead of a directory")```.

Author: chutium <[email protected]>

Closes #2044 from chutium/parquet-singlefile and squashes the following commits:

4ae477f [chutium] [SPARK-3138][SQL] sqlContext.parquetFile should be able to take a single file as parameter

(cherry picked from commit 48f4278)
Signed-off-by: Michael Armbrust <[email protected]>
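As a rough illustration of the behavior change SPARK-3138 asks for (a hedged sketch, not the actual Spark code), path resolution could accept either a single Parquet file or a directory of part-files instead of rejecting non-directories; `readFooterForInput` is a made-up name for illustration.

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import parquet.hadoop.ParquetFileReader
import parquet.hadoop.metadata.ParquetMetadata

// Hypothetical sketch: instead of "if (!fs.getFileStatus(path).isDir) throw ...",
// accept a single Parquet file as well as a directory of part-files.
def readFooterForInput(conf: Configuration, pathString: String): ParquetMetadata = {
  val path = new Path(pathString)
  val fs = path.getFileSystem(conf)
  val status = fs.getFileStatus(path)
  val footerPath =
    if (status.isDirectory) {
      // Directory of part-files: read the footer of any visible part-file (see PR #1370).
      fs.listStatus(path)
        .map(_.getPath)
        .find(p => !p.getName.startsWith("_") && !p.getName.startsWith("."))
        .getOrElse(sys.error(s"No part-file found under $path"))
    } else {
      path // a single Parquet file given directly
    }
  ParquetFileReader.readFooter(conf, footerPath)
}
```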
xiliu82 pushed a commit to xiliu82/spark that referenced this pull request Sep 4, 2014

[SPARK-2119][SQL] Improved Parquet performance when reading off S3

Author: Cheng Lian <[email protected]>

Closes apache#1370 from liancheng/faster-parquet and squashes the following commits:

94a2821 [Cheng Lian] Added comments about schema consistency
d2c4417 [Cheng Lian] Worked around PARQUET-16 to improve Parquet performance
1c0d1b9 [Cheng Lian] Accelerated Parquet schema retrieving
5bd3d29 [Cheng Lian] Fixed Parquet log level
liancheng deleted the faster-parquet branch September 24, 2014 00:09