
Commit 04e61c5

Merge branch 'master' into SPARK-34581-keep-grouping-expressions

# Conflicts:
#	sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala

2 parents: 3de19ca + 93a5d34

102 files changed: +2320 additions, -1568 deletions


core/src/main/java/org/apache/spark/memory/MemoryConsumer.java

Lines changed: 2 additions & 2 deletions
@@ -40,8 +40,8 @@ protected MemoryConsumer(TaskMemoryManager taskMemoryManager, long pageSize, Mem
     this.mode = mode;
   }
 
-  protected MemoryConsumer(TaskMemoryManager taskMemoryManager) {
-    this(taskMemoryManager, taskMemoryManager.pageSizeBytes(), MemoryMode.ON_HEAP);
+  protected MemoryConsumer(TaskMemoryManager taskMemoryManager, MemoryMode mode) {
+    this(taskMemoryManager, taskMemoryManager.pageSizeBytes(), mode);
   }
 
   /**
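
Note: the single-argument constructor is removed, so a MemoryConsumer subclass now has to pass a MemoryMode explicitly, either through this constructor or the three-argument one (the Spillable change further down in this commit is the in-tree call site). A minimal sketch of a hypothetical Scala subclass under the new signature, not part of this commit:

import org.apache.spark.memory.{MemoryConsumer, MemoryMode, TaskMemoryManager}

// Hypothetical example: the memory mode is now an explicit choice at the call site
// instead of an implicit ON_HEAP default.
class NoopConsumer(tmm: TaskMemoryManager)
  extends MemoryConsumer(tmm, MemoryMode.ON_HEAP) {

  // MemoryConsumer's abstract spill callback; this sketch frees no memory.
  override def spill(size: Long, trigger: MemoryConsumer): Long = 0L
}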

core/src/main/scala/org/apache/spark/ContextCleaner.scala

Lines changed: 5 additions & 5 deletions
@@ -172,18 +172,18 @@ private[spark] class ContextCleaner(
     registerForCleanup(rdd, CleanCheckpoint(parentId))
   }
 
-  /** Register an object for cleanup. */
-  private def registerForCleanup(objectForCleanup: AnyRef, task: CleanupTask): Unit = {
-    referenceBuffer.add(new CleanupTaskWeakReference(task, objectForCleanup, referenceQueue))
-  }
-
   /** Register a SparkListener to be cleaned up when its owner is garbage collected. */
   def registerSparkListenerForCleanup(
       listenerOwner: AnyRef,
       listener: SparkListener): Unit = {
     registerForCleanup(listenerOwner, CleanSparkListener(listener))
   }
 
+  /** Register an object for cleanup. */
+  private def registerForCleanup(objectForCleanup: AnyRef, task: CleanupTask): Unit = {
+    referenceBuffer.add(new CleanupTaskWeakReference(task, objectForCleanup, referenceQueue))
+  }
+
   /** Keep cleaning RDD, shuffle, and broadcast state. */
   private def keepCleaning(): Unit = Utils.tryOrStopSparkContext(sc) {
     while (!stopped) {

core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 11 additions & 5 deletions
@@ -1584,7 +1584,11 @@ class SparkContext(config: SparkConf) extends Logging {
       path: String, recursive: Boolean, addedOnSubmit: Boolean, isArchive: Boolean = false
     ): Unit = {
     val uri = if (!isArchive) {
-      new Path(path).toUri
+      if (Utils.isAbsoluteURI(path) && path.contains("%")) {
+        new URI(path)
+      } else {
+        new Path(path).toUri
+      }
     } else {
       Utils.resolveURI(path)
     }
@@ -1619,10 +1623,8 @@
       env.rpcEnv.fileServer.addFile(new File(uri.getPath))
     } else if (uri.getScheme == null) {
       schemeCorrectedURI.toString
-    } else if (isArchive) {
-      uri.toString
     } else {
-      path
+      uri.toString
     }
 
     val timestamp = if (addedOnSubmit) startTime else System.currentTimeMillis
@@ -1977,7 +1979,11 @@
       // For local paths with backslashes on Windows, URI throws an exception
       (addLocalJarFile(new File(path)), "local")
     } else {
-      val uri = new Path(path).toUri
+      val uri = if (Utils.isAbsoluteURI(path) && path.contains("%")) {
+        new URI(path)
+      } else {
+        new Path(path).toUri
+      }
       // SPARK-17650: Make sure this is a valid URL before adding it to the list of dependencies
       Utils.validateURL(uri)
       val uriScheme = uri.getScheme
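
The new branch exists because SPARK-34225 (see the SparkContextSuite test at the end of this commit) requires addFile/addJar not to re-encode a string that is already in URI form: Hadoop's Path constructor rebuilds the URI and quotes the '%' character again, while java.net.URI keeps the escapes as given. A rough stand-alone illustration of the decision, using a hypothetical helper name and assuming that re-encoding behavior of org.apache.hadoop.fs.Path:

import java.net.{URI, URISyntaxException}
import org.apache.hadoop.fs.Path

// Hypothetical helper mirroring the branch above (isAbsoluteURI is added to Utils in this
// commit; here the check is inlined so the sketch stands alone).
def toAddUri(path: String): URI = {
  val isAbsolute = try { new URI(path).isAbsolute } catch { case _: URISyntaxException => false }
  if (isAbsolute && path.contains("%")) {
    new URI(path)            // already a URI: keep "%20" and friends untouched
  } else {
    new Path(path).toUri     // plain paths still go through Hadoop's Path
  }
}

// toAddUri("file:/tmp/test%20jar.jar") keeps "%20", whereas
// new Path("file:/tmp/test%20jar.jar").toUri re-quotes the '%' (giving "%2520"),
// which is the double encoding this change avoids.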

core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala

Lines changed: 1 addition & 6 deletions
@@ -44,7 +44,7 @@ import org.apache.ivy.core.report.ResolveReport
 import org.apache.ivy.core.resolve.ResolveOptions
 import org.apache.ivy.core.retrieve.RetrieveOptions
 import org.apache.ivy.core.settings.IvySettings
-import org.apache.ivy.plugins.matcher.{GlobPatternMatcher, PatternMatcher}
+import org.apache.ivy.plugins.matcher.GlobPatternMatcher
 import org.apache.ivy.plugins.repository.file.FileRepository
 import org.apache.ivy.plugins.resolver.{ChainResolver, FileSystemResolver, IBiblioResolver}
@@ -366,7 +366,6 @@ private[spark] class SparkSubmit extends Logging {
     args.pyFiles = Option(args.pyFiles).map(resolveGlobPaths(_, hadoopConf)).orNull
     args.archives = Option(args.archives).map(resolveGlobPaths(_, hadoopConf)).orNull
 
-    lazy val secMgr = new SecurityManager(sparkConf)
 
     // In client mode, download remote files.
     var localPrimaryResource: String = null
@@ -1153,8 +1152,6 @@ private[spark] object SparkSubmitUtils extends Logging {
     // We need a chain resolver if we want to check multiple repositories
     val cr = new ChainResolver
     cr.setName("spark-list")
-    cr.setChangingMatcher(PatternMatcher.REGEXP)
-    cr.setChangingPattern(".*-SNAPSHOT")
 
     val localM2 = new IBiblioResolver
     localM2.setM2compatible(true)
@@ -1314,8 +1311,6 @@ private[spark] object SparkSubmitUtils extends Logging {
     remoteRepos.filterNot(_.trim.isEmpty).map(_.split(",")).foreach { repositoryList =>
       val cr = new ChainResolver
       cr.setName("user-list")
-      cr.setChangingMatcher(PatternMatcher.REGEXP)
-      cr.setChangingPattern(".*-SNAPSHOT")
 
       // add current default resolver, if any
       Option(ivySettings.getDefaultResolver).foreach(cr.add)

core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala

Lines changed: 0 additions & 1 deletion
@@ -74,7 +74,6 @@ object DriverWrapper extends Logging {
 
   private def setupDependencies(loader: MutableURLClassLoader, userJar: String): Unit = {
     val sparkConf = new SparkConf()
-    val secMgr = new SecurityManager(sparkConf)
    val hadoopConf = SparkHadoopUtil.newConfiguration(sparkConf)
 
     val ivyProperties = DependencyUtils.getIvyProperties()

core/src/main/scala/org/apache/spark/internal/config/package.scala

Lines changed: 2 additions & 2 deletions
@@ -1037,7 +1037,7 @@ package object config {
       .doc("When true, HadoopRDD/NewHadoopRDD will not create partitions for empty input splits.")
       .version("2.3.0")
       .booleanConf
-      .createWithDefault(false)
+      .createWithDefault(true)
 
   private[spark] val SECRET_REDACTION_PATTERN =
     ConfigBuilder("spark.redaction.regex")
@@ -1047,7 +1047,7 @@
         "like YARN and event logs.")
       .version("2.1.2")
       .regexConf
-      .createWithDefault("(?i)secret|password|token".r)
+      .createWithDefault("(?i)secret|password|token|access[.]key".r)
 
   private[spark] val STRING_REDACTION_PATTERN =
     ConfigBuilder("spark.redaction.string.regex")

core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala

Lines changed: 4 additions & 2 deletions
@@ -51,15 +51,17 @@ private[spark] class BlockStoreShuffleReader[K, C](
       true
     }
     val useOldFetchProtocol = conf.get(config.SHUFFLE_USE_OLD_FETCH_PROTOCOL)
+    // SPARK-34790: Fetching continuous blocks in batch is incompatible with io encryption.
+    val ioEncryption = conf.get(config.IO_ENCRYPTION_ENABLED)
 
     val doBatchFetch = shouldBatchFetch && serializerRelocatable &&
-      (!compressed || codecConcatenation) && !useOldFetchProtocol
+      (!compressed || codecConcatenation) && !useOldFetchProtocol && !ioEncryption
     if (shouldBatchFetch && !doBatchFetch) {
       logDebug("The feature tag of continuous shuffle block fetching is set to true, but " +
         "we can not enable the feature because other conditions are not satisfied. " +
         s"Shuffle compress: $compressed, serializer relocatable: $serializerRelocatable, " +
         s"codec concatenation: $codecConcatenation, use old shuffle fetch protocol: " +
-        s"$useOldFetchProtocol.")
+        s"$useOldFetchProtocol, io encryption: $ioEncryption.")
     }
     doBatchFetch
   }
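
In other words, with spark.io.encryption.enabled=true, batch fetching of continuous shuffle blocks now stays off even when every other precondition holds. A small sketch that restates the decision as a pure function (the parameter names mirror the locals in the diff above; this is not the reader itself):

// Continuous-block batch fetch is enabled only when all of these hold.
def canBatchFetch(
    shouldBatchFetch: Boolean,
    serializerRelocatable: Boolean,
    compressed: Boolean,
    codecConcatenation: Boolean,
    useOldFetchProtocol: Boolean,
    ioEncryption: Boolean): Boolean = {
  shouldBatchFetch && serializerRelocatable &&
    (!compressed || codecConcatenation) && !useOldFetchProtocol && !ioEncryption
}

// e.g. canBatchFetch(true, true, compressed = false, codecConcatenation = false,
//   useOldFetchProtocol = false, ioEncryption = true) == false after this change.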

core/src/main/scala/org/apache/spark/util/Utils.scala

Lines changed: 11 additions & 0 deletions
@@ -2063,6 +2063,17 @@ private[spark] object Utils extends Logging {
     }
   }
 
+  /** Check whether a path is an absolute URI. */
+  def isAbsoluteURI(path: String): Boolean = {
+    try {
+      val uri = new URI(path: String)
+      uri.isAbsolute
+    } catch {
+      case _: URISyntaxException =>
+        false
+    }
+  }
+
   /** Return all non-local paths from a comma-separated list of paths. */
   def nonLocalPaths(paths: String, testWindows: Boolean = false): Array[String] = {
     val windows = isWindows || testWindows
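
For reference, a few representative inputs and what the new helper is expected to return (illustrative paths; the result simply follows java.net.URI#isAbsolute, i.e. "the string parses and has a scheme", with parse failures mapped to false). Note that Utils is private[spark], so this only compiles from Spark-internal code:

import org.apache.spark.util.Utils

Utils.isAbsoluteURI("file:/tmp/test%20jar.jar")     // true  - parses with scheme "file"
Utils.isAbsoluteURI("hdfs://nn:8020/jars/app.jar")  // true  - parses with scheme "hdfs"
Utils.isAbsoluteURI("/tmp/jars/app.jar")            // false - valid URI but no scheme
Utils.isAbsoluteURI("not a uri")                    // false - URISyntaxException (spaces)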

core/src/main/scala/org/apache/spark/util/collection/Spillable.scala

Lines changed: 1 addition & 1 deletion
@@ -27,7 +27,7 @@ import org.apache.spark.memory.{MemoryConsumer, MemoryMode, TaskMemoryManager}
  * has been exceeded.
  */
 private[spark] abstract class Spillable[C](taskMemoryManager: TaskMemoryManager)
-  extends MemoryConsumer(taskMemoryManager) with Logging {
+  extends MemoryConsumer(taskMemoryManager, MemoryMode.ON_HEAP) with Logging {
   /**
    * Spills the current in-memory collection to disk, and releases the memory.
    *

core/src/test/scala/org/apache/spark/SparkContextSuite.scala

Lines changed: 40 additions & 0 deletions
@@ -1197,6 +1197,46 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventu
     assert(sc.hadoopConfiguration.get(bufferKey).toInt === 65536,
       "spark configs have higher priority than spark.hadoop configs")
   }
+
+  test("SPARK-34225: addFile/addJar shouldn't further encode URI if a URI form string is passed") {
+    withTempDir { dir =>
+      val jar1 = File.createTempFile("testprefix", "test jar.jar", dir)
+      val jarUrl1 = jar1.toURI.toString
+      val file1 = File.createTempFile("testprefix", "test file.txt", dir)
+      val fileUrl1 = file1.toURI.toString
+      val jar2 = File.createTempFile("testprefix", "test %20jar.jar", dir)
+      val file2 = File.createTempFile("testprefix", "test %20file.txt", dir)
+
+      try {
+        sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local"))
+        sc.addJar(jarUrl1)
+        sc.addFile(fileUrl1)
+        sc.addJar(jar2.toString)
+        sc.addFile(file2.toString)
+        sc.parallelize(Array(1), 1).map { x =>
+          val gottenJar1 = new File(SparkFiles.get(jar1.getName))
+          if (!gottenJar1.exists()) {
+            throw new SparkException("file doesn't exist : " + jar1)
+          }
+          val gottenFile1 = new File(SparkFiles.get(file1.getName))
+          if (!gottenFile1.exists()) {
+            throw new SparkException("file doesn't exist : " + file1)
+          }
+          val gottenJar2 = new File(SparkFiles.get(jar2.getName))
+          if (!gottenJar2.exists()) {
+            throw new SparkException("file doesn't exist : " + jar2)
+          }
+          val gottenFile2 = new File(SparkFiles.get(file2.getName))
+          if (!gottenFile2.exists()) {
+            throw new SparkException("file doesn't exist : " + file2)
+          }
+          x
+        }.collect()
+      } finally {
+        sc.stop()
+      }
+    }
+  }
 }
 
 object SparkContextSuite {
