
[DISCUSSION] Integration with PySpark #1698

Closed
CodingCat opened this issue Oct 25, 2016 · 53 comments

Comments

@CodingCat
Copy link
Member

I just noticed that there are some requests for integration with PySpark: http://dmlc.ml/2016/03/14/xgboost4j-portable-distributed-xgboost-in-spark-flink-and-dataflow.html

I have also received some emails from users discussing the same topic.

I would like to initiate a discussion here on whether/when we should start this work.

@tqchen @terrytangyuan

@CodingCat CodingCat changed the title [DISCUSS] Integration with PySpark [DISCUSSION] Integration with PySpark Oct 25, 2016
@terrytangyuan
Copy link
Member

@CodingCat Do you know how big the PySpark community is? Most people just use the Scala API. It seems like a lot of things would need to be re-implemented in Python - please correct me if I am wrong.

@CodingCat
Copy link
Member Author

I think PySpark is pretty prevalent in the data science community, mostly for quick prototyping and similar scenarios. I have heard of many cases where data scientists use PySpark to analyze large volumes of data.

On the other side, most production-level scenarios are based on the Scala API (I only know of a single case where people use PySpark in large-scale production).

@terrytangyuan
Copy link
Member

Yeah, I just feel like the current Python API should be able to handle most prototyping needs. I personally care more about Spark when we want more production-ready stuff. Perhaps we should leave the discussion open here so people can describe their needs. In the meantime, it would be great if you could provide some details on approaches/estimates/steps for the integration.

@CodingCat
Copy link
Member Author

Just noticed some discussions in the community:

http://apache-spark-developers-list.1001551.n3.nabble.com/Blocked-PySpark-changes-td19712.html

It seems that the development of PySpark is lagging behind... as a downstream library, I vote to hold off on dedicating effort to PySpark integration for now...

@terrytangyuan
Copy link
Member

Yeah, it's also hard to debug any issues you encounter (at least it was when I tried it last year)...

@ckljohn
Copy link

ckljohn commented Feb 6, 2017

In the roadmap (#873), it says distributed Python has been implemented. Does that mean xgboost can run on a Hadoop cluster with Python? (I don't mean PySpark.)

@tqchen
Copy link
Member

tqchen commented Feb 6, 2017

yes, see the example posted in the link

@yiming-chen
Copy link

What is the difference between running xgboost on a Hadoop cluster with Python and running it with the Scala API? Are there major performance differences?
I think there are still a lot of people using PySpark for production models as well.

@CodingCat
Copy link
Member Author

@yiming-chen the goal of xgboost4j-spark is to unify ETL and model training in the same pipeline.

The question comes down to what language users use when doing ETL. Based on my observation and experience, 95% of users build their ETL systems with Scala.

@berch
Copy link

berch commented Mar 28, 2017

@CodingCat I don't know where you got your 95% stat from, but PySpark is definitely widely used in my experience. For example, we are trying to integrate Airflow to schedule the jobs for our pipeline, and Python would be suitable in that situation.

@CodingCat
Copy link
Member Author

@berch PySpark is widely used by you, and you are going to integrate with Airflow... how is that relevant to what I said?

@zherebetskyy
Copy link

zherebetskyy commented May 3, 2017

@CodingCat @tqchen The data science community would definitely benefit from XGBoost being implemented in PySpark, because:

@CodingCat
Copy link
Member Author

Feel free to send a PR; you will find out the cost.

@CodingCat
Copy link
Member Author

To avoid coming back to this thread time and time again, I will close the discussion with the following conclusion:

  • I personally will not vote to continue the effort to integrate PySpark (for now).

  • Anyone else is more than welcome to contribute to this; however, we need to at least consider the following:

    • do not introduce another Python package

    • keep backward compatibility with the current Python API when implementing the integration

    • handle the features of PySpark's ML that lag behind

@haiy
Copy link

haiy commented Jan 11, 2018

So we can't use pyspark to load XGBoost-spark model? @CodingCat

@wpopielarski
Copy link

So, actually, Scala XGBoost can be more or less painlessly wrapped in the PySpark JavaEstimator API. I played with it a little and have this prototype:

from pyspark.ml.wrapper import JavaEstimator, JavaModel
from pyspark.ml.param.shared import *
from pyspark.ml.util import *
from pyspark.context import SparkContext

class XGBoost(JavaEstimator, JavaMLWritable, JavaMLReadable, HasRegParam, HasElasticNetParam):

    def __init__(self, paramMap = {}):
        super(XGBoost, self).__init__()
        # Convert the Python dict into a Scala immutable Map for the Scala-side estimator constructor.
        scalaMap = SparkContext._active_spark_context._jvm.PythonUtils.toScalaMap(paramMap)
        self._java_obj = self._new_java_obj(
            "ml.dmlc.xgboost4j.scala.spark.XGBoostEstimator", self.uid, scalaMap)
        self._defaultParamMap = paramMap
        self._paramMap = paramMap

    def setParams(self, paramMap = {}):
        return self._set(paramMap)

    def _create_model(self, javaTrainingData):
        return JavaModel(javaTrainingData)

I think it still needs some work but I was able to run Xgboost in PySpark with it.

@sratakon
Copy link

Wieslaw, thanks for sharing the code snippet for the XGBoost PySpark wrapper. Can you share the code for invoking the XGBoost class with the appropriate parameters?

Thanks

@AakashBasuRZT
Copy link

@wpopielarski that is an awesome job you did. Can you please share the code for invoking XGBoost with the parameters needed? That would be a great help!

@wpopielarski
Copy link

this is something like:

        from app.xgboost import XGBoost
        xgboost_params = {
            "eta"  : 0.023,
            "max_depth" : 10,
            "min_child_weight" : 0.3,
            "subsample" : 0.7,
            "colsample_bytree" : 0.82,
            "colsample_bylevel" : 0.9,
            "base_score" : base_score,
            "eval_metric" : "auc",
            "seed" : 49,
            "silent" : 1,
            "objective" : "binary:logistic",
            "round" : 10,
            "nWorkers" : 2,
            "useExternalMemory" : True
        }
        xgboost_estimator = XGBoost.XGBoost(xgboost_params)
...
        model = xgboost_estimator.fit(data)

@thesuperzapper
Copy link
Contributor

I am getting close to doing a PR with proper PySpark support.

@AakashBasuRZT
Copy link

@thesuperzapper , that's great!

How long do you think it would take to wrap it up? Do share insights while progressing.

Thanks!

@haiy
Copy link

haiy commented Jun 11, 2018

Hi, I wrote a simple version that works with ParamGridBuilder, in case anyone is interested; it's really easy to customize.

  • 1. create a package dir: mkdir -p ml/dmlc/xgboost4j/scala in any valid PYTHONPATH dir.
  • 2. copy the code below to ml/dmlc/xgboost4j/scala/spark.py
from pyspark import SparkContext
from pyspark.ml.classification import JavaClassificationModel
from pyspark.ml.param import Param, Params, TypeConverters
from pyspark.ml.param.shared import HasFeaturesCol, HasLabelCol, HasPredictionCol, HasRawPredictionCol
from pyspark.ml.util import JavaMLWritable, JavaMLReadable
from pyspark.ml.wrapper import JavaModel, JavaWrapper, JavaEstimator


class XGBParams(Params):
    '''

    '''
    eta = Param(Params._dummy(), "eta",
                "step size shrinkage used in update to prevents overfitting. After each boosting step, we can directly get the weights of new features. and eta actually shrinks the feature weights to make the boosting process more conservative",
                typeConverter=TypeConverters.toFloat)
    max_depth = Param(Params._dummy(), "max_depth",
                      "maximum depth of a tree, increase this value will make the model more complex / likely to be overfitting. 0 indicates no limit, limit is required for depth-wise grow policy.range: [0,∞]",
                      typeConverter=TypeConverters.toInt)
    min_child_weight = Param(Params._dummy(), "min_child_weight",
                             "minimum sum of instance weight (hessian) needed in a child. If the tree partition step results in a leaf node with the sum of instance weight less than min_child_weight, then the building process will give up further partitioning. In linear regression mode, this simply corresponds to minimum number of instances needed to be in each node. The larger, the more conservative the algorithm will berange: [0,∞]",
                             typeConverter=TypeConverters.toFloat)
    max_delta_step = Param(Params._dummy(), "max_delta_step",
                           "Maximum delta step we allow each tree’s weight estimation to be. If the value is set to 0, it means there is no constraint. If it is set to a positive value, it can help making the update step more conservative. Usually this parameter is not needed, but it might help in logistic regression when class is extremely imbalanced. Set it to value of 1-10 might help control the update.",
                           typeConverter=TypeConverters.toInt)
    subsample = Param(Params._dummy(), "subsample",
                      "subsample ratio of the training instance. Setting it to 0.5 means that XGBoost randomly collected half of the data instances to grow trees and this will prevent overfitting.",
                      typeConverter=TypeConverters.toFloat)
    colsample_bytree = Param(Params._dummy(), "colsample_bytree",
                             "subsample ratio of columns when constructing each tree",
                             typeConverter=TypeConverters.toFloat)
    colsample_bylevel = Param(Params._dummy(), "colsample_bylevel",
                              "subsample ratio of columns for each split, in each level.",
                              typeConverter=TypeConverters.toFloat)
    max_leaves = Param(Params._dummy(), "max_leaves",
                       "Maximum number of nodes to be added. Only relevant for the ‘lossguide’ grow policy.",
                       typeConverter=TypeConverters.toInt)

    def __init__(self):
        super(XGBParams, self).__init__()

class XGBoostClassifier(JavaEstimator, JavaMLWritable, JavaMLReadable, XGBParams,
                        HasFeaturesCol, HasLabelCol, HasPredictionCol, HasRawPredictionCol):
    def __init__(self, paramMap={}):
        super(XGBoostClassifier, self).__init__()
        scalaMap = SparkContext._active_spark_context._jvm.PythonUtils.toScalaMap(paramMap)
        self._java_obj = self._new_java_obj("ml.dmlc.xgboost4j.scala.spark.XGBoostEstimator", self.uid, scalaMap)
        self._defaultParamMap = paramMap
        self._paramMap = paramMap

    def setParams(self, paramMap={}):
        return self._set(paramMap)

    def _create_model(self, java_model):
        return XGBoostClassificationModel(java_model)


class XGBoostClassificationModel(JavaModel, JavaClassificationModel, JavaMLWritable, JavaMLReadable):

    def getBooster(self):
        return self._call_java("booster")

    def saveBooster(self, save_path):
        jxgb = JavaWrapper(self.getBooster())
        jxgb._call_java("saveModel", save_path)
  • 3. use it like a normal pyspark model (see the usage sketch below)!
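A minimal usage sketch, assuming the xgboost4j and xgboost4j-spark jars are on the Spark classpath and the file above is importable from PYTHONPATH; the file path, column names and parameter values here are only placeholders, and the parameter keys mirror wpopielarski's earlier snippet:

from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler
from ml.dmlc.xgboost4j.scala.spark import XGBoostClassifier

# A SparkContext must exist before the wrapper is constructed.
spark = SparkSession.builder.appName("xgboost-pyspark-sketch").getOrCreate()

# Hypothetical training data: numeric feature columns f1..f3 and a 0/1 "label" column.
df = spark.read.parquet("train.parquet")
train = VectorAssembler(inputCols=["f1", "f2", "f3"], outputCol="features").transform(df)

params = {"eta": 0.1, "max_depth": 6, "objective": "binary:logistic",
          "round": 50, "nWorkers": 2}
model = XGBoostClassifier(params).fit(train)
model.transform(train).select("label", "prediction").show(5)

# Persist the underlying booster for use with the plain Python xgboost package.
model.saveBooster("/tmp/xgb.model")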

@thesuperzapper
Copy link
Contributor

thesuperzapper commented Jun 11, 2018

@AakashBasuRZT @haiy, we are now working on this properly in Issue #3370, with PR #3376 providing initial support.

@sagnik-rzt
Copy link

sagnik-rzt commented Jun 18, 2018

@haiy could you show me a snippet of code that fits the classifier on some arbitrary dataset? I have followed points 1 and 2 which you outlined, but I am not able to understand your 3rd point.

@haiy
Copy link

haiy commented Jun 19, 2018

@sagnik-rzt check this sample

@sagnik-rzt
Copy link

sagnik-rzt commented Jun 19, 2018

@haiy I'm trying to run this:

import pyspark
import pandas as pd
from dmlc.xgboost4j.scala.spark import XGBoostClassifier
from sklearn.utils import shuffle

sc = pyspark.SparkContext('local[2]')
spark = pyspark.sql.SparkSession(sc)
df = pd.DataFrame({'x1': range(10), 'x2': [10] * 10, 'y': shuffle([0 for i in range(5)] + [1 for i in range(5)])})
sdf = spark.createDataFrame(df)
X = sdf.select(['x1', 'x2'])
Y = sdf.select(['y'])
print(X.show(5))

params = {'objective' :'binary:logistic', 'n_estimators' : 10, 'max_depth' : 3, 'learning_rate' : 0.033}
xgb_model = XGBoostClassifier(params)

and it is giving this exception :

Traceback (most recent call last):
  File "/home/sagnikb/PycharmProjects/auto_ML/pyspark_xgboost.py", line 20, in <module>
    xgb_model = XGBoostClassifier(params)
  File "/usr/lib/ml/dmlc/xgboost4j/scala/spark.py", line 47, in __init__
    self._java_obj = self._new_java_obj("ml.dmlc.xgboost4j.scala.spark.XGBoostEstimator", self.uid, scalaMap)
  File "/usr/local/lib/python3.6/dist-packages/pyspark/ml/wrapper.py", line 63, in _new_java_obj
    return java_obj(*java_args)
TypeError: 'JavaPackage' object is not callable

Error in sys.excepthook:
Traceback (most recent call last):
  File "/home/sagnikb/PycharmProjects/auto_ML/pyspark_xgboost.py", line 20, in <module>
    xgb_model = XGBoostClassifier(params)
  File "/usr/lib/ml/dmlc/xgboost4j/scala/spark.py", line 47, in __init__
    self._java_obj = self._new_java_obj("ml.dmlc.xgboost4j.scala.spark.XGBoostEstimator", self.uid, scalaMap)
  File "/usr/local/lib/python3.6/dist-packages/pyspark/ml/wrapper.py", line 63, in _new_java_obj
    return java_obj(*java_args)
TypeError: 'JavaPackage' object is not callable
Exception ignored in: <bound method JavaParams.__del__ of XGBoostClassifier_4f9eb5d1388e9e1424a4>
Traceback (most recent call last):
  File "/usr/local/lib/python3.6/dist-packages/pyspark/ml/wrapper.py", line 105, in __del__
    SparkContext._active_spark_context._gateway.detach(self._java_obj)
  File "/usr/local/lib/python3.6/dist-packages/py4j/java_gateway.py", line 1897, in detach
    java_object._detach()
AttributeError: 'NoneType' object has no attribute '_detach'

Environment:
Python 3.6
Spark 2.3
Scala 2.11

@wpopielarski
Copy link

@sagnik-rzt
Not sure, but did you add xgboost-spark.jar with its deps to the Spark classpath?

@sagnik-rzt
Copy link

@wpopielarski Hey no I haven't done that. Any idea where I can find that jar file?

@haiy
Copy link

haiy commented Jun 19, 2018

@sagnik-rzt hi, please download the jar here; it's the official xgboost4j-spark fat jar. Sorry, I just found that my jar was built on a Mac, so please try to build it yourself as @wpopielarski suggests, and put it in the Spark deps dir, e.g. $SPARK_HOME/jars.

@wpopielarski
Copy link

You need to build it yourself :), with Maven and the assembly profile, which builds the fat jar in jvm-packages/xgboost-spark/target or so.

@wpopielarski
Copy link

@sagnik-rzt not sure what you are going to do, but to build the fat jar for your OS just clone the dmlc xgboost GitHub project, cd to jvm-packages and run mvn with the assembly profile. I don't have the faintest idea how to write a Gradle build file.
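For reference, a rough sketch of pointing a PySpark session at the jars once they are built; the jar paths below are placeholders, and the same comma-separated list can instead be passed to spark-submit via --jars:

from pyspark.sql import SparkSession

# Placeholder paths: use whatever the Maven build actually produced
# (either the single fat jar, or the xgboost4j and xgboost4j-spark jars plus dependencies).
jars = ",".join([
    "/path/to/xgboost4j-0.72.jar",
    "/path/to/xgboost4j-spark-0.72.jar",
])

spark = (SparkSession.builder
         .appName("xgboost4j-spark from PySpark")
         .config("spark.jars", jars)   # ships the listed jars to the driver and executors
         .getOrCreate())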

@sagnik-rzt
Copy link

Okay, so I have built a fat jar with dependencies and copied it to $SPARK_HOME/jars.
However, the same exception persists:

Traceback (most recent call last):
  File "/home/sagnikb/PycharmProjects/xgboost/test_import.py", line 21, in <module>
    clf = xgb(params)
  File "/usr/lib/ml/dmlc/xgboost4j/scala/spark.py", line 48, in __init__
    self._java_obj = self._new_java_obj("dmlc.xgboost4j.scala.spark.XGBoostEstimator", self.uid, scalaMap)
  File "/usr/local/lib/python3.6/dist-packages/pyspark/ml/wrapper.py", line 63, in _new_java_obj
    return java_obj(*java_args)
TypeError: 'JavaPackage' object is not callable

@wpopielarski
Copy link

wpopielarski commented Jun 30, 2018 via email

@thesuperzapper
Copy link
Contributor

thesuperzapper commented Jul 4, 2018

I am currently working on rebasing #3376 onto the new Spark branch. In the meantime, a few people have asked how to use the current code with XGBoost-0.72.

Here is a zip file with the pyspark code for XGBoost-0.72.
Download: sparkxgb.zip

All you need to do is:

  1. Add the normal Scala XGBoost jars and dependencies to your job (e.g. using --jars or the spark.jars config).
  2. Once the job has started, run this in Python (the zip can sit on HDFS or in any other location which every executor can see):
sc.addPyFile("hdfs:///XXXX/XXXX/XXXX/sparkxgb.zip")
  3. Test with the following code. (Assuming you have moved sample_binary_classification_data.txt to a reachable location; it is normally in $SPARK_HOME/data/mllib/sample_binary_classification_data.txt.)
from sparkxgb import XGBoostEstimator

# Load Data
dataPath = "sample_binary_classification_data.txt"
dataDF = spark.read.format("libsvm").load(dataPath)

# Split into Train/Test
trainDF, testDF = dataDF.randomSplit([0.8, 0.2], seed=1000)

# Define and train model
xgboost = XGBoostEstimator(
    # General Params
    nworkers=1, nthread=1, checkpointInterval=-1, checkpoint_path="",
    use_external_memory=False, silent=0, missing=float("nan"),
    
    # Column Params
    featuresCol="features", labelCol="label", predictionCol="prediction", 
    weightCol="weight", baseMarginCol="baseMargin", 
    
    # Booster Params
    booster="gbtree", base_score=0.5, objective="binary:logistic", eval_metric="error", 
    num_class=2, num_round=2, seed=None,
    
    # Tree Booster Params
    eta=0.3, gamma=0.0, max_depth=6, min_child_weight=1.0, max_delta_step=0.0, subsample=1.0,
    colsample_bytree=1.0, colsample_bylevel=1.0, reg_lambda=0.0, alpha=0.0, tree_method="auto",
    sketch_eps=0.03, scale_pos_weight=1.0, grow_policy='depthwise', max_bin=256,
    
    # Dart Booster Params
    sample_type="uniform", normalize_type="tree", rate_drop=0.0, skip_drop=0.0,
    
    # Linear Booster Params
    lambda_bias=0.0
)
xgboost_model = xgboost.fit(trainDF)

# Transform test set
xgboost_model.transform(testDF).show()

# Write model/classifier
xgboost.write().overwrite().save("xgboost_class_test")
xgboost_model.write().overwrite().save("xgboost_class_test.model")

Note:

  • This will only work for Spark 2.2+
  • Pipelines and ParamGridBuilder are kind of supported; use the modified pipeline objects via from sparkxgb.pipeline import XGBoostPipeline, XGBoostPipelineModel exactly as you would the normal objects (see the sketch after these notes).
  • You have to use float("+inf") rather than float("nan") for missing values for nulls to be treated correctly, due to a bug in XGBoost-0.72.
  • You cannot load back untrained model objects, (See [jvm-packages] ML Pipelines Load/Save and train error #3035)
  • This API will change with a full release of pyspark support.
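For the pipeline note above, a hedged sketch of what that looks like, reusing the xgboost estimator and the trainDF/testDF split from the example; the method names simply follow the stock Pipeline API, which the note says these objects mirror:

from sparkxgb.pipeline import XGBoostPipeline, XGBoostPipelineModel

# Build and fit a pipeline containing the XGBoost stage defined above.
pipeline = XGBoostPipeline().setStages([xgboost])
pipeline_model = pipeline.fit(trainDF)
pipeline_model.transform(testDF).show()

# Persistence mirrors the stock Pipeline/PipelineModel read/write API.
pipeline_model.write().overwrite().save("xgboost_pipe_test.model")
loaded = XGBoostPipelineModel.load("xgboost_pipe_test.model")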

@BogdanCojocar
Copy link

BogdanCojocar commented Jul 8, 2018

@thesuperzapper I'm trying to test this with pyspark on the jupyter notebook.

My system:
python 3.6.1
xgboost 0.72
spark 2.2.0
java 1.8
scala 2.12

When I'm trying to load the XGBoostEstimator I get:

Exception in thread "Thread-19" java.lang.NoClassDefFoundError: ml/dmlc/xgboost4j/scala/EvalTrait
	at java.lang.Class.getDeclaredMethods0(Native Method)
	at java.lang.Class.privateGetDeclaredMethods(Class.java:2701)
	at java.lang.Class.privateGetPublicMethods(Class.java:2902)
	at java.lang.Class.getMethods(Class.java:1615)
	at py4j.reflection.ReflectionEngine.getMethodsByNameAndLength(ReflectionEngine.java:345)
	at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:305)
	at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:326)
	at py4j.Gateway.invoke(Gateway.java:272)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:214)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: ml.dmlc.xgboost4j.scala.EvalTrait
	at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
	... 12 more

Is this a bug or am I missing some requirements?

@thesuperzapper
Copy link
Contributor

@BogdanCojocar It seems like you're missing the xgboost library.

You need both the xgboost4j and xgboost4j-spark jars for xgboost to work properly:

You can download the required jars from those Maven links.

@BogdanCojocar
Copy link

Thanks @thesuperzapper. Works fine. Great job with this integration to pyspark!

@ericwang915
Copy link

Any suggestion on how to save the trained model as a booster for loading in the Python xgboost module?

@thesuperzapper
Copy link
Contributor

@ericwang915 Typically, to get a model which inter-operates with other XGBoost libraries, you would use the .booster.saveModel("XXX/XXX") method on the model object, where XXX/XXX is a local (non-HDFS) path on the Spark driver. If you use other save methods you get an error (see: #2480).

However, I forgot to add a method which calls that save function in this version of the wrapper; I will do this tomorrow if I get time. (I live in NZ... so timezones.)
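In the meantime, a possible workaround sketch that mirrors haiy's saveBooster helper earlier in this thread; it assumes the wrapped Scala model exposes a booster method and that the path is local to the driver:

from pyspark.ml.wrapper import JavaWrapper

def save_booster(xgb_model, path):
    # Reach through py4j to the Scala model's booster and call its saveModel method.
    booster = JavaWrapper(xgb_model._call_java("booster"))
    booster._call_java("saveModel", path)

# e.g. save_booster(xgboost_model, "/tmp/xgb.bin"), then load the file with the plain Python xgboost package.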

@ericwang915
Copy link

Thank you. By the way, during the training process there is no log showing the evaluation metrics and boosting rounds, even though silent is set to 1.

@ccdtzccdtz
Copy link

@thesuperzapper thanks for the instructions. I was able to follow them to train/save an xgboost model in pyspark. Any idea how to access other xgboost model functions like getFeatureScore() (Scala)?

@thesuperzapper
Copy link
Contributor

@ccdtzccdtz currently I am rewriting the pyspark wrapper, since 0.8 made massive changes to the Spark API; when finished, I aim to have feature parity with the Spark Scala API.

I did not expose the native booster method in my initial pyspark wrapper, but if you use the Spark Scala API, you can call xgboost_model_object.nativeBooster.getFeatureScore() and use it as normal.

@nitinkak001
Copy link

nitinkak001 commented Aug 29, 2018

I have seen XGBoost on pyspark fail consistently when it is run 2 or more times. I am running it on the same dataset with the same code: the first time it succeeds, but the second and subsequent times it fails. I am using XGBoost 0.72 on Spark 2.3 and have to restart the pyspark shell to run the job successfully again.

I use xgboost.trainWithDataFrame for training purposes.

Has anyone seen this issue?

@sagnik-rzt
Copy link

sagnik-rzt commented Sep 4, 2018

Hi @thesuperzapper,
What you prescribed works for me on a single worker node.
However, when I try to run pyspark xgboost with more than one worker (3 in this case), the executors become idle and shut down after a while.
This is the code I'm trying to run on the Titanic dataset (which is a small dataset):

from pyspark.sql.session import SparkSession
from pyspark.sql.types import *
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml import Pipeline
from pyspark.sql.functions import col

spark = SparkSession\
        .builder\
        .appName("PySpark XGBOOST Titanic")\
        .getOrCreate()

#spark.sparkContext.addPyFile("../sparkxgb.zip")

from automl.sparkxgb import XGBoostEstimator

schema = StructType(
  [StructField("PassengerId", DoubleType()),
    StructField("Survival", DoubleType()),
    StructField("Pclass", DoubleType()),
    StructField("Name", StringType()),
    StructField("Sex", StringType()),
    StructField("Age", DoubleType()),
    StructField("SibSp", DoubleType()),
    StructField("Parch", DoubleType()),
    StructField("Ticket", StringType()),
    StructField("Fare", DoubleType()),
    StructField("Cabin", StringType()),
    StructField("Embarked", StringType())
  ])

df_raw = spark\
  .read\
  .option("header", "true")\
  .schema(schema)\
  .csv("titanic.csv")


df = df_raw.na.fill(0)

sexIndexer = StringIndexer() \
    .setInputCol("Sex") \
    .setOutputCol("SexIndex") \
    .setHandleInvalid("keep")

cabinIndexer = StringIndexer() \
    .setInputCol("Cabin") \
    .setOutputCol("CabinIndex") \
    .setHandleInvalid("keep")

embarkedIndexer = StringIndexer() \
    .setInputCol("Embarked") \
    .setOutputCol("EmbarkedIndex") \
    .setHandleInvalid("keep")

vectorAssembler  = VectorAssembler()\
  .setInputCols(["Pclass", "SexIndex", "Age", "SibSp", "Parch", "Fare", "CabinIndex", "EmbarkedIndex"])\
  .setOutputCol("features")

xgboost = XGBoostEstimator(nworkers=2,
    featuresCol="features",
    labelCol="Survival",
    predictionCol="prediction"
)

pipeline = Pipeline().setStages([sexIndexer, cabinIndexer, embarkedIndexer, vectorAssembler, xgboost])
trainDF, testDF = df.randomSplit([0.8, 0.2], seed=24)

model = pipeline.fit(trainDF)
print(trainDF.schema)

This is the stack trace:
Tracker started, with env={DMLC_NUM_SERVER=0, DMLC_TRACKER_URI=172.16.1.5, DMLC_TRACKER_PORT=9093, DMLC_NUM_WORKER=3}
2018-09-04 08:52:55 ERROR TaskSchedulerImpl:70 - Lost executor 0 on 192.168.49.43: Remote RPC client disassociated. Likely due to containers exceeding thresholds, or network issues. Check driver logs for WARN messages.
2018-09-04 08:52:55 ERROR AsyncEventQueue:91 - Interrupted while posting to TaskFailedListener. Removing that listener.
java.lang.InterruptedException: ExecutorLost during XGBoost Training: ExecutorLostFailure (executor 0 exited caused by one of the running tasks) Reason: Remote RPC client disassociated. Likely due to containers exceeding thresholds, or network issues. Check driver logs for WARN messages.
	at org.apache.spark.TaskFailedListener.onTaskEnd(SparkParallelismTracker.scala:116)
	at org.apache.spark.scheduler.SparkListenerBus$class.doPostEvent(SparkListenerBus.scala:45)
	at org.apache.spark.scheduler.AsyncEventQueue.doPostEvent(AsyncEventQueue.scala:37)
	at org.apache.spark.scheduler.AsyncEventQueue.doPostEvent(AsyncEventQueue.scala:37)
	at org.apache.spark.util.ListenerBus$class.postToAll(ListenerBus.scala:91)

The executor is stuck at:
org.apache.spark.RDD.foreachPartition(RDD.scala:927)
ml.dmlc.xgboost4j.scala.spark.XGBoost$$anonfun$trainDistributed$4$$anon$1.run(XGBoost.scala:348)

Environment: Python 3.5.4, Spark Version 2.3.1, Xgboost 0.72

@nitinkak001
Copy link

nitinkak001 commented Sep 4, 2018 via email

@thesuperzapper
Copy link
Contributor

@sagnik-rzt I am surprised that works at all, as that pyspark wrapper only supports XGBoost 0.72; we are still working on the 0.8 one.

@vectosaurus
Copy link

@thesuperzapper, based on the version you provided, I redid some parts to support xgboost 0.80.
However, I am ending up with a py4j.protocol.Py4JError: ml.dmlc.xgboost4j.scala.spark.XGBoostClassifier does not exist in the JVM error. I have provided a full description here. Could you take a look?

All the code is placed here.

@thesuperzapper
Copy link
Contributor

There are quite a lot more changes needed than you have made to make it work with 0.8.

The main reason I haven't just pushed out a 0.8 version is that I really don't want to make an xgboost-specific pipeline object like I have in the 0.72 version; I am working on a way to hopefully have the pyspark xgboost object work with the default pipeline persistence.

@vectosaurus
Copy link

@thesuperzapper, while using the code for 0.72, I used the XGBoostEstimator object directly, that is, without the XGBoostPipeline object. While doing so, I noticed that the training/fitting doesn't get distributed across the workers on the cluster. Is it necessary to use the XGBoostPipeline for distribution across the workers?

If that's not the case, do you know why the training doesn't get distributed across the workers?

Update
I tried the training by setting XGBoostEstimator as a stage in XGBoostPipeline, but the issue persists. Training doesn't get distributed across workers when I run it on the cluster, while it does for other pyspark-supported models.

Have you observed this behavior? How do I tackle it?

@thesuperzapper
Copy link
Contributor

I have mostly re-coded the wrapper for XGBoost 0.8, but as my work cluster is still on 2.2 I can't test it easily in distributed mode, and my Dockerized Spark 2.3 cluster can't even train Scala XGBoost distributed models without getting missing-shuffle-location issues.

I think the issues @sagnik-rzt and others are experiencing are related to your cluster config or some deeper issue with Spark-Scala XGBoost.

Are you able to train a model in Spark-Scala XGBoost?

@vectosaurus
Copy link

vectosaurus commented Sep 10, 2018

Thanks @thesuperzapper, I thought shuffle locations were handled internally, that is, taken care of independently of the cluster config. But I found this Stack Overflow post, so I will be implementing those suggestions.

Also, could you share your 0.8 version if it is ready? I can test distribution on my cluster. It has Spark 2.3.1 and Python 3.5.

@anaveenan
Copy link

anaveenan commented Sep 12, 2018

After saving and then loading the model, I get the following error:

IllegalArgumentException: u'requirement failed: Error loading metadata: Expected class name org.apache.spark.ml.Pipeline but found class name org.apache.spark.ml.PipelineModel'

Can you please help with this? Thanks.

import pyspark
from pyspark.sql.session import SparkSession
from pyspark.sql.types import *
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml import Pipeline
from pyspark.sql.functions import col

spark.sparkContext.addPyFile("sparkxgb.zip")
from sparkxgb import XGBoostEstimator
schema = StructType(
  [StructField("PassengerId", DoubleType()),
    StructField("Survival", DoubleType()),
    StructField("Pclass", DoubleType()),
    StructField("Name", StringType()),
    StructField("Sex", StringType()),
    StructField("Age", DoubleType()),
    StructField("SibSp", DoubleType()),
    StructField("Parch", DoubleType()),
    StructField("Ticket", StringType()),
    StructField("Fare", DoubleType()),
    StructField("Cabin", StringType()),
    StructField("Embarked", StringType())
  ])

df_raw = spark\
  .read\
  .option("header", "true")\
  .schema(schema)\
  .csv("train.csv")

df = df_raw.na.fill(0)

sexIndexer = StringIndexer()\
  .setInputCol("Sex")\
  .setOutputCol("SexIndex")\
  .setHandleInvalid("keep")
    
cabinIndexer = StringIndexer()\
  .setInputCol("Cabin")\
  .setOutputCol("CabinIndex")\
  .setHandleInvalid("keep")
    
embarkedIndexer = StringIndexer()\
  .setInputCol("Embarked")\
  .setOutputCol("EmbarkedIndex")\
  .setHandleInvalid("keep")

vectorAssembler = VectorAssembler()\
  .setInputCols(["Pclass", "SexIndex", "Age", "SibSp", "Parch", "Fare", "CabinIndex", "EmbarkedIndex"])\
  .setOutputCol("features")
xgboost = XGBoostEstimator(
    featuresCol="features", 
    labelCol="Survival", 
    predictionCol="prediction"
)

pipeline = Pipeline().setStages([sexIndexer, cabinIndexer, embarkedIndexer, vectorAssembler, xgboost])
model = pipeline.fit(df)
model.transform(df).select(col("PassengerId"), col("prediction")).show()

model.save("model_xgboost")
loadedModel = Pipeline.load("model_xgboost")


IllegalArgumentException: u'requirement failed: Error loading metadata: Expected class name org.apache.spark.ml.Pipeline but found class name org.apache.spark.ml.PipelineModel'


#predict2 = loadedModel.transform(df)

Tried the following option

from pyspark.ml import PipelineModel
#model.save("model_xgboost")
loadedModel = PipelineModel.load("model_xgboost")

Getting the following error:

No module named ml.dmlc.xgboost4j.scala.spark

@thesuperzapper
Copy link
Contributor

DEV DOWNLOAD LINK: sparkxgb.zip

This version will work with XGBoost-0.8, but please don't use it for anything other than testing, or contributing to this thread, as stuff will change.
(Also note: I have removed all the backports for Spark 2.2, so this only supports Spark 2.3.)

The main issue I am aware of with this version is that classification models won't load back after being saved, giving the error TypeError: 'JavaPackage' object is not callable. However, strangely, XGBoostPipelineModel works just fine with an XGBoost classification stage. This leads me to think it is an issue on my end; can someone verify whether reading classification models works for them?

Regardless, I am attempting to properly implement DefaultParamsWritable, which would remove the need for the dedicated XGBoostPipeline and will be way easier to maintain long term, so the read/write issue should become irrelevant anyway. (This also might allow persistence with CrossValidator to work.)
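For context, the stock mixin pattern being referred to looks roughly like this; a generic PySpark sketch with hypothetical class names, not the actual wrapper (DefaultParamsReadable/DefaultParamsWritable ship with Spark 2.3+):

from pyspark.ml import Estimator, Model
from pyspark.ml.param.shared import HasFeaturesCol, HasLabelCol
from pyspark.ml.util import DefaultParamsReadable, DefaultParamsWritable


class MyModel(Model, DefaultParamsReadable, DefaultParamsWritable):
    def _transform(self, dataset):
        return dataset  # placeholder transform


class MyEstimator(Estimator, HasFeaturesCol, HasLabelCol,
                  DefaultParamsReadable, DefaultParamsWritable):
    """A stage whose params persist through the stock Pipeline writer."""

    def _fit(self, dataset):
        return MyModel()

# Because the stages are DefaultParamsWritable, a plain pyspark.ml.Pipeline
# containing them can be saved and loaded without a dedicated pipeline class.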

lock bot locked as resolved and limited conversation to collaborators Dec 11, 2018