Skip to content
4 changes: 3 additions & 1 deletion spark-sql-application/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
This application executes a SQL query and stores the result in an OpenSearch index in the following format
```
"stepId":"<emr-step-id>",
"applicationId":"<spark-application-id>",
"schema": "json blob",
"result": "json blob"
```
Expand Down Expand Up @@ -61,7 +62,8 @@ OpenSearch index document will look like
"{'column_name':'Letter','data_type':'string'}",
"{'column_name':'Number','data_type':'integer'}"
],
"stepId" : "s-JZSB1139WIVU"
"stepId" : "s-JZSB1139WIVU",
"applicationId" : "application_1687726870985_0003"
}
}
```
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,13 +84,15 @@ object SQLJob {
val schema = StructType(Seq(
StructField("result", ArrayType(StringType, containsNull = true), nullable = true),
StructField("schema", ArrayType(StringType, containsNull = true), nullable = true),
StructField("stepId", StringType, nullable = true)))
StructField("stepId", StringType, nullable = true),
StructField("applicationId", StringType, nullable = true)))

// Create the data rows
val rows = Seq((
result.toJSON.collect.toList.map(_.replaceAll("\"", "'")),
resultSchema.toJSON.collect.toList.map(_.replaceAll("\"", "'")),
sys.env.getOrElse("EMR_STEP_ID", "")))
sys.env.getOrElse("EMR_STEP_ID", "unknown"),
spark.sparkContext.applicationId))

// Create the DataFrame for data
spark.createDataFrame(rows).toDF(schema.fields.map(_.name): _*)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,15 @@ class SQLJobTest extends AnyFunSuite{
val expectedSchema = StructType(Seq(
StructField("result", ArrayType(StringType, containsNull = true), nullable = true),
StructField("schema", ArrayType(StringType, containsNull = true), nullable = true),
StructField("stepId", StringType, nullable = true)
StructField("stepId", StringType, nullable = true),
StructField("applicationId", StringType, nullable = true)
))
val expectedRows = Seq(
Row(
Array("{'Letter':'A','Number':1}","{'Letter':'B','Number':2}", "{'Letter':'C','Number':3}"),
Array("{'column_name':'Letter','data_type':'string'}", "{'column_name':'Number','data_type':'integer'}"),
""
"unknown",
spark.sparkContext.applicationId
)
)
val expected: DataFrame = spark.createDataFrame(spark.sparkContext.parallelize(expectedRows), expectedSchema)
Expand Down