Conversation

@jshmchenxi
Contributor

@jshmchenxi commented May 10, 2021

…e locality for spark-sql

When reading an Iceberg table with spark-sql, I couldn't find a way to disable locality, since it is only defined as a Spark read option. In our busy cluster, a table of about 8k files took about 15s to get block locations from the NameNode. Both QueryExecution$.prepareForExecution() and QueryExecution.sparkPlan() fetch block locations, so that's 30 seconds... However, the Spark job for my SQL only runs for about 10 seconds...

So I tried to add locality as a table property so that it can be disabled for spark-sql.

Maybe there is a way to disable locality in a spark-sql session? Please let me know, thanks!
I also find it inelegant to set these runtime read options as table properties. However, I'm using spark-sql, and what would be a better way to address this problem?

I've dumped the wall-clock time profiling. We can see the problem lies in SparkBatchScan.planInputPartitions().
[wall-clock profiling screenshot]

@kbendick
Contributor

kbendick commented May 13, 2021

I think that this can be configured via spark.locality.wait. If you set it to zero, Spark will just immediately give up looking for a data-local node. At least that's what I've done when reading from S3 with YARN (which is by definition not data-local). From the Spark docs:

> Number of milliseconds to wait to launch a data-local task before giving up and launching it on a less-local node.
> The same wait will be used to step through multiple locality levels (process-local, node-local, rack-local and then any).
> It is also possible to customize the waiting time for each level by setting spark.locality.wait.node, etc.
> You should increase this setting if your tasks are long and see poor locality, but the default usually works well.
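For concreteness, the suggestion above can be tried per session from the spark-sql shell. A minimal sketch (the 0s values are illustrative, not a recommendation):

```sql
-- Session-level scheduling configuration; spark.locality.wait governs how long the
-- scheduler waits for a data-local executor slot before falling back to a less-local one.
SET spark.locality.wait=0s;
-- Optional per-level overrides:
SET spark.locality.wait.node=0s;
SET spark.locality.wait.rack=0s;
```

Note that these settings apply at task-scheduling time, after the job has started.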

@kbendick
Contributor

Also, if we introduced this, would we be removing the possibility of using all of the different locality levels that are provided by YARN?

https://spark.apache.org/docs/latest/tuning.html#data-locality

From the scheduling configs, you can see that the options provided by Spark are much more complex:
https://spark.apache.org/docs/latest/configuration.html#scheduling

spark.locality.wait.node
spark.locality.wait.process
spark.locality.wait.rack

I think that the 30s upper limit you are seeing potentially derives from spark.scheduler.maxRegisteredResourcesWaitingTime, which defaults to 30s. The docs for that state:

> Maximum amount of time to wait for resources to register before scheduling begins.

So I'm guessing that's where the 30s came into play (before Spark started scheduling tasks).

Can you try setting spark.locality.wait.rack or .node and see if that helps? I've only tried this on S3, so I'm not 100% sure it will help in your case.

@jshmchenxi
Contributor Author

@kbendick Hi! Thank you for your kind suggestion!
I looked into the problem and found that the delay happens during the spark-sql planning phase (i.e., before the Spark job starts).
The spark.locality.wait.* configurations take effect during the Spark job's lifetime, so I'm afraid they cannot help with this problem.
I've added a multi-threaded mechanism for ReadTask initialization to solve this. Would you have another look? Thanks!

@jshmchenxi
Contributor Author

jshmchenxi commented May 17, 2021

@openinx @rdblue Hi, this could be a performance bottleneck when reading an Iceberg table with spark-sql. Would you take a look, please?

@kbendick
Contributor

> @kbendick Hi! Thank you for your kind suggestion!
> I looked into the problem and found that the delay happens during the spark-sql planning phase (i.e., before the Spark job starts).
> The spark.locality.wait.* configurations take effect during the Spark job's lifetime, so I'm afraid they cannot help with this problem.
> I've added a multi-threaded mechanism for ReadTask initialization to solve this. Would you have another look? Thanks!

Ah, OK. That makes sense re: the current spark.locality.wait.* entries. I will try to take a look at this later today, but I might not be the best person to review this portion of the code. Will give it a look though.

Comment on lines +270 to +275
```java
List<CombinedScanTask> scanTasks = tasks();
for (int i = 0; i < scanTasks.size(); i++) {
  final int curIndex = i;
  futures.add(pool.submit(() -> new ReadTask<>(scanTasks.get(curIndex), tableBroadcast,
      expectedSchemaString, caseSensitive, true, supplier.get())));
}
```
Contributor


You might be able to use org.apache.iceberg.util.ParallelIterable here to help simplify some of this code that takes care of submitting tasks.

Contributor Author


Thanks! I will try to use the Iceberg utils in another PR; this one will focus on adding the ability to disable locality.

Comment on lines +263 to +268
```java
final ExecutorService pool = Executors.newFixedThreadPool(
    taskInitThreads,
    new ThreadFactoryBuilder()
        .setDaemon(true)
        .setNameFormat("Init-ReadTask-%d")
        .build());
```
Contributor


Have a look at org.apache.iceberg.util.ThreadPools, which can help handle this for you. It would also allow for the pool size to be set as a system property (as well as providing space for a default).

I'm not too sure how I feel about the default coming from a table-level property, though. It seems to me that the parallelism, and whether or not you want to wait for locality, would be specific to the cluster that the job is running on. But I don't have a strong opinion on that.
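For illustration, here is a JDK-only sketch of the daemon pool discussed above. It stands in for what Guava's ThreadFactoryBuilder in the PR (or Iceberg's ThreadPools utility) provides; the squared-integer tasks are placeholders for ReadTask construction:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

public class DaemonPoolSketch {
  // Daemon threads so the pool never blocks JVM shutdown, with a readable name format.
  static ExecutorService newDaemonPool(int threads, String nameFormat) {
    AtomicInteger count = new AtomicInteger();
    return Executors.newFixedThreadPool(threads, runnable -> {
      Thread t = new Thread(runnable, String.format(nameFormat, count.getAndIncrement()));
      t.setDaemon(true);
      return t;
    });
  }

  public static void main(String[] args) throws Exception {
    ExecutorService pool = newDaemonPool(4, "Init-ReadTask-%d");
    List<Future<Integer>> futures = new ArrayList<>();
    for (int i = 0; i < 8; i++) {
      final int task = i;
      futures.add(pool.submit(() -> task * task));  // stand-in for new ReadTask<>(...)
    }
    int sum = 0;
    for (Future<Integer> f : futures) {
      sum += f.get();  // blocks until each task completes
    }
    System.out.println(sum);  // 140
    pool.shutdown();
  }
}
```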

Contributor Author

@jshmchenxi May 21, 2021


> Seems to me like how parallel and whether or not you want to wait for locality would be specific to the cluster that the job is running on.

I agree with that. And this property is only useful with Spark. Maybe it's better to define it as a user-defined Spark configuration that can be set per session?

```java
public static final boolean ORC_VECTORIZATION_ENABLED_DEFAULT = false;

public static final String LOCALITY_ENABLED = "read.locality.enabled";
public static final String LOCALITY_ENABLED_DEFAULT = null;
```
Contributor


What is the current default? Is that inconsistent across uses and that's why this is null?

Contributor Author


> Is that inconsistent across uses and that's why this is null?

Yes. The current default comes from LOCALITY_WHITELIST_FS.contains(scheme), which is true on HDFS and false otherwise.

```java
public static final String VECTORIZATION_BATCH_SIZE = "batch-size";

// Overrides the table's read.locality.enabled
public static final String LOCALITY_ENABLED = "locality";
```
Contributor


What about locality-enabled?

Contributor Author


The locality was passed via the Spark read option "locality". Maybe someone is already using it?

Contributor


Okay, this was an existing setting? If so we don't need to rename it.

```java
String tableLocalityProp = PropertyUtil.propertyAsString(tableProperties, TableProperties.LOCALITY_ENABLED,
    TableProperties.LOCALITY_ENABLED_DEFAULT);
return tableLocalityProp == null ? LOCALITY_WHITELIST_FS.contains(fsScheme) :
    Boolean.parseBoolean(tableLocalityProp);
```
Contributor


I think that locality should be enabled by default, but only relevant if the file system is white-listed. That would make the logic here localityEnabled && LOCALITY_WHITELIST_FS.contains(fsScheme). That's a bit simpler than using the table property to override and we don't want to check locality for file systems that don't support it.
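A minimal sketch of the logic described above, with a stand-in whitelist (the real LOCALITY_WHITELIST_FS lives in Iceberg's Spark module; this single-entry set is only illustrative):

```java
import java.util.Set;

public class LocalityCheck {
  // Illustrative stand-in for Iceberg's LOCALITY_WHITELIST_FS.
  private static final Set<String> LOCALITY_WHITELIST_FS = Set.of("hdfs");

  // Block locations are fetched only when locality is enabled AND the
  // file system actually supports it, avoiding pointless NameNode calls.
  static boolean shouldFetchBlockLocations(boolean localityEnabled, String fsScheme) {
    return localityEnabled && LOCALITY_WHITELIST_FS.contains(fsScheme);
  }

  public static void main(String[] args) {
    System.out.println(shouldFetchBlockLocations(true, "hdfs"));   // true
    System.out.println(shouldFetchBlockLocations(true, "s3a"));    // false
    System.out.println(shouldFetchBlockLocations(false, "hdfs"));  // false
  }
}
```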

Contributor Author


OK. I am using spark-sql and didn't find a way to set locality as a spark read option. I will try to add it as a user-defined spark configuration so that it can be set per session.

Contributor


We should add the ability to set read options in hints via Spark extensions. Being able to use hints would be great because we could use it for this and a few other things, like time travel in SQL. (@RussellSpitzer, what do you think?)

Contributor

@kbendick May 22, 2021


I'm a big fan of hints (not that I was asked). They're a great SQL-first solution to a number of problems.

Assuming you mean something like:

```sql
SELECT /*+ read.locality.enabled=false */ a, b, c FROM iceberg_table t
```

We have encountered a few situations internally where updating spark.sql.shuffle.partitions would make a job unreasonably slow (possibly due to a large filter prior to the write), so a COALESCE hint would be really helpful for controlling file sizes (which I assume is naturally supported by Spark 3.x hints, unless Iceberg operations get in the way, e.g. anything to do with merging or deletes). But I have admittedly not tried it.
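For reference, Spark 3.x partitioning hints use the /*+ ... */ comment syntax. A sketch of the COALESCE hint mentioned above (table and column names are illustrative):

```sql
-- Reduce the number of output partitions (and thus output files) without a shuffle.
SELECT /*+ COALESCE(3) */ a, b, c
FROM iceberg_table;

-- REPARTITION is the shuffle-based variant, which rebalances data across partitions.
SELECT /*+ REPARTITION(10) */ a, b, c
FROM iceberg_table;
```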

Contributor


I think hints could solve a number of problems for which people want to create table-level properties, where table-level properties are likely not the most appropriate, since these settings depend on seasonal changes in dataset size and on the cluster resources available when the job starts.

Contributor


Yeah, I'm not a fan either, but the read and write options probably won't be possible through SQL otherwise. Maybe that's not what we want to do for things that would ideally have SQL clauses (like AS OF TIMESTAMP or AS OF VERSION) but hints like locality=true seem like a reasonable path to me. We may even be able to get that in upstream Spark.

Contributor

@kbendick May 22, 2021


Lastly, I think hints are much easier to explain to end users and for them to remember, as there are a LOT of Spark configurations (by necessity, and that's not even counting the Iceberg Spark properties, which are far fewer), and even people who are very well acquainted with Spark can get them mixed up... which leads to support requests, etc.

If the hint gets placed in the query (SQL file, notebook), developers are much more likely to remember what needs to be updated to change it to suit their current needs.

Member


For time travel and such, I'm a bigger fan of just adding more special table naming, so we have less reliance on Catalyst. That said, I have no problem with adding some more hints for things like locality.


```java
List<Future<ReadTask<T>>> futures = new ArrayList<>();

final ExecutorService pool = Executors.newFixedThreadPool(
```
Contributor


Let's remove these changes and consider them in a separate PR.

Contributor Author


Thanks! I will open another PR for this feature.

```java
public static final String LOCALITY_ENABLED_DEFAULT = null;

public static final String LOCALITY_TASK_INITIALIZE_THREADS = "read.locality.task.initialize.threads";
public static final int LOCALITY_TASK_INITIALIZE_THREADS_DEFAULT = 1;
```
Contributor


If this PR is adding the ability to disable locality, I think it should remain focused on that task and should not include a separate feature.

Contributor Author


Got it

@rdblue
Contributor

rdblue commented Apr 10, 2022

Since this hasn't been updated recently, I'm going to go ahead and close it. Feel free to reopen if you have the time to update it!

@rdblue closed this Apr 10, 2022
@jshmchenxi
Contributor Author

Sure, thanks Ryan!
