From 5ee55601f38a9ed085557e5d5b6a9cb21be997b4 Mon Sep 17 00:00:00 2001
From: Wing Yew Poon
Date: Thu, 26 Aug 2021 17:57:46 -0700
Subject: [PATCH 1/2] Better SparkBatchScan statistics estimation.

When estimating statistics, we should take the read schema into account.
---
 .../org/apache/iceberg/spark/source/SparkBatchScan.java | 6 +++++-
 1 file changed, 5 insertions(+), 1 deletion(-)

diff --git a/spark3/src/main/java/org/apache/iceberg/spark/source/SparkBatchScan.java b/spark3/src/main/java/org/apache/iceberg/spark/source/SparkBatchScan.java
index acd02037aced..b5c31b527635 100644
--- a/spark3/src/main/java/org/apache/iceberg/spark/source/SparkBatchScan.java
+++ b/spark3/src/main/java/org/apache/iceberg/spark/source/SparkBatchScan.java
@@ -215,10 +215,14 @@ public Statistics estimateStatistics() {
     long sizeInBytes = 0L;
     long numRows = 0L;

+    double compressionFactor = SparkSession.active().sessionState().conf().fileCompressionFactor();
+    StructType tableSchema = SparkSchemaUtil.convert(table.schema());
+    double adjustmentFactor = compressionFactor * readSchema().defaultSize() / tableSchema.defaultSize();
     for (CombinedScanTask task : tasks()) {
       for (FileScanTask file : task.files()) {
-        sizeInBytes += file.length();
+        long estimate = (long) (adjustmentFactor * file.length());
+        sizeInBytes += estimate;
         numRows += file.file().recordCount();
       }
     }

From 5deeb5635cfef5eec5c8efc04fb7f985ad5e77f4 Mon Sep 17 00:00:00 2001
From: Wing Yew Poon
Date: Thu, 2 Sep 2021 16:35:23 -0700
Subject: [PATCH 2/2] Use row size * number of rows instead of adjusted file
 size.

Fix SparkSchemaUtil.estimateSize to use LongMath.checkedMultiply.
---
 .../apache/iceberg/spark/SparkSchemaUtil.java | 20 ++++++++++----------
 .../iceberg/spark/source/SparkBatchScan.java  | 10 ++--------
 2 files changed, 12 insertions(+), 18 deletions(-)

diff --git a/spark/src/main/java/org/apache/iceberg/spark/SparkSchemaUtil.java b/spark/src/main/java/org/apache/iceberg/spark/SparkSchemaUtil.java
index b503ba634d85..321050dceb74 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/SparkSchemaUtil.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/SparkSchemaUtil.java
@@ -30,6 +30,7 @@
 import org.apache.iceberg.relocated.com.google.common.base.Splitter;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.math.LongMath;
 import org.apache.iceberg.types.Type;
 import org.apache.iceberg.types.TypeUtil;
 import org.apache.iceberg.types.Types;
@@ -37,7 +38,6 @@
 import org.apache.spark.sql.SparkSession;
 import org.apache.spark.sql.catalog.Column;
 import org.apache.spark.sql.types.DataType;
-import org.apache.spark.sql.types.StructField;
 import org.apache.spark.sql.types.StructType;

 /**
@@ -280,23 +280,23 @@ private static PartitionSpec identitySpec(Schema schema, List<String> partitionN
   }

   /**
-   * estimate approximate table size based on spark schema and total records.
+   * Estimate approximate table size based on Spark schema and total records.
    *
-   * @param tableSchema spark schema
+   * @param tableSchema Spark schema
    * @param totalRecords total records in the table
-   * @return approxiate size based on table schema
+   * @return approximate size based on table schema
    */
   public static long estimateSize(StructType tableSchema, long totalRecords) {
     if (totalRecords == Long.MAX_VALUE) {
       return totalRecords;
     }

-    long approximateSize = 0;
-    for (StructField sparkField : tableSchema.fields()) {
-      approximateSize += sparkField.dataType().defaultSize();
+    long result;
+    try {
+      result = LongMath.checkedMultiply(tableSchema.defaultSize(), totalRecords);
+    } catch (ArithmeticException e) {
+      result = Long.MAX_VALUE;
     }
-
-    long result = approximateSize * totalRecords;
-    return result > 0 ? result : Long.MAX_VALUE;
+    return result;
   }
 }

diff --git a/spark3/src/main/java/org/apache/iceberg/spark/source/SparkBatchScan.java b/spark3/src/main/java/org/apache/iceberg/spark/source/SparkBatchScan.java
index b5c31b527635..e9eda0b29394 100644
--- a/spark3/src/main/java/org/apache/iceberg/spark/source/SparkBatchScan.java
+++ b/spark3/src/main/java/org/apache/iceberg/spark/source/SparkBatchScan.java
@@ -207,26 +207,20 @@ public Statistics estimateStatistics() {
       LOG.debug("using table metadata to estimate table statistics");
       long totalRecords = PropertyUtil.propertyAsLong(table.currentSnapshot().summary(),
           SnapshotSummary.TOTAL_RECORDS_PROP, Long.MAX_VALUE);
-      Schema projectedSchema = expectedSchema != null ? expectedSchema : table.schema();
       return new Stats(
-          SparkSchemaUtil.estimateSize(SparkSchemaUtil.convert(projectedSchema), totalRecords),
+          SparkSchemaUtil.estimateSize(readSchema(), totalRecords),
           totalRecords);
     }

-    long sizeInBytes = 0L;
     long numRows = 0L;

-    double compressionFactor = SparkSession.active().sessionState().conf().fileCompressionFactor();
-    StructType tableSchema = SparkSchemaUtil.convert(table.schema());
-    double adjustmentFactor = compressionFactor * readSchema().defaultSize() / tableSchema.defaultSize();
     for (CombinedScanTask task : tasks()) {
       for (FileScanTask file : task.files()) {
-        long estimate = (long) (adjustmentFactor * file.length());
-        sizeInBytes += estimate;
         numRows += file.file().recordCount();
       }
     }

+    long sizeInBytes = SparkSchemaUtil.estimateSize(readSchema(), numRows);
     return new Stats(sizeInBytes, numRows);
   }
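
For reviewers, a minimal standalone sketch (not part of the patch) of the overflow handling that patch 2 introduces in SparkSchemaUtil.estimateSize. It uses stock Guava's com.google.common.math.LongMath rather than the Iceberg-relocated copy, and EstimateSizeSketch / estimateSize are illustrative names only:

import com.google.common.math.LongMath;

// Standalone sketch of the saturating size estimate from patch 2.
// The patch itself uses the Iceberg-relocated LongMath.
public class EstimateSizeSketch {

  // Multiplies the schema's default row size by the row count, capping at
  // Long.MAX_VALUE on overflow instead of relying on the old "result > 0"
  // check, which wrapped values could slip past.
  static long estimateSize(long defaultRowSize, long totalRecords) {
    if (totalRecords == Long.MAX_VALUE) {
      return totalRecords;
    }
    try {
      return LongMath.checkedMultiply(defaultRowSize, totalRecords);
    } catch (ArithmeticException e) {
      return Long.MAX_VALUE;
    }
  }

  public static void main(String[] args) {
    System.out.println(estimateSize(100L, 1_000_000L));                // 100000000
    System.out.println(estimateSize(1_000_000L, Long.MAX_VALUE / 2L)); // saturates to Long.MAX_VALUE
  }
}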