6 changes: 6 additions & 0 deletions core/src/main/java/org/apache/iceberg/TableProperties.java
@@ -90,6 +90,12 @@ private TableProperties() {
public static final String ORC_VECTORIZATION_ENABLED = "read.orc.vectorization.enabled";
public static final boolean ORC_VECTORIZATION_ENABLED_DEFAULT = false;

public static final String LOCALITY_ENABLED = "read.locality.enabled";
public static final String LOCALITY_ENABLED_DEFAULT = null;
Contributor:

What is the current default? Is that inconsistent across uses and that's why this is null?

Contributor Author:

> Is that inconsistent across uses and that's why this is null?

Yes. The current default comes from LOCALITY_WHITELIST_FS.contains(scheme), which is true on hdfs and false otherwise.
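
For illustration only, a minimal sketch of setting the new property through the Iceberg table API; the warehouse path and the db.events table name are assumptions, not part of this PR.

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.hadoop.HadoopCatalog;

// Warehouse location and table name are made up for the example.
HadoopCatalog catalog = new HadoopCatalog(new Configuration(), "hdfs://nn:8020/warehouse");
Table table = catalog.loadTable(TableIdentifier.of("db", "events"));

// Disable locality for reads of this table; i.e. "read.locality.enabled" = "false".
table.updateProperties()
    .set(TableProperties.LOCALITY_ENABLED, "false")
    .commit();
```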


public static final String LOCALITY_TASK_INITIALIZE_THREADS = "read.locality.task.initialize.threads";
public static final int LOCALITY_TASK_INITIALIZE_THREADS_DEFAULT = 1;
Contributor:

If this PR is adding the ability to disable locality, I think it should remain focused on that task and should not include a separate feature.

Contributor Author:

Got it


public static final String OBJECT_STORE_ENABLED = "write.object-storage.enabled";
public static final boolean OBJECT_STORE_ENABLED_DEFAULT = false;

spark/src/main/java/org/apache/iceberg/spark/SparkReadOptions.java
@@ -48,6 +48,9 @@ private SparkReadOptions() {
// Overrides the table's read.parquet.vectorization.batch-size
public static final String VECTORIZATION_BATCH_SIZE = "batch-size";

// Overrides the table's read.locality.enabled
public static final String LOCALITY_ENABLED = "locality";
Contributor:

What about locality-enabled?

Contributor Author:

Locality was already passed via the Spark read option "locality". Maybe someone is already using it?

Contributor:

Okay, this was an existing setting? If so we don't need to rename it.
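
For context, a sketch of how the existing option is passed from the DataFrame API; the table name is illustrative. The per-read option takes precedence over the table property added in this PR.

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

SparkSession spark = SparkSession.builder().getOrCreate();

// Per-read override; falls back to read.locality.enabled / the filesystem default if unset.
Dataset<Row> df = spark.read()
    .format("iceberg")
    .option("locality", "false")       // SparkReadOptions.LOCALITY_ENABLED
    .load("db.iceberg_table");         // illustrative table name
```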


// Set ID that is used to fetch file scan tasks
public static final String FILE_SCAN_TASK_SET_ID = "file-scan-task-set-id";
}
14 changes: 14 additions & 0 deletions spark/src/main/java/org/apache/iceberg/spark/SparkUtil.java
@@ -20,21 +20,28 @@
package org.apache.iceberg.spark;

import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.iceberg.PartitionField;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.hadoop.HadoopFileIO;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
import org.apache.iceberg.transforms.Transform;
import org.apache.iceberg.transforms.UnknownTransform;
import org.apache.iceberg.util.Pair;
import org.apache.iceberg.util.PropertyUtil;
import org.apache.spark.util.SerializableConfiguration;

public class SparkUtil {
private static final Set<String> LOCALITY_WHITELIST_FS = ImmutableSet.of("hdfs");

private SparkUtil() {
}

@@ -100,4 +107,11 @@ public static <C, T> Pair<C, T> catalogAndIdentifier(List<String> nameParts,
}
}
}

public static boolean isLocalityEnabledDefault(Map<String, String> tableProperties, String fsScheme) {
String tableLocalityProp = PropertyUtil.propertyAsString(tableProperties, TableProperties.LOCALITY_ENABLED,
TableProperties.LOCALITY_ENABLED_DEFAULT);
return tableLocalityProp == null ? LOCALITY_WHITELIST_FS.contains(fsScheme) :
Boolean.parseBoolean(tableLocalityProp);
Contributor:

I think that locality should be enabled by default, but only be relevant if the file system is white-listed. That would make the logic here localityEnabled && LOCALITY_WHITELIST_FS.contains(fsScheme). That's a bit simpler than using the table property to override, and we don't want to check locality for file systems that don't support it.
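
A sketch of the simplification suggested above, assuming the default constant becomes a boolean true rather than null; the helper class name is hypothetical and this is not the code in this PR.

```java
import java.util.Map;
import java.util.Set;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
import org.apache.iceberg.util.PropertyUtil;

class LocalityDefaults {  // hypothetical helper, not part of this PR
  private static final Set<String> LOCALITY_WHITELIST_FS = ImmutableSet.of("hdfs");

  static boolean isLocalityEnabledDefault(Map<String, String> tableProperties, String fsScheme) {
    // Locality defaults to true but is only honored for white-listed file systems.
    boolean localityEnabled = PropertyUtil.propertyAsBoolean(
        tableProperties, TableProperties.LOCALITY_ENABLED, true);
    return localityEnabled && LOCALITY_WHITELIST_FS.contains(fsScheme);
  }
}
```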

Contributor Author:

OK. I am using spark-sql and didn't find a way to set locality as a spark read option. I will try to add it as a user-defined spark configuration so that it can be set per session.

Contributor:

We should add the ability to set read options in hints via Spark extensions. Being able to use hints would be great because we could use it for this and a few other things, like time travel in SQL. (@RussellSpitzer, what do you think?)

Contributor (@kbendick, May 22, 2021):

I'm a big fan of hints (not that I was asked); they're a great SQL-first solution to a number of problems.

Assuming you mean like

select /*+ read.locality.enabled=false */ a, b, c from iceberg_table t

We have encountered a few situations internally where updating spark.sql.shuffle.partitions would make a job unreasonably slow (possibly due to a large filter prior to the write), so a COALESCE hint would be really helpful for managing file sizes (which I assume is naturally supported by Spark 3.x hints, unless Iceberg operations get in the way, e.g. anything to do with merging or deletes). But I have admittedly not tried it.
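
A sketch of the kind of hint usage described above. The COALESCE hint is standard Spark 3.x SQL; a read-option hint for locality is purely hypothetical and not recognized by Spark or Iceberg today.

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

SparkSession spark = SparkSession.builder().getOrCreate();

// Built-in Spark 3.x partitioning hint to limit the number of output partitions.
Dataset<Row> coalesced =
    spark.sql("SELECT /*+ COALESCE(10) */ a, b, c FROM db.iceberg_table");

// Hypothetical read-option hint, shown only as an idea from this thread:
// SELECT /*+ OPTIONS('locality'='false') */ a, b, c FROM db.iceberg_table
```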

Contributor:

I think hints could solve a number of problems that people want table-level properties for, where table-level properties are likely not the most appropriate fit because the right values depend on seasonal changes in dataset size and on the cluster resources available when the job starts.

Contributor:

Yeah, I'm not a fan either, but the read and write options probably won't be possible through SQL otherwise. Maybe that's not what we want to do for things that would ideally have SQL clauses (like AS OF TIMESTAMP or AS OF VERSION) but hints like locality=true seem like a reasonable path to me. We may even be able to get that in upstream Spark.

Contributor (@kbendick, May 22, 2021):

Lastly, I think hints are much easier to explain to end users and for them to remember. There are a lot of Spark configurations (by necessity, and that's not even counting the much smaller set of Iceberg Spark properties), and even people who are very well acquainted with Spark can get them mixed up, which leads to support requests and so on.

If the hint gets placed in the query (SQL file, notebook), developers are much more likely to remember what needs to be updated to change it to suit their current needs.

Member:

I thought that for time travel and such I'm a bigger fan of just adding more special table naming, just so we have less reliance on Catalyst. That said, I have no problem with adding some more hints for things like locality.

}
}
77 changes: 63 additions & 14 deletions spark2/src/main/java/org/apache/iceberg/spark/source/Reader.java
@@ -21,9 +21,15 @@

import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Supplier;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -45,11 +51,12 @@
import org.apache.iceberg.hadoop.Util;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.iceberg.spark.SparkFilters;
import org.apache.iceberg.spark.SparkReadOptions;
import org.apache.iceberg.spark.SparkSchemaUtil;
import org.apache.iceberg.spark.SparkUtil;
import org.apache.iceberg.util.PropertyUtil;
import org.apache.iceberg.util.TableScanUtil;
import org.apache.spark.api.java.JavaSparkContext;
@@ -72,12 +79,14 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static org.apache.iceberg.TableProperties.LOCALITY_TASK_INITIALIZE_THREADS;
import static org.apache.iceberg.TableProperties.LOCALITY_TASK_INITIALIZE_THREADS_DEFAULT;

class Reader implements DataSourceReader, SupportsScanColumnarBatch, SupportsPushDownFilters,
SupportsPushDownRequiredColumns, SupportsReportStatistics {
private static final Logger LOG = LoggerFactory.getLogger(Reader.class);

private static final Filter[] NO_FILTERS = new Filter[0];
private static final ImmutableSet<String> LOCALITY_WHITELIST_FS = ImmutableSet.of("hdfs");

private final JavaSparkContext sparkContext;
private final Table table;
@@ -146,8 +155,8 @@ class Reader implements DataSourceReader, SupportsScanColumnarBatch, SupportsPushDownFilters,
LOG.warn("Failed to get Hadoop Filesystem", ioe);
}
String scheme = fsscheme; // Makes an effectively final version of scheme
this.localityPreferred = options.get("locality").map(Boolean::parseBoolean)
.orElseGet(() -> LOCALITY_WHITELIST_FS.contains(scheme));
this.localityPreferred = options.get(SparkReadOptions.LOCALITY_ENABLED).map(Boolean::parseBoolean)
.orElseGet(() -> SparkUtil.isLocalityEnabledDefault(table.properties(), scheme));
} else {
this.localityPreferred = false;
}
@@ -206,11 +215,8 @@ public List<InputPartition<ColumnarBatch>> planBatchInputPartitions() {
Broadcast<Table> tableBroadcast = sparkContext.broadcast(SerializableTable.copyOf(table));

List<InputPartition<ColumnarBatch>> readTasks = Lists.newArrayList();
for (CombinedScanTask task : tasks()) {
readTasks.add(new ReadTask<>(
task, tableBroadcast, expectedSchemaString, caseSensitive,
localityPreferred, new BatchReaderFactory(batchSize)));
}

initializeReadTasks(readTasks, tableBroadcast, expectedSchemaString, () -> new BatchReaderFactory(batchSize));
LOG.info("Batching input partitions with {} tasks.", readTasks.size());

return readTasks;
@@ -227,15 +233,58 @@ public List<InputPartition<InternalRow>> planInputPartitions() {
Broadcast<Table> tableBroadcast = sparkContext.broadcast(SerializableTable.copyOf(table));

List<InputPartition<InternalRow>> readTasks = Lists.newArrayList();
for (CombinedScanTask task : tasks()) {
readTasks.add(new ReadTask<>(
task, tableBroadcast, expectedSchemaString, caseSensitive,
localityPreferred, InternalRowReaderFactory.INSTANCE));
}

initializeReadTasks(readTasks, tableBroadcast, expectedSchemaString, () -> InternalRowReaderFactory.INSTANCE);

return readTasks;
}

/**
* Initialize ReadTasks using multiple threads, since fetching block locations can be slow.
*
* @param readTasks result list to populate
*/
private <T> void initializeReadTasks(List<InputPartition<T>> readTasks,
Broadcast<Table> tableBroadcast, String expectedSchemaString, Supplier<ReaderFactory<T>> supplier) {
int taskInitThreads = Math.max(1, PropertyUtil.propertyAsInt(table.properties(), LOCALITY_TASK_INITIALIZE_THREADS,
LOCALITY_TASK_INITIALIZE_THREADS_DEFAULT));

if (!localityPreferred || taskInitThreads == 1) {
for (CombinedScanTask task : tasks()) {
readTasks.add(new ReadTask<>(
task, tableBroadcast, expectedSchemaString, caseSensitive,
localityPreferred, supplier.get()));
}
return;
}

List<Future<ReadTask<T>>> futures = new ArrayList<>();

final ExecutorService pool = Executors.newFixedThreadPool(
Contributor:

Let's remove these changes and consider them in a separate PR.

Contributor Author:

Thanks! I will open another PR for this feature.

taskInitThreads,
new ThreadFactoryBuilder()
.setDaemon(true)
.setNameFormat("Init-ReadTask-%d")
.build());
Comment on lines +263 to +268
Contributor:

Have a look at org.apache.iceberg.util.ThreadPools, which can help handle this for you. It would also allow for the pool size to be set as a system property (as well as providing space for a default).

I'm not too sure how I feel about the default coming as a table level property though. Seems to me like how parallel and whether or not you want to wait for locality would be specific to the cluster that the job is running on. But I don't have a strong opinion on that.

Contributor Author (@jshmchenxi, May 21, 2021):

> Seems to me like how parallel and whether or not you want to wait for locality would be specific to the cluster that the job is running on.

I agree with that. And this property is useful only with Spark. Maybe it's better to define it as a user-defined Spark configuration that can be set per session?
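
A sketch of the ThreadPools suggestion above; the pool name is illustrative and this is not code from the PR.

```java
import java.util.concurrent.ExecutorService;
import org.apache.iceberg.util.ThreadPools;

// Would replace the Executors.newFixedThreadPool(...) block above; ThreadPools already
// creates daemon threads with a name prefix, so the ThreadFactoryBuilder can go away.
ExecutorService pool = ThreadPools.newWorkerPool("init-read-task", taskInitThreads);
```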


List<CombinedScanTask> scanTasks = tasks();
for (int i = 0; i < scanTasks.size(); i++) {
final int curIndex = i;
futures.add(pool.submit(() -> new ReadTask<>(scanTasks.get(curIndex), tableBroadcast,
expectedSchemaString, caseSensitive, true, supplier.get())));
}
Comment on lines +270 to +275
Contributor:

You might be able to use org.apache.iceberg.util.ParallelIterable here to help simplify some of this code that takes care of submitting tasks.

Contributor Author:

Thanks! I will try to use the Iceberg utils in another PR; this one will focus on adding the ability to disable locality.


try {
for (int i = 0; i < futures.size(); i++) {
readTasks.set(i, futures.get(i).get());
}
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException("Exception caught in multi-thread initializing ReadTask", e);
} finally {
pool.shutdownNow();
}
}

@Override
public Filter[] pushFilters(Filter[] filters) {
this.tasks = null; // invalidate cached tasks, if present
7 changes: 4 additions & 3 deletions spark3/src/main/java/org/apache/iceberg/spark/Spark3Util.java
@@ -103,7 +103,6 @@

public class Spark3Util {

private static final Set<String> LOCALITY_WHITELIST_FS = ImmutableSet.of("hdfs");
private static final Set<String> RESERVED_PROPERTIES = ImmutableSet.of(
TableCatalog.PROP_LOCATION, TableCatalog.PROP_PROVIDER);
private static final Joiner DOT = Joiner.on(".");
@@ -483,11 +482,13 @@ public static String describe(org.apache.iceberg.SortOrder order) {
return Joiner.on(", ").join(SortOrderVisitor.visit(order, DescribeSortOrderVisitor.INSTANCE));
}

public static boolean isLocalityEnabled(FileIO io, String location, CaseInsensitiveStringMap readOptions) {
public static boolean isLocalityEnabled(FileIO io, String location, Map<String, String> tableProperties,
CaseInsensitiveStringMap readOptions) {
InputFile in = io.newInputFile(location);
if (in instanceof HadoopInputFile) {
String scheme = ((HadoopInputFile) in).getFileSystem().getScheme();
return readOptions.getBoolean("locality", LOCALITY_WHITELIST_FS.contains(scheme));
return readOptions.getBoolean(
SparkReadOptions.LOCALITY_ENABLED, SparkUtil.isLocalityEnabledDefault(tableProperties, scheme));
}
return false;
}
spark3/src/main/java/org/apache/iceberg/spark/source/SparkBatchScan.java
@@ -20,10 +20,15 @@
package org.apache.iceberg.spark.source;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.stream.Collectors;
import org.apache.iceberg.CombinedScanTask;
import org.apache.iceberg.FileFormat;
@@ -36,6 +41,7 @@
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.hadoop.HadoopInputFile;
import org.apache.iceberg.hadoop.Util;
import org.apache.iceberg.relocated.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.iceberg.spark.Spark3Util;
import org.apache.iceberg.spark.SparkSchemaUtil;
import org.apache.iceberg.util.PropertyUtil;
@@ -58,6 +64,9 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static org.apache.iceberg.TableProperties.LOCALITY_TASK_INITIALIZE_THREADS;
import static org.apache.iceberg.TableProperties.LOCALITY_TASK_INITIALIZE_THREADS_DEFAULT;

abstract class SparkBatchScan implements Scan, Batch, SupportsReportStatistics {
private static final Logger LOG = LoggerFactory.getLogger(SparkBatchScan.class);

@@ -80,7 +89,7 @@ abstract class SparkBatchScan implements Scan, Batch, SupportsReportStatistics {
this.caseSensitive = caseSensitive;
this.expectedSchema = expectedSchema;
this.filterExpressions = filters != null ? filters : Collections.emptyList();
this.localityPreferred = Spark3Util.isLocalityEnabled(table.io(), table.location(), options);
this.localityPreferred = Spark3Util.isLocalityEnabled(table.io(), table.location(), table.properties(), options);
this.batchSize = Spark3Util.batchSize(table.properties(), options);
this.options = options;
}
@@ -125,10 +134,40 @@ public InputPartition[] planInputPartitions() {

List<CombinedScanTask> scanTasks = tasks();
InputPartition[] readTasks = new InputPartition[scanTasks.size()];
for (int i = 0; i < scanTasks.size(); i++) {
readTasks[i] = new ReadTask(
scanTasks.get(i), tableBroadcast, expectedSchemaString,
caseSensitive, localityPreferred);

int taskInitThreads = Math.max(1, PropertyUtil.propertyAsInt(table.properties(), LOCALITY_TASK_INITIALIZE_THREADS,
LOCALITY_TASK_INITIALIZE_THREADS_DEFAULT));
if (localityPreferred && taskInitThreads > 1) {
List<Future<ReadTask>> futures = new ArrayList<>();

final ExecutorService pool = Executors.newFixedThreadPool(
taskInitThreads,
new ThreadFactoryBuilder()
.setDaemon(true)
.setNameFormat("Init-ReadTask-%d")
.build());

for (int i = 0; i < scanTasks.size(); i++) {
final int curIndex = i;
futures.add(pool.submit(() -> new ReadTask(scanTasks.get(curIndex), tableBroadcast, expectedSchemaString,
caseSensitive, true)));
}

try {
for (int i = 0; i < futures.size(); i++) {
readTasks[i] = futures.get(i).get();
}
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException("Exception in multi-thread initializing ReadTask", e);
} finally {
pool.shutdownNow();
}
} else {
for (int i = 0; i < scanTasks.size(); i++) {
readTasks[i] = new ReadTask(
scanTasks.get(i), tableBroadcast, expectedSchemaString,
caseSensitive, localityPreferred);
}
}

return readTasks;