diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/OzoneProtocolMessageDispatcher.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/OzoneProtocolMessageDispatcher.java index d67a759f8d8d..2f4f821ffc4f 100644 --- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/OzoneProtocolMessageDispatcher.java +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/OzoneProtocolMessageDispatcher.java @@ -59,7 +59,7 @@ public RESPONSE processRequest( try { if (logger.isTraceEnabled()) { logger.trace( - "{} {} request is received: {}", + "[service={}] [type={}] request is received: {}", serviceName, type.toString(), request.toString().replaceAll("\n", "\\\\n")); @@ -73,7 +73,7 @@ public RESPONSE processRequest( if (logger.isTraceEnabled()) { logger.trace( - "{} {} request is processed. Response: " + "[service={}] [type={}] request is processed. Response: " + "{}", serviceName, type.toString(), diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/EventQueue.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/EventQueue.java index cd09da666501..b0e4298b68c2 100644 --- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/EventQueue.java +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/EventQueue.java @@ -163,12 +163,12 @@ public > void fireEvent( queuedCount.incrementAndGet(); if (LOG.isDebugEnabled()) { LOG.debug( - "Delivering event {} to executor/handler {}: {}", + "Delivering [event={}] to executor/handler {}: {}", event.getName(), executorAndHandlers.getKey().getName(), TRACING_SERIALIZER.toJson(payload).replaceAll("\n", "\\\\n")); } else if (LOG.isDebugEnabled()) { - LOG.debug("Delivering event {} to executor/handler {}: {}", + LOG.debug("Delivering [event={}] to executor/handler {}: {}", event.getName(), executorAndHandlers.getKey().getName(), payload.getClass().getSimpleName()); diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java index e1e1d6cf3e6f..f077e72d4465 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

* Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -142,7 +142,6 @@ private void unregisterMXBean() { } } - /** * Returns all datanode that are in the given state. This function works by * taking a snapshot of the current collection and then returning the list @@ -154,7 +153,7 @@ private void unregisterMXBean() { @Override public List getNodes(NodeState nodestate) { return nodeStateManager.getNodes(nodestate).stream() - .map(node -> (DatanodeDetails)node).collect(Collectors.toList()); + .map(node -> (DatanodeDetails) node).collect(Collectors.toList()); } /** @@ -165,7 +164,7 @@ public List getNodes(NodeState nodestate) { @Override public List getAllNodes() { return nodeStateManager.getAllNodes().stream() - .map(node -> (DatanodeDetails)node).collect(Collectors.toList()); + .map(node -> (DatanodeDetails) node).collect(Collectors.toList()); } /** @@ -229,11 +228,11 @@ public VersionResponse getVersion(SCMVersionRequestProto versionRequest) { * SCM. * * @param datanodeDetails - Send datanodeDetails with Node info. - * This function generates and assigns new datanode ID - * for the datanode. This allows SCM to be run independent - * of Namenode if required. - * @param nodeReport NodeReport. - * + * This function generates and assigns new datanode ID + * for the datanode. This allows SCM to be run + * independent + * of Namenode if required. + * @param nodeReport NodeReport. * @return SCMHeartbeatResponseProto */ @Override @@ -338,7 +337,7 @@ public Boolean isNodeRegistered(DatanodeDetails datanodeDetails) { */ @Override public void processNodeReport(DatanodeDetails datanodeDetails, - NodeReportProto nodeReport) { + NodeReportProto nodeReport) { if (LOG.isDebugEnabled()) { LOG.debug("Processing node report from [datanode={}]", datanodeDetails.getHostName()); @@ -363,6 +362,7 @@ public void processNodeReport(DatanodeDetails datanodeDetails, /** * Returns the aggregated node stats. + * * @return the aggregated node stats. */ @Override @@ -381,6 +381,7 @@ public SCMNodeStat getStats() { /** * Return a map of node stats. + * * @return a map of individual node stats (live/stale but not dead). */ @Override @@ -388,7 +389,7 @@ public Map getNodeStats() { final Map nodeStats = new HashMap<>(); - final List healthyNodes = nodeStateManager + final List healthyNodes = nodeStateManager .getNodes(NodeState.HEALTHY); final List staleNodes = nodeStateManager .getNodes(NodeState.STALE); @@ -406,6 +407,7 @@ public Map getNodeStats() { /** * Return the node stat of the specified datanode. + * * @param datanodeDetails - datanode ID. * @return node stat if it is live/stale, null if it is decommissioned or * doesn't exist. @@ -442,7 +444,7 @@ private SCMNodeStat getNodeStatInternal(DatanodeDetails datanodeDetails) { @Override public Map getNodeCount() { Map nodeCountMap = new HashMap(); - for(NodeState state : NodeState.values()) { + for (NodeState state : NodeState.values()) { nodeCountMap.put(state.toString(), getNodeCount(state)); } return nodeCountMap; @@ -460,7 +462,7 @@ public Map getNodeInfo() { long ssdUsed = 0L; long ssdRemaining = 0L; - List healthyNodes = nodeStateManager + List healthyNodes = nodeStateManager .getNodes(NodeState.HEALTHY); List staleNodes = nodeStateManager .getNodes(NodeState.STALE); @@ -496,9 +498,9 @@ public Map getNodeInfo() { return nodeInfo; } - /** * Get set of pipelines a datanode is part of. + * * @param datanodeDetails - datanodeID * @return Set of PipelineID */ @@ -507,9 +509,9 @@ public Set getPipelines(DatanodeDetails datanodeDetails) { return nodeStateManager.getPipelineByDnID(datanodeDetails.getUuid()); } - /** * Add pipeline information in the NodeManager. + * * @param pipeline - Pipeline to be added */ @Override @@ -519,6 +521,7 @@ public void addPipeline(Pipeline pipeline) { /** * Remove a pipeline information from the NodeManager. + * * @param pipeline - Pipeline to be removed */ @Override @@ -528,17 +531,18 @@ public void removePipeline(Pipeline pipeline) { @Override public void addContainer(final DatanodeDetails datanodeDetails, - final ContainerID containerId) + final ContainerID containerId) throws NodeNotFoundException { nodeStateManager.addContainer(datanodeDetails.getUuid(), containerId); } /** * Update set of containers available on a datanode. + * * @param datanodeDetails - DatanodeID - * @param containerIds - Set of containerIDs + * @param containerIds - Set of containerIDs * @throws NodeNotFoundException - if datanode is not known. For new datanode - * use addDatanodeInContainerMap call. + * use addDatanodeInContainerMap call. */ @Override public void setContainers(DatanodeDetails datanodeDetails, @@ -549,6 +553,7 @@ public void setContainers(DatanodeDetails datanodeDetails, /** * Return set of containerIDs available on a datanode. + * * @param datanodeDetails - DatanodeID * @return - set of containerIDs */ @@ -572,7 +577,7 @@ public void addDatanodeCommand(UUID dnId, SCMCommand command) { * DATANODE_COMMAND to the Queue. * * @param commandForDatanode DatanodeCommand - * @param ignored publisher + * @param ignored publisher */ @Override public void onMessage(CommandForDatanode commandForDatanode, @@ -657,6 +662,7 @@ private String nodeResolve(String hostname) { /** * Test utility to stop heartbeat check process. + * * @return ScheduledFuture of next scheduled check that got cancelled. */ @VisibleForTesting @@ -666,6 +672,7 @@ ScheduledFuture pauseHealthCheck() { /** * Test utility to resume the paused heartbeat check process. + * * @return ScheduledFuture of the next scheduled check */ @VisibleForTesting @@ -675,6 +682,7 @@ ScheduledFuture unpauseHealthCheck() { /** * Test utility to get the count of skipped heartbeat check iterations. + * * @return count of skipped heartbeat check iterations */ @VisibleForTesting diff --git a/hadoop-ozone/insight/src/main/java/org/apache/hadoop/ozone/insight/BaseInsightPoint.java b/hadoop-ozone/insight/src/main/java/org/apache/hadoop/ozone/insight/BaseInsightPoint.java index a23b876b53d7..02a0596491f4 100644 --- a/hadoop-ozone/insight/src/main/java/org/apache/hadoop/ozone/insight/BaseInsightPoint.java +++ b/hadoop-ozone/insight/src/main/java/org/apache/hadoop/ozone/insight/BaseInsightPoint.java @@ -22,6 +22,7 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.Map.Entry; import org.apache.hadoop.conf.StorageUnit; import org.apache.hadoop.hdds.HddsUtils; @@ -185,4 +186,18 @@ public void addRpcMetrics(List metrics, metrics.add(performance); } + @Override + public boolean filterLog(Map filters, String logLine) { + if (filters == null) { + return true; + } + boolean result = true; + for (Entry entry : filters.entrySet()) { + if (!logLine.matches( + String.format(".*\\[%s=%s\\].*", entry.getKey(), entry.getValue()))) { + return false; + } + } + return true; + } } diff --git a/hadoop-ozone/insight/src/main/java/org/apache/hadoop/ozone/insight/InsightPoint.java b/hadoop-ozone/insight/src/main/java/org/apache/hadoop/ozone/insight/InsightPoint.java index 1284cfa95843..57e1ddda1f25 100644 --- a/hadoop-ozone/insight/src/main/java/org/apache/hadoop/ozone/insight/InsightPoint.java +++ b/hadoop-ozone/insight/src/main/java/org/apache/hadoop/ozone/insight/InsightPoint.java @@ -18,6 +18,7 @@ package org.apache.hadoop.ozone.insight; import java.util.List; +import java.util.Map; /** * Definition of a specific insight points. @@ -44,6 +45,9 @@ public interface InsightPoint { */ List getConfigurationClasses(); - + /** + * Decide if the specific log should be displayed or not.. + */ + boolean filterLog(Map filters, String logLine); } diff --git a/hadoop-ozone/insight/src/main/java/org/apache/hadoop/ozone/insight/LogSubcommand.java b/hadoop-ozone/insight/src/main/java/org/apache/hadoop/ozone/insight/LogSubcommand.java index 2e8787f2b262..c0fd59d89fc2 100644 --- a/hadoop-ozone/insight/src/main/java/org/apache/hadoop/ozone/insight/LogSubcommand.java +++ b/hadoop-ozone/insight/src/main/java/org/apache/hadoop/ozone/insight/LogSubcommand.java @@ -23,8 +23,10 @@ import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.concurrent.Callable; +import java.util.function.Predicate; import java.util.regex.Matcher; import java.util.regex.Pattern; import java.util.stream.Collectors; @@ -59,6 +61,10 @@ public class LogSubcommand extends BaseInsightSubCommand + "show more information / detailed message") private boolean verbose; + @CommandLine.Option(names = "-f", description = "Define filters to scope " + + "the output (eg. -f datanode=_1234_datanode_id)") + private Map filters; + @Override public Void call() throws Exception { OzoneConfiguration conf = @@ -76,7 +82,8 @@ public Void call() throws Exception { Set sources = loggers.stream().map(LoggerSource::getComponent) .collect(Collectors.toSet()); try { - streamLog(conf, sources, loggers); + streamLog(conf, sources, loggers, + (logLine) -> insight.filterLog(filters, logLine)); } finally { for (LoggerSource logger : loggers) { setLogLevel(conf, logger.getLoggerName(), logger.getComponent(), @@ -86,12 +93,20 @@ public Void call() throws Exception { return null; } + /** + * Stream log from multiple endpoint. + * + * @param conf Configuration (to find the log endpoints) + * @param sources Components to connect to (like scm, om...) + * @param relatedLoggers loggers to display + * @param filter any additional filter + */ private void streamLog(OzoneConfiguration conf, Set sources, - List relatedLoggers) { + List relatedLoggers, Predicate filter) { List loggers = new ArrayList<>(); for (Component sourceComponent : sources) { loggers.add(new Thread( - () -> streamLog(conf, sourceComponent, relatedLoggers))); + () -> streamLog(conf, sourceComponent, relatedLoggers, filter))); } for (Thread thread : loggers) { thread.start(); @@ -106,7 +121,7 @@ private void streamLog(OzoneConfiguration conf, Set sources, } private void streamLog(OzoneConfiguration conf, Component logComponent, - List loggers) { + List loggers, Predicate filter) { HttpClient client = HttpClientBuilder.create().build(); HttpGet get = new HttpGet(getHost(conf, logComponent) + "/logstream"); @@ -118,7 +133,8 @@ private void streamLog(OzoneConfiguration conf, Component logComponent, bufferedReader.lines() .filter(line -> { for (LoggerSource logger : loggers) { - if (line.contains(logger.getLoggerName())) { + if (line.contains(logger.getLoggerName()) && filter + .test(line)) { return true; } } diff --git a/hadoop-ozone/insight/src/main/java/org/apache/hadoop/ozone/insight/datanode/RatisInsight.java b/hadoop-ozone/insight/src/main/java/org/apache/hadoop/ozone/insight/datanode/RatisInsight.java index b87955e8aafa..411306771f8e 100644 --- a/hadoop-ozone/insight/src/main/java/org/apache/hadoop/ozone/insight/datanode/RatisInsight.java +++ b/hadoop-ozone/insight/src/main/java/org/apache/hadoop/ozone/insight/datanode/RatisInsight.java @@ -20,6 +20,7 @@ import java.io.IOException; import java.util.ArrayList; import java.util.List; +import java.util.Map; import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.protocol.DatanodeDetails; @@ -72,4 +73,8 @@ public String getDescription() { return "More information about one ratis datanode ring."; } + @Override + public boolean filterLog(Map filters, String logLine) { + return true; + } } diff --git a/hadoop-ozone/insight/src/test/java/org/apache/hadoop/ozone/insight/TestBaseInsightPoint.java b/hadoop-ozone/insight/src/test/java/org/apache/hadoop/ozone/insight/TestBaseInsightPoint.java new file mode 100644 index 000000000000..42fdb397f1c8 --- /dev/null +++ b/hadoop-ozone/insight/src/test/java/org/apache/hadoop/ozone/insight/TestBaseInsightPoint.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.ozone.insight; + +import java.util.HashMap; +import java.util.Map; + +import org.junit.Assert; +import static org.junit.Assert.*; +import org.junit.Test; + +/** + * Test common insight point utility methods. + */ +public class TestBaseInsightPoint { + + @Test + public void filterLog() { + + BaseInsightPoint insightPoint = new BaseInsightPoint() { + @Override + public String getDescription() { + return "test"; + } + }; + + //with simple filter + Map filters = new HashMap<>(); + filters.put("datanode", "123"); + + Assert.assertTrue(insightPoint + .filterLog(filters, "This a log specific to [datanode=123]")); + + Assert.assertFalse(insightPoint + .filterLog(filters, "This a log specific to [datanode=234]")); + + //with empty filters + Assert.assertTrue(insightPoint + .filterLog(new HashMap<>(), "This a log specific to [datanode=234]")); + + //with multiple filters + filters.clear(); + filters.put("datanode", "123"); + filters.put("pipeline", "abcd"); + + Assert.assertFalse(insightPoint + .filterLog(filters, "This a log specific to [datanode=123]")); + + Assert.assertTrue(insightPoint + .filterLog(filters, + "This a log specific to [datanode=123] [pipeline=abcd]")); + + Assert.assertFalse(insightPoint + .filterLog(filters, + "This a log specific to [datanode=456] [pipeline=abcd]")); + + } +} \ No newline at end of file