ORC support integration for Spark 2.4.0 #139
Conversation
Thanks for working on this, @edgarRd! I'll take a look soon.

@edgarRd, can you rebase this on master? We've renamed packages.

@rdblue, I've rebased with the new package names. Thanks!

@rdblue following up on this. Any comment? Thanks!

@edgarRd, thanks for the reminder. I'll have a look.
Resolved review threads (outdated):
spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcReader.java (two threads)
spark/src/main/java/org/apache/iceberg/spark/source/Writer.java
spark/src/main/java/org/apache/iceberg/spark/source/Reader.java
import static org.apache.iceberg.Files.localOutput;

public class TestOrcScan extends AvroDataTest {
The problem with these tests is that they require supporting ORC in the Spark reader and writer. I don't think we should update the Spark reader and writer until many of the problems with ORC support are fixed. For example, ORC doesn't actually use the InputFile and OutputFile abstractions, it creates paths from the locations and relies on Hadoop. It also doesn't support the full suite of features required for Iceberg formats, including column reordering.
What I'd prefer is to write tests like TestSparkParquetReader and TestSparkParquetWriter that don't go through the Spark reader and writer, but test the Spark data model directly. That way, we can remove the changes that actually expose the format until it has full support. Otherwise, we'll just have to remove it and disable the tests for the release.
This should have been on TestOrcWrite, not TestOrcScan. As long as we don't release write support, I'm happy adding read support.
Read support only for now sounds good to me. I'm working on this approach. Thanks for the guidance!
Looks like this still needs to be done as well.
I think I've addressed this concern by creating a test that does not go through the Spark reader and writer, namely in TestSparkOrcReader.
@edgarRd, thanks for working on this! Overall, I think it looks like a great start toward ORC support, but I'd like to add ORC support back a little more carefully this time. Last time, we added ORC support assuming that the remaining problems with it would be fixed rather quickly. Because we haven't seen those fixes, I'd prefer not to expose the write support through Spark (read support is fine). I suggested a way to test this code that doesn't add it to the write path. We can add it to the write path when it can make the same guarantees as the other formats.
@rdblue I've pushed changes that I think should address the comments previously made. However, some concerns like the Hadoop dependency on …
/**
 * Write data value of a schema.
 * @author Edgar Rodriguez-Diaz
Nit: please remove @author tags and any empty tags like @since
/**
 * Reads
 * @param reuse
Looks like Javadoc is incomplete.
@@ -0,0 +1,101 @@
package org.apache.iceberg.orc;
These files need the Apache license header.
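For reference, the standard ASF header that Iceberg sources carry looks like this:

```java
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
```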
Done.
  }
}

private class OrcIterator implements Iterator<T> {
Usually, Iterator classes should be static to ensure that the iterator shares no state with the Iterable other than what was passed in.
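A minimal sketch of that pattern, independent of the ORC classes in this PR: the nested iterator is static and sees only what its constructor was given.

```java
import java.util.Iterator;
import java.util.NoSuchElementException;

class Numbers implements Iterable<Integer> {
  private final int count;

  Numbers(int count) {
    this.count = count;
  }

  @Override
  public Iterator<Integer> iterator() {
    // pass everything the iterator needs explicitly
    return new NumbersIterator(count);
  }

  // static: no implicit reference to the enclosing Iterable, so no shared state
  private static class NumbersIterator implements Iterator<Integer> {
    private final int count;
    private int next = 0;

    NumbersIterator(int count) {
      this.count = count;
    }

    @Override
    public boolean hasNext() {
      return next < count;
    }

    @Override
    public Integer next() {
      if (!hasNext()) {
        throw new NoSuchElementException();
      }
      return next++;
    }
  }
}
```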
private static Reader newFileReader(InputFile file, Configuration config) {
  try {
    return OrcFile.createReader(new Path(file.location()),
Is there a way to use file to open instead of passing a Path?
Unfortunately no, I checked the OrcFile signatures and all (createReader and createWriter) use Path only.
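So the bridge presumably stays limited to handing ORC a Path built from the file location. A rough sketch of what the full call might look like; the exception wrapping here is illustrative, not the PR's actual code:

```java
// The Iceberg InputFile can only contribute its location string, which is
// wrapped in a Hadoop Path because OrcFile.createReader accepts nothing else.
private static Reader newFileReader(InputFile file, Configuration config) {
  try {
    return OrcFile.createReader(new Path(file.location()),
        OrcFile.readerOptions(config));
  } catch (IOException ioe) {
    // illustrative error handling; the PR may wrap this differently
    throw new RuntimeIOException(ioe, "Failed to open ORC file: %s", file.location());
  }
}
```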
try {
  return new VectorizedRowBatchIterator(file.location(), orcSchema, orcFileReader.rows(options));
}
catch (IOException ioe) {
Nit: this should go on the same line as }.
@SuppressWarnings("unchecked")
@Override
public Iterator<T> iterator() {
  return new OrcIterator(orcIter, (OrcValueReader<T>) readerFunction.apply(schema));
This should not use the same VectorizedRowBatchIterator for all iterators. Each iterator should be independent, so this should call newOrcIterator.
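In other words, each call would build a fresh VectorizedRowBatchIterator instead of reusing the shared orcIter field. A rough sketch of the suggestion; newOrcIterator and its arguments are assumed from context, not copied from the PR:

```java
@SuppressWarnings("unchecked")
@Override
public Iterator<T> iterator() {
  // every iterator gets its own batch iterator, so callers don't share read state
  return new OrcIterator(newOrcIterator(file, orcSchema, options),
      (OrcValueReader<T>) readerFunction.apply(schema));
}
```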
final Writer writer;

try {
  writer = OrcFile.createWriter(locPath, options);
Is it possible to pass an output stream?
Unfortunately no, I checked the OrcFile signatures and all (createReader and createWriter) use Path only. Having a similar method to this receiving an output stream would be ideal.
OrcFile.WriterOptions options =
    OrcFile.writerOptions(conf);
return new OrcFileAppender(schema, file, options, metadata);
public <D> FileAppender<D> build() {
This also addresses issue #127.
But this does not actually add support for generics, right?
Yes, you are right @rdblue
 */
public class OrcFileAppender implements FileAppender<VectorizedRowBatch> {
  private final Writer writer;
public class OrcFileAppender<D> implements FileAppender<D> {
Nit: this should be package-private, similar to the other appenders.
@@ -0,0 +1,782 @@
/*
 * Copyright 2018 Hortonworks
Wrong license headers? Here and in other places.
writer.addUserMetadata(COLUMN_NUMBERS_ATTRIBUTE, columnIds.serialize());
metadata.forEach(
    (key,value) -> writer.addUserMetadata(key, ByteBuffer.wrap(value)));
batch = orcSchema.createRowBatch(BATCH_SIZE);
Should the batch size be user-configurable? A large amount of data here could cause memory problems. A default batch size of 1024 is already provided by the API via orcSchema.createRowBatch().
I've added a configuration setting for this value.
OrcFile.WriterOptions options =
    OrcFile.writerOptions(conf);
return new OrcFileAppender(schema, file, options, metadata);
public <D> FileAppender<D> build() {
@rdblue I think Avro and Parquet store the Iceberg schema as a JSON string in their metadata. I'm not sure why that is required, but do you think it makes sense to do that here as well for ORC?
It's probably a good idea, but not required. The requirement is that we can get the file's Iceberg schema from its metadata, and I prefer to build that from the file schema itself, plus the column IDs. That way, we don't have problems from a faulty conversion in an old version.
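If the JSON copy were added anyway, it would only be one more user-metadata entry at write time. A sketch under the assumption of a made-up key name; SchemaParser is Iceberg's existing schema-to-JSON utility:

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import org.apache.iceberg.Schema;
import org.apache.iceberg.SchemaParser;
import org.apache.orc.Writer;

class SchemaMetadataSketch {
  // hypothetical key; not an established Iceberg or ORC constant
  private static final String ICEBERG_SCHEMA_ATTRIBUTE = "iceberg.schema";

  static void addSchemaMetadata(Writer writer, Schema schema) {
    // store the Iceberg schema JSON next to the column IDs written above
    writer.addUserMetadata(ICEBERG_SCHEMA_ATTRIBUTE,
        ByteBuffer.wrap(SchemaParser.toJson(schema).getBytes(StandardCharsets.UTF_8)));
  }
}
```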
public class OrcFileAppender implements FileAppender<VectorizedRowBatch> {
  private final Writer writer;
class OrcFileAppender<D> implements FileAppender<D> {
  private final static int BATCH_SIZE = 1024;
The default batch size is already available as a public constant, org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch#DEFAULT_SIZE. Also see the comment above.
Thanks, I'm using this now.
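A minimal sketch of the configurable batch size with ORC's default as the fallback; the property name is a placeholder mirroring the constant that appears later in the diff:

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.orc.TypeDescription;

class BatchSizeSketch {
  // placeholder property name; the PR defines its own VECTOR_ROW_BATCH_SIZE constant
  static final String VECTOR_ROW_BATCH_SIZE = "iceberg.orc.vectorbatch.size";

  static VectorizedRowBatch newBatch(TypeDescription orcSchema, Configuration conf) {
    // fall back to ORC's public default (1024) instead of a private BATCH_SIZE copy
    int batchSize = conf.getInt(VECTOR_ROW_BATCH_SIZE, VectorizedRowBatch.DEFAULT_SIZE);
    return orcSchema.createRowBatch(batchSize);
  }
}
```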
  }
}

public TypeDescription getSchema() {
Is there a requirement for this API?
I've removed this call since it was not used.
/**
 * Reads a value in row.
 */
T read(Object reuse, int row);
Why Object instead of VectorizedRowBatch?
I tried to make this interface generic to potentially re-use it for reading other values in a similar approach as the other file formats are implemented.
I don't think this does quite the same thing that the other interfaces do.
For Avro, a similar interface allows reusing container objects. So a value reader that returns a Record can also accept a Record instance that it will fill with data. The reuse object here is always a VectorizedRowBatch and this returns an InternalRow. So the equivalent would be this:
InternalRow read(VectorizedRowBatch batch, int rowNum, Object rowToReuse);

The rows could be swapped out for some other in-memory container, like Iceberg's GenericRecord.
Unless this is doing something similar to what Avro does, I don't think this is a good change to include in this PR. Maybe we should keep it simple and go with the original code that didn't use a generic interface here.
I agree; for this interface to serve a similar function as in the other formats, it would need more work. I'll set it to VectorizedRowBatch, which is the only usage right now.
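Side by side, the two shapes discussed above look roughly like this; both interfaces are simplified illustrations rather than the PR's exact declarations:

```java
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;

// Avro-style reuse: the reader may fill and return the container it was given
interface ValueReaderWithReuse<T> {
  T read(T reuse);
}

// the shape settled on here: read one row out of a VectorizedRowBatch
interface OrcValueReader<T> {
  T read(VectorizedRowBatch batch, int row);
}
```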
Unit tests for reads are passing.
Remove hack for converting decimal to long value.
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.function.Function;
Why did these imports move?
Most likely the IDE moved them. Do we have a standard style for these and a way to enforce it?
OrcFile.WriterOptions options = OrcFile.writerOptions(conf);
return new OrcFileAppender<>(TypeConversion.toOrc(schema, new ColumnIdMap()),
    this.file, createWriterFunc, options, metadata,
    conf.getInt(VECTOR_ROW_BATCH_SIZE, DEFAULT_BATCH_SIZE));
Why not use the ORC property instead of the copy?
This looks about ready to me. I just want to get a few minor things fixed and fix the issue that @rdsr pointed out for the reader API.
Rename orcSchema to readSchema since that's effectively the function it accomplishes.
@omalley, I think this ORC PR is ready to go in. It updates the license headers for ORC files from Hortonworks to the standard ASF header. Could you reply with +1 or -1 for that change?

+1, thanks @edgarRd for fixing up the ORC bindings.

Merged! Thanks for working on the ORC support, @edgarRd!

Great, thanks all for the help!
I noticed that a large portion of the support for ORC in Netflix/iceberg@c59138e#diff-545c12970ccace1ba019c99192569301 was missing.
I've adapted that code to work with Spark 2.4.0, since the API for UnsafeWriter changed. Test cases are passing for reads and writes.