diff --git a/core/src/main/java/org/apache/iceberg/avro/Avro.java b/core/src/main/java/org/apache/iceberg/avro/Avro.java
index c6f9f0a1c2b6..b467bea63096 100644
--- a/core/src/main/java/org/apache/iceberg/avro/Avro.java
+++ b/core/src/main/java/org/apache/iceberg/avro/Avro.java
@@ -644,4 +644,12 @@ public <D> AvroIterable<D> build() {
     }
   }
 
+  /**
+   * Returns the number of rows in the specified Avro file.
+   * @param file an Avro file
+   * @return the number of rows in the file
+   */
+  public static long rowCount(InputFile file) {
+    return AvroIO.findStartingRowPos(file::newStream, Long.MAX_VALUE);
+  }
 }
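Note on the helper: `AvroIO.findStartingRowPos` walks the block headers of the Avro object container file, accumulating each block's stored record count; passing `Long.MAX_VALUE` as the target offset makes it walk to end of file, so the total comes from block metadata without decoding a single record. As a rough standalone illustration of the same idea using only Avro's public API (a sketch; `AvroRowCountSketch` and `countRows` are illustrative names, not Iceberg code):

```java
import java.io.File;
import java.io.IOException;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;

class AvroRowCountSketch {
  // Sums the record counts stored in each block header of the Avro
  // object container file; no record is actually deserialized.
  static long countRows(File avroFile) throws IOException {
    try (DataFileReader<GenericRecord> reader =
        new DataFileReader<>(avroFile, new GenericDatumReader<>())) {
      long rows = 0;
      while (reader.hasNext()) {
        rows += reader.getBlockCount(); // records in the current block
        reader.nextBlock(); // advance without decoding the block's records
      }
      return rows;
    }
  }
}
```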
diff --git a/data/src/main/java/org/apache/iceberg/data/TableMigrationUtil.java b/data/src/main/java/org/apache/iceberg/data/TableMigrationUtil.java
index 27508470254d..a432c7639386 100644
--- a/data/src/main/java/org/apache/iceberg/data/TableMigrationUtil.java
+++ b/data/src/main/java/org/apache/iceberg/data/TableMigrationUtil.java
@@ -36,7 +36,9 @@
 import org.apache.iceberg.MetricsConfig;
 import org.apache.iceberg.PartitionField;
 import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.avro.Avro;
 import org.apache.iceberg.hadoop.HadoopInputFile;
+import org.apache.iceberg.io.InputFile;
 import org.apache.iceberg.mapping.NameMapping;
 import org.apache.iceberg.orc.OrcMetrics;
 import org.apache.iceberg.parquet.ParquetUtil;
@@ -91,7 +93,9 @@ private static List<DataFile> listAvroPartition(Map<String, String> partitionPat
     return Arrays.stream(fs.listStatus(partition, HIDDEN_PATH_FILTER))
         .filter(FileStatus::isFile)
         .map(stat -> {
-          Metrics metrics = new Metrics(-1L, null, null, null);
+          InputFile file = HadoopInputFile.fromLocation(stat.getPath().toString(), conf);
+          long rowCount = Avro.rowCount(file);
+          Metrics metrics = new Metrics(rowCount, null, null, null);
           String partitionKey = spec.fields().stream()
               .map(PartitionField::name)
               .map(name -> String.format("%s=%s", name, partitionPath.get(name)))
diff --git a/spark/v3.0/build.gradle b/spark/v3.0/build.gradle
index ef9a99be7e04..ec0b7c4be816 100644
--- a/spark/v3.0/build.gradle
+++ b/spark/v3.0/build.gradle
@@ -121,6 +121,8 @@ project(":iceberg-spark:iceberg-spark3-extensions") {
     testImplementation project(path: ':iceberg-spark', configuration: 'testArtifacts')
     testImplementation project(path: ':iceberg-spark:iceberg-spark3', configuration: 'testArtifacts')
 
+    testImplementation "org.apache.avro:avro"
+
     // Required because we remove antlr plugin dependencies from the compile configuration, see note above
     // We shade this in Spark3 Runtime to avoid issues with Spark's Antlr Runtime
     runtimeOnly "org.antlr:antlr4-runtime:4.7.1"
diff --git a/spark/v3.0/spark3-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestAddFilesProcedure.java b/spark/v3.0/spark3-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestAddFilesProcedure.java
index 50ab8b9bfc1c..7f5c7df43101 100644
--- a/spark/v3.0/spark3-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestAddFilesProcedure.java
+++ b/spark/v3.0/spark3-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestAddFilesProcedure.java
@@ -24,8 +24,17 @@
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
+import org.apache.avro.Schema;
+import org.apache.avro.SchemaBuilder;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.DatumWriter;
 import org.apache.iceberg.AssertHelpers;
+import org.apache.iceberg.DataFile;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.RowFactory;
@@ -35,6 +44,7 @@
 import org.apache.spark.sql.types.StructType;
 import org.junit.After;
 import org.junit.Assert;
+import org.junit.Assume;
 import org.junit.Before;
 import org.junit.Ignore;
 import org.junit.Rule;
@@ -106,6 +116,59 @@ public void addDataUnpartitionedOrc() {
         sql("SELECT * FROM %s ORDER BY id", tableName));
   }
 
+  @Test
+  public void addAvroFile() throws Exception {
+    // Spark Session Catalog cannot load metadata tables
+    // with "The namespace in session catalog must have exactly one name part"
+    Assume.assumeFalse(catalogName.equals("spark_catalog"));
+
+    // Create an Avro file
+
+    Schema schema = SchemaBuilder.record("record").fields()
+        .requiredInt("id")
+        .requiredString("data")
+        .endRecord();
+    GenericRecord record1 = new GenericData.Record(schema);
+    record1.put("id", 1L);
+    record1.put("data", "a");
+    GenericRecord record2 = new GenericData.Record(schema);
+    record2.put("id", 2L);
+    record2.put("data", "b");
+    File outputFile = temp.newFile("test.avro");
+
+    DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(schema);
+    DataFileWriter<GenericRecord> dataFileWriter = new DataFileWriter<>(datumWriter);
+    dataFileWriter.create(schema, outputFile);
+    dataFileWriter.append(record1);
+    dataFileWriter.append(record2);
+    dataFileWriter.close();
+
+    String createIceberg =
+        "CREATE TABLE %s (id Long, data String) USING iceberg";
+    sql(createIceberg, tableName);
+
+    Object result = scalarSql("CALL %s.system.add_files('%s', '`avro`.`%s`')",
+        catalogName, tableName, outputFile.getPath());
+    Assert.assertEquals(1L, result);
+
+    List<Object[]> expected = Lists.newArrayList(
+        new Object[]{1L, "a"},
+        new Object[]{2L, "b"}
+    );
+
+    assertEquals("Iceberg table contains correct data",
+        expected,
+        sql("SELECT * FROM %s ORDER BY id", tableName));
+
+    List<Object[]> actualRecordCount = sql("SELECT %s FROM %s.files",
+        DataFile.RECORD_COUNT.name(),
+        tableName);
+    List<Object[]> expectedRecordCount = Lists.newArrayList();
+    expectedRecordCount.add(new Object[]{2L});
+    assertEquals("Iceberg file metadata should report the correct record count",
+        expectedRecordCount, actualRecordCount);
+  }
+
   // TODO Adding spark-avro doesn't work in tests
   @Ignore
   public void addDataUnpartitionedAvro() {
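The Spark test above exercises the full `add_files` path; the new core helper can also be sanity-checked on its own against a locally written file. A minimal sketch, assuming JUnit 4 and Iceberg's `Files.localInput` local-file `InputFile` factory (`TestAvroRowCount` and `rowCountMatchesAppendedRecords` are hypothetical names, not part of this change):

```java
import java.io.File;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.iceberg.Files;
import org.apache.iceberg.avro.Avro;
import org.junit.Assert;
import org.junit.Test;

public class TestAvroRowCount {
  @Test
  public void rowCountMatchesAppendedRecords() throws Exception {
    Schema schema = SchemaBuilder.record("record").fields()
        .requiredInt("id")
        .requiredString("data")
        .endRecord();

    File avroFile = File.createTempFile("row-count", ".avro");
    avroFile.deleteOnExit();

    // Write 1000 records spanning one or more Avro blocks
    try (DataFileWriter<GenericRecord> writer =
        new DataFileWriter<>(new GenericDatumWriter<>(schema))) {
      writer.create(schema, avroFile);
      for (int i = 0; i < 1000; i += 1) {
        GenericRecord record = new GenericData.Record(schema);
        record.put("id", i);
        record.put("data", "row-" + i);
        writer.append(record);
      }
    }

    // Avro.rowCount should report exactly the number of appended records
    Assert.assertEquals(1000L, Avro.rowCount(Files.localInput(avroFile)));
  }
}
```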