Spark3Util.java
@@ -90,7 +90,6 @@
import org.apache.spark.sql.types.IntegerType;
import org.apache.spark.sql.types.LongType;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.util.CaseInsensitiveStringMap;
import scala.Option;
import scala.Predef;
import scala.Some;
@@ -508,48 +507,6 @@ public static String describe(org.apache.iceberg.SortOrder order) {
return Joiner.on(", ").join(SortOrderVisitor.visit(order, DescribeSortOrderVisitor.INSTANCE));
}

public static Long propertyAsLong(
CaseInsensitiveStringMap options, String property, Long defaultValue) {
if (defaultValue != null) {
return options.getLong(property, defaultValue);
}

String value = options.get(property);
if (value != null) {
return Long.parseLong(value);
}

return null;
}

public static Integer propertyAsInt(
CaseInsensitiveStringMap options, String property, Integer defaultValue) {
if (defaultValue != null) {
return options.getInt(property, defaultValue);
}

String value = options.get(property);
if (value != null) {
return Integer.parseInt(value);
}

return null;
}

public static Boolean propertyAsBoolean(
CaseInsensitiveStringMap options, String property, Boolean defaultValue) {
if (defaultValue != null) {
return options.getBoolean(property, defaultValue);
}

String value = options.get(property);
if (value != null) {
return Boolean.parseBoolean(value);
}

return null;
}

public static class DescribeSchemaVisitor extends TypeUtil.SchemaVisitor<String> {
private static final Joiner COMMA = Joiner.on(',');
private static final DescribeSchemaVisitor INSTANCE = new DescribeSchemaVisitor();
SparkConfParser.java
@@ -95,6 +95,10 @@ public int parse() {
Preconditions.checkArgument(defaultValue != null, "Default value cannot be null");
return parse(Integer::parseInt, defaultValue);
}

public Integer parseOptional() {
return parse(Integer::parseInt, null);
}
}

class LongConfParser extends ConfParser<LongConfParser, Long> {
@@ -137,6 +141,10 @@ public String parse() {
Preconditions.checkArgument(defaultValue != null, "Default value cannot be null");
return parse(Function.identity(), defaultValue);
}

public String parseOptional() {
return parse(Function.identity(), null);
}
}

abstract class ConfParser<ThisT, T> {
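A minimal, self-contained sketch of the contract the new parseOptional() methods introduce, using a hypothetical stand-in class rather than the real parsers: parse() requires a non-null default, while parseOptional() returns null when the option is absent. The option keys below are sample strings, not taken from this diff.

import java.util.Map;
import java.util.function.Function;

// Hypothetical stand-in for the conf parsers above: lookups with a default always
// return a value, while optional lookups return null for a missing option.
class OptionalConfSketch {
  private final Map<String, String> options;

  OptionalConfSketch(Map<String, String> options) {
    this.options = options;
  }

  private <T> T parse(String key, Function<String, T> conversion, T defaultValue) {
    String value = options.get(key);
    return value != null ? conversion.apply(value) : defaultValue;
  }

  Long longOptional(String key) {
    return parse(key, Long::parseLong, null);          // mirrors LongConfParser#parseOptional
  }

  long longWithDefault(String key, long defaultValue) {
    return parse(key, Long::parseLong, defaultValue);  // mirrors parse() with a required default
  }

  public static void main(String[] args) {
    OptionalConfSketch conf = new OptionalConfSketch(Map.of("split-size", "134217728"));
    System.out.println(conf.longOptional("split-size"));      // 134217728
    System.out.println(conf.longOptional("lookback"));        // null
    System.out.println(conf.longWithDefault("lookback", 10)); // 10
  }
}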
SparkReadConf.java
@@ -50,16 +50,22 @@ public class SparkReadConf {

private static final Set<String> LOCALITY_WHITELIST_FS = ImmutableSet.of("hdfs");

private final SparkSession spark;
private final Table table;
private final Map<String, String> readOptions;
private final SparkConfParser confParser;

public SparkReadConf(SparkSession spark, Table table, Map<String, String> readOptions) {
this.spark = spark;
this.table = table;
this.readOptions = readOptions;
this.confParser = new SparkConfParser(spark, table, readOptions);
}

public boolean caseSensitive() {
return Boolean.parseBoolean(spark.conf().get("spark.sql.caseSensitive"));
}

public boolean localityEnabled() {
InputFile file = table.io().newInputFile(table.location());

@@ -88,6 +94,10 @@ public Long endSnapshotId() {
return confParser.longConf().option(SparkReadOptions.END_SNAPSHOT_ID).parseOptional();
}

public String fileScanTaskSetId() {
return confParser.stringConf().option(SparkReadOptions.FILE_SCAN_TASK_SET_ID).parseOptional();
}

public boolean streamingSkipDeleteSnapshots() {
return confParser
.booleanConf()
@@ -142,6 +152,18 @@ public int orcBatchSize() {
.parse();
}

public Long splitSizeOption() {
return confParser.longConf().option(SparkReadOptions.SPLIT_SIZE).parseOptional();
}

public Integer splitLookbackOption() {
return confParser.intConf().option(SparkReadOptions.LOOKBACK).parseOptional();
}

public Long splitOpenFileCostOption() {
return confParser.longConf().option(SparkReadOptions.FILE_OPEN_COST).parseOptional();
}

public long splitSize() {
return confParser
.longConf()
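For orientation, a hedged sketch of where these per-scan read options come from on the caller side: they are supplied through Spark's standard DataFrameReader API and surface in SparkReadConf's readOptions map, where splitSizeOption() and splitLookbackOption() report them (or null when unset). The option keys and table name below are illustrative assumptions, not taken from this diff.

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

// Illustrative only: read options set here reach SparkReadConf as the readOptions map.
public class ReadOptionExample {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder()
        .appName("iceberg-read-options")
        .master("local[*]")
        .getOrCreate();

    Dataset<Row> df = spark.read()
        .format("iceberg")
        .option("split-size", "134217728")  // assumed key for SparkReadOptions.SPLIT_SIZE
        .option("lookback", "10")           // assumed key for SparkReadOptions.LOOKBACK
        .load("db.example_table");          // hypothetical table name

    df.show();
  }
}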
SparkBatchQueryScan.java
@@ -30,11 +30,9 @@
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.spark.Spark3Util;
import org.apache.iceberg.spark.SparkReadConf;
import org.apache.iceberg.spark.SparkReadOptions;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.util.CaseInsensitiveStringMap;

class SparkBatchQueryScan extends SparkBatchScan {

@@ -52,12 +50,10 @@ class SparkBatchQueryScan extends SparkBatchScan {
SparkSession spark,
Table table,
SparkReadConf readConf,
boolean caseSensitive,
Schema expectedSchema,
List<Expression> filters,
CaseInsensitiveStringMap options) {
List<Expression> filters) {

super(spark, table, readConf, caseSensitive, expectedSchema, filters, options);
super(spark, table, readConf, expectedSchema, filters);

this.snapshotId = readConf.snapshotId();
this.asOfTimestamp = readConf.asOfTimestamp();
@@ -83,11 +79,9 @@ class SparkBatchQueryScan extends SparkBatchScan {
"Cannot only specify option end-snapshot-id to do incremental scan");
}

// look for split behavior overrides in options
this.splitSize = Spark3Util.propertyAsLong(options, SparkReadOptions.SPLIT_SIZE, null);
this.splitLookback = Spark3Util.propertyAsInt(options, SparkReadOptions.LOOKBACK, null);
this.splitOpenFileCost =
Spark3Util.propertyAsLong(options, SparkReadOptions.FILE_OPEN_COST, null);
this.splitSize = readConf.splitSizeOption();
this.splitLookback = readConf.splitLookbackOption();
this.splitOpenFileCost = readConf.splitOpenFileCostOption();
}

@Override
SparkBatchScan.java
@@ -55,7 +55,6 @@
import org.apache.spark.sql.connector.read.SupportsReportStatistics;
import org.apache.spark.sql.connector.read.streaming.MicroBatchStream;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.util.CaseInsensitiveStringMap;
import org.apache.spark.sql.vectorized.ColumnarBatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -71,7 +70,6 @@ abstract class SparkBatchScan implements Scan, Batch, SupportsReportStatistics {
private final Schema expectedSchema;
private final List<Expression> filterExpressions;
private final boolean readTimestampWithoutZone;
private final CaseInsensitiveStringMap options;

// lazy variables
private StructType readSchema = null;
@@ -80,22 +78,19 @@ abstract class SparkBatchScan implements Scan, Batch, SupportsReportStatistics {
SparkSession spark,
Table table,
SparkReadConf readConf,
boolean caseSensitive,
Schema expectedSchema,
List<Expression> filters,
CaseInsensitiveStringMap options) {
List<Expression> filters) {

SparkSchemaUtil.validateMetadataColumnReferences(table.schema(), expectedSchema);

this.sparkContext = JavaSparkContext.fromSparkContext(spark.sparkContext());
this.table = table;
this.readConf = readConf;
this.caseSensitive = caseSensitive;
this.caseSensitive = readConf.caseSensitive();
this.expectedSchema = expectedSchema;
this.filterExpressions = filters != null ? filters : Collections.emptyList();
this.localityPreferred = readConf.localityEnabled();
this.readTimestampWithoutZone = readConf.handleTimestampWithoutZone();
this.options = options;
}

protected Table table() {
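A small illustration of the session setting that SparkReadConf#caseSensitive() now resolves on behalf of the scans, since the builders no longer parse spark.sql.caseSensitive themselves; the SparkSession setup is illustrative.

import org.apache.spark.sql.SparkSession;

// Illustrative only: toggling the session conf that caseSensitive() reads; any
// SparkReadConf built afterwards reports the updated value to its scan.
public class CaseSensitivityExample {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder()
        .appName("case-sensitivity")
        .master("local[*]")
        .getOrCreate();

    spark.conf().set("spark.sql.caseSensitive", "true");
    System.out.println(spark.conf().get("spark.sql.caseSensitive")); // prints "true"
  }
}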
SparkFilesScan.java
@@ -29,10 +29,8 @@
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.spark.FileScanTaskSetManager;
import org.apache.iceberg.spark.SparkReadConf;
import org.apache.iceberg.spark.SparkReadOptions;
import org.apache.iceberg.util.TableScanUtil;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.util.CaseInsensitiveStringMap;

class SparkFilesScan extends SparkBatchScan {
private final String taskSetID;
@@ -42,15 +40,10 @@ class SparkFilesScan extends SparkBatchScan {

private List<CombinedScanTask> tasks = null; // lazy cache of tasks

SparkFilesScan(
SparkSession spark,
Table table,
SparkReadConf readConf,
boolean caseSensitive,
CaseInsensitiveStringMap options) {
super(spark, table, readConf, caseSensitive, table.schema(), ImmutableList.of(), options);
SparkFilesScan(SparkSession spark, Table table, SparkReadConf readConf) {
super(spark, table, readConf, table.schema(), ImmutableList.of());

this.taskSetID = options.get(SparkReadOptions.FILE_SCAN_TASK_SET_ID);
this.taskSetID = readConf.fileScanTaskSetId();
this.splitSize = readConf.splitSize();
this.splitLookback = readConf.splitLookback();
this.splitOpenFileCost = readConf.splitOpenFileCost();
SparkFilesScanBuilder.java
@@ -30,19 +30,15 @@ class SparkFilesScanBuilder implements ScanBuilder {
private final SparkSession spark;
private final Table table;
private final SparkReadConf readConf;
private final boolean caseSensitive;
private final CaseInsensitiveStringMap options;

SparkFilesScanBuilder(SparkSession spark, Table table, CaseInsensitiveStringMap options) {
this.spark = spark;
this.table = table;
this.readConf = new SparkReadConf(spark, table, options);
this.caseSensitive = Boolean.parseBoolean(spark.conf().get("spark.sql.caseSensitive"));
this.options = options;
}

@Override
public Scan build() {
return new SparkFilesScan(spark, table, readConf, caseSensitive, options);
return new SparkFilesScan(spark, table, readConf);
}
}
SparkMergeScan.java
@@ -36,12 +36,10 @@
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.spark.SparkReadConf;
import org.apache.iceberg.spark.SparkReadOptions;
import org.apache.iceberg.util.TableScanUtil;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.connector.iceberg.read.SupportsFileFilter;
import org.apache.spark.sql.connector.read.Statistics;
import org.apache.spark.sql.util.CaseInsensitiveStringMap;

class SparkMergeScan extends SparkBatchScan implements SupportsFileFilter {

@@ -62,13 +60,11 @@ class SparkMergeScan extends SparkBatchScan implements SupportsFileFilter {
SparkSession spark,
Table table,
SparkReadConf readConf,
boolean caseSensitive,
boolean ignoreResiduals,
Schema expectedSchema,
List<Expression> filters,
CaseInsensitiveStringMap options) {
List<Expression> filters) {

super(spark, table, readConf, caseSensitive, expectedSchema, filters, options);
super(spark, table, readConf, expectedSchema, filters);

this.table = table;
this.ignoreResiduals = ignoreResiduals;
@@ -77,8 +73,7 @@ class SparkMergeScan extends SparkBatchScan implements SupportsFileFilter {
this.splitLookback = readConf.splitLookback();
this.splitOpenFileCost = readConf.splitOpenFileCost();

Preconditions.checkArgument(
!options.containsKey(SparkReadOptions.SNAPSHOT_ID), "Can't set snapshot-id in options");
Preconditions.checkArgument(readConf.snapshotId() == null, "Can't set snapshot-id in options");
Snapshot currentSnapshot = table.currentSnapshot();
this.snapshotId = currentSnapshot != null ? currentSnapshot.snapshotId() : null;

SparkScanBuilder.java
@@ -58,7 +58,6 @@ public class SparkScanBuilder
private final SparkSession spark;
private final Table table;
private final SparkReadConf readConf;
private final CaseInsensitiveStringMap options;
private final List<String> metaColumns = Lists.newArrayList();

private Schema schema = null;
@@ -74,8 +73,7 @@ public class SparkScanBuilder
this.table = table;
this.schema = schema;
this.readConf = new SparkReadConf(spark, table, options);
this.options = options;
this.caseSensitive = Boolean.parseBoolean(spark.conf().get("spark.sql.caseSensitive"));
this.caseSensitive = readConf.caseSensitive();
}

SparkScanBuilder(SparkSession spark, Table table, CaseInsensitiveStringMap options) {
@@ -171,25 +169,12 @@ private Schema schemaWithMetadataColumns() {
@Override
public Scan build() {
return new SparkBatchQueryScan(
spark,
table,
readConf,
caseSensitive,
schemaWithMetadataColumns(),
filterExpressions,
options);
spark, table, readConf, schemaWithMetadataColumns(), filterExpressions);
}

public Scan buildMergeScan() {
return new SparkMergeScan(
spark,
table,
readConf,
caseSensitive,
ignoreResiduals,
schemaWithMetadataColumns(),
filterExpressions,
options);
spark, table, readConf, ignoreResiduals, schemaWithMetadataColumns(), filterExpressions);
}

@Override
Expand Down