diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index da37fa83254b..801b6dd85a2b 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -22,7 +22,6 @@ import java.net.URI
 import java.util.{Arrays, Locale, Properties, ServiceLoader, UUID}
 import java.util.concurrent.{ConcurrentHashMap, ConcurrentMap}
 import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger, AtomicReference}
-import javax.ws.rs.core.UriBuilder
 
 import scala.collection.Map
 import scala.collection.concurrent.{Map => ScalaConcurrentMap}
@@ -1829,12 +1828,12 @@ class SparkContext(config: SparkConf) extends Logging {
       addedArchives
         .getOrElseUpdate(jobArtifactUUID, new ConcurrentHashMap[String, Long]().asScala)
         .putIfAbsent(
-          UriBuilder.fromUri(new URI(key)).fragment(uri.getFragment).build().toString,
+          Utils.getUriBuilder(new URI(key)).fragment(uri.getFragment).build().toString,
           timestamp).isEmpty) {
       logInfo(s"Added archive $path at $key with timestamp $timestamp")
       // If the scheme is file, use URI to simply copy instead of downloading.
       val uriToUse = if (!isLocal && scheme == "file") uri else new URI(key)
-      val uriToDownload = UriBuilder.fromUri(uriToUse).fragment(null).build()
+      val uriToDownload = Utils.getUriBuilder(uriToUse).fragment(null).build()
       val source = Utils.fetchFile(uriToDownload.toString, Utils.createTempDir(), conf,
         hadoopConfiguration, timestamp, useCache = false, shouldUntar = false)
       val dest = new File(
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index 5e7dc799ab07..10c1dbe2054a 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -24,7 +24,6 @@ import java.nio.file.Files
 import java.security.PrivilegedExceptionAction
 import java.util.ServiceLoader
 import java.util.jar.JarInputStream
-import javax.ws.rs.core.UriBuilder
 
 import scala.annotation.tailrec
 import scala.collection.mutable.ArrayBuffer
@@ -409,7 +408,7 @@ private[spark] class SparkSubmit extends Logging {
       val resolvedUris = Utils.stringToSeq(uris).map(Utils.resolveURI)
       val localResources = downloadFileList(
         resolvedUris.map(
-          UriBuilder.fromUri(_).fragment(null).build().toString).mkString(","),
+          Utils.getUriBuilder(_).fragment(null).build().toString).mkString(","),
         targetDir, sparkConf, hadoopConf)
       Utils.stringToSeq(localResources).map(Utils.resolveURI).zip(resolvedUris).map {
         case (localResources, resolvedUri) =>
@@ -426,7 +425,7 @@ private[spark] class SparkSubmit extends Logging {
             Files.copy(source.toPath, dest.toPath)
           }
           // Keep the URIs of local files with the given fragments.
-          UriBuilder.fromUri(
+          Utils.getUriBuilder(
             localResources).fragment(resolvedUri.getFragment).build().toString
       }.mkString(",")
     }
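NOTE (not part of the patch): every call site in the two files above follows the
same round trip -- drop the "#fragment" before the URI is fetched, then re-attach
it to the local copy. A minimal sketch of that round trip, assuming a JAX-RS
implementation such as Jersey is on the classpath (UriBuilder.fromUri resolves
through RuntimeDelegate); the helper names here are illustrative only:

  import java.net.URI
  import javax.ws.rs.core.UriBuilder

  object FragmentRoundTrip {
    // Fetchers ignore the fragment, so strip it before downloading.
    def stripFragment(uri: URI): URI =
      UriBuilder.fromUri(uri).fragment(null).build()

    // Re-attach the user-supplied fragment to the fetched local copy.
    def reattachFragment(local: URI, original: URI): URI =
      UriBuilder.fromUri(local).fragment(original.getFragment).build()

    def main(args: Array[String]): Unit = {
      val archive = new URI("hdfs://nn:8020/tmp/deps.tar.gz#env")
      println(stripFragment(archive))              // hdfs://nn:8020/tmp/deps.tar.gz
      val local = new URI("file:/tmp/deps.tar.gz") // pretend fetchFile returned this
      println(reattachFragment(local, archive))    // file:/tmp/deps.tar.gz#env
    }
  }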
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index dae00a72285d..206b293e08c2 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -27,7 +27,6 @@ import java.util.concurrent._
 import java.util.concurrent.atomic.AtomicBoolean
 import java.util.concurrent.locks.ReentrantLock
 import javax.annotation.concurrent.GuardedBy
-import javax.ws.rs.core.UriBuilder
 
 import scala.collection.immutable
 import scala.collection.mutable.{ArrayBuffer, HashMap}
@@ -1157,7 +1156,7 @@ private[spark] class Executor(
           state.currentArchives.getOrElse(name, -1L) < timestamp) {
         logInfo(s"Fetching $name with timestamp $timestamp")
         val sourceURI = new URI(name)
-        val uriToDownload = UriBuilder.fromUri(sourceURI).fragment(null).build()
+        val uriToDownload = Utils.getUriBuilder(sourceURI).fragment(null).build()
         val source = Utils.fetchFile(uriToDownload.toString, Utils.createTempDir(), conf,
           hadoopConf, timestamp, useCache = !isLocal, shouldUntar = false)
         val dest = new File(
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index b49f97aed05e..25b03a2c5d6d 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -32,6 +32,7 @@ import java.util.{Locale, Properties, Random, UUID}
 import java.util.concurrent._
 import java.util.concurrent.TimeUnit.NANOSECONDS
 import java.util.zip.{GZIPInputStream, ZipInputStream}
+import javax.ws.rs.core.UriBuilder
 
 import scala.annotation.tailrec
 import scala.collection.Map
@@ -2885,6 +2886,20 @@ private[spark] object Utils
     uri.startsWith(s"$LOCAL_SCHEME:")
   }
 
+  /** Create a UriBuilder from URI object. */
+  def getUriBuilder(uri: URI): UriBuilder = {
+    // scalastyle:off uribuilder
+    UriBuilder.fromUri(uri)
+    // scalastyle:on uribuilder
+  }
+
+  /** Create a UriBuilder from URI string. */
+  def getUriBuilder(uri: String): UriBuilder = {
+    // scalastyle:off uribuilder
+    UriBuilder.fromUri(uri)
+    // scalastyle:on uribuilder
+  }
+
   /** Check whether the file of the path is splittable. */
   def isFileSplittable(path: Path, codecFactory: CompressionCodecFactory): Boolean = {
     val codec = codecFactory.getCodec(path)
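NOTE (not part of the patch): the two Utils.getUriBuilder overloads added above
funnel every UriBuilder.fromUri call through a single audited location, and the
scalastyle:off/on comments are what exempt that one call site from the new
"uribuilder" rule introduced below. Outside Spark, the same "one blessed call
site" pattern looks roughly like this sketch, again assuming a JAX-RS
implementation such as Jersey is available:

  import java.net.URI
  import javax.ws.rs.core.UriBuilder

  object UriBuilders {
    // The only place the raw JAX-RS factory is invoked; all other code goes
    // through these wrappers, so the dependency is easy to audit or swap out.
    def getUriBuilder(uri: URI): UriBuilder = UriBuilder.fromUri(uri)
    def getUriBuilder(uri: String): UriBuilder = UriBuilder.fromUri(uri)
  }

  object Demo {
    def main(args: Array[String]): Unit = {
      // Both overloads return a mutable builder, exactly like the raw factory.
      val noFragment =
        UriBuilders.getUriBuilder("s3a://bucket/key#data").fragment(null).build()
      println(noFragment) // s3a://bucket/key
    }
  }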
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala
index 31dd029c27fb..51ee9ffbe405 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala
@@ -16,8 +16,6 @@
  */
 package org.apache.spark.deploy.k8s.features
 
-import javax.ws.rs.core.UriBuilder
-
 import scala.collection.mutable
 import scala.jdk.CollectionConverters._
 
@@ -171,7 +169,7 @@ private[spark] class BasicDriverFeatureStep(conf: KubernetesDriverConf)
         conf.get(key).partition(uri => KubernetesUtils.isLocalAndResolvable(uri))
       val value = {
         if (key == ARCHIVES) {
-          localUris.map(UriBuilder.fromUri(_).fragment(null).build()).map(_.toString)
+          localUris.map(Utils.getUriBuilder(_).fragment(null).build()).map(_.toString)
         } else {
           localUris
         }
@@ -180,7 +178,7 @@ private[spark] class BasicDriverFeatureStep(conf: KubernetesDriverConf)
       if (resolved.nonEmpty) {
         val resolvedValue = if (key == ARCHIVES) {
           localUris.zip(resolved).map { case (uri, r) =>
-            UriBuilder.fromUri(r).fragment(new java.net.URI(uri).getFragment).build().toString
+            Utils.getUriBuilder(r).fragment(new java.net.URI(uri).getFragment).build().toString
           }
         } else {
           resolved
diff --git a/scalastyle-config.xml b/scalastyle-config.xml
index b38c6d1c1fa6..fb1d17dca0f5 100644
--- a/scalastyle-config.xml
+++ b/scalastyle-config.xml
@@ -283,6 +283,11 @@ This file is divided into 3 sections:
     of Commons Lang 2 (package org.apache.commons.lang.*)</customMessage>
   </check>
 
+  <check customId="uribuilder" level="error" class="org.scalastyle.file.RegexChecker" enabled="true">
+    <parameters><parameter name="regex">UriBuilder\.fromUri</parameter></parameters>
+    <customMessage>Use Utils.getUriBuilder instead.</customMessage>
+  </check>
+
   <check customId="executioncontextglobal" level="error" class="org.scalastyle.file.RegexChecker" enabled="true">
     <parameters><parameter name="regex">scala\.concurrent\.ExecutionContext\.Implicits\.global</parameter></parameters>
     <customMessage>User queries can use global thread pool, causing starvation and eventual OOM.</customMessage>
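NOTE (not part of the patch): with the "uribuilder" rule above in place, any
occurrence of UriBuilder.fromUri outside a scalastyle:off/on window fails the
build with "Use Utils.getUriBuilder instead.", so migrating a caller is
mechanical. A sketch of the before/after, compiled against spark-core;
dropFragment is a hypothetical caller, not code from this patch:

  import java.net.URI
  import org.apache.spark.util.Utils

  object Migration {
    // Before (now rejected by scalastyle):
    //   UriBuilder.fromUri(uri).fragment(null).build()
    // After: identical behavior, routed through the audited helper.
    def dropFragment(uri: URI): URI =
      Utils.getUriBuilder(uri).fragment(null).build()
  }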
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/artifact/ArtifactManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/artifact/ArtifactManager.scala
index 47f5dbdac488..9e4b6ef31636 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/artifact/ArtifactManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/artifact/ArtifactManager.scala
@@ -21,7 +21,6 @@ import java.io.File
 import java.net.{URI, URL, URLClassLoader}
 import java.nio.file.{Files, Path, Paths, StandardCopyOption}
 import java.util.concurrent.CopyOnWriteArrayList
-import javax.ws.rs.core.UriBuilder
 
 import scala.jdk.CollectionConverters._
 import scala.reflect.ClassTag
@@ -174,7 +173,7 @@ class ArtifactManager(session: SparkSession) extends Logging {
       }
     } else if (remoteRelativePath.startsWith(s"archives${File.separator}")) {
       val canonicalUri =
-        fragment.map(UriBuilder.fromUri(new URI(uri)).fragment).getOrElse(new URI(uri))
+        fragment.map(Utils.getUriBuilder(new URI(uri)).fragment).getOrElse(new URI(uri))
       session.sparkContext.addArchive(canonicalUri.toString)
     } else if (remoteRelativePath.startsWith(s"files${File.separator}")) {
       session.sparkContext.addFile(uri)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
index 2ef68887e87c..79ddfc7d31da 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
@@ -19,7 +19,6 @@ package org.apache.spark.sql.execution
 
 import java.time.ZoneOffset
 import java.util.{Locale, TimeZone}
-import javax.ws.rs.core.UriBuilder
 
 import scala.jdk.CollectionConverters._
 
@@ -42,6 +41,7 @@ import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.internal.{HiveSerDe, SQLConf, VariableSubstitution}
 import org.apache.spark.sql.internal.StaticSQLConf.CATALOG_IMPLEMENTATION
 import org.apache.spark.sql.types.StringType
+import org.apache.spark.util.Utils.getUriBuilder
 
 /**
  * Concrete parser for Spark SQL statements.
 */
@@ -862,7 +862,7 @@ class SparkSqlAstBuilder extends AstBuilder {
           throw QueryParsingErrors.unsupportedLocalFileSchemeError(ctx, pathScheme)
         case _ =>
           // force scheme to be file rather than fs.default.name
-          val loc = Some(UriBuilder.fromUri(CatalogUtils.stringToURI(path)).scheme("file").build())
+          val loc = Some(getUriBuilder(CatalogUtils.stringToURI(path)).scheme("file").build())
           storage = storage.copy(locationUri = loc)
       }
     }
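NOTE (not part of the patch): the SparkSqlParser change is behavior-preserving.
A LOCAL location in DDL is still forced to the file: scheme rather than
inheriting fs.default.name; only the builder is now obtained through the
wrapper. A self-contained illustration of the scheme rewrite, a sketch that
uses the raw JAX-RS builder because the scalastyle rule only governs Spark's
own sources (requires an implementation such as Jersey):

  import java.net.URI
  import javax.ws.rs.core.UriBuilder

  object SchemeRewrite {
    def main(args: Array[String]): Unit = {
      // CatalogUtils.stringToURI("/user/warehouse/t") yields a scheme-less URI.
      val parsed = new URI("/user/warehouse/t")
      // Pin the scheme to "file" so the path never resolves against fs.default.name.
      val loc = UriBuilder.fromUri(parsed).scheme("file").build()
      println(loc) // file:/user/warehouse/t
    }
  }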