diff --git a/api/src/main/java/org/apache/iceberg/Schema.java b/api/src/main/java/org/apache/iceberg/Schema.java
index 86a89c0e77d4..0ecbfd432fa4 100644
--- a/api/src/main/java/org/apache/iceberg/Schema.java
+++ b/api/src/main/java/org/apache/iceberg/Schema.java
@@ -124,9 +124,21 @@ public List<Types.NestedField> columns() {
     return struct.fields();
   }
 
+  /**
+   * Returns the {@link Type} of a sub-field identified by the field name.
+   *
+   * @param name a field name
+   * @return a Type for the sub-field or null if it is not found
+   */
   public Type findType(String name) {
     Preconditions.checkArgument(!name.isEmpty(), "Invalid column name: (empty)");
-    return findType(lazyNameToId().get(name));
+    Integer id = lazyNameToId().get(name);
+    if (id != null) { // name is found
+      return findType(id);
+    }
+
+    // name could not be found
+    return null;
   }
 
   /**
diff --git a/core/src/main/java/org/apache/iceberg/AllEntriesTable.java b/core/src/main/java/org/apache/iceberg/AllEntriesTable.java
index bb19c410e997..1ab918d3c685 100644
--- a/core/src/main/java/org/apache/iceberg/AllEntriesTable.java
+++ b/core/src/main/java/org/apache/iceberg/AllEntriesTable.java
@@ -28,6 +28,7 @@
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
 import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.types.Type;
 import org.apache.iceberg.types.TypeUtil;
 import org.apache.iceberg.util.ParallelIterable;
 import org.apache.iceberg.util.ThreadPools;
@@ -100,7 +101,8 @@ protected CloseableIterable<FileScanTask> planFiles(
         TableOperations ops, Snapshot snapshot, Expression rowFilter,
         boolean ignoreResiduals, boolean caseSensitive, boolean colStats) {
       CloseableIterable<ManifestFile> manifests = allManifestFiles(ops.current().snapshots());
-      Schema fileSchema = new Schema(schema().findType("data_file").asStructType().fields());
+      Type fileProjection = schema().findType("data_file");
+      Schema fileSchema = fileProjection != null ? new Schema(fileProjection.asStructType().fields()) : new Schema();
       String schemaString = SchemaParser.toJson(schema());
       String specString = PartitionSpecParser.toJson(PartitionSpec.unpartitioned());
       Expression filter = ignoreResiduals ? Expressions.alwaysTrue() : rowFilter;
diff --git a/core/src/main/java/org/apache/iceberg/ManifestEntriesTable.java b/core/src/main/java/org/apache/iceberg/ManifestEntriesTable.java
index e4288c077471..3c43fc40bda1 100644
--- a/core/src/main/java/org/apache/iceberg/ManifestEntriesTable.java
+++ b/core/src/main/java/org/apache/iceberg/ManifestEntriesTable.java
@@ -26,6 +26,7 @@
 import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.types.Type;
 import org.apache.iceberg.types.TypeUtil;
 
 /**
@@ -97,7 +98,8 @@ protected CloseableIterable<FileScanTask> planFiles(
         boolean ignoreResiduals, boolean caseSensitive, boolean colStats) {
       // return entries from both data and delete manifests
       CloseableIterable<ManifestFile> manifests = CloseableIterable.withNoopClose(snapshot.allManifests());
-      Schema fileSchema = new Schema(schema().findType("data_file").asStructType().fields());
+      Type fileProjection = schema().findType("data_file");
+      Schema fileSchema = fileProjection != null ? new Schema(fileProjection.asStructType().fields()) : new Schema();
       String schemaString = SchemaParser.toJson(schema());
       String specString = PartitionSpecParser.toJson(PartitionSpec.unpartitioned());
       Expression filter = ignoreResiduals ? Expressions.alwaysTrue() : rowFilter;
diff --git a/spark2/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java b/spark2/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
index e17836c12443..6528b786f388 100644
--- a/spark2/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
+++ b/spark2/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
@@ -193,6 +193,30 @@ public void testAllEntriesTable() throws Exception {
     }
   }
 
+  @Test
+  public void testCountEntriesTable() {
+    TableIdentifier tableIdentifier = TableIdentifier.of("db", "count_entries_test");
+    createTable(tableIdentifier, SCHEMA, PartitionSpec.unpartitioned());
+
+    // init load
+    List<SimpleRecord> records = Lists.newArrayList(new SimpleRecord(1, "1"));
+    Dataset<Row> inputDf = spark.createDataFrame(records, SimpleRecord.class);
+    inputDf.select("id", "data").write()
+        .format("iceberg")
+        .mode("append")
+        .save(loadLocation(tableIdentifier));
+
+    final int expectedEntryCount = 1;
+
+    // count entries
+    Assert.assertEquals("Count should return " + expectedEntryCount,
+        expectedEntryCount, spark.read().format("iceberg").load(loadLocation(tableIdentifier, "entries")).count());
+
+    // count all_entries
+    Assert.assertEquals("Count should return " + expectedEntryCount,
+        expectedEntryCount, spark.read().format("iceberg").load(loadLocation(tableIdentifier, "all_entries")).count());
+  }
+
   @Test
   public void testFilesTable() throws Exception {
     TableIdentifier tableIdentifier = TableIdentifier.of("db", "files_test");
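
A minimal sketch of the caller-visible contract this patch gives Schema.findType(String). The schema, field ids, and names below are illustrative assumptions, not taken from the patch:

    import org.apache.iceberg.Schema;
    import org.apache.iceberg.types.Type;
    import org.apache.iceberg.types.Types;

    public class FindTypeExample {
      public static void main(String[] args) {
        // hypothetical schema with one struct field named "data_file"
        Schema schema = new Schema(
            Types.NestedField.required(1, "data_file", Types.StructType.of(
                Types.NestedField.required(2, "file_path", Types.StringType.get()))));

        // a present name resolves to its type, as before
        Type found = schema.findType("data_file");
        System.out.println(found);

        // a missing name previously threw a NullPointerException while unboxing
        // the absent field id; with this patch findType(String) returns null
        Type missing = schema.findType("unknown_field");
        System.out.println(missing == null); // prints: true
      }
    }

Callers such as AllEntriesTable and ManifestEntriesTable must therefore null-check the result, which is what the ternary fallback to an empty Schema in the two core hunks above does.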