Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,6 @@ public boolean hasNext() throws IOException {
return message != null;
} catch (java.io.EOFException e) {
reader.close();

if (!fileIt.hasNext()) {
return false;
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -236,11 +236,13 @@ protected void readEventsFromSource(String dagId, JSONObjectSource source,
// time etc).
if (dagJson == null) {
dagJson = jsonObject;
} else if (dagJson.optJSONObject(ATSConstants.OTHER_INFO)
.optJSONObject(ATSConstants.DAG_PLAN) == null) {
// if DAG_PLAN is not filled already, let's try to fetch it from other
dagJson.getJSONObject(ATSConstants.OTHER_INFO).put(ATSConstants.DAG_PLAN, jsonObject
.getJSONObject(ATSConstants.OTHER_INFO).getJSONObject(ATSConstants.DAG_PLAN));
} else {
if (dagJson.optJSONObject(ATSConstants.OTHER_INFO).optJSONObject(ATSConstants.DAG_PLAN) == null) {
// if DAG_PLAN is not filled already, let's try to fetch it from other
dagJson.getJSONObject(ATSConstants.OTHER_INFO).put(ATSConstants.DAG_PLAN,
jsonObject.getJSONObject(ATSConstants.OTHER_INFO).getJSONObject(ATSConstants.DAG_PLAN));
}
mergeSubJSONArray(jsonObject, dagJson, Constants.EVENTS);
}
JSONArray relatedEntities = dagJson.optJSONArray(Constants
.RELATED_ENTITIES);
Expand Down Expand Up @@ -268,6 +270,8 @@ protected void readEventsFromSource(String dagId, JSONObjectSource source,
}
if (!vertexJsonMap.containsKey(vertexName)) {
vertexJsonMap.put(vertexName, jsonObject);
} else {
mergeSubJSONArray(jsonObject, vertexJsonMap.get(vertexName), Constants.EVENTS);
}
populateOtherInfo(jsonObject.optJSONObject(Constants.OTHER_INFO), vertexName, vertexJsonMap);
break;
Expand All @@ -281,6 +285,8 @@ protected void readEventsFromSource(String dagId, JSONObjectSource source,
}
if (!taskJsonMap.containsKey(taskName)) {
taskJsonMap.put(taskName, jsonObject);
} else {
mergeSubJSONArray(jsonObject, taskJsonMap.get(taskName), Constants.EVENTS);
}
populateOtherInfo(jsonObject.optJSONObject(Constants.OTHER_INFO), taskName, taskJsonMap);
break;
Expand All @@ -294,6 +300,8 @@ protected void readEventsFromSource(String dagId, JSONObjectSource source,
}
if (!attemptJsonMap.containsKey(taskAttemptName)) {
attemptJsonMap.put(taskAttemptName, jsonObject);
} else {
mergeSubJSONArray(jsonObject, attemptJsonMap.get(taskAttemptName), Constants.EVENTS);
}
populateOtherInfo(jsonObject.optJSONObject(Constants.OTHER_INFO), taskAttemptName, attemptJsonMap);
break;
Expand All @@ -311,4 +319,17 @@ protected void readEventsFromSource(String dagId, JSONObjectSource source,
"Please provide a valid/complete history log file containing " + dagId);
}
}

/**
 * Appends every element of the JSON array stored under {@code key} in {@code source} to the
 * JSON array stored under the same key in {@code destination}.
 *
 * <p>If either object has no JSON array under {@code key}, an empty array is put there first,
 * so both objects carry the key when this method returns.
 *
 * @param source      object whose array elements are copied
 * @param destination object whose array receives the copied elements
 * @param key         name of the array-valued entry to merge
 * @throws JSONException if one of the underlying JSON operations fails
 */
private void mergeSubJSONArray(JSONObject source, JSONObject destination, String key)
    throws JSONException {
  // Make sure both sides hold an array under the key (source first, then destination).
  for (JSONObject holder : new JSONObject[] {source, destination}) {
    if (holder.optJSONArray(key) == null) {
      holder.put(key, new JSONArray());
    }
  }

  final JSONArray from = source.getJSONArray(key);
  final JSONArray into = destination.getJSONArray(key);

  int index = 0;
  while (index < from.length()) {
    into.put(from.get(index));
    index++;
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,20 @@ public abstract class BaseInfo {
BaseInfo(JSONObject jsonObject) throws JSONException {
final JSONObject otherInfoNode = jsonObject.getJSONObject(Constants.OTHER_INFO);
//parse tez counters
tezCounters = Utils.parseTezCountersFromJSON(
otherInfoNode.optJSONObject(Constants.COUNTERS));
JSONObject countersObj = otherInfoNode.optJSONObject(Constants.COUNTERS);
if (countersObj == null) {
/*
* This is a workaround for formatting differences, where a TaskFinishedEvent's
* counter is a correct json object shown as string, but VertexFinishedEvent's
* counter is an encoded json string, so the latter is interpreted as a String
* while parsing. The issue might be somewhere while converting these event objects
* to proto (HistoryEventProtoConverter). Even if it should be fixed there,
* already generated events should be parsed correctly, hence this workaround.
* Will be investigated in the scope of TEZ-4324.
*/
countersObj = new JSONObject(otherInfoNode.optString(Constants.COUNTERS));
}
tezCounters = Utils.parseTezCountersFromJSON(countersObj);

//parse events
eventList = Lists.newArrayList();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,12 +128,12 @@ public static void parseEvents(JSONArray eventNodes, List<Event> eventList) thro
JSONObject eventNode = eventNodes.optJSONObject(i);
final String eventInfo = eventNode.optString(Constants.EVENT_INFO);
final String eventType = eventNode.optString(Constants.EVENT_TYPE);
final long time = eventNode.optLong(Constants.EVENT_TIME_STAMP);
final long time = eventNode.optLong(Constants.EVENT_TIME_STAMP) == 0
? eventNode.optLong(Constants.TIMESTAMP) : eventNode.optLong(Constants.EVENT_TIME_STAMP);

Event event = new Event(eventInfo, eventType, time);

eventList.add(event);

}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -556,12 +556,12 @@ private static JSONObject convertTaskFinishedEvent(HistoryEventProto event) thro
events.put(finishEvent);
jsonObject.put(ATSConstants.EVENTS, events);

long startTime = getLongDataValueByKey(event, ATSConstants.START_TIME);
long timeTaken = getLongDataValueByKey(event, ATSConstants.TIME_TAKEN);

JSONObject otherInfo = new JSONObject();
otherInfo.put(ATSConstants.START_TIME, startTime);
otherInfo.put(ATSConstants.START_TIME, event.getEventTime() - timeTaken);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IIRC, there were corner cases where events were not properly populated, perhaps in cases where vertices were shut down due to errors (need to check).

In such cases, this would have returned a negative value earlier.

The current patch seems to change start_time so that it depends on getEventTime. This could give the impression that the task/vertex existed for only a very short time.

Can you please share more info on the previous error? Were you getting negative values earlier, and is that why this is being modified?

Copy link
Contributor Author

@abstractdog abstractdog Aug 6, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, I saw this for all protobuf history files, which is why I don't suspect a corner case.
I found that for a TASK_FINISHED event there is always an event time (which is obviously the end time) and a timeTaken entry in event_data, but there is no startTime there.

this string is what the debugger writes for a HistoryLoggerProtos$HistoryEventProto instance while doing this conversion:

event_type: "TASK_FINISHED"
event_time: 1628149977709
app_id: "application_1628051798891_0030"
dag_id: "dag_1628051798891_0030_1"
vertex_id: "vertex_1628051798891_0030_1_00"
task_id: "task_1628051798891_0030_1_00_000001"
event_data {
  key: "timeTaken"
  value: "4193"
}
event_data {
  key: "status"
  value: "SUCCEEDED"
}
event_data {
  key: "numFailedTaskAttempts"
  value: "0"
}
event_data {
  key: "successfulAttemptId"
  value: "attempt_1628051798891_0030_1_00_000001_0"
}
event_data {
  key: "diagnostics"
  value: ""
}
event_data {
  key: "counters"
  value: "..."
}

the root cause of this behavior would be:
https://github.com/apache/tez/blob/master/tez-plugins/tez-protobuf-history-plugin/src/main/java/org/apache/tez/dag/history/logging/proto/HistoryEventProtoConverter.java#L392

  private HistoryEventProto convertTaskFinishedEvent(TaskFinishedEvent event) {
    HistoryEventProto.Builder builder = makeBuilderForEvent(event, event.getFinishTime(),
        null, null, null, null, event.getTaskID(), null, null);

    addEventData(builder, ATSConstants.TIME_TAKEN, (event.getFinishTime() - event.getStartTime()));

Here I can see that the builder consumes only event.getFinishTime() for the "time" parameter, and startTime is shipped only indirectly (through timeTaken). According to blame, this part of the code has been unchanged since the introduction of the proto history logger (TEZ-3915).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the note. The earlier code didn't populate START_TIME (it had only timeTaken), which caused the issue.

otherInfo.put(ATSConstants.FINISH_TIME, event.getEventTime());
otherInfo.put(ATSConstants.TIME_TAKEN, event.getEventTime() - startTime);
otherInfo.put(ATSConstants.TIME_TAKEN, timeTaken);

otherInfo.put(ATSConstants.STATUS, getDataValueByKey(event, ATSConstants.STATUS));
otherInfo.put(ATSConstants.DIAGNOSTICS, getDataValueByKey(event, ATSConstants.DIAGNOSTICS));
Expand Down Expand Up @@ -620,11 +620,13 @@ private static JSONObject convertVertexFinishedEvent(HistoryEventProto event)
events.put(finishEvent);
jsonObject.put(ATSConstants.EVENTS, events);

long startTime = getLongDataValueByKey(event, ATSConstants.START_TIME);
long timeTaken = getLongDataValueByKey(event, ATSConstants.TIME_TAKEN);

JSONObject otherInfo = new JSONObject();
otherInfo.put(ATSConstants.START_TIME, event.getEventTime() - timeTaken);
otherInfo.put(ATSConstants.FINISH_TIME, event.getEventTime());
otherInfo.put(ATSConstants.TIME_TAKEN, (event.getEventTime() - startTime));
otherInfo.put(ATSConstants.TIME_TAKEN, timeTaken);

otherInfo.put(ATSConstants.STATUS, getDataValueByKey(event, ATSConstants.STATUS));
otherInfo.put(ATSConstants.DIAGNOSTICS, getDataValueByKey(event, ATSConstants.DIAGNOSTICS));
otherInfo.put(ATSConstants.COUNTERS, getJSONDataValueByKey(event, ATSConstants.COUNTERS));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

package org.apache.tez.analyzer;

import org.apache.hadoop.conf.Configuration;
import org.apache.tez.dag.api.TezException;
import org.apache.tez.history.parser.datamodel.DagInfo;

Expand Down Expand Up @@ -54,11 +53,4 @@ public interface Analyzer {
* @return description of analyzer
*/
public String getDescription();

/**
* Get config properties related to this analyzer
*
* @return config related to analyzer
*/
public Configuration getConfiguration();
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,12 +48,18 @@ public static void main(String argv[]){
"Print task-to-node assignment details of a DAG");
pgd.addClass("TaskAttemptResultStatisticsAnalyzer", TaskAttemptResultStatisticsAnalyzer.class,
"Print vertex:node:status level details of task attempt results");
pgd.addClass("InputReadErrorAnalyzer", InputReadErrorAnalyzer.class,
"Print INPUT_READ_ERROR sources");
pgd.addClass("TaskConcurrencyAnalyzer", TaskConcurrencyAnalyzer.class,
"Print the task concurrency details in a DAG");
pgd.addClass("VertexLevelCriticalPathAnalyzer", VertexLevelCriticalPathAnalyzer.class,
"Find critical path at vertex level in a DAG");
pgd.addClass("OneOnOneEdgeAnalyzer", OneOnOneEdgeAnalyzer.class,
"Find out schedule misses in 1:1 edges in a DAG");
pgd.addClass("DagOverviewAnalyzer", DagOverviewAnalyzer.class,
"Print basic dag information (dag/vertex events)");
pgd.addClass("TaskHangAnalyzer", HungTaskAnalyzer.class,
"Print all vertices/tasks and their last attempts with status/duration/node");
exitCode = pgd.run(argv);
} catch(Throwable e){
e.printStackTrace();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,15 +39,13 @@
*/
public class ContainerReuseAnalyzer extends TezAnalyzerBase implements Analyzer {

private final Configuration config;

private static final String[] headers =
{ "vertexName", "taskAttempts", "node", "containerId", "reuseCount" };

private final CSVResult csvResult;

public ContainerReuseAnalyzer(Configuration config) {
this.config = config;
super(config);
this.csvResult = new CSVResult(headers);
}

Expand Down Expand Up @@ -82,11 +80,6 @@ public String getDescription() {
return "Get details on container reuse analysis";
}

@Override
public Configuration getConfiguration() {
return config;
}

public static void main(String[] args) throws Exception {
Configuration config = new Configuration();
ContainerReuseAnalyzer analyzer = new ContainerReuseAnalyzer(config);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,10 +113,11 @@ public List<String> getNotes() {
ArrayList<TimeInfo> concurrencyByTime = Lists.newArrayList();

public CriticalPathAnalyzer() {
super(new Configuration());
}

public CriticalPathAnalyzer(Configuration conf) {
setConf(conf);
super(conf);
}

@Override
Expand Down Expand Up @@ -643,13 +644,9 @@ public String getDescription() {
return "Analyze critical path of the DAG";
}

@Override
public Configuration getConfiguration() {
return getConf();
}

public static void main(String[] args) throws Exception {
int res = ToolRunner.run(new Configuration(), new CriticalPathAnalyzer(), args);
Configuration config = new Configuration();
int res = ToolRunner.run(config, new CriticalPathAnalyzer(config), args);
System.exit(res);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.tez.analyzer.plugins;

import java.text.SimpleDateFormat;
import java.util.Comparator;
import java.util.Date;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.ToolRunner;
import org.apache.tez.analyzer.Analyzer;
import org.apache.tez.analyzer.CSVResult;
import org.apache.tez.analyzer.Result;
import org.apache.tez.dag.api.TezException;
import org.apache.tez.history.parser.datamodel.DagInfo;
import org.apache.tez.history.parser.datamodel.Event;
import org.apache.tez.history.parser.datamodel.TaskAttemptInfo;
import org.apache.tez.history.parser.datamodel.TaskInfo;
import org.apache.tez.history.parser.datamodel.VertexInfo;

/**
 * Analyzer that lists DAG-level and vertex-level events of a DAG, plus the
 * TASK_FINISHED / TASK_ATTEMPT_FINISHED events of failed tasks and their failed attempts,
 * as records of a {@link CSVResult} sorted by event time.
 */
public class DagOverviewAnalyzer extends TezAnalyzerBase implements Analyzer {
  private static final String[] headers =
      { "name", "id", "event_type", "status", "event_time", "event_time_str", "vertex_task_stats", "diagnostics" };

  /** Position of the numeric "event_time" column in a record; used as the sort key. */
  private static final int EVENT_TIME_COLUMN = 4;

  // SimpleDateFormat is not thread-safe; it is only used through the synchronized toDateStr().
  private static final SimpleDateFormat FORMAT = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");

  private final CSVResult csvResult;

  public DagOverviewAnalyzer(Configuration config) {
    super(config);
    csvResult = new CSVResult(headers);
  }

  /**
   * Adds one record per dag event, per vertex event and per finish event of failed
   * tasks/attempts, then sorts all records by their event time.
   *
   * @param dagInfo the parsed DAG to summarize
   * @throws TezException declared by the {@link Analyzer} contract
   */
  @Override
  public void analyze(DagInfo dagInfo) throws TezException {
    for (Event event : dagInfo.getEvents()) {
      csvResult.addRecord(new String[] { dagInfo.getDagId(), dagInfo.getDagId(), event.getType(),
          dagInfo.getStatus(), Long.toString(event.getTime()), toDateStr(event.getTime()), "", "" });
    }

    for (VertexInfo vertex : dagInfo.getVertices()) {
      addVertexEvents(vertex);
      addFailedTaskEvents(vertex);
    }

    csvResult.sort(new Comparator<String[]>() {
      @Override
      public int compare(String[] first, String[] second) {
        // Long.compare instead of casting the difference to int: a difference that does not
        // fit into an int would be truncated and could yield the wrong sign.
        return Long.compare(Long.parseLong(first[EVENT_TIME_COLUMN]),
            Long.parseLong(second[EVENT_TIME_COLUMN]));
      }
    });
  }

  /** Adds one record per event of the vertex, annotated with the first failed attempt (if any). */
  private void addVertexEvents(VertexInfo vertex) throws TezException {
    // The failure info does not depend on the event, so it is looked up once per vertex.
    final String vertexFailureInfoIfAny = firstFailedAttemptInfo(vertex);
    for (Event event : vertex.getEvents()) {
      csvResult.addRecord(new String[] { vertex.getVertexName(), vertex.getVertexId(),
          event.getType(), vertex.getStatus(), Long.toString(event.getTime()),
          toDateStr(event.getTime()), getTaskStats(vertex), vertexFailureInfoIfAny });
    }
  }

  /**
   * Adds the TASK_FINISHED events of the vertex's failed tasks and the TASK_ATTEMPT_FINISHED
   * events of their failed attempts.
   */
  private void addFailedTaskEvents(VertexInfo vertex) throws TezException {
    // a failed task can lead to dag failure, so hopefully holds valuable information
    for (TaskInfo failedTask : vertex.getFailedTasks()) {
      for (Event failedTaskEvent : failedTask.getEvents()) {
        if (failedTaskEvent.getType().equalsIgnoreCase("TASK_FINISHED")) {
          csvResult.addRecord(new String[] { vertex.getVertexName(), failedTask.getTaskId(),
              failedTaskEvent.getType(), failedTask.getStatus(), Long.toString(failedTaskEvent.getTime()),
              toDateStr(failedTaskEvent.getTime()), getTaskStats(vertex),
              sanitize(failedTask.getDiagnostics()) });
        }
      }
      // if we already found a failing task, let's scan the failing attempts as well
      for (TaskAttemptInfo failedAttempt : failedTask.getFailedTaskAttempts()) {
        for (Event failedTaskAttemptEvent : failedAttempt.getEvents()) {
          if (failedTaskAttemptEvent.getType().equalsIgnoreCase("TASK_ATTEMPT_FINISHED")) {
            csvResult.addRecord(new String[] { vertex.getVertexName(),
                failedAttempt.getTaskAttemptId(), failedTaskAttemptEvent.getType(),
                failedAttempt.getStatus(), Long.toString(failedTaskAttemptEvent.getTime()),
                toDateStr(failedTaskAttemptEvent.getTime()), getTaskStats(vertex),
                sanitize(failedAttempt.getDiagnostics()) });
          }
        }
      }
    }
  }

  /**
   * Returns "attemptId: diagnostics" for the first attempt of the vertex whose status contains
   * "FAILED", or an empty string when there is no such attempt.
   */
  private static String firstFailedAttemptInfo(VertexInfo vertex) {
    for (TaskAttemptInfo attempt : vertex.getTaskAttempts()) {
      if (attempt.getStatus().contains("FAILED")) {
        return attempt.getTaskAttemptId() + ": " + sanitize(attempt.getDiagnostics());
      }
    }
    return "";
  }

  /** Replaces commas and newlines with spaces so the text stays within one CSV field and line. */
  private static String sanitize(String diagnostics) {
    return diagnostics.replace(',', ' ').replace('\n', ' ');
  }

  private String getTaskStats(VertexInfo vertex) {
    return String.format("numTasks: %d failedTasks: %d completedTasks: %d", vertex.getNumTasks(),
        vertex.getFailedTasksCount(), vertex.getCompletedTasksCount());
  }

  /** Formats an epoch-millisecond timestamp with the shared formatter. */
  private static synchronized String toDateStr(long time) {
    return FORMAT.format(new Date(time));
  }

  @Override
  public Result getResult() throws TezException {
    return csvResult;
  }

  @Override
  public String getName() {
    return "Dag overview analyzer";
  }

  @Override
  public String getDescription() {
    return "High level dag events overview (dag, vertex event summary)."
        + " Helps understand the overall progress of a dag by simply listing the dag/vertex related events";
  }

  public static void main(String[] args) throws Exception {
    Configuration config = new Configuration();
    DagOverviewAnalyzer analyzer = new DagOverviewAnalyzer(config);
    int res = ToolRunner.run(config, analyzer, args);
    analyzer.printResults();
    System.exit(res);
  }
}
Loading