
[BLOCKING][jvm-packages] fix non-deterministic order within a partition (in the case of an upstream shuffle) on prediction #4388

Merged: 6 commits into dmlc:master on Apr 26, 2019

Conversation

@sperlingxx (Contributor) commented Apr 20, 2019

Fixes issue #4387 by replacing the zipped RDDs with caching of the original data in a closure.

Corresponding unit tests have been added.

closes #4387

closes #4307

@CodingCat (Member)

Thanks, yeah, this is a bug which is not easy to fix.

This PR actually falls back to the previous problem with memory footprint; you can check f368d0d#diff-a435450e9c28607f848ccf3246944a44

Let me think about the right way to fix it. If we go with sorting, we need to do significant perf benchmarking before merging.

@CodingCat changed the title from "[jvm-packages] fix #4387" to "[jvm-packages] fix non-deterministic order within a partition (in the case of an upstream shuffle) on prediction" on Apr 20, 2019
@sperlingxx (Contributor, Author)

@CodingCat
Thank you for replying so quickly.
Actually, this solution is quite like v0.81's implementation.
I have a couple of ideas to improve it (maybe not good ones); a rough sketch of idea 1 follows this list:

  1. Do prediction in a mini-batch style.
    Maybe we can add a param like batch size?
  2. Add params idColumn and appendColumns.
    We filter out the other columns to reduce the memory overhead of caching the original data.
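A minimal sketch of what idea 1 might look like, assuming a hypothetical batchSize parameter and a placeholder predictBatch function (neither is part of the existing xgboost4j-spark API):

import org.apache.spark.sql.Row

// Hypothetical sketch of idea (1): predict in mini-batches instead of
// materializing the whole partition. `batchSize` and `predictBatch` are
// placeholder names, not actual xgboost4j-spark API.
def predictInMiniBatches(rows: Iterator[Row],
                         batchSize: Int,
                         predictBatch: Seq[Row] => Iterator[Row]): Iterator[Row] =
  // grouped() is lazy, so only one batch of rows is held in memory per task.
  rows.grouped(batchSize).flatMap(predictBatch)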

@CodingCat changed the title to "[BLOCKING][jvm-packages] fix non-deterministic order within a partition (in the case of an upstream shuffle) on prediction" on Apr 22, 2019
@hcho3 mentioned this pull request on Apr 22, 2019
@CodingCat (Member)

@sperlingxx can you elaborate more on the second approach?

@CodingCat (Member)

At the same time, I am benchmarking what happens if we sortWithinPartitions beforehand.

@CodingCat (Member) commented Apr 22, 2019

I tested with a prototype that sorts each partition beforehand:

without sorting, we need 6+ minutes to finish the prediction over 120G of input;

with sorting, it grows to 12+ minutes to finish the same task.
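For reference, sorting within partitions in Spark is a one-liner; this sketch assumes a stable key column (here called rowId, a hypothetical name) exists to sort on:

import org.apache.spark.sql.DataFrame

// Sketch of the "sort each partition beforehand" prototype: sort every
// partition by an assumed stable key column before prediction. The extra
// sort is what roughly doubles the prediction time in the numbers above.
def sortPartitions(df: DataFrame): DataFrame =
  df.sortWithinPartitions("rowId")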

@CodingCat (Member)

I am experimenting with several potential solutions and have found more problems in our implementation; I will update soon.

@sperlingxx (Contributor, Author) commented Apr 23, 2019

@CodingCat
For the second approach: we keep as few columns as we can before the batched data fetching (transformInternal), so we cache less of the original DataFrame (inputRDD).

Something like:

dataset.toDF().select($(appendColumns).map(col): _*)

And I wonder whether splitting the prediction task into mini-batches is everything we need?
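A slightly fuller sketch of that second idea, assuming hypothetical idColumn and appendColumns params (these are not existing xgboost4j-spark params):

import org.apache.spark.sql.{DataFrame, Dataset}
import org.apache.spark.sql.functions.col

// Hypothetical sketch of idea (2): keep only the id column, the columns the
// caller wants carried through to the output, and the features column, so
// much less of the original DataFrame has to be cached during prediction.
def pruneForPrediction(dataset: Dataset[_],
                       idColumn: String,
                       appendColumns: Seq[String],
                       featuresColName: String): DataFrame = {
  val keep = (idColumn +: appendColumns :+ featuresColName).distinct
  dataset.toDF().select(keep.map(col): _*)
}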

@CodingCat (Member) commented Apr 23, 2019

OK, so I essentially tried three approaches to resolve the issue, and found more problems in XGBoost along the way.

  1. approaches

I tried sorting the DataFrame before feeding it to transformInternal(), duplicating the dataset like the implementation here, and mini-batching.

  2. benchmark of different approaches

I trained a model on an internal dataset with 1.5B rows and around 20 features, then loaded the model to predict over the training dataset in a separate Spark application.

To scale the test, I manually duplicated the dataset in the Spark application; the benchmark results only count the time spent on the prediction stage.

used code: https://github.com/CodingCat/xgboost4j-spark-scalability/blob/master/src/main/scala/me/codingcat/xgboost4j/PureXGBoostPredictor.scala

  3. benchmark results

[benchmark results image]

Resources I used: --num-executors 100 --executor-memory 14g --executor-cores 8

  4. problems here

Booster's prediction method is, and has to be, a synchronized method:

private synchronized float[][] predict(DMatrix data,

I think that in the C++ layer there is some sharing among different boosters in the same process (I didn't get enough time to debug it). If we make the method non-synchronized, we hit a lot of double-free errors in the native layer on the prediction code path (I have tried saving the booster and loading it back to create a new booster, and even just using the broadcast booster with a non-synchronized version of the method).

Because of this synchronization, we create more context switches when using the mini-batch approach; that's why we see the results above (mini-batch is a bit slower when scalability is not the issue).
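To make the contention point concrete, here is an illustrative sketch (not the PR's actual transformInternal code) of why smaller batches mean more serialized predict calls against the shared broadcast booster:

import ml.dmlc.xgboost4j.scala.{Booster, DMatrix}
import org.apache.spark.broadcast.Broadcast

// Illustrative sketch only: tasks on the same executor share the broadcast
// Booster, whose prediction path is synchronized, so every call queues on
// the same lock. Smaller mini-batches mean more predict calls per partition,
// hence more blocking and context switches.
def predictAllBatches(boosterBc: Broadcast[Booster],
                      batches: Iterator[DMatrix]): Iterator[Array[Array[Float]]] =
  batches.map(dm => boosterBc.value.predict(dm))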

@CodingCat (Member)

The actions here:

  1. I will submit a PR to @sperlingxx's branch soon.

  2. Fix the sharing issue in the Booster after this is merged (not blocking, though).

@hcho3 (Collaborator) commented Apr 23, 2019

@CodingCat BTW, you have permission to directly modify all PRs as a maintainer

@CodingCat (Member)

@hcho3 how do I modify it with a bunch of changes?

@hcho3 (Collaborator) commented Apr 23, 2019

@CodingCat You can get a local clone of this PR by running

git clone --recursive https://github.com/sperlingxx/xgboost -b hot_fix_spark_estimator

Then create a commit with a bunch of changes. You should have permission to run git push origin hot_fix_spark_estimator

@CodingCat (Member)

I see... thx

@sperlingxx (Contributor, Author)

@CodingCat
I'm not sure whether the implementation here will work in a mini-batch way with the whole pipeline, so I rewrote the classification part in a purely lazy (iterator) style. I hope it is helpful :)

@CodingCat (Member)

@sperlingxx can you explain how your implementation is a purely lazy (iterator) style and why my suggested implementation is not?

private var batchCnt = 0

private val batchIterImpl = rowIterator.grouped(
XGBoostClassificationModel.PREDICTION_BATCH_SIZE).flatMap { batchRow =>
(Member)

batchRow is a Seq[Row] instead of an Iterator here, so it is not lazily evaluated and stays in memory until this batch is finished

(Member)

but we should think about the memory footprint in this place, as the "grouped iterator" content has been put in memory twice

Rabit.init(rabitEnv.asJava)
}

val features = batchRow.map(row => row.getAs[Vector]($(featuresCol)))
(Member)

Regarding the memory footprint, you have put two Seqs in memory: one Seq[Row] and one Seq[Vector].

You can compare with my implementation; it keeps only a Seq[Row], thanks to iterator.duplicate().

(Contributor, Author)

Oh, that's an unnecessary footprint. Maybe it can be replaced by:

val features = batchRow.iterator.map(row => row.getAs[Vector]($(featuresCol)))
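For context, a sketch of how the batch loop looks with that change: only batchRow (one Seq[Row] per mini-batch) is materialized, and the feature vectors are derived lazily from it. How the feature iterator is then turned into a DMatrix is elided here, since that part of the PR's code is not shown in this thread; batchSize and featuresColName stand in for the model's params.

import org.apache.spark.ml.linalg.Vector
import org.apache.spark.sql.Row

// Sketch of the batch loop after the suggested fix: one Seq[Row] per batch,
// features derived lazily from it, no second Seq[Vector] in memory.
def batchedFeatures(rowIterator: Iterator[Row],
                    batchSize: Int,
                    featuresColName: String): Iterator[Iterator[Vector]] =
  rowIterator.grouped(batchSize).map { batchRow =>
    batchRow.iterator.map(row => row.getAs[Vector](featuresColName))
  }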

@CodingCat (Member)

This is a significant change regarding performance, and we need to be very careful about correctness as well.

Can you also use an internal dataset for evaluation?

@CodingCat (Member) commented Apr 24, 2019

OK, here are my latest benchmark results; it looks like the current implementation is slower than the diff I pushed yesterday.

[benchmark results image]

My theory is still that it's the synchronized method in Booster.

@sperlingxx (Contributor, Author) commented Apr 25, 2019


Thanks for benchmarking!

I'm a little confused about the context-switch cost caused by synchronized prediction. Is it because multiple Spark tasks run concurrently on each executor and share the same Booster instance? And since they share the same booster handle, the method has to be marked synchronized?

@CodingCat (Member)

Yes, because we are using a broadcast booster, which is a singleton per executor; as for why we use the broadcast booster, you can check my previous comments.

@CodingCat (Member)

I left more comments there; @sperlingxx would you please move forward with the PR.

My suggestion is: use your approach for the next version, and look at how to resolve the shared properties among boosters after that.

val trainingDM = new DMatrix(Classification.train.iterator)
val testDM = new DMatrix(Classification.test.iterator)
val trainingDF = buildDataFrame(Classification.train)
val testDF = buildDataFrame(Classification.test)
val randSortedTestDF = buildDataFrameWithRandSort(Classification.test)
(Member)

let's separate them into two tests to highlight the rand-sorted version and the normal version of the test

(Contributor, Author)

okay!

@@ -25,11 +25,12 @@ import org.scalatest.FunSuite

class XGBoostRegressorSuite extends FunSuite with PerTest {

test("XGBoost-Spark XGBoostRegressor ouput should match XGBoost4j: regression") {
test("XGBoost-Spark XGBoostRegressor output should match XGBoost4j: regression") {
(Member)

make the test name consistent with the classifier part

@@ -47,7 +47,7 @@
* the prediction of these DMatrices will become faster than not-cached data.
* @throws XGBoostError native error
*/
Booster(Map<String, Object> params, DMatrix[] cacheMats) throws XGBoostError {
public Booster(Map<String, Object> params, DMatrix[] cacheMats) throws XGBoostError {
(Member)

this might not be necessary, maybe my bad

<<<<<<< HEAD
=======

>>>>>>> regressor impl
(Member)

my bad

@CodingCat (Member)

LGTM, thanks, will merge after CI is happy

@CodingCat merged commit 2d875ec into dmlc:master on Apr 26, 2019
@sperlingxx deleted the hot_fix_spark_estimator branch on April 27, 2019
raydouglass pushed a commit to rapidsai/xgboost that referenced this pull request Jun 26, 2019
dmlc#4388

Hot Fix info

Author:     Xu Xiao <[email protected]>
AuthorDate: Sat Apr 27 02:09:20 2019 +0800
Commit:     Nan Zhu <[email protected]>
CommitDate: Fri Apr 26 11:09:20 2019 -0700

    [BLOCKING][jvm-packages] fix non-deterministic order within a partition  (in the case of an upstream shuffle) on prediction  (dmlc#4388)

    * [jvm-packages][hot-fix] fix column mismatch caused by zip actions at XGBooostModel.transformInternal

    * apply minibatch in prediction

    * an iterator-compatible minibatch prediction

    * regressor impl

    * continuous working on mini-batch prediction of xgboost4j-spark

    * Update Booster.java
The lock bot locked this conversation as resolved and limited it to collaborators on Jul 26, 2019
Successfully merging this pull request may close these issues.

[jvm-packages] bug of XGBoostModel.transformInternal
[jvm-packages] xgboost4j-spark Prediction Optimization