94 commits
d4e3901
use caffeine instead of guava cache
LuciferYang Feb 1, 2021
a3d794a
fix compile error
LuciferYang Feb 1, 2021
85dc7b9
remove some change
LuciferYang Feb 1, 2021
399f5b2
fix exception type
LuciferYang Feb 1, 2021
dbb46dc
use cache writer to sync mode
LuciferYang Feb 1, 2021
01f99fe
remove listener
LuciferYang Feb 1, 2021
18813e9
add comments
LuciferYang Feb 1, 2021
3f13689
fix test suites
LuciferYang Feb 2, 2021
39032e6
Merge branch 'upmaster' into guava-cache-to-caffeine
LuciferYang Feb 8, 2021
674af31
Merge branch 'guava-cache-to-caffeine' of github.com:LuciferYang/spar…
LuciferYang Feb 8, 2021
5f08d0f
update deps
LuciferYang Feb 8, 2021
0c5382a
fix unused import
LuciferYang Feb 8, 2021
4761a5b
fix import
LuciferYang Feb 8, 2021
4b49b84
fix redundant space
LuciferYang Feb 8, 2021
1805f34
Merge branch 'upmaster' into guava-cache-to-caffeine
LuciferYang Mar 22, 2021
adc6d92
revert change of spark-deps
LuciferYang Mar 22, 2021
bdb522b
Merge branch 'upmaster' into guava-cache-to-caffeine
LuciferYang Mar 23, 2021
ac35393
update deps doc
LuciferYang Mar 23, 2021
b116f87
upgrade caffeine to 3.0.1
LuciferYang Mar 23, 2021
bd46b67
use evictionListener instead of CacheWriter
LuciferYang Mar 23, 2021
8842fa3
downgrade to 2.9.0
LuciferYang Mar 23, 2021
8985d6d
rewrite invalidate method in ApplicationCache
LuciferYang Mar 24, 2021
b568fcc
revert ExecutorSuite
LuciferYang Mar 24, 2021
ca9d58d
use custom executor
LuciferYang Mar 24, 2021
97c3c74
fix ExecutorSuite
LuciferYang Mar 24, 2021
f47cdb8
fix format
LuciferYang Mar 24, 2021
425c345
ExternalShuffleBlockResolver change to use CaffeinatedGuava
LuciferYang Mar 24, 2021
e0006c6
RemoteBlockPushResolver change to use CaffeinatedGuava
LuciferYang Mar 24, 2021
c02bc5b
RemoteBlockPushResolver change to use CaffeinatedGuava
LuciferYang Mar 24, 2021
c1c3cef
ApplicationCache change to use CaffeinatedGuava
LuciferYang Mar 24, 2021
c9f45ae
ApplicationCache change to use CaffeinatedGuava
LuciferYang Mar 24, 2021
24c361b
ReliableCheckpointRDD change to use CaffeinatedGuava
LuciferYang Mar 24, 2021
2b3072e
BlockManagerId change to use CaffeinatedGuava
LuciferYang Mar 24, 2021
636bfb5
Utils change to use CaffeinatedGuava
LuciferYang Mar 24, 2021
825b69a
Utils change to use CaffeinatedGuava
LuciferYang Mar 24, 2021
d1315c3
fix compile
LuciferYang Mar 24, 2021
32d9ce0
fix compile
LuciferYang Mar 24, 2021
8d6ad81
add caffeine guava to sql/core pom.xml
LuciferYang Mar 24, 2021
cf30234
remove type def of blockManagerIdCache
LuciferYang Mar 25, 2021
5eb75fa
remove type def of cachedPreferredLocations
LuciferYang Mar 25, 2021
7129802
try revert to use removalListener
LuciferYang Mar 25, 2021
045d3dc
Add comments
LuciferYang Mar 25, 2021
554b5a5
revert BlockManagerId
LuciferYang Mar 25, 2021
f488a3b
revert change of BlockManagerId
LuciferYang Mar 25, 2021
89abb61
relocation exclude guava cache
LuciferYang Mar 25, 2021
1a24ed4
Add LocalCacheBenchmark to compare guava cache and Caffeine
LuciferYang Mar 29, 2021
96e6cc8
add comments
LuciferYang Mar 29, 2021
7028ffa
Add benchmark result
LuciferYang Mar 29, 2021
3fad6ef
Compatible with Scala 2.13
LuciferYang Mar 30, 2021
aa742d3
fix import order
LuciferYang Mar 30, 2021
95d5c2e
update benchmark result
LuciferYang Mar 30, 2021
3ba2574
use guava cache CacheLoader in SubExprEvaluationRuntime
LuciferYang Mar 30, 2021
8995d72
remove site.ycsb
LuciferYang Mar 30, 2021
f61b041
Merge branch 'upmaster' into guava-cache-to-caffeine
LuciferYang Apr 8, 2021
6146529
Merge branch 'upmaster' into guava-cache-to-caffeine
LuciferYang Apr 28, 2021
7c53f73
Merge branch 'upmaster' into guava-cache-to-caffeine
LuciferYang Jun 28, 2021
a069bff
update deps file
LuciferYang Jun 28, 2021
1cbebd3
remove dep of caffeine:guava 1
LuciferYang Jun 28, 2021
52037e1
revert some changes
LuciferYang Jun 28, 2021
8391e8b
remove dep of caffeine:guava 2
LuciferYang Jun 28, 2021
0e3c5fb
remove caffeine:guava from core module
LuciferYang Jun 29, 2021
4cb428d
format
LuciferYang Jun 29, 2021
35bea2d
remove caffeine:guava from sql core module
LuciferYang Jun 29, 2021
f314403
remove caffeine:guava from sql catalyst module
LuciferYang Jun 29, 2021
994fcee
remove caffeine:guava from parent pom
LuciferYang Jun 29, 2021
a64b280
new guava to caffeine
LuciferYang Jun 29, 2021
a93e37e
remove empty line
LuciferYang Jun 29, 2021
56d41f1
Merge branch 'upmaster' into guava-cache-to-caffeine
LuciferYang Jun 29, 2021
d7610ac
update deps file
LuciferYang Jun 29, 2021
93784cc
fix test in catalyst module
LuciferYang Jun 29, 2021
db1de65
remove unused import
LuciferYang Jun 29, 2021
47b0bf8
fix test
LuciferYang Jul 1, 2021
34d31fd
Merge branch 'upmaster' into guava-cache-to-caffeine
LuciferYang Jul 1, 2021
93330ce
upgrade caffeine to 2.9.1
LuciferYang Jul 1, 2021
e3b81c5
revert test change
LuciferYang Jul 5, 2021
74e2ac3
Merge branch 'upmaster' into guava-cache-to-caffeine
LuciferYang Jul 5, 2021
7506999
Merge branch 'upmaster' into guava-cache-to-caffeine
LuciferYang Jul 8, 2021
68193a9
add scaladoc
LuciferYang Jul 9, 2021
ee05314
Merge branch 'upmaster' into guava-cache-to-caffeine
LuciferYang Jul 12, 2021
db9cc56
Merge branch 'upmaster' into guava-cache-to-caffeine
LuciferYang Jul 16, 2021
5423f36
Merge branch 'upmaster' into guava-cache-to-caffeine
LuciferYang Jul 20, 2021
c6b9dc6
Merge branch 'upmaster' into guava-cache-to-caffeine
LuciferYang Jul 21, 2021
0743e3b
Merge branch 'upmaster' into guava-cache-to-caffeine
LuciferYang Jul 22, 2021
c5c84cd
Merge branch 'upmaster' into guava-cache-to-caffeine
LuciferYang Jul 26, 2021
f2a656a
update deps file
LuciferYang Jul 26, 2021
d69df8e
fix sunchao's comments
LuciferYang Jul 27, 2021
9cd9c35
remove unused imports
LuciferYang Jul 27, 2021
7b360a7
revert for compile
LuciferYang Jul 27, 2021
33f5353
Merge branch 'upmaster' into guava-cache-to-caffeine
LuciferYang Jul 27, 2021
87def1a
Merge branch 'upmaster' into guava-cache-to-caffeine
LuciferYang Jul 27, 2021
6c74fc6
Merge branch 'upmaster' into guava-cache-to-caffeine
LuciferYang Jul 29, 2021
5a75b2c
Merge branch 'upmaster' into guava-cache-to-caffeine
LuciferYang Jul 29, 2021
706a68d
Merge branch 'upmaster' into guava-cache-to-caffeine
LuciferYang Aug 2, 2021
81f863f
Merge branch 'upmaster' into guava-cache-to-caffeine
LuciferYang Aug 4, 2021
4 changes: 4 additions & 0 deletions common/network-shuffle/pom.xml
@@ -58,6 +58,10 @@
<artifactId>slf4j-api</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.github.ben-manes.caffeine</groupId>
<artifactId>caffeine</artifactId>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java
@@ -20,10 +20,7 @@
import java.io.*;
import java.nio.charset.StandardCharsets;
import java.util.*;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.*;
import java.util.stream.Collectors;

import org.apache.commons.lang3.builder.ToStringBuilder;
@@ -32,11 +29,10 @@
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.LoadingCache;
import com.github.benmanes.caffeine.cache.Weigher;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.cache.Weigher;
import com.google.common.collect.Maps;
import org.iq80.leveldb.DB;
import org.iq80.leveldb.DBIterator;
@@ -109,23 +105,13 @@ public ExternalShuffleBlockResolver(TransportConf conf, File registeredExecutorFile,
Executor directoryCleaner) throws IOException {
this.conf = conf;
this.rddFetchEnabled =
Boolean.valueOf(conf.get(Constants.SHUFFLE_SERVICE_FETCH_RDD_ENABLED, "false"));
Boolean.parseBoolean(conf.get(Constants.SHUFFLE_SERVICE_FETCH_RDD_ENABLED, "false"));
this.registeredExecutorFile = registeredExecutorFile;
String indexCacheSize = conf.get("spark.shuffle.service.index.cache.size", "100m");
CacheLoader<File, ShuffleIndexInformation> indexCacheLoader =
new CacheLoader<File, ShuffleIndexInformation>() {
public ShuffleIndexInformation load(File file) throws IOException {
return new ShuffleIndexInformation(file);
}
};
shuffleIndexCache = CacheBuilder.newBuilder()
shuffleIndexCache = Caffeine.newBuilder()
.maximumWeight(JavaUtils.byteStringAsBytes(indexCacheSize))
.weigher(new Weigher<File, ShuffleIndexInformation>() {
public int weigh(File file, ShuffleIndexInformation indexInfo) {
return indexInfo.getSize();
}
})
.build(indexCacheLoader);
.weigher((Weigher<File, ShuffleIndexInformation>)(file, indexInfo) -> indexInfo.getSize())
.build(ShuffleIndexInformation::new);
db = LevelDBProvider.initLevelDB(this.registeredExecutorFile, CURRENT_VERSION, mapper);
if (db != null) {
executors = reloadRegisteredExecutors(db);
@@ -317,7 +303,7 @@ private ManagedBuffer getSortBasedShuffleBlockData(
"shuffle_" + shuffleId + "_" + mapId + "_0.data"),
shuffleIndexRecord.getOffset(),
shuffleIndexRecord.getLength());
} catch (ExecutionException e) {
} catch (CompletionException e) {
Contributor:
Can you tell me why we needed to change the exception? Is this just what Caffeine throws instead? Do we have test coverage for this?

Contributor Author:
> Can you tell me why we needed to change the exception? Is this just what Caffeine throws instead?

The com.github.benmanes.caffeine.cache.LoadingCache#get method throws a CompletionException if a checked exception was thrown while loading the value.

The com.google.common.cache.LoadingCache#get method throws an ExecutionException if a checked exception was thrown while loading the value.

So the exception type here has changed.
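
As a standalone illustration of this difference (a minimal sketch with made-up names, not code from this PR):

  import java.io.IOException
  import java.util.concurrent.CompletionException

  import com.github.benmanes.caffeine.cache.{CacheLoader, Caffeine, LoadingCache}

  object LoadingExceptionSketch {
    def main(args: Array[String]): Unit = {
      // A loader that always fails with a checked exception.
      val cache: LoadingCache[String, String] = Caffeine.newBuilder()
        .maximumSize(100)
        .build[String, String](new CacheLoader[String, String]() {
          override def load(key: String): String =
            throw new IOException(s"cannot load $key")
        })

      try {
        cache.get("some-key")
      } catch {
        // Guava's LoadingCache#get would surface the IOException wrapped in an
        // ExecutionException; Caffeine wraps it in a CompletionException, which
        // is why the catch clauses in this PR changed.
        case e: CompletionException =>
          println(s"caught CompletionException, cause: ${e.getCause}")
      }
    }
  }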

Contributor Author (@LuciferYang, Jul 9, 2021):
> Do we have test coverage for this?

The ExecutionException (now a CompletionException) is re-thrown as a RuntimeException; I need to check further whether it is covered by an existing test case.

Contributor Author:
@holdenk testFetchWrongExecutor and testFetchNonexistent in ExternalBlockHandlerSuite already cover this.

throw new RuntimeException("Failed to open file: " + indexFile, e);
}
}
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
@@ -31,9 +31,13 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.*;

import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.LoadingCache;
import com.github.benmanes.caffeine.cache.Weigher;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;
@@ -43,10 +47,6 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.cache.Weigher;
import com.google.common.primitives.Ints;
import com.google.common.primitives.Longs;
import org.roaringbitmap.RoaringBitmap;
@@ -115,16 +115,10 @@ public RemoteBlockPushResolver(TransportConf conf) {
NettyUtils.createThreadFactory("spark-shuffle-merged-shuffle-directory-cleaner"));
this.minChunkSize = conf.minChunkSizeInMergedShuffleFile();
this.ioExceptionsThresholdDuringMerge = conf.ioExceptionsThresholdDuringMerge();
CacheLoader<File, ShuffleIndexInformation> indexCacheLoader =
new CacheLoader<File, ShuffleIndexInformation>() {
public ShuffleIndexInformation load(File file) throws IOException {
return new ShuffleIndexInformation(file);
}
};
indexCache = CacheBuilder.newBuilder()
indexCache = Caffeine.newBuilder()
.maximumWeight(conf.mergedIndexCacheSize())
.weigher((Weigher<File, ShuffleIndexInformation>) (file, indexInfo) -> indexInfo.getSize())
.build(indexCacheLoader);
.weigher((Weigher<File, ShuffleIndexInformation>)(file, indexInfo) -> indexInfo.getSize())
.build(ShuffleIndexInformation::new);
this.errorHandler = new ErrorHandler.BlockPushErrorHandler();
}

@@ -299,7 +293,7 @@ public ManagedBuffer getMergedBlockData(
ShuffleIndexRecord shuffleIndexRecord = shuffleIndexInformation.getIndex(chunkId);
return new FileSegmentManagedBuffer(
conf, dataFile, shuffleIndexRecord.getOffset(), shuffleIndexRecord.getLength());
} catch (ExecutionException e) {
} catch (CompletionException e) {
throw new RuntimeException(String.format(
"Failed to open merged shuffle index file %s", indexFile.getPath()), e);
}
12 changes: 12 additions & 0 deletions core/benchmarks/LocalCacheBenchmark-jdk11-results.txt
@@ -0,0 +1,12 @@
================================================================================================
Loading Cache
================================================================================================

OpenJDK 64-Bit Server VM 11.0.8+10-LTS on Mac OS X 10.15.7
Intel(R) Core(TM) i5-7360U CPU @ 2.30GHz
Loading Cache: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
--------------------------------------------------------------------------------------------------------------------------
Guava Cache 5 6 1 15.9 62.8 1.0X
Caffeine 2 2 0 46.1 21.7 2.9X


12 changes: 12 additions & 0 deletions core/benchmarks/LocalCacheBenchmark-results.txt
@@ -0,0 +1,12 @@
================================================================================================
Loading Cache
================================================================================================

OpenJDK 64-Bit Server VM 1.8.0_232-b18 on Mac OS X 10.15.7
Intel(R) Core(TM) i5-7360U CPU @ 2.30GHz
Loading Cache: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
--------------------------------------------------------------------------------------------------------------------------
Guava Cache 5 5 0 16.7 60.0 1.0X
Caffeine 2 2 1 44.3 22.6 2.7X
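
The benchmark source is not shown in this excerpt, and the real LocalCacheBenchmark presumably runs through Spark's benchmark harness; a rough sketch of the shape of the comparison it measures (illustrative names and sizes, plain wall-clock timing only):

  import com.github.benmanes.caffeine.cache.{CacheLoader => CaffeineLoader, Caffeine}
  import com.google.common.cache.{CacheBuilder, CacheLoader => GuavaLoader}

  object LocalCacheComparisonSketch {
    def main(args: Array[String]): Unit = {
      val keys = 0 until 100000

      val guavaCache = CacheBuilder.newBuilder()
        .maximumSize(100000)
        .build(new GuavaLoader[Integer, Integer]() {
          override def load(key: Integer): Integer = key * 2
        })

      val caffeineCache = Caffeine.newBuilder()
        .maximumSize(100000)
        .build[Integer, Integer](new CaffeineLoader[Integer, Integer]() {
          override def load(key: Integer): Integer = key * 2
        })

      def time(label: String)(body: => Unit): Unit = {
        val start = System.nanoTime()
        body
        println(f"$label%-12s ${(System.nanoTime() - start) / 1e6}%.1f ms")
      }

      // Two passes per cache: the first pass loads, the second pass hits.
      time("Guava Cache") { for (_ <- 0 until 2; k <- keys) guavaCache.get(k) }
      time("Caffeine")    { for (_ <- 0 until 2; k <- keys) caffeineCache.get(k) }
    }
  }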


4 changes: 4 additions & 0 deletions core/pom.xml
@@ -47,6 +47,10 @@
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</dependency>
<dependency>
<groupId>com.github.ben-manes.caffeine</groupId>
<artifactId>caffeine</artifactId>
</dependency>
<dependency>
<groupId>com.twitter</groupId>
<artifactId>chill_${scala.binary.version}</artifactId>
4 changes: 2 additions & 2 deletions core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -26,7 +26,7 @@ import scala.collection.concurrent
import scala.collection.mutable
import scala.util.Properties

import com.google.common.cache.CacheBuilder
import com.github.benmanes.caffeine.cache.Caffeine
import org.apache.hadoop.conf.Configuration

import org.apache.spark.annotation.DeveloperApi
@@ -77,7 +77,7 @@ class SparkEnv (
// A general, soft-reference map for metadata needed during HadoopRDD split computation
// (e.g., HadoopFileRDD uses this to cache JobConfs and InputFormats).
private[spark] val hadoopJobMetadata =
CacheBuilder.newBuilder().softValues().build[String, AnyRef]().asMap()
Caffeine.newBuilder().softValues().build[String, AnyRef]().asMap()

private[spark] var driverTmpDir: Option[String] = None

core/src/main/scala/org/apache/spark/deploy/history/ApplicationCache.scala
@@ -18,15 +18,14 @@
package org.apache.spark.deploy.history

import java.util.NoSuchElementException
import java.util.concurrent.ExecutionException
import java.util.concurrent.CompletionException
import javax.servlet.{DispatcherType, Filter, FilterChain, FilterConfig, ServletException, ServletRequest, ServletResponse}
import javax.servlet.http.{HttpServletRequest, HttpServletResponse}

import scala.collection.JavaConverters._

import com.codahale.metrics.{Counter, MetricRegistry, Timer}
import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache, RemovalListener, RemovalNotification}
import com.google.common.util.concurrent.UncheckedExecutionException
import com.github.benmanes.caffeine.cache.{CacheLoader, Caffeine, LoadingCache, RemovalCause, RemovalListener}
import org.eclipse.jetty.servlet.FilterHolder

import org.apache.spark.internal.Logging
@@ -62,21 +61,27 @@ private[history] class ApplicationCache(

/**
* Removal event notifies the provider to detach the UI.
* @param rm removal notification
* @param key removal key
* @param value removal value
* @param cause the reason why a `CacheEntry` was removed; it should
* always be `SIZE` because `appCache` is configured with the
* `maximumSize` eviction strategy
*/
override def onRemoval(rm: RemovalNotification[CacheKey, CacheEntry]): Unit = {
override def onRemoval(key: CacheKey, value: CacheEntry, cause: RemovalCause): Unit = {
metrics.evictionCount.inc()
val key = rm.getKey
logDebug(s"Evicting entry ${key}")
operations.detachSparkUI(key.appId, key.attemptId, rm.getValue().loadedUI.ui)
logDebug(s"Evicting entry $key")
operations.detachSparkUI(key.appId, key.attemptId, value.loadedUI.ui)
}
}

private val appCache: LoadingCache[CacheKey, CacheEntry] = {
CacheBuilder.newBuilder()
.maximumSize(retainedApplications)
.removalListener(removalListener)
.build(appLoader)
val builder = Caffeine.newBuilder()
.maximumSize(retainedApplications)
.removalListener(removalListener)
// SPARK-34309: Use a custom Executor to be compatible with
// the data eviction behavior of Guava cache
.executor((command: Runnable) => command.run())
Contributor:
Is this going to run in the same thread? Is that what the old behaviour would have been?

Contributor Author (@LuciferYang, Jul 9, 2021):
Yes, it's a compromise to keep compatibility with the old behaviour for now. In the future, I will optimize this behaviour in another PR.
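
For context: Caffeine delivers removal notifications through its configured Executor, which defaults to ForkJoinPool.commonPool(), while Guava invoked the removal listener synchronously on the thread performing cache maintenance. A minimal sketch (illustrative names, not PR code) of how a same-thread executor restores the synchronous behaviour:

  import com.github.benmanes.caffeine.cache.{Caffeine, RemovalCause, RemovalListener}

  object SyncEvictionSketch {
    def main(args: Array[String]): Unit = {
      val listener = new RemovalListener[String, String] {
        override def onRemoval(key: String, value: String, cause: RemovalCause): Unit =
          println(s"evicted $key on thread ${Thread.currentThread().getName} ($cause)")
      }

      val cache = Caffeine.newBuilder()
        .maximumSize(1)
        .removalListener(listener)
        // Without this, the listener would run on ForkJoinPool.commonPool();
        // with it, the notification runs on the thread that triggers eviction.
        .executor((command: Runnable) => command.run())
        .build[String, String]()

      cache.put("a", "1")
      cache.put("b", "2") // exceeds maximumSize(1)
      cache.cleanUp()     // run pending maintenance so the eviction fires here
    }
  }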

builder.build[CacheKey, CacheEntry](appLoader)
}

/**
@@ -86,9 +91,9 @@

def get(appId: String, attemptId: Option[String] = None): CacheEntry = {
try {
appCache.get(new CacheKey(appId, attemptId))
appCache.get(CacheKey(appId, attemptId))
} catch {
case e @ (_: ExecutionException | _: UncheckedExecutionException) =>
case e @ (_: CompletionException | _: RuntimeException) =>
throw Option(e.getCause()).getOrElse(e)
}
}
@@ -127,7 +132,7 @@
}

/** @return Number of cached UIs. */
def size(): Long = appCache.size()
def size(): Long = appCache.estimatedSize()

private def time[T](t: Timer)(f: => T): T = {
val timeCtx = t.time()
@@ -197,7 +202,7 @@
val sb = new StringBuilder(s"ApplicationCache(" +
s" retainedApplications= $retainedApplications)")
sb.append(s"; time= ${clock.getTimeMillis()}")
sb.append(s"; entry count= ${appCache.size()}\n")
sb.append(s"; entry count= ${appCache.estimatedSize()}\n")
sb.append("----\n")
appCache.asMap().asScala.foreach {
case(key, entry) => sb.append(s" $key -> $entry\n")
core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala
@@ -23,7 +23,7 @@ import java.util.concurrent.TimeUnit
import scala.reflect.ClassTag
import scala.util.control.NonFatal

import com.google.common.cache.{CacheBuilder, CacheLoader}
import com.github.benmanes.caffeine.cache.{CacheLoader, Caffeine}
import org.apache.hadoop.fs.Path

import org.apache.spark._
@@ -85,16 +85,18 @@ private[spark] class ReliableCheckpointRDD[T: ClassTag](
}

// Cache of preferred locations of checkpointed files.
@transient private[spark] lazy val cachedPreferredLocations = CacheBuilder.newBuilder()
.expireAfterWrite(
SparkEnv.get.conf.get(CACHE_CHECKPOINT_PREFERRED_LOCS_EXPIRE_TIME).get,
TimeUnit.MINUTES)
.build(
new CacheLoader[Partition, Seq[String]]() {
override def load(split: Partition): Seq[String] = {
getPartitionBlockLocations(split)
}
})
@transient private[spark] lazy val cachedPreferredLocations = {
val builder = Caffeine.newBuilder()
.expireAfterWrite(
SparkEnv.get.conf.get(CACHE_CHECKPOINT_PREFERRED_LOCS_EXPIRE_TIME).get,
TimeUnit.MINUTES)
val loader = new CacheLoader[Partition, Seq[String]]() {
override def load(split: Partition): Seq[String] = {
getPartitionBlockLocations(split)
}
}
builder.build[Partition, Seq[String]](loader)
Member:
nit: why can't we just use builder.build[Partition, Seq[String]](getPartitionBlockLocations)?

Contributor Author:
1. Try to use builder.build[Partition, Seq[String]](getPartitionBlockLocations).
   The compile error is as follows:
[ERROR] /spark-mine/core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala:92: missing argument list for method getPartitionBlockLocations in class ReliableCheckpointRDD
Unapplied methods are only converted to functions when a function type is expected.
You can make this conversion explicit by writing `getPartitionBlockLocations _` or `getPartitionBlockLocations(_)` instead of `getPartitionBlockLocations`.
2. Then try to use getPartitionBlockLocations(_) and getPartitionBlockLocations _.

The compile error is as follows:

[ERROR] /spark-mine/core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala:92: overloaded method value build with alternatives:
  (x$1: com.github.benmanes.caffeine.cache.CacheLoader[_ >: org.apache.spark.Partition, Seq[String]])com.github.benmanes.caffeine.cache.LoadingCache[org.apache.spark.Partition,Seq[String]] <and>
  ()com.github.benmanes.caffeine.cache.Cache[org.apache.spark.Partition,Seq[String]]
 cannot be applied to (org.apache.spark.Partition => Seq[String])

Contributor Author:
Then try to use (partition: Partition) => getPartitionBlockLocations(partition); the error message is the same as for getPartitionBlockLocations(_).

Contributor Author:
7b360a7 reverted this change.

Member:
Ah, thanks for trying it out - it seems there are some issues with Java/Scala interop there.
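
To summarize the interop problem as a standalone sketch (illustrative names, not the PR's code): Caffeine's build is overloaded - the no-arg build() returns a Cache while build(CacheLoader) returns a LoadingCache - so scalac will not convert a plain Scala function or an eta-expanded method to the CacheLoader SAM type while also resolving the overload. An explicit anonymous CacheLoader compiles:

  import com.github.benmanes.caffeine.cache.{CacheLoader, Caffeine, LoadingCache}

  object InteropSketch {
    private def compute(key: String): Seq[String] = Seq(key.toUpperCase)

    // Fails to compile, as shown in the errors above:
    //   val broken = Caffeine.newBuilder()
    //     .build[String, Seq[String]](compute _)

    // Compiles: the loader's type is explicit, so overload resolution succeeds.
    private val cache: LoadingCache[String, Seq[String]] = Caffeine.newBuilder()
      .build[String, Seq[String]](new CacheLoader[String, Seq[String]]() {
        override def load(key: String): Seq[String] = compute(key)
      })

    def main(args: Array[String]): Unit =
      println(cache.get("spark")) // List(SPARK)
  }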

}

// Returns the block locations of given partition on file system.
private def getPartitionBlockLocations(split: Partition): Seq[String] = {
core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -34,7 +34,7 @@
import scala.util.control.NonFatal

import com.codahale.metrics.{MetricRegistry, MetricSet}
import com.google.common.cache.CacheBuilder
import com.github.benmanes.caffeine.cache.Caffeine
import org.apache.commons.io.IOUtils

import org.apache.spark._
@@ -123,7 +123,7 @@ private[spark] class HostLocalDirManager(
blockStoreClient: BlockStoreClient) extends Logging {

private val executorIdToLocalDirsCache =
CacheBuilder
Caffeine
.newBuilder()
.maximumSize(cacheSize)
.build[String, Array[String]]()
core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala
@@ -19,7 +19,7 @@ package org.apache.spark.storage

import java.io.{Externalizable, IOException, ObjectInput, ObjectOutput}

import com.google.common.cache.{CacheBuilder, CacheLoader}
import com.github.benmanes.caffeine.cache.{CacheLoader, Caffeine}

import org.apache.spark.SparkContext
import org.apache.spark.annotation.DeveloperApi
@@ -136,11 +136,14 @@ private[spark] object BlockManagerId {
* The max cache size is hardcoded to 10000, since the size of a BlockManagerId
* object is about 48B, the total memory cost should be below 1MB which is feasible.
*/
val blockManagerIdCache = CacheBuilder.newBuilder()
.maximumSize(10000)
.build(new CacheLoader[BlockManagerId, BlockManagerId]() {
override def load(id: BlockManagerId) = id
})
val blockManagerIdCache = {
Member:
nit: maybe change this to:

  val blockManagerIdCache = Caffeine.newBuilder()
      .maximumSize(10000)
      .build[BlockManagerId, BlockManagerId](identity)

?

Contributor Author (@LuciferYang, Jul 27, 2021):
Similar to getPartitionBlockLocations, it cannot be compiled with .build[BlockManagerId, BlockManagerId](identity).

Contributor Author:
7b360a7 reverted this change.

Caffeine.newBuilder()
.maximumSize(10000)
.build[BlockManagerId, BlockManagerId](
new CacheLoader[BlockManagerId, BlockManagerId]() {
override def load(id: BlockManagerId): BlockManagerId = id
})
}

def getCachedBlockManagerId(id: BlockManagerId): BlockManagerId = {
blockManagerIdCache.get(id)
Expand Down
core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
@@ -27,7 +27,7 @@ import scala.concurrent.{ExecutionContext, Future, TimeoutException}
import scala.util.Random
import scala.util.control.NonFatal

import com.google.common.cache.CacheBuilder
import com.github.benmanes.caffeine.cache.Caffeine

import org.apache.spark.{MapOutputTrackerMaster, SparkConf}
import org.apache.spark.annotation.DeveloperApi
@@ -56,7 +56,7 @@ class BlockManagerMasterEndpoint(

// Mapping from executor id to the block manager's local disk directories.
private val executorIdToLocalDirs =
CacheBuilder
Caffeine
.newBuilder()
.maximumSize(conf.get(config.STORAGE_LOCAL_DISK_BY_EXECUTORS_CACHE_SIZE))
.build[String, Array[String]]()