diff --git a/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/ProtoHistoryParser.java b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/ProtoHistoryParser.java index d28fd67d4f..397a46fde9 100644 --- a/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/ProtoHistoryParser.java +++ b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/ProtoHistoryParser.java @@ -98,7 +98,6 @@ public boolean hasNext() throws IOException { return message != null; } catch (java.io.EOFException e) { reader.close(); - if (!fileIt.hasNext()) { return false; } else { diff --git a/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/SimpleHistoryParser.java b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/SimpleHistoryParser.java index db3f6488e0..c1711ce2cb 100644 --- a/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/SimpleHistoryParser.java +++ b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/SimpleHistoryParser.java @@ -236,11 +236,13 @@ protected void readEventsFromSource(String dagId, JSONObjectSource source, // time etc). if (dagJson == null) { dagJson = jsonObject; - } else if (dagJson.optJSONObject(ATSConstants.OTHER_INFO) - .optJSONObject(ATSConstants.DAG_PLAN) == null) { - // if DAG_PLAN is not filled already, let's try to fetch it from other - dagJson.getJSONObject(ATSConstants.OTHER_INFO).put(ATSConstants.DAG_PLAN, jsonObject - .getJSONObject(ATSConstants.OTHER_INFO).getJSONObject(ATSConstants.DAG_PLAN)); + } else { + if (dagJson.optJSONObject(ATSConstants.OTHER_INFO).optJSONObject(ATSConstants.DAG_PLAN) == null) { + // if DAG_PLAN is not filled already, let's try to fetch it from other + dagJson.getJSONObject(ATSConstants.OTHER_INFO).put(ATSConstants.DAG_PLAN, + jsonObject.getJSONObject(ATSConstants.OTHER_INFO).getJSONObject(ATSConstants.DAG_PLAN)); + } + mergeSubJSONArray(jsonObject, dagJson, Constants.EVENTS); } JSONArray relatedEntities = dagJson.optJSONArray(Constants .RELATED_ENTITIES); @@ -268,6 +270,8 @@ protected void readEventsFromSource(String dagId, JSONObjectSource source, } if (!vertexJsonMap.containsKey(vertexName)) { vertexJsonMap.put(vertexName, jsonObject); + } else { + mergeSubJSONArray(jsonObject, vertexJsonMap.get(vertexName), Constants.EVENTS); } populateOtherInfo(jsonObject.optJSONObject(Constants.OTHER_INFO), vertexName, vertexJsonMap); break; @@ -281,6 +285,8 @@ protected void readEventsFromSource(String dagId, JSONObjectSource source, } if (!taskJsonMap.containsKey(taskName)) { taskJsonMap.put(taskName, jsonObject); + } else { + mergeSubJSONArray(jsonObject, taskJsonMap.get(taskName), Constants.EVENTS); } populateOtherInfo(jsonObject.optJSONObject(Constants.OTHER_INFO), taskName, taskJsonMap); break; @@ -294,6 +300,8 @@ protected void readEventsFromSource(String dagId, JSONObjectSource source, } if (!attemptJsonMap.containsKey(taskAttemptName)) { attemptJsonMap.put(taskAttemptName, jsonObject); + } else { + mergeSubJSONArray(jsonObject, attemptJsonMap.get(taskAttemptName), Constants.EVENTS); } populateOtherInfo(jsonObject.optJSONObject(Constants.OTHER_INFO), taskAttemptName, attemptJsonMap); break; @@ -311,4 +319,17 @@ protected void readEventsFromSource(String dagId, JSONObjectSource source, "Please provide a valid/complete history log file containing " + dagId); } } + + private void mergeSubJSONArray(JSONObject source, JSONObject destination, String key) + throws JSONException { + if (source.optJSONArray(key) == null) { + source.put(key, new JSONArray()); + } + if (destination.optJSONArray(key) == null) { + destination.put(key, new JSONArray()); + } + for (int i = 0; i < source.getJSONArray(key).length(); i++) { + destination.getJSONArray(key).put(source.getJSONArray(key).get(i)); + } + } } \ No newline at end of file diff --git a/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/BaseInfo.java b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/BaseInfo.java index 3f9666a950..783f486a15 100644 --- a/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/BaseInfo.java +++ b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/datamodel/BaseInfo.java @@ -44,8 +44,20 @@ public abstract class BaseInfo { BaseInfo(JSONObject jsonObject) throws JSONException { final JSONObject otherInfoNode = jsonObject.getJSONObject(Constants.OTHER_INFO); //parse tez counters - tezCounters = Utils.parseTezCountersFromJSON( - otherInfoNode.optJSONObject(Constants.COUNTERS)); + JSONObject countersObj = otherInfoNode.optJSONObject(Constants.COUNTERS); + if (countersObj == null) { + /* + * This is a workaround for formatting differences, where a TaskFinishedEvent's + * counter is a correct json object shown as string, but VertexFinishedEvent's + * counter is an encoded json string, so the latter is interpreted as a String + * while parsing. The issue might be somewhere while converting these event objects + * to proto (HistoryEventProtoConverter). Even if should be fixed there, + * already generated events should be parsed correctly, hence this workaround. + * Will be investigated in the scope of TEZ-4324. + */ + countersObj = new JSONObject(otherInfoNode.optString(Constants.COUNTERS)); + } + tezCounters = Utils.parseTezCountersFromJSON(countersObj); //parse events eventList = Lists.newArrayList(); diff --git a/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/utils/Utils.java b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/utils/Utils.java index 08eb92b967..94b50a6c5c 100644 --- a/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/utils/Utils.java +++ b/tez-plugins/tez-history-parser/src/main/java/org/apache/tez/history/parser/utils/Utils.java @@ -128,12 +128,12 @@ public static void parseEvents(JSONArray eventNodes, List eventList) thro JSONObject eventNode = eventNodes.optJSONObject(i); final String eventInfo = eventNode.optString(Constants.EVENT_INFO); final String eventType = eventNode.optString(Constants.EVENT_TYPE); - final long time = eventNode.optLong(Constants.EVENT_TIME_STAMP); + final long time = eventNode.optLong(Constants.EVENT_TIME_STAMP) == 0 + ? eventNode.optLong(Constants.TIMESTAMP) : eventNode.optLong(Constants.EVENT_TIME_STAMP); Event event = new Event(eventInfo, eventType, time); eventList.add(event); - } } diff --git a/tez-plugins/tez-protobuf-history-plugin/src/main/java/org/apache/tez/dag/history/logging/proto/HistoryEventProtoJsonConversion.java b/tez-plugins/tez-protobuf-history-plugin/src/main/java/org/apache/tez/dag/history/logging/proto/HistoryEventProtoJsonConversion.java index 26e20abeb3..ef84b2ec61 100644 --- a/tez-plugins/tez-protobuf-history-plugin/src/main/java/org/apache/tez/dag/history/logging/proto/HistoryEventProtoJsonConversion.java +++ b/tez-plugins/tez-protobuf-history-plugin/src/main/java/org/apache/tez/dag/history/logging/proto/HistoryEventProtoJsonConversion.java @@ -556,12 +556,12 @@ private static JSONObject convertTaskFinishedEvent(HistoryEventProto event) thro events.put(finishEvent); jsonObject.put(ATSConstants.EVENTS, events); - long startTime = getLongDataValueByKey(event, ATSConstants.START_TIME); + long timeTaken = getLongDataValueByKey(event, ATSConstants.TIME_TAKEN); JSONObject otherInfo = new JSONObject(); - otherInfo.put(ATSConstants.START_TIME, startTime); + otherInfo.put(ATSConstants.START_TIME, event.getEventTime() - timeTaken); otherInfo.put(ATSConstants.FINISH_TIME, event.getEventTime()); - otherInfo.put(ATSConstants.TIME_TAKEN, event.getEventTime() - startTime); + otherInfo.put(ATSConstants.TIME_TAKEN, timeTaken); otherInfo.put(ATSConstants.STATUS, getDataValueByKey(event, ATSConstants.STATUS)); otherInfo.put(ATSConstants.DIAGNOSTICS, getDataValueByKey(event, ATSConstants.DIAGNOSTICS)); @@ -620,11 +620,13 @@ private static JSONObject convertVertexFinishedEvent(HistoryEventProto event) events.put(finishEvent); jsonObject.put(ATSConstants.EVENTS, events); - long startTime = getLongDataValueByKey(event, ATSConstants.START_TIME); + long timeTaken = getLongDataValueByKey(event, ATSConstants.TIME_TAKEN); JSONObject otherInfo = new JSONObject(); + otherInfo.put(ATSConstants.START_TIME, event.getEventTime() - timeTaken); otherInfo.put(ATSConstants.FINISH_TIME, event.getEventTime()); - otherInfo.put(ATSConstants.TIME_TAKEN, (event.getEventTime() - startTime)); + otherInfo.put(ATSConstants.TIME_TAKEN, timeTaken); + otherInfo.put(ATSConstants.STATUS, getDataValueByKey(event, ATSConstants.STATUS)); otherInfo.put(ATSConstants.DIAGNOSTICS, getDataValueByKey(event, ATSConstants.DIAGNOSTICS)); otherInfo.put(ATSConstants.COUNTERS, getJSONDataValueByKey(event, ATSConstants.COUNTERS)); diff --git a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/Analyzer.java b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/Analyzer.java index 6021c5897e..1f0a7ad62c 100644 --- a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/Analyzer.java +++ b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/Analyzer.java @@ -18,7 +18,6 @@ package org.apache.tez.analyzer; -import org.apache.hadoop.conf.Configuration; import org.apache.tez.dag.api.TezException; import org.apache.tez.history.parser.datamodel.DagInfo; @@ -54,11 +53,4 @@ public interface Analyzer { * @return description of analyzer */ public String getDescription(); - - /** - * Get config properties related to this analyzer - * - * @return config related to analyzer - */ - public Configuration getConfiguration(); } diff --git a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/AnalyzerDriver.java b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/AnalyzerDriver.java index cad0d98d75..294527cd3e 100644 --- a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/AnalyzerDriver.java +++ b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/AnalyzerDriver.java @@ -48,12 +48,18 @@ public static void main(String argv[]){ "Print task-to-node assignment details of a DAG"); pgd.addClass("TaskAttemptResultStatisticsAnalyzer", TaskAttemptResultStatisticsAnalyzer.class, "Print vertex:node:status level details of task attempt results"); + pgd.addClass("InputReadErrorAnalyzer", InputReadErrorAnalyzer.class, + "Print INPUT_READ_ERROR sources"); pgd.addClass("TaskConcurrencyAnalyzer", TaskConcurrencyAnalyzer.class, "Print the task concurrency details in a DAG"); pgd.addClass("VertexLevelCriticalPathAnalyzer", VertexLevelCriticalPathAnalyzer.class, "Find critical path at vertex level in a DAG"); pgd.addClass("OneOnOneEdgeAnalyzer", OneOnOneEdgeAnalyzer.class, "Find out schedule misses in 1:1 edges in a DAG"); + pgd.addClass("DagOverviewAnalyzer", DagOverviewAnalyzer.class, + "Print basic dag information (dag/vertex events)"); + pgd.addClass("TaskHangAnalyzer", HungTaskAnalyzer.class, + "Print all vertices/tasks and their last attempts with status/duration/node"); exitCode = pgd.run(argv); } catch(Throwable e){ e.printStackTrace(); diff --git a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/ContainerReuseAnalyzer.java b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/ContainerReuseAnalyzer.java index 5b862f87f7..553ff0e2cc 100644 --- a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/ContainerReuseAnalyzer.java +++ b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/ContainerReuseAnalyzer.java @@ -39,15 +39,13 @@ */ public class ContainerReuseAnalyzer extends TezAnalyzerBase implements Analyzer { - private final Configuration config; - private static final String[] headers = { "vertexName", "taskAttempts", "node", "containerId", "reuseCount" }; private final CSVResult csvResult; public ContainerReuseAnalyzer(Configuration config) { - this.config = config; + super(config); this.csvResult = new CSVResult(headers); } @@ -82,11 +80,6 @@ public String getDescription() { return "Get details on container reuse analysis"; } - @Override - public Configuration getConfiguration() { - return config; - } - public static void main(String[] args) throws Exception { Configuration config = new Configuration(); ContainerReuseAnalyzer analyzer = new ContainerReuseAnalyzer(config); diff --git a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/CriticalPathAnalyzer.java b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/CriticalPathAnalyzer.java index 387b0cf99d..3f5e3004b8 100644 --- a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/CriticalPathAnalyzer.java +++ b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/CriticalPathAnalyzer.java @@ -113,10 +113,11 @@ public List getNotes() { ArrayList concurrencyByTime = Lists.newArrayList(); public CriticalPathAnalyzer() { + super(new Configuration()); } public CriticalPathAnalyzer(Configuration conf) { - setConf(conf); + super(conf); } @Override @@ -643,13 +644,9 @@ public String getDescription() { return "Analyze critical path of the DAG"; } - @Override - public Configuration getConfiguration() { - return getConf(); - } - public static void main(String[] args) throws Exception { - int res = ToolRunner.run(new Configuration(), new CriticalPathAnalyzer(), args); + Configuration config = new Configuration(); + int res = ToolRunner.run(config, new CriticalPathAnalyzer(config), args); System.exit(res); } diff --git a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/DagOverviewAnalyzer.java b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/DagOverviewAnalyzer.java new file mode 100644 index 0000000000..b193c30a90 --- /dev/null +++ b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/DagOverviewAnalyzer.java @@ -0,0 +1,132 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.tez.analyzer.plugins; + +import java.text.SimpleDateFormat; +import java.util.Comparator; +import java.util.Date; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.util.ToolRunner; +import org.apache.tez.analyzer.Analyzer; +import org.apache.tez.analyzer.CSVResult; +import org.apache.tez.analyzer.Result; +import org.apache.tez.dag.api.TezException; +import org.apache.tez.history.parser.datamodel.DagInfo; +import org.apache.tez.history.parser.datamodel.Event; +import org.apache.tez.history.parser.datamodel.TaskAttemptInfo; +import org.apache.tez.history.parser.datamodel.TaskInfo; +import org.apache.tez.history.parser.datamodel.VertexInfo; + +public class DagOverviewAnalyzer extends TezAnalyzerBase implements Analyzer { + private final String[] headers = + { "name", "id", "event_type", "status", "event_time", "event_time_str", "vertex_task_stats", "diagnostics" }; + private final CSVResult csvResult; + private static final SimpleDateFormat FORMAT = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS"); + + public DagOverviewAnalyzer(Configuration config) { + super(config); + csvResult = new CSVResult(headers); + } + + @Override + public void analyze(DagInfo dagInfo) throws TezException { + for (Event event : dagInfo.getEvents()) { + csvResult.addRecord(new String[] { dagInfo.getDagId(), dagInfo.getDagId(), event.getType(), + dagInfo.getStatus(), Long.toString(event.getTime()), toDateStr(event.getTime()), "", "" }); + } + for (VertexInfo vertex : dagInfo.getVertices()) { + for (Event event : vertex.getEvents()) { + String vertexFailureInfoIfAny = ""; + for (TaskAttemptInfo attempt : vertex.getTaskAttempts()) { + if (attempt.getStatus().contains("FAILED")) { + vertexFailureInfoIfAny = attempt.getTaskAttemptId() + ": " + + attempt.getDiagnostics().replaceAll(",", " ").replaceAll("\n", " "); + break; + } + } + csvResult.addRecord(new String[] { vertex.getVertexName(), vertex.getVertexId(), + event.getType(), vertex.getStatus(), Long.toString(event.getTime()), + toDateStr(event.getTime()), getTaskStats(vertex), vertexFailureInfoIfAny }); + } + + // a failed task can lead to dag failure, so hopefully holds valuable information + for (TaskInfo failedTask : vertex.getFailedTasks()) { + for (Event failedTaskEvent : failedTask.getEvents()) { + if (failedTaskEvent.getType().equalsIgnoreCase("TASK_FINISHED")) { + csvResult.addRecord(new String[] { vertex.getVertexName(), failedTask.getTaskId(), + failedTaskEvent.getType(), failedTask.getStatus(), Long.toString(failedTaskEvent.getTime()), + toDateStr(failedTaskEvent.getTime()), getTaskStats(vertex), + failedTask.getDiagnostics().replaceAll(",", " ").replaceAll("\n", " ") }); + } + } + // if we already found a failing task, let's scan the failing attempts as well + for (TaskAttemptInfo failedAttempt : failedTask.getFailedTaskAttempts()) { + for (Event failedTaskAttemptEvent : failedAttempt.getEvents()) { + if (failedTaskAttemptEvent.getType().equalsIgnoreCase("TASK_ATTEMPT_FINISHED")) { + csvResult.addRecord(new String[] { vertex.getVertexName(), + failedAttempt.getTaskAttemptId(), failedTaskAttemptEvent.getType(), + failedAttempt.getStatus(), Long.toString(failedTaskAttemptEvent.getTime()), + toDateStr(failedTaskAttemptEvent.getTime()), getTaskStats(vertex), + failedAttempt.getDiagnostics().replaceAll(",", " ").replaceAll("\n", " ") }); + } + } + } + } + } + + csvResult.sort(new Comparator() { + public int compare(String[] first, String[] second) { + return (int) (Long.parseLong(first[4]) - Long.parseLong(second[4])); + } + }); + } + + private String getTaskStats(VertexInfo vertex) { + return String.format("numTasks: %d failedTasks: %d completedTasks: %d", vertex.getNumTasks(), + vertex.getFailedTasksCount(), vertex.getCompletedTasksCount()); + } + + private static synchronized String toDateStr(long time) { + return FORMAT.format(new Date(time)); + } + + @Override + public Result getResult() throws TezException { + return csvResult; + } + + @Override + public String getName() { + return "Dag overview analyzer"; + } + + @Override + public String getDescription() { + return "High level dag events overview (dag, vertex event summary)." + + " Helps understand the overall progress of a dag by simply listing the dag/vertex related events"; + } + + public static void main(String[] args) throws Exception { + Configuration config = new Configuration(); + DagOverviewAnalyzer analyzer = new DagOverviewAnalyzer(config); + int res = ToolRunner.run(config, analyzer, args); + analyzer.printResults(); + System.exit(res); + } +} diff --git a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/HungTaskAnalyzer.java b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/HungTaskAnalyzer.java new file mode 100644 index 0000000000..9a38e28ba2 --- /dev/null +++ b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/HungTaskAnalyzer.java @@ -0,0 +1,143 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.analyzer.plugins; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.util.ToolRunner; +import org.apache.tez.analyzer.Analyzer; +import org.apache.tez.analyzer.CSVResult; +import org.apache.tez.analyzer.Result; +import org.apache.tez.dag.api.TezException; +import org.apache.tez.dag.records.TezTaskAttemptID; +import org.apache.tez.history.parser.datamodel.DagInfo; +import org.apache.tez.history.parser.datamodel.TaskAttemptInfo; +import org.apache.tez.history.parser.datamodel.VertexInfo; + +import java.util.Comparator; +import java.util.HashMap; +import java.util.Map; + +/** + * Gives insights about hanging task attempts by providing details about last attempts of all tasks. + */ +public class HungTaskAnalyzer extends TezAnalyzerBase implements Analyzer { + private final String[] headers = { "vertex", "task", " number_of_attempts", "last_attempt_id", + "last_attempt_status", "last_attempt_duration_ms", "last_attempt_node" }; + private final CSVResult csvResult; + + private static final String HEADER_NUM_ATTEMPTS = "num_attempts"; + private static final String HEADER_LAST_ATTEMPT_ID_AND_STATUS = "last_attempt_id_and_status"; + private static final String HEADER_LAST_ATTEMPT_STATUS = "last_attempt_status"; + private static final String HEADER_LAST_ATTEMPT_NODE = "last_attempt_node"; + private static final String HEADER_LAST_ATTEMPT_DURATION_MS = "last_attempt_duration_ms"; + + public HungTaskAnalyzer(Configuration config) { + super(config); + csvResult = new CSVResult(headers); + } + + @Override + public void analyze(DagInfo dagInfo) throws TezException { + Map> taskData = new HashMap<>(); // task attempt count per task + for (VertexInfo vertex : dagInfo.getVertices()) { + taskData.clear(); + for (TaskAttemptInfo attempt : vertex.getTaskAttempts()) { + String taskId = attempt.getTaskInfo().getTaskId(); + + int numAttemptsForTask = attempt.getTaskInfo().getNumberOfTaskAttempts(); + Map thisTaskData = taskData.get(taskId); + + if (thisTaskData == null) { + thisTaskData = new HashMap<>(); + thisTaskData.put(HEADER_NUM_ATTEMPTS, Integer.toString(numAttemptsForTask)); + taskData.put(taskId, thisTaskData); + } + + int attemptNumber = TezTaskAttemptID.fromString(attempt.getTaskAttemptId()).getId(); + if (attemptNumber == numAttemptsForTask - 1) { + thisTaskData.put(HEADER_LAST_ATTEMPT_ID_AND_STATUS, String.format("%s/%s", attempt.getTaskAttemptId(), attempt.getStatus())); + thisTaskData.put(HEADER_LAST_ATTEMPT_STATUS, attempt.getDetailedStatus()); + thisTaskData.put(HEADER_LAST_ATTEMPT_NODE, attempt.getNodeId()); + + thisTaskData.put(HEADER_LAST_ATTEMPT_DURATION_MS, + (attempt.getFinishTime() == 0 || attempt.getStartTime() == 0) ? "-1" + : Long.toString(attempt.getFinishTime() - attempt.getStartTime())); + } + } + for (Map.Entry> task : taskData.entrySet()) { + addARecord(vertex.getVertexName(), task.getKey(), task.getValue().get(HEADER_NUM_ATTEMPTS), + task.getValue().get(HEADER_LAST_ATTEMPT_ID_AND_STATUS), task.getValue().get(HEADER_LAST_ATTEMPT_STATUS), + task.getValue().get(HEADER_LAST_ATTEMPT_DURATION_MS), + task.getValue().get(HEADER_LAST_ATTEMPT_NODE)); + } + } + + csvResult.sort(new Comparator() { + public int compare(String[] first, String[] second) { + int vertexOrder = first[0].compareTo(second[0]); + int lastAttemptStatusOrder = + (first[4] == null || second[4] == null) ? 0 : first[4].compareTo(second[4]); + int attemptNumberOrder = Integer.valueOf(second[2]).compareTo(Integer.valueOf(first[2])); + + return vertexOrder == 0 + ? (lastAttemptStatusOrder == 0 ? attemptNumberOrder : lastAttemptStatusOrder) + : vertexOrder; + } + }); + } + + private void addARecord(String vertexName, String taskId, String numAttempts, + String lastAttemptId, String lastAttemptStatus, String lastAttemptDuration, + String lastAttemptNode) { + String[] record = new String[7]; + record[0] = vertexName; + record[1] = taskId; + record[2] = numAttempts; + record[3] = lastAttemptId; + record[4] = lastAttemptStatus; + record[5] = lastAttemptDuration; + record[6] = lastAttemptNode; + + csvResult.addRecord(record); + } + + @Override + public Result getResult() throws TezException { + return csvResult; + } + + @Override + public String getName() { + return "Hung Task Analyzer"; + } + + @Override + public String getDescription() { + return "TaskHandAnalyzer can give quick insights about hanging task attempts" + + " by giving an overview of all tasks and their last attempts' status, duration, etc."; + } + + public static void main(String[] args) throws Exception { + Configuration config = new Configuration(); + HungTaskAnalyzer analyzer = new HungTaskAnalyzer(config); + int res = ToolRunner.run(config, analyzer, args); + analyzer.printResults(); + System.exit(res); + } +} diff --git a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/InputReadErrorAnalyzer.java b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/InputReadErrorAnalyzer.java new file mode 100644 index 0000000000..3cb523ff9a --- /dev/null +++ b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/InputReadErrorAnalyzer.java @@ -0,0 +1,94 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.tez.analyzer.plugins; + +import java.util.Comparator; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.util.ToolRunner; +import org.apache.tez.analyzer.Analyzer; +import org.apache.tez.analyzer.CSVResult; +import org.apache.tez.analyzer.Result; +import org.apache.tez.dag.api.TezException; +import org.apache.tez.history.parser.datamodel.DagInfo; +import org.apache.tez.history.parser.datamodel.Event; +import org.apache.tez.history.parser.datamodel.TaskAttemptInfo; +import org.apache.tez.history.parser.datamodel.VertexInfo; + +/** + * Helps finding the root cause of shuffle errors, e.g. which node(s) can be blamed for them. + */ +public class InputReadErrorAnalyzer extends TezAnalyzerBase implements Analyzer { + private final String[] headers = { "vertex:attempt", "status", "time", "node", "diagnostics" }; + private final CSVResult csvResult; + + public InputReadErrorAnalyzer(Configuration config) { + super(config); + csvResult = new CSVResult(headers); + } + + @Override + public void analyze(DagInfo dagInfo) throws TezException { + for (VertexInfo vertex : dagInfo.getVertices()) { + for (TaskAttemptInfo attempt : vertex.getTaskAttempts()) { + String terminationCause = attempt.getTerminationCause(); + if ("INPUT_READ_ERROR".equalsIgnoreCase(terminationCause) + || "OUTPUT_LOST".equalsIgnoreCase(terminationCause) + || "NODE_FAILED".equalsIgnoreCase(terminationCause)) { + for (Event event : attempt.getEvents()) { + if (event.getType().equalsIgnoreCase("TASK_ATTEMPT_FINISHED")) { + csvResult.addRecord(new String[] { + vertex.getVertexName() + ":" + attempt.getTaskAttemptId(), + attempt.getDetailedStatus(), String.valueOf(event.getTime()), attempt.getNodeId(), + attempt.getDiagnostics().replaceAll(",", " ").replaceAll("\n", " ") }); + } + } + } + } + } + + csvResult.sort(new Comparator() { + public int compare(String[] first, String[] second) { + return (int) (Long.parseLong(second[2]) - Long.parseLong(first[2])); + } + }); + } + + @Override + public Result getResult() throws TezException { + return csvResult; + } + + @Override + public String getName() { + return "Input read error analyzer"; + } + + @Override + public String getDescription() { + return "Prints every task attempt (with node) which are related to input read errors"; + } + + public static void main(String[] args) throws Exception { + Configuration config = new Configuration(); + InputReadErrorAnalyzer analyzer = new InputReadErrorAnalyzer(config); + int res = ToolRunner.run(config, analyzer, args); + analyzer.printResults(); + System.exit(res); + } +} diff --git a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/LocalityAnalyzer.java b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/LocalityAnalyzer.java index ec72df17ac..d640704f93 100644 --- a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/LocalityAnalyzer.java +++ b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/LocalityAnalyzer.java @@ -52,12 +52,10 @@ public class LocalityAnalyzer extends TezAnalyzerBase implements Analyzer { private static final String DATA_LOCAL_RATIO = "tez.locality-analyzer.data.local.ratio"; private static final float DATA_LOCAL_RATIO_DEFAULT = 0.5f; - private final Configuration config; - private final CSVResult csvResult; public LocalityAnalyzer(Configuration config) { - this.config = config; + super(config); csvResult = new CSVResult(headers); } @@ -119,7 +117,7 @@ public void analyze(DagInfo dagInfo) throws TezException { record.add(otherTaskResult.avgHDFSBytesRead + ""); String recommendation = ""; - if (dataLocalRatio < config.getFloat(DATA_LOCAL_RATIO, DATA_LOCAL_RATIO_DEFAULT)) { + if (dataLocalRatio < getConf().getFloat(DATA_LOCAL_RATIO, DATA_LOCAL_RATIO_DEFAULT)) { recommendation = "Data locality is poor for this vertex. Try tuning " + TezConfiguration.TEZ_AM_CONTAINER_REUSE_LOCALITY_DELAY_ALLOCATION_MILLIS + ", " + TezConfiguration.TEZ_AM_CONTAINER_REUSE_RACK_FALLBACK_ENABLED + ", " @@ -182,10 +180,6 @@ private TaskAttemptDetails computeAverages(VertexInfo vertexInfo, DAGCounter cou return "Analyze for locality information (data local, rack local, off-rack)"; } - @Override public Configuration getConfiguration() { - return config; - } - /** * Placeholder for task attempt details */ diff --git a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/OneOnOneEdgeAnalyzer.java b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/OneOnOneEdgeAnalyzer.java index 2ba715ed43..a6cb3f1511 100644 --- a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/OneOnOneEdgeAnalyzer.java +++ b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/OneOnOneEdgeAnalyzer.java @@ -55,12 +55,10 @@ public class OneOnOneEdgeAnalyzer extends TezAnalyzerBase implements Analyzer { // DataMovementType::ONE_TO_ONE private static final String ONE_TO_ONE = "ONE_TO_ONE"; - private final Configuration config; - private final CSVResult csvResult; public OneOnOneEdgeAnalyzer(Configuration config) { - this.config = config; + super(config); csvResult = new CSVResult(headers); } @@ -140,11 +138,6 @@ public String getDescription() { return "To understand the locality miss in 1:1 edge"; } - @Override - public Configuration getConfiguration() { - return config; - } - public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); OneOnOneEdgeAnalyzer analyzer = new OneOnOneEdgeAnalyzer(conf); diff --git a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/ShuffleTimeAnalyzer.java b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/ShuffleTimeAnalyzer.java index 57e91c62d3..f8f9112bb7 100644 --- a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/ShuffleTimeAnalyzer.java +++ b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/ShuffleTimeAnalyzer.java @@ -66,14 +66,12 @@ public class ShuffleTimeAnalyzer extends TezAnalyzerBase implements Analyzer { private final CSVResult csvResult = new CSVResult(headers); - private final Configuration config; - private final float realWorkDoneRatio; private final long minShuffleRecords; public ShuffleTimeAnalyzer(Configuration config) { - this.config = config; + super(config); realWorkDoneRatio = config.getFloat (REAL_WORK_DONE_RATIO, REAL_WORK_DONE_RATIO_DEFAULT); @@ -208,11 +206,6 @@ public String getDescription() { + "and the real work done in the task"; } - @Override - public Configuration getConfiguration() { - return config; - } - public static void main(String[] args) throws Exception { Configuration config = new Configuration(); ShuffleTimeAnalyzer analyzer = new ShuffleTimeAnalyzer(config); diff --git a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SkewAnalyzer.java b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SkewAnalyzer.java index 6025541fc9..a7d14fae25 100644 --- a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SkewAnalyzer.java +++ b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SkewAnalyzer.java @@ -85,14 +85,12 @@ public class SkewAnalyzer extends TezAnalyzerBase implements Analyzer { private final CSVResult csvResult = new CSVResult(headers); - private final Configuration config; - private final float minRatio; private final float maxRatio; private final long maxShuffleBytesPerSource; public SkewAnalyzer(Configuration config) { - this.config = config; + super(config); maxRatio = config.getFloat(ATTEMPT_SHUFFLE_KEY_GROUP_MAX_RATIO, ATTEMPT_SHUFFLE_KEY_GROUP_MAX_RATIO_DEFAULT); minRatio = config.getFloat(ATTEMPT_SHUFFLE_KEY_GROUP_MIN_RATIO, @@ -214,7 +212,7 @@ private void analyzeRecordSkewPerSource(TaskAttemptInfo attemptInfo) { if (vertexNumTasks > 1) { if (ratio > maxRatio) { //input records > 60% of vertex level record count - if (inputRecordsCount > (vertexLevelInputRecordsCount * 0.60)) { + if (inputRecordsCount > (vertexLevelInputRecordsCount * 0.6f)) { List result = Lists.newLinkedList(); result.add(attemptInfo.getTaskInfo().getVertexInfo().getVertexName()); result.add(attemptInfo.getTaskAttemptId()); @@ -305,12 +303,7 @@ public String getName() { @Override public String getDescription() { - return "Analyzer reducer skews by mining reducer task counters"; - } - - @Override - public Configuration getConfiguration() { - return null; + return "Analyze reducer skews by mining reducer task counters"; } public static void main(String[] args) throws Exception { diff --git a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SlowNodeAnalyzer.java b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SlowNodeAnalyzer.java index a810a8a645..9e573c2033 100644 --- a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SlowNodeAnalyzer.java +++ b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SlowNodeAnalyzer.java @@ -59,10 +59,8 @@ public class SlowNodeAnalyzer extends TezAnalyzerBase implements Analyzer { private final CSVResult csvResult = new CSVResult(headers); - private final Configuration config; - public SlowNodeAnalyzer(Configuration config) { - this.config = config; + super(config); } @Override @@ -182,11 +180,6 @@ public String getDescription() { return sb.toString(); } - @Override - public Configuration getConfiguration() { - return config; - } - public static void main(String[] args) throws Exception { Configuration config = new Configuration(); SlowNodeAnalyzer analyzer = new SlowNodeAnalyzer(config); diff --git a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SlowTaskIdentifier.java b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SlowTaskIdentifier.java index d2474ad0f6..7c9958b250 100644 --- a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SlowTaskIdentifier.java +++ b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SlowTaskIdentifier.java @@ -51,10 +51,8 @@ public class SlowTaskIdentifier extends TezAnalyzerBase implements Analyzer { private static final String NO_OF_TASKS = "tez.slow-task-analyzer.task.count"; private static final int NO_OF_TASKS_DEFAULT = 100; - private final Configuration config; - public SlowTaskIdentifier(Configuration config) { - this.config = config; + super(config); this.csvResult = new CSVResult(headers); } @@ -75,7 +73,7 @@ public void analyze(DagInfo dagInfo) throws TezException { }); int limit = Math.min(taskAttempts.size(), - Math.max(0, config.getInt(NO_OF_TASKS, NO_OF_TASKS_DEFAULT))); + Math.max(0, getConf().getInt(NO_OF_TASKS, NO_OF_TASKS_DEFAULT))); if (limit == 0) { return; @@ -111,11 +109,6 @@ public String getDescription() { return "Identifies slow tasks in the DAG"; } - @Override - public Configuration getConfiguration() { - return config; - } - public static void main(String[] args) throws Exception { Configuration config = new Configuration(); SlowTaskIdentifier analyzer = new SlowTaskIdentifier(config); diff --git a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SlowestVertexAnalyzer.java b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SlowestVertexAnalyzer.java index 33f2421699..efa39a3223 100644 --- a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SlowestVertexAnalyzer.java +++ b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SlowestVertexAnalyzer.java @@ -49,7 +49,6 @@ public class SlowestVertexAnalyzer extends TezAnalyzerBase implements Analyzer { private final CSVResult csvResult = new CSVResult(headers); - private final Configuration config; private final MetricRegistry metrics = new MetricRegistry(); private Histogram taskAttemptRuntimeHistorgram; @@ -59,7 +58,7 @@ public class SlowestVertexAnalyzer extends TezAnalyzerBase implements Analyzer { private final long vertexRuntimeThreshold; public SlowestVertexAnalyzer(Configuration config) { - this.config = config; + super(config); this.vertexRuntimeThreshold = Math.max(1, config.getLong(MAX_VERTEX_RUNTIME, MAX_VERTEX_RUNTIME_DEFAULT)); @@ -204,11 +203,6 @@ public String getDescription() { return "Identify the slowest vertex in the DAG, which needs to be looked into first"; } - @Override - public Configuration getConfiguration() { - return config; - } - public static void main(String[] args) throws Exception { Configuration config = new Configuration(); SlowestVertexAnalyzer analyzer = new SlowestVertexAnalyzer(config); diff --git a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SpillAnalyzerImpl.java b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SpillAnalyzerImpl.java index d69ca23b5a..026dd1593f 100644 --- a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SpillAnalyzerImpl.java +++ b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/SpillAnalyzerImpl.java @@ -60,10 +60,8 @@ public class SpillAnalyzerImpl extends TezAnalyzerBase implements Analyzer { private final long minOutputBytesPerTask; - private final Configuration config; - public SpillAnalyzerImpl(Configuration config) { - this.config = config; + super(config); minOutputBytesPerTask = Math.max(0, config.getLong(OUTPUT_BYTES_THRESHOLD, OUTPUT_BYTES_THRESHOLD_DEFAULT)); this.csvResult = new CSVResult(headers); @@ -130,11 +128,6 @@ public String getDescription() { return "Analyze spill details in the task"; } - @Override - public Configuration getConfiguration() { - return config; - } - public static void main(String[] args) throws Exception { Configuration config = new Configuration(); SpillAnalyzerImpl analyzer = new SpillAnalyzerImpl(config); diff --git a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/TaskAssignmentAnalyzer.java b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/TaskAssignmentAnalyzer.java index ce6fa417c1..02b821f367 100644 --- a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/TaskAssignmentAnalyzer.java +++ b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/TaskAssignmentAnalyzer.java @@ -36,29 +36,27 @@ */ public class TaskAssignmentAnalyzer extends TezAnalyzerBase implements Analyzer { - private final String[] headers = { "vertex", "node", "numTasks", "load" }; - private final Configuration config; + private final String[] headers = { "vertex", "node", "numTaskAttempts", "load" }; private final CSVResult csvResult; public TaskAssignmentAnalyzer(Configuration config) { - this.config = config; + super(config); csvResult = new CSVResult(headers); } @Override public void analyze(DagInfo dagInfo) throws TezException { - Map map = new HashMap<>(); + Map taskAttemptsPerNode = new HashMap<>(); for (VertexInfo vertex : dagInfo.getVertices()) { - map.clear(); + taskAttemptsPerNode.clear(); for (TaskAttemptInfo attempt : vertex.getTaskAttempts()) { - Integer previousValue = map.get(attempt.getNodeId()); - map.put(attempt.getNodeId(), - previousValue == null ? 1 : previousValue + 1); + Integer previousValue = taskAttemptsPerNode.get(attempt.getNodeId()); + taskAttemptsPerNode.put(attempt.getNodeId(), previousValue == null ? 1 : previousValue + 1); } - double mean = vertex.getTaskAttempts().size() / Math.max(1.0, map.size()); - for (Map.Entry assignment : map.entrySet()) { - addARecord(vertex.getVertexName(), assignment.getKey(), - assignment.getValue(), assignment.getValue() * 100 / mean); + double mean = vertex.getTaskAttempts().size() / Math.max(1.0, taskAttemptsPerNode.size()); + for (Map.Entry assignment : taskAttemptsPerNode.entrySet()) { + addARecord(vertex.getVertexName(), assignment.getKey(), assignment.getValue(), + assignment.getValue() * 100 / mean); } } } @@ -88,11 +86,6 @@ public String getDescription() { return "Get the Task assignments on different nodes of the cluster"; } - @Override - public Configuration getConfiguration() { - return config; - } - public static void main(String[] args) throws Exception { Configuration config = new Configuration(); TaskAssignmentAnalyzer analyzer = new TaskAssignmentAnalyzer(config); diff --git a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/TaskAttemptResultStatisticsAnalyzer.java b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/TaskAttemptResultStatisticsAnalyzer.java index df2f95cdce..cf6b2f0d8e 100644 --- a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/TaskAttemptResultStatisticsAnalyzer.java +++ b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/TaskAttemptResultStatisticsAnalyzer.java @@ -44,11 +44,10 @@ public class TaskAttemptResultStatisticsAnalyzer extends TezAnalyzerBase implements Analyzer { private final String[] headers = { "vertex (+task stats: all/succeeded/failed/killed)", "node", "status", "numAttempts" }; - private final Configuration config; private final CSVResult csvResult; public TaskAttemptResultStatisticsAnalyzer(Configuration config) { - this.config = config; + super(config); csvResult = new CSVResult(headers); } @@ -71,7 +70,8 @@ public void analyze(DagInfo dagInfo) throws TezException { } map.forEach((key, value) -> { - addARecord(key.split("#")[0], key.split("#")[1], key.split("#")[2], value); + String[] keys = key.split("#"); + addARecord(keys[0], keys[1], keys.length > 2 ? keys[2] : "", value); }); csvResult.sort(new Comparator() { @@ -110,11 +110,6 @@ public String getDescription() { return "Get statistics about task attempts states in vertex:node:status level"; } - @Override - public Configuration getConfiguration() { - return config; - } - public static void main(String[] args) throws Exception { Configuration config = new Configuration(); TaskAttemptResultStatisticsAnalyzer analyzer = new TaskAttemptResultStatisticsAnalyzer(config); diff --git a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/TaskConcurrencyAnalyzer.java b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/TaskConcurrencyAnalyzer.java index 72f3b36a5b..91f51b4c21 100644 --- a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/TaskConcurrencyAnalyzer.java +++ b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/TaskConcurrencyAnalyzer.java @@ -41,11 +41,10 @@ public class TaskConcurrencyAnalyzer extends TezAnalyzerBase implements Analyzer private static final String[] headers = { "time", "vertexName", "concurrentTasksRunning" }; private final CSVResult csvResult; - private final Configuration config; public TaskConcurrencyAnalyzer(Configuration conf) { + super(conf); this.csvResult = new CSVResult(headers); - this.config = conf; } private enum EventType {START, FINISH} @@ -153,11 +152,6 @@ public String getDescription() { + "would be helpful in understanding whether any starvation was there or not."; } - @Override - public Configuration getConfiguration() { - return config; - } - public static void main(String[] args) throws Exception { Configuration config = new Configuration(); TaskConcurrencyAnalyzer analyzer = new TaskConcurrencyAnalyzer(config); diff --git a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/TezAnalyzerBase.java b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/TezAnalyzerBase.java index 75a55a754d..705c6e9cfb 100644 --- a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/TezAnalyzerBase.java +++ b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/TezAnalyzerBase.java @@ -33,6 +33,7 @@ import org.apache.commons.cli.Options; import org.apache.commons.cli.ParseException; import org.apache.commons.io.FileUtils; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.Path; import org.apache.hadoop.util.Tool; @@ -67,7 +68,11 @@ public abstract class TezAnalyzerBase extends Configured implements Tool, Analyz private String outputDir; private boolean saveResults = false; - + + public TezAnalyzerBase(Configuration config) { + setConf(config); + } + @SuppressWarnings("static-access") private static Options buildOptions() { Option dagIdOption = OptionBuilder.withArgName(DAG_ID).withLongOpt(DAG_ID) diff --git a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/VertexLevelCriticalPathAnalyzer.java b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/VertexLevelCriticalPathAnalyzer.java index 06b8983e9d..78a4d41f38 100644 --- a/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/VertexLevelCriticalPathAnalyzer.java +++ b/tez-tools/analyzers/job-analyzer/src/main/java/org/apache/tez/analyzer/plugins/VertexLevelCriticalPathAnalyzer.java @@ -44,8 +44,6 @@ * Identify a set of vertices which fall in the critical path in a DAG. */ public class VertexLevelCriticalPathAnalyzer extends TezAnalyzerBase implements Analyzer { - private final Configuration config; - private static final String[] headers = { "CriticalPath", "Score" }; private final CSVResult csvResult; @@ -58,7 +56,7 @@ public class VertexLevelCriticalPathAnalyzer extends TezAnalyzerBase implements private static final String CONNECTOR = "-->"; public VertexLevelCriticalPathAnalyzer(Configuration config) { - this.config = config; + super(config); this.csvResult = new CSVResult(headers); this.dotFileLocation = config.get(DOT_FILE_DIR, DOT_FILE_DIR_DEFAULT); } @@ -105,11 +103,6 @@ public String getDescription() { return "Analyze vertex level critical path of the DAG"; } - @Override - public Configuration getConfiguration() { - return config; - } - private static Map sortByValues(Map result) { //Sort result by time in reverse order final Ordering reversValueOrdering =