"""Fit a SparkXGBClassifier pipeline on a toy DataFrame, round-trip the fitted
pipeline through an MLeap bundle, and show predictions before and after
deserialization (e.g. as a serialization-compatibility repro/example).

Requires a running Spark environment plus the ``mleap`` and ``xgboost``
packages; intended to be executed as a script, not imported.
"""

# Importing mleap.pyspark / SimpleSparkSerializer is required for its side
# effect: it monkey-patches serializeToBundle/deserializeFromBundle onto
# PySpark ML models. The names themselves are intentionally unused.
import mleap.pyspark  # noqa: F401
from mleap.pyspark.spark_support import SimpleSparkSerializer  # noqa: F401
import pyspark
import pyspark.ml
from pyspark.sql import SparkSession
from xgboost.spark import SparkXGBRegressor, SparkXGBClassifier  # noqa: F401

spark = SparkSession.builder.enableHiveSupport().getOrCreate()

# FIX: the original list contained 'feat5' twice, producing duplicate
# DataFrame column names; the VectorAssembler reference to 'feat5' would then
# be ambiguous and fail analysis. Rows are positional (11 feature values +
# label), so renaming the duplicate to a fresh unique name preserves the data.
features = [
    'feat1', 'feat2', 'feat3', 'feat4', 'feat5', 'feat6',
    'feat7', 'feat8', 'feat9', 'feat10', 'feat11',
]

df = spark.createDataFrame(
    [
        (7, 20, 3, 6, 1, 10, 3, 53948, 245351, 1, 2, 1),
        (7, 20, 3, 6, 1, 10, 3, 53948, 245351, 1, 2, 1),
        (7, 20, 1, 6, 1, 10, 3, 53948, 245351, 1, 2, 0),
        (7, 20, 1, 6, 1, 10, 3, 53948, 245351, 1, 2, 0),
        (5, 20, 1, 6, 1, 10, 3, 53948, 245351, 1, 2, 0),
        (5, 20, 3, 6, 1, 10, 3, 53948, 245351, 1, 2, 1),
    ],
    features + ['label'],
)

xgboost = SparkXGBClassifier(
    features_col="features",
    label_col="label",
    max_depth=8,
    num_boost_round=100,
    scale_pos_weight=10.0,
    min_child_weight=120.0,
    alpha=0.4,
    colsample_bytree=0.5,
    subsample=0.6,
    tree_method="approx",
    eval_metric="aucpr",
    # missing=0.0 means zeros in the assembled vector are treated as missing
    # values by XGBoost; must be re-applied after deserialization (see below).
    missing=0.0,
    prediction_col="prediction",
)

pipeline = pyspark.ml.Pipeline(
    stages=[
        pyspark.ml.feature.VectorAssembler(
            inputCols=features, outputCol="features"
        ),
        xgboost,
    ]
)

model = pipeline.fit(df)
predictions = model.transform(df)
predictions.show()

# FIX: MLeap's jar:file: bundle URI must point to a .zip archive; the original
# suffix-less path ("...pyspark.example.feat7") fails at serialization time.
local_path = "jar:file:/tmp/pyspark.example.zip"
model.serializeToBundle(local_path, predictions)

deserialized_model = pyspark.ml.PipelineModel.deserializeFromBundle(local_path)
# The 'missing' parameter is not carried through the MLeap bundle, so restore
# it on the deserialized XGBoost stage to reproduce the original predictions.
deserialized_model.stages[-1].set(deserialized_model.stages[-1].missing, 0.0)
deserialized_model.transform(df).show()