
[jvm-packages] xgboost4j-spark Prediction Optimization #4307

Closed
mingyang opened this issue Mar 28, 2019 · 5 comments · Fixed by #4388
@mingyang commented Mar 28, 2019

Motivation

Recently, I've been testing whether xgboost4j-spark could be a viable solution in production. One problem surfaced when multiple models were applied to the same dataset in sequence.

In an industrial setting, it's very likely that a practitioner builds multiple models using the same dataset, with the same features but different labels for different tasks. For example, one might want to predict the demographic status of her users (gender, age, marital status, etc.) with the same set of base features.

Ideal Situation

Once multiple models have been trained on the labeled dataset, one wants to make predictions on a (potentially) much bigger unlabeled dataset. In pseudocode, it looks like:

```
for model_i in trained_models:
    data = model_i.transform(data).withColumnRenamed('prediction', 'prediction_from_model_i')
data.write_output()
```

Ideally, this only loops through the dataset ONCE while applying all models.
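
For concreteness, a hedged Scala sketch of that loop; the SparkSession `spark`, the `trainedModels` sequence, and the paths are placeholders, not from the original issue:

```scala
// A minimal sketch of the ideal single-pass scoring loop.
import org.apache.spark.sql.DataFrame

var data: DataFrame = spark.read.parquet("/path/to/unlabeled")  // hypothetical path
trainedModels.zipWithIndex.foreach { case (model, i) =>
  // each model appends its own prediction column; a real pipeline would also
  // need to rename/drop extra output columns (e.g. rawPrediction, probability)
  data = model.transform(data)
    .withColumnRenamed("prediction", s"prediction_from_model_$i")
}
data.write.parquet("/path/to/scored")  // hypothetical path
```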

Current Observations

In a toy setup

I applied 4 models, using xgboost4j-spark and Spark logistic regression (LR), to a small dataset and monitored the I/O. What is far from ideal: when applying multiple xgboost4j-spark models, the number of input records read grows exponentially with the number of models applied:

| model | #models | #input | prediction_time |
|---|---|---|---|
| xgboost4j-spark | 0 | 11952 (size of the data) | |
| xgboost4j-spark | 1 | 23904 | 6 |
| xgboost4j-spark | 2 | 47808 | 9 |
| xgboost4j-spark | 3 | 95616 | 22 |
| xgboost4j-spark | 4 | 192002 | 23 |
| spark-LR | 0 | 10912 (size of the data) | |
| spark-LR | 1 | 10912 | 12 |
| spark-LR | 2 | 10912 | 15 |
| spark-LR | 3 | 10912 | 15 |
| spark-LR | 4 | 10912 | 16 |

In other words, with N models, Spark reads the dataset 2^N times. By contrast, native Spark LR reads the dataset only once no matter how many models are applied.

Even when applying just one model, the current implementation reads the dataset twice (2X).

In a more realistic setup

This is where I first noticed the problem:

  • Predicting with one model reads in ~1 TB of data (again, 2X the dataset size), while applying 8 models together reads in 127.4 TB (~2^8 X the dataset size).
  • Prediction time went up from 6-7 minutes for one model to 7-9 hours for all eight, compared to roughly 7 min × 8 ≈ 1 hour if the eight predictions were run separately in sequence.

Suspected Reasons

In its implementation, xgboost4j-spark uses the RDD interface with zipPartitions, while Spark's native models use Spark SQL UDFs to do prediction.

This could make a huge difference in the execution plans.

  • xgboost4j-spark: [screenshot of the xgboost4j-spark execution plan]
  • spark: [screenshot of the Spark LR execution plan]

Workarounds

  • One workaround for now is to cache the whole dataset to be scored (a sketch follows this list), which may not be feasible in industrial settings where billions of records need to be scored.
  • A second workaround is to make predictions with each model separately, but this is also suboptimal depending on the downstream stages.
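
A minimal sketch of the caching workaround; the storage level is my assumption, not from the thread:

```scala
import org.apache.spark.storage.StorageLevel

// Persist the dataset once so each model scores the cached copy instead of
// re-reading (and re-multiplying) the input.
val cached = data.persist(StorageLevel.MEMORY_AND_DISK)
cached.count()  // force materialization before applying the models

// ...then run the same multi-model scoring loop as above over `cached`,
// and release the storage when done:
cached.unpersist()
```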

Suggested Solutions

  1. Move to the Dataset framework in Spark, making predictions the way native Spark models do (via spark.sql.functions.udf; a sketch follows this list), OR
  2. Stay in the RDD framework, but append the predictions to the spark.sql.Rows directly to avoid zipping two RDDs.
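
To make suggestion 1 concrete, a hedged sketch of UDF-style scoring in the manner of native Spark models; `scoreOneRow` and the broadcast booster `bBooster` are hypothetical, and, as the next comment explains, building a DMatrix per row is exactly the overhead that makes this approach slow for XGBoost:

```scala
import org.apache.spark.ml.linalg.Vector
import org.apache.spark.sql.functions.{col, udf}

// scoreOneRow is a hypothetical helper: it would wrap a single feature vector
// in a DMatrix and call booster.predict on it (slow: per-row DMatrix overhead).
val predictUdf = udf { features: Vector => scoreOneRow(bBooster.value, features) }

val scored = data.withColumn("prediction", predictUdf(col("features")))
```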
@CodingCat (Member)

@mingyang thanks for reporting the issue and the analysis.

The reason we didn't use any per-row approach (e.g. a udf, or appending to each Row) is that per-instance prediction with XGBoost is very slow due to the overhead of creating a DMatrix for every instance, etc.

I don't have a workaround better than the second one you already mentioned for now.

I am thinking about providing some built-in tools that overlap with Spark ML in functionality (e.g. cross-validation) but offer better performance by taking special care of the characteristics of XGBoost. What you mentioned here could be a very good use case for those tools.

@mingyang (Author)

> The reason we didn't use any per-row approach (e.g. a udf, or appending to each Row) is that per-instance prediction with XGBoost is very slow due to the overhead of creating a DMatrix for every instance, etc.

OK, this makes sense to me now, @CodingCat.

How about this workaround, then? In pseudo-Scala code:

```scala
val outputRDD = dataset.rdd.mapPartitions { rowIterator =>
  // keep a copy of the input rows in memory (potentially a drawback)
  val inputRows = rowIterator.toList
  // create the DMatrix for the whole input partition to avoid per-row overhead
  val features = rowIterator.map(row => row.getAs[Vector]($(featuresCol))).toList
  val dm = new DMatrix(features, ...)
  // get the predictions
  val Array(rawPredictionItr, probabilityItr, predLeafItr, predContribItr) =
    producePredictionItrs(bBooster, dm)
  // zip within mapPartitions instead of zipping two RDDs together
  Iterator(zip(inputRows, rawPredictionItr, probabilityItr, predLeafItr, predContribItr))
}
```

Please forgive my syntax errors since I don't normally write Scala code, but I hope the idea is clear.

If the memory footprint of keeping the partition in memory is too high, which I don't think is a real issue since Spark's default partition size when reading from HDFS is only 128 MB, we could instead process multiple mini-batches (e.g. every 100 or 1000 rows) within each partition. That way we would not need another copy of the whole partition in memory, only one mini-batch at a time (see the sketch below).
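
A hedged sketch of that mini-batch variant (not the actual xgboost4j-spark code; `asXGBLabeledPoint` and the broadcast booster `bBooster` are assumed helpers):

```scala
import ml.dmlc.xgboost4j.scala.DMatrix
import org.apache.spark.ml.linalg.Vector
import org.apache.spark.sql.Row

val batchSize = 1000
val outputRDD = dataset.rdd.mapPartitions { rowIterator =>
  rowIterator.grouped(batchSize).flatMap { batch =>
    // only one mini-batch is materialized at a time, not the whole partition
    val points = batch.map(row => asXGBLabeledPoint(row.getAs[Vector]("features")))
    val dm = new DMatrix(points.iterator)   // one DMatrix per mini-batch
    val preds = bBooster.value.predict(dm)  // Array[Array[Float]], one per row
    dm.delete()                             // free native memory eagerly
    batch.zip(preds).map { case (row, pred) => Row.fromSeq(row.toSeq :+ pred(0)) }
  }
}
```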

@CodingCat (Member)

We do experience some issues with loading everything into memory (you can check #4033).

Also, in your code the iterator rowIterator is no longer traversable after you execute val inputRows = rowIterator.toList, because toList moves the iterator to its end. As a result, doing mini-batches over an iterator becomes quite complex; I am not sure I would choose to pay such a cost here, or whether there are better ways to do it.
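
For illustration, the exhaustion described above in three lines of Scala:

```scala
val rowIterator = Iterator(1, 2, 3)
val inputRows = rowIterator.toList  // consumes the iterator entirely
rowIterator.hasNext                 // false: a later .map over it yields nothing
```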

@mingyang (Author) commented Apr 1, 2019

Pardon my Scala snippet; it was only meant to illustrate the main idea.

If we weren't using zipPartitions, as before #4033, then the problem I describe above would be avoided. On the downside, if I understand correctly, this would require more memory per executor.

So, is it possible to leave this tradeoff to the users by providing either 1) two prediction functions (e.g. model.transform and model.transform_inplace), or 2) a parameter named something like prediction_inplace?

@CodingCat (Member)

We intentionally avoid using too much memory in XGBoost itself to improve scalability. Since something like rowIterator.toList has to use more memory anyway, it is nearly the same as caching/persisting/checkpointing the input DataFrame before feeding it to XGBoost.

I think persisting before training will not bring additional overhead here.
