# [Spark] Test special characters for temp folders used by DV tests (#2726)

#### Which Delta project/connector is this regarding?

- [x] Spark
- [ ] Standalone
- [ ] Flink
- [ ] Kernel
- [ ] Other (fill in here)

## Description

This PR follows #2696 and
#2719 to finally enable testing
special characters in all DV tests.

One test is currently failing due to a potential bug in the `OPTIMIZE`
code path, which is pending investigation.
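
For a concrete sense of what these paths look like, here is a minimal sketch (using Spark's `SparkPath` helper, which this change also uses in `VacuumCommand`; the directory name is illustrative): the raw on-disk name and its URI-encoded form differ, so code that mixes the two forms can fail to locate files.

```scala
import org.apache.spark.paths.SparkPath

// A temp-folder name of the kind these tests now generate (illustrative).
val rawPath = "/tmp/s p a r k %2a-1234/table"

// URI-encoded form: spaces become %20 and the literal '%' becomes %25,
// so the "%2a" in the folder name turns into "%252a".
val encoded = SparkPath.fromPathString(rawPath).urlEncoded
```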

## How was this patch tested?

Test-only.

## Does this PR introduce _any_ user-facing changes?

No.
xupefei authored Mar 11, 2024
1 parent 86e3b2c commit 9b02ec7
Showing 10 changed files with 77 additions and 34 deletions.
@@ -52,7 +52,7 @@ case class DeletionVectorDescriptor(
* The deletion vector is stored inline in the log.
* - `storageType="p"` format: `<absolute path>`
* The DV is stored in a file with an absolute path given by this
* url.
* url. Special characters in this path must be escaped.
*/
pathOrInlineDv: String,
/**
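
As an illustrative sketch of the escaping rule documented above (the path value is hypothetical, not from this change): a `storageType = "p"` descriptor stores a URI-escaped absolute path, and a consumer recovers the raw filesystem name by going back through a `URI`.

```scala
import java.net.URI
import org.apache.hadoop.fs.Path

// Hypothetical escaped value, as required for storageType = "p".
val pathOrInlineDv = "file:/data/my%20table/deletion_vectors/dv-a2c.bin"

// Going back through a URI decodes the escapes, recovering the name with the space.
val dvFile = new Path(new URI(pathOrInlineDv))
// dvFile.toString == "file:/data/my table/deletion_vectors/dv-a2c.bin"
```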
@@ -155,14 +155,17 @@ trait DeltaCommand extends DeltaLogging {
* Find the AddFile record corresponding to the file that was read as part of a
* delete/update/merge operation.
*
* @param filePath The path to a file. Can be either absolute or relative
* @param nameToAddFileMap Map generated through `generateCandidateFileMap()`
* @param basePath The path of the table. Must not be escaped.
* @param escapedFilePath The path to a file, either absolute or relative. All special
* characters in this path must already be escaped according to URI standards.
* @param nameToAddFileMap Map generated through `generateCandidateFileMap()`.
*/
def getTouchedFile(
basePath: Path,
filePath: String,
escapedFilePath: String,
nameToAddFileMap: Map[String, AddFile]): AddFile = {
val absolutePath = DeltaFileOperations.absolutePath(basePath.toString, filePath).toString
val absolutePath =
DeltaFileOperations.absolutePath(basePath.toString, escapedFilePath).toString
nameToAddFileMap.getOrElse(absolutePath, {
throw DeltaErrors.notFoundFileToBeRewritten(absolutePath, nameToAddFileMap.keys)
})
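
A hedged usage sketch of the renamed parameter (table root and file name are hypothetical): the table root is passed unescaped, while the file path carries URI-escaped special characters, matching the keys that `generateCandidateFileMap()` produces.

```scala
import org.apache.hadoop.fs.Path

// Hypothetical table root containing a space, passed unescaped per `basePath`.
val basePath = new Path("/data/my table")

// Relative file path with its space already URI-escaped, per `escapedFilePath`.
val escapedFilePath = "part-00000-x%20y.c000.snappy.parquet"

// Sketch only: resolves to the absolute key under which generateCandidateFileMap()
// stored this AddFile, or throws notFoundFileToBeRewritten if it is missing.
// val addFile = getTouchedFile(basePath, escapedFilePath, nameToAddFileMap)
```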
@@ -30,6 +30,7 @@ import com.fasterxml.jackson.databind.annotation.JsonDeserialize
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.paths.SparkPath
import org.apache.spark.sql.{Column, DataFrame, Dataset, Encoder, SparkSession}
import org.apache.spark.sql.execution.metric.SQLMetric
import org.apache.spark.sql.execution.metric.SQLMetrics.createMetric
@@ -116,15 +117,17 @@ object VacuumCommand extends VacuumCommandImpl with Serializable {
val relativizeIgnoreError =
spark.sessionState.conf.getConf(DeltaSQLConf.DELTA_VACUUM_RELATIVIZE_IGNORE_ERROR)

val canonicalizedBasePath = SparkPath.fromPathString(basePath).urlEncoded
snapshot.stateDS.mapPartitions { actions =>
val reservoirBase = new Path(basePath)
val fs = reservoirBase.getFileSystem(hadoopConf.value.value)
actions.flatMap {
_.unwrap match {
case fa: FileAction if checkAbsolutePathOnly && !fa.path.contains(basePath) =>
Nil
case tombstone: RemoveFile if tombstone.delTimestamp < deleteBeforeTimestamp =>
Nil
// Existing tables may not store canonicalized paths, so we check both the canonicalized
// and non-canonicalized paths to ensure we don't accidentally delete the wrong files.
case fa: FileAction if checkAbsolutePathOnly &&
!fa.path.contains(basePath) && !fa.path.contains(canonicalizedBasePath) => Nil
case tombstone: RemoveFile if tombstone.delTimestamp < deleteBeforeTimestamp => Nil
case fa: FileAction =>
getValidRelativePathsAndSubdirs(
fa,
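
A minimal sketch of the check above (paths hypothetical, using the same `SparkPath` helper as the diff): older tables may have recorded an absolute file path in either raw or URI-encoded form, so VACUUM treats a file as belonging to the table if either form of the base path appears in it.

```scala
import org.apache.spark.paths.SparkPath

val basePath = "/tmp/s p a r k/table"                                     // raw form
val canonicalizedBasePath = SparkPath.fromPathString(basePath).urlEncoded // encoded form

// An absolute path recorded by an older writer, already URI-encoded.
val recordedPath = "file:/tmp/s%20p%20a%20r%20k/table/part-00000.parquet"

// Mirrors the filter above: a file stays a candidate only if either form matches.
val belongsToTable =
  recordedPath.contains(basePath) || recordedPath.contains(canonicalizedBasePath)
```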
@@ -458,10 +458,8 @@ trait CloneTableSuiteBase extends QueryTest
val df1 = Seq(1, 2, 3, 4, 5).toDF("id")
df1.write.format("delta").mode("append").save(source)

val baseS3 = new URI("s3", null, source, null, null).toString

runAndValidateClone(
baseS3,
s"s3:$source",
s"file:$clone"
)()

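
A sketch of why the URI construction was dropped (reasoning inferred from the change; the directory is hypothetical): the multi-argument `java.net.URI` constructor percent-encodes special characters in the path, whereas plain interpolation preserves the raw characters the test is meant to exercise.

```scala
import java.net.URI

val source = "/tmp/s p a r k/source"  // hypothetical directory with spaces

// Old construction: the constructor quotes illegal characters in the path.
val encoded = new URI("s3", null, source, null, null).toString
// encoded == "s3:/tmp/s%20p%20a%20r%20k/source"

// New construction: keeps the raw characters.
val raw = s"s3:$source"
// raw == "s3:/tmp/s p a r k/source"
```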
@@ -24,6 +24,7 @@ import org.apache.spark.sql.delta.actions.{Action, AddFile, DeletionVectorDescri
import org.apache.spark.sql.delta.deletionvectors.{RoaringBitmapArray, RoaringBitmapArrayFormat}
import org.apache.spark.sql.delta.sources.DeltaSQLConf
import org.apache.spark.sql.delta.storage.dv.DeletionVectorStore
import org.apache.spark.sql.delta.test.DeltaSQLTestUtils
import org.apache.spark.sql.delta.util.PathWithFileSystem
import org.apache.commons.io.FileUtils
import org.apache.hadoop.fs.Path
@@ -33,7 +34,7 @@ import org.apache.spark.sql.functions.{col, lit}
import org.apache.spark.sql.test.SharedSparkSession

/** Collection of test utilities related with persistent Deletion Vectors. */
trait DeletionVectorsTestUtils extends QueryTest with SharedSparkSession {
trait DeletionVectorsTestUtils extends QueryTest with SharedSparkSession with DeltaSQLTestUtils {

def enableDeletionVectors(
spark: SparkSession,
@@ -99,6 +100,16 @@ trait DeletionVectorsTestUtils extends QueryTest with SharedSparkSession {
}
}

/** Create a temp path which contains special characters. */
override def withTempPath(f: File => Unit): Unit = {
super.withTempPath(prefix = "s p a r k %2a")(f)
}

/** Create a temp path which contains special characters. */
override protected def withTempDir(f: File => Unit): Unit = {
super.withTempDir(prefix = "s p a r k %2a")(f)
}

/** Helper that verifies whether a defined number of DVs exist */
def verifyDVsExist(targetLog: DeltaLog, filesWithDVsSize: Int): Unit = {
val filesWithDVs = getFilesWithDeletionVectors(targetLog)
@@ -128,8 +139,7 @@ trait DeletionVectorsTestUtils extends QueryTest with SharedSparkSession {

// Check that DV exists.
val dvPath = dv.absolutePath(tablePath)
val dvPathStr = DeletionVectorStore.pathToEscapedString(dvPath)
assert(new File(dvPathStr).exists(), s"DV not found $dvPath")
assert(new File(dvPath.toString).exists(), s"DV not found $dvPath")

// Check that cardinality is correct.
val bitmap = newDVStore.read(dvPath, dv.offset.get, dv.sizeInBytes)
@@ -1957,7 +1957,7 @@ class DeltaSourceSuite extends DeltaSourceSuiteBase
}

test("fail on data loss - starting from missing files") {
withTempDirs { case (srcData, targetData, chkLocation) =>
withTempDirs { (srcData, targetData, chkLocation) =>
def addData(): Unit = {
spark.range(10).write.format("delta").mode("append").save(srcData.getCanonicalPath)
}
@@ -1992,7 +1992,7 @@ class DeltaSourceSuite extends DeltaSourceSuiteBase
}

test("fail on data loss - gaps of files") {
withTempDirs { case (srcData, targetData, chkLocation) =>
withTempDirs { (srcData, targetData, chkLocation) =>
def addData(): Unit = {
spark.range(10).write.format("delta").mode("append").save(srcData.getCanonicalPath)
}
@@ -2027,7 +2027,7 @@ class DeltaSourceSuite extends DeltaSourceSuiteBase
}

test("fail on data loss - starting from missing files with option off") {
withTempDirs { case (srcData, targetData, chkLocation) =>
withTempDirs { (srcData, targetData, chkLocation) =>
def addData(): Unit = {
spark.range(10).write.format("delta").mode("append").save(srcData.getCanonicalPath)
}
@@ -2063,7 +2063,7 @@ class DeltaSourceSuite extends DeltaSourceSuiteBase
}

test("fail on data loss - gaps of files with option off") {
withTempDirs { case (srcData, targetData, chkLocation) =>
withTempDirs { (srcData, targetData, chkLocation) =>
def addData(): Unit = {
spark.range(10).write.format("delta").mode("append").save(srcData.getCanonicalPath)
}
@@ -2352,7 +2352,7 @@ class DeltaSourceSuite extends DeltaSourceSuiteBase

test("handling nullability schema changes") {
withTable("srcTable") {
withTempDirs { case (srcTblDir, checkpointDir, checkpointDir2) =>
withTempDirs { (srcTblDir, checkpointDir, checkpointDir2) =>
def readStream(startingVersion: Option[Long] = None): DataFrame = {
var dsr = spark.readStream
startingVersion.foreach { v =>
@@ -20,12 +20,14 @@ import java.io.File

import org.apache.spark.sql.delta.actions.Format
import org.apache.spark.sql.delta.schema.{SchemaMergingUtils, SchemaUtils}
import org.apache.spark.sql.delta.test.DeltaSQLTestUtils

import org.apache.spark.sql.{Column, DataFrame}
import org.apache.spark.sql.streaming.StreamTest
import org.apache.spark.sql.types.StructType

trait DeltaSourceSuiteBase extends StreamTest {
trait DeltaSourceSuiteBase extends StreamTest
with DeltaSQLTestUtils {

/**
* Creates 3 temporary directories for use within a function.
@@ -41,6 +43,20 @@ trait DeltaSourceSuiteBase extends StreamTest {
}
}

/**
* Creates 3 temporary directories for use within a function using a given prefix.
* @param f function to be run with created temp directories
*/
protected def withTempDirs(prefix: String)(f: (File, File, File) => Unit): Unit = {
withTempDir(prefix) { file1 =>
withTempDir(prefix) { file2 =>
withTempDir(prefix) { file3 =>
f(file1, file2, file3)
}
}
}
}

/**
* Copy metadata for fields in newSchema from currentSchema
* @param newSchema new schema
@@ -57,15 +57,19 @@ trait DeltaVacuumSuiteBase extends QueryTest
with DeletionVectorsTestUtils
with DeltaTestUtilsForTempViews {

protected def withEnvironment(f: (File, ManualClock) => Unit): Unit = {
withTempDir { file =>
val clock = new ManualClock()
withSQLConf("spark.databricks.delta.retentionDurationCheck.enabled" -> "false") {
f(file, clock)
}
private def executeWithEnvironment(file: File)(f: (File, ManualClock) => Unit): Unit = {
val clock = new ManualClock()
withSQLConf(DeltaSQLConf.DELTA_VACUUM_RETENTION_CHECK_ENABLED.key -> "false") {
f(file, clock)
}
}

protected def withEnvironment(f: (File, ManualClock) => Unit): Unit =
withTempDir(file => executeWithEnvironment(file)(f))

protected def withEnvironment(prefix: String)(f: (File, ManualClock) => Unit): Unit =
withTempDir(prefix)(file => executeWithEnvironment(file)(f))

protected def defaultTombstoneInterval: Long = {
DeltaConfigs.getMilliSeconds(
IntervalUtils.safeStringToInterval(
@@ -798,8 +802,10 @@ class DeltaVacuumSuite
}
}

// TODO: Somewhere in the code CanonicalPathFunction is called with an unescaped path string;
// this needs investigation. Do not test special characters until that is fixed.
testQuietly("correctness test") {
withEnvironment { (tempDir, clock) =>
withEnvironment(prefix = "spark") { (tempDir, clock) =>

val reservoirDir = new File(tempDir.getAbsolutePath, "reservoir")
assert(reservoirDir.mkdirs())
@@ -684,7 +684,9 @@ class DeletionVectorsSuite extends QueryTest
}

test("absolute DV path with encoded special characters") {
withTempDir { dir =>
// This test uses a hand-crafted path with special characters.
// Do not test with a prefix that needs URL-standard escaping.
withTempDir(prefix = "spark") { dir =>
writeTableHavingSpecialCharInDVPath(dir, pathIsEncoded = true)
checkAnswer(
spark.read.format("delta").load(dir.getCanonicalPath),
@@ -693,7 +695,9 @@ class DeletionVectorsSuite extends QueryTest
}

test("absolute DV path with not-encoded special characters") {
withTempDir { dir =>
// This test uses a hand-crafted path with special characters.
// Do not test with a prefix that needs URL-standard escaping.
withTempDir(prefix = "spark") { dir =>
writeTableHavingSpecialCharInDVPath(dir, pathIsEncoded = false)
val e = intercept[SparkException] {
spark.read.format("delta").load(dir.getCanonicalPath).collect()
@@ -23,19 +23,22 @@ import org.apache.spark.util.Utils

trait DeltaSQLTestUtils extends SQLTestUtils {
/**
* Creates a temporary directory, which is then passed to `f` and will be deleted after `f`
* returns.
* Generates a temporary directory path without creating the actual directory, which is then
* passed to `f` and will be deleted after `f` returns.
*
* This method is copied over from [[SQLTestUtils]] of Apache Spark.
*/
def withTempDir(prefix: String)(f: File => Unit): Unit = {
val path = Utils.createTempDir(namePrefix = prefix)
// delete the auto-created directory, otherwise some Delta tests will fail
// with PATH_ALREADY_EXISTS error.
path.delete()
try f(path) finally Utils.deleteRecursively(path)
}

/**
* Creates a temporary directory, which is then passed to `f` and will be deleted after `f`
* returns.
* Generates a temporary directory path without creating the actual directory, which is then
* passed to `f` and will be deleted after `f` returns.
*
* This method is copied over from [[SQLTestUtils]] of Apache Spark.
*/
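
A short usage sketch of the `withTempDir(prefix)` helper above (the table write is illustrative): the returned `File` points at a path that does not yet exist, so the code under test, such as a Delta write, creates it with the special characters intact.

```scala
// Prefix taken from the DV test utilities in this change; the body is illustrative.
withTempDir(prefix = "s p a r k %2a") { dir =>
  spark.range(10).write.format("delta").save(dir.getCanonicalPath)
}
```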
