Conversation

shardulm94 (Contributor) commented Jul 10, 2020

  • Most of the new code is added in VectorizedSparkOrcReaders
  • Replaced NullValuesColumnVector with ConstantColumnVector, which can back a column with any constant, including nulls (a rough sketch of the idea follows this list)
  • Moved the Iceberg-to-Spark constant conversion logic from RowDataReader into BaseDataReader so that it can be reused in BatchDataReader
  • Modified many of the Spark read test cases to exercise both the vectorized and non-vectorized codepaths
  • Modified TestFilteredScan in Spark3 to use IcebergGenerics instead of Avro, just like in Spark2. This enables us to test ORC reads/writes, for which there is no Avro GenericRecord writer.
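
For illustration, here is a minimal sketch of the constant-vector idea, assuming Spark's ColumnVector API. The class name, the numRows parameter, and the internals below are hypothetical and are not the PR's actual ConstantColumnVector implementation:

import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.Decimal;
import org.apache.spark.sql.vectorized.ColumnVector;
import org.apache.spark.sql.vectorized.ColumnarArray;
import org.apache.spark.sql.vectorized.ColumnarMap;
import org.apache.spark.unsafe.types.UTF8String;

// Hypothetical sketch: every row of the column reports the same constant (possibly null).
class ConstantVectorSketch extends ColumnVector {
  private final Object constant;
  private final int numRows;

  ConstantVectorSketch(DataType type, Object constant, int numRows) {
    super(type);
    this.constant = constant;
    this.numRows = numRows;
  }

  @Override public boolean hasNull() { return constant == null; }
  @Override public int numNulls() { return constant == null ? numRows : 0; }
  @Override public boolean isNullAt(int rowId) { return constant == null; }
  @Override public boolean getBoolean(int rowId) { return (boolean) constant; }
  @Override public byte getByte(int rowId) { return (byte) constant; }
  @Override public short getShort(int rowId) { return (short) constant; }
  @Override public int getInt(int rowId) { return (int) constant; }
  @Override public long getLong(int rowId) { return (long) constant; }
  @Override public float getFloat(int rowId) { return (float) constant; }
  @Override public double getDouble(int rowId) { return (double) constant; }
  @Override public Decimal getDecimal(int rowId, int precision, int scale) { return (Decimal) constant; }
  @Override public UTF8String getUTF8String(int rowId) { return (UTF8String) constant; }
  @Override public byte[] getBinary(int rowId) { return (byte[]) constant; }
  // Nested types are not meaningful for simple constants in this sketch.
  @Override public ColumnarArray getArray(int rowId) { throw new UnsupportedOperationException(); }
  @Override public ColumnarMap getMap(int ordinal) { throw new UnsupportedOperationException(); }
  @Override public ColumnVector getChild(int ordinal) { throw new UnsupportedOperationException(); }
  @Override public void close() { }
}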

Benchmark results:
Each benchmark reads 10 files of 5M records.

Benchmark                                                                          Mode  Cnt    Score    Error  Units
IcebergSourceFlatORCDataReadBenchmark.readFileSourceNonVectorized                    ss    5   42.789 ±  3.294   s/op
IcebergSourceFlatORCDataReadBenchmark.readFileSourceVectorized                       ss    5   18.566 ±  1.450   s/op
IcebergSourceFlatORCDataReadBenchmark.readIcebergNonVectorized                       ss    5   30.186 ±  1.007   s/op
IcebergSourceFlatORCDataReadBenchmark.readIcebergVectorized                          ss    5   18.835 ±  0.818   s/op
IcebergSourceFlatORCDataReadBenchmark.readWithProjectionFileSourceNonVectorized      ss    5    8.935 ±  0.801   s/op
IcebergSourceFlatORCDataReadBenchmark.readWithProjectionFileSourceVectorized         ss    5    2.387 ±  0.195   s/op
IcebergSourceFlatORCDataReadBenchmark.readWithProjectionIcebergNonVectorized         ss    5   10.691 ±  0.603   s/op
IcebergSourceFlatORCDataReadBenchmark.readWithProjectionIcebergVectorized            ss    5    2.653 ±  0.511   s/op
IcebergSourceNestedORCDataReadBenchmark.readFileSourceNonVectorized                  ss    5  118.318 ±  1.583   s/op
IcebergSourceNestedORCDataReadBenchmark.readIcebergNonVectorized                     ss    5   18.943 ±  1.305   s/op
IcebergSourceNestedORCDataReadBenchmark.readIcebergVectorized                        ss    5    9.330 ±  0.938   s/op
IcebergSourceNestedORCDataReadBenchmark.readWithProjectionFileSourceNonVectorized    ss    5   86.136 ±  1.139   s/op
IcebergSourceNestedORCDataReadBenchmark.readWithProjectionIcebergNonVectorized       ss    5   16.671 ±  0.855   s/op
IcebergSourceNestedORCDataReadBenchmark.readWithProjectionIcebergVectorized          ss    5    6.710 ±  0.679   s/op

rdblue (Contributor) commented Jul 10, 2020

Thanks, @shardulm94! It's great to see a PR for this. I'll try to get some time to review it.

rdblue requested a review from omalley on July 10, 2020 at 00:48
rdblue (Contributor) commented Jul 10, 2020

Do we know why the nested data benchmarks show such a big improvement? Are we running correctness tests for the same cases to make sure we aren't dropping data by accident? It just seems a bit too good.

shardulm94 (Contributor, Author) replied:

Which cases are you comparing for nested data? readIcebergNonVectorized vs. readIcebergVectorized for nested data shows a 2-3x improvement, which is similar to the improvement for flat data. For nested data, readFileSourceVectorized and readWithProjectionFileSourceVectorized are not really relevant since the file source defaults to row-by-row reading for nested data, so I guess we should just remove them. I modified the Spark unit tests to also exercise the vectorized codepaths, so I am assuming those tests check correctness, but I can do some sanity checks manually.
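
For context, here is a rough sketch (not Spark's actual code) of the kind of schema check that decides whether the built-in file source can hand back columnar batches: when any top-level column is a struct, list, or map, the source falls back to row-by-row reads, which is why the nested readFileSourceVectorized numbers are not meaningful. The class and method names are illustrative assumptions:

import org.apache.spark.sql.types.ArrayType;
import org.apache.spark.sql.types.MapType;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

final class VectorizationCheckSketch {
  // Illustrative only: Spark's real check also consults session config limits.
  static boolean canReadAsColumnarBatches(StructType schema) {
    for (StructField field : schema.fields()) {
      if (field.dataType() instanceof StructType
          || field.dataType() instanceof ArrayType
          || field.dataType() instanceof MapType) {
        return false;  // nested column => row-by-row reading in the file source
      }
    }
    return true;
  }
}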

rdblue (Contributor) commented Jul 10, 2020

> For nested data, readFileSourceVectorized and readWithProjectionFileSourceVectorized are not really relevant since the file source defaults to row-by-row reading for nested data

That's the answer I was looking for. It was really surprising that Spark vectorized was so slow. Thanks!

Review thread on the following snippet from the diff:

  @Override
  public int getInt(int rowId) {
    Integer value = (Integer) primitiveValueReader.read(vector, rowId);
    return value != null ? value : 0;
  }
Review comment from a Contributor on the snippet above:

Why this extra check just to add a default value? If it is okay to return a default value, then getInt should never be called for a rowId where the value is null. And if this is never called when the value is null, then I'd rather directly cast. That way, a NullPointerException is thrown if the method contract is violated instead of silently returning the wrong value.
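
In other words, the suggested getter would look roughly like this (a sketch of the suggestion, reusing the names from the snippet above; not the PR's final code):

@Override
public int getInt(int rowId) {
  // Callers are expected to check isNullAt(rowId) first; the direct cast makes a
  // contract violation fail fast with a NullPointerException instead of silently
  // substituting 0.
  return (Integer) primitiveValueReader.read(vector, rowId);
}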

Reply from the Contributor Author:


Yep! That makes sense. If the caller is not going to check for null, then returning a default value would silently produce wrong data. One issue I see is that Spark's own ColumnVector code seems to violate this at https://github.com/apache/spark/blob/d5b903e38556ee3e8e1eb8f71a08e232afa4e36a/sql/core/src/main/java/org/apache/spark/sql/vectorized/ColumnVector.java#L92 and in similar methods within the same class.
In Spark 2.4 these methods are not actually exercised: they are called from https://github.com/apache/spark/blob/d5b903e38556ee3e8e1eb8f71a08e232afa4e36a/sql/core/src/main/java/org/apache/spark/sql/vectorized/ColumnarArray.java#L53, but those call sites are themselves unused.

In Spark 3.0, they are used at https://github.com/apache/spark/blob/3b0aee3f9557ae1666b63665b08d899fe7682852/sql/catalyst/src/main/java/org/apache/spark/sql/vectorized/ColumnarArray.java#L55, but this seems incorrect for the reasons you mentioned above. I am unsure when the copy method would actually be triggered though.

Reply from the Contributor:


Looks like other implementations in Spark just return from the underlying vector, without checking. So these methods would always return something. Arrow is an exception: if null checking is enabled (the default) then it will check and throw IllegalStateException.

I think it still makes sense to throw the NullPointerException, even if copy in Spark 3 would use it. I don't see any uses of ArrayData.copy in Spark 3, and any problem would only affect arrays, so the impact is limited. Plus, other sources (Arrow) break in this case and it appears to be overlooked. I think it makes sense to throw an exception to prevent a copy that corrupts the data by dropping null values.

rdblue (Contributor) commented Jul 11, 2020

The only blocker I see is that the batch iterator doesn't pass the close call down to the underlying ORC reader. Otherwise, everything is minor.
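
For illustration, the missing pass-through looks roughly like this (a hypothetical sketch; the PR's actual iterator and reader classes may be named and structured differently):

import java.io.Closeable;
import java.io.IOException;
import java.util.Iterator;
import java.util.NoSuchElementException;

import org.apache.spark.sql.vectorized.ColumnarBatch;

// Hypothetical batch iterator that forwards close() to the wrapped ORC reader
// so the underlying file handle is released when the scan finishes.
class BatchIteratorSketch implements Iterator<ColumnarBatch>, Closeable {
  private final Closeable orcReader;  // e.g. the wrapped ORC record reader

  BatchIteratorSketch(Closeable orcReader) {
    this.orcReader = orcReader;
  }

  @Override
  public boolean hasNext() {
    return false;  // batch-advancing logic omitted from this sketch
  }

  @Override
  public ColumnarBatch next() {
    throw new NoSuchElementException();
  }

  @Override
  public void close() throws IOException {
    orcReader.close();  // the delegation the review asked for
  }
}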

Awesome work, @shardulm94!

shardulm94 force-pushed the orc-vectorized-new branch from 6a5e8c5 to 7d0cf1c on July 11, 2020 at 02:18
rdblue merged commit 6fab8f5 into apache:master on Jul 13, 2020
rdblue (Contributor) commented Jul 13, 2020

Nice work, @shardulm94! I'm merging this because the blockers have been resolved and this is large. We can follow up with a change for the way nulls are handled in getters. I don't think that is a blocker because there are examples in Spark of both ignoring the issue and throwing exceptions.

shardulm94 added a commit to shardulm94/iceberg that referenced this pull request Jul 14, 2020
shardulm94 deleted the orc-vectorized-new branch on July 15, 2020 at 03:50
aokolnychyi pushed a commit to aokolnychyi/iceberg that referenced this pull request Aug 18, 2020
cmathiesen pushed a commit to ExpediaGroup/iceberg that referenced this pull request Aug 19, 2020