8 changes: 8 additions & 0 deletions core/src/main/java/org/apache/iceberg/avro/Avro.java
@@ -644,4 +644,12 @@ public <D> AvroIterable<D> build() {
}
}

/**
 * Returns the number of rows in the given Avro file.
 *
 * @param file an Avro file
 * @return the number of rows in the file
 */
public static long rowCount(InputFile file) {
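  // requesting the starting row position of an offset past EOF walks every
  // block header in the file, so the result is the total row count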
return AvroIO.findStartingRowPos(file::newStream, Long.MAX_VALUE);
}
}
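The helper above delegates to Iceberg's internal AvroIO, but the same count can be computed with plain Avro APIs by walking block headers, since the Avro container format stores a record count in each block's header. A minimal sketch, assuming a generic datum reader and a caller-supplied stream (the class and method names here are illustrative, not part of the PR):

import java.io.IOException;
import java.io.InputStream;
import org.apache.avro.file.DataFileStream;
import org.apache.avro.generic.GenericDatumReader;

public class AvroRowCountSketch {
  // Sums the per-block record counts stored by the Avro container format,
  // skipping over block payloads without decoding any records.
  static long countRows(InputStream in) throws IOException {
    long rows = 0;
    try (DataFileStream<Object> blocks =
        new DataFileStream<>(in, new GenericDatumReader<>())) {
      while (blocks.hasNext()) {
        rows += blocks.getBlockCount(); // records in the current block
        blocks.nextBlock();             // advance without deserializing
      }
    }
    return rows;
  }
}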
@@ -36,7 +36,9 @@
import org.apache.iceberg.MetricsConfig;
import org.apache.iceberg.PartitionField;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.avro.Avro;
import org.apache.iceberg.hadoop.HadoopInputFile;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.mapping.NameMapping;
import org.apache.iceberg.orc.OrcMetrics;
import org.apache.iceberg.parquet.ParquetUtil;
@@ -91,7 +93,9 @@ private static List<DataFile> listAvroPartition(Map<String, String> partitionPat
return Arrays.stream(fs.listStatus(partition, HIDDEN_PATH_FILTER))
.filter(FileStatus::isFile)
.map(stat -> {
-        Metrics metrics = new Metrics(-1L, null, null, null);
+        InputFile file = HadoopInputFile.fromLocation(stat.getPath().toString(), conf);
+        long rowCount = Avro.rowCount(file);
+        Metrics metrics = new Metrics(rowCount, null, null, null);
String partitionKey = spec.fields().stream()
.map(PartitionField::name)
.map(name -> String.format("%s=%s", name, partitionPath.get(name)))
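The hunk cuts off before the DataFile itself is assembled. For orientation, a sketch of how these metrics would typically flow into the file builder, assuming Iceberg's standard DataFiles API and the variables in scope above (the surrounding chain is not shown in the diff):

DataFile dataFile = DataFiles.builder(spec)
    .withPath(stat.getPath().toString())
    .withFormat("avro")
    .withFileSizeInBytes(stat.getLen())
    .withMetrics(metrics)          // record count now carries the real row count instead of -1
    .withPartitionPath(partitionKey)
    .build();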
2 changes: 2 additions & 0 deletions spark/v3.0/build.gradle
@@ -121,6 +121,8 @@ project(":iceberg-spark:iceberg-spark3-extensions") {
testImplementation project(path: ':iceberg-spark', configuration: 'testArtifacts')
testImplementation project(path: ':iceberg-spark:iceberg-spark3', configuration: 'testArtifacts')

testImplementation "org.apache.avro:avro"

// Required because we remove antlr plugin dependencies from the compile configuration, see note above
// We shade this in Spark3 Runtime to avoid issues with Spark's Antlr Runtime
runtimeOnly "org.antlr:antlr4-runtime:4.7.1"
@@ -24,8 +24,17 @@
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumWriter;
import org.apache.iceberg.AssertHelpers;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
@@ -35,6 +44,7 @@
import org.apache.spark.sql.types.StructType;
import org.junit.After;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Rule;
@@ -106,6 +116,59 @@ public void addDataUnpartitionedOrc() {
sql("SELECT * FROM %s ORDER BY id", tableName));
}

@Test
public void addAvroFile() throws Exception {
// Spark Session Catalog cannot load metadata tables
// with "The namespace in session catalog must have exactly one name part";
// the test still runs for the other two parameters (hive catalog, hadoop catalog)
Assume.assumeFalse(catalogName.equals("spark_catalog"));

// Create an Avro file
Schema schema = SchemaBuilder.record("record").fields()
    .requiredLong("id")
    .requiredString("data")
    .endRecord();
GenericRecord record1 = new GenericData.Record(schema);
record1.put("id", 1L);
record1.put("data", "a");
GenericRecord record2 = new GenericData.Record(schema);
record2.put("id", 2L);
record2.put("data", "b");
File outputFile = temp.newFile("test.avro");

DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(schema);
DataFileWriter<GenericRecord> dataFileWriter = new DataFileWriter<>(datumWriter);
dataFileWriter.create(schema, outputFile);
dataFileWriter.append(record1);
dataFileWriter.append(record2);
dataFileWriter.close();

String createIceberg =
"CREATE TABLE %s (id Long, data String) USING iceberg";
sql(createIceberg, tableName);

Object result = scalarSql("CALL %s.system.add_files('%s', '`avro`.`%s`')",
catalogName, tableName, outputFile.getPath());
Assert.assertEquals(1L, result);

List<Object[]> expected = Lists.newArrayList(
new Object[]{1L, "a"},
new Object[]{2L, "b"}
);

assertEquals("Iceberg table contains correct data",
expected,
sql("SELECT * FROM %s ORDER BY id", tableName));

List<Object[]> actualRecordCount = sql("SELECT %s FROM %s.files",
    DataFile.RECORD_COUNT.name(),
    tableName);
List<Object[]> expectedRecordCount = Lists.newArrayList();
expectedRecordCount.add(new Object[]{2L});
assertEquals("Iceberg file metadata should have the correct record count",
    expectedRecordCount, actualRecordCount);
}
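As an aside, the add_files procedure also accepts named arguments; a hypothetical equivalent of the positional call in the test above would look like this (importedCount is an illustrative name, not part of the PR):

Object importedCount = scalarSql(
    "CALL %s.system.add_files(table => '%s', source_table => '`avro`.`%s`')",
    catalogName, tableName, outputFile.getPath());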

// TODO Adding spark-avro doesn't work in tests
@Ignore
public void addDataUnpartitionedAvro() {