bin/spark-class (1 addition, 1 deletion)

@@ -148,7 +148,7 @@ fi
 if [[ "$1" =~ org.apache.spark.tools.* ]]; then
   if test -z "$SPARK_TOOLS_JAR"; then
     echo "Failed to find Spark Tools Jar in $FWDIR/tools/target/scala-$SPARK_SCALA_VERSION/" 1>&2
-    echo "You need to build Spark before running $1." 1>&2
+    echo "You need to run \"build/sbt tools/package\" before running $1." 1>&2
     exit 1
   fi
   CLASSPATH="$CLASSPATH:$SPARK_TOOLS_JAR"

examples/src/main/java/org/apache/spark/examples/ml/JavaCrossValidatorExample.java

@@ -33,9 +33,9 @@
 import org.apache.spark.ml.tuning.CrossValidator;
 import org.apache.spark.ml.tuning.CrossValidatorModel;
 import org.apache.spark.ml.tuning.ParamGridBuilder;
-import org.apache.spark.sql.api.java.JavaSQLContext;
-import org.apache.spark.sql.api.java.JavaSchemaRDD;
-import org.apache.spark.sql.api.java.Row;
+import org.apache.spark.sql.SchemaRDD;
+import org.apache.spark.sql.SQLContext;
+import org.apache.spark.sql.Row;
 
 /**
  * A simple example demonstrating model selection using CrossValidator.
@@ -55,7 +55,7 @@ public class JavaCrossValidatorExample {
   public static void main(String[] args) {
     SparkConf conf = new SparkConf().setAppName("JavaCrossValidatorExample");
     JavaSparkContext jsc = new JavaSparkContext(conf);
-    JavaSQLContext jsql = new JavaSQLContext(jsc);
+    SQLContext jsql = new SQLContext(jsc);
 
     // Prepare training documents, which are labeled.
     List<LabeledDocument> localTraining = Lists.newArrayList(
@@ -71,8 +71,7 @@ public static void main(String[] args) {
       new LabeledDocument(9L, "a e c l", 0.0),
       new LabeledDocument(10L, "spark compile", 1.0),
       new LabeledDocument(11L, "hadoop software", 0.0));
-    JavaSchemaRDD training =
-        jsql.applySchema(jsc.parallelize(localTraining), LabeledDocument.class);
+    SchemaRDD training = jsql.applySchema(jsc.parallelize(localTraining), LabeledDocument.class);
 
     // Configure an ML pipeline, which consists of three stages: tokenizer, hashingTF, and lr.
     Tokenizer tokenizer = new Tokenizer()
@@ -113,11 +112,11 @@ public static void main(String[] args) {
       new Document(5L, "l m n"),
       new Document(6L, "mapreduce spark"),
       new Document(7L, "apache hadoop"));
-    JavaSchemaRDD test = jsql.applySchema(jsc.parallelize(localTest), Document.class);
+    SchemaRDD test = jsql.applySchema(jsc.parallelize(localTest), Document.class);
 
     // Make predictions on test documents. cvModel uses the best model found (lrModel).
     cvModel.transform(test).registerAsTable("prediction");
-    JavaSchemaRDD predictions = jsql.sql("SELECT id, text, score, prediction FROM prediction");
+    SchemaRDD predictions = jsql.sql("SELECT id, text, score, prediction FROM prediction");
     for (Row r: predictions.collect()) {
       System.out.println("(" + r.get(0) + ", " + r.get(1) + ") --> score=" + r.get(2)
           + ", prediction=" + r.get(3));
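
The change in this example is mechanical, and the same pattern repeats in the files below: construct org.apache.spark.sql.SQLContext straight from the JavaSparkContext, and declare SchemaRDD wherever JavaSchemaRDD appeared. A minimal self-contained sketch of the migrated setup follows; the MigrationSketch class, the Doc bean, and the "docs" table are illustrative inventions, not names from the diff.

    import java.io.Serializable;
    import java.util.Arrays;
    import java.util.List;

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SQLContext;
    import org.apache.spark.sql.SchemaRDD;

    public class MigrationSketch {
      // Illustrative JavaBean; Spark SQL infers the schema from its getters.
      public static class Doc implements Serializable {
        private long id;
        private String text;
        public long getId() { return id; }
        public void setId(long id) { this.id = id; }
        public String getText() { return text; }
        public void setText(String text) { this.text = text; }
      }

      public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("MigrationSketch");
        JavaSparkContext jsc = new JavaSparkContext(conf);
        // One context class now serves Scala and Java alike; it accepts the
        // JavaSparkContext directly, so no JavaSQLContext wrapper is needed.
        SQLContext jsql = new SQLContext(jsc);

        Doc d = new Doc();
        d.setId(0L);
        d.setText("a b c d e spark");
        List<Doc> docs = Arrays.asList(d);

        // applySchema still takes an RDD of JavaBeans, but now yields a SchemaRDD.
        SchemaRDD data = jsql.applySchema(jsc.parallelize(docs), Doc.class);
        data.registerTempTable("docs");
        for (Row r : jsql.sql("SELECT id, text FROM docs").collectAsList()) {
          System.out.println(r.get(0) + ": " + r.get(1));
        }
        jsc.stop();
      }
    }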

examples/src/main/java/org/apache/spark/examples/ml/JavaSimpleParamsExample.java

@@ -28,9 +28,9 @@
 import org.apache.spark.ml.classification.LogisticRegression;
 import org.apache.spark.mllib.linalg.Vectors;
 import org.apache.spark.mllib.regression.LabeledPoint;
-import org.apache.spark.sql.api.java.JavaSQLContext;
-import org.apache.spark.sql.api.java.JavaSchemaRDD;
-import org.apache.spark.sql.api.java.Row;
+import org.apache.spark.sql.SchemaRDD;
+import org.apache.spark.sql.SQLContext;
+import org.apache.spark.sql.Row;
 
 /**
  * A simple example demonstrating ways to specify parameters for Estimators and Transformers.
@@ -44,7 +44,7 @@ public class JavaSimpleParamsExample {
   public static void main(String[] args) {
     SparkConf conf = new SparkConf().setAppName("JavaSimpleParamsExample");
     JavaSparkContext jsc = new JavaSparkContext(conf);
-    JavaSQLContext jsql = new JavaSQLContext(jsc);
+    SQLContext jsql = new SQLContext(jsc);
 
     // Prepare training data.
     // We use LabeledPoint, which is a JavaBean. Spark SQL can convert RDDs of JavaBeans
@@ -54,7 +54,7 @@ public static void main(String[] args) {
       new LabeledPoint(0.0, Vectors.dense(2.0, 1.0, -1.0)),
       new LabeledPoint(0.0, Vectors.dense(2.0, 1.3, 1.0)),
       new LabeledPoint(1.0, Vectors.dense(0.0, 1.2, -0.5)));
-    JavaSchemaRDD training = jsql.applySchema(jsc.parallelize(localTraining), LabeledPoint.class);
+    SchemaRDD training = jsql.applySchema(jsc.parallelize(localTraining), LabeledPoint.class);
 
     // Create a LogisticRegression instance. This instance is an Estimator.
     LogisticRegression lr = new LogisticRegression();
@@ -94,14 +94,14 @@ public static void main(String[] args) {
       new LabeledPoint(1.0, Vectors.dense(-1.0, 1.5, 1.3)),
       new LabeledPoint(0.0, Vectors.dense(3.0, 2.0, -0.1)),
       new LabeledPoint(1.0, Vectors.dense(0.0, 2.2, -1.5)));
-    JavaSchemaRDD test = jsql.applySchema(jsc.parallelize(localTest), LabeledPoint.class);
+    SchemaRDD test = jsql.applySchema(jsc.parallelize(localTest), LabeledPoint.class);
 
     // Make predictions on test documents using the Transformer.transform() method.
     // LogisticRegression.transform will only use the 'features' column.
     // Note that model2.transform() outputs a 'probability' column instead of the usual 'score'
     // column since we renamed the lr.scoreCol parameter previously.
     model2.transform(test).registerAsTable("results");
-    JavaSchemaRDD results =
+    SchemaRDD results =
         jsql.sql("SELECT features, label, probability, prediction FROM results");
     for (Row r: results.collect()) {
       System.out.println("(" + r.get(0) + ", " + r.get(1) + ") -> prob=" + r.get(2)

examples/src/main/java/org/apache/spark/examples/ml/JavaSimpleTextClassificationPipeline.java

@@ -21,17 +21,17 @@
 
 import com.google.common.collect.Lists;
 
+import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.ml.Pipeline;
 import org.apache.spark.ml.PipelineModel;
 import org.apache.spark.ml.PipelineStage;
 import org.apache.spark.ml.classification.LogisticRegression;
 import org.apache.spark.ml.feature.HashingTF;
 import org.apache.spark.ml.feature.Tokenizer;
-import org.apache.spark.sql.api.java.JavaSQLContext;
-import org.apache.spark.sql.api.java.JavaSchemaRDD;
-import org.apache.spark.sql.api.java.Row;
-import org.apache.spark.SparkConf;
+import org.apache.spark.sql.SchemaRDD;
+import org.apache.spark.sql.SQLContext;
+import org.apache.spark.sql.Row;
 
 /**
  * A simple text classification pipeline that recognizes "spark" from input text. It uses the Java
@@ -46,16 +46,15 @@ public class JavaSimpleTextClassificationPipeline {
   public static void main(String[] args) {
     SparkConf conf = new SparkConf().setAppName("JavaSimpleTextClassificationPipeline");
     JavaSparkContext jsc = new JavaSparkContext(conf);
-    JavaSQLContext jsql = new JavaSQLContext(jsc);
+    SQLContext jsql = new SQLContext(jsc);
 
     // Prepare training documents, which are labeled.
     List<LabeledDocument> localTraining = Lists.newArrayList(
       new LabeledDocument(0L, "a b c d e spark", 1.0),
      new LabeledDocument(1L, "b d", 0.0),
       new LabeledDocument(2L, "spark f g h", 1.0),
       new LabeledDocument(3L, "hadoop mapreduce", 0.0));
-    JavaSchemaRDD training =
-        jsql.applySchema(jsc.parallelize(localTraining), LabeledDocument.class);
+    SchemaRDD training = jsql.applySchema(jsc.parallelize(localTraining), LabeledDocument.class);
 
     // Configure an ML pipeline, which consists of three stages: tokenizer, hashingTF, and lr.
     Tokenizer tokenizer = new Tokenizer()
@@ -80,11 +79,11 @@ public static void main(String[] args) {
       new Document(5L, "l m n"),
       new Document(6L, "mapreduce spark"),
       new Document(7L, "apache hadoop"));
-    JavaSchemaRDD test = jsql.applySchema(jsc.parallelize(localTest), Document.class);
+    SchemaRDD test = jsql.applySchema(jsc.parallelize(localTest), Document.class);
 
     // Make predictions on test documents.
     model.transform(test).registerAsTable("prediction");
-    JavaSchemaRDD predictions = jsql.sql("SELECT id, text, score, prediction FROM prediction");
+    SchemaRDD predictions = jsql.sql("SELECT id, text, score, prediction FROM prediction");
     for (Row r: predictions.collect()) {
       System.out.println("(" + r.get(0) + ", " + r.get(1) + ") --> score=" + r.get(2)
           + ", prediction=" + r.get(3));

examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQL.java

@@ -26,9 +26,9 @@
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.api.java.function.Function;
 
-import org.apache.spark.sql.api.java.JavaSQLContext;
-import org.apache.spark.sql.api.java.JavaSchemaRDD;
-import org.apache.spark.sql.api.java.Row;
+import org.apache.spark.sql.SQLContext;
+import org.apache.spark.sql.SchemaRDD;
+import org.apache.spark.sql.Row;
 
 public class JavaSparkSQL {
   public static class Person implements Serializable {
@@ -55,7 +55,7 @@ public void setAge(int age) {
   public static void main(String[] args) throws Exception {
     SparkConf sparkConf = new SparkConf().setAppName("JavaSparkSQL");
     JavaSparkContext ctx = new JavaSparkContext(sparkConf);
-    JavaSQLContext sqlCtx = new JavaSQLContext(ctx);
+    SQLContext sqlCtx = new SQLContext(ctx);
 
     System.out.println("=== Data source: RDD ===");
     // Load a text file and convert each line to a Java Bean.
@@ -74,15 +74,15 @@ public Person call(String line) {
     });
 
     // Apply a schema to an RDD of Java Beans and register it as a table.
-    JavaSchemaRDD schemaPeople = sqlCtx.applySchema(people, Person.class);
+    SchemaRDD schemaPeople = sqlCtx.applySchema(people, Person.class);
     schemaPeople.registerTempTable("people");
 
     // SQL can be run over RDDs that have been registered as tables.
-    JavaSchemaRDD teenagers = sqlCtx.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19");
+    SchemaRDD teenagers = sqlCtx.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19");
 
     // The results of SQL queries are SchemaRDDs and support all the normal RDD operations.
     // The columns of a row in the result can be accessed by ordinal.
-    List<String> teenagerNames = teenagers.map(new Function<Row, String>() {
+    List<String> teenagerNames = teenagers.toJavaRDD().map(new Function<Row, String>() {
       @Override
       public String call(Row row) {
         return "Name: " + row.getString(0);
@@ -99,13 +99,13 @@ public String call(Row row) {
     // Read in the parquet file created above.
     // Parquet files are self-describing so the schema is preserved.
     // The result of loading a parquet file is also a JavaSchemaRDD.
-    JavaSchemaRDD parquetFile = sqlCtx.parquetFile("people.parquet");
+    SchemaRDD parquetFile = sqlCtx.parquetFile("people.parquet");
 
     //Parquet files can also be registered as tables and then used in SQL statements.
     parquetFile.registerTempTable("parquetFile");
-    JavaSchemaRDD teenagers2 =
+    SchemaRDD teenagers2 =
       sqlCtx.sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19");
-    teenagerNames = teenagers2.map(new Function<Row, String>() {
+    teenagerNames = teenagers2.toJavaRDD().map(new Function<Row, String>() {
       @Override
       public String call(Row row) {
         return "Name: " + row.getString(0);
@@ -120,7 +120,7 @@ public String call(Row row) {
     // The path can be either a single text file or a directory storing text files.
     String path = "examples/src/main/resources/people.json";
     // Create a JavaSchemaRDD from the file(s) pointed by path
-    JavaSchemaRDD peopleFromJsonFile = sqlCtx.jsonFile(path);
+    SchemaRDD peopleFromJsonFile = sqlCtx.jsonFile(path);
 
     // Because the schema of a JSON dataset is automatically inferred, to write queries,
     // it is better to take a look at what is the schema.
@@ -134,11 +134,11 @@ public String call(Row row) {
     peopleFromJsonFile.registerTempTable("people");
 
     // SQL statements can be run by using the sql methods provided by sqlCtx.
-    JavaSchemaRDD teenagers3 = sqlCtx.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19");
+    SchemaRDD teenagers3 = sqlCtx.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19");
 
     // The results of SQL queries are JavaSchemaRDDs and support all the normal RDD operations.
     // The columns of a row in the result can be accessed by ordinal.
-    teenagerNames = teenagers3.map(new Function<Row, String>() {
+    teenagerNames = teenagers3.toJavaRDD().map(new Function<Row, String>() {
       @Override
       public String call(Row row) { return "Name: " + row.getString(0); }
     }).collect();
@@ -151,7 +151,7 @@ public String call(Row row) {
     List<String> jsonData = Arrays.asList(
       "{\"name\":\"Yin\",\"address\":{\"city\":\"Columbus\",\"state\":\"Ohio\"}}");
     JavaRDD<String> anotherPeopleRDD = ctx.parallelize(jsonData);
-    JavaSchemaRDD peopleFromJsonRDD = sqlCtx.jsonRDD(anotherPeopleRDD);
+    SchemaRDD peopleFromJsonRDD = sqlCtx.jsonRDD(anotherPeopleRDD.rdd());
 
     // Take a look at the schema of this new JavaSchemaRDD.
     peopleFromJsonRDD.printSchema();
@@ -164,8 +164,8 @@ public String call(Row row) {
 
     peopleFromJsonRDD.registerTempTable("people2");
 
-    JavaSchemaRDD peopleWithCity = sqlCtx.sql("SELECT name, address.city FROM people2");
-    List<String> nameAndCity = peopleWithCity.map(new Function<Row, String>() {
+    SchemaRDD peopleWithCity = sqlCtx.sql("SELECT name, address.city FROM people2");
+    List<String> nameAndCity = peopleWithCity.toJavaRDD().map(new Function<Row, String>() {
       @Override
       public String call(Row row) {
         return "Name: " + row.getString(0) + ", City: " + row.getString(1);
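
The one non-mechanical change in this file: SchemaRDD is the Scala RDD of Row, so Java code that previously mapped over a JavaSchemaRDD now bridges explicitly with toJavaRDD(), and entry points that expect a Scala RDD, like jsonRDD here, take javaRdd.rdd(). A short sketch of both bridges, written as a fragment inside main() and assuming sqlCtx, the registered "people" table, and the imports from the example above:

    // toJavaRDD() wraps the Scala RDD[Row] so the Function-based Java map API applies.
    SchemaRDD teens = sqlCtx.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19");
    List<String> names = teens.toJavaRDD().map(new Function<Row, String>() {
      @Override
      public String call(Row row) {
        return "Name: " + row.getString(0);
      }
    }).collect();

    // Scala-typed entry points take the unwrapped RDD via JavaRDD.rdd().
    JavaRDD<String> jsonLines = ctx.parallelize(Arrays.asList("{\"name\":\"Yin\"}"));
    SchemaRDD fromJson = sqlCtx.jsonRDD(jsonLines.rdd());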

mllib/src/main/scala/org/apache/spark/ml/Estimator.scala (0 additions, 38 deletions)

@@ -18,12 +18,10 @@
 package org.apache.spark.ml
 
 import scala.annotation.varargs
-import scala.collection.JavaConverters._
 
 import org.apache.spark.annotation.AlphaComponent
 import org.apache.spark.ml.param.{ParamMap, ParamPair, Params}
 import org.apache.spark.sql.SchemaRDD
-import org.apache.spark.sql.api.java.JavaSchemaRDD
 
 /**
  * :: AlphaComponent ::
@@ -66,40 +64,4 @@ abstract class Estimator[M <: Model[M]] extends PipelineStage with Params {
   def fit(dataset: SchemaRDD, paramMaps: Array[ParamMap]): Seq[M] = {
     paramMaps.map(fit(dataset, _))
   }
-
-  // Java-friendly versions of fit.
-
-  /**
-   * Fits a single model to the input data with optional parameters.
-   *
-   * @param dataset input dataset
-   * @param paramPairs optional list of param pairs (overwrite embedded params)
-   * @return fitted model
-   */
-  @varargs
-  def fit(dataset: JavaSchemaRDD, paramPairs: ParamPair[_]*): M = {
-    fit(dataset.schemaRDD, paramPairs: _*)
-  }
-
-  /**
-   * Fits a single model to the input data with provided parameter map.
-   *
-   * @param dataset input dataset
-   * @param paramMap parameter map
-   * @return fitted model
-   */
-  def fit(dataset: JavaSchemaRDD, paramMap: ParamMap): M = {
-    fit(dataset.schemaRDD, paramMap)
-  }
-
-  /**
-   * Fits multiple models to the input data with multiple sets of parameters.
-   *
-   * @param dataset input dataset
-   * @param paramMaps an array of parameter maps
-   * @return fitted models, matching the input parameter maps
-   */
-  def fit(dataset: JavaSchemaRDD, paramMaps: Array[ParamMap]): java.util.List[M] = {
-    fit(dataset.schemaRDD, paramMaps).asJava
-  }
 }
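
Nothing replaces the deleted JavaSchemaRDD overloads: since SQLContext now hands Java callers SchemaRDDs as well, the remaining Scala signatures are callable from Java directly. The file keeps its scala.annotation.varargs import, presumably because the retained fit(dataset: SchemaRDD, paramPairs: ParamPair[_]*) is still annotated @varargs, which is what lets Java pass plain varargs. A sketch of a Java call site after this change, mirroring the updated JavaPipelineSuite below; the pipeline, jsql, and dataset construction are assumed:

    // dataset is a SchemaRDD obtained from jsql.applySchema(...), as in the examples.
    PipelineModel model = pipeline.fit(dataset);
    model.transform(dataset).registerTempTable("prediction");
    SchemaRDD predictions = jsql.sql("SELECT label, score, prediction FROM prediction");
    predictions.collectAsList();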

mllib/src/main/scala/org/apache/spark/ml/Transformer.scala (0 additions, 24 deletions)

@@ -23,7 +23,6 @@ import org.apache.spark.Logging
 import org.apache.spark.annotation.AlphaComponent
 import org.apache.spark.ml.param._
 import org.apache.spark.sql.SchemaRDD
-import org.apache.spark.sql.api.java.JavaSchemaRDD
 import org.apache.spark.sql.catalyst.analysis.Star
 import org.apache.spark.sql.catalyst.expressions.ScalaUdf
 import org.apache.spark.sql.types._
@@ -55,29 +54,6 @@ abstract class Transformer extends PipelineStage with Params {
   * @return transformed dataset
   */
  def transform(dataset: SchemaRDD, paramMap: ParamMap): SchemaRDD
-
-  // Java-friendly versions of transform.
-
-  /**
-   * Transforms the dataset with optional parameters.
-   * @param dataset input datset
-   * @param paramPairs optional list of param pairs, overwrite embedded params
-   * @return transformed dataset
-   */
-  @varargs
-  def transform(dataset: JavaSchemaRDD, paramPairs: ParamPair[_]*): JavaSchemaRDD = {
-    transform(dataset.schemaRDD, paramPairs: _*).toJavaSchemaRDD
-  }
-
-  /**
-   * Transforms the dataset with provided parameter map as additional parameters.
-   * @param dataset input dataset
-   * @param paramMap additional parameters, overwrite embedded params
-   * @return transformed dataset
-   */
-  def transform(dataset: JavaSchemaRDD, paramMap: ParamMap): JavaSchemaRDD = {
-    transform(dataset.schemaRDD, paramMap).toJavaSchemaRDD
-  }
 }
 
 /**

mllib/src/test/java/org/apache/spark/ml/JavaPipelineSuite.java (8 additions, 9 deletions)

@@ -26,24 +26,23 @@
 import org.apache.spark.mllib.regression.LabeledPoint;
 import org.apache.spark.ml.classification.LogisticRegression;
 import org.apache.spark.ml.feature.StandardScaler;
-import org.apache.spark.sql.api.java.JavaSQLContext;
-import org.apache.spark.sql.api.java.JavaSchemaRDD;
-import static org.apache.spark.mllib.classification.LogisticRegressionSuite
-  .generateLogisticInputAsList;
+import org.apache.spark.sql.SchemaRDD;
+import org.apache.spark.sql.SQLContext;
+import static org.apache.spark.mllib.classification.LogisticRegressionSuite.generateLogisticInputAsList;
 
 /**
  * Test Pipeline construction and fitting in Java.
 */
 public class JavaPipelineSuite {
 
   private transient JavaSparkContext jsc;
-  private transient JavaSQLContext jsql;
-  private transient JavaSchemaRDD dataset;
+  private transient SQLContext jsql;
+  private transient SchemaRDD dataset;
 
   @Before
   public void setUp() {
     jsc = new JavaSparkContext("local", "JavaPipelineSuite");
-    jsql = new JavaSQLContext(jsc);
+    jsql = new SQLContext(jsc);
     JavaRDD<LabeledPoint> points =
       jsc.parallelize(generateLogisticInputAsList(1.0, 1.0, 100, 42), 2);
     dataset = jsql.applySchema(points, LabeledPoint.class);
@@ -66,7 +65,7 @@ public void pipeline() {
       .setStages(new PipelineStage[] {scaler, lr});
     PipelineModel model = pipeline.fit(dataset);
     model.transform(dataset).registerTempTable("prediction");
-    JavaSchemaRDD predictions = jsql.sql("SELECT label, score, prediction FROM prediction");
-    predictions.collect();
+    SchemaRDD predictions = jsql.sql("SELECT label, score, prediction FROM prediction");
+    predictions.collectAsList();
   }
 }
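
A small detail in this test update: collect() on a SchemaRDD is the Scala RDD collect, which returns an Array[Row] (Row[] as seen from Java), so the smoke call switches to collectAsList(), which yields a java.util.List<Row> that is more natural to consume from Java. Sketched usage as a fragment, assuming the predictions query above and the file's Row and List imports:

    // collectAsList() yields java.util.List<Row>; collect() would yield Row[].
    List<Row> rows = predictions.collectAsList();
    for (Row r : rows) {
      System.out.println("label=" + r.get(0) + ", score=" + r.get(1)
          + ", prediction=" + r.get(2));
    }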