diff --git a/common/network-shuffle/pom.xml b/common/network-shuffle/pom.xml
index d3d78f249c495..1b78182f3ba96 100644
--- a/common/network-shuffle/pom.xml
+++ b/common/network-shuffle/pom.xml
@@ -58,6 +58,10 @@
      <artifactId>slf4j-api</artifactId>
      <scope>provided</scope>
    </dependency>
+    <dependency>
+      <groupId>com.github.ben-manes.caffeine</groupId>
+      <artifactId>caffeine</artifactId>
+    </dependency>
    <dependency>
      <groupId>com.google.guava</groupId>
      <artifactId>guava</artifactId>
diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java
index 73d4e6ceb1951..650f33e5e6473 100644
--- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java
+++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java
@@ -20,10 +20,7 @@
import java.io.*;
import java.nio.charset.StandardCharsets;
import java.util.*;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Executor;
-import java.util.concurrent.Executors;
+import java.util.concurrent.*;
import java.util.stream.Collectors;
import org.apache.commons.lang3.builder.ToStringBuilder;
@@ -32,11 +29,10 @@
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.github.benmanes.caffeine.cache.LoadingCache;
+import com.github.benmanes.caffeine.cache.Weigher;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.CacheLoader;
-import com.google.common.cache.LoadingCache;
-import com.google.common.cache.Weigher;
import com.google.common.collect.Maps;
import org.iq80.leveldb.DB;
import org.iq80.leveldb.DBIterator;
@@ -109,23 +105,13 @@ public ExternalShuffleBlockResolver(TransportConf conf, File registeredExecutorF
Executor directoryCleaner) throws IOException {
this.conf = conf;
this.rddFetchEnabled =
- Boolean.valueOf(conf.get(Constants.SHUFFLE_SERVICE_FETCH_RDD_ENABLED, "false"));
+ Boolean.parseBoolean(conf.get(Constants.SHUFFLE_SERVICE_FETCH_RDD_ENABLED, "false"));
this.registeredExecutorFile = registeredExecutorFile;
String indexCacheSize = conf.get("spark.shuffle.service.index.cache.size", "100m");
- CacheLoader<File, ShuffleIndexInformation> indexCacheLoader =
- new CacheLoader<File, ShuffleIndexInformation>() {
- public ShuffleIndexInformation load(File file) throws IOException {
- return new ShuffleIndexInformation(file);
- }
- };
- shuffleIndexCache = CacheBuilder.newBuilder()
+ shuffleIndexCache = Caffeine.newBuilder()
.maximumWeight(JavaUtils.byteStringAsBytes(indexCacheSize))
- .weigher(new Weigher<File, ShuffleIndexInformation>() {
- public int weigh(File file, ShuffleIndexInformation indexInfo) {
- return indexInfo.getSize();
- }
- })
- .build(indexCacheLoader);
+ .weigher((Weigher<File, ShuffleIndexInformation>) (file, indexInfo) -> indexInfo.getSize())
+ .build(ShuffleIndexInformation::new);
db = LevelDBProvider.initLevelDB(this.registeredExecutorFile, CURRENT_VERSION, mapper);
if (db != null) {
executors = reloadRegisteredExecutors(db);
@@ -317,7 +303,7 @@ private ManagedBuffer getSortBasedShuffleBlockData(
"shuffle_" + shuffleId + "_" + mapId + "_0.data"),
shuffleIndexRecord.getOffset(),
shuffleIndexRecord.getLength());
- } catch (ExecutionException e) {
+ } catch (CompletionException e) {
throw new RuntimeException("Failed to open file: " + indexFile, e);
}
}
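
This first hunk shows the three recurring moves of the migration in one place: `CacheBuilder` becomes `Caffeine`, the anonymous `CacheLoader`/`Weigher` classes collapse into a method reference and a lambda, and the checked `ExecutionException` that Guava threw from `get()` becomes Caffeine's unchecked `CompletionException`. A minimal Scala sketch of the same pattern; `IndexInfo` is a hypothetical stand-in for `ShuffleIndexInformation`, not a class from the patch:

```scala
import java.io.IOException
import java.util.concurrent.CompletionException

import com.github.benmanes.caffeine.cache.{CacheLoader, Caffeine, LoadingCache, Weigher}

object CaffeineLoadingCacheSketch {
  // Stand-in for ShuffleIndexInformation: loaded from a key, with a size
  // that drives the cache weight.
  case class IndexInfo(key: String) {
    if (key.isEmpty) throw new IOException("cannot load an empty key")
    def size: Int = key.length
  }

  val cache: LoadingCache[String, IndexInfo] = Caffeine.newBuilder()
    .maximumWeight(1024L)
    .weigher(new Weigher[String, IndexInfo] {
      override def weigh(key: String, info: IndexInfo): Int = info.size
    })
    .build(new CacheLoader[String, IndexInfo] {
      override def load(key: String): IndexInfo = IndexInfo(key)
    })

  def main(args: Array[String]): Unit = {
    println(cache.get("abc").size) // loads once, then serves the cached value

    try cache.get("") catch {
      // Guava's LoadingCache.get threw a checked ExecutionException; Caffeine
      // wraps checked loader failures in an unchecked CompletionException,
      // hence the changed catch clauses in this patch.
      case e: CompletionException => println(s"load failed: ${e.getCause}")
    }
  }
}
```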
diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
index cc7d4dbe00573..a21a3b81312a9 100644
--- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
+++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
@@ -31,9 +31,13 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.*;
+
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.github.benmanes.caffeine.cache.LoadingCache;
+import com.github.benmanes.caffeine.cache.Weigher;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;
@@ -43,10 +47,6 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.CacheLoader;
-import com.google.common.cache.LoadingCache;
-import com.google.common.cache.Weigher;
import com.google.common.primitives.Ints;
import com.google.common.primitives.Longs;
import org.roaringbitmap.RoaringBitmap;
@@ -115,16 +115,10 @@ public RemoteBlockPushResolver(TransportConf conf) {
NettyUtils.createThreadFactory("spark-shuffle-merged-shuffle-directory-cleaner"));
this.minChunkSize = conf.minChunkSizeInMergedShuffleFile();
this.ioExceptionsThresholdDuringMerge = conf.ioExceptionsThresholdDuringMerge();
- CacheLoader<File, ShuffleIndexInformation> indexCacheLoader =
- new CacheLoader<File, ShuffleIndexInformation>() {
- public ShuffleIndexInformation load(File file) throws IOException {
- return new ShuffleIndexInformation(file);
- }
- };
- indexCache = CacheBuilder.newBuilder()
+ indexCache = Caffeine.newBuilder()
.maximumWeight(conf.mergedIndexCacheSize())
- .weigher((Weigher<File,ShuffleIndexInformation>) (file, indexInfo) -> indexInfo.getSize())
- .build(indexCacheLoader);
+ .weigher((Weigher<File, ShuffleIndexInformation>) (file, indexInfo) -> indexInfo.getSize())
+ .build(ShuffleIndexInformation::new);
this.errorHandler = new ErrorHandler.BlockPushErrorHandler();
}
@@ -299,7 +293,7 @@ public ManagedBuffer getMergedBlockData(
ShuffleIndexRecord shuffleIndexRecord = shuffleIndexInformation.getIndex(chunkId);
return new FileSegmentManagedBuffer(
conf, dataFile, shuffleIndexRecord.getOffset(), shuffleIndexRecord.getLength());
- } catch (ExecutionException e) {
+ } catch (CompletionException e) {
throw new RuntimeException(String.format(
"Failed to open merged shuffle index file %s", indexFile.getPath()), e);
}
diff --git a/core/benchmarks/LocalCacheBenchmark-jdk11-results.txt b/core/benchmarks/LocalCacheBenchmark-jdk11-results.txt
new file mode 100644
index 0000000000000..ceca3867a6c4d
--- /dev/null
+++ b/core/benchmarks/LocalCacheBenchmark-jdk11-results.txt
@@ -0,0 +1,12 @@
+================================================================================================
+Loading Cache
+================================================================================================
+
+OpenJDK 64-Bit Server VM 11.0.8+10-LTS on Mac OS X 10.15.7
+Intel(R) Core(TM) i5-7360U CPU @ 2.30GHz
+Loading Cache: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
+--------------------------------------------------------------------------------------------------------------------------
+Guava Cache 5 6 1 15.9 62.8 1.0X
+Caffeine 2 2 0 46.1 21.7 2.9X
+
+
diff --git a/core/benchmarks/LocalCacheBenchmark-results.txt b/core/benchmarks/LocalCacheBenchmark-results.txt
new file mode 100644
index 0000000000000..563d4701fd25b
--- /dev/null
+++ b/core/benchmarks/LocalCacheBenchmark-results.txt
@@ -0,0 +1,12 @@
+================================================================================================
+Loading Cache
+================================================================================================
+
+OpenJDK 64-Bit Server VM 1.8.0_232-b18 on Mac OS X 10.15.7
+Intel(R) Core(TM) i5-7360U CPU @ 2.30GHz
+Loading Cache: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
+--------------------------------------------------------------------------------------------------------------------------
+Guava Cache 5 5 0 16.7 60.0 1.0X
+Caffeine 2 2 1 44.3 22.6 2.7X
+
+
diff --git a/core/pom.xml b/core/pom.xml
index be449644fcd4f..f3f9e4899e75b 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -47,6 +47,10 @@
      <groupId>com.google.guava</groupId>
      <artifactId>guava</artifactId>
    </dependency>
+    <dependency>
+      <groupId>com.github.ben-manes.caffeine</groupId>
+      <artifactId>caffeine</artifactId>
+    </dependency>
    <dependency>
      <groupId>com.twitter</groupId>
      <artifactId>chill_${scala.binary.version}</artifactId>
diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala
index ee50a8f836277..dae55689e20a7 100644
--- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
+++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -26,7 +26,7 @@ import scala.collection.concurrent
import scala.collection.mutable
import scala.util.Properties
-import com.google.common.cache.CacheBuilder
+import com.github.benmanes.caffeine.cache.Caffeine
import org.apache.hadoop.conf.Configuration
import org.apache.spark.annotation.DeveloperApi
@@ -77,7 +77,7 @@ class SparkEnv (
// A general, soft-reference map for metadata needed during HadoopRDD split computation
// (e.g., HadoopFileRDD uses this to cache JobConfs and InputFormats).
private[spark] val hadoopJobMetadata =
- CacheBuilder.newBuilder().softValues().build[String, AnyRef]().asMap()
+ Caffeine.newBuilder().softValues().build[String, AnyRef]().asMap()
private[spark] var driverTmpDir: Option[String] = None
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/ApplicationCache.scala b/core/src/main/scala/org/apache/spark/deploy/history/ApplicationCache.scala
index 89b30a35ebebc..2be417a18349d 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/ApplicationCache.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/ApplicationCache.scala
@@ -18,15 +18,14 @@
package org.apache.spark.deploy.history
import java.util.NoSuchElementException
-import java.util.concurrent.ExecutionException
+import java.util.concurrent.CompletionException
import javax.servlet.{DispatcherType, Filter, FilterChain, FilterConfig, ServletException, ServletRequest, ServletResponse}
import javax.servlet.http.{HttpServletRequest, HttpServletResponse}
import scala.collection.JavaConverters._
import com.codahale.metrics.{Counter, MetricRegistry, Timer}
-import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache, RemovalListener, RemovalNotification}
-import com.google.common.util.concurrent.UncheckedExecutionException
+import com.github.benmanes.caffeine.cache.{CacheLoader, Caffeine, LoadingCache, RemovalCause, RemovalListener}
import org.eclipse.jetty.servlet.FilterHolder
import org.apache.spark.internal.Logging
@@ -62,21 +61,27 @@ private[history] class ApplicationCache(
/**
* Removal event notifies the provider to detach the UI.
- * @param rm removal notification
+ * @param key removal key
+ * @param value removal value
+ * @param cause the reason why a `CacheEntry` was removed; it should
+ * always be `SIZE`, because `appCache` is configured with
+ * a `maximumSize` eviction strategy
*/
- override def onRemoval(rm: RemovalNotification[CacheKey, CacheEntry]): Unit = {
+ override def onRemoval(key: CacheKey, value: CacheEntry, cause: RemovalCause): Unit = {
metrics.evictionCount.inc()
- val key = rm.getKey
- logDebug(s"Evicting entry ${key}")
- operations.detachSparkUI(key.appId, key.attemptId, rm.getValue().loadedUI.ui)
+ logDebug(s"Evicting entry $key")
+ operations.detachSparkUI(key.appId, key.attemptId, value.loadedUI.ui)
}
}
private val appCache: LoadingCache[CacheKey, CacheEntry] = {
- CacheBuilder.newBuilder()
- .maximumSize(retainedApplications)
- .removalListener(removalListener)
- .build(appLoader)
+ val builder = Caffeine.newBuilder()
+ .maximumSize(retainedApplications)
+ .removalListener(removalListener)
+ // SPARK-34309: Use a custom Executor to be compatible with
+ // the data eviction behavior of Guava cache
+ .executor((command: Runnable) => command.run())
+ builder.build[CacheKey, CacheEntry](appLoader)
}
/**
@@ -86,9 +91,9 @@ private[history] class ApplicationCache(
def get(appId: String, attemptId: Option[String] = None): CacheEntry = {
try {
- appCache.get(new CacheKey(appId, attemptId))
+ appCache.get(CacheKey(appId, attemptId))
} catch {
- case e @ (_: ExecutionException | _: UncheckedExecutionException) =>
+ case e @ (_: CompletionException | _: RuntimeException) =>
throw Option(e.getCause()).getOrElse(e)
}
}
@@ -127,7 +132,7 @@ private[history] class ApplicationCache(
}
/** @return Number of cached UIs. */
- def size(): Long = appCache.size()
+ def size(): Long = appCache.estimatedSize()
private def time[T](t: Timer)(f: => T): T = {
val timeCtx = t.time()
@@ -197,7 +202,7 @@ private[history] class ApplicationCache(
val sb = new StringBuilder(s"ApplicationCache(" +
s" retainedApplications= $retainedApplications)")
sb.append(s"; time= ${clock.getTimeMillis()}")
- sb.append(s"; entry count= ${appCache.size()}\n")
+ sb.append(s"; entry count= ${appCache.estimatedSize()}\n")
sb.append("----\n")
appCache.asMap().asScala.foreach {
case(key, entry) => sb.append(s" $key -> $entry\n")
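
Two API differences drive this file's changes. Caffeine's `RemovalListener` receives `(key, value, cause)` directly instead of Guava's `RemovalNotification`, and listeners run on the cache's executor, which defaults to `ForkJoinPool.commonPool()` rather than the writing thread. Supplying a same-thread executor, as the patch does, restores Guava's synchronous eviction. A hedged sketch of both pieces together (names are illustrative):

```scala
import com.github.benmanes.caffeine.cache.{Caffeine, RemovalCause, RemovalListener}

object SameThreadEvictionSketch {
  def main(args: Array[String]): Unit = {
    val cache = Caffeine.newBuilder()
      .maximumSize(1)
      // SPARK-34309 workaround: run maintenance and removal listeners on the
      // calling thread, as Guava did, instead of ForkJoinPool.commonPool().
      .executor((command: Runnable) => command.run())
      .removalListener(new RemovalListener[String, String] {
        override def onRemoval(key: String, value: String, cause: RemovalCause): Unit =
          println(s"evicted $key ($cause)") // cause is SIZE for capacity evictions
      })
      .build[String, String]()

    cache.put("a", "1")
    cache.put("b", "2") // over maximumSize(1); an eviction is now pending
    cache.cleanUp()     // force maintenance so the eviction fires deterministically
  }
}
```

The trade-off is that eviction work (here, `detachSparkUI`) runs on whichever thread touches the cache, which is exactly the synchronous behavior the History Server code expects.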
diff --git a/core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala b/core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala
index 7339eb64b1c1f..f47f823ab24ca 100644
--- a/core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala
@@ -23,7 +23,7 @@ import java.util.concurrent.TimeUnit
import scala.reflect.ClassTag
import scala.util.control.NonFatal
-import com.google.common.cache.{CacheBuilder, CacheLoader}
+import com.github.benmanes.caffeine.cache.{CacheLoader, Caffeine}
import org.apache.hadoop.fs.Path
import org.apache.spark._
@@ -85,16 +85,18 @@ private[spark] class ReliableCheckpointRDD[T: ClassTag](
}
// Cache of preferred locations of checkpointed files.
- @transient private[spark] lazy val cachedPreferredLocations = CacheBuilder.newBuilder()
- .expireAfterWrite(
- SparkEnv.get.conf.get(CACHE_CHECKPOINT_PREFERRED_LOCS_EXPIRE_TIME).get,
- TimeUnit.MINUTES)
- .build(
- new CacheLoader[Partition, Seq[String]]() {
- override def load(split: Partition): Seq[String] = {
- getPartitionBlockLocations(split)
- }
- })
+ @transient private[spark] lazy val cachedPreferredLocations = {
+ val builder = Caffeine.newBuilder()
+ .expireAfterWrite(
+ SparkEnv.get.conf.get(CACHE_CHECKPOINT_PREFERRED_LOCS_EXPIRE_TIME).get,
+ TimeUnit.MINUTES)
+ val loader = new CacheLoader[Partition, Seq[String]]() {
+ override def load(split: Partition): Seq[String] = {
+ getPartitionBlockLocations(split)
+ }
+ }
+ builder.build[Partition, Seq[String]](loader)
+ }
// Returns the block locations of given partition on file system.
private def getPartitionBlockLocations(split: Partition): Seq[String] = {
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 4c646b27c270f..20d1e03205ccf 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -34,7 +34,7 @@ import scala.util.{Failure, Random, Success, Try}
import scala.util.control.NonFatal
import com.codahale.metrics.{MetricRegistry, MetricSet}
-import com.google.common.cache.CacheBuilder
+import com.github.benmanes.caffeine.cache.Caffeine
import org.apache.commons.io.IOUtils
import org.apache.spark._
@@ -123,7 +123,7 @@ private[spark] class HostLocalDirManager(
blockStoreClient: BlockStoreClient) extends Logging {
private val executorIdToLocalDirsCache =
- CacheBuilder
+ Caffeine
.newBuilder()
.maximumSize(cacheSize)
.build[String, Array[String]]()
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala
index c6a4457d8f910..316ad698eb0f6 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala
@@ -19,7 +19,7 @@ package org.apache.spark.storage
import java.io.{Externalizable, IOException, ObjectInput, ObjectOutput}
-import com.google.common.cache.{CacheBuilder, CacheLoader}
+import com.github.benmanes.caffeine.cache.{CacheLoader, Caffeine}
import org.apache.spark.SparkContext
import org.apache.spark.annotation.DeveloperApi
@@ -136,11 +136,14 @@ private[spark] object BlockManagerId {
* The max cache size is hardcoded to 10000, since the size of a BlockManagerId
* object is about 48B, the total memory cost should be below 1MB which is feasible.
*/
- val blockManagerIdCache = CacheBuilder.newBuilder()
- .maximumSize(10000)
- .build(new CacheLoader[BlockManagerId, BlockManagerId]() {
- override def load(id: BlockManagerId) = id
- })
+ val blockManagerIdCache = {
+ Caffeine.newBuilder()
+ .maximumSize(10000)
+ .build[BlockManagerId, BlockManagerId](
+ new CacheLoader[BlockManagerId, BlockManagerId]() {
+ override def load(id: BlockManagerId): BlockManagerId = id
+ })
+ }
def getCachedBlockManagerId(id: BlockManagerId): BlockManagerId = {
blockManagerIdCache.get(id)
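
This cache is really a bounded interner: the loader returns its key, so all equal `BlockManagerId`s collapse onto the first instance cached. The same pattern, reduced to a sketch:

```scala
import com.github.benmanes.caffeine.cache.{CacheLoader, Caffeine}

object InterningCacheSketch {
  // A loading cache whose loader returns the key acts as a bounded interner.
  private val interner = Caffeine.newBuilder()
    .maximumSize(10000)
    .build[String, String](new CacheLoader[String, String] {
      override def load(id: String): String = id
    })

  def main(args: Array[String]): Unit = {
    val a = interner.get(new String("id-1"))
    val b = interner.get(new String("id-1")) // equal key, distinct instance
    assert(a eq b) // both resolve to the single canonical instance
  }
}
```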
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
index 29c605d622b80..ef82d52e2b400 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
@@ -27,7 +27,7 @@ import scala.concurrent.{ExecutionContext, Future, TimeoutException}
import scala.util.Random
import scala.util.control.NonFatal
-import com.google.common.cache.CacheBuilder
+import com.github.benmanes.caffeine.cache.Caffeine
import org.apache.spark.{MapOutputTrackerMaster, SparkConf}
import org.apache.spark.annotation.DeveloperApi
@@ -56,7 +56,7 @@ class BlockManagerMasterEndpoint(
// Mapping from executor id to the block manager's local disk directories.
private val executorIdToLocalDirs =
- CacheBuilder
+ Caffeine
.newBuilder()
.maximumSize(conf.get(config.STORAGE_LOCAL_DISK_BY_EXECUTORS_CACHE_SIZE))
.build[String, Array[String]]()
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 7ea96fea25a80..f3268cb8fb5bd 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -44,7 +44,7 @@ import scala.util.control.{ControlThrowable, NonFatal}
import scala.util.matching.Regex
import _root_.io.netty.channel.unix.Errors.NativeIoException
-import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache}
+import com.github.benmanes.caffeine.cache.{CacheLoader, Caffeine, LoadingCache}
import com.google.common.collect.Interners
import com.google.common.io.{ByteStreams, Files => GFiles}
import com.google.common.net.InetAddresses
@@ -1616,13 +1616,16 @@ private[spark] object Utils extends Logging {
if (compressedLogFileLengthCache == null) {
val compressedLogFileLengthCacheSize = sparkConf.get(
UNCOMPRESSED_LOG_FILE_LENGTH_CACHE_SIZE_CONF)
- compressedLogFileLengthCache = CacheBuilder.newBuilder()
- .maximumSize(compressedLogFileLengthCacheSize)
- .build[String, java.lang.Long](new CacheLoader[String, java.lang.Long]() {
- override def load(path: String): java.lang.Long = {
- Utils.getCompressedFileLength(new File(path))
- }
- })
+ compressedLogFileLengthCache = {
+ val builder = Caffeine.newBuilder()
+ .maximumSize(compressedLogFileLengthCacheSize)
+ builder.build[String, java.lang.Long](
+ new CacheLoader[String, java.lang.Long]() {
+ override def load(path: String): java.lang.Long = {
+ Utils.getCompressedFileLength(new File(path))
+ }
+ })
+ }
}
compressedLogFileLengthCache
}
diff --git a/core/src/test/scala/org/apache/spark/LocalCacheBenchmark.scala b/core/src/test/scala/org/apache/spark/LocalCacheBenchmark.scala
new file mode 100644
index 0000000000000..5eadfdfed5cac
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/LocalCacheBenchmark.scala
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark
+
+import java.util.concurrent.Callable
+
+import scala.concurrent.duration.Duration
+import scala.util.Random
+
+import com.github.benmanes.caffeine.cache.{CacheLoader => CaffeineCacheLoader, Caffeine}
+import com.google.common.cache.{CacheBuilder, CacheLoader}
+
+import org.apache.spark.benchmark.{Benchmark, BenchmarkBase}
+import org.apache.spark.util.ThreadUtils
+
+/**
+ * Benchmark for Guava Cache vs Caffeine.
+ * To run this benchmark:
+ * {{{
+ * 1. without sbt:
+ * bin/spark-submit --class <this class> --jars <spark core test jar>
+ * 2. build/sbt "core/test:runMain <this class>"
+ * 3. generate result:
+ * SPARK_GENERATE_BENCHMARK_FILES=1 build/sbt "core/test:runMain <this class>"
+ * Results will be written to "benchmarks/LocalCacheBenchmark-results.txt".
+ * }}}
+ */
+object LocalCacheBenchmark extends BenchmarkBase {
+
+ override def runBenchmarkSuite(mainArgs: Array[String]): Unit = {
+ runBenchmark("Loading Cache") {
+ val size = 10000
+ val parallelism = 8
+ val guavaCacheConcurrencyLevel = 8
+ val dataset = (1 to parallelism)
+ .map(_ => Random.shuffle(List.range(0, size)))
+ .map(list => list.map(i => TestData(i)))
+ val executor = ThreadUtils.newDaemonFixedThreadPool(parallelism, "Loading Cache Test Pool")
+ val guavaCacheLoader = new CacheLoader[TestData, TestData]() {
+ override def load(id: TestData): TestData = {
+ id
+ }
+ }
+ val caffeineCacheLoader = new CaffeineCacheLoader[TestData, TestData]() {
+ override def load(id: TestData): TestData = {
+ id
+ }
+ }
+
+ val benchmark = new Benchmark("Loading Cache", size * parallelism, 3, output = output)
+ benchmark.addCase("Guava Cache") { _ =>
+ val cache = CacheBuilder.newBuilder()
+ .concurrencyLevel(guavaCacheConcurrencyLevel).build[TestData, TestData](guavaCacheLoader)
+ dataset.map(dataList => executor.submit(new Callable[Unit] {
+ override def call(): Unit = {
+ dataList.foreach(key => cache.get(key))
+ }
+ })).foreach(future => ThreadUtils.awaitResult(future, Duration.Inf))
+ cache.cleanUp()
+ }
+
+ benchmark.addCase("Caffeine") { _ =>
+ val cache = Caffeine.newBuilder().build[TestData, TestData](caffeineCacheLoader)
+ dataset.map(dataList => executor.submit(new Callable[Unit] {
+ override def call(): Unit = {
+ dataList.foreach(key => cache.get(key))
+ }
+ })).foreach(future => ThreadUtils.awaitResult(future, Duration.Inf))
+ cache.cleanUp()
+ }
+
+ benchmark.run()
+ }
+ }
+
+ case class TestData(content: Int)
+}
+
diff --git a/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala b/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala
index a237447b0fa2d..8ec279aadae9d 100644
--- a/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala
@@ -29,7 +29,7 @@ import scala.collection.immutable
import scala.collection.mutable.{ArrayBuffer, Map}
import scala.concurrent.duration._
-import com.google.common.cache.{CacheBuilder, CacheLoader}
+import com.github.benmanes.caffeine.cache.{CacheLoader, Caffeine}
import org.mockito.ArgumentCaptor
import org.mockito.ArgumentMatchers.{any, eq => meq}
import org.mockito.Mockito.{inOrder, verify, when}
@@ -467,9 +467,9 @@ class ExecutorSuite extends SparkFunSuite
}
}
- def errorInGuavaCache(e: => Throwable): Throwable = {
- val cache = CacheBuilder.newBuilder()
- .build(new CacheLoader[String, String] {
+ def errorInCaffeine(e: => Throwable): Throwable = {
+ val cache = Caffeine.newBuilder().build[String, String](
+ new CacheLoader[String, String] {
override def load(key: String): String = throw e
})
intercept[Throwable] {
@@ -484,18 +484,18 @@ class ExecutorSuite extends SparkFunSuite
import Executor.isFatalError
// `e`'s depth is 1 so `depthToCheck` needs to be at least 1 to detect fatal errors.
assert(isFatalError(e, depthToCheck) == (depthToCheck >= 1 && isFatal))
+ assert(isFatalError(errorInCaffeine(e), depthToCheck) == (depthToCheck >= 1 && isFatal))
// `e`'s depth is 2 so `depthToCheck` needs to be at least 2 to detect fatal errors.
assert(isFatalError(errorInThreadPool(e), depthToCheck) == (depthToCheck >= 2 && isFatal))
- assert(isFatalError(errorInGuavaCache(e), depthToCheck) == (depthToCheck >= 2 && isFatal))
assert(isFatalError(
new SparkException("foo", e),
depthToCheck) == (depthToCheck >= 2 && isFatal))
- // `e`'s depth is 3 so `depthToCheck` needs to be at least 3 to detect fatal errors.
assert(isFatalError(
- errorInThreadPool(errorInGuavaCache(e)),
- depthToCheck) == (depthToCheck >= 3 && isFatal))
+ errorInThreadPool(errorInCaffeine(e)),
+ depthToCheck) == (depthToCheck >= 2 && isFatal))
+ // `e`'s depth is 3 so `depthToCheck` needs to be at least 3 to detect fatal errors.
assert(isFatalError(
- errorInGuavaCache(errorInThreadPool(e)),
+ errorInCaffeine(errorInThreadPool(e)),
depthToCheck) == (depthToCheck >= 3 && isFatal))
assert(isFatalError(
new SparkException("foo", new SparkException("foo", e)),
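
The reshuffled depth expectations reflect a real behavioral difference rather than a test cleanup: Guava's `LoadingCache.get` wrapped every loader failure (`UncheckedExecutionException` or `ExecutionError`), adding one level of cause nesting, whereas Caffeine rethrows unchecked throwables from the loader as-is and only wraps checked exceptions in `CompletionException`. A sketch of the Caffeine side, under that assumption:

```scala
import com.github.benmanes.caffeine.cache.{CacheLoader, Caffeine}

object LoaderThrowSketch {
  def main(args: Array[String]): Unit = {
    val cache = Caffeine.newBuilder().build[String, String](
      new CacheLoader[String, String] {
        // A fatal error thrown from the loader, as in errorInCaffeine above.
        override def load(key: String): String = throw new OutOfMemoryError("boom")
      })

    val thrown = try { cache.get("k"); null } catch { case t: Throwable => t }
    // No wrapper is added, so the error's nesting depth is unchanged (still 1).
    assert(thrown.isInstanceOf[OutOfMemoryError])
  }
}
```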
diff --git a/dev/deps/spark-deps-hadoop-2.7-hive-2.3 b/dev/deps/spark-deps-hadoop-2.7-hive-2.3
index 50b5bf0f99f24..92f63089b906f 100644
--- a/dev/deps/spark-deps-hadoop-2.7-hive-2.3
+++ b/dev/deps/spark-deps-hadoop-2.7-hive-2.3
@@ -30,7 +30,9 @@ blas/2.2.0//blas-2.2.0.jar
bonecp/0.8.0.RELEASE//bonecp-0.8.0.RELEASE.jar
breeze-macros_2.12/1.2//breeze-macros_2.12-1.2.jar
breeze_2.12/1.2//breeze_2.12-1.2.jar
+caffeine/2.9.1//caffeine-2.9.1.jar
cats-kernel_2.12/2.1.1//cats-kernel_2.12-2.1.1.jar
+checker-qual/3.10.0//checker-qual-3.10.0.jar
chill-java/0.10.0//chill-java-0.10.0.jar
chill_2.12/0.10.0//chill_2.12-0.10.0.jar
commons-beanutils/1.9.4//commons-beanutils-1.9.4.jar
@@ -62,6 +64,7 @@ datanucleus-core/4.1.17//datanucleus-core-4.1.17.jar
datanucleus-rdbms/4.1.19//datanucleus-rdbms-4.1.19.jar
derby/10.14.2.0//derby-10.14.2.0.jar
dropwizard-metrics-hadoop-metrics2-reporter/0.1.2//dropwizard-metrics-hadoop-metrics2-reporter-0.1.2.jar
+error_prone_annotations/2.5.1//error_prone_annotations-2.5.1.jar
flatbuffers-java/1.9.0//flatbuffers-java-1.9.0.jar
generex/1.0.2//generex-1.0.2.jar
gson/2.2.4//gson-2.2.4.jar
diff --git a/dev/deps/spark-deps-hadoop-3.2-hive-2.3 b/dev/deps/spark-deps-hadoop-3.2-hive-2.3
index 31c9b96ef690d..4cacf0ae263cc 100644
--- a/dev/deps/spark-deps-hadoop-3.2-hive-2.3
+++ b/dev/deps/spark-deps-hadoop-3.2-hive-2.3
@@ -25,7 +25,9 @@ blas/2.2.0//blas-2.2.0.jar
bonecp/0.8.0.RELEASE//bonecp-0.8.0.RELEASE.jar
breeze-macros_2.12/1.2//breeze-macros_2.12-1.2.jar
breeze_2.12/1.2//breeze_2.12-1.2.jar
+caffeine/2.9.1//caffeine-2.9.1.jar
cats-kernel_2.12/2.1.1//cats-kernel_2.12-2.1.1.jar
+checker-qual/3.10.0//checker-qual-3.10.0.jar
chill-java/0.10.0//chill-java-0.10.0.jar
chill_2.12/0.10.0//chill_2.12-0.10.0.jar
commons-cli/1.2//commons-cli-1.2.jar
@@ -53,6 +55,7 @@ datanucleus-core/4.1.17//datanucleus-core-4.1.17.jar
datanucleus-rdbms/4.1.19//datanucleus-rdbms-4.1.19.jar
derby/10.14.2.0//derby-10.14.2.0.jar
dropwizard-metrics-hadoop-metrics2-reporter/0.1.2//dropwizard-metrics-hadoop-metrics2-reporter-0.1.2.jar
+error_prone_annotations/2.5.1//error_prone_annotations-2.5.1.jar
flatbuffers-java/1.9.0//flatbuffers-java-1.9.0.jar
generex/1.0.2//generex-1.0.2.jar
gson/2.2.4//gson-2.2.4.jar
diff --git a/pom.xml b/pom.xml
index d17e055c8212e..8f82d1e668441 100644
--- a/pom.xml
+++ b/pom.xml
@@ -182,6 +182,7 @@
    <commons-pool2.version>2.6.2</commons-pool2.version>
    <datanucleus-core.version>4.1.17</datanucleus-core.version>
    <guava.version>14.0.1</guava.version>
+    <caffeine.version>2.9.1</caffeine.version>
    <janino.version>3.0.16</janino.version>
    <jersey.version>2.34</jersey.version>
    <joda.version>2.10.10</joda.version>
@@ -492,6 +493,11 @@
        <version>${guava.version}</version>
        <scope>provided</scope>
      </dependency>
+      <dependency>
+        <groupId>com.github.ben-manes.caffeine</groupId>
+        <artifactId>caffeine</artifactId>
+        <version>${caffeine.version}</version>
+      </dependency>
      <dependency>
        <groupId>org.jpmml</groupId>
        <artifactId>pmml-model</artifactId>
diff --git a/resource-managers/kubernetes/core/pom.xml b/resource-managers/kubernetes/core/pom.xml
index 0cb5e115906a5..cb9c7b133619b 100644
--- a/resource-managers/kubernetes/core/pom.xml
+++ b/resource-managers/kubernetes/core/pom.xml
@@ -52,6 +52,11 @@
      <type>test-jar</type>
    </dependency>

+    <dependency>
+      <groupId>com.github.ben-manes.caffeine</groupId>
+      <artifactId>caffeine</artifactId>
+    </dependency>
+
    <dependency>
      <groupId>io.fabric8</groupId>
      <artifactId>kubernetes-client</artifactId>
index e255de4d2dd9e..38e7f99302169 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManager.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManager.scala
@@ -18,12 +18,13 @@ package org.apache.spark.scheduler.cluster.k8s
import java.util.concurrent.TimeUnit
-import com.google.common.cache.CacheBuilder
-import io.fabric8.kubernetes.api.model.{Pod, PodBuilder}
-import io.fabric8.kubernetes.client.KubernetesClient
import scala.collection.JavaConverters._
import scala.collection.mutable
+import com.github.benmanes.caffeine.cache.Caffeine
+import io.fabric8.kubernetes.api.model.{Pod, PodBuilder}
+import io.fabric8.kubernetes.client.KubernetesClient
+
import org.apache.spark.SparkConf
import org.apache.spark.deploy.k8s.Config._
import org.apache.spark.deploy.k8s.Constants._
@@ -47,7 +48,7 @@ private[spark] class ExecutorPodsLifecycleManager(
// to avoid doing so. Expire cache entries so that this data structure doesn't grow beyond
// bounds.
private lazy val removedExecutorsCache =
- CacheBuilder.newBuilder()
+ Caffeine.newBuilder()
.expireAfterWrite(3, TimeUnit.MINUTES)
.build[java.lang.Long, java.lang.Long]()
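
`expireAfterWrite` carries over from Guava unchanged. The only Scala-side wrinkle, visible in the `java.lang.Long` type arguments above, is that both libraries require reference types for keys and values, so primitive `Long` must stay boxed. A minimal sketch:

```scala
import java.util.concurrent.TimeUnit

import com.github.benmanes.caffeine.cache.Caffeine

object ExpireAfterWriteSketch {
  def main(args: Array[String]): Unit = {
    // Keys/values must be reference types, hence java.lang.Long rather than
    // Scala's primitive Long (the same constraint Guava's CacheBuilder had).
    val removedExecutors = Caffeine.newBuilder()
      .expireAfterWrite(3, TimeUnit.MINUTES)
      .build[java.lang.Long, java.lang.Long]()

    removedExecutors.put(42L, System.currentTimeMillis())
    println(Option(removedExecutors.getIfPresent(42L))) // Some(...) until it expires
  }
}
```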
diff --git a/sql/catalyst/pom.xml b/sql/catalyst/pom.xml
index f1d3a3a6bf02b..22fa09761635a 100644
--- a/sql/catalyst/pom.xml
+++ b/sql/catalyst/pom.xml
@@ -92,6 +92,10 @@
      <artifactId>scalacheck_${scala.binary.version}</artifactId>
      <scope>test</scope>
    </dependency>
+    <dependency>
+      <groupId>com.github.ben-manes.caffeine</groupId>
+      <artifactId>caffeine</artifactId>
+    </dependency>
    <dependency>
      <groupId>org.codehaus.janino</groupId>
      <artifactId>janino</artifactId>
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index 4860f46884d44..6b1f519c20d94 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -26,7 +26,7 @@ import javax.annotation.concurrent.GuardedBy
import scala.collection.mutable
import scala.util.{Failure, Success, Try}
-import com.google.common.cache.{Cache, CacheBuilder}
+import com.github.benmanes.caffeine.cache.{Cache, Caffeine}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
@@ -159,19 +159,18 @@ class SessionCatalog(
}
private val tableRelationCache: Cache[QualifiedTableName, LogicalPlan] = {
- var builder = CacheBuilder.newBuilder()
+ var builder = Caffeine.newBuilder()
.maximumSize(cacheSize)
if (cacheTTL > 0) {
builder = builder.expireAfterWrite(cacheTTL, TimeUnit.SECONDS)
}
-
- builder.build[QualifiedTableName, LogicalPlan]()
+ builder.build()
}
/** This method provides a way to get a cached plan. */
def getCachedPlan(t: QualifiedTableName, c: Callable[LogicalPlan]): LogicalPlan = {
- tableRelationCache.get(t, c)
+ tableRelationCache.get(t, (_: QualifiedTableName) => c.call())
}
/** This method provides a way to get a cached plan if the key exists. */
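
Caffeine's manual `Cache` has no `get(key, Callable)` overload; its `get(key, mappingFunction)` passes the key to the function, so the existing `Callable`-based API is adapted with a function that ignores its argument, exactly as above. A reduced sketch of that adapter (`getOrCompute` is an illustrative name, not from the patch):

```scala
import java.util.concurrent.Callable

import com.github.benmanes.caffeine.cache.{Cache, Caffeine}

object GetWithCallableSketch {
  val cache: Cache[String, String] = Caffeine.newBuilder().maximumSize(100).build()

  // Guava: cache.get(key, callable); Caffeine: cache.get(key, key => value).
  def getOrCompute(key: String, c: Callable[String]): String =
    cache.get(key, (_: String) => c.call())

  def main(args: Array[String]): Unit = {
    println(getOrCompute("plan", () => "computed"))    // runs the callable
    println(getOrCompute("plan", () => "not reached")) // served from cache
  }
}
```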
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SubExprEvaluationRuntime.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SubExprEvaluationRuntime.scala
index fcc8ee67131f5..8cfa4664faa0c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SubExprEvaluationRuntime.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SubExprEvaluationRuntime.scala
@@ -18,7 +18,7 @@ package org.apache.spark.sql.catalyst.expressions
import java.util.IdentityHashMap
-import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache}
+import com.github.benmanes.caffeine.cache.{CacheLoader, Caffeine, LoadingCache}
import com.google.common.util.concurrent.{ExecutionError, UncheckedExecutionException}
import org.apache.spark.sql.catalyst.InternalRow
@@ -38,14 +38,17 @@ class SubExprEvaluationRuntime(cacheMaxEntries: Int) {
// won't be used by multiple threads, so we don't need to consider concurrency here.
private var proxyExpressionCurrentId = 0
- private[sql] val cache: LoadingCache[ExpressionProxy, ResultProxy] = CacheBuilder.newBuilder()
- .maximumSize(cacheMaxEntries)
- .build(
- new CacheLoader[ExpressionProxy, ResultProxy]() {
- override def load(expr: ExpressionProxy): ResultProxy = {
- ResultProxy(expr.proxyEval(currentInput))
- }
- })
+ private[sql] val cache: LoadingCache[ExpressionProxy, ResultProxy] =
+ Caffeine.newBuilder().maximumSize(cacheMaxEntries)
+ // SPARK-34309: Use a custom Executor to be compatible with
+ // the data eviction behavior of Guava cache
+ .executor((command: Runnable) => command.run())
+ .build[ExpressionProxy, ResultProxy](
+ new CacheLoader[ExpressionProxy, ResultProxy]() {
+ override def load(expr: ExpressionProxy): ResultProxy = {
+ ResultProxy(expr.proxyEval(currentInput))
+ }
+ })
private var currentInput: InternalRow = null
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
index 7f2c1c652dc8e..4dbfd774a6c0d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
@@ -25,7 +25,7 @@ import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
import scala.util.control.NonFatal
-import com.google.common.cache.{CacheBuilder, CacheLoader}
+import com.github.benmanes.caffeine.cache.{CacheLoader, Caffeine}
import com.google.common.util.concurrent.{ExecutionError, UncheckedExecutionException}
import org.codehaus.commons.compiler.CompileException
import org.codehaus.janino.{ByteArrayClassLoader, ClassBodyEvaluator, InternalCompilerException, SimpleCompiler}
@@ -1577,9 +1577,9 @@ object CodeGenerator extends Logging {
* automatically, in order to constrain its memory footprint. Note that this cache does not use
* weak keys/values and thus does not respond to memory pressure.
*/
- private val cache = CacheBuilder.newBuilder()
+ private val cache = Caffeine.newBuilder()
.maximumSize(SQLConf.get.codegenCacheMaxEntries)
- .build(
+ .build[CodeAndComment, (GeneratedClass, ByteCodeStats)](
new CacheLoader[CodeAndComment, (GeneratedClass, ByteCodeStats)]() {
override def load(code: CodeAndComment): (GeneratedClass, ByteCodeStats) = {
val startTime = System.nanoTime()
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeFormatterHelper.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeFormatterHelper.scala
index b00113b2e9ee3..9ae12c073a69e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeFormatterHelper.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeFormatterHelper.scala
@@ -23,7 +23,7 @@ import java.time.format.{DateTimeFormatter, DateTimeFormatterBuilder, ResolverSt
import java.time.temporal.{ChronoField, TemporalAccessor, TemporalQueries}
import java.util.{Date, Locale}
-import com.google.common.cache.CacheBuilder
+import com.github.benmanes.caffeine.cache.Caffeine
import org.apache.spark.sql.catalyst.util.DateTimeFormatterHelper._
import org.apache.spark.sql.errors.QueryExecutionErrors
@@ -194,7 +194,7 @@ trait DateTimeFormatterHelper {
}
private object DateTimeFormatterHelper {
- val cache = CacheBuilder.newBuilder()
+ val cache = Caffeine.newBuilder()
.maximumSize(128)
.build[(String, Locale, Boolean), DateTimeFormatter]()
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CodeGeneratorWithInterpretedFallbackSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CodeGeneratorWithInterpretedFallbackSuite.scala
index da5bddb0c09fd..ab177d062e18b 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CodeGeneratorWithInterpretedFallbackSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CodeGeneratorWithInterpretedFallbackSuite.scala
@@ -17,7 +17,7 @@
package org.apache.spark.sql.catalyst.expressions
-import java.util.concurrent.ExecutionException
+import java.util.concurrent.CompletionException
import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.catalyst.InternalRow
@@ -83,7 +83,7 @@ class CodeGeneratorWithInterpretedFallbackSuite extends SparkFunSuite with PlanT
}
test("codegen failures in the CODEGEN_ONLY mode") {
- val errMsg = intercept[ExecutionException] {
+ val errMsg = intercept[CompletionException] {
val input = Seq(BoundReference(0, IntegerType, nullable = true))
withSQLConf(SQLConf.CODEGEN_FACTORY_MODE.key -> codegenOnly) {
FailedCodegenProjection.createObject(input)
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/SubExprEvaluationRuntimeSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/SubExprEvaluationRuntimeSuite.scala
index f8dca266a62d4..88c1c0d4c6de5 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/SubExprEvaluationRuntimeSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/SubExprEvaluationRuntimeSuite.scala
@@ -23,47 +23,47 @@ class SubExprEvaluationRuntimeSuite extends SparkFunSuite {
test("Evaluate ExpressionProxy should create cached result") {
val runtime = new SubExprEvaluationRuntime(1)
val proxy = ExpressionProxy(Literal(1), 0, runtime)
- assert(runtime.cache.size() == 0)
+ assert(runtime.cache.estimatedSize() == 0)
proxy.eval()
- assert(runtime.cache.size() == 1)
+ assert(runtime.cache.estimatedSize() == 1)
assert(runtime.cache.get(proxy) == ResultProxy(1))
}
test("SubExprEvaluationRuntime cannot exceed configured max entries") {
val runtime = new SubExprEvaluationRuntime(2)
- assert(runtime.cache.size() == 0)
+ assert(runtime.cache.estimatedSize() == 0)
val proxy1 = ExpressionProxy(Literal(1), 0, runtime)
proxy1.eval()
- assert(runtime.cache.size() == 1)
+ assert(runtime.cache.estimatedSize() == 1)
assert(runtime.cache.get(proxy1) == ResultProxy(1))
val proxy2 = ExpressionProxy(Literal(2), 1, runtime)
proxy2.eval()
- assert(runtime.cache.size() == 2)
+ assert(runtime.cache.estimatedSize() == 2)
assert(runtime.cache.get(proxy2) == ResultProxy(2))
val proxy3 = ExpressionProxy(Literal(3), 2, runtime)
proxy3.eval()
- assert(runtime.cache.size() == 2)
+ assert(runtime.cache.estimatedSize() == 2)
assert(runtime.cache.get(proxy3) == ResultProxy(3))
}
test("setInput should empty cached result") {
val runtime = new SubExprEvaluationRuntime(2)
val proxy1 = ExpressionProxy(Literal(1), 0, runtime)
- assert(runtime.cache.size() == 0)
+ assert(runtime.cache.estimatedSize() == 0)
proxy1.eval()
- assert(runtime.cache.size() == 1)
+ assert(runtime.cache.estimatedSize() == 1)
assert(runtime.cache.get(proxy1) == ResultProxy(1))
val proxy2 = ExpressionProxy(Literal(2), 1, runtime)
proxy2.eval()
- assert(runtime.cache.size() == 2)
+ assert(runtime.cache.estimatedSize() == 2)
assert(runtime.cache.get(proxy2) == ResultProxy(2))
runtime.setInput()
- assert(runtime.cache.size() == 0)
+ assert(runtime.cache.estimatedSize() == 0)
}
test("Wrap ExpressionProxy on subexpressions") {
diff --git a/sql/core/pom.xml b/sql/core/pom.xml
index 73fa60c2173bc..5993e9893e96a 100644
--- a/sql/core/pom.xml
+++ b/sql/core/pom.xml
@@ -89,7 +89,10 @@
      <type>test-jar</type>
      <scope>test</scope>
    </dependency>
-
+    <dependency>
+      <groupId>com.github.ben-manes.caffeine</groupId>
+      <artifactId>caffeine</artifactId>
+    </dependency>
    <dependency>
      <groupId>org.apache.orc</groupId>
      <artifactId>orc-core</artifactId>
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileStatusCache.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileStatusCache.scala
index b5d800f02862e..5db69a33dd94b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileStatusCache.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileStatusCache.scala
@@ -22,7 +22,7 @@ import java.util.concurrent.atomic.AtomicBoolean
import scala.collection.JavaConverters._
-import com.google.common.cache._
+import com.github.benmanes.caffeine.cache.{Cache, Caffeine, RemovalCause, RemovalListener, Weigher}
import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.spark.internal.Logging
@@ -119,11 +119,10 @@ private class SharedInMemoryCache(maxSizeInBytes: Long, cacheTTL: Long) extends
}
}
}
- val removalListener = new RemovalListener[(ClientId, Path), Array[FileStatus]]() {
- override def onRemoval(
- removed: RemovalNotification[(ClientId, Path),
- Array[FileStatus]]): Unit = {
- if (removed.getCause == RemovalCause.SIZE &&
+ val removalListener = new RemovalListener[(ClientId, Path), Array[FileStatus]] {
+ override def onRemoval(key: (ClientId, Path), value: Array[FileStatus],
+ cause: RemovalCause): Unit = {
+ if (cause == RemovalCause.SIZE &&
warnedAboutEviction.compareAndSet(false, true)) {
logWarning(
"Evicting cached table partition metadata from memory due to size constraints " +
@@ -133,7 +132,7 @@ private class SharedInMemoryCache(maxSizeInBytes: Long, cacheTTL: Long) extends
}
}
- var builder = CacheBuilder.newBuilder()
+ var builder = Caffeine.newBuilder()
.weigher(weigher)
.removalListener(removalListener)
.maximumWeight(maxSizeInBytes / weightScale)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala
index a613a39b2ba89..d8fba8bdff38f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala
@@ -22,7 +22,7 @@ import java.util.{Arrays, Locale}
import scala.concurrent.duration._
-import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache}
+import com.github.benmanes.caffeine.cache.{CacheLoader, Caffeine, LoadingCache}
import org.apache.spark.SparkContext
import org.apache.spark.scheduler.AccumulableInfo
@@ -97,7 +97,7 @@ object SQLMetrics {
val cachedSQLAccumIdentifier = Some(AccumulatorContext.SQL_ACCUM_IDENTIFIER)
private val metricsCache: LoadingCache[String, Option[String]] =
- CacheBuilder.newBuilder().maximumSize(10000)
+ Caffeine.newBuilder().maximumSize(10000)
.build(new CacheLoader[String, Option[String]] {
override def load(name: String): Option[String] = {
Option(name)