diff --git a/spark-sql-application/README.md b/spark-sql-application/README.md
index b0505282aba..6422f294cd3 100644
--- a/spark-sql-application/README.md
+++ b/spark-sql-application/README.md
@@ -3,6 +3,7 @@
 This application execute sql query and store the result in OpenSearch index in following format
 ```
 "stepId":"",
+"applicationId":""
 "schema": "json blob",
 "result": "json blob"
 ```
@@ -61,7 +62,8 @@ OpenSearch index document will look like
       "{'column_name':'Letter','data_type':'string'}",
       "{'column_name':'Number','data_type':'integer'}"
     ],
-    "stepId" : "s-JZSB1139WIVU"
+    "stepId" : "s-JZSB1139WIVU",
+    "applicationId" : "application_1687726870985_0003"
   }
 }
 ```
diff --git a/spark-sql-application/src/main/scala/org/opensearch/sql/SQLJob.scala b/spark-sql-application/src/main/scala/org/opensearch/sql/SQLJob.scala
index f2dd0c869c2..04fa92b25bb 100644
--- a/spark-sql-application/src/main/scala/org/opensearch/sql/SQLJob.scala
+++ b/spark-sql-application/src/main/scala/org/opensearch/sql/SQLJob.scala
@@ -84,13 +84,15 @@
     val schema = StructType(Seq(
       StructField("result", ArrayType(StringType, containsNull = true), nullable = true),
       StructField("schema", ArrayType(StringType, containsNull = true), nullable = true),
-      StructField("stepId", StringType, nullable = true)))
+      StructField("stepId", StringType, nullable = true),
+      StructField("applicationId", StringType, nullable = true)))
 
     // Create the data rows
     val rows = Seq((
       result.toJSON.collect.toList.map(_.replaceAll("\"", "'")),
       resultSchema.toJSON.collect.toList.map(_.replaceAll("\"", "'")),
-      sys.env.getOrElse("EMR_STEP_ID", "")))
+      sys.env.getOrElse("EMR_STEP_ID", "unknown"),
+      spark.sparkContext.applicationId))
 
     // Create the DataFrame for data
     spark.createDataFrame(rows).toDF(schema.fields.map(_.name): _*)
diff --git a/spark-sql-application/src/test/scala/org/opensearch/sql/SQLJobTest.scala b/spark-sql-application/src/test/scala/org/opensearch/sql/SQLJobTest.scala
index 2cdb06d6caf..7ec4e454506 100644
--- a/spark-sql-application/src/test/scala/org/opensearch/sql/SQLJobTest.scala
+++ b/spark-sql-application/src/test/scala/org/opensearch/sql/SQLJobTest.scala
@@ -31,13 +31,15 @@
     val expectedSchema = StructType(Seq(
       StructField("result", ArrayType(StringType, containsNull = true), nullable = true),
       StructField("schema", ArrayType(StringType, containsNull = true), nullable = true),
-      StructField("stepId", StringType, nullable = true)
+      StructField("stepId", StringType, nullable = true),
+      StructField("applicationId", StringType, nullable = true)
     ))
     val expectedRows = Seq(
       Row(
         Array("{'Letter':'A','Number':1}","{'Letter':'B','Number':2}", "{'Letter':'C','Number':3}"),
         Array("{'column_name':'Letter','data_type':'string'}", "{'column_name':'Number','data_type':'integer'}"),
-        ""
+        "unknown",
+        spark.sparkContext.applicationId
       )
     )
     val expected: DataFrame = spark.createDataFrame(spark.sparkContext.parallelize(expectedRows), expectedSchema)
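
Below is a minimal, self-contained sketch of what the changed `getFormattedData` logic does after this diff: it packs the query result, its schema, the EMR step id (falling back to `"unknown"`), and `spark.sparkContext.applicationId` into a single-row DataFrame whose columns match the OpenSearch document shown in the README. The wrapper objects `ResultFrameSketch` and `ResultFrameSketchApp`, the local-mode `SparkSession`, and the sample `Letter`/`Number` data are illustrative assumptions for demonstration, not code from the repository.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.types.{ArrayType, StringType, StructField, StructType}

// Sketch of the result-document construction after the applicationId change.
object ResultFrameSketch {
  def getFormattedData(spark: SparkSession, result: DataFrame, resultSchema: DataFrame): DataFrame = {
    // Document layout written to the OpenSearch index: result and schema as
    // arrays of JSON strings, plus stepId and the new applicationId column.
    val schema = StructType(Seq(
      StructField("result", ArrayType(StringType, containsNull = true), nullable = true),
      StructField("schema", ArrayType(StringType, containsNull = true), nullable = true),
      StructField("stepId", StringType, nullable = true),
      StructField("applicationId", StringType, nullable = true)))

    // One row: JSON rows with double quotes swapped for single quotes,
    // the EMR step id (defaulting to "unknown" when not running on EMR),
    // and the Spark application id taken from the active SparkContext.
    val rows = Seq((
      result.toJSON.collect.toList.map(_.replaceAll("\"", "'")),
      resultSchema.toJSON.collect.toList.map(_.replaceAll("\"", "'")),
      sys.env.getOrElse("EMR_STEP_ID", "unknown"),
      spark.sparkContext.applicationId))

    spark.createDataFrame(rows).toDF(schema.fields.map(_.name): _*)
  }
}

// Hypothetical local run showing the resulting columns.
object ResultFrameSketchApp {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("result-frame-sketch").master("local[1]").getOrCreate()
    import spark.implicits._

    val data = Seq(("A", 1), ("B", 2), ("C", 3)).toDF("Letter", "Number")
    val schemaRows = Seq(("Letter", "string"), ("Number", "integer")).toDF("column_name", "data_type")

    val doc = ResultFrameSketch.getFormattedData(spark, data, schemaRows)
    doc.show(truncate = false) // columns: result, schema, stepId, applicationId

    spark.stop()
  }
}
```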