From 408ffdd7952ad585db1de2a6890972f5455eab1e Mon Sep 17 00:00:00 2001
From: liliwei
Date: Tue, 9 Aug 2022 14:58:15 +0800
Subject: [PATCH] Spark 3.1: Port #3505 to Spark 3.1

---
 .../org/apache/iceberg/spark/Spark3Util.java      | 43 -------------------
 .../apache/iceberg/spark/SparkConfParser.java     |  8 ++++
 .../apache/iceberg/spark/SparkReadConf.java       | 22 ++++++++++
 .../spark/source/SparkBatchQueryScan.java         | 16 +++----
 .../iceberg/spark/source/SparkBatchScan.java      |  9 +---
 .../iceberg/spark/source/SparkFilesScan.java      | 13 ++----
 .../spark/source/SparkFilesScanBuilder.java       |  6 +--
 .../iceberg/spark/source/SparkMergeScan.java      | 11 ++---
 .../spark/source/SparkScanBuilder.java            | 21 ++-------
 9 files changed, 47 insertions(+), 102 deletions(-)

diff --git a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/Spark3Util.java b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/Spark3Util.java
index a5a58922c933..2bfd0aaf8da7 100644
--- a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/Spark3Util.java
+++ b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/Spark3Util.java
@@ -90,7 +90,6 @@
 import org.apache.spark.sql.types.IntegerType;
 import org.apache.spark.sql.types.LongType;
 import org.apache.spark.sql.types.StructType;
-import org.apache.spark.sql.util.CaseInsensitiveStringMap;
 import scala.Option;
 import scala.Predef;
 import scala.Some;
@@ -508,48 +507,6 @@ public static String describe(org.apache.iceberg.SortOrder order) {
     return Joiner.on(", ").join(SortOrderVisitor.visit(order, DescribeSortOrderVisitor.INSTANCE));
   }
 
-  public static Long propertyAsLong(
-      CaseInsensitiveStringMap options, String property, Long defaultValue) {
-    if (defaultValue != null) {
-      return options.getLong(property, defaultValue);
-    }
-
-    String value = options.get(property);
-    if (value != null) {
-      return Long.parseLong(value);
-    }
-
-    return null;
-  }
-
-  public static Integer propertyAsInt(
-      CaseInsensitiveStringMap options, String property, Integer defaultValue) {
-    if (defaultValue != null) {
-      return options.getInt(property, defaultValue);
-    }
-
-    String value = options.get(property);
-    if (value != null) {
-      return Integer.parseInt(value);
-    }
-
-    return null;
-  }
-
-  public static Boolean propertyAsBoolean(
-      CaseInsensitiveStringMap options, String property, Boolean defaultValue) {
-    if (defaultValue != null) {
-      return options.getBoolean(property, defaultValue);
-    }
-
-    String value = options.get(property);
-    if (value != null) {
-      return Boolean.parseBoolean(value);
-    }
-
-    return null;
-  }
-
   public static class DescribeSchemaVisitor extends TypeUtil.SchemaVisitor<String> {
     private static final Joiner COMMA = Joiner.on(',');
     private static final DescribeSchemaVisitor INSTANCE = new DescribeSchemaVisitor();
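The three helpers removed above were thin wrappers over CaseInsensitiveStringMap; the lookups they backed are rerouted through SparkConfParser and SparkReadConf in the files that follow. A minimal before/after sketch for the null-default case, assuming a caller that only needs the split size (the wrapper class and method names here are illustrative, not part of this patch):

    import org.apache.iceberg.spark.SparkReadConf;
    import org.apache.iceberg.spark.SparkReadOptions;
    import org.apache.spark.sql.util.CaseInsensitiveStringMap;

    class SplitSizeMigrationSketch {
      // Before: scans called Spark3Util.propertyAsLong(options, SparkReadOptions.SPLIT_SIZE, null),
      // which reduces to this when no default value is supplied.
      static Long splitSizeBefore(CaseInsensitiveStringMap options) {
        String value = options.get(SparkReadOptions.SPLIT_SIZE);
        return value != null ? Long.parseLong(value) : null;
      }

      // After: scans ask SparkReadConf, which delegates to SparkConfParser (see the hunks below).
      static Long splitSizeAfter(SparkReadConf readConf) {
        return readConf.splitSizeOption(); // null when split-size was not passed as a read option
      }
    }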
diff --git a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/SparkConfParser.java b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/SparkConfParser.java
index 6a0844df6a8c..e1425042bdbb 100644
--- a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/SparkConfParser.java
+++ b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/SparkConfParser.java
@@ -95,6 +95,10 @@ public int parse() {
       Preconditions.checkArgument(defaultValue != null, "Default value cannot be null");
       return parse(Integer::parseInt, defaultValue);
     }
+
+    public Integer parseOptional() {
+      return parse(Integer::parseInt, null);
+    }
   }
 
   class LongConfParser extends ConfParser<LongConfParser, Long> {
@@ -137,6 +141,10 @@ public String parse() {
       Preconditions.checkArgument(defaultValue != null, "Default value cannot be null");
       return parse(Function.identity(), defaultValue);
     }
+
+    public String parseOptional() {
+      return parse(Function.identity(), null);
+    }
   }
 
   abstract class ConfParser<ThisT, T> {
diff --git a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java
index 937c31e45960..184c5ac168d5 100644
--- a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java
+++ b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java
@@ -50,16 +50,22 @@ public class SparkReadConf {
 
   private static final Set<String> LOCALITY_WHITELIST_FS = ImmutableSet.of("hdfs");
 
+  private final SparkSession spark;
   private final Table table;
   private final Map<String, String> readOptions;
   private final SparkConfParser confParser;
 
   public SparkReadConf(SparkSession spark, Table table, Map<String, String> readOptions) {
+    this.spark = spark;
     this.table = table;
     this.readOptions = readOptions;
     this.confParser = new SparkConfParser(spark, table, readOptions);
   }
 
+  public boolean caseSensitive() {
+    return Boolean.parseBoolean(spark.conf().get("spark.sql.caseSensitive"));
+  }
+
   public boolean localityEnabled() {
     InputFile file = table.io().newInputFile(table.location());
 
@@ -88,6 +94,10 @@ public Long endSnapshotId() {
     return confParser.longConf().option(SparkReadOptions.END_SNAPSHOT_ID).parseOptional();
   }
 
+  public String fileScanTaskSetId() {
+    return confParser.stringConf().option(SparkReadOptions.FILE_SCAN_TASK_SET_ID).parseOptional();
+  }
+
   public boolean streamingSkipDeleteSnapshots() {
     return confParser
         .booleanConf()
@@ -142,6 +152,18 @@ public int orcBatchSize() {
         .parse();
   }
 
+  public Long splitSizeOption() {
+    return confParser.longConf().option(SparkReadOptions.SPLIT_SIZE).parseOptional();
+  }
+
+  public Integer splitLookbackOption() {
+    return confParser.intConf().option(SparkReadOptions.LOOKBACK).parseOptional();
+  }
+
+  public Long splitOpenFileCostOption() {
+    return confParser.longConf().option(SparkReadOptions.FILE_OPEN_COST).parseOptional();
+  }
+
   public long splitSize() {
     return confParser
         .longConf()
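The new *Option() accessors return boxed values and come back null when the corresponding read option is absent, which is how the scans below tell "no override requested" apart from an explicit setting, while caseSensitive() reads spark.sql.caseSensitive from the active session. A rough sketch of how such options are supplied from the DataFrame reader, assuming the usual Iceberg option keys split-size, lookback and file-open-cost, a configured Iceberg catalog, and a placeholder table identifier:

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;

    public class ReadOptionSketch {
      public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
            .master("local[2]")
            .appName("iceberg-read-option-sketch")
            .getOrCreate();

        // These keys correspond to SparkReadOptions.SPLIT_SIZE, LOOKBACK and FILE_OPEN_COST; when
        // set, they reach the scan through splitSizeOption(), splitLookbackOption() and
        // splitOpenFileCostOption() instead of the removed Spark3Util helpers.
        Dataset<Row> df = spark.read()
            .format("iceberg")
            .option("split-size", "134217728")   // 128 MB target split size
            .option("lookback", "10")
            .option("file-open-cost", "4194304") // 4 MB estimated cost per file open
            .load("db.table");                   // placeholder table name

        df.show();
      }
    }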
diff --git a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatchQueryScan.java b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatchQueryScan.java
index 4fcab5517d44..5fed852f678a 100644
--- a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatchQueryScan.java
+++ b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatchQueryScan.java
@@ -30,11 +30,9 @@
 import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
-import org.apache.iceberg.spark.Spark3Util;
 import org.apache.iceberg.spark.SparkReadConf;
 import org.apache.iceberg.spark.SparkReadOptions;
 import org.apache.spark.sql.SparkSession;
-import org.apache.spark.sql.util.CaseInsensitiveStringMap;
 
 class SparkBatchQueryScan extends SparkBatchScan {
 
@@ -52,12 +50,10 @@ class SparkBatchQueryScan extends SparkBatchScan {
       SparkSession spark,
       Table table,
       SparkReadConf readConf,
-      boolean caseSensitive,
       Schema expectedSchema,
-      List<Expression> filters,
-      CaseInsensitiveStringMap options) {
+      List<Expression> filters) {
 
-    super(spark, table, readConf, caseSensitive, expectedSchema, filters, options);
+    super(spark, table, readConf, expectedSchema, filters);
 
     this.snapshotId = readConf.snapshotId();
     this.asOfTimestamp = readConf.asOfTimestamp();
@@ -83,11 +79,9 @@ class SparkBatchQueryScan extends SparkBatchScan {
           "Cannot only specify option end-snapshot-id to do incremental scan");
     }
 
-    // look for split behavior overrides in options
-    this.splitSize = Spark3Util.propertyAsLong(options, SparkReadOptions.SPLIT_SIZE, null);
-    this.splitLookback = Spark3Util.propertyAsInt(options, SparkReadOptions.LOOKBACK, null);
-    this.splitOpenFileCost =
-        Spark3Util.propertyAsLong(options, SparkReadOptions.FILE_OPEN_COST, null);
+    this.splitSize = readConf.splitSizeOption();
+    this.splitLookback = readConf.splitLookbackOption();
+    this.splitOpenFileCost = readConf.splitOpenFileCostOption();
   }
 
   @Override
diff --git a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatchScan.java b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatchScan.java
index c09ce2f76c48..63489d5056ca 100644
--- a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatchScan.java
+++ b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatchScan.java
@@ -55,7 +55,6 @@
 import org.apache.spark.sql.connector.read.SupportsReportStatistics;
 import org.apache.spark.sql.connector.read.streaming.MicroBatchStream;
 import org.apache.spark.sql.types.StructType;
-import org.apache.spark.sql.util.CaseInsensitiveStringMap;
 import org.apache.spark.sql.vectorized.ColumnarBatch;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -71,7 +70,6 @@ abstract class SparkBatchScan implements Scan, Batch, SupportsReportStatistics {
   private final Schema expectedSchema;
   private final List<Expression> filterExpressions;
   private final boolean readTimestampWithoutZone;
-  private final CaseInsensitiveStringMap options;
 
   // lazy variables
   private StructType readSchema = null;
@@ -80,22 +78,19 @@ abstract class SparkBatchScan implements Scan, Batch, SupportsReportStatistics {
       SparkSession spark,
       Table table,
       SparkReadConf readConf,
-      boolean caseSensitive,
       Schema expectedSchema,
-      List<Expression> filters,
-      CaseInsensitiveStringMap options) {
+      List<Expression> filters) {
 
     SparkSchemaUtil.validateMetadataColumnReferences(table.schema(), expectedSchema);
 
     this.sparkContext = JavaSparkContext.fromSparkContext(spark.sparkContext());
     this.table = table;
     this.readConf = readConf;
-    this.caseSensitive = caseSensitive;
+    this.caseSensitive = readConf.caseSensitive();
     this.expectedSchema = expectedSchema;
     this.filterExpressions = filters != null ? filters : Collections.emptyList();
     this.localityPreferred = readConf.localityEnabled();
     this.readTimestampWithoutZone = readConf.handleTimestampWithoutZone();
-    this.options = options;
   }
 
   protected Table table() {
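With the caseSensitive flag and the raw options map dropped from both constructors above, everything a scan needs now travels through SparkReadConf, and case sensitivity is resolved from the session that builds the conf. A small sketch of that resolution using only the constructor and accessor introduced in this patch (the helper method itself is illustrative):

    import java.util.Map;
    import org.apache.iceberg.Table;
    import org.apache.iceberg.spark.SparkReadConf;
    import org.apache.spark.sql.SparkSession;

    class CaseSensitivitySketch {
      // Scans now read spark.sql.caseSensitive through SparkReadConf instead of receiving a
      // boolean plumbed down from the scan builder.
      static boolean resolveCaseSensitive(SparkSession spark, Table table, Map<String, String> options) {
        SparkReadConf readConf = new SparkReadConf(spark, table, options);
        return readConf.caseSensitive();
      }
    }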
diff --git a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkFilesScan.java b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkFilesScan.java
index 4eb36b67ea40..1cf03e55c378 100644
--- a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkFilesScan.java
+++ b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkFilesScan.java
@@ -29,10 +29,8 @@
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.spark.FileScanTaskSetManager;
 import org.apache.iceberg.spark.SparkReadConf;
-import org.apache.iceberg.spark.SparkReadOptions;
 import org.apache.iceberg.util.TableScanUtil;
 import org.apache.spark.sql.SparkSession;
-import org.apache.spark.sql.util.CaseInsensitiveStringMap;
 
 class SparkFilesScan extends SparkBatchScan {
   private final String taskSetID;
@@ -42,15 +40,10 @@ class SparkFilesScan extends SparkBatchScan {
 
   private List<CombinedScanTask> tasks = null; // lazy cache of tasks
 
-  SparkFilesScan(
-      SparkSession spark,
-      Table table,
-      SparkReadConf readConf,
-      boolean caseSensitive,
-      CaseInsensitiveStringMap options) {
-    super(spark, table, readConf, caseSensitive, table.schema(), ImmutableList.of(), options);
+  SparkFilesScan(SparkSession spark, Table table, SparkReadConf readConf) {
+    super(spark, table, readConf, table.schema(), ImmutableList.of());
 
-    this.taskSetID = options.get(SparkReadOptions.FILE_SCAN_TASK_SET_ID);
+    this.taskSetID = readConf.fileScanTaskSetId();
     this.splitSize = readConf.splitSize();
     this.splitLookback = readConf.splitLookback();
     this.splitOpenFileCost = readConf.splitOpenFileCost();
diff --git a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkFilesScanBuilder.java b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkFilesScanBuilder.java
index 029585caf944..03ab3aa062d3 100644
--- a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkFilesScanBuilder.java
+++ b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkFilesScanBuilder.java
@@ -30,19 +30,15 @@ class SparkFilesScanBuilder implements ScanBuilder {
   private final SparkSession spark;
   private final Table table;
   private final SparkReadConf readConf;
-  private final boolean caseSensitive;
-  private final CaseInsensitiveStringMap options;
 
   SparkFilesScanBuilder(SparkSession spark, Table table, CaseInsensitiveStringMap options) {
     this.spark = spark;
     this.table = table;
     this.readConf = new SparkReadConf(spark, table, options);
-    this.caseSensitive = Boolean.parseBoolean(spark.conf().get("spark.sql.caseSensitive"));
-    this.options = options;
   }
 
   @Override
   public Scan build() {
-    return new SparkFilesScan(spark, table, readConf, caseSensitive, options);
+    return new SparkFilesScan(spark, table, readConf);
   }
 }
diff --git a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkMergeScan.java b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkMergeScan.java
index 8bc3f7d049cf..e43ff44519bc 100644
--- a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkMergeScan.java
+++ b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkMergeScan.java
@@ -36,12 +36,10 @@
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.spark.SparkReadConf;
-import org.apache.iceberg.spark.SparkReadOptions;
 import org.apache.iceberg.util.TableScanUtil;
 import org.apache.spark.sql.SparkSession;
 import org.apache.spark.sql.connector.iceberg.read.SupportsFileFilter;
 import org.apache.spark.sql.connector.read.Statistics;
-import org.apache.spark.sql.util.CaseInsensitiveStringMap;
 
 class SparkMergeScan extends SparkBatchScan implements SupportsFileFilter {
 
@@ -62,13 +60,11 @@ class SparkMergeScan extends SparkBatchScan implements SupportsFileFilter {
       SparkSession spark,
       Table table,
       SparkReadConf readConf,
-      boolean caseSensitive,
       boolean ignoreResiduals,
       Schema expectedSchema,
-      List<Expression> filters,
-      CaseInsensitiveStringMap options) {
+      List<Expression> filters) {
 
-    super(spark, table, readConf, caseSensitive, expectedSchema, filters, options);
+    super(spark, table, readConf, expectedSchema, filters);
 
     this.table = table;
     this.ignoreResiduals = ignoreResiduals;
@@ -77,8 +73,7 @@ class SparkMergeScan extends SparkBatchScan implements SupportsFileFilter {
     this.splitLookback = readConf.splitLookback();
     this.splitOpenFileCost = readConf.splitOpenFileCost();
 
-    Preconditions.checkArgument(
-        !options.containsKey(SparkReadOptions.SNAPSHOT_ID), "Can't set snapshot-id in options");
+    Preconditions.checkArgument(readConf.snapshotId() == null, "Can't set snapshot-id in options");
 
     Snapshot currentSnapshot = table.currentSnapshot();
     this.snapshotId = currentSnapshot != null ? currentSnapshot.snapshotId() : null;
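SparkFilesScan now obtains the task set id through readConf.fileScanTaskSetId() rather than from the raw options map, and SparkMergeScan's guard asks SparkReadConf for the resolved snapshot-id and rejects any non-null value. A sketch of that guard in isolation (the wrapper method is illustrative):

    import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
    import org.apache.iceberg.spark.SparkReadConf;

    class SnapshotIdGuardSketch {
      // Mirrors the Preconditions check in SparkMergeScan above: a merge scan must plan against the
      // current table state, so a resolved snapshot-id is rejected up front.
      static void validate(SparkReadConf readConf) {
        Preconditions.checkArgument(readConf.snapshotId() == null, "Can't set snapshot-id in options");
      }
    }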
diff --git a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java
index 708d4378bc1b..f0ecdb8f13d0 100644
--- a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java
+++ b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java
@@ -58,7 +58,6 @@ public class SparkScanBuilder
   private final SparkSession spark;
   private final Table table;
   private final SparkReadConf readConf;
-  private final CaseInsensitiveStringMap options;
   private final List<String> metaColumns = Lists.newArrayList();
 
   private Schema schema = null;
@@ -74,8 +73,7 @@ public class SparkScanBuilder
     this.table = table;
     this.schema = schema;
     this.readConf = new SparkReadConf(spark, table, options);
-    this.options = options;
-    this.caseSensitive = Boolean.parseBoolean(spark.conf().get("spark.sql.caseSensitive"));
+    this.caseSensitive = readConf.caseSensitive();
   }
 
   SparkScanBuilder(SparkSession spark, Table table, CaseInsensitiveStringMap options) {
@@ -171,25 +169,12 @@ private Schema schemaWithMetadataColumns() {
   @Override
   public Scan build() {
     return new SparkBatchQueryScan(
-        spark,
-        table,
-        readConf,
-        caseSensitive,
-        schemaWithMetadataColumns(),
-        filterExpressions,
-        options);
+        spark, table, readConf, schemaWithMetadataColumns(), filterExpressions);
   }
 
   public Scan buildMergeScan() {
     return new SparkMergeScan(
-        spark,
-        table,
-        readConf,
-        caseSensitive,
-        ignoreResiduals,
-        schemaWithMetadataColumns(),
-        filterExpressions,
-        options);
+        spark, table, readConf, ignoreResiduals, schemaWithMetadataColumns(), filterExpressions);
   }
 
   @Override
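End state: every read option the Spark 3.1 scans consult flows through SparkReadConf, and the scan builders pass far fewer constructor arguments. A final sketch contrasting the nullable *Option() accessor with its non-null counterpart, assuming splitSize() keeps its existing behavior of preferring the read option and then falling back to the table's split settings (the helper below is illustrative):

    import java.util.Map;
    import org.apache.iceberg.Table;
    import org.apache.iceberg.spark.SparkReadConf;
    import org.apache.spark.sql.SparkSession;

    class SplitOverrideSketch {
      // splitSizeOption() is null unless split-size was passed as a read option, which lets callers
      // tell "no override requested" apart from an explicit value; splitSize() always resolves one.
      static String describeSplitSize(SparkSession spark, Table table, Map<String, String> options) {
        SparkReadConf readConf = new SparkReadConf(spark, table, options);
        Long requested = readConf.splitSizeOption();
        long effective = readConf.splitSize();
        return requested == null
            ? "using table default split size: " + effective
            : "split size overridden by read option: " + requested;
      }
    }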