PARQUET-1381: Add merge blocks command to parquet-tools #512
Conversation
  throw new RuntimeException("Illegal row group of 0 rows");
}
Optional<ColumnChunkMetaData> mc = findColumnByPath(block, columnDescriptor.getPath());
if (mc.isPresent()) {
Maybe return mc.map(column -> { ChunkDescriptor chunk = new ChunkDescriptor(columnDescriptor, column, column.getStartingPos(), (int) column.getTotalSize()); return readChunk(f, chunk) }); ?
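Reformatted, the suggested replacement reads as follows; this is just the reviewer's one-liner spread out (with the missing semicolon added), and ChunkDescriptor, readChunk, columnDescriptor and f all come from the surrounding PR code:

return mc.map(column -> {
  ChunkDescriptor chunk = new ChunkDescriptor(columnDescriptor, column,
      column.getStartingPos(), (int) column.getTotalSize());
  return readChunk(f, chunk);
});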
private Optional<ColumnChunkMetaData> findColumnByPath(BlockMetaData block, String[] path) {
  for (ColumnChunkMetaData column : block.getColumns()) {
    if (Arrays.equals(column.getPath().toArray(), (path))) {
I think you could remove the unnecessary parentheses around path.
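That is, the condition would become:

if (Arrays.equals(column.getPath().toArray(), path)) {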
int numAllocations = fullAllocations + (lastAllocationSize > 0 ? 1 : 0);
List<ByteBuffer> buffers = new ArrayList<>(numAllocations);

for (int i = 0; i < fullAllocations; i += 1) {
i++
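That is, the loop header would use the idiomatic increment:

for (int i = 0; i < fullAllocations; i++) {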
Force-pushed a33a2e5 to 82ae4a6
@alexeyzavyalov Thank you! Implemented all suggested changes.

Could you please reference the JIRA number in the PR title? The standard format is "PARQUET-###: Description". If there is no JIRA yet, could you please create one? Thanks!
 * @param configuration Hadoop configuration
 * @param schema the schema of the data
 * @param file the file to write to
 * @param schema the schema of the data
This file contains a lot of formatting changes, such as the alignment of the parameter descriptions here, the expansion of the one-liner methods above, or the indentation changes below. These changes are problematic because they add clutter to the commit diff and the git blame history, and more importantly can lead to conflicts in the future when cherry-picking, backporting or merging in general.
For this reason, could you please revert lines that do not contain actual code changes?
Instead of adding a new command, would it make sense to add a parameter like …?
Force-pushed b08975b to 9f3a815
@zivanfi and @nandorKollar, thank you for the quick feedback and good suggestions! I agree with you; I changed the code and added a JIRA task with a description.
Force-pushed 9f3a815 to 3943e19
zivanfi left a comment:
Could you please add unit tests for the code? It's not necessary for the parquet-tools command (we don't have any existing unit tests there either), but it's customary for the core Parquet API. Thanks!
@Override
public void execute(CommandLine options) throws Exception {
  boolean mergeBlocks = options.hasOption('b');
  int maxBlockSize = options.hasOption('l') ? Integer.parseInt(options.getOptionValue('l')) : 128;
I may be mistaken, but I think this will result in a desired row group size of 128 bytes, not 128 megabytes.
I would suggest using the ParquetWriter.DEFAULT_BLOCK_SIZE constant instead of a hard-coded value.
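A sketch of the suggested fix; it assumes the -l value is meant to be given in megabytes, so it is converted to bytes here to match the unit of the library constant:

// ParquetWriter.DEFAULT_BLOCK_SIZE is 128 MB expressed in bytes (134217728);
// requires import org.apache.parquet.hadoop.ParquetWriter
long maxBlockSize = options.hasOption('l')
    ? Long.parseLong(options.getOptionValue('l')) * 1024 * 1024
    : ParquetWriter.DEFAULT_BLOCK_SIZE;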
import org.apache.parquet.column.ColumnWriter;
import org.apache.parquet.schema.PrimitiveType;

public class ParquetTripletUtils {
Since it seems that no code apart from ParquetFileWriter uses this utility class, I'd recommend moving its only method into a private method in ParquetFileWriter.
Force-pushed 3943e19 to d8a92cb
@zivanfi, @nandorKollar thank you! Implemented changes and added tests.
  }
}

public void flushToFileWriter(ColumnDescriptor path, ParquetFileWriter writer) throws IOException {
If I'm not mistaken, the visibility of this method can be restricted to package private
}

private List<ParquetFileReader> getReaders(List<InputFile> inputFiles) throws IOException {
  List<ParquetFileReader> readers = new ArrayList<>();
nit: the ArrayList size is known in advance; it would be nice to pass it as a constructor argument to avoid resizing.
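For example:

List<ParquetFileReader> readers = new ArrayList<>(inputFiles.size());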
readers.forEach(r -> {
  try {
    r.close();
  } catch (IOException e) {
It's not very nice to swallow an exception. Either throw a runtime exception, or at least log that something happened while closing the file.
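A sketch of the logging variant, assuming an slf4j Logger named LOG exists in the class:

readers.forEach(r -> {
  try {
    r.close();
  } catch (IOException e) {
    // report the failure instead of silently dropping it
    LOG.warn("Failed to close Parquet reader", e);
  }
});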
Force-pushed d8a92cb to 1a0a0a8
Force-pushed 1a0a0a8 to d1b8873
Thank you @nandorKollar! I've made the suggested changes.
Thanks, this sounds like a very useful feature; I'd like to have a final look at the PR. One note: it seems you force-push each time. Could you please simply commit the changes instead? If there are multiple commits in a PR, they will be squashed into one when the PR is merged, and if you commit instead of force-pushing, it is easier to see the changes you made in response to the reviews.
    .desc("Merge adjacent blocks into one up to upper bound size limit default to 128 MB")
    .build();

Option limit = Option.builder("l")
Thanks a lot for working on this.
Instead of grabbing some write properties like block size and compression and adding command line options for them, I would suggest allowing any of the Parquet properties to be set in a future-proof way.
Some ideas:
- Have a command line parameter that argument is a key-value pair; the parameter can be used multiple times
- Have a command line parameter that argument is a list of key-value pairs
- Have a command line parameter that argument is a file that contains the key-value pairs
Multiple solutions might also make sense (e.g. set the key-value pairs from command line as well as from file).
The help shall reference some docs or the source code for the up-to-date list of available options. It shall also list some of the most important options, like parquet.block.size, parquet.compression, parquet.page.size or parquet.writer.max-padding.
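A minimal sketch of the first idea, using the same Apache Commons CLI builder the command already uses; the option name "D" and its description wording are assumptions:

// repeatable -D key=value parameter for arbitrary Parquet write properties
Option conf = Option.builder("D")
    .hasArgs()
    .valueSeparator('=')
    .desc("set a Parquet write property, e.g. -D parquet.block.size=134217728; may be repeated")
    .build();
// after parsing, the pairs can be read back as java.util.Properties:
// Properties writeProps = options.getOptionProperties("D");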
It seems that we have several options here with no clear best choice. Since this review is already getting quite long, I think we can defer these to a follow-up change if the need arises. Are you okay with that, Gabor?
Agreed.
 * @return the ByteBuffer blocks
 * @throws IOException if there is an error while reading from the stream
 */
public List<ByteBuffer> readBlocks(SeekableInputStream f, long offset, int length) throws IOException {
The visibility could be changed to package private.
  return buffers;
}

public Optional<PageReader> readColumnInBlock(int blockIndex, ColumnDescriptor columnDescriptor) throws IOException {
The visibility could be restricted to package-private. nit: this method doesn't throw IOException.
}

public static void closeReaders(List<ParquetFileReader> readers) {
  readers.forEach(r -> {
With this logic, if there's an exception when closing a reader, the rest remain unclosed. Though in a tool like this that is not a big deal, if one uses this method for other purposes, it could lead to resource leakage.
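A sketch of a variant that closes every reader even if some of the close calls fail, keeping the first exception and attaching the rest as suppressed:

static void closeReaders(List<ParquetFileReader> readers) throws IOException {
  IOException first = null;
  for (ParquetFileReader reader : readers) {
    try {
      reader.close();
    } catch (IOException e) {
      if (first == null) {
        first = e; // remember the first failure
      } else {
        first.addSuppressed(e); // keep later failures attached to it
      }
    }
  }
  if (first != null) {
    throw first; // surface the failure instead of leaking it
  }
}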
  }
  this.endBlock();
}
this.end(new HashMap<>());
nit: Collections.emptyMap() would be reasonable here.
}
this.end(new HashMap<>());

BlocksCombiner.closeReaders(readers);
Readers should be closed in a finally block to avoid unclosed readers in case of an exception; see the sketch below.
I was also wondering if it would be possible to keep only two files open at the same time: the one Parquet writes to, and the one currently being processed. It seems sufficient to keep only these open: close the current input as soon as its processing is finished, then open the next one. That way we won't keep files open longer than needed, and we won't hold too many files open at the same time for no reason. I'm not sure if this makes sense, just a thought. If it is too complicated to implement, then I'm fine with the current approach too.
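A sketch of the finally-block suggestion; the getReaders call and the merge loop are placeholders standing in for the surrounding PR code:

List<ParquetFileReader> readers = getReaders(inputFiles);
try {
  // ... merge row groups from each reader, then call this.end(...) ...
} finally {
  // runs even if merging throws, so no reader is leaked
  BlocksCombiner.closeReaders(readers);
}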
nandorKollar left a comment:
LGTM
I merged your commit @kgalieva, thanks for your contribution!
Summary: Merge two commits from upstream:
- Revert "PARQUET-1381: Add merge blocks command to parquet-tools (apache#512)" (apache#621)
- PARQUET-1533: TestSnappy() throws OOM exception with Parquet-1485 change (apache#622)
Reviewers: pavi, leisun. Reviewed By: leisun. Differential Revision: https://code.uberinternal.com/D2544359
Summary: Revert "PARQUET-1381: Add merge blocks command to parquet-tools (apache#512)" (apache#621). This reverts commit 863a081. The design of this feature has conceptual problems and also works incorrectly; see PARQUET-1381 for more details.
- PARQUET-1531: Page row count limit causes empty pages to be written from MessageColumnIO (apache#620)
- PARQUET-1544: Possible over-shading of modules (apache#628)
Reviewers: pavi. Reviewed By: pavi. Differential Revision: https://code.uberinternal.com/D2769319
The current implementation of the merge command in parquet-tools doesn't merge row groups; it just places one after the other. This change adds another command that merges small blocks into larger ones, up to a specified size limit in bytes.