@@ -87,7 +87,7 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager {
8787 */
8888 private final ConcurrentMap <String , AppShuffleInfo > appsShuffleInfo ;
8989
90- private final Executor mergedShuffleCleanerExecutor ;
90+ private final Executor mergedShuffleCleaner ;
9191 private final TransportConf conf ;
9292 private final int minChunkSize ;
9393 private final int ioExceptionsThresholdDuringMerge ;
@@ -100,7 +100,7 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager {
100100 public RemoteBlockPushResolver (TransportConf conf ) {
101101 this .conf = conf ;
102102 this .appsShuffleInfo = new ConcurrentHashMap <>();
103- this .mergedShuffleCleanerExecutor = Executors .newSingleThreadExecutor (
103+ this .mergedShuffleCleaner = Executors .newSingleThreadExecutor (
104104 // Add `spark` prefix because it will run in NM in Yarn mode.
105105 NettyUtils .createThreadFactory ("spark-shuffle-merged-shuffle-directory-cleaner" ));
106106 this .minChunkSize = conf .minChunkSizeInMergedShuffleFile ();
@@ -121,9 +121,9 @@ public ShuffleIndexInformation load(File file) throws IOException {
121121 @ VisibleForTesting
122122 protected AppShuffleInfo validateAndGetAppShuffleInfo (String appId ) {
123123 // TODO: [SPARK-33236] Change the message when this service is able to handle NM restart
124- AppShuffleInfo appShuffleInfo =
125- Preconditions .checkNotNull ( appsShuffleInfo . get ( appId ) ,
126- "application " + appId + " is not registered or NM was restarted." );
124+ AppShuffleInfo appShuffleInfo = appsShuffleInfo . get ( appId );
125+ Preconditions .checkArgument ( appShuffleInfo != null ,
126+ "application " + appId + " is not registered or NM was restarted." );
127127 return appShuffleInfo ;
128128 }
129129
@@ -255,7 +255,7 @@ public void applicationRemoved(String appId, boolean cleanupLocalDirs) {
255255 logger .info ("Application {} removed, cleanupLocalDirs = {}" , appId , cleanupLocalDirs );
256256 AppShuffleInfo appShuffleInfo = appsShuffleInfo .remove (appId );
257257 if (null != appShuffleInfo ) {
258- mergedShuffleCleanerExecutor .execute (
258+ mergedShuffleCleaner .execute (
259259 () -> closeAndDeletePartitionFilesIfNeeded (appShuffleInfo , cleanupLocalDirs ));
260260 }
261261 }
@@ -501,7 +501,7 @@ public void registerExecutor(String appId, ExecutorShuffleInfo executorInfo) {
501501 AppShuffleInfo appShuffleInfo = originalAppShuffleInfo .get ();
502502 logger .warn ("Cleanup shuffle info and merged shuffle files for {}_{} as new " +
503503 "application attempt registered" , appId , appShuffleInfo .attemptId );
504- mergedShuffleCleanerExecutor .execute (
504+ mergedShuffleCleaner .execute (
505505 () -> closeAndDeletePartitionFilesIfNeeded (appShuffleInfo , true ));
506506 }
507507 }
@@ -538,10 +538,13 @@ private PushBlockStreamCallback(
538538 String streamId ,
539539 AppShufflePartitionInfo partitionInfo ,
540540 int mapIndex ) {
541- this .mergeManager = Preconditions .checkNotNull (mergeManager );
542- this .appShuffleInfo = Preconditions .checkNotNull (appShuffleInfo );
541+ Preconditions .checkArgument (mergeManager != null );
542+ this .mergeManager = mergeManager ;
543+ Preconditions .checkArgument (appShuffleInfo != null );
544+ this .appShuffleInfo = appShuffleInfo ;
543545 this .streamId = streamId ;
544- this .partitionInfo = Preconditions .checkNotNull (partitionInfo );
546+ Preconditions .checkArgument (partitionInfo != null );
547+ this .partitionInfo = partitionInfo ;
545548 this .mapIndex = mapIndex ;
546549 abortIfNecessary ();
547550 }
@@ -855,7 +858,8 @@ public static class AppShufflePartitionInfo {
855858 File dataFile ,
856859 MergeShuffleFile indexFile ,
857860 MergeShuffleFile metaFile ) throws IOException {
858- this .appId = Preconditions .checkNotNull (appId , "app id is null" );
861+ Preconditions .checkArgument (appId != null , "app id is null" );
862+ this .appId = appId ;
859863 this .shuffleId = shuffleId ;
860864 this .reduceId = reduceId ;
861865 this .dataChannel = new FileOutputStream (dataFile ).getChannel ();
0 commit comments