@@ -59,7 +59,7 @@ public RESPONSE processRequest(
try {
if (logger.isTraceEnabled()) {
logger.trace(
- "{} {} request is received: <json>{}</json>",
+ "[service={}] [type={}] request is received: <json>{}</json>",
serviceName,
type.toString(),
request.toString().replaceAll("\n", "\\\\n"));
@@ -73,7 +73,7 @@

if (logger.isTraceEnabled()) {
logger.trace(
- "{} {} request is processed. Response: "
+ "[service={}] [type={}] request is processed. Response: "
+ "<json>{}</json>",
serviceName,
type.toString(),
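
Note on the pattern change above: the positional "{} {}" placeholders become labelled [service={}] [type={}] markers, so every rendered trace line carries key=value tags that the insight log filter introduced later in this change can match on. A minimal, self-contained sketch of how such a line renders, driving SLF4J's MessageFormatter directly; the class name and the sample values (scm.block, AllocateBlock, the JSON payload) are illustrative, not part of the patch.

import org.slf4j.helpers.MessageFormatter;

// Illustration only: shows the message text produced by the new pattern.
public class StructuredLogFormatDemo {
  public static void main(String[] args) {
    String rendered = MessageFormatter.arrayFormat(
        "[service={}] [type={}] request is received: <json>{}</json>",
        new Object[] {"scm.block", "AllocateBlock", "{\"size\":1024}"})
        .getMessage();
    // Prints:
    // [service=scm.block] [type=AllocateBlock] request is received: <json>{"size":1024}</json>
    System.out.println(rendered);
  }
}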

@@ -163,12 +163,12 @@ public <PAYLOAD, EVENT_TYPE extends Event<PAYLOAD>> void fireEvent(
queuedCount.incrementAndGet();
if (LOG.isDebugEnabled()) {
LOG.debug(
- "Delivering event {} to executor/handler {}: <json>{}</json>",
+ "Delivering [event={}] to executor/handler {}: <json>{}</json>",
event.getName(),
executorAndHandlers.getKey().getName(),
TRACING_SERIALIZER.toJson(payload).replaceAll("\n", "\\\\n"));
} else if (LOG.isDebugEnabled()) {
- LOG.debug("Delivering event {} to executor/handler {}: {}",
+ LOG.debug("Delivering [event={}] to executor/handler {}: {}",
event.getName(),
executorAndHandlers.getKey().getName(),
payload.getClass().getSimpleName());

@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -142,7 +142,6 @@ private void unregisterMXBean() {
}
}


/**
* Returns all datanode that are in the given state. This function works by
* taking a snapshot of the current collection and then returning the list
@@ -154,7 +153,7 @@ private void unregisterMXBean() {
@Override
public List<DatanodeDetails> getNodes(NodeState nodestate) {
return nodeStateManager.getNodes(nodestate).stream()
- .map(node -> (DatanodeDetails)node).collect(Collectors.toList());
+ .map(node -> (DatanodeDetails) node).collect(Collectors.toList());
}

/**
@@ -165,7 +164,7 @@ public List<DatanodeDetails> getNodes(NodeState nodestate) {
@Override
public List<DatanodeDetails> getAllNodes() {
return nodeStateManager.getAllNodes().stream()
- .map(node -> (DatanodeDetails)node).collect(Collectors.toList());
+ .map(node -> (DatanodeDetails) node).collect(Collectors.toList());
}

/**
@@ -229,11 +228,11 @@ public VersionResponse getVersion(SCMVersionRequestProto versionRequest) {
* SCM.
*
* @param datanodeDetails - Send datanodeDetails with Node info.
- * This function generates and assigns new datanode ID
- * for the datanode. This allows SCM to be run independent
- * of Namenode if required.
- * @param nodeReport NodeReport.
- *
+ * This function generates and assigns new datanode ID
+ * for the datanode. This allows SCM to be run
+ * independent
+ * of Namenode if required.
+ * @param nodeReport NodeReport.
* @return SCMHeartbeatResponseProto
*/
@Override
@@ -338,7 +337,7 @@ public Boolean isNodeRegistered(DatanodeDetails datanodeDetails) {
*/
@Override
public void processNodeReport(DatanodeDetails datanodeDetails,
- NodeReportProto nodeReport) {
+ NodeReportProto nodeReport) {
if (LOG.isDebugEnabled()) {
LOG.debug("Processing node report from [datanode={}]",
datanodeDetails.getHostName());
@@ -363,6 +362,7 @@ public void processNodeReport(DatanodeDetails datanodeDetails,

/**
* Returns the aggregated node stats.
+ *
* @return the aggregated node stats.
*/
@Override
@@ -381,14 +381,15 @@ public SCMNodeStat getStats() {

/**
* Return a map of node stats.
+ *
* @return a map of individual node stats (live/stale but not dead).
*/
@Override
public Map<DatanodeDetails, SCMNodeStat> getNodeStats() {

final Map<DatanodeDetails, SCMNodeStat> nodeStats = new HashMap<>();

- final List<DatanodeInfo> healthyNodes = nodeStateManager
+ final List<DatanodeInfo> healthyNodes = nodeStateManager
.getNodes(NodeState.HEALTHY);
final List<DatanodeInfo> staleNodes = nodeStateManager
.getNodes(NodeState.STALE);
@@ -406,6 +407,7 @@ public Map<DatanodeDetails, SCMNodeStat> getNodeStats() {

/**
* Return the node stat of the specified datanode.
+ *
* @param datanodeDetails - datanode ID.
* @return node stat if it is live/stale, null if it is decommissioned or
* doesn't exist.
@@ -442,7 +444,7 @@ private SCMNodeStat getNodeStatInternal(DatanodeDetails datanodeDetails) {
@Override
public Map<String, Integer> getNodeCount() {
Map<String, Integer> nodeCountMap = new HashMap<String, Integer>();
- for(NodeState state : NodeState.values()) {
+ for (NodeState state : NodeState.values()) {
nodeCountMap.put(state.toString(), getNodeCount(state));
}
return nodeCountMap;
@@ -460,7 +462,7 @@ public Map<String, Long> getNodeInfo() {
long ssdUsed = 0L;
long ssdRemaining = 0L;

- List<DatanodeInfo> healthyNodes = nodeStateManager
+ List<DatanodeInfo> healthyNodes = nodeStateManager
.getNodes(NodeState.HEALTHY);
List<DatanodeInfo> staleNodes = nodeStateManager
.getNodes(NodeState.STALE);
@@ -496,9 +498,9 @@ public Map<String, Long> getNodeInfo() {
return nodeInfo;
}


/**
* Get set of pipelines a datanode is part of.
+ *
* @param datanodeDetails - datanodeID
* @return Set of PipelineID
*/
@@ -507,9 +509,9 @@ public Set<PipelineID> getPipelines(DatanodeDetails datanodeDetails) {
return nodeStateManager.getPipelineByDnID(datanodeDetails.getUuid());
}


/**
* Add pipeline information in the NodeManager.
+ *
* @param pipeline - Pipeline to be added
*/
@Override
@@ -519,6 +521,7 @@ public void addPipeline(Pipeline pipeline) {

/**
* Remove a pipeline information from the NodeManager.
+ *
* @param pipeline - Pipeline to be removed
*/
@Override
@@ -528,17 +531,18 @@ public void removePipeline(Pipeline pipeline) {

@Override
public void addContainer(final DatanodeDetails datanodeDetails,
- final ContainerID containerId)
+ final ContainerID containerId)
throws NodeNotFoundException {
nodeStateManager.addContainer(datanodeDetails.getUuid(), containerId);
}

/**
* Update set of containers available on a datanode.
+ *
* @param datanodeDetails - DatanodeID
- * @param containerIds - Set of containerIDs
+ * @param containerIds - Set of containerIDs
* @throws NodeNotFoundException - if datanode is not known. For new datanode
- * use addDatanodeInContainerMap call.
+ * use addDatanodeInContainerMap call.
*/
@Override
public void setContainers(DatanodeDetails datanodeDetails,
@@ -549,6 +553,7 @@ public void setContainers(DatanodeDetails datanodeDetails,

/**
* Return set of containerIDs available on a datanode.
+ *
* @param datanodeDetails - DatanodeID
* @return - set of containerIDs
*/
@@ -572,7 +577,7 @@ public void addDatanodeCommand(UUID dnId, SCMCommand command) {
* DATANODE_COMMAND to the Queue.
*
* @param commandForDatanode DatanodeCommand
- * @param ignored publisher
+ * @param ignored publisher
*/
@Override
public void onMessage(CommandForDatanode commandForDatanode,
@@ -657,6 +662,7 @@ private String nodeResolve(String hostname) {

/**
* Test utility to stop heartbeat check process.
+ *
* @return ScheduledFuture of next scheduled check that got cancelled.
*/
@VisibleForTesting
@@ -666,6 +672,7 @@ ScheduledFuture pauseHealthCheck() {

/**
* Test utility to resume the paused heartbeat check process.
+ *
* @return ScheduledFuture of the next scheduled check
*/
@VisibleForTesting
@@ -675,6 +682,7 @@ ScheduledFuture unpauseHealthCheck() {

/**
* Test utility to get the count of skipped heartbeat check iterations.
+ *
* @return count of skipped heartbeat check iterations
*/
@VisibleForTesting

@@ -22,6 +22,7 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
+ import java.util.Map.Entry;

import org.apache.hadoop.conf.StorageUnit;
import org.apache.hadoop.hdds.HddsUtils;
@@ -185,4 +186,18 @@ public void addRpcMetrics(List<MetricGroupDisplay> metrics,
metrics.add(performance);
}

+ @Override
+ public boolean filterLog(Map<String, String> filters, String logLine) {
+ if (filters == null) {
+ return true;
+ }
+ boolean result = true;
+ for (Entry<String, String> entry : filters.entrySet()) {
+ if (!logLine.matches(
+ String.format(".*\\[%s=%s\\].*", entry.getKey(), entry.getValue()))) {
+ return false;
+ }
+ }
+ return true;
+ }
}
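
For context, the filterLog implementation above accepts a line only when every requested key=value pair appears as a bracketed [key=value] marker, which is exactly the format the reworked log statements earlier in this change emit. A small, self-contained sketch of that matching rule; the class name and the sample line are illustrative, not part of the patch.

import java.util.HashMap;
import java.util.Map;

// Illustration only: mirrors the matching rule used by filterLog() above.
public final class FilterLogDemo {

  static boolean matches(Map<String, String> filters, String logLine) {
    if (filters == null) {
      return true;                      // no filters: show every line
    }
    for (Map.Entry<String, String> entry : filters.entrySet()) {
      // Each filter entry must be present as a bracketed [key=value] marker.
      if (!logLine.matches(String.format(".*\\[%s=%s\\].*",
          entry.getKey(), entry.getValue()))) {
        return false;
      }
    }
    return true;
  }

  public static void main(String[] args) {
    // Hypothetical log line in the new structured format.
    String line =
        "[service=scm] [type=AllocateBlock] request is received: <json>{}</json>";

    Map<String, String> filters = new HashMap<>();
    filters.put("service", "scm");
    System.out.println(matches(filters, line));   // true

    filters.put("type", "GetVersion");
    System.out.println(matches(filters, line));   // false: type does not match
  }
}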

@@ -18,6 +18,7 @@
package org.apache.hadoop.ozone.insight;

import java.util.List;
+ import java.util.Map;

/**
* Definition of a specific insight points.
@@ -44,6 +45,9 @@ public interface InsightPoint {
*/
List<Class> getConfigurationClasses();


+ /**
+ * Decide if the specific log should be displayed or not.
+ */
+ boolean filterLog(Map<String, String> filters, String logLine);

}

@@ -23,8 +23,10 @@
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
+ import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
+ import java.util.function.Predicate;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
@@ -59,6 +61,10 @@ public class LogSubcommand extends BaseInsightSubCommand
+ "show more information / detailed message")
private boolean verbose;

+ @CommandLine.Option(names = "-f", description = "Define filters to scope "
+ + "the output (e.g. -f datanode=_1234_datanode_id)")
+ private Map<String, String> filters;
+
@Override
public Void call() throws Exception {
OzoneConfiguration conf =
@@ -76,7 +82,8 @@ public Void call() throws Exception {
Set<Component> sources = loggers.stream().map(LoggerSource::getComponent)
.collect(Collectors.toSet());
try {
- streamLog(conf, sources, loggers);
+ streamLog(conf, sources, loggers,
+ (logLine) -> insight.filterLog(filters, logLine));
} finally {
for (LoggerSource logger : loggers) {
setLogLevel(conf, logger.getLoggerName(), logger.getComponent(),
@@ -86,12 +93,20 @@ public Void call() throws Exception {
return null;
}

+ /**
+ * Stream logs from multiple endpoints.
+ *
+ * @param conf Configuration (to find the log endpoints)
+ * @param sources Components to connect to (like scm, om...)
+ * @param relatedLoggers loggers to display
+ * @param filter any additional filter
+ */
private void streamLog(OzoneConfiguration conf, Set<Component> sources,
- List<LoggerSource> relatedLoggers) {
+ List<LoggerSource> relatedLoggers, Predicate<String> filter) {
List<Thread> loggers = new ArrayList<>();
for (Component sourceComponent : sources) {
loggers.add(new Thread(
- () -> streamLog(conf, sourceComponent, relatedLoggers)));
+ () -> streamLog(conf, sourceComponent, relatedLoggers, filter)));
}
for (Thread thread : loggers) {
thread.start();
@@ -106,7 +121,7 @@ private void streamLog(OzoneConfiguration conf, Set<Component> sources,
}

private void streamLog(OzoneConfiguration conf, Component logComponent,
- List<LoggerSource> loggers) {
+ List<LoggerSource> loggers, Predicate<String> filter) {
HttpClient client = HttpClientBuilder.create().build();

HttpGet get = new HttpGet(getHost(conf, logComponent) + "/logstream");
@@ -118,7 +133,8 @@ private void streamLog(OzoneConfiguration conf, Component logComponent,
bufferedReader.lines()
.filter(line -> {
for (LoggerSource logger : loggers) {
- if (line.contains(logger.getLoggerName())) {
+ if (line.contains(logger.getLoggerName()) && filter
+ .test(line)) {
return true;
}
}
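
The -f option added above is a picocli Map option: it can be repeated on the command line, each key=value argument becomes one entry of the filters map, and that map is handed to InsightPoint.filterLog through the Predicate passed down to streamLog. A minimal sketch of the parsing behaviour; the demo class is standalone and not part of the patch, and it only assumes the picocli library already used by these CLI classes.

import java.util.Map;
import picocli.CommandLine;

// Illustration only: repeated -f key=value options populate a Map field.
class FilterOptionDemo {

  @CommandLine.Option(names = "-f",
      description = "Filters to scope the output, may be repeated")
  private Map<String, String> filters;

  public static void main(String[] args) {
    FilterOptionDemo demo = CommandLine.populateCommand(new FilterOptionDemo(),
        "-f", "datanode=_1234_datanode_id", "-f", "service=scm");
    // filters now holds both entries:
    // {datanode=_1234_datanode_id, service=scm}
    System.out.println(demo.filters);
  }
}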

@@ -20,6 +20,7 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
+ import java.util.Map;

import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
@@ -72,4 +73,8 @@ public String getDescription() {
return "More information about one ratis datanode ring.";
}

+ @Override
+ public boolean filterLog(Map<String, String> filters, String logLine) {
+ return true;
+ }
}