
Conversation

@waterlx (Contributor) commented May 29, 2020

This PR addresses an NPE when counting a table's entries (found by @jerryshao), which can be triggered by

spark.read.format("iceberg").load("db.table.entries").count()

The stack trace is as follows:

java.lang.NullPointerException
  at org.apache.iceberg.Schema.findType(Schema.java:129)
  at org.apache.iceberg.ManifestEntriesTable$EntriesTableScan.planFiles(ManifestEntriesTable.java:105)
  at org.apache.iceberg.BaseTableScan.planFiles(BaseTableScan.java:215)
  at org.apache.iceberg.BaseTableScan.splitFiles(BaseTableScan.java:284)
  at org.apache.iceberg.BaseTableScan.planTasks(BaseTableScan.java:248)
  at org.apache.iceberg.spark.source.Reader.tasks(Reader.java:300)
  at org.apache.iceberg.spark.source.Reader.planInputPartitions(Reader.java:187)
  at org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanExec.partitions$lzycompute(DataSourceV2ScanExec.scala:76)
  at org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanExec.partitions(DataSourceV2ScanExec.scala:75)
  at org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanExec.outputPartitioning(DataSourceV2ScanExec.scala:65)

@waterlx (Contributor, Author) commented May 29, 2020

When count() is called on the entries table, Spark passes a newRequestedSchema with size 0 to Reader#pruneColumns(). As a result, Reader#lazySchema() generates a wrong (empty) schema.
Schema#lazyNameToId() then returns an empty BiMap, so get() on this empty map returns null. The NPE is thrown when the null Integer is unboxed into the int parameter of findType().
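
For illustration, here is a minimal standalone sketch (not Iceberg code; all names are hypothetical) of how unboxing a null Integer into an int parameter produces exactly this kind of NPE:

import java.util.HashMap;
import java.util.Map;

public class UnboxNpe {
  // takes a primitive int, like Schema#findType(int)
  static String findType(int id) {
    return "type-" + id;
  }

  public static void main(String[] args) {
    Map<String, Integer> nameToId = new HashMap<>();  // empty, like lazyNameToId() for an empty schema
    Integer id = nameToId.get("data_file");           // null: the name is not present
    findType(id);  // NullPointerException thrown here while unboxing null to int
  }
}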

@waterlx (Contributor, Author) commented May 29, 2020

@rdblue @jerryshao Would you please review this PR at your convenience?
Regarding the UT, I put the call that triggers the NPE on a separate line:

long actualCount = actual.count();

It could be moved into the assert clause at the end, but IMHO a separate line makes it clearer and more straightforward.

   private Schema lazySchema() {
     if (schema == null) {
-      if (requestedSchema != null) {
+      if (requestedSchema != null && requestedSchema.size() != 0) {
Inline review comment on the change above:

I don't think this is correct. Iceberg should support projecting 0 columns.

@rdblue (Contributor) commented May 29, 2020

Looks like there are two problems. First, findType throws a NullPointerException if the type isn't found. I think that should check whether the name was found, like this:

   public Type findType(String name) {
     Preconditions.checkArgument(!name.isEmpty(), "Invalid column name: (empty)");
-    return findType(lazyNameToId().get(name));
+    Integer id = lazyNameToId().get(name);
+    if (id != null) {
+      return findType(id);
+    }
+    return null;
   }

Second, if the file projection is null, it is still dereferenced. So we need to fix that as well in ManifestEntriesTable:

     protected CloseableIterable<FileScanTask> planFiles(
         TableOperations ops, Snapshot snapshot, Expression rowFilter, boolean caseSensitive, boolean colStats) {
       CloseableIterable<ManifestFile> manifests = CloseableIterable.withNoopClose(snapshot.manifests());
-      Schema fileSchema = new Schema(schema().findType("data_file").asStructType().fields());
+      Type fileProjection = schema().findType("data_file");
+      Schema fileSchema = fileProjection != null ? new Schema(fileProjection.asStructType().fields()) : new Schema();
       String schemaString = SchemaParser.toJson(schema());
       String specString = PartitionSpecParser.toJson(PartitionSpec.unpartitioned());
       ResidualEvaluator residuals = ResidualEvaluator.unpartitioned(rowFilter);

After that, adding this to the entries table test case works:

    Assert.assertEquals("Count should return 1",
        1, spark.read().format("iceberg").load(loadLocation(tableIdentifier, "entries")).count());

@waterlx (Contributor, Author) commented May 30, 2020

@rdblue Thanks for reviewing and sharing your idea!
The UT passes with your fix, but I hit the following exception when testing the fix on my laptop (master = local[*]) and have not found the root cause yet:

java.lang.IllegalArgumentException: Missing required field: data_file
	at org.apache.iceberg.relocated.com.google.common.base.Preconditions.checkArgument(Preconditions.java:217)
	at org.apache.iceberg.avro.BuildAvroProjection.record(BuildAvroProjection.java:96)
	at org.apache.iceberg.avro.BuildAvroProjection.record(BuildAvroProjection.java:41)
	at org.apache.iceberg.avro.AvroCustomOrderSchemaVisitor.visit(AvroCustomOrderSchemaVisitor.java:51)
	at org.apache.iceberg.avro.AvroSchemaUtil.buildAvroProjection(AvroSchemaUtil.java:104)
	at org.apache.iceberg.avro.ProjectionDatumReader.setSchema(ProjectionDatumReader.java:60)
	at org.apache.iceberg.shaded.org.apache.avro.file.DataFileStream.initialize(DataFileStream.java:132)
	at org.apache.iceberg.shaded.org.apache.avro.file.DataFileReader.<init>(DataFileReader.java:106)
	at org.apache.iceberg.shaded.org.apache.avro.file.DataFileReader.<init>(DataFileReader.java:98)
	at org.apache.iceberg.shaded.org.apache.avro.file.DataFileReader.openReader(DataFileReader.java:66)
	at org.apache.iceberg.avro.AvroIterable.newFileReader(AvroIterable.java:94)
	at org.apache.iceberg.avro.AvroIterable.iterator(AvroIterable.java:77)
	at org.apache.iceberg.io.CloseableIterable$3$1.<init>(CloseableIterable.java:95)
	at org.apache.iceberg.io.CloseableIterable$3.iterator(CloseableIterable.java:94)
	at org.apache.iceberg.io.CloseableIterable$3$1.<init>(CloseableIterable.java:95)
	at org.apache.iceberg.io.CloseableIterable$3.iterator(CloseableIterable.java:94)
	at org.apache.iceberg.io.CloseableIterable$3$1.<init>(CloseableIterable.java:95)
	at org.apache.iceberg.io.CloseableIterable$3.iterator(CloseableIterable.java:94)
	at org.apache.iceberg.io.CloseableIterable$3$1.<init>(CloseableIterable.java:95)
	at org.apache.iceberg.io.CloseableIterable$3.iterator(CloseableIterable.java:94)
	at org.apache.iceberg.spark.source.RowDataReader.open(RowDataReader.java:103)
	at org.apache.iceberg.spark.source.BaseDataReader.next(BaseDataReader.java:75)
	at org.apache.spark.sql.execution.datasources.v2.DataSourceRDD$$anon$1.hasNext(DataSourceRDD.scala:49)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.agg_doAggregateWithoutKey_1$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.agg_doAggregateWithoutKey_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:255)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:858)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:858)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:123)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

@waterlx force-pushed the entry_table_npe branch from db9e84a to 8d66eb7 on May 31, 2020
@waterlx (Contributor, Author) commented May 31, 2020

@rdblue The PR is updated.
The following code you suggested:

Schema fileSchema = fileProjection != null ? new Schema(fileProjection.asStructType().fields()) : new Schema();

is updated to:

Schema fileSchema = fileProjection != null ? new Schema(fileProjection.asStructType().fields()) : null;

When the file projection is null (yielded by projecting 0 columns), this returns null instead of an empty schema, to address the Missing required field: data_file issue mentioned above. The issue is quite weird to me: it does not break the UT, yet it happens in my local test with master = local[*].
Returning an empty schema does not seem to work as expected in BaseManifestReader#projection() when project is set to an empty schema.
But fixing it by returning null seems to go back to not supporting 0 columns.
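
To see why an empty but non-null projection behaves differently from null, here is a hedged paraphrase of a projection helper like BaseManifestReader#projection() (the shape is assumed for illustration, not the exact Iceberg source):

import org.apache.iceberg.Schema;

class ProjectionSketch {
  // Assumed shape: an empty, non-null projection is honored verbatim, so the
  // downstream Avro projection fails on the required data_file field, whereas
  // a null projection falls back to reading the full file schema.
  static Schema projection(Schema fileSchema, Schema project) {
    if (project != null) {
      return project;    // an empty Schema is non-null, so it is used as-is
    }
    return fileSchema;   // null means: read the full file schema
  }
}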

@waterlx waterlx closed this Jun 1, 2020
@waterlx waterlx reopened this Jun 1, 2020
@waterlx waterlx closed this Jun 2, 2020
@waterlx waterlx reopened this Jun 2, 2020
@rdblue (Contributor) commented Jun 5, 2020

Part of the problem is that we changed this table over to use a DataTask, which provides the data as an iterable instead of reading a data file. The purpose was to correctly set sequence numbers, part of #951. Rows returned by DataTask should be full rows, not projections. That's probably why we're hitting this now.

As for why you're getting the current error, I'm not sure why your manifest doesn't have the data_file field. It looks like it should work to me, but I can't debug it because I don't have the rest of the test case.

Anyway, I think the solution is to remove the project(fileSchema) call after ManifestFiles.read because data tasks should return an unprojected record.
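
To make the suggestion concrete, a hedged sketch of the idea (the calls are the ones shown in the diff above, but exact signatures and visibility vary by Iceberg version, so treat this as an illustration rather than compilable code):

// Sketch only, not the merged change: read the manifest without the
// project(fileSchema) call, so the DataTask returns unprojected entry rows
// and Spark applies any remaining column pruning on top.
try (ManifestReader reader = ManifestFiles.read(manifest, ops.io())) {
  // reader.project(fileSchema);  // removed: data tasks should not project
  for (Object entry : reader.entries()) {
    // each entry carries the full, unprojected data_file struct
  }
}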

@rdblue (Contributor) commented Jun 19, 2020

@waterlx, do you want to update this PR?

@waterlx (Contributor, Author) commented Jun 20, 2020

@rdblue Yes, I will work on it during the weekend.

@waterlx (Contributor, Author) commented Jun 20, 2020

@rdblue The PR is updated to yield an empty schema when projecting 0 columns, as you suggested above, because after rebasing the PR onto the most recent master branch I could not reproduce the error mentioned earlier:

java.lang.IllegalArgumentException: Missing required field: data_file

The problem may be that I read a wrong manifest, or a manifest generated by incompletely merged/rebased code. Thanks for sharing your thoughts on that! Really appreciate it!

Would you please review the current changes at your convenience? Thanks!
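
For reference, a hedged sketch (assumed shape, not the exact merged change; the SparkSchemaUtil.prune call and surrounding fields are assumptions) of what pruning to an empty schema in Reader#lazySchema() could look like when Spark requests zero columns:

private Schema lazySchema() {
  if (schema == null) {
    if (requestedSchema != null) {
      // an empty requestedSchema prunes to an empty Iceberg schema here,
      // rather than falling back to the full table schema
      this.schema = SparkSchemaUtil.prune(table.schema(), requestedSchema);
    } else {
      this.schema = table.schema();
    }
  }
  return schema;
}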

@rdblue (Contributor) commented Jun 22, 2020

After looking at this again, I don't think this should do any projection. From my last comment:

I think the solution is to remove the project(fileSchema) call after ManifestFiles.read because data tasks should return an unprojected record.

Data tasks should not project data, so this should ignore attempted projection.

@rdblue rdblue added this to the Java 0.9.0 Release milestone Jun 30, 2020
@rdblue rdblue closed this in ca468d5 Jul 9, 2020
@rdblue (Contributor) commented Jul 9, 2020

@waterlx, I tried the approach I suggested and it doesn't seem to match what we do for the file tables. Since this PR is a simple fix, I merged it. Thanks for fixing this!

Also, I had to rebase and squash by hand since this was a bit old. That's why it's marked closed instead of merged.

cmathiesen pushed a commit to ExpediaGroup/iceberg that referenced this pull request Aug 19, 2020
szehon-ho pushed a commit to szehon-ho/iceberg that referenced this pull request Mar 30, 2021
…rows an IllegalArgumentException for partitioned tables)

      - When running a Spark aggregation query on the "entries" metadata table, an empty projection is passed in.
      - However, data_file is a required field, so this throws java.lang.IllegalArgumentException: Missing required field: data_file in BuildAvroProjection.record.
      - apache#1077 fixes it only for non-partitioned tables, and even then only due to the (expected?) behavior in PruneColumns where empty structs are not pruned.