
Conversation

@fschmalzel (Contributor) commented Feb 18, 2021

Adds a method readRowGroup(BlockMetaData) to allow random access to
PageReadStores via BlockMetaData, which can be obtained using the
getRowGroups() method.

This is similar to the existing getDictionaryReader(BlockMetaData) method.

With random access, the reader can be reused if, for example, someone
needs to go back to a previous row group. This improves performance
because we don't need to reopen the file and read the metadata again.
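For illustration, a minimal sketch of the intended usage (the description proposes a BlockMetaData argument; this sketch uses the index-based variant that the review discussion below settled on, and the file path is just a placeholder):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.ParquetReadOptions;
import org.apache.parquet.column.page.PageReadStore;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.util.HadoopInputFile;

public class RandomAccessSketch {
  public static void main(String[] args) throws Exception {
    // Placeholder path; any Parquet file with more than one row group works.
    Path file = new Path("/tmp/test.parquet");
    ParquetReadOptions options = ParquetReadOptions.builder().build();

    try (ParquetFileReader reader =
             new ParquetFileReader(HadoopInputFile.fromPath(file, new Configuration()), options)) {
      int rowGroups = reader.getRowGroups().size();

      // Jump to the last row group, then back to the first, without
      // reopening the file or re-reading the footer.
      PageReadStore last = reader.readRowGroup(rowGroups - 1);
      PageReadStore first = reader.readRowGroup(0);
      System.out.println(last.getRowCount() + " rows in the last row group, "
          + first.getRowCount() + " in the first");
    }
  }
}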

Make sure you have checked all steps below.

Jira

Tests

  • My PR adds the following unit tests:
  • TestParquetReaderRandomAccess

Commits

  • My commits all reference Jira issues in their subject lines. In addition, my commits follow the guidelines from "How to write a good git commit message":
    1. Subject is separated from body by a blank line
    2. Subject is limited to 50 characters (not including Jira issue reference)
    3. Subject does not end with a period
    4. Subject uses the imperative mood ("add", not "adding")
    5. Body wraps at 72 characters
    6. Body explains "what" and "why", not "how"

Documentation

  • In case of new functionality, my PR adds documentation that describes how to use it.
    • All the public functions and the classes in the PR contain Javadoc that explains what they do

@gszadovszky (Contributor) left a comment

I think more exhaustive tests are required. What do you think about the following idea?

Generate a larger file (~10,000 rows) and a couple of columns (5-10) with different types. (See TestStatistics for example.) You may play with the page sizes and row group sizes to ensure you'll have several pages in a row group and several row groups as well.
Then, shuffle the row groups and create a couple of readers (e.g. 5) as you have done in your tests. You may validate the read data against the original by adding all the rows to the same list and sorting by an id column.
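A rough sketch of that validation idea (everything here is illustrative: file and conf stand for the test file Path and a Hadoop Configuration, the expected list is assumed to be filled with the rows in write order while generating the file, the id field name and the seed are placeholders, a single reader is used for brevity, and a real test would compare all column values rather than just sizes):

List<Group> actual = new ArrayList<>();

try (ParquetFileReader reader = new ParquetFileReader(
    HadoopInputFile.fromPath(file, conf), ParquetReadOptions.builder().build())) {
  MessageType schema = reader.getFileMetaData().getSchema();
  MessageColumnIO columnIO = new ColumnIOFactory().getColumnIO(schema);
  GroupRecordConverter converter = new GroupRecordConverter(schema);

  List<Integer> indexes = new ArrayList<>();
  for (int i = 0; i < reader.getRowGroups().size(); i++) {
    indexes.add(i);
  }
  // Shuffle the row-group order (a fixed seed keeps it reproducible).
  Collections.shuffle(indexes, new Random(42L));

  // Read the row groups in shuffled order and collect all rows.
  for (int index : indexes) {
    PageReadStore pages = reader.readRowGroup(index);
    RecordReader<Group> recordReader = columnIO.getRecordReader(pages, converter);
    for (long i = 0; i < pages.getRowCount(); i++) {
      actual.add(recordReader.read());
    }
  }
}

// Sort both sides by the id column and compare.
Comparator<Group> byId = Comparator.comparingLong(g -> g.getLong("id", 0));
expected.sort(byId);
actual.sort(byId);
assertEquals(expected.size(), actual.size());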

}
}

Collections.shuffle(indexes);
@gszadovszky (Contributor):

This way the order of the indexes is not deterministic. Usually we don't like this in unit tests because, in case of a failure, we cannot reproduce the test. I would suggest invoking shuffle with a pre-created Random instance with a fixed seed.
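Concretely, that amounts to something like the following (the seed value is arbitrary), so a failing shuffled order can be reproduced:

Collections.shuffle(indexes, new Random(1337L));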

}
}

Collections.shuffle(indexes);
@gszadovszky (Contributor):

Same as above

@gszadovszky (Contributor) left a comment

Added a couple of comments.

Also, please do not force push to PRs. It makes it really hard to track the code reviews.

Comment on lines +78 to +79
for (boolean enableDictionary : new boolean[]{false, true}) {
for (WriterVersion writerVersion : new WriterVersion[]{WriterVersion.PARQUET_1_0, WriterVersion.PARQUET_2_0}) {
@gszadovszky (Contributor):

Of course it is not a problem but I think with/without dictionary and V1/V2 pages are irrelevant for this test

* @return the PageReadStore which can provide PageReaders for each column or null if there are no rows in this block
* @throws IOException if an error occurs while reading
*/
public PageReadStore readFilteredRowGroup(int blockIndex) throws IOException {
@gszadovszky (Contributor):

What is the reason behind using a BlockMetaData argument for readRowGroup while you use the index of the row group here? I think both should work similarly.

@fschmalzel (Contributor, Author):

I think using BlockMetaData is better as it is similar to the existing getDictionaryReader(BlockMetaData). For the filtered row group, we need the index to access the RowRanges and ColumnIndexStore.

453a6cc#diff-8da24c84aef62e6e836d073938f7843d289785baaeddf446f3afeae6d4ef4b10R983

453a6cc#diff-8da24c84aef62e6e836d073938f7843d289785baaeddf446f3afeae6d4ef4b10R994

What do you think is the better approach?
Accept BlockMetaData and find the index in the list.
-or-
Change the other method signatures to also use indexes.

@gszadovszky (Contributor):

It depends on how you plan to use it. If you don't plan to cache the related metadata outside of the reader, I think it is cleaner to use the index because it does not suggest that you may pass arbitrary metadata rather than the metadata actually in the file. Meanwhile, you may want to select specific row groups to read based on the metadata, in which case you would already have the related object and it is easier to simply pass it instead of looking up the index.

@fschmalzel (Contributor, Author):

I agree that using the index is cleaner. I added methods that take an int instead of BlockMetaData for the existing API.

import static org.apache.parquet.schema.Type.Repetition.REQUIRED;
import static org.junit.Assert.*;

public class TestParquetReaderRandomAccess {
@gszadovszky (Contributor):

As far as I can tell, you only check the first page of the same column for every row group. I am not sure whether that is enough, but in that case the other columns are completely useless. I think at least all the pages of the same column should be checked.

Please also test readFilteredRowGroup. As you do not decode the pages, you do not need to test anything differently, but we need to cover all the new paths.
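For reference, checking every page of one column in a row group could look roughly like the sketch below (reader is an open ParquetFileReader, blockIndex a row-group index; the column choice and the assertion are illustrative, and ColumnDescriptor, PageReader and DataPage come from parquet-column):

ColumnDescriptor column = reader.getFileMetaData().getSchema().getColumns().get(0);
PageReadStore rowGroup = reader.readRowGroup(blockIndex); // or readFilteredRowGroup(blockIndex)
PageReader pageReader = rowGroup.getPageReader(column);

long valuesSeen = 0;
DataPage page;
while ((page = pageReader.readPage()) != null) { // iterate all data pages of the column chunk
  valuesSeen += page.getValueCount();
}
// For an unfiltered row group, the pages together hold every value of the chunk.
assertEquals(pageReader.getTotalValueCount(), valuesSeen);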

@shangxinli (Contributor):

Do you have some benchmarking test numbers to show how much gain we could have?

@fschmalzel (Contributor, Author):

First off, sorry for the late answer.

We use Parquet data to display graphs. We needed this optimization to go back pages when a user scrolls to the left or zooms out. I currently don't have any concrete numbers, but it was a lot more performant: going from practically unusably slow to reasonably fast, comparable to our legacy file format.

I plan to go and get some more precise numbers for our use case soon.

@fschmalzel (Contributor, Author):

Tested random access with a 351 MB parquet file with zstd and dictionary enabled. The file has 321 row groups.
Using the sequential method to read the randomized indexes took around 87319 milliseconds.
Using the sequential method without this commit took around 86311 milliseconds.
Using the random access method took around 58941 milliseconds.

Note: This only improves randomized access. For sequential access both methods should take about the same time. Nevertheless the sequential method is not removed, so that should not be a problem.

This should all be taken with a grain of salt, as this was tested without something like JMH (https://github.com/openjdk/jmh).

Unfortunately I cannot upload the tested file anywhere. If you want me to test a different file, I will gladly do so.

Quick and dirty test:

package org.apache.parquet;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.column.page.PageReadStore;
import org.apache.parquet.example.data.Group;
import org.apache.parquet.example.data.simple.convert.GroupRecordConverter;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.util.HadoopInputFile;
import org.apache.parquet.io.ColumnIOFactory;
import org.apache.parquet.io.MessageColumnIO;
import org.apache.parquet.io.RecordReader;
import org.apache.parquet.schema.MessageType;
import org.junit.Test;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Random;

public class ReadTest {

  private static final long RANDOM_SEED = 7174252115631550700L;
  private static final String FILE_PATH = "C:\\Users\\fes\\Desktop\\test.parquet";
  private static final int ROUNDS = 4;

  @Test
  public void testSequential() throws IOException {
    Random random = new Random(RANDOM_SEED);

    Path file = new Path(FILE_PATH);
    Configuration configuration = new Configuration();
    ParquetReadOptions options = ParquetReadOptions.builder().build();

    ParquetFileReader reader = new ParquetFileReader(HadoopInputFile.fromPath(file, configuration), options);
    MessageType schema = reader.getFileMetaData().getSchema();
    MessageColumnIO columnIO = new ColumnIOFactory().getColumnIO(schema);
    GroupRecordConverter converter = new GroupRecordConverter(schema);

    int blockAmount = reader.getRowGroups().size();

    List<Integer> indexes = new ArrayList<>();
    for (int j = 0; j < ROUNDS; j++) {
      for (int i = 0; i < blockAmount; i++) {
        indexes.add(i);
      }
    }

    Collections.shuffle(indexes, random);

    long start = System.currentTimeMillis();

    int currentRowGroup = -1;

    for (int index : indexes) {
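      // Emulate random access with the sequential API: when the target row group
      // lies behind the current position, the file has to be reopened from the start.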
      if (index <= currentRowGroup) {
        try {
          reader.close();
        } catch (Exception ignored) {
        }

        reader = new ParquetFileReader(HadoopInputFile.fromPath(file, configuration), options);
        currentRowGroup = -1;
      }
      for (int i = 1; i < index - currentRowGroup; i++) {
        reader.skipNextRowGroup();
      }
      currentRowGroup = index;
      PageReadStore pages = reader.readNextRowGroup();
      long rowCount = pages.getRowCount();
      RecordReader<Group> recordReader = columnIO.getRecordReader(pages, converter);
      for (long i = 0; i < rowCount; i++) {
        recordReader.read();
      }
    }

    long stop = System.currentTimeMillis();

    try {
      reader.close();
    } catch (Exception ignored) {
    }

    long timeTaken = stop - start;

    System.out.printf("Sequential access took %d milliseconds%n", timeTaken);
  }

  @Test
  public void testRandom() throws IOException {
    Random random = new Random(RANDOM_SEED);

    Path file = new Path(FILE_PATH);
    Configuration configuration = new Configuration();
    ParquetReadOptions options = ParquetReadOptions.builder().build();

    ParquetFileReader reader = new ParquetFileReader(HadoopInputFile.fromPath(file, configuration), options);
    MessageType schema = reader.getFileMetaData().getSchema();
    MessageColumnIO columnIO = new ColumnIOFactory().getColumnIO(schema);
    GroupRecordConverter converter = new GroupRecordConverter(schema);

    int blockAmount = reader.getRowGroups().size();

    List<Integer> indexes = new ArrayList<>();
    for (int j = 0; j < ROUNDS; j++) {
      for (int i = 0; i < blockAmount; i++) {
        indexes.add(i);
      }
    }

    Collections.shuffle(indexes, random);

    long start = System.currentTimeMillis();
    for (int index : indexes) {
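      // Random access: jump straight to the requested row group without reopening or skipping.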
      PageReadStore pages = reader.readRowGroup(index);
      long rowCount = pages.getRowCount();
      RecordReader<Group> recordReader = columnIO.getRecordReader(pages, converter);
      for (long i = 0; i < rowCount; i++) {
        recordReader.read();
      }
    }

    long stop = System.currentTimeMillis();

    try {
      reader.close();
    } catch (Exception ignored) {
    }

    long timeTaken = stop - start;

    System.out.printf("Random access took %d milliseconds%n", timeTaken);
  }
  
}

@gszadovszky (Contributor):

Thanks a lot for the performance test @fschmalzel!
@shangxinli, any other comments?

@shangxinli (Contributor):

@gszadovszky No, I don't have other comments. I didn't get much time to review this code. If you don't have other comments, feel free to approve. If you want me to have a look in more detail, I can probably do it on the weekend.

@gszadovszky (Contributor):

Thanks @shangxinli. I don't feel this change is a big deal as it does not change the current behavior but only adds new options to read row groups. I've already approved it so I'll merge it soon.

@gszadovszky merged commit 3f54ba0 into apache:master on Apr 19, 2021
elikkatz added a commit to TheWeatherCompany/parquet-mr that referenced this pull request Jun 2, 2021
* 'master' of https://github.com/apache/parquet-mr: (222 commits)
  PARQUET-2052: Integer overflow when writing huge binary using dictionary encoding (apache#910)
  PARQUET-2041: Add zstd to `parquet.compression` description of ParquetOutputFormat Javadoc (apache#899)
  PARQUET-2050: Expose repetition & definition level from ColumnIO (apache#908)
  PARQUET-1761: Lower Logging Level in ParquetOutputFormat (apache#745)
  PARQUET-2046: Upgrade Apache POM to 23 (apache#904)
  PARQUET-2048: Deprecate BaseRecordReader (apache#906)
  PARQUET-1922: Deprecate IOExceptionUtils (apache#825)
  PARQUET-2037: Write INT96 with parquet-avro (apache#901)
  PARQUET-2044: Enable ZSTD buffer pool by default (apache#903)
  PARQUET-2038: Upgrade Jackson version used in parquet encryption. (apache#898)
  Revert "[WIP] Refactor GroupReadSupport to unuse deprecated api (apache#894)"
  PARQUET-2027: Fix calculating directory offset for merge (apache#896)
  [WIP] Refactor GroupReadSupport to unuse deprecated api (apache#894)
  PARQUET-2030: Expose page size row check configurations to ParquetWriter.Builder (apache#895)
  PARQUET-2031: Upgrade to parquet-format 2.9.0 (apache#897)
  PARQUET-1448: Review of ParquetFileReader (apache#892)
  PARQUET-2020: Remove deprecated modules (apache#888)
  PARQUET-2025: Update Snappy version to 1.1.8.3 (apache#893)
  PARQUET-2022: ZstdDecompressorStream should close `zstdInputStream` (apache#889)
  PARQUET-1982: Random access to row groups in ParquetFileReader (apache#871)
  ...

# Conflicts:
#	parquet-column/src/main/java/org/apache/parquet/example/data/simple/SimpleGroup.java
#	parquet-hadoop/pom.xml
#	parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java
#	parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java