Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ building for particular Hive and Hive Thriftserver distributions.
Please refer to the [Configuration Guide](http://spark.apache.org/docs/latest/configuration.html)
in the online documentation for an overview on how to configure Spark.

## Contributing
## Contributing

Please review the [Contribution to Spark guide](http://spark.apache.org/contributing.html)
for information on how to get started contributing to the project.
Original file line number Diff line number Diff line change
Expand Up @@ -384,9 +384,16 @@ private[serializer] object KryoSerializer {
classOf[HighlyCompressedMapStatus],
classOf[CompactBuffer[_]],
classOf[BlockManagerId],
classOf[Array[Boolean]],
classOf[Array[Byte]],
classOf[Array[Short]],
classOf[Array[Int]],
classOf[Array[Long]],
classOf[Array[Float]],
classOf[Array[Double]],
classOf[Array[Char]],
classOf[Array[String]],
classOf[Array[Array[String]]],
classOf[BoundedPriorityQueue[_]],
classOf[SparkConf]
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,9 @@ class KryoSerializerSuite extends SparkFunSuite with SharedSparkContext {
}

test("basic types") {
val conf = new SparkConf(false)
conf.set("spark.kryo.registrationRequired", "true")

val ser = new KryoSerializer(conf).newInstance()
def check[T: ClassTag](t: T) {
assert(ser.deserialize[T](ser.serialize(t)) === t)
Expand Down Expand Up @@ -106,6 +109,9 @@ class KryoSerializerSuite extends SparkFunSuite with SharedSparkContext {
}

test("pairs") {
val conf = new SparkConf(false)
conf.set("spark.kryo.registrationRequired", "true")

val ser = new KryoSerializer(conf).newInstance()
def check[T: ClassTag](t: T) {
assert(ser.deserialize[T](ser.serialize(t)) === t)
Expand All @@ -130,12 +136,16 @@ class KryoSerializerSuite extends SparkFunSuite with SharedSparkContext {
}

test("Scala data structures") {
val conf = new SparkConf(false)
conf.set("spark.kryo.registrationRequired", "true")

val ser = new KryoSerializer(conf).newInstance()
def check[T: ClassTag](t: T) {
assert(ser.deserialize[T](ser.serialize(t)) === t)
}
check(List[Int]())
check(List[Int](1, 2, 3))
check(Seq[Int](1, 2, 3))
check(List[String]())
check(List[String]("x", "y", "z"))
check(None)
Expand Down
2 changes: 1 addition & 1 deletion docs/building-spark.md
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ Developers who compile Spark frequently may want to speed up compilation; e.g.,
developers who build with SBT). For more information about how to do this, refer to the
[Useful Developer Tools page](http://spark.apache.org/developer-tools.html#reducing-build-times).

## Encrypted Filesystems
## Encrypted Filesystems

When building on an encrypted filesystem (if your home directory is encrypted, for example), then the Spark build might fail with a "Filename too long" error. As a workaround, add the following in the configuration args of the `scala-maven-plugin` in the project `pom.xml`:

Expand Down
66 changes: 66 additions & 0 deletions docs/ml-features.md
Original file line number Diff line number Diff line change
Expand Up @@ -1284,6 +1284,72 @@ for more details on the API.

</div>


## Imputer

The `Imputer` transformer completes missing values in a dataset, either using the mean or the
median of the columns in which the missing values are located. The input columns should be of
`DoubleType` or `FloatType`. Currently `Imputer` does not support categorical features and possibly
creates incorrect values for columns containing categorical features.

**Note** all `null` values in the input columns are treated as missing, and so are also imputed.

**Examples**

Suppose that we have a DataFrame with the columns `a` and `b`:

~~~
a | b
------------|-----------
1.0 | Double.NaN
2.0 | Double.NaN
Double.NaN | 3.0
4.0 | 4.0
5.0 | 5.0
~~~

In this example, Imputer will replace all occurrences of `Double.NaN` (the default for the missing value)
with the mean (the default imputation strategy) computed from the other values in the corresponding columns.
In this example, the surrogate values for columns `a` and `b` are 3.0 and 4.0 respectively. After
transformation, the missing values in the output columns will be replaced by the surrogate value for
the relevant column.

~~~
a | b | out_a | out_b
------------|------------|-------|-------
1.0 | Double.NaN | 1.0 | 4.0
2.0 | Double.NaN | 2.0 | 4.0
Double.NaN | 3.0 | 3.0 | 3.0
4.0 | 4.0 | 4.0 | 4.0
5.0 | 5.0 | 5.0 | 5.0
~~~

<div class="codetabs">
<div data-lang="scala" markdown="1">

Refer to the [Imputer Scala docs](api/scala/index.html#org.apache.spark.ml.feature.Imputer)
for more details on the API.

{% include_example scala/org/apache/spark/examples/ml/ImputerExample.scala %}
</div>

<div data-lang="java" markdown="1">

Refer to the [Imputer Java docs](api/java/org/apache/spark/ml/feature/Imputer.html)
for more details on the API.

{% include_example java/org/apache/spark/examples/ml/JavaImputerExample.java %}
</div>

<div data-lang="python" markdown="1">

Refer to the [Imputer Python docs](api/python/pyspark.ml.html#pyspark.ml.feature.Imputer)
for more details on the API.

{% include_example python/ml/imputer_example.py %}
</div>
</div>

# Feature Selectors

## VectorSlicer
Expand Down
2 changes: 1 addition & 1 deletion docs/monitoring.md
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,7 @@ In the API, an application is referenced by its application ID, `[app-id]`.
When running on YARN, each application may have multiple attempts, but there are attempt IDs
only for applications in cluster mode, not applications in client mode. Applications in YARN cluster mode
can be identified by their `[attempt-id]`. In the API listed below, when running in YARN cluster mode,
`[app-id]` will actually be `[base-app-id]/[attempt-id]`, where `[base-app-id]` is the YARN application ID.
`[app-id]` will actually be `[base-app-id]/[attempt-id]`, where `[base-app-id]` is the YARN application ID.

<table class="table">
<tr><th>Endpoint</th><th>Meaning</th></tr>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.examples.ml;

// $example on$
import java.util.Arrays;
import java.util.List;

import org.apache.spark.ml.feature.Imputer;
import org.apache.spark.ml.feature.ImputerModel;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.*;
// $example off$

import static org.apache.spark.sql.types.DataTypes.*;

/**
* An example demonstrating Imputer.
* Run with:
* bin/run-example ml.JavaImputerExample
*/
public class JavaImputerExample {
public static void main(String[] args) {
SparkSession spark = SparkSession
.builder()
.appName("JavaImputerExample")
.getOrCreate();

// $example on$
List<Row> data = Arrays.asList(
RowFactory.create(1.0, Double.NaN),
RowFactory.create(2.0, Double.NaN),
RowFactory.create(Double.NaN, 3.0),
RowFactory.create(4.0, 4.0),
RowFactory.create(5.0, 5.0)
);
StructType schema = new StructType(new StructField[]{
createStructField("a", DoubleType, false),
createStructField("b", DoubleType, false)
});
Dataset<Row> df = spark.createDataFrame(data, schema);

Imputer imputer = new Imputer()
.setInputCols(new String[]{"a", "b"})
.setOutputCols(new String[]{"out_a", "out_b"});

ImputerModel model = imputer.fit(df);
model.transform(df).show();
// $example off$

spark.stop();
}
}
50 changes: 50 additions & 0 deletions examples/src/main/python/ml/imputer_example.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

# $example on$
from pyspark.ml.feature import Imputer
# $example off$
from pyspark.sql import SparkSession

"""
An example demonstrating Imputer.
Run with:
bin/spark-submit examples/src/main/python/ml/imputer_example.py
"""

if __name__ == "__main__":
spark = SparkSession\
.builder\
.appName("ImputerExample")\
.getOrCreate()

# $example on$
df = spark.createDataFrame([
(1.0, float("nan")),
(2.0, float("nan")),
(float("nan"), 3.0),
(4.0, 4.0),
(5.0, 5.0)
], ["a", "b"])

imputer = Imputer(inputCols=["a", "b"], outputCols=["out_a", "out_b"])
model = imputer.fit(df)

model.transform(df).show()
# $example off$

spark.stop()
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.examples.ml

// $example on$
import org.apache.spark.ml.feature.Imputer
// $example off$
import org.apache.spark.sql.SparkSession

/**
* An example demonstrating Imputer.
* Run with:
* bin/run-example ml.ImputerExample
*/
object ImputerExample {

def main(args: Array[String]): Unit = {
val spark = SparkSession.builder
.appName("ImputerExample")
.getOrCreate()

// $example on$
val df = spark.createDataFrame(Seq(
(1.0, Double.NaN),
(2.0, Double.NaN),
(Double.NaN, 3.0),
(4.0, 4.0),
(5.0, 5.0)
)).toDF("a", "b")

val imputer = new Imputer()
.setInputCols(Array("a", "b"))
.setOutputCols(Array("out_a", "out_b"))

val model = imputer.fit(df)
model.transform(df).show()
// $example off$

spark.stop()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -329,7 +329,8 @@ class MultilayerPerceptronClassificationModel private[ml] (

@Since("1.5.0")
override def copy(extra: ParamMap): MultilayerPerceptronClassificationModel = {
copyValues(new MultilayerPerceptronClassificationModel(uid, layers, weights), extra)
val copied = new MultilayerPerceptronClassificationModel(uid, layers, weights).setParent(parent)
copyValues(copied, extra)
}

@Since("2.0.0")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,10 @@ class BucketedRandomProjectionLSHModel private[ml](
}

@Since("2.1.0")
override def copy(extra: ParamMap): this.type = defaultCopy(extra)
override def copy(extra: ParamMap): BucketedRandomProjectionLSHModel = {
val copied = new BucketedRandomProjectionLSHModel(uid, randUnitVectors).setParent(parent)
copyValues(copied, extra)
}

@Since("2.1.0")
override def write: MLWriter = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ import org.apache.spark.sql.types._
private[feature] trait ImputerParams extends Params with HasInputCols {

/**
* The imputation strategy.
* The imputation strategy. Currently only "mean" and "median" are supported.
* If "mean", then replace missing values using the mean value of the feature.
* If "median", then replace missing values using the approximate median value of the feature.
* Default: mean
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,10 @@ class MinHashLSHModel private[ml](
}

@Since("2.1.0")
override def copy(extra: ParamMap): this.type = defaultCopy(extra)
override def copy(extra: ParamMap): MinHashLSHModel = {
val copied = new MinHashLSHModel(uid, randCoefficients).setParent(parent)
copyValues(copied, extra)
}

@Since("2.1.0")
override def write: MLWriter = new MinHashLSHModel.MinHashLSHModelWriter(this)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -268,8 +268,10 @@ class RFormulaModel private[feature](
}

@Since("1.5.0")
override def copy(extra: ParamMap): RFormulaModel = copyValues(
new RFormulaModel(uid, resolvedFormula, pipelineModel))
override def copy(extra: ParamMap): RFormulaModel = {
val copied = new RFormulaModel(uid, resolvedFormula, pipelineModel).setParent(parent)
copyValues(copied, extra)
}

@Since("2.0.0")
override def toString: String = s"RFormulaModel($resolvedFormula) (uid=$uid)"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ class MultilayerPerceptronClassifierSuite
.setMaxIter(100)
.setSolver("l-bfgs")
val model = trainer.fit(dataset)
MLTestingUtils.checkCopy(model)
val result = model.transform(dataset)
val predictionAndLabels = result.select("prediction", "label").collect()
predictionAndLabels.foreach { case Row(p: Double, l: Double) =>
Expand Down
Loading