97 changes: 44 additions & 53 deletions python/pyspark/ml/tests.py
@@ -57,13 +57,25 @@
from pyspark.ml.wrapper import JavaParams
from pyspark.mllib.common import _java2py
from pyspark.mllib.linalg import Vectors, DenseVector, SparseVector
from pyspark.sql import DataFrame, SQLContext, Row
from pyspark.sql import DataFrame, Row, SparkSession
from pyspark.sql.functions import rand
from pyspark.sql.utils import IllegalArgumentException
from pyspark.storagelevel import *
from pyspark.tests import ReusedPySparkTestCase as PySparkTestCase


class SparkSessionTestCase(PySparkTestCase):
@classmethod
def setUpClass(cls):
PySparkTestCase.setUpClass()
cls.spark = SparkSession(cls.sc)

@classmethod
def tearDownClass(cls):
PySparkTestCase.tearDownClass()
cls.spark.stop()


class MockDataset(DataFrame):

def __init__(self):
@@ -350,7 +362,7 @@ def test_word2vec_param(self):
self.assertEqual(model.getWindowSize(), 6)


class FeatureTests(PySparkTestCase):
class FeatureTests(SparkSessionTestCase):

def test_binarizer(self):
b0 = Binarizer()
@@ -376,8 +388,7 @@ def test_binarizer(self):
self.assertEqual(b1.getOutputCol(), "output")

def test_idf(self):
sqlContext = SQLContext(self.sc)
dataset = sqlContext.createDataFrame([
dataset = self.spark.createDataFrame([
(DenseVector([1.0, 2.0]),),
(DenseVector([0.0, 1.0]),),
(DenseVector([3.0, 0.2]),)], ["tf"])
@@ -390,8 +401,7 @@ def test_idf(self):
self.assertIsNotNone(output.head().idf)

def test_ngram(self):
sqlContext = SQLContext(self.sc)
dataset = sqlContext.createDataFrame([
dataset = self.spark.createDataFrame([
Row(input=["a", "b", "c", "d", "e"])])
ngram0 = NGram(n=4, inputCol="input", outputCol="output")
self.assertEqual(ngram0.getN(), 4)
@@ -401,8 +411,7 @@ def test_ngram(self):
self.assertEqual(transformedDF.head().output, ["a b c d", "b c d e"])

def test_stopwordsremover(self):
sqlContext = SQLContext(self.sc)
dataset = sqlContext.createDataFrame([Row(input=["a", "panda"])])
dataset = self.spark.createDataFrame([Row(input=["a", "panda"])])
stopWordRemover = StopWordsRemover(inputCol="input", outputCol="output")
# Default
self.assertEqual(stopWordRemover.getInputCol(), "input")
@@ -419,15 +428,14 @@ def test_stopwordsremover(self):
self.assertEqual(transformedDF.head().output, ["a"])
# with language selection
stopwords = StopWordsRemover.loadDefaultStopWords("turkish")
dataset = sqlContext.createDataFrame([Row(input=["acaba", "ama", "biri"])])
dataset = self.spark.createDataFrame([Row(input=["acaba", "ama", "biri"])])
stopWordRemover.setStopWords(stopwords)
self.assertEqual(stopWordRemover.getStopWords(), stopwords)
transformedDF = stopWordRemover.transform(dataset)
self.assertEqual(transformedDF.head().output, [])

def test_count_vectorizer_with_binary(self):
sqlContext = SQLContext(self.sc)
dataset = sqlContext.createDataFrame([
dataset = self.spark.createDataFrame([
(0, "a a a b b c".split(' '), SparseVector(3, {0: 1.0, 1: 1.0, 2: 1.0}),),
(1, "a a".split(' '), SparseVector(3, {0: 1.0}),),
(2, "a b".split(' '), SparseVector(3, {0: 1.0, 1: 1.0}),),
@@ -475,11 +483,10 @@ def _fit(self, dataset):
return model


class CrossValidatorTests(PySparkTestCase):
class CrossValidatorTests(SparkSessionTestCase):

def test_copy(self):
sqlContext = SQLContext(self.sc)
dataset = sqlContext.createDataFrame([
dataset = self.spark.createDataFrame([
(10, 10.0),
(50, 50.0),
(100, 100.0),
@@ -503,8 +510,7 @@ def test_copy(self):
< 0.0001)

def test_fit_minimize_metric(self):
sqlContext = SQLContext(self.sc)
dataset = sqlContext.createDataFrame([
dataset = self.spark.createDataFrame([
(10, 10.0),
(50, 50.0),
(100, 100.0),
@@ -527,8 +533,7 @@ def test_fit_minimize_metric(self):
self.assertEqual(0.0, bestModelMetric, "Best model has RMSE of 0")

def test_fit_maximize_metric(self):
sqlContext = SQLContext(self.sc)
dataset = sqlContext.createDataFrame([
dataset = self.spark.createDataFrame([
(10, 10.0),
(50, 50.0),
(100, 100.0),
@@ -554,8 +559,7 @@ def test_save_load(self):
# This tests saving and loading the trained model only.
# Save/load for CrossValidator will be added later: SPARK-13786
temp_path = tempfile.mkdtemp()
sqlContext = SQLContext(self.sc)
dataset = sqlContext.createDataFrame(
dataset = self.spark.createDataFrame(
[(Vectors.dense([0.0]), 0.0),
(Vectors.dense([0.4]), 1.0),
(Vectors.dense([0.5]), 0.0),
@@ -576,11 +580,10 @@ def test_save_load(self):
self.assertEqual(loadedLrModel.intercept, lrModel.intercept)


class TrainValidationSplitTests(PySparkTestCase):
class TrainValidationSplitTests(SparkSessionTestCase):

def test_fit_minimize_metric(self):
sqlContext = SQLContext(self.sc)
dataset = sqlContext.createDataFrame([
dataset = self.spark.createDataFrame([
(10, 10.0),
(50, 50.0),
(100, 100.0),
@@ -603,8 +606,7 @@ def test_fit_minimize_metric(self):
self.assertEqual(0.0, bestModelMetric, "Best model has RMSE of 0")

def test_fit_maximize_metric(self):
sqlContext = SQLContext(self.sc)
dataset = sqlContext.createDataFrame([
dataset = self.spark.createDataFrame([
(10, 10.0),
(50, 50.0),
(100, 100.0),
@@ -630,8 +632,7 @@ def test_save_load(self):
# This tests saving and loading the trained model only.
# Save/load for TrainValidationSplit will be added later: SPARK-13786
temp_path = tempfile.mkdtemp()
sqlContext = SQLContext(self.sc)
dataset = sqlContext.createDataFrame(
dataset = self.spark.createDataFrame(
[(Vectors.dense([0.0]), 0.0),
(Vectors.dense([0.4]), 1.0),
(Vectors.dense([0.5]), 0.0),
@@ -652,7 +653,7 @@ def test_save_load(self):
self.assertEqual(loadedLrModel.intercept, lrModel.intercept)


class PersistenceTest(PySparkTestCase):
class PersistenceTest(SparkSessionTestCase):

def test_linear_regression(self):
lr = LinearRegression(maxIter=1)
@@ -724,11 +725,10 @@ def test_pipeline_persistence(self):
"""
Pipeline[HashingTF, PCA]
"""
sqlContext = SQLContext(self.sc)
temp_path = tempfile.mkdtemp()

try:
df = sqlContext.createDataFrame([(["a", "b", "c"],), (["c", "d", "e"],)], ["words"])
df = self.spark.createDataFrame([(["a", "b", "c"],), (["c", "d", "e"],)], ["words"])
tf = HashingTF(numFeatures=10, inputCol="words", outputCol="features")
pca = PCA(k=2, inputCol="features", outputCol="pca_features")
pl = Pipeline(stages=[tf, pca])
@@ -753,11 +753,10 @@ def test_nested_pipeline_persistence(self):
"""
Pipeline[HashingTF, Pipeline[PCA]]
"""
sqlContext = SQLContext(self.sc)
temp_path = tempfile.mkdtemp()

try:
df = sqlContext.createDataFrame([(["a", "b", "c"],), (["c", "d", "e"],)], ["words"])
df = self.spark.createDataFrame([(["a", "b", "c"],), (["c", "d", "e"],)], ["words"])
tf = HashingTF(numFeatures=10, inputCol="words", outputCol="features")
pca = PCA(k=2, inputCol="features", outputCol="pca_features")
p0 = Pipeline(stages=[pca])
@@ -816,7 +815,7 @@ def test_decisiontree_regressor(self):
pass


class LDATest(PySparkTestCase):
class LDATest(SparkSessionTestCase):

def _compare(self, m1, m2):
"""
@@ -836,8 +835,7 @@ def _compare(self, m1, m2):

def test_persistence(self):
# Test save/load for LDA, LocalLDAModel, DistributedLDAModel.
sqlContext = SQLContext(self.sc)
df = sqlContext.createDataFrame([
df = self.spark.createDataFrame([
[1, Vectors.dense([0.0, 1.0])],
[2, Vectors.sparse(2, {0: 1.0})],
], ["id", "features"])
@@ -871,12 +869,11 @@ def test_persistence(self):
pass


class TrainingSummaryTest(PySparkTestCase):
class TrainingSummaryTest(SparkSessionTestCase):

def test_linear_regression_summary(self):
from pyspark.mllib.linalg import Vectors
sqlContext = SQLContext(self.sc)
df = sqlContext.createDataFrame([(1.0, 2.0, Vectors.dense(1.0)),
df = self.spark.createDataFrame([(1.0, 2.0, Vectors.dense(1.0)),
(0.0, 2.0, Vectors.sparse(1, [], []))],
["label", "weight", "features"])
lr = LinearRegression(maxIter=5, regParam=0.0, solver="normal", weightCol="weight",
@@ -914,8 +911,7 @@ def test_linear_regression_summary(self):

def test_logistic_regression_summary(self):
from pyspark.mllib.linalg import Vectors
sqlContext = SQLContext(self.sc)
df = sqlContext.createDataFrame([(1.0, 2.0, Vectors.dense(1.0)),
df = self.spark.createDataFrame([(1.0, 2.0, Vectors.dense(1.0)),
(0.0, 2.0, Vectors.sparse(1, [], []))],
["label", "weight", "features"])
lr = LogisticRegression(maxIter=5, regParam=0.01, weightCol="weight", fitIntercept=False)
@@ -942,11 +938,10 @@ def test_logistic_regression_summary(self):
self.assertAlmostEqual(sameSummary.areaUnderROC, s.areaUnderROC)


class OneVsRestTests(PySparkTestCase):
class OneVsRestTests(SparkSessionTestCase):

def test_copy(self):
sqlContext = SQLContext(self.sc)
df = sqlContext.createDataFrame([(0.0, Vectors.dense(1.0, 0.8)),
df = self.spark.createDataFrame([(0.0, Vectors.dense(1.0, 0.8)),
(1.0, Vectors.sparse(2, [], [])),
(2.0, Vectors.dense(0.5, 0.5))],
["label", "features"])
@@ -960,8 +955,7 @@ def test_copy(self):
self.assertEqual(model1.getPredictionCol(), "indexed")

def test_output_columns(self):
sqlContext = SQLContext(self.sc)
df = sqlContext.createDataFrame([(0.0, Vectors.dense(1.0, 0.8)),
df = self.spark.createDataFrame([(0.0, Vectors.dense(1.0, 0.8)),
(1.0, Vectors.sparse(2, [], [])),
(2.0, Vectors.dense(0.5, 0.5))],
["label", "features"])
@@ -973,8 +967,7 @@ def test_output_columns(self):

def test_save_load(self):
temp_path = tempfile.mkdtemp()
sqlContext = SQLContext(self.sc)
df = sqlContext.createDataFrame([(0.0, Vectors.dense(1.0, 0.8)),
df = self.spark.createDataFrame([(0.0, Vectors.dense(1.0, 0.8)),
(1.0, Vectors.sparse(2, [], [])),
(2.0, Vectors.dense(0.5, 0.5))],
["label", "features"])
@@ -994,12 +987,11 @@ def test_save_load(self):
self.assertEqual(m.uid, n.uid)


class HashingTFTest(PySparkTestCase):
class HashingTFTest(SparkSessionTestCase):

def test_apply_binary_term_freqs(self):
sqlContext = SQLContext(self.sc)

df = sqlContext.createDataFrame([(0, ["a", "a", "b", "c", "c", "c"])], ["id", "words"])
df = self.spark.createDataFrame([(0, ["a", "a", "b", "c", "c", "c"])], ["id", "words"])
n = 10
hashingTF = HashingTF()
hashingTF.setInputCol("words").setOutputCol("features").setNumFeatures(n).setBinary(True)
@@ -1011,11 +1003,10 @@ def test_apply_binary_term_freqs(self):
": expected " + str(expected[i]) + ", got " + str(features[i]))


class ALSTest(PySparkTestCase):
class ALSTest(SparkSessionTestCase):

def test_storage_levels(self):
sqlContext = SQLContext(self.sc)
df = sqlContext.createDataFrame(
df = self.spark.createDataFrame(
[(0, 0, 4.0), (0, 1, 2.0), (1, 1, 3.0), (1, 2, 4.0), (2, 1, 1.0), (2, 2, 5.0)],
["user", "item", "rating"])
als = ALS().setMaxIter(1).setRank(1)
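The ml/tests.py hunks above all apply the same refactor: test classes that previously constructed a SQLContext(self.sc) inside each test method now extend the new SparkSessionTestCase, which layers one SparkSession per class on top of the SparkContext provided by ReusedPySparkTestCase, and the tests build DataFrames through self.spark.createDataFrame(...). A minimal sketch of a test written against this pattern (the class name, test name, and data are illustrative, not part of the diff):

from pyspark.ml.feature import Binarizer

class BinarizerExampleTest(SparkSessionTestCase):

    def test_binarizer_transform(self):
        # self.spark is the SparkSession created once in SparkSessionTestCase.setUpClass()
        df = self.spark.createDataFrame([(0.4,), (0.6,)], ["values"])
        binarizer = Binarizer(threshold=0.5, inputCol="values", outputCol="binary")
        result = binarizer.transform(df)
        # Binarizer maps values <= threshold to 0.0 and values > threshold to 1.0
        self.assertEqual([r.binary for r in result.collect()], [0.0, 1.0])

Because the session is created in setUpClass() and stopped in tearDownClass(), individual tests no longer pay for per-method SQLContext construction, and the SparkContext underneath is still the one reused across the module.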
19 changes: 10 additions & 9 deletions python/pyspark/mllib/tests.py
@@ -66,7 +66,8 @@
from pyspark.mllib.util import MLUtils
from pyspark.serializers import PickleSerializer
from pyspark.streaming import StreamingContext
from pyspark.sql import SQLContext
from pyspark.sql import SparkSession
from pyspark.sql.utils import IllegalArgumentException
from pyspark.streaming import StreamingContext

_have_scipy = False
@@ -83,9 +84,10 @@
class MLlibTestCase(unittest.TestCase):
def setUp(self):
self.sc = SparkContext('local[4]', "MLlib tests")
self.spark = SparkSession(self.sc)

def tearDown(self):
self.sc.stop()
self.spark.stop()


class MLLibStreamingTestCase(unittest.TestCase):
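The MLlibTestCase change above follows the same idea at the mllib level: setUp() still creates a local SparkContext, but now also wraps it in a SparkSession, and tearDown() stops the session, which in Spark 2.x shuts down the underlying SparkContext as well. A short illustration of two equivalent ways to obtain such a session (assumed usage, not part of the diff):

from pyspark import SparkContext
from pyspark.sql import SparkSession

# Wrap an existing SparkContext, as the updated MLlibTestCase.setUp() does:
sc = SparkContext('local[4]', 'MLlib tests')
spark = SparkSession(sc)

# Builder form; getOrCreate() reuses the active context and session if one exists:
spark = SparkSession.builder.master('local[4]').appName('MLlib tests').getOrCreate()

spark.stop()  # also stops the underlying SparkContext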
@@ -698,7 +700,6 @@ def test_serialization(self):
self.assertEqual(v, self.udt.deserialize(self.udt.serialize(v)))

def test_infer_schema(self):
sqlCtx = SQLContext(self.sc)
rdd = self.sc.parallelize([LabeledPoint(1.0, self.dv1), LabeledPoint(0.0, self.sv1)])
df = rdd.toDF()
schema = df.schema
@@ -731,7 +732,6 @@ def test_serialization(self):
self.assertEqual(m, self.udt.deserialize(self.udt.serialize(m)))

def test_infer_schema(self):
sqlCtx = SQLContext(self.sc)
rdd = self.sc.parallelize([("dense", self.dm1), ("sparse", self.sm1)])
df = rdd.toDF()
schema = df.schema
@@ -919,7 +919,7 @@ def test_goodness_of_fit(self):

# Negative counts in observed
neg_obs = Vectors.dense([1.0, 2.0, 3.0, -4.0])
self.assertRaises(Py4JJavaError, Statistics.chiSqTest, neg_obs, expected1)
self.assertRaises(IllegalArgumentException, Statistics.chiSqTest, neg_obs, expected1)

# Count = 0.0 in expected but not observed
zero_expected = Vectors.dense([1.0, 0.0, 3.0])
@@ -930,7 +930,8 @@

# 0.0 in expected and observed simultaneously
zero_observed = Vectors.dense([2.0, 0.0, 1.0])
self.assertRaises(Py4JJavaError, Statistics.chiSqTest, zero_observed, zero_expected)
self.assertRaises(
IllegalArgumentException, Statistics.chiSqTest, zero_observed, zero_expected)

def test_matrix_independence(self):
data = [40.0, 24.0, 29.0, 56.0, 32.0, 42.0, 31.0, 10.0, 0.0, 30.0, 15.0, 12.0]
@@ -944,15 +945,15 @@

# Negative counts
neg_counts = Matrices.dense(2, 2, [4.0, 5.0, 3.0, -3.0])
self.assertRaises(Py4JJavaError, Statistics.chiSqTest, neg_counts)
self.assertRaises(IllegalArgumentException, Statistics.chiSqTest, neg_counts)

# Row sum = 0.0
row_zero = Matrices.dense(2, 2, [0.0, 1.0, 0.0, 2.0])
self.assertRaises(Py4JJavaError, Statistics.chiSqTest, row_zero)
self.assertRaises(IllegalArgumentException, Statistics.chiSqTest, row_zero)

# Column sum = 0.0
col_zero = Matrices.dense(2, 2, [0.0, 0.0, 2.0, 2.0])
self.assertRaises(Py4JJavaError, Statistics.chiSqTest, col_zero)
self.assertRaises(IllegalArgumentException, Statistics.chiSqTest, col_zero)

def test_chi_sq_pearson(self):
data = [
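The remaining hunks change what the chi-square error tests expect: with this patch, invalid inputs to Statistics.chiSqTest are asserted to raise pyspark.sql.utils.IllegalArgumentException (newly imported at the top of the file) instead of a raw py4j Py4JJavaError, presumably because the Python side now translates the JVM's java.lang.IllegalArgumentException into that typed exception. A standalone sketch of the same check outside unittest, assuming a PySpark build with this patch applied (the app name and script structure are illustrative):

from pyspark import SparkContext
from pyspark.mllib.linalg import Vectors
from pyspark.mllib.stat import Statistics
from pyspark.sql.utils import IllegalArgumentException

sc = SparkContext('local[2]', 'chisq-demo')  # chiSqTest needs an active SparkContext
observed = Vectors.dense([1.0, 2.0, 3.0, -4.0])  # a negative count is invalid input
expected = Vectors.dense([1.5, 2.5, 3.5, 4.5])
try:
    Statistics.chiSqTest(observed, expected)
except IllegalArgumentException as e:
    print("rejected as expected:", e)
finally:
    sc.stop()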