Skip to content

Commit 86f76a1

Browse files
wangyum (GitHub Enterprise)
authored and committed
[CARMEL-7613] Add persistWithTempTable to Dataset to replace cache (apache#55)
1 parent 6fc9de8 commit 86f76a1

File tree

2 files changed

+35
-2
lines changed

2 files changed

+35
-2
lines changed

sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala

Lines changed: 27 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -17,7 +17,8 @@
1717

1818
package org.apache.spark.sql
1919

20-
import java.io.{ByteArrayOutputStream, CharArrayWriter, DataOutputStream}
20+
import java.io.{ByteArrayOutputStream, CharArrayWriter, DataOutputStream, File}
21+
import java.util.UUID
2122

2223
import scala.annotation.varargs
2324
import scala.collection.JavaConverters._
@@ -39,7 +40,7 @@ import org.apache.spark.rdd.RDD
3940
import org.apache.spark.scheduler.RepeatableIterator
4041
import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow, QueryPlanningTracker, ScalaReflection, TableIdentifier}
4142
import org.apache.spark.sql.catalyst.analysis._
42-
import org.apache.spark.sql.catalyst.catalog.HiveTableRelation
43+
import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, CatalogTableType, HiveTableRelation}
4344
import org.apache.spark.sql.catalyst.encoders._
4445
import org.apache.spark.sql.catalyst.expressions._
4546
import org.apache.spark.sql.catalyst.json.{JacksonGenerator, JSONOptions}
@@ -3798,6 +3799,30 @@ class Dataset[T] private[sql](
37983799
this
37993800
}
38003801

3802+
def persistWithTempTable(): DataFrame = {
3803+
persistWithTempTable(UUID.randomUUID().toString.replace('-', '_'))
3804+
}
3805+
3806+
def persistWithTempTable(tableName: String): DataFrame = sparkSession.withActive {
3807+
val sessionPath = sparkSession.sessionState.catalog.sessionPath
3808+
val table =
3809+
CatalogTable(
3810+
TableIdentifier(tableName),
3811+
CatalogTableType.TEMPORARY,
3812+
CatalogStorageFormat.empty.copy(
3813+
locationUri = Some(sessionPath.suffix(File.separator + tableName).toUri)),
3814+
StructType(Nil),
3815+
provider = Some(sparkSession.sessionState.conf.defaultDataSourceName))
3816+
3817+
CreateDataSourceTableAsSelectCommand(
3818+
table,
3819+
SaveMode.ErrorIfExists,
3820+
logicalPlan,
3821+
logicalPlan.output.map(_.name)).run(sparkSession)
3822+
3823+
sparkSession.table(tableName)
3824+
}
3825+
38013826
/**
38023827
* Persist this Dataset with the default storage level (`MEMORY_AND_DISK`).
38033828
*

sql/core/src/test/scala/org/apache/spark/sql/TemporaryTableSuite.scala

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import org.apache.spark.sql.catalyst.TableIdentifier
2323
import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException
2424
import org.apache.spark.sql.catalyst.catalog.CatalogTableType
2525
import org.apache.spark.sql.catalyst.parser.ParseException
26+
import org.apache.spark.sql.execution.FileSourceScanExec
2627
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
2728
import org.apache.spark.sql.test.SharedSparkSession
2829

@@ -347,4 +348,11 @@ class TemporaryTableSuite extends QueryTest
347348
testUnsupportedCommand("ALTER TABLE t1 CHANGE COLUMN id TYPE bigint")
348349
testUnsupportedCommand("ALTER TABLE t1 SET SERDE 'whatever'")
349350
}
351+
352+
test("HADP-51648: persistToTempTable API") {
353+
val df = spark.range(10).persistWithTempTable()
354+
val location = df.queryExecution.sparkPlan.asInstanceOf[FileSourceScanExec].relation.location
355+
assert(location.rootPaths.head.toString.contains(sparkScratch))
356+
checkAnswer(df, spark.range(10).toDF())
357+
}
350358
}

0 commit comments

Comments
 (0)