diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/AclTransformation.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/AclTransformation.java index 83ca54e0bb21c..288fddc02bd9f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/AclTransformation.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/AclTransformation.java @@ -246,19 +246,14 @@ private AclTransformation() { * All access ACL entries sort ahead of all default ACL entries. */ static final Comparator ACL_ENTRY_COMPARATOR = - new Comparator() { - @Override - public int compare(AclEntry entry1, AclEntry entry2) { - return ComparisonChain.start() + (entry1, entry2) -> ComparisonChain.start() .compare(entry1.getScope(), entry2.getScope(), - Ordering.explicit(ACCESS, DEFAULT)) + Ordering.explicit(ACCESS, DEFAULT)) .compare(entry1.getType(), entry2.getType(), - Ordering.explicit(USER, GROUP, MASK, OTHER)) + Ordering.explicit(USER, GROUP, MASK, OTHER)) .compare(entry1.getName(), entry2.getName(), - Ordering.natural().nullsFirst()) + Ordering.natural().nullsFirst()) .result(); - } - }; /** * Builds the final list of ACL entries to return by trimming, sorting and diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/DefaultAuditLogger.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/DefaultAuditLogger.java index 9ac0bec44cae0..49f6ed3dc690f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/DefaultAuditLogger.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/DefaultAuditLogger.java @@ -39,12 +39,7 @@ @InterfaceStability.Evolving public abstract class DefaultAuditLogger extends HdfsAuditLogger { protected static final ThreadLocal STRING_BUILDER = - new ThreadLocal() { - @Override - protected StringBuilder initialValue() { - return new StringBuilder(); - } - }; + ThreadLocal.withInitial(StringBuilder::new); protected volatile boolean isCallerContextEnabled; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileInputStream.java index ba4f32fd2154d..cf03df17f9e9a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileInputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileInputStream.java @@ -28,7 +28,6 @@ import java.net.HttpURLConnection; import java.net.URL; import java.nio.file.Files; -import java.security.PrivilegedExceptionAction; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -478,41 +477,33 @@ public URLLog(URLConnectionFactory connectionFactory, URL url) { @Override public InputStream getInputStream() throws IOException { - return SecurityUtil.doAsCurrentUser( - new PrivilegedExceptionAction() { - @Override - public InputStream run() throws IOException { - HttpURLConnection connection; - try { - connection = (HttpURLConnection) - connectionFactory.openConnection(url, isSpnegoEnabled); - } catch (AuthenticationException e) { - throw new IOException(e); - } - - if (connection.getResponseCode() != HttpURLConnection.HTTP_OK) { - throw new HttpGetFailedException( - "Fetch of " + url + - " 
failed with status code " + connection.getResponseCode() + - "\nResponse message:\n" + connection.getResponseMessage(), - connection); - } - - String contentLength = connection.getHeaderField(CONTENT_LENGTH); - if (contentLength != null) { - advertisedSize = Long.parseLong(contentLength); - if (advertisedSize <= 0) { - throw new IOException("Invalid " + CONTENT_LENGTH + " header: " + - contentLength); - } - } else { - throw new IOException(CONTENT_LENGTH + " header is not provided " + - "by the server when trying to fetch " + url); - } - - return connection.getInputStream(); - } - }); + return SecurityUtil.doAsCurrentUser(() -> { + HttpURLConnection connection; + try { + connection = (HttpURLConnection) connectionFactory.openConnection(url, isSpnegoEnabled); + } catch (AuthenticationException e) { + throw new IOException(e); + } + + if (connection.getResponseCode() != HttpURLConnection.HTTP_OK) { + throw new HttpGetFailedException("Fetch of " + url + + " failed with status code " + connection.getResponseCode() + + "\nResponse message:\n" + connection.getResponseMessage(), connection); + } + + String contentLength = connection.getHeaderField(CONTENT_LENGTH); + if (contentLength != null) { + advertisedSize = Long.parseLong(contentLength); + if (advertisedSize <= 0) { + throw new IOException("Invalid " + CONTENT_LENGTH + " header: " + contentLength); + } + } else { + throw new IOException(CONTENT_LENGTH + " header is not provided " + + "by the server when trying to fetch " + url); + } + + return connection.getInputStream(); + }); } @Override diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirEncryptionZoneOp.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirEncryptionZoneOp.java index 2110a408b0877..1a8ead563f5d4 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirEncryptionZoneOp.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirEncryptionZoneOp.java @@ -21,7 +21,6 @@ import java.io.IOException; import java.security.GeneralSecurityException; -import java.security.PrivilegedExceptionAction; import java.util.AbstractMap; import java.util.concurrent.ExecutorService; import java.util.EnumSet; @@ -94,14 +93,11 @@ private static EncryptedKeyVersion generateEncryptedDataEncryptionKey( // KMS does not need configuration to allow non-hdfs user GENERATE_EEK // operation. 
EncryptedKeyVersion edek = SecurityUtil.doAsLoginUser( - new PrivilegedExceptionAction() { - @Override - public EncryptedKeyVersion run() throws IOException { - try { - return fsd.getProvider().generateEncryptedKey(ezKeyName); - } catch (GeneralSecurityException e) { - throw new IOException(e); - } + () -> { + try { + return fsd.getProvider().generateEncryptedKey(ezKeyName); + } catch (GeneralSecurityException e) { + throw new IOException(e); } }); long generateEDEKTime = monotonicNow() - generateEDEKStartTime; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirErasureCodingOp.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirErasureCodingOp.java index 6628b56a132e0..8bdeca3ccad7b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirErasureCodingOp.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirErasureCodingOp.java @@ -76,9 +76,7 @@ static ErasureCodingPolicy getEnabledErasureCodingPolicyByName( .getEnabledPolicyByName(ecPolicyName); if (ecPolicy == null) { final String sysPolicies = - Arrays.asList( - fsn.getErasureCodingPolicyManager().getEnabledPolicies()) - .stream() + Arrays.stream(fsn.getErasureCodingPolicyManager().getEnabledPolicies()) .map(ErasureCodingPolicy::getName) .collect(Collectors.joining(", ")); final String message = String.format("Policy '%s' does not match any " + diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java index c8e0cd224e1af..cd592238ad9c2 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java @@ -125,7 +125,6 @@ import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableFactories; -import org.apache.hadoop.io.WritableFactory; import org.apache.hadoop.io.erasurecode.ECSchema; import org.apache.hadoop.io.erasurecode.ErasureCodeConstants; import org.apache.hadoop.ipc.ClientId; @@ -158,12 +157,7 @@ public abstract class FSEditLogOp { public static class OpInstanceCache { private static final ThreadLocal CACHE = - new ThreadLocal() { - @Override - protected OpInstanceCacheMap initialValue() { - return new OpInstanceCacheMap(); - } - }; + ThreadLocal.withInitial(OpInstanceCacheMap::new); @SuppressWarnings("serial") static final class OpInstanceCacheMap extends @@ -4467,12 +4461,7 @@ static class BlockTwo implements Writable { long len; static { // register a ctor - WritableFactories.setFactory - (BlockTwo.class, - new WritableFactory() { - @Override - public Writable newInstance() { return new BlockTwo(); } - }); + WritableFactories.setFactory(BlockTwo.class, BlockTwo::new); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatPBINode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatPBINode.java index 3f0c9faa97c9a..5b14afc73ad12 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatPBINode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatPBINode.java @@ 
-307,20 +307,8 @@ private void fillUpInodeList(ArrayList inodeList, INode inode) { private void addToCacheAndBlockMap(final ArrayList inodeList) { final ArrayList inodes = new ArrayList<>(inodeList); - nameCacheUpdateExecutor.submit( - new Runnable() { - @Override - public void run() { - addToCacheInternal(inodes); - } - }); - blocksMapUpdateExecutor.submit( - new Runnable() { - @Override - public void run() { - updateBlockMapInternal(inodes); - } - }); + nameCacheUpdateExecutor.submit(() -> addToCacheInternal(inodes)); + blocksMapUpdateExecutor.submit(() -> updateBlockMapInternal(inodes)); } // update name cache with non-thread safe diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatProtobuf.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatProtobuf.java index 58c24d4377be0..b0b46d7d56be3 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatProtobuf.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatProtobuf.java @@ -34,8 +34,6 @@ import java.security.DigestOutputStream; import java.security.MessageDigest; import java.util.ArrayList; -import java.util.Collections; -import java.util.Comparator; import java.util.List; import java.util.Map; import java.util.Map.Entry; @@ -377,18 +375,15 @@ private void loadInternal(RandomAccessFile raFile, FileInputStream fin) ArrayList sections = Lists.newArrayList(summary .getSectionsList()); - Collections.sort(sections, new Comparator() { - @Override - public int compare(FileSummary.Section s1, FileSummary.Section s2) { - SectionName n1 = SectionName.fromString(s1.getName()); - SectionName n2 = SectionName.fromString(s2.getName()); - if (n1 == null) { - return n2 == null ? 0 : -1; - } else if (n2 == null) { - return -1; - } else { - return n1.ordinal() - n2.ordinal(); - } + sections.sort((s1, s2) -> { + SectionName n1 = SectionName.fromString(s1.getName()); + SectionName n2 = SectionName.fromString(s2.getName()); + if (n1 == null) { + return n2 == null ? 0 : -1; + } else if (n2 == null) { + return -1; + } else { + return n1.ordinal() - n2.ordinal(); } }); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageSerialization.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageSerialization.java index 404f2c73ad3a2..b1cd36f4785da 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageSerialization.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageSerialization.java @@ -75,13 +75,7 @@ private FSImageSerialization() {} * in this class should be thread-safe since image-saving is multithreaded, so * we need to keep the static objects in a thread-local. */ - static private final ThreadLocal TL_DATA = - new ThreadLocal() { - @Override - protected TLData initialValue() { - return new TLData(); - } - }; + static private final ThreadLocal TL_DATA = ThreadLocal.withInitial(TLData::new); /** * Simple container "struct" for threadlocal data. 
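The hunks above (DefaultAuditLogger, FSEditLogOp.OpInstanceCache, FSImageSerialization, and the comparator changes in AclTransformation) all apply the same two conversions: anonymous ThreadLocal subclasses become ThreadLocal.withInitial with a constructor reference, and anonymous Comparator implementations become lambdas. A minimal, self-contained sketch of the before/after shape follows; it is illustrative only, not part of the patch, and the class and field names (LambdaConversionSketch, OLD_STYLE, NEW_STYLE, BY_FIRST_THEN_SECOND) are invented for the example.

```java
import java.util.Comparator;

public class LambdaConversionSketch {
  // Before: anonymous subclass overriding initialValue().
  private static final ThreadLocal<StringBuilder> OLD_STYLE =
      new ThreadLocal<StringBuilder>() {
        @Override
        protected StringBuilder initialValue() {
          return new StringBuilder();
        }
      };

  // After: factory method plus constructor reference, the form used in
  // DefaultAuditLogger, FSEditLogOp and FSImageSerialization above.
  private static final ThreadLocal<StringBuilder> NEW_STYLE =
      ThreadLocal.withInitial(StringBuilder::new);

  // After: a Comparator written as a lambda, the form used for
  // ACL_ENTRY_COMPARATOR and COMPARE_BY_START_TXID above (shown here on
  // plain int pairs instead of HDFS types).
  private static final Comparator<int[]> BY_FIRST_THEN_SECOND =
      (a, b) -> a[0] != b[0] ? Integer.compare(a[0], b[0])
                             : Integer.compare(a[1], b[1]);

  public static void main(String[] args) {
    System.out.println(OLD_STYLE.get().append("old form"));
    System.out.println(NEW_STYLE.get().append("new form"));
    // Prints a negative value: first elements tie, then 2 < 3.
    System.out.println(BY_FIRST_THEN_SECOND.compare(new int[]{1, 2}, new int[]{1, 3}));
  }
}
```

Both forms are behaviorally equivalent: each thread still gets its own lazily created instance, and the comparator ordering is unchanged.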
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java index 347fec858675d..f5a301947c950 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java @@ -6736,12 +6736,7 @@ public String getNodeUsage() { new HashMap>(); final List live = new ArrayList(); blockManager.getDatanodeManager().fetchDatanodes(live, null, true); - for (Iterator it = live.iterator(); it.hasNext();) { - DatanodeDescriptor node = it.next(); - if (!node.isInService()) { - it.remove(); - } - } + live.removeIf(node -> !node.isInService()); if (live.size() > 0) { float totalDfsUsed = 0; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystemLock.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystemLock.java index e510bd625e738..f4d9425a6a691 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystemLock.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystemLock.java @@ -93,12 +93,7 @@ class FSNamesystemLock { * many read locks held simultaneously. */ private final ThreadLocal readLockHeldTimeStampNanos = - new ThreadLocal() { - @Override - public Long initialValue() { - return Long.MAX_VALUE; - } - }; + ThreadLocal.withInitial(() -> Long.MAX_VALUE); private final AtomicInteger numReadLockWarningsSuppressed = new AtomicInteger(0); /** Time stamp (ms) of the last time a read lock report was written. 
*/ diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileJournalManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileJournalManager.java index 42a5d2d7385bf..13ddbd6073077 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileJournalManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileJournalManager.java @@ -537,16 +537,11 @@ public static class EditLogFile { private boolean hasCorruptHeader = false; private final boolean isInProgress; - final static Comparator COMPARE_BY_START_TXID - = new Comparator() { - @Override - public int compare(EditLogFile a, EditLogFile b) { - return ComparisonChain.start() - .compare(a.getFirstTxId(), b.getFirstTxId()) - .compare(a.getLastTxId(), b.getLastTxId()) - .result(); - } - }; + final static Comparator COMPARE_BY_START_TXID = + (a, b) -> ComparisonChain.start() + .compare(a.getFirstTxId(), b.getFirstTxId()) + .compare(a.getLastTxId(), b.getLastTxId()) + .result(); EditLogFile(File file, long firstTxId, long lastTxId) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FsImageValidation.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FsImageValidation.java index 3325222267d5d..6f226f3aa00fa 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FsImageValidation.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FsImageValidation.java @@ -48,7 +48,6 @@ import java.io.IOException; import java.util.Arrays; import java.util.Collections; -import java.util.Iterator; import java.util.Timer; import java.util.TimerTask; import java.util.concurrent.atomic.AtomicInteger; @@ -148,19 +147,16 @@ static String toCommaSeparatedNumber(long n) { /** @return a filter for the given type. 
*/ static FilenameFilter newFilenameFilter(NameNodeFile type) { final String prefix = type.getName() + "_"; - return new FilenameFilter() { - @Override - public boolean accept(File dir, String name) { - if (!name.startsWith(prefix)) { + return (dir, name) -> { + if (!name.startsWith(prefix)) { + return false; + } + for (int i = prefix.length(); i < name.length(); i++) { + if (!Character.isDigit(name.charAt(i))) { return false; } - for (int i = prefix.length(); i < name.length(); i++) { - if (!Character.isDigit(name.charAt(i))) { - return false; - } - } - return true; } + return true; }; } } @@ -264,12 +260,7 @@ FSNamesystem checkINodeReference(Configuration conf, static class INodeMapValidation { static Iterable iterate(INodeMap map) { - return new Iterable() { - @Override - public Iterator iterator() { - return map.getMapIterator(); - } - }; + return map::getMapIterator; } static void run(FSDirectory fsdir, AtomicInteger errorCount) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeReference.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeReference.java index 6e655f7a13408..aa1a9614ce091 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeReference.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeReference.java @@ -388,13 +388,8 @@ public static class WithCount extends INodeReference { * Compare snapshot with IDs, where null indicates the current status thus * is greater than any non-null snapshot. */ - public static final Comparator WITHNAME_COMPARATOR - = new Comparator() { - @Override - public int compare(WithName left, WithName right) { - return left.lastSnapshotId - right.lastSnapshotId; - } - }; + public static final Comparator WITHNAME_COMPARATOR = + Comparator.comparingInt(left -> left.lastSnapshotId); public WithCount(INodeReference parent, INode referred) { super(parent, referred); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ImageServlet.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ImageServlet.java index 442c1aba95b1c..b7734add740bf 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ImageServlet.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ImageServlet.java @@ -565,135 +565,117 @@ protected void doPut(final HttpServletRequest request, validateRequest(context, conf, request, response, nnImage, parsedParams.getStorageInfoString()); - UserGroupInformation.getCurrentUser().doAs( - new PrivilegedExceptionAction() { - - @Override - public Void run() throws Exception { - // if its not the active NN, then we need to notify the caller it was was the wrong - // target (regardless of the fact that we got the image) - HAServiceProtocol.HAServiceState state = NameNodeHttpServer - .getNameNodeStateFromContext(getServletContext()); - if (state != HAServiceProtocol.HAServiceState.ACTIVE && - state != HAServiceProtocol.HAServiceState.OBSERVER) { - // we need a different response type here so the client can differentiate this - // from the failure to upload due to (1) security, or (2) other checkpoints already - // present - sendError(response, HttpServletResponse.SC_EXPECTATION_FAILED, - "Nameode "+request.getLocalAddr()+" is currently not in a state which can " - + "accept uploads of new 
fsimages. State: "+state); - return null; - } - - final long txid = parsedParams.getTxId(); - String remoteAddr = request.getRemoteAddr(); - ImageUploadRequest imageRequest = new ImageUploadRequest(txid, remoteAddr); - - final NameNodeFile nnf = parsedParams.getNameNodeFile(); - - // if the node is attempting to upload an older transaction, we ignore it - SortedSet larger = currentlyDownloadingCheckpoints.tailSet(imageRequest); - if (larger.size() > 0) { - sendError(response, HttpServletResponse.SC_CONFLICT, - "Another checkpointer is already in the process of uploading a" + - " checkpoint made up to transaction ID " + larger.last()); - return null; - } - - //make sure no one else has started uploading one - if (!currentlyDownloadingCheckpoints.add(imageRequest)) { - sendError(response, HttpServletResponse.SC_CONFLICT, - "Either current namenode is checkpointing or another" - + " checkpointer is already in the process of " - + "uploading a checkpoint made at transaction ID " - + txid); - return null; - } - - long now = System.currentTimeMillis(); - long lastCheckpointTime = - nnImage.getStorage().getMostRecentCheckpointTime(); - long lastCheckpointTxid = - nnImage.getStorage().getMostRecentCheckpointTxId(); - - long checkpointPeriod = - conf.getTimeDuration(DFS_NAMENODE_CHECKPOINT_PERIOD_KEY, - DFS_NAMENODE_CHECKPOINT_PERIOD_DEFAULT, TimeUnit.SECONDS); - checkpointPeriod = Math.round( - checkpointPeriod * recentImageCheckTimePrecision); - - long checkpointTxnCount = - conf.getLong(DFS_NAMENODE_CHECKPOINT_TXNS_KEY, - DFS_NAMENODE_CHECKPOINT_TXNS_DEFAULT); - - long timeDelta = TimeUnit.MILLISECONDS.toSeconds( - now - lastCheckpointTime); - - // Since the goal of the check below is to prevent overly - // frequent upload from Standby, the check should only be done - // for the periodical upload from Standby. For the other - // scenarios such as rollback image and ckpt file, they skip - // this check, see HDFS-15036 for more info. - if (checkRecentImageEnable && - NameNodeFile.IMAGE.equals(parsedParams.getNameNodeFile()) && - timeDelta < checkpointPeriod && - txid - lastCheckpointTxid < checkpointTxnCount) { - // only when at least one of two conditions are met we accept - // a new fsImage - // 1. most recent image's txid is too far behind - // 2. last checkpoint time was too old - String message = "Rejecting a fsimage due to small time delta " - + "and txnid delta. 
Time since previous checkpoint is " - + timeDelta + " expecting at least " + checkpointPeriod - + " txnid delta since previous checkpoint is " + - (txid - lastCheckpointTxid) + " expecting at least " - + checkpointTxnCount; - LOG.info(message); - sendError(response, HttpServletResponse.SC_CONFLICT, message); - return null; - } - - try { - if (nnImage.getStorage().findImageFile(nnf, txid) != null) { - String message = "Either current namenode has checkpointed or " - + "another checkpointer already uploaded an " - + "checkpoint for txid " + txid; - LOG.info(message); - sendError(response, HttpServletResponse.SC_CONFLICT, message); - return null; - } - - InputStream stream = request.getInputStream(); - try { - long start = monotonicNow(); - MD5Hash downloadImageDigest = TransferFsImage - .handleUploadImageRequest(request, txid, - nnImage.getStorage(), stream, - parsedParams.getFileSize(), getThrottler(conf)); - nnImage.saveDigestAndRenameCheckpointImage(nnf, txid, - downloadImageDigest); - // Metrics non-null only when used inside name node - if (metrics != null) { - long elapsed = monotonicNow() - start; - metrics.addPutImage(elapsed); - } - // Now that we have a new checkpoint, we might be able to - // remove some old ones. - nnImage.purgeOldStorage(nnf); - } finally { - // remove the request once we've processed it, or it threw an error, so we - // aren't using it either - currentlyDownloadingCheckpoints.remove(imageRequest); - - stream.close(); - } - } finally { - nnImage.removeFromCheckpointing(txid); - } - return null; - } + UserGroupInformation.getCurrentUser().doAs((PrivilegedExceptionAction) () -> { + // if its not the active NN, then we need to notify the caller it was was the wrong + // target (regardless of the fact that we got the image) + HAServiceProtocol.HAServiceState state = NameNodeHttpServer + .getNameNodeStateFromContext(getServletContext()); + if (state != HAServiceProtocol.HAServiceState.ACTIVE && + state != HAServiceProtocol.HAServiceState.OBSERVER) { + // we need a different response type here so the client can differentiate this + // from the failure to upload due to (1) security, or (2) other checkpoints already + // present + sendError(response, HttpServletResponse.SC_EXPECTATION_FAILED, + "Nameode "+request.getLocalAddr()+" is currently not in a state which can " + + "accept uploads of new fsimages. 
State: "+state); + return null; + } + + final long txid = parsedParams.getTxId(); + String remoteAddr = request.getRemoteAddr(); + ImageUploadRequest imageRequest = new ImageUploadRequest(txid, remoteAddr); - }); + final NameNodeFile nnf = parsedParams.getNameNodeFile(); + + // if the node is attempting to upload an older transaction, we ignore it + SortedSet larger = + currentlyDownloadingCheckpoints.tailSet(imageRequest); + if (larger.size() > 0) { + sendError(response, HttpServletResponse.SC_CONFLICT, + "Another checkpointer is already in the process of uploading a" + + " checkpoint made up to transaction ID " + larger.last()); + return null; + } + + //make sure no one else has started uploading one + if (!currentlyDownloadingCheckpoints.add(imageRequest)) { + sendError(response, HttpServletResponse.SC_CONFLICT, + "Either current namenode is checkpointing or another" + + " checkpointer is already in the process of " + + "uploading a checkpoint made at transaction ID " + + txid); + return null; + } + + long now = System.currentTimeMillis(); + long lastCheckpointTime = nnImage.getStorage().getMostRecentCheckpointTime(); + long lastCheckpointTxid = nnImage.getStorage().getMostRecentCheckpointTxId(); + + long checkpointPeriod = conf.getTimeDuration(DFS_NAMENODE_CHECKPOINT_PERIOD_KEY, + DFS_NAMENODE_CHECKPOINT_PERIOD_DEFAULT, TimeUnit.SECONDS); + checkpointPeriod = Math.round(checkpointPeriod * recentImageCheckTimePrecision); + + long checkpointTxnCount = conf.getLong(DFS_NAMENODE_CHECKPOINT_TXNS_KEY, + DFS_NAMENODE_CHECKPOINT_TXNS_DEFAULT); + + long timeDelta = TimeUnit.MILLISECONDS.toSeconds(now - lastCheckpointTime); + + // Since the goal of the check below is to prevent overly + // frequent upload from Standby, the check should only be done + // for the periodical upload from Standby. For the other + // scenarios such as rollback image and ckpt file, they skip + // this check, see HDFS-15036 for more info. + if (checkRecentImageEnable && NameNodeFile.IMAGE.equals(parsedParams.getNameNodeFile()) && + timeDelta < checkpointPeriod && txid - lastCheckpointTxid < checkpointTxnCount) { + // only when at least one of two conditions are met we accept + // a new fsImage + // 1. most recent image's txid is too far behind + // 2. last checkpoint time was too old + String message = "Rejecting a fsimage due to small time delta " + + "and txnid delta. 
Time since previous checkpoint is " + + timeDelta + " expecting at least " + checkpointPeriod + + " txnid delta since previous checkpoint is " + + (txid - lastCheckpointTxid) + " expecting at least " + + checkpointTxnCount; + LOG.info(message); + sendError(response, HttpServletResponse.SC_CONFLICT, message); + return null; + } + + try { + if (nnImage.getStorage().findImageFile(nnf, txid) != null) { + String message = "Either current namenode has checkpointed or " + + "another checkpointer already uploaded an " + + "checkpoint for txid " + txid; + LOG.info(message); + sendError(response, HttpServletResponse.SC_CONFLICT, message); + return null; + } + + try (InputStream stream = request.getInputStream()) { + long start = monotonicNow(); + MD5Hash downloadImageDigest = TransferFsImage.handleUploadImageRequest( + request, txid, nnImage.getStorage(), stream, + parsedParams.getFileSize(), getThrottler(conf)); + nnImage.saveDigestAndRenameCheckpointImage(nnf, txid, downloadImageDigest); + // Metrics non-null only when used inside name node + if (metrics != null) { + long elapsed = monotonicNow() - start; + metrics.addPutImage(elapsed); + } + // Now that we have a new checkpoint, we might be able to + // remove some old ones. + nnImage.purgeOldStorage(nnf); + } finally { + // remove the request once we've processed it, or it threw an error, so we + // aren't using it either + currentlyDownloadingCheckpoints.remove(imageRequest); + } + } finally { + nnImage.removeFromCheckpointing(txid); + } + return null; + }); } catch (Throwable t) { String errMsg = "PutImage failed. " + StringUtils.stringifyException(t); sendError(response, HttpServletResponse.SC_GONE, errMsg); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/LeaseManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/LeaseManager.java index b21a34a932af9..77acba586508e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/LeaseManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/LeaseManager.java @@ -223,25 +223,21 @@ public Set getINodeWithLeases(final INodeDirectory Executors.newFixedThreadPool(workerCount); for (int workerIdx = 0; workerIdx < workerCount; workerIdx++) { final int startIdx = workerIdx; - Callable> c = new Callable>() { - @Override - public List call() { - List iNodesInPaths = Lists.newArrayList(); - for (int idx = startIdx; idx < inodeCount; idx += workerCount) { - INode inode = inodes[idx]; - if (!inode.isFile()) { - continue; - } - INodesInPath inodesInPath = INodesInPath.fromINode( - fsnamesystem.getFSDirectory().getRoot(), inode.asFile()); - if (ancestorDir != null && - !inodesInPath.isDescendant(ancestorDir)) { - continue; - } - iNodesInPaths.add(inodesInPath); + Callable> c = () -> { + List iNodesInPaths = Lists.newArrayList(); + for (int idx = startIdx; idx < inodeCount; idx += workerCount) { + INode inode = inodes[idx]; + if (!inode.isFile()) { + continue; + } + INodesInPath inodesInPath = INodesInPath.fromINode( + fsnamesystem.getFSDirectory().getRoot(), inode.asFile()); + if (ancestorDir != null && !inodesInPath.isDescendant(ancestorDir)) { + continue; } - return iNodesInPaths; + iNodesInPaths.add(inodesInPath); } + return iNodesInPaths; }; // Submit the inode filter task to the Executor Service diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NNStorageRetentionManager.java 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NNStorageRetentionManager.java index b81cfccd75734..aa80b60093245 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NNStorageRetentionManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NNStorageRetentionManager.java @@ -18,11 +18,9 @@ package org.apache.hadoop.hdfs.server.namenode; import java.io.File; -import java.io.FilenameFilter; import java.io.IOException; import java.util.ArrayList; import java.util.Collections; -import java.util.Comparator; import java.util.EnumSet; import java.util.Iterator; import java.util.List; @@ -134,15 +132,10 @@ void purgeOldStorage(NameNodeFile nnf) throws IOException { ArrayList editLogs = new ArrayList(); purgeableLogs.selectInputStreams(editLogs, purgeLogsFrom, false, false); - Collections.sort(editLogs, new Comparator() { - @Override - public int compare(EditLogInputStream a, EditLogInputStream b) { - return ComparisonChain.start() - .compare(a.getFirstTxId(), b.getFirstTxId()) - .compare(a.getLastTxId(), b.getLastTxId()) - .result(); - } - }); + editLogs.sort((a, b) -> ComparisonChain.start() + .compare(a.getFirstTxId(), b.getFirstTxId()) + .compare(a.getLastTxId(), b.getLastTxId()) + .result()); // Remove from consideration any edit logs that are in fact required. while (editLogs.size() > 0 && @@ -256,12 +249,8 @@ void purgeOldLegacyOIVImages(String dir, long txid) { String filesInStorage[]; // Get the listing - filesInStorage = oivImageDir.list(new FilenameFilter() { - @Override - public boolean accept(File dir, String name) { - return name.matches(oivImagePrefix + "_(\\d+)"); - } - }); + filesInStorage = oivImageDir.list( + (dir1, name) -> name.matches(oivImagePrefix + "_(\\d+)")); // Check whether there is any work to do. if (filesInStorage != null diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java index 63c7721b7493c..7ed3dbd140daa 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java @@ -112,7 +112,6 @@ import java.io.PrintStream; import java.net.InetSocketAddress; import java.net.URI; -import java.security.PrivilegedExceptionAction; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -1032,13 +1031,10 @@ private void startTrashEmptier(final Configuration conf) throws IOException { // case the current user is the administrator, not the NN. The trash // emptier needs to run as the NN. See HDFS-3972. 
FileSystem fs = SecurityUtil.doAsLoginUser( - new PrivilegedExceptionAction() { - @Override - public FileSystem run() throws IOException { - FileSystem dfs = new DistributedFileSystem(); - dfs.initialize(FileSystem.getDefaultUri(conf), conf); - return dfs; - } + () -> { + FileSystem dfs = new DistributedFileSystem(); + dfs.initialize(FileSystem.getDefaultUri(conf), conf); + return dfs; }); this.emptier = new Thread(new Trash(fs, conf).getEmptier(), "Trash Emptier"); this.emptier.setDaemon(true); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java index b19bfc13acf65..124be95cdf428 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java @@ -1694,18 +1694,14 @@ public void blockReceivedAndDeleted(final DatanodeRegistration nodeReg, nodeReg, receivedAndDeletedBlocks.length); final BlockManager bm = namesystem.getBlockManager(); for (final StorageReceivedDeletedBlocks r : receivedAndDeletedBlocks) { - bm.enqueueBlockOp(new Runnable() { - @Override - public void run() { - try { - namesystem.processIncrementalBlockReport(nodeReg, r); - } catch (Exception ex) { - // usually because the node is unregistered/dead. next heartbeat - // will correct the problem - blockStateChangeLog.error( - "*BLOCK* NameNode.blockReceivedAndDeleted: " - + "failed from " + nodeReg + ": " + ex.getMessage()); - } + bm.enqueueBlockOp(() -> { + try { + namesystem.processIncrementalBlockReport(nodeReg, r); + } catch (Exception ex) { + // usually because the node is unregistered/dead. 
next heartbeat + // will correct the problem + blockStateChangeLog.error("*BLOCK* NameNode.blockReceivedAndDeleted: " + + "failed from " + nodeReg + ": " + ex.getMessage()); } }); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java index 1e8517edecad1..f6a5bc4802b3c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java @@ -51,11 +51,9 @@ import org.apache.hadoop.hdfs.DFSClient; import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSUtilClient; -import org.apache.hadoop.hdfs.RemotePeerFactory; import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys; import org.apache.hadoop.hdfs.net.Peer; import org.apache.hadoop.hdfs.protocol.Block; -import org.apache.hadoop.hdfs.protocol.DatanodeID; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.DatanodeInfoWithStorage; import org.apache.hadoop.hdfs.protocol.DirectoryListing; @@ -67,7 +65,6 @@ import org.apache.hadoop.hdfs.protocol.LocatedBlocks; import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus; import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataEncryptionKeyFactory; -import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey; import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo; import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoStriped; @@ -84,7 +81,6 @@ import org.apache.hadoop.net.NodeBase; import org.apache.hadoop.security.AccessControlException; import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hadoop.security.token.Token; import org.apache.hadoop.tracing.TraceUtils; import org.apache.hadoop.util.Time; import org.apache.hadoop.tracing.Tracer; @@ -1094,26 +1090,20 @@ private void copyBlock(final DFSClient dfs, LocatedBlock lblock, setCachingStrategy(CachingStrategy.newDropBehind()). setClientCacheContext(dfs.getClientContext()). setConfiguration(namenode.getConf()). - setRemotePeerFactory(new RemotePeerFactory() { - @Override - public Peer newConnectedPeer(InetSocketAddress addr, - Token blockToken, DatanodeID datanodeId) - throws IOException { - Peer peer = null; - Socket s = NetUtils.getDefaultSocketFactory(conf).createSocket(); - try { - s.connect(addr, HdfsConstants.READ_TIMEOUT); - s.setSoTimeout(HdfsConstants.READ_TIMEOUT); - peer = DFSUtilClient.peerFromSocketAndKey( - dfs.getSaslDataTransferClient(), s, NamenodeFsck.this, - blockToken, datanodeId, HdfsConstants.READ_TIMEOUT); - } finally { - if (peer == null) { - IOUtils.closeStream(s); - } + setRemotePeerFactory((addr, blockToken, datanodeId) -> { + Peer peer = null; + Socket s = NetUtils.getDefaultSocketFactory(conf).createSocket(); + try { + s.connect(addr, HdfsConstants.READ_TIMEOUT); + s.setSoTimeout(HdfsConstants.READ_TIMEOUT); + peer = DFSUtilClient.peerFromSocketAndKey(dfs.getSaslDataTransferClient(), s, + NamenodeFsck.this, blockToken, datanodeId, HdfsConstants.READ_TIMEOUT); + } finally { + if (peer == null) { + IOUtils.closeStream(s); } - return peer; } + return peer; }). 
build(); } catch (IOException ex) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/SecondaryNameNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/SecondaryNameNode.java index e95200b35aaa5..f0fe53dd2a20e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/SecondaryNameNode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/SecondaryNameNode.java @@ -25,7 +25,6 @@ import java.net.InetSocketAddress; import java.net.URI; import java.net.URL; -import java.security.PrivilegedAction; import java.security.PrivilegedExceptionAction; import java.util.*; @@ -318,14 +317,10 @@ public void shutdown() { @Override public void run() { - SecurityUtil.doAsLoginUserOrFatal( - new PrivilegedAction() { - @Override - public Object run() { - doWork(); - return null; - } - }); + SecurityUtil.doAsLoginUserOrFatal(() -> { + doWork(); + return null; + }); } // // The main work loop @@ -406,11 +401,8 @@ static boolean downloadCheckpointFiles( } try { - Boolean b = UserGroupInformation.getCurrentUser().doAs( - new PrivilegedExceptionAction() { - - @Override - public Boolean run() throws Exception { + return UserGroupInformation.getCurrentUser().doAs( + (PrivilegedExceptionAction) () -> { dstImage.getStorage().cTime = sig.cTime; // get fsimage @@ -425,19 +417,16 @@ public Boolean run() throws Exception { dstImage.saveDigestAndRenameCheckpointImage(NameNodeFile.IMAGE, sig.mostRecentCheckpointTxId, downloadedHash); } - + // get edits file for (RemoteEditLog log : manifest.getLogs()) { - TransferFsImage.downloadEditsToStorage( - nnHostPort, log, dstImage.getStorage()); + TransferFsImage.downloadEditsToStorage(nnHostPort, log, dstImage.getStorage()); } - + // true if we haven't loaded all the transactions represented by the // downloaded fsimage. return dstImage.getLastAppliedTxId() < sig.mostRecentCheckpointTxId; - } - }); - return b.booleanValue(); + }); } catch (InterruptedException e) { throw new RuntimeException(e); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StoragePolicySummary.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StoragePolicySummary.java index a13032de1c85d..48ecca2e268a0 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StoragePolicySummary.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StoragePolicySummary.java @@ -20,8 +20,6 @@ import java.text.NumberFormat; import java.util.Arrays; -import java.util.Collections; -import java.util.Comparator; import java.util.EnumMap; import java.util.Formatter; import java.util.HashMap; @@ -70,14 +68,7 @@ static List> sortByComparator( List> storageAllocations = new LinkedList<>(unsortMap.entrySet()); // Sorting the list based on values - Collections.sort(storageAllocations, - new Comparator>() { - public int compare(Entry o1, - Entry o2) - { - return o2.getValue().compareTo(o1.getValue()); - } - }); + storageAllocations.sort((o1, o2) -> o2.getValue().compareTo(o1.getValue())); return storageAllocations; }
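One detail from the ImageServlet and SecondaryNameNode hunks worth noting for reviewers: the lambdas passed to UserGroupInformation.doAs keep an explicit (PrivilegedExceptionAction) cast, while those passed to SecurityUtil.doAsCurrentUser and doAsLoginUser do not. UserGroupInformation.doAs is overloaded for both PrivilegedAction and PrivilegedExceptionAction, so a bare lambda does not select an overload on its own and the target type has to be spelled out. The self-contained sketch below reproduces the situation with two stand-in interfaces; OverloadSketch, Action, and ExceptionAction are invented names for illustration, not Hadoop or JDK APIs.

```java
import java.io.IOException;

public class OverloadSketch {
  @FunctionalInterface
  interface Action<T> {          // stand-in for java.security.PrivilegedAction
    T run();
  }

  @FunctionalInterface
  interface ExceptionAction<T> { // stand-in for PrivilegedExceptionAction
    T run() throws Exception;
  }

  static <T> T doAs(Action<T> action) {
    return action.run();
  }

  static <T> T doAs(ExceptionAction<T> action) throws Exception {
    return action.run();
  }

  public static void main(String[] args) throws Exception {
    // Without the cast this call does not compile: javac either reports the
    // doAs overload as ambiguous or rejects the checked IOException against
    // the non-throwing interface. Supplying the target type via the cast is
    // what the (PrivilegedExceptionAction) cast does in the ImageServlet and
    // SecondaryNameNode hunks above.
    String result = doAs((ExceptionAction<String>) () -> {
      if (args.length > 100) {
        throw new IOException("never thrown in this sketch");
      }
      return "ran through the exception-declaring overload";
    });
    System.out.println(result);
  }
}
```

SecurityUtil.doAsCurrentUser and doAsLoginUser accept only a PrivilegedExceptionAction, which is presumably why the lambdas in EditLogFileInputStream, FSDirEncryptionZoneOp and NameNode above compile without any cast.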