@@ -25,8 +25,6 @@
import java.util.Properties;

// $example on:basic_parquet_example$
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Encoders;
// $example on:schema_merging$
@@ -217,12 +215,11 @@ private static void runJsonDatasetExample(SparkSession spark) {
// +------+

// Alternatively, a DataFrame can be created for a JSON dataset represented by
// an RDD[String] storing one JSON object per string.
// a Dataset[String] storing one JSON object per string.
List<String> jsonData = Arrays.asList(
"{\"name\":\"Yin\",\"address\":{\"city\":\"Columbus\",\"state\":\"Ohio\"}}");
JavaRDD<String> anotherPeopleRDD =
new JavaSparkContext(spark.sparkContext()).parallelize(jsonData);
Dataset<Row> anotherPeople = spark.read().json(anotherPeopleRDD);
Dataset<String> anotherPeopleDataset = spark.createDataset(jsonData, Encoders.STRING());
Dataset<Row> anotherPeople = spark.read().json(anotherPeopleDataset);
anotherPeople.show();
// +---------------+----+
// | address|name|
@@ -111,6 +111,10 @@ object SQLDataSourceExample {

private def runJsonDatasetExample(spark: SparkSession): Unit = {
// $example on:json_dataset$
// Encoders for primitive types (Int, String, etc) and Product types (case classes) are
// supported by importing this when creating a Dataset.
import spark.implicits._

// A JSON dataset is pointed to by path.
// The path can be either a single text file or a directory storing text files
val path = "examples/src/main/resources/people.json"
@@ -135,10 +139,10 @@
// +------+

// Alternatively, a DataFrame can be created for a JSON dataset represented by
// an RDD[String] storing one JSON object per string
val otherPeopleRDD = spark.sparkContext.makeRDD(
// a Dataset[String] storing one JSON object per string
val otherPeopleDataset = spark.createDataset(
"""{"name":"Yin","address":{"city":"Columbus","state":"Ohio"}}""" :: Nil)
val otherPeople = spark.read.json(otherPeopleRDD)
val otherPeople = spark.read.json(otherPeopleDataset)
otherPeople.show()
// +---------------+----+
// | address|name|
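
For reference, this is roughly the end-to-end shape of the pattern the Java and Scala examples above move to. It is a minimal Scala sketch, not part of the patch; the app name and master setting are illustrative:

```scala
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}

object JsonDatasetSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("JsonDatasetSketch")   // illustrative name
      .master("local[*]")
      .getOrCreate()
    // Brings the String encoder into scope for toDS()/createDataset.
    import spark.implicits._

    // One JSON object per string; Dataset[String] replaces the old RDD[String] input.
    val jsonStrings: Dataset[String] =
      Seq("""{"name":"Yin","address":{"city":"Columbus","state":"Ohio"}}""").toDS()

    // DataFrameReader.json accepts the Dataset[String] directly.
    val people: DataFrame = spark.read.json(jsonStrings)
    people.show()

    spark.stop()
  }
}
```

Compared with the removed lines, there is no JavaSparkContext, makeRDD, or parallelize step; the only extra requirement is the implicits import (or an explicit Encoders.STRING() on the Java side).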
@@ -31,6 +31,7 @@
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
@@ -146,13 +147,13 @@ public void dataFrameRDDOperations() {

@Test
public void applySchemaToJSON() {
JavaRDD<String> jsonRDD = jsc.parallelize(Arrays.asList(
Dataset<String> jsonDS = spark.createDataset(Arrays.asList(
"{\"string\":\"this is a simple string.\", \"integer\":10, \"long\":21474836470, " +
"\"bigInteger\":92233720368547758070, \"double\":1.7976931348623157E308, " +
"\"boolean\":true, \"null\":null}",
"{\"string\":\"this is another simple string.\", \"integer\":11, \"long\":21474836469, " +
"\"bigInteger\":92233720368547758069, \"double\":1.7976931348623157E305, " +
"\"boolean\":false, \"null\":null}"));
"\"boolean\":false, \"null\":null}"), Encoders.STRING());
List<StructField> fields = new ArrayList<>(7);
fields.add(DataTypes.createStructField("bigInteger", DataTypes.createDecimalType(20, 0),
true));
@@ -183,14 +184,14 @@ public void applySchemaToJSON() {
null,
"this is another simple string."));

Dataset<Row> df1 = spark.read().json(jsonRDD);
Dataset<Row> df1 = spark.read().json(jsonDS);
StructType actualSchema1 = df1.schema();
Assert.assertEquals(expectedSchema, actualSchema1);
df1.createOrReplaceTempView("jsonTable1");
List<Row> actual1 = spark.sql("select * from jsonTable1").collectAsList();
Assert.assertEquals(expectedResult, actual1);

Dataset<Row> df2 = spark.read().schema(expectedSchema).json(jsonRDD);
Dataset<Row> df2 = spark.read().schema(expectedSchema).json(jsonDS);
StructType actualSchema2 = df2.schema();
Assert.assertEquals(expectedSchema, actualSchema2);
df2.createOrReplaceTempView("jsonTable2");
@@ -414,4 +414,13 @@ public void testBeanWithoutGetter() {
Assert.assertEquals(df.schema().length(), 0);
Assert.assertEquals(df.collectAsList().size(), 1);
}

@Test
public void testJsonRDDToDataFrame() {
// This is a test for the deprecated API in SPARK-15615.
JavaRDD<String> rdd = jsc.parallelize(Arrays.asList("{\"a\": 2}"));
Dataset<Row> df = spark.read().json(rdd);
Assert.assertEquals(1L, df.count());
Assert.assertEquals(2L, df.collectAsList().get(0).getLong(0));
}
}
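
The new test above pins down the RDD overload that SPARK-15615 keeps but deprecates. A hedged Scala sketch of the old call next to its replacement, assuming an existing SparkSession named spark (for example in spark-shell):

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.DataFrame

// Deprecated path: json(RDD[String]) still works but now carries a deprecation warning.
val rdd: RDD[String] = spark.sparkContext.parallelize(Seq("""{"a": 2}"""))
val fromRdd: DataFrame = spark.read.json(rdd)

// Preferred path: build a Dataset[String] and hand it to the same reader.
import spark.implicits._
val fromDs: DataFrame = spark.read.json(Seq("""{"a": 2}""").toDS())

// Both yield a single-row DataFrame with one long column "a".
assert(fromRdd.collect().toSeq == fromDs.collect().toSeq)
```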
@@ -29,8 +29,6 @@
import org.junit.Before;
import org.junit.Test;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.*;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
@@ -40,7 +38,6 @@
public class JavaSaveLoadSuite {

private transient SparkSession spark;
private transient JavaSparkContext jsc;

File path;
Dataset<Row> df;
@@ -58,7 +55,6 @@ public void setUp() throws IOException {
.master("local[*]")
.appName("testing")
.getOrCreate();
jsc = new JavaSparkContext(spark.sparkContext());

path =
Utils.createTempDir(System.getProperty("java.io.tmpdir"), "datasource").getCanonicalFile();
@@ -70,8 +66,8 @@ public void setUp() throws IOException {
for (int i = 0; i < 10; i++) {
jsonObjects.add("{\"a\":" + i + ", \"b\":\"str" + i + "\"}");
}
JavaRDD<String> rdd = jsc.parallelize(jsonObjects);
df = spark.read().json(rdd);
Dataset<String> ds = spark.createDataset(jsonObjects, Encoders.STRING());
df = spark.read().json(ds);
df.createOrReplaceTempView("jsonTable");
}

@@ -914,15 +914,13 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
}

test("SPARK-7551: support backticks for DataFrame attribute resolution") {
val df = spark.read.json(sparkContext.makeRDD(
"""{"a.b": {"c": {"d..e": {"f": 1}}}}""" :: Nil))
val df = spark.read.json(Seq("""{"a.b": {"c": {"d..e": {"f": 1}}}}""").toDS())
checkAnswer(
df.select(df("`a.b`.c.`d..e`.`f`")),
Row(1)
)

val df2 = spark.read.json(sparkContext.makeRDD(
"""{"a b": {"c": {"d e": {"f": 1}}}}""" :: Nil))
val df2 = spark.read.json(Seq("""{"a b": {"c": {"d e": {"f": 1}}}}""").toDS())
checkAnswer(
df2.select(df2("`a b`.c.d e.f")),
Row(1)
@@ -1110,8 +1108,7 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
}

test("SPARK-9323: DataFrame.orderBy should support nested column name") {
val df = spark.read.json(sparkContext.makeRDD(
"""{"a": {"b": 1}}""" :: Nil))
val df = spark.read.json(Seq("""{"a": {"b": 1}}""").toDS())
checkAnswer(df.orderBy("a.b"), Row(Row(1)))
}

@@ -1164,8 +1161,7 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
}

test("SPARK-10316: respect non-deterministic expressions in PhysicalOperation") {
val input = spark.read.json(spark.sparkContext.makeRDD(
(1 to 10).map(i => s"""{"id": $i}""")))
val input = spark.read.json((1 to 10).map(i => s"""{"id": $i}""").toDS())

val df = input.select($"id", rand(0).as('r))
df.as("a").join(df.filter($"r" < 0.5).as("b"), $"a.id" === $"b.id").collect().foreach { row =>
41 changes: 19 additions & 22 deletions sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -211,8 +211,8 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
}

test("grouping on nested fields") {
spark.read.json(sparkContext.parallelize(
"""{"nested": {"attribute": 1}, "value": 2}""" :: Nil))
spark.read
.json(Seq("""{"nested": {"attribute": 1}, "value": 2}""").toDS())
.createOrReplaceTempView("rows")

checkAnswer(
@@ -229,9 +229,8 @@
}

test("SPARK-6201 IN type conversion") {
spark.read.json(
sparkContext.parallelize(
Seq("{\"a\": \"1\"}}", "{\"a\": \"2\"}}", "{\"a\": \"3\"}}")))
spark.read
.json(Seq("{\"a\": \"1\"}}", "{\"a\": \"2\"}}", "{\"a\": \"3\"}}").toDS())
.createOrReplaceTempView("d")

checkAnswer(
@@ -240,9 +239,8 @@
}

test("SPARK-11226 Skip empty line in json file") {
spark.read.json(
sparkContext.parallelize(
Seq("{\"a\": \"1\"}}", "{\"a\": \"2\"}}", "{\"a\": \"3\"}}", "")))
spark.read
.json(Seq("{\"a\": \"1\"}}", "{\"a\": \"2\"}}", "{\"a\": \"3\"}}", "").toDS())
.createOrReplaceTempView("d")

checkAnswer(
@@ -1214,8 +1212,7 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
}

test("SPARK-3483 Special chars in column names") {
val data = sparkContext.parallelize(
Seq("""{"key?number1": "value1", "key.number2": "value2"}"""))
val data = Seq("""{"key?number1": "value1", "key.number2": "value2"}""").toDS()
spark.read.json(data).createOrReplaceTempView("records")
sql("SELECT `key?number1`, `key.number2` FROM records")
}
@@ -1257,13 +1254,13 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
}

test("SPARK-4322 Grouping field with struct field as sub expression") {
spark.read.json(sparkContext.makeRDD("""{"a": {"b": [{"c": 1}]}}""" :: Nil))
spark.read.json(Seq("""{"a": {"b": [{"c": 1}]}}""").toDS())
.createOrReplaceTempView("data")
checkAnswer(sql("SELECT a.b[0].c FROM data GROUP BY a.b[0].c"), Row(1))
spark.catalog.dropTempView("data")

spark.read.json(
sparkContext.makeRDD("""{"a": {"b": 1}}""" :: Nil)).createOrReplaceTempView("data")
spark.read.json(Seq("""{"a": {"b": 1}}""").toDS())
.createOrReplaceTempView("data")
checkAnswer(sql("SELECT a.b + 1 FROM data GROUP BY a.b + 1"), Row(2))
spark.catalog.dropTempView("data")
}
@@ -1311,8 +1308,8 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
}

test("SPARK-6145: ORDER BY test for nested fields") {
spark.read.json(sparkContext.makeRDD(
"""{"a": {"b": 1, "a": {"a": 1}}, "c": [{"d": 1}]}""" :: Nil))
spark.read
.json(Seq("""{"a": {"b": 1, "a": {"a": 1}}, "c": [{"d": 1}]}""").toDS())
.createOrReplaceTempView("nestedOrder")

checkAnswer(sql("SELECT 1 FROM nestedOrder ORDER BY a.b"), Row(1))
@@ -1325,16 +1322,16 @@

test("SPARK-6145: special cases") {
spark.read
.json(sparkContext.makeRDD("""{"a": {"b": [1]}, "b": [{"a": 1}], "_c0": {"a": 1}}""" :: Nil))
.json(Seq("""{"a": {"b": [1]}, "b": [{"a": 1}], "_c0": {"a": 1}}""").toDS())
.createOrReplaceTempView("t")

checkAnswer(sql("SELECT a.b[0] FROM t ORDER BY _c0.a"), Row(1))
checkAnswer(sql("SELECT b[0].a FROM t ORDER BY _c0.a"), Row(1))
}

test("SPARK-6898: complete support for special chars in column names") {
spark.read.json(sparkContext.makeRDD(
"""{"a": {"c.b": 1}, "b.$q": [{"a@!.q": 1}], "q.w": {"w.i&": [1]}}""" :: Nil))
spark.read
.json(Seq("""{"a": {"c.b": 1}, "b.$q": [{"a@!.q": 1}], "q.w": {"w.i&": [1]}}""").toDS())
.createOrReplaceTempView("t")

checkAnswer(sql("SELECT a.`c.b`, `b.$q`[0].`a@!.q`, `q.w`.`w.i&`[0] FROM t"), Row(1, 1, 1))
@@ -1437,8 +1434,9 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {

test("SPARK-7067: order by queries for complex ExtractValue chain") {
withTempView("t") {
spark.read.json(sparkContext.makeRDD(
"""{"a": {"b": [{"c": 1}]}, "b": [{"d": 1}]}""" :: Nil)).createOrReplaceTempView("t")
spark.read
.json(Seq("""{"a": {"b": [{"c": 1}]}, "b": [{"d": 1}]}""").toDS())
.createOrReplaceTempView("t")
checkAnswer(sql("SELECT a.b FROM t ORDER BY b[0].d"), Row(Seq(Row(1))))
}
}
@@ -2109,8 +2107,7 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
|"a": [{"count": 3}], "b": [{"e": "test", "count": 1}]}}}'
|
""".stripMargin
val rdd = sparkContext.parallelize(Array(json))
spark.read.json(rdd).write.mode("overwrite").parquet(dir.toString)
spark.read.json(Seq(json).toDS()).write.mode("overwrite").parquet(dir.toString)
spark.read.parquet(dir.toString).collect()
}
}
@@ -221,8 +221,7 @@ class UserDefinedTypeSuite extends QueryTest with SharedSQLContext with ParquetT
StructField("vec", new UDT.MyDenseVectorUDT, false)
))

val stringRDD = sparkContext.parallelize(data)
val jsonRDD = spark.read.schema(schema).json(stringRDD)
val jsonRDD = spark.read.schema(schema).json(data.toDS())
checkAnswer(
jsonRDD,
Row(1, new UDT.MyDenseVector(Array(1.1, 2.2, 3.3, 4.4))) ::
@@ -242,8 +241,7 @@
StructField("vec", new UDT.MyDenseVectorUDT, false)
))

val stringRDD = sparkContext.parallelize(data)
val jsonDataset = spark.read.schema(schema).json(stringRDD)
val jsonDataset = spark.read.schema(schema).json(data.toDS())
.as[(Int, UDT.MyDenseVector)]
checkDataset(
jsonDataset,
@@ -140,7 +140,7 @@ class WideSchemaBenchmark extends SparkFunSuite with BeforeAndAfterEach {
}
datum += "}"
datum = s"""{"a": {"b": {"c": $datum, "d": $datum}, "e": $datum}}"""
val df = sparkSession.read.json(sparkSession.range(numRows).map(_ => datum).rdd).cache()
val df = sparkSession.read.json(sparkSession.range(numRows).map(_ => datum)).cache()
df.count() // force caching
addCases(benchmark, df, s"$width wide x $numRows rows", "a.b.c.value_1")
}
@@ -157,7 +157,7 @@ class WideSchemaBenchmark extends SparkFunSuite with BeforeAndAfterEach {
datum = "{\"value\": " + datum + "}"
selector = selector + ".value"
}
val df = sparkSession.read.json(sparkSession.range(numRows).map(_ => datum).rdd).cache()
val df = sparkSession.read.json(sparkSession.range(numRows).map(_ => datum)).cache()
df.count() // force caching
addCases(benchmark, df, s"$depth deep x $numRows rows", selector)
}
@@ -180,7 +180,7 @@ class WideSchemaBenchmark extends SparkFunSuite with BeforeAndAfterEach {
}
// TODO(ekl) seems like the json parsing is actually the majority of the time, perhaps
// we should benchmark that too separately.
val df = sparkSession.read.json(sparkSession.range(numRows).map(_ => datum).rdd).cache()
val df = sparkSession.read.json(sparkSession.range(numRows).map(_ => datum)).cache()
df.count() // force caching
addCases(benchmark, df, s"$numNodes x $depth deep x $numRows rows", selector)
}
@@ -200,7 +200,7 @@ class WideSchemaBenchmark extends SparkFunSuite with BeforeAndAfterEach {
}
}
datum += "]}"
val df = sparkSession.read.json(sparkSession.range(numRows).map(_ => datum).rdd).cache()
val df = sparkSession.read.json(sparkSession.range(numRows).map(_ => datum)).cache()
df.count() // force caching
addCases(benchmark, df, s"$width wide x $numRows rows", "value[0]")
}
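
The benchmark edits above drop the trailing .rdd because the reader now consumes a Dataset[String] directly. A small sketch of that generation pattern, assuming a SparkSession named spark; the record template and row count are illustrative, not the benchmark's real values:

```scala
import spark.implicits._

// Hypothetical single-record template; the benchmark builds much wider/deeper ones.
val datum = """{"value": {"value": 1}}"""
val numRows = 1000L

// spark.range gives a Dataset[java.lang.Long]; map(...) turns it into a
// Dataset[String], which json() reads without an intermediate RDD.
val df = spark.read.json(spark.range(numRows).map(_ => datum)).cache()
df.count()  // force caching, as the benchmark does
```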