-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathdata_source_txt.scala
85 lines (66 loc) · 2.09 KB
/
data_source_txt.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
/**
 * spark.read: reading and writing csv, json and text
 * DataFrameWriter
 * DataFrameReader
 */
package spark
import java.io.File
import org.apache.spark.sql.streaming.DataStreamReader
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.{DataFrameReader, DataFrameWriter, Dataset, SparkSession}
/**
 * Small runnable examples of Spark's file-based data source API:
 * batch reads through [[DataFrameReader]], batch writes through
 * [[DataFrameWriter]] and streaming reads through [[DataStreamReader]].
 */
object data_source_txt {

  /** Session shared by all examples; `local[4]` is the master URL passed to the builder. */
  val spark: SparkSession = SparkSession.builder().master("local[4]").getOrCreate()

  import spark.implicits._

  // Exercise: think about how a nested schema would be written here.
  /** Explicit schema applied when reading `people.csv` in [[readTxtExample]]. */
  val schema: StructType = new StructType()
    .add("name", "string", nullable = false)
    .add("age", "int", nullable = false)
    .add("education", "string", nullable = true)
    .add("skill", "string")

  /**
   * Demonstrates the shortcut readers (`json`, `csv`, `textFile`) and the
   * generic `format(...).option(...).schema(...).load(...)` chain.
   *
   * Only the JSON read and the final schema-driven CSV read are displayed with
   * `show()`; the results of the two reads in between are discarded.
   */
  def readTxtExample(): Unit = {
    val reader: DataFrameReader = spark.read

    reader.json("./src/main/scala/resource/people.json").show()
    reader.csv("./src/main/scala/resource/people.csv").toDF()
    reader.textFile("Readme.md").toDF()

    reader
      .format("csv")
      .option("header", true) // some of the handy read-time configuration options
      .option("delimiter", "|")
      .schema(schema) // read according to the explicit schema
      .load("./src/main/scala/resource/people.csv")
      .show()
  }

  /**
   * Writes the integers 0 to 5 as JSON twice: once through the `json` shortcut
   * and once through the generic `format("json").save(...)` form.
   *
   * The output location is removed before each write so that the second write
   * does not collide with the output of the first.
   */
  def writeTxtExample(): Unit = {
    val ints: Dataset[Int] = (0 to 5).toDS().repartition(1)
    val writer: DataFrameWriter[Int] = ints.write

    val path = "./src/main/scala/resource/ints.json"
    val outputFile = new File(path)

    // dirDel is a no-op when the path does not exist, so no separate existence check is needed.
    dirDel(outputFile)
    writer.json(path)

    dirDel(outputFile)
    writer.format("json").save(path)
  }

  /**
   * Builds a streaming text source and obtains its writer.
   *
   * No `start()` call is made here, so this only constructs the reader, the
   * streaming Dataset and the writer.
   */
  def streamExample(): Unit = {
    val stream: DataStreamReader = spark.readStream
    val paper = stream.text("Readme.md").as[String]
    // Kept to illustrate the API; the writer is intentionally left unused.
    val streamWriter = paper.writeStream
  }

  /**
   * Recursively deletes `path`: a regular file is deleted directly, a directory
   * is emptied first and then deleted. Does nothing when `path` does not exist.
   *
   * Deletion failures are not reported; the Boolean result of `File.delete` is ignored.
   *
   * @param path file or directory to remove
   */
  def dirDel(path: File): Unit =
    if (path.exists()) {
      if (path.isDirectory) {
        // File.listFiles returns null on an I/O error, so wrap it in Option
        // rather than dereferencing it directly.
        Option(path.listFiles()).foreach(_.foreach(dirDel))
      }
      path.delete()
      ()
    }

  /** Runs the read, write and stream examples in that order. */
  def main(args: Array[String]): Unit = {
    readTxtExample()
    writeTxtExample()
    streamExample()
  }
}