Draft: Fix for Spark dataframe count on entries metadata table throws an IllegalArgumentException for partitioned tables #2395
Quick fix for the problem reported in: Spark dataframe count on entries metadata table throws an IllegalArgumentException for partitioned tables #1378
As Russell mentioned, he debugged the same issue in Fix Avro Pruning Bugs with ManifestEntries Table #1744, which attempts a more complete fix. This PR is focused on fixing the 'entries' and 'all_entries' tables.
Background: When running a Spark aggregation query (e.g. count) on the "entries" metadata table, an empty projection is passed in.
However, data_file is a required field per the manifest entry schema in the spec, so this projection triggers java.lang.IllegalArgumentException: Missing required field: data_file in BuildAvroProjection.record (repro sketch below).
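For reference, a minimal repro sketch from spark-shell (the table name db.tbl is illustrative; any partitioned Iceberg table exhibits the failure):

```scala
// db.tbl is assumed to be an existing *partitioned* Iceberg table.
// count() pushes an empty projection down to the entries metadata table.
val entries = spark.read.format("iceberg").load("db.tbl.entries")
entries.count() // java.lang.IllegalArgumentException: Missing required field: data_file
```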
Fix NPE when counting entries #1077 fixed this only for non-partitioned tables.
The difference comes down to peculiar behavior in PruneColumns: empty structs are not pruned away, so for a non-partitioned table (where data_file.partition is an empty struct) data_file is kept in the final projection. In contrast, a non-empty struct with no fields matching the projection is pruned away, so for a partitioned table data_file is dropped from the final projection. A sketch of the two shapes follows.
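To illustrate the two cases, a sketch using Iceberg's public Types API (the field ID and partition column name are illustrative, not part of this PR):

```scala
import org.apache.iceberg.types.Types
import org.apache.iceberg.types.Types.NestedField.required

// Non-partitioned table: data_file.partition is an empty struct.
// PruneColumns keeps empty structs, so data_file survives the empty projection.
val unpartitionedPartitionType = Types.StructType.of()

// Partitioned table: partition has real fields, none of which match the empty
// projection, so the data_file struct is pruned away and BuildAvroProjection
// later fails to find the required data_file field.
val partitionedPartitionType = Types.StructType.of(
  required(1000, "event_date", Types.DateType.get()))
```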
Full exception stack for reference:
Caused by: java.lang.IllegalArgumentException: Missing required field: data_file
at org.apache.iceberg.relocated.com.google.common.base.Preconditions.checkArgument(Preconditions.java:217)
at org.apache.iceberg.avro.BuildAvroProjection.record(BuildAvroProjection.java:98)
at org.apache.iceberg.avro.BuildAvroProjection.record(BuildAvroProjection.java:42)
at org.apache.iceberg.avro.AvroCustomOrderSchemaVisitor.visit(AvroCustomOrderSchemaVisitor.java:51)
at org.apache.iceberg.avro.AvroSchemaUtil.buildAvroProjection(AvroSchemaUtil.java:105)
at org.apache.iceberg.avro.ProjectionDatumReader.setSchema(ProjectionDatumReader.java:68)
at org.apache.iceberg.shaded.org.apache.avro.file.DataFileStream.initialize(DataFileStream.java:132)
at org.apache.iceberg.shaded.org.apache.avro.file.DataFileReader.<init>(DataFileReader.java:106)
at org.apache.iceberg.shaded.org.apache.avro.file.DataFileReader.<init>(DataFileReader.java:98)
at org.apache.iceberg.shaded.org.apache.avro.file.DataFileReader.openReader(DataFileReader.java:66)
at org.apache.iceberg.avro.AvroIterable.newFileReader(AvroIterable.java:100)
at org.apache.iceberg.avro.AvroIterable.iterator(AvroIterable.java:77)
at org.apache.iceberg.io.CloseableIterable$4$1.<init>(CloseableIterable.java:99)
at org.apache.iceberg.io.CloseableIterable$4.iterator(CloseableIterable.java:98)
at org.apache.iceberg.io.CloseableIterable$4$1.<init>(CloseableIterable.java:99)
at org.apache.iceberg.io.CloseableIterable$4.iterator(CloseableIterable.java:98)
at org.apache.iceberg.io.CloseableIterable$4$1.<init>(CloseableIterable.java:99)
at org.apache.iceberg.io.CloseableIterable$4.iterator(CloseableIterable.java:98)
at org.apache.iceberg.io.CloseableIterable$4$1.<init>(CloseableIterable.java:99)
at org.apache.iceberg.io.CloseableIterable$4.iterator(CloseableIterable.java:98)
at org.apache.iceberg.spark.source.RowDataReader.open(RowDataReader.java:95)
at org.apache.iceberg.spark.source.BaseDataReader.next(BaseDataReader.java:86)
at org.apache.spark.sql.execution.datasources.v2.PartitionIterator.hasNext(DataSourceRDD.scala:79)
at org.apache.spark.sql.execution.datasources.v2.MetricsIterator.hasNext(DataSourceRDD.scala:112)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.agg_doAggregateWithoutKey_1$(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.agg_doAggregateWithoutKey_0$(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:729)
at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:345)
at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:897)
at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:897)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:372)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:336)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:127)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:480)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:483)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)