From d2971b9cecad63a16d3ff861dc50550a6f56e3c7 Mon Sep 17 00:00:00 2001
From: Szehon Ho
Date: Fri, 8 Oct 2021 22:29:05 -0700
Subject: [PATCH 01/10] Add Files for Avro files throws PreconditionException

https://github.com/apache/iceberg/issues/3263
---
 .../iceberg/data/TableMigrationUtil.java      |  1 +
 .../extensions/TestAddFilesProcedure.java     | 60 +++++++++++++++++++
 2 files changed, 61 insertions(+)

diff --git a/data/src/main/java/org/apache/iceberg/data/TableMigrationUtil.java b/data/src/main/java/org/apache/iceberg/data/TableMigrationUtil.java
index 27508470254d..856eefa0c596 100644
--- a/data/src/main/java/org/apache/iceberg/data/TableMigrationUtil.java
+++ b/data/src/main/java/org/apache/iceberg/data/TableMigrationUtil.java
@@ -103,6 +103,7 @@ private static List<DataFile> listAvroPartition(Map<String, String> partitionPat
           .withFileSizeInBytes(stat.getLen())
           .withMetrics(metrics)
           .withPartitionPath(partitionKey)
+          .withRecordCount(0)
           .build();
 
     }).collect(Collectors.toList());
diff --git a/spark/v3.0/spark3-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestAddFilesProcedure.java b/spark/v3.0/spark3-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestAddFilesProcedure.java
index 50ab8b9bfc1c..ccd385de5eb6 100644
--- a/spark/v3.0/spark3-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestAddFilesProcedure.java
+++ b/spark/v3.0/spark3-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestAddFilesProcedure.java
@@ -25,7 +25,18 @@
 import java.util.Map;
 import java.util.stream.Collectors;
 import org.apache.iceberg.AssertHelpers;
+import org.apache.iceberg.Files;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.avro.Avro;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.io.DataWriter;
+import org.apache.iceberg.io.OutputFile;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.types.Types;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.RowFactory;
@@ -106,6 +117,55 @@ public void addDataUnpartitionedOrc() {
         sql("SELECT * FROM %s ORDER BY id", tableName));
   }
 
+  @Test
+  public void addDataUnpartitionedAvroFile() throws Exception {
+    final Schema SCHEMA = new Schema(
+        Types.NestedField.required(1, "id", Types.LongType.get()),
+        Types.NestedField.optional(2, "data", Types.StringType.get()));
+
+    GenericRecord baseRecord = GenericRecord.create(SCHEMA);
+
+    ImmutableList.Builder<Record> builder = ImmutableList.builder();
+    builder.add(baseRecord.copy(ImmutableMap.of("id", 1L, "data", "a")));
+    builder.add(baseRecord.copy(ImmutableMap.of("id", 2L, "data", "b")));
+    List<Record> records = builder.build();
+
+    OutputFile file = Files.localOutput(temp.newFile());
+
+    DataWriter<Record> dataWriter = Avro.writeData(file)
+        .schema(SCHEMA)
+        .createWriterFunc(org.apache.iceberg.data.avro.DataWriter::create)
+        .overwrite()
+        .withSpec(PartitionSpec.unpartitioned())
+        .build();
+
+    try {
+      for (Record record : records) {
+        dataWriter.add(record);
+      }
+    } finally {
+      dataWriter.close();
+    }
+
+    String path = dataWriter.toDataFile().path().toString();
+
+    String createIceberg =
+        "CREATE TABLE %s (id Long, data String) USING iceberg";
+    sql(createIceberg, tableName);
+
+    Object result = scalarSql("CALL %s.system.add_files('%s', '`avro`.`%s`')",
+        catalogName, tableName, path);
+    Assert.assertEquals(1L, result);
+
+    List<Object[]> expected = Lists.newArrayList(
+        new Object[]{1L, "a"},
+        new Object[]{2L, "b"}
+    );
+    assertEquals("Iceberg table contains correct data",
+        expected,
+        sql("SELECT * FROM %s ORDER BY id", tableName));
+  }
+
   // TODO Adding spark-avro doesn't work in tests
   @Ignore
   public void addDataUnpartitionedAvro() {
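For context on the failure this patch works around: DataFiles.Builder#build() requires a non-negative record count, while the Avro listing path in TableMigrationUtil supplied Metrics(-1L, null, null, null), so every add_files call on an Avro table died in that precondition. A minimal sketch of the failure mode, written as a standalone Java program against the Iceberg core API (path and file size are illustrative):

    import org.apache.iceberg.DataFile;
    import org.apache.iceberg.DataFiles;
    import org.apache.iceberg.FileFormat;
    import org.apache.iceberg.Metrics;
    import org.apache.iceberg.PartitionSpec;

    public class RecordCountRepro {
      public static void main(String[] args) {
        // The Avro listing path produced metrics with an unknown (-1) row count
        Metrics metrics = new Metrics(-1L, null, null, null);

        // build() checks recordCount >= 0, so this throws
        // IllegalArgumentException: "Record count is required"
        DataFile file = DataFiles.builder(PartitionSpec.unpartitioned())
            .withPath("/tmp/part-00000.avro") // illustrative path
            .withFormat(FileFormat.AVRO)
            .withFileSizeInBytes(1024L)       // illustrative size
            .withMetrics(metrics)
            .build();
      }
    }

The stopgap above, .withRecordCount(0) placed after .withMetrics(metrics), overrides the -1 and lets build() pass, at the cost of recording a wrong count in table metadata; the rest of the series replaces it with a real count.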
From b660e8f6051273c072e862a4d4749df7322d2b82 Mon Sep 17 00:00:00 2001
From: Szehon Ho
Date: Mon, 11 Oct 2021 12:24:59 -0700
Subject: [PATCH 02/10] Change DataFiles.Builder Preconditions to accept -1.
 Use Long instead of long to differentiate unset vs set recordCount.

---
 core/src/main/java/org/apache/iceberg/DataFiles.java     | 4 ++--
 .../java/org/apache/iceberg/data/TableMigrationUtil.java | 2 --
 .../iceberg/spark/extensions/TestAddFilesProcedure.java  | 6 +++---
 3 files changed, 5 insertions(+), 7 deletions(-)

diff --git a/core/src/main/java/org/apache/iceberg/DataFiles.java b/core/src/main/java/org/apache/iceberg/DataFiles.java
index a765dc7fb86a..bbb6ded95158 100644
--- a/core/src/main/java/org/apache/iceberg/DataFiles.java
+++ b/core/src/main/java/org/apache/iceberg/DataFiles.java
@@ -119,7 +119,7 @@ public static class Builder {
     private PartitionData partitionData;
     private String filePath = null;
     private FileFormat format = null;
-    private long recordCount = -1L;
+    private Long recordCount = null;
     private long fileSizeInBytes = -1L;
     private Integer sortOrderId = SortOrder.unsorted().orderId();
 
@@ -285,7 +285,7 @@ public DataFile build() {
       }
       Preconditions.checkArgument(format != null, "File format is required");
       Preconditions.checkArgument(fileSizeInBytes >= 0, "File size is required");
-      Preconditions.checkArgument(recordCount >= 0, "Record count is required");
+      Preconditions.checkArgument(recordCount != null, "Record count is required");
 
       return new GenericDataFile(
           specId, filePath, format, isPartitioned ? partitionData.copy() : null,
diff --git a/data/src/main/java/org/apache/iceberg/data/TableMigrationUtil.java b/data/src/main/java/org/apache/iceberg/data/TableMigrationUtil.java
index 856eefa0c596..702cbd47da27 100644
--- a/data/src/main/java/org/apache/iceberg/data/TableMigrationUtil.java
+++ b/data/src/main/java/org/apache/iceberg/data/TableMigrationUtil.java
@@ -103,9 +103,7 @@ private static List<DataFile> listAvroPartition(Map<String, String> partitionPat
           .withFileSizeInBytes(stat.getLen())
           .withMetrics(metrics)
           .withPartitionPath(partitionKey)
-          .withRecordCount(0)
           .build();
-
     }).collect(Collectors.toList());
   } catch (IOException e) {
     throw new RuntimeException("Unable to list files in partition: " + partitionUri, e);
diff --git a/spark/v3.0/spark3-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestAddFilesProcedure.java b/spark/v3.0/spark3-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestAddFilesProcedure.java
index ccd385de5eb6..c260f194096d 100644
--- a/spark/v3.0/spark3-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestAddFilesProcedure.java
+++ b/spark/v3.0/spark3-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestAddFilesProcedure.java
@@ -119,11 +119,11 @@ public void addDataUnpartitionedOrc() {
 
   @Test
   public void addDataUnpartitionedAvroFile() throws Exception {
-    final Schema SCHEMA = new Schema(
+    Schema schema = new Schema(
         Types.NestedField.required(1, "id", Types.LongType.get()),
         Types.NestedField.optional(2, "data", Types.StringType.get()));
 
-    GenericRecord baseRecord = GenericRecord.create(SCHEMA);
+    GenericRecord baseRecord = GenericRecord.create(schema);
 
     ImmutableList.Builder<Record> builder = ImmutableList.builder();
     builder.add(baseRecord.copy(ImmutableMap.of("id", 1L, "data", "a")));
@@ -133,7 +133,7 @@ public void addDataUnpartitionedAvroFile() throws Exception {
     OutputFile file = Files.localOutput(temp.newFile());
 
     DataWriter<Record> dataWriter = Avro.writeData(file)
-        .schema(SCHEMA)
+        .schema(schema)
         .createWriterFunc(org.apache.iceberg.data.avro.DataWriter::create)
         .overwrite()
         .withSpec(PartitionSpec.unpartitioned())
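The switch to a boxed Long is the classic nullable-sentinel move: with a primitive initialized to -1L, the builder cannot distinguish "caller never set a count" from "caller explicitly set the unknown-count sentinel -1". A condensed sketch of the pattern, with illustrative names rather than the real builder:

    class CountingBuilder {
      private Long recordCount = null; // null means "never set"

      CountingBuilder withRecordCount(long count) {
        this.recordCount = count;
        return this;
      }

      long build() {
        // Rejects only the truly-unset case; an explicit -1 now passes
        if (recordCount == null) {
          throw new IllegalArgumentException("Record count is required");
        }
        return recordCount;
      }
    }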
From f6ba2292fea45af06eff792011fe3a2d2972b7a7 Mon Sep 17 00:00:00 2001
From: Szehon Ho
Date: Mon, 11 Oct 2021 13:08:25 -0700
Subject: [PATCH 03/10] Add another precondition to validate -1 case

---
 core/src/main/java/org/apache/iceberg/DataFiles.java | 6 ++++++
 1 file changed, 6 insertions(+)

diff --git a/core/src/main/java/org/apache/iceberg/DataFiles.java b/core/src/main/java/org/apache/iceberg/DataFiles.java
index bbb6ded95158..991df4ad474c 100644
--- a/core/src/main/java/org/apache/iceberg/DataFiles.java
+++ b/core/src/main/java/org/apache/iceberg/DataFiles.java
@@ -286,6 +286,12 @@ public DataFile build() {
       Preconditions.checkArgument(format != null, "File format is required");
       Preconditions.checkArgument(fileSizeInBytes >= 0, "File size is required");
       Preconditions.checkArgument(recordCount != null, "Record count is required");
+      // MetricsEvaluator skips using other metrics, if record count is -1
+      Preconditions.checkArgument(recordCount >= 0 ||
+          (recordCount == -1 && (
+              (valueCounts == null) || (columnSizes == null) || (nanValueCounts == null) ||
+              (lowerBounds == null) || (upperBounds == null))),
+          "Metrics cannot be set if record count is -1.");
 
       return new GenericDataFile(
           specId, filePath, format, isPartitioned ? partitionData.copy() : null,
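Before reading the next patch, note the logic slip here: with || between the null checks, a -1 record count is accepted as soon as any single metric is unset, so a file carrying recordCount == -1 alongside, say, populated bounds would still build. The intended rule, applied in the follow-up, is that all metrics must be unset. A sketch of the two predicates with illustrative booleans:

    // recordCount == -1 while one metric is set and another is unset
    long recordCount = -1L;
    boolean valueCountsUnset = true;  // valueCounts == null
    boolean lowerBoundsUnset = false; // lowerBounds != null

    // This patch's condition: one unset metric short-circuits the check
    boolean patch03Passes = recordCount >= 0 ||
        (recordCount == -1 && (valueCountsUnset || lowerBoundsUnset));
    // true -- the inconsistent file slips through

    // Next patch's condition: every metric must be unset
    boolean patch04Passes = recordCount >= 0 ||
        (recordCount == -1 && valueCountsUnset && lowerBoundsUnset);
    // false -- the inconsistent file is rejected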
From 9fce5286e2067ce616b25215296278a99fe1af2b Mon Sep 17 00:00:00 2001
From: Szehon Ho
Date: Mon, 11 Oct 2021 13:15:12 -0700
Subject: [PATCH 04/10] Fix condition

---
 core/src/main/java/org/apache/iceberg/DataFiles.java | 5 ++---
 1 file changed, 2 insertions(+), 3 deletions(-)

diff --git a/core/src/main/java/org/apache/iceberg/DataFiles.java b/core/src/main/java/org/apache/iceberg/DataFiles.java
index 991df4ad474c..aa256fc34e9c 100644
--- a/core/src/main/java/org/apache/iceberg/DataFiles.java
+++ b/core/src/main/java/org/apache/iceberg/DataFiles.java
@@ -288,9 +288,8 @@ public DataFile build() {
       Preconditions.checkArgument(recordCount != null, "Record count is required");
       // MetricsEvaluator skips using other metrics, if record count is -1
       Preconditions.checkArgument(recordCount >= 0 ||
-          (recordCount == -1 && (
-              (valueCounts == null) || (columnSizes == null) || (nanValueCounts == null) ||
-              (lowerBounds == null) || (upperBounds == null))),
+          (recordCount == -1 && valueCounts == null && columnSizes == null && nanValueCounts == null &&
+              lowerBounds == null && upperBounds == null),
           "Metrics cannot be set if record count is -1.");
 
       return new GenericDataFile(

From ee54d1a708e7ae7e60005ae67fc9aef405f6b47e Mon Sep 17 00:00:00 2001
From: Szehon Ho
Date: Mon, 11 Oct 2021 23:07:44 -0700
Subject: [PATCH 05/10] Restore unnecessarily removed newline

---
 .../main/java/org/apache/iceberg/data/TableMigrationUtil.java | 1 +
 1 file changed, 1 insertion(+)

diff --git a/data/src/main/java/org/apache/iceberg/data/TableMigrationUtil.java b/data/src/main/java/org/apache/iceberg/data/TableMigrationUtil.java
index 702cbd47da27..27508470254d 100644
--- a/data/src/main/java/org/apache/iceberg/data/TableMigrationUtil.java
+++ b/data/src/main/java/org/apache/iceberg/data/TableMigrationUtil.java
@@ -104,6 +104,7 @@ private static List<DataFile> listAvroPartition(Map<String, String> partitionPat
           .withMetrics(metrics)
          .withPartitionPath(partitionKey)
           .build();
+
     }).collect(Collectors.toList());
   } catch (IOException e) {
     throw new RuntimeException("Unable to list files in partition: " + partitionUri, e);
correct count", + expectedCount, + sql("SELECT COUNT(*) FROM %s", tableName)); } // TODO Adding spark-avro doesn't work in tests From 438e4396f1e377767c4a6c15f235c7cdd436c68f Mon Sep 17 00:00:00 2001 From: Szehon Ho Date: Tue, 19 Oct 2021 10:46:38 -0700 Subject: [PATCH 07/10] Implement rowCount for added Avro files, restore -1 rowCount check --- .../java/org/apache/iceberg/DataFiles.java | 9 ++------- .../java/org/apache/iceberg/avro/AvroIO.java | 4 ++-- .../iceberg/data/TableMigrationUtil.java | 9 ++++++++- .../extensions/TestAddFilesProcedure.java | 19 ++++++++++++++----- 4 files changed, 26 insertions(+), 15 deletions(-) diff --git a/core/src/main/java/org/apache/iceberg/DataFiles.java b/core/src/main/java/org/apache/iceberg/DataFiles.java index aa256fc34e9c..a765dc7fb86a 100644 --- a/core/src/main/java/org/apache/iceberg/DataFiles.java +++ b/core/src/main/java/org/apache/iceberg/DataFiles.java @@ -119,7 +119,7 @@ public static class Builder { private PartitionData partitionData; private String filePath = null; private FileFormat format = null; - private Long recordCount = null; + private long recordCount = -1L; private long fileSizeInBytes = -1L; private Integer sortOrderId = SortOrder.unsorted().orderId(); @@ -285,12 +285,7 @@ public DataFile build() { } Preconditions.checkArgument(format != null, "File format is required"); Preconditions.checkArgument(fileSizeInBytes >= 0, "File size is required"); - Preconditions.checkArgument(recordCount != null, "Record count is required"); - // MetricsEvaluator skips using other metrics, if record count is -1 - Preconditions.checkArgument(recordCount >= 0 || - (recordCount == -1 && valueCounts == null && columnSizes == null && nanValueCounts == null && - lowerBounds == null && upperBounds == null), - "Metrics cannot be set if record count is -1."); + Preconditions.checkArgument(recordCount >= 0, "Record count is required"); return new GenericDataFile( specId, filePath, format, isPartitioned ? 
From 438e4396f1e377767c4a6c15f235c7cdd436c68f Mon Sep 17 00:00:00 2001
From: Szehon Ho
Date: Tue, 19 Oct 2021 10:46:38 -0700
Subject: [PATCH 07/10] Implement rowCount for added Avro files, restore -1
 rowCount check

---
 .../java/org/apache/iceberg/DataFiles.java    |  9 ++-------
 .../java/org/apache/iceberg/avro/AvroIO.java  |  4 ++--
 .../iceberg/data/TableMigrationUtil.java      |  9 ++++++++-
 .../extensions/TestAddFilesProcedure.java     | 19 ++++++++++++-----
 4 files changed, 26 insertions(+), 15 deletions(-)

diff --git a/core/src/main/java/org/apache/iceberg/DataFiles.java b/core/src/main/java/org/apache/iceberg/DataFiles.java
index aa256fc34e9c..a765dc7fb86a 100644
--- a/core/src/main/java/org/apache/iceberg/DataFiles.java
+++ b/core/src/main/java/org/apache/iceberg/DataFiles.java
@@ -119,7 +119,7 @@ public static class Builder {
     private PartitionData partitionData;
     private String filePath = null;
     private FileFormat format = null;
-    private Long recordCount = null;
+    private long recordCount = -1L;
    private long fileSizeInBytes = -1L;
     private Integer sortOrderId = SortOrder.unsorted().orderId();
 
@@ -285,12 +285,7 @@ public DataFile build() {
       }
       Preconditions.checkArgument(format != null, "File format is required");
       Preconditions.checkArgument(fileSizeInBytes >= 0, "File size is required");
-      Preconditions.checkArgument(recordCount != null, "Record count is required");
-      // MetricsEvaluator skips using other metrics, if record count is -1
-      Preconditions.checkArgument(recordCount >= 0 ||
-          (recordCount == -1 && valueCounts == null && columnSizes == null && nanValueCounts == null &&
-              lowerBounds == null && upperBounds == null),
-          "Metrics cannot be set if record count is -1.");
+      Preconditions.checkArgument(recordCount >= 0, "Record count is required");
 
       return new GenericDataFile(
           specId, filePath, format, isPartitioned ? partitionData.copy() : null,
diff --git a/core/src/main/java/org/apache/iceberg/avro/AvroIO.java b/core/src/main/java/org/apache/iceberg/avro/AvroIO.java
index c569d93b1fa5..c344a0e57e03 100644
--- a/core/src/main/java/org/apache/iceberg/avro/AvroIO.java
+++ b/core/src/main/java/org/apache/iceberg/avro/AvroIO.java
@@ -35,7 +35,7 @@
 import org.apache.iceberg.io.DelegatingInputStream;
 import org.apache.iceberg.io.SeekableInputStream;
 
-class AvroIO {
+public class AvroIO {
   private static final byte[] AVRO_MAGIC = new byte[] { 'O', 'b', 'j', 1 };
   private static final ValueReader<byte[]> MAGIC_READER = ValueReaders.fixed(AVRO_MAGIC.length);
   private static final ValueReader<Map<String, String>> META_READER = ValueReaders.map(
@@ -146,7 +146,7 @@ public boolean markSupported() {
     }
   }
 
-  static long findStartingRowPos(Supplier<SeekableInputStream> open, long start) {
+  public static long findStartingRowPos(Supplier<SeekableInputStream> open, long start) {
     long totalRows = 0;
     try (SeekableInputStream in = open.get()) {
       // use a direct decoder that will not buffer so the position of the input stream is accurate
diff --git a/data/src/main/java/org/apache/iceberg/data/TableMigrationUtil.java b/data/src/main/java/org/apache/iceberg/data/TableMigrationUtil.java
index 27508470254d..6a82bae78395 100644
--- a/data/src/main/java/org/apache/iceberg/data/TableMigrationUtil.java
+++ b/data/src/main/java/org/apache/iceberg/data/TableMigrationUtil.java
@@ -36,7 +36,9 @@
 import org.apache.iceberg.MetricsConfig;
 import org.apache.iceberg.PartitionField;
 import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.avro.AvroIO;
 import org.apache.iceberg.hadoop.HadoopInputFile;
+import org.apache.iceberg.io.InputFile;
 import org.apache.iceberg.mapping.NameMapping;
 import org.apache.iceberg.orc.OrcMetrics;
 import org.apache.iceberg.parquet.ParquetUtil;
@@ -91,7 +93,12 @@ private static List<DataFile> listAvroPartition(Map<String, String> partitionPat
     return Arrays.stream(fs.listStatus(partition, HIDDEN_PATH_FILTER))
         .filter(FileStatus::isFile)
         .map(stat -> {
-          Metrics metrics = new Metrics(-1L, null, null, null);
+          InputFile inFile = HadoopInputFile.fromLocation(stat.getPath().toString(), conf);
+          long length = inFile.getLength();
+
+          // Seeking to the end will count all the rows.
+          long rowCount = AvroIO.findStartingRowPos(inFile::newStream, length);
+          Metrics metrics = new Metrics(rowCount, null, null, null);
           String partitionKey = spec.fields().stream()
               .map(PartitionField::name)
               .map(name -> String.format("%s=%s", name, partitionPath.get(name)))
diff --git a/spark/v3.0/spark3-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestAddFilesProcedure.java b/spark/v3.0/spark3-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestAddFilesProcedure.java
index 025a7e9faede..e99d4046b61e 100644
--- a/spark/v3.0/spark3-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestAddFilesProcedure.java
+++ b/spark/v3.0/spark3-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestAddFilesProcedure.java
@@ -25,6 +25,7 @@
 import java.util.Map;
 import java.util.stream.Collectors;
 import org.apache.iceberg.AssertHelpers;
+import org.apache.iceberg.DataFile;
 import org.apache.iceberg.Files;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
@@ -46,6 +47,7 @@
 import org.apache.spark.sql.types.StructType;
 import org.junit.After;
 import org.junit.Assert;
+import org.junit.Assume;
 import org.junit.Before;
 import org.junit.Ignore;
 import org.junit.Rule;
@@ -119,6 +121,10 @@ public void addDataUnpartitionedOrc() {
 
   @Test
   public void addAvroFile() throws Exception {
+    // Spark Session Catalog cannot load metadata tables
+    // with "The namespace in session catalog must have exactly one name part"
+    Assume.assumeFalse(catalogName.equals("spark_catalog"));
+
     Schema schema = new Schema(
         Types.NestedField.required(1, "id", Types.LongType.get()),
         Types.NestedField.optional(2, "data", Types.StringType.get()));
@@ -161,15 +167,18 @@ public void addAvroFile() throws Exception {
         new Object[]{1L, "a"},
         new Object[]{2L, "b"}
     );
+
     assertEquals("Iceberg table contains correct data",
         expected,
         sql("SELECT * FROM %s ORDER BY id", tableName));
 
-    List<Object[]> expectedCount = Lists.newArrayList();
-    expectedCount.add(new Object[]{2L});
-    assertEquals("Iceberg table has correct count",
-        expectedCount,
-        sql("SELECT COUNT(*) FROM %s", tableName));
+    List<Object[]> actualRecordCount = sql("select %s from %s.files",
+        DataFile.RECORD_COUNT.name(),
+        tableName);
+    List<Object[]> expectedRecordCount = Lists.newArrayList();
+    expectedRecordCount.add(new Object[]{2L});
+    assertEquals("Iceberg file metadata should have correct metadata count",
+        expectedRecordCount, actualRecordCount);
   }
 
   // TODO Adding spark-avro doesn't work in tests
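Why AvroIO.findStartingRowPos can double as a row counter: the Avro object container format prefixes every data block with its record count, so asking for the "row position" of the end of the file just sums those per-block counts without decoding a single record. An equivalent standalone sketch using plain Avro APIs rather than the Iceberg-internal path (file path illustrative):

    import java.io.File;
    import java.io.IOException;
    import org.apache.avro.file.DataFileReader;
    import org.apache.avro.generic.GenericDatumReader;

    public class AvroRowCount {
      public static void main(String[] args) throws IOException {
        File avroFile = new File("/tmp/data.avro"); // illustrative input
        long rows = 0;
        try (DataFileReader<Object> reader =
            new DataFileReader<>(avroFile, new GenericDatumReader<>())) {
          while (reader.hasNext()) {
            rows += reader.getBlockCount(); // record count from the block header
            reader.nextBlock();             // jump to the next block, skipping decode
          }
        }
        System.out.println("rows = " + rows);
      }
    }

This is why the implementation stays cheap even for large files: I/O is proportional to the number of blocks, not the number of records.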
From bd71a89c9d9b847d438d9fe223376666fb32ccf6 Mon Sep 17 00:00:00 2001
From: Szehon Ho
Date: Tue, 19 Oct 2021 11:51:30 -0700
Subject: [PATCH 08/10] Fix rebase error

---
 spark/v3.0/build.gradle | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/spark/v3.0/build.gradle b/spark/v3.0/build.gradle
index ef9a99be7e04..ec0b7c4be816 100644
--- a/spark/v3.0/build.gradle
+++ b/spark/v3.0/build.gradle
@@ -121,6 +121,8 @@ project(":iceberg-spark:iceberg-spark3-extensions") {
     testImplementation project(path: ':iceberg-spark', configuration: 'testArtifacts')
     testImplementation project(path: ':iceberg-spark:iceberg-spark3', configuration: 'testArtifacts')
 
+    testImplementation "org.apache.avro:avro"
+
     // Required because we remove antlr plugin dependencies from the compile configuration, see note above
     // We shade this in Spark3 Runtime to avoid issues with Spark's Antlr Runtime
     runtimeOnly "org.antlr:antlr4-runtime:4.7.1"
From 0aba1aa6ce1d343df6d3b4770b2290f82ac60ee4 Mon Sep 17 00:00:00 2001
From: Szehon Ho
Date: Tue, 19 Oct 2021 19:11:28 -0700
Subject: [PATCH 09/10] Address review comments, use native Avro writer in test

---
 .../java/org/apache/iceberg/avro/Avro.java    |  8 +++
 .../iceberg/data/TableMigrationUtil.java      |  9 +--
 .../extensions/TestAddFilesProcedure.java     | 68 ++++++-----
 3 files changed, 39 insertions(+), 46 deletions(-)

diff --git a/core/src/main/java/org/apache/iceberg/avro/Avro.java b/core/src/main/java/org/apache/iceberg/avro/Avro.java
index c6f9f0a1c2b6..b467bea63096 100644
--- a/core/src/main/java/org/apache/iceberg/avro/Avro.java
+++ b/core/src/main/java/org/apache/iceberg/avro/Avro.java
@@ -644,4 +644,12 @@ public AvroIterable<D> build() {
     }
   }
 
+  /**
+   * Returns the number of rows in the given Avro file.
+   * @param file the Avro file
+   * @return the number of rows in the file
+   */
+  public static long rowCount(InputFile file) {
+    return AvroIO.findStartingRowPos(file::newStream, Long.MAX_VALUE);
+  }
 }
diff --git a/data/src/main/java/org/apache/iceberg/data/TableMigrationUtil.java b/data/src/main/java/org/apache/iceberg/data/TableMigrationUtil.java
index 6a82bae78395..a432c7639386 100644
--- a/data/src/main/java/org/apache/iceberg/data/TableMigrationUtil.java
+++ b/data/src/main/java/org/apache/iceberg/data/TableMigrationUtil.java
@@ -36,7 +36,7 @@
 import org.apache.iceberg.MetricsConfig;
 import org.apache.iceberg.PartitionField;
 import org.apache.iceberg.PartitionSpec;
-import org.apache.iceberg.avro.AvroIO;
+import org.apache.iceberg.avro.Avro;
 import org.apache.iceberg.hadoop.HadoopInputFile;
 import org.apache.iceberg.io.InputFile;
 import org.apache.iceberg.mapping.NameMapping;
@@ -93,11 +93,8 @@ private static List<DataFile> listAvroPartition(Map<String, String> partitionPat
     return Arrays.stream(fs.listStatus(partition, HIDDEN_PATH_FILTER))
         .filter(FileStatus::isFile)
         .map(stat -> {
-          InputFile inFile = HadoopInputFile.fromLocation(stat.getPath().toString(), conf);
-          long length = inFile.getLength();
-
-          // Seeking to the end will count all the rows.
-          long rowCount = AvroIO.findStartingRowPos(inFile::newStream, length);
+          InputFile file = HadoopInputFile.fromLocation(stat.getPath().toString(), conf);
+          long rowCount = Avro.rowCount(file);
           Metrics metrics = new Metrics(rowCount, null, null, null);
           String partitionKey = spec.fields().stream()
               .map(PartitionField::name)
diff --git a/spark/v3.0/spark3-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestAddFilesProcedure.java b/spark/v3.0/spark3-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestAddFilesProcedure.java
index e99d4046b61e..7f5c7df43101 100644
--- a/spark/v3.0/spark3-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestAddFilesProcedure.java
+++ b/spark/v3.0/spark3-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestAddFilesProcedure.java
@@ -24,20 +24,17 @@
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
+import org.apache.avro.Schema;
+import org.apache.avro.SchemaBuilder;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.DatumWriter;
 import org.apache.iceberg.AssertHelpers;
 import org.apache.iceberg.DataFile;
-import org.apache.iceberg.Files;
-import org.apache.iceberg.PartitionSpec;
-import org.apache.iceberg.Schema;
-import org.apache.iceberg.avro.Avro;
-import org.apache.iceberg.data.GenericRecord;
-import org.apache.iceberg.data.Record;
-import org.apache.iceberg.io.DataWriter;
-import org.apache.iceberg.io.OutputFile;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
-import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
-import org.apache.iceberg.types.Types;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.RowFactory;
@@ -125,42 +122,33 @@ public void addAvroFile() throws Exception {
     // with "The namespace in session catalog must have exactly one name part"
     Assume.assumeFalse(catalogName.equals("spark_catalog"));
 
-    Schema schema = new Schema(
-        Types.NestedField.required(1, "id", Types.LongType.get()),
-        Types.NestedField.optional(2, "data", Types.StringType.get()));
-
-    GenericRecord baseRecord = GenericRecord.create(schema);
-
-    ImmutableList.Builder<Record> builder = ImmutableList.builder();
-    builder.add(baseRecord.copy(ImmutableMap.of("id", 1L, "data", "a")));
-    builder.add(baseRecord.copy(ImmutableMap.of("id", 2L, "data", "b")));
-    List<Record> records = builder.build();
-
-    OutputFile file = Files.localOutput(temp.newFile());
-
-    DataWriter<Record> dataWriter = Avro.writeData(file)
-        .schema(schema)
-        .createWriterFunc(org.apache.iceberg.data.avro.DataWriter::create)
-        .overwrite()
-        .withSpec(PartitionSpec.unpartitioned())
-        .build();
-
-    try {
-      for (Record record : records) {
-        dataWriter.add(record);
-      }
-    } finally {
-      dataWriter.close();
-    }
-
-    String path = dataWriter.toDataFile().path().toString();
+    // Create an Avro file
+
+    Schema schema = SchemaBuilder.record("record").fields()
+        .requiredInt("id")
+        .requiredString("data")
+        .endRecord();
+    GenericRecord record1 = new GenericData.Record(schema);
+    record1.put("id", 1L);
+    record1.put("data", "a");
+    GenericRecord record2 = new GenericData.Record(schema);
+    record2.put("id", 2L);
+    record2.put("data", "b");
+    File outputFile = temp.newFile("test.avro");
+
+    DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(schema);
+    DataFileWriter<GenericRecord> dataFileWriter = new DataFileWriter<>(datumWriter);
+    dataFileWriter.create(schema, outputFile);
+    dataFileWriter.append(record1);
+    dataFileWriter.append(record2);
+    dataFileWriter.close();
 
     String createIceberg =
         "CREATE TABLE %s (id Long, data String) USING iceberg";
     sql(createIceberg, tableName);
 
     Object result = scalarSql("CALL %s.system.add_files('%s', '`avro`.`%s`')",
-        catalogName, tableName, path);
+        catalogName, tableName, outputFile.getPath());
     Assert.assertEquals(1L, result);
 
     List<Object[]> expected = Lists.newArrayList(
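A sketch of how the new public helper is meant to be called outside TableMigrationUtil; the location string and Configuration are illustrative, and Long.MAX_VALUE works because findStartingRowPos stops at end-of-file after summing every block's record count:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.iceberg.avro.Avro;
    import org.apache.iceberg.hadoop.HadoopInputFile;
    import org.apache.iceberg.io.InputFile;

    public class RowCountUsage {
      public static void main(String[] args) {
        InputFile file = HadoopInputFile.fromLocation("/tmp/data.avro", new Configuration());
        // Asks for the row position at "the end of the file", i.e. the total
        // row count, computed from block headers without decoding records
        long rows = Avro.rowCount(file);
        System.out.println("rows = " + rows);
      }
    }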
From fbdb3e4baee72b4776f9328fd4e1d4252f95b077 Mon Sep 17 00:00:00 2001
From: Szehon Ho
Date: Tue, 19 Oct 2021 19:19:00 -0700
Subject: [PATCH 10/10] Revert making AvroIO public

---
 core/src/main/java/org/apache/iceberg/avro/AvroIO.java | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/core/src/main/java/org/apache/iceberg/avro/AvroIO.java b/core/src/main/java/org/apache/iceberg/avro/AvroIO.java
index c344a0e57e03..c569d93b1fa5 100644
--- a/core/src/main/java/org/apache/iceberg/avro/AvroIO.java
+++ b/core/src/main/java/org/apache/iceberg/avro/AvroIO.java
@@ -35,7 +35,7 @@
 import org.apache.iceberg.io.DelegatingInputStream;
 import org.apache.iceberg.io.SeekableInputStream;
 
-public class AvroIO {
+class AvroIO {
   private static final byte[] AVRO_MAGIC = new byte[] { 'O', 'b', 'j', 1 };
   private static final ValueReader<byte[]> MAGIC_READER = ValueReaders.fixed(AVRO_MAGIC.length);
   private static final ValueReader<Map<String, String>> META_READER = ValueReaders.map(
@@ -146,7 +146,7 @@ public boolean markSupported() {
     }
   }
 
-  public static long findStartingRowPos(Supplier<SeekableInputStream> open, long start) {
+  static long findStartingRowPos(Supplier<SeekableInputStream> open, long start) {
     long totalRows = 0;
     try (SeekableInputStream in = open.get()) {
       // use a direct decoder that will not buffer so the position of the input stream is accurate