14 changes: 13 additions & 1 deletion api/src/main/java/org/apache/iceberg/Schema.java
@@ -124,9 +124,21 @@ public List<NestedField> columns() {
return struct.fields();
}

/**
* Returns the {@link Type} of a sub-field identified by the field name.
*
* @param name a field name
* @return a Type for the sub-field or null if it is not found
*/
public Type findType(String name) {
Preconditions.checkArgument(!name.isEmpty(), "Invalid column name: (empty)");
return findType(lazyNameToId().get(name));
Integer id = lazyNameToId().get(name);
if (id != null) { // name is found
return findType(id);
}

// name could not be found
return null;
}

/**
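With this change, findType(String) returns null for a name that is not present in the schema, instead of throwing a NullPointerException when the missing Integer id is unboxed to call findType(int). A minimal caller-side sketch of the new contract (the class name, schema, and field names below are made up for illustration):

    import org.apache.iceberg.Schema;
    import org.apache.iceberg.types.Type;
    import org.apache.iceberg.types.Types;

    public class FindTypeExample {
      public static void main(String[] args) {
        // hypothetical one-column schema
        Schema schema = new Schema(
            Types.NestedField.required(1, "id", Types.IntegerType.get()));

        Type found = schema.findType("id");        // Types.IntegerType.get()
        Type missing = schema.findType("absent");  // now null instead of an NPE

        if (missing == null) {
          // callers are expected to handle the not-found case explicitly
          System.out.println("column 'absent' is not in the schema");
        }
      }
    }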
4 changes: 3 additions & 1 deletion core/src/main/java/org/apache/iceberg/AllEntriesTable.java
@@ -28,6 +28,7 @@
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.TypeUtil;
import org.apache.iceberg.util.ParallelIterable;
import org.apache.iceberg.util.ThreadPools;
@@ -100,7 +101,8 @@ protected CloseableIterable<FileScanTask> planFiles(
TableOperations ops, Snapshot snapshot, Expression rowFilter,
boolean ignoreResiduals, boolean caseSensitive, boolean colStats) {
CloseableIterable<ManifestFile> manifests = allManifestFiles(ops.current().snapshots());
Schema fileSchema = new Schema(schema().findType("data_file").asStructType().fields());
Type fileProjection = schema().findType("data_file");
Schema fileSchema = fileProjection != null ? new Schema(fileProjection.asStructType().fields()) : new Schema();
String schemaString = SchemaParser.toJson(schema());
String specString = PartitionSpecParser.toJson(PartitionSpec.unpartitioned());
Expression filter = ignoreResiduals ? Expressions.alwaysTrue() : rowFilter;
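The same null guard appears in each metadata table that projects the data_file struct: when findType("data_file") comes back null under the new contract, planFiles falls back to an empty Schema rather than failing. An alternative design (hypothetical, not what this change does) would be to fail fast with a clear message instead of silently projecting nothing:

    // hypothetical fail-fast variant, using Iceberg's relocated Guava Preconditions
    Type fileProjection = schema().findType("data_file");
    Preconditions.checkState(fileProjection != null,
        "Invalid metadata table schema: missing data_file struct");
    Schema fileSchema = new Schema(fileProjection.asStructType().fields());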
@@ -26,6 +26,7 @@
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.TypeUtil;

/**
@@ -97,7 +98,8 @@ protected CloseableIterable<FileScanTask> planFiles(
boolean ignoreResiduals, boolean caseSensitive, boolean colStats) {
// return entries from both data and delete manifests
CloseableIterable<ManifestFile> manifests = CloseableIterable.withNoopClose(snapshot.allManifests());
Schema fileSchema = new Schema(schema().findType("data_file").asStructType().fields());
Type fileProjection = schema().findType("data_file");
Schema fileSchema = fileProjection != null ? new Schema(fileProjection.asStructType().fields()) : new Schema();
String schemaString = SchemaParser.toJson(schema());
String specString = PartitionSpecParser.toJson(PartitionSpec.unpartitioned());
Expression filter = ignoreResiduals ? Expressions.alwaysTrue() : rowFilter;
@@ -193,6 +193,30 @@ public void testAllEntriesTable() throws Exception {
}
}

@Test
public void testCountEntriesTable() {
TableIdentifier tableIdentifier = TableIdentifier.of("db", "count_entries_test");
createTable(tableIdentifier, SCHEMA, PartitionSpec.unpartitioned());

// initial load
List<SimpleRecord> records = Lists.newArrayList(new SimpleRecord(1, "1"));
Dataset<Row> inputDf = spark.createDataFrame(records, SimpleRecord.class);
inputDf.select("id", "data").write()
.format("iceberg")
.mode("append")
.save(loadLocation(tableIdentifier));

final int expectedEntryCount = 1;

// count entries
Assert.assertEquals("Count should return " + expectedEntryCount,
expectedEntryCount, spark.read().format("iceberg").load(loadLocation(tableIdentifier, "entries")).count());

// count all_entries
Assert.assertEquals("Count should return " + expectedEntryCount,
expectedEntryCount, spark.read().format("iceberg").load(loadLocation(tableIdentifier, "all_entries")).count());
}

@Test
public void testFilesTable() throws Exception {
TableIdentifier tableIdentifier = TableIdentifier.of("db", "files_test");