diff --git a/tez-tools/counter-diff/README.md b/tez-tools/counter-diff/README.md index a88a32f249..811c26b798 100644 --- a/tez-tools/counter-diff/README.md +++ b/tez-tools/counter-diff/README.md @@ -26,11 +26,11 @@ Before using the tool, make sure to install texttable using `pip install texttab To use the tool, run -`python counter-diff.py dag_1.zip dag_2.zip [--detail]` +`python3 counter-diff.py dag_1.zip dag_2.zip [--detail]` This will print counter output difference between the specified DAGs like this -Example: `python counter-diff.py dag_1499978510619_0002_143.zip dag_1499978510619_0002_144.zip` +Example: `python3 counter-diff.py dag_1499978510619_0002_143.zip dag_1499978510619_0002_144.zip` ``` +-------------------+-------------------------------------+----------------------------+----------------------------+-------------+ @@ -81,4 +81,4 @@ Example: `python counter-diff.py dag_1499978510619_0002_143.zip dag_149997851061 | | KILLED_TASKS | 0 | 0 | 0 | | | TIME_TAKEN | 250198 | 68981 | -181217 | +-------------------+-------------------------------------+----------------------------+----------------------------+-------------+ -``` \ No newline at end of file +``` diff --git a/tez-tools/counter-diff/counter-diff.py b/tez-tools/counter-diff/counter-diff.py index d7d07220c1..a7ef9fbd76 100644 --- a/tez-tools/counter-diff/counter-diff.py +++ b/tez-tools/counter-diff/counter-diff.py @@ -17,187 +17,222 @@ # under the License. # -import imp, json, os, shutil, sys, tempfile, zipfile +import json +import os +import shutil +import sys +import tempfile +import zipfile + try: - imp.find_module('texttable') from texttable import Texttable except ImportError: - sys.stderr.write("Could not import Texttable\nRetry after 'pip install texttable'\n") - exit() + print( + "Could not import Texttable. Retry after 'pip install texttable'", + file=sys.stderr, + ) + sys.exit(1) tmpdir = tempfile.mkdtemp() + def extract_zip(filename): - file_dir = os.path.join(tmpdir, os.path.splitext(filename)[0]) - if not os.path.exists(file_dir): - os.makedirs(file_dir) + file_dir = os.path.join(tmpdir, os.path.splitext(filename)[0]) + if not os.path.exists(file_dir): + os.makedirs(file_dir) - zip_ref = zipfile.ZipFile(os.path.abspath(filename), 'r') - zip_ref.extractall(os.path.abspath(file_dir)) - zip_ref.close() - return file_dir + zip_ref = zipfile.ZipFile(os.path.abspath(filename), "r") + zip_ref.extractall(os.path.abspath(file_dir)) + zip_ref.close() + return file_dir def diff(file1, file2): - # extract ZIP files - file1_dir = extract_zip(file1) - file2_dir = extract_zip(file2) - - # tez debugtool writes json data to TEZ_DAG file whereas tez UI writes to dag.json - # also in dag.json data is inside "dag" root node - file1_using_dag_json = True - dag_json_file1 = os.path.join(file1_dir, "dag.json") - if os.path.isfile(dag_json_file1) == False: - file1_using_dag_json = False - dag_json_file1 = os.path.join(file1_dir, "TEZ_DAG") - if os.path.isfile(dag_json_file1) == False: - print "Unable to find dag.json/TEZ_DAG file inside the archive " + file1 - exit() - - file2_using_dag_json = True - dag_json_file2 = os.path.join(file2_dir, "dag.json") - if os.path.isfile(dag_json_file2) == False: - file2_using_dag_json = False - dag_json_file2 = os.path.join(file2_dir, "TEZ_DAG") - if os.path.isfile(dag_json_file2) == False: - print "Unable to find dag.json/TEZ_DAG file inside the archive " + file1 - exit() - - # populate diff table - difftable = {} - with open(dag_json_file1) as data_file: - file1_dag_json = json.load(data_file)["dag"] if file1_using_dag_json else json.load(data_file) - counters = file1_dag_json['otherinfo']['counters'] - for group in counters['counterGroups']: - countertable = {} - for counter in group['counters']: - counterName = counter['counterName'] - countertable[counterName] = [] - countertable[counterName].append(counter['counterValue']) - - groupName = group['counterGroupName'] - difftable[groupName] = countertable - - # add other info - otherinfo = file1_dag_json['otherinfo'] - countertable = {} - countertable["TIME_TAKEN"] = [otherinfo['timeTaken']] - countertable["COMPLETED_TASKS"] = [otherinfo['numCompletedTasks']] - countertable["SUCCEEDED_TASKS"] = [otherinfo['numSucceededTasks']] - countertable["FAILED_TASKS"] = [otherinfo['numFailedTasks']] - countertable["KILLED_TASKS"] = [otherinfo['numKilledTasks']] - countertable["FAILED_TASK_ATTEMPTS"] = [otherinfo['numFailedTaskAttempts']] - countertable["KILLED_TASK_ATTEMPTS"] = [otherinfo['numKilledTaskAttempts']] - difftable['otherinfo'] = countertable - - with open(dag_json_file2) as data_file: - file2_dag_json = json.load(data_file)["dag"] if file2_using_dag_json else json.load(data_file) - counters = file2_dag_json['otherinfo']['counters'] - for group in counters['counterGroups']: - groupName = group['counterGroupName'] - if groupName not in difftable: - difftable[groupName] = {} - countertable = difftable[groupName] - for counter in group['counters']: - counterName = counter['counterName'] - # if counter does not exist in file1, add it with 0 value - if counterName not in countertable: - countertable[counterName] = [0] - countertable[counterName].append(counter['counterValue']) - - # append other info - otherinfo = file2_dag_json['otherinfo'] - countertable = difftable['otherinfo'] - countertable["TIME_TAKEN"].append(otherinfo['timeTaken']) - countertable["COMPLETED_TASKS"].append(otherinfo['numCompletedTasks']) - countertable["SUCCEEDED_TASKS"].append(otherinfo['numSucceededTasks']) - countertable["FAILED_TASKS"].append(otherinfo['numFailedTasks']) - countertable["KILLED_TASKS"].append(otherinfo['numKilledTasks']) - countertable["FAILED_TASK_ATTEMPTS"].append(otherinfo['numFailedTaskAttempts']) - countertable["KILLED_TASK_ATTEMPTS"].append(otherinfo['numKilledTaskAttempts']) - difftable['otherinfo'] = countertable - - # if some counters are missing, consider it as 0 and compute delta difference - for k,v in difftable.items(): - for key, value in v.items(): - # if counter value does not exisit in file2, add it with 0 value - if len(value) == 1: - value.append(0) - - # store delta difference - delta = value[1] - value[0] - value.append(("+" if delta > 0 else "") + str(delta)) - - return difftable + # extract ZIP files + file1_dir = extract_zip(file1) + file2_dir = extract_zip(file2) + + # tez debugtool writes json data to TEZ_DAG file whereas tez UI writes to dag.json + # also in dag.json data is inside "dag" root node + file1_using_dag_json = True + dag_json_file1 = os.path.join(file1_dir, "dag.json") + if not os.path.isfile(dag_json_file1): + file1_using_dag_json = False + dag_json_file1 = os.path.join(file1_dir, "TEZ_DAG") + if not os.path.isfile(dag_json_file1): + print("Unable to find dag.json/TEZ_DAG file inside the archive " + file1) + sys.exit() + + file2_using_dag_json = True + dag_json_file2 = os.path.join(file2_dir, "dag.json") + if not os.path.isfile(dag_json_file2): + file2_using_dag_json = False + dag_json_file2 = os.path.join(file2_dir, "TEZ_DAG") + if not os.path.isfile(dag_json_file2): + print("Unable to find dag.json/TEZ_DAG file inside the archive " + file1) + sys.exit() + + # populate diff table + difftable = {} + with open(dag_json_file1) as data_file: + file1_dag_json = ( + json.load(data_file)["dag"] + if file1_using_dag_json + else json.load(data_file) + ) + + # Safe access to otherinfo and counters + otherinfo = file1_dag_json.get("otherinfo", {}) + counters = otherinfo.get("counters", {}) + + # Iterate only if counterGroups exists + for group in counters.get("counterGroups", []): + countertable = {} + for counter in group["counters"]: + counterName = counter["counterName"] + countertable[counterName] = [] + countertable[counterName].append(counter["counterValue"]) + + groupName = group["counterGroupName"] + difftable[groupName] = countertable + + # add other info safely + countertable = {} + countertable["TIME_TAKEN"] = [otherinfo.get("timeTaken", 0)] + countertable["COMPLETED_TASKS"] = [otherinfo.get("numCompletedTasks", 0)] + countertable["SUCCEEDED_TASKS"] = [otherinfo.get("numSucceededTasks", 0)] + countertable["FAILED_TASKS"] = [otherinfo.get("numFailedTasks", 0)] + countertable["KILLED_TASKS"] = [otherinfo.get("numKilledTasks", 0)] + countertable["FAILED_TASK_ATTEMPTS"] = [ + otherinfo.get("numFailedTaskAttempts", 0) + ] + countertable["KILLED_TASK_ATTEMPTS"] = [ + otherinfo.get("numKilledTaskAttempts", 0) + ] + difftable["otherinfo"] = countertable + + with open(dag_json_file2) as data_file: + file2_dag_json = ( + json.load(data_file)["dag"] + if file2_using_dag_json + else json.load(data_file) + ) + + otherinfo = file2_dag_json.get("otherinfo", {}) + counters = otherinfo.get("counters", {}) + + for group in counters.get("counterGroups", []): + groupName = group["counterGroupName"] + if groupName not in difftable: + difftable[groupName] = {} + countertable = difftable[groupName] + for counter in group["counters"]: + counterName = counter["counterName"] + # if counter does not exist in file1, add it with 0 value + if counterName not in countertable: + countertable[counterName] = [0] + countertable[counterName].append(counter["counterValue"]) + + # append other info safely + countertable = difftable["otherinfo"] + countertable["TIME_TAKEN"].append(otherinfo.get("timeTaken", 0)) + countertable["COMPLETED_TASKS"].append(otherinfo.get("numCompletedTasks", 0)) + countertable["SUCCEEDED_TASKS"].append(otherinfo.get("numSucceededTasks", 0)) + countertable["FAILED_TASKS"].append(otherinfo.get("numFailedTasks", 0)) + countertable["KILLED_TASKS"].append(otherinfo.get("numKilledTasks", 0)) + countertable["FAILED_TASK_ATTEMPTS"].append( + otherinfo.get("numFailedTaskAttempts", 0) + ) + countertable["KILLED_TASK_ATTEMPTS"].append( + otherinfo.get("numKilledTaskAttempts", 0) + ) + difftable["otherinfo"] = countertable + + # if some counters are missing, consider it as 0 and compute delta difference + for k, v in difftable.items(): + for key, value in v.items(): + # if counter value does not exisit in file2, add it with 0 value + if len(value) == 1: + value.append(0) + + # store delta difference + delta = value[1] - value[0] + value.append(("+" if delta > 0 else "") + str(delta)) + + return difftable + def print_table(difftable, name1, name2, detailed=False): - table = Texttable(max_width=0) - table.set_cols_align(["l", "l", "l", "l", "l"]) - table.set_cols_valign(["m", "m", "m", "m", "m"]) - table.add_row(["Counter Group", "Counter Name", name1, name2, "delta"]); - for k in sorted(difftable): - # ignore task specific counters in default output - if not detailed and ("_INPUT_" in k or "_OUTPUT_" in k): - continue - - v = difftable[k] - row = [] - # counter group. using shortname here instead of FQCN - if detailed: - row.append(k) - else: - row.append(k.split(".")[-1]) - - # keys as list (counter names) - row.append("\n".join(list(v.keys()))) - - # counter values for dag1 - for key, value in v.items(): - if len(value) == 1: - value.append(0) - value.append(value[0] - value[1]) - - # dag1 counter values - name1Val = [] - for key, value in v.items(): - name1Val.append(str(value[0])) - row.append("\n".join(name1Val)) - - # dag2 counter values - name2Val = [] - for key, value in v.items(): - name2Val.append(str(value[1])) - row.append("\n".join(name2Val)) - - # delta values - deltaVal = [] - for key, value in v.items(): - deltaVal.append(str(value[2])) - row.append("\n".join(deltaVal)) - - table.add_row(row) - - print table.draw() + "\n" + table = Texttable(max_width=0) + table.set_cols_align(["l", "l", "l", "l", "l"]) + table.set_cols_valign(["m", "m", "m", "m", "m"]) + table.add_row(["Counter Group", "Counter Name", name1, name2, "delta"]) + for k in sorted(difftable): + # ignore task specific counters in default output + if not detailed and ("_INPUT_" in k or "_OUTPUT_" in k): + continue + + v = difftable[k] + row = [] + # counter group. using shortname here instead of FQCN + if detailed: + row.append(k) + else: + row.append(k.split(".")[-1]) + + # keys as list (counter names) + row.append("\n".join(list(v.keys()))) + + # counter values for dag1 + for key, value in v.items(): + if len(value) == 1: + value.append(0) + value.append(value[0] - value[1]) + + # dag1 counter values + name1Val = [] + for key, value in v.items(): + name1Val.append(str(value[0])) + row.append("\n".join(name1Val)) + + # dag2 counter values + name2Val = [] + for key, value in v.items(): + name2Val.append(str(value[1])) + row.append("\n".join(name2Val)) + + # delta values + deltaVal = [] + for key, value in v.items(): + deltaVal.append(str(value[2])) + row.append("\n".join(deltaVal)) + + table.add_row(row) + + print(table.draw() + "\n") def main(argv): - sysargs = len(argv) - if sysargs < 2: - print "Usage: python counter-diff.py dag_file1.zip dag_file2.zip [--detail]" - return -1 + sysargs = len(argv) + if sysargs < 2: + print("Usage: python3 counter-diff.py dag_file1.zip dag_file2.zip [--detail]") + return -1 + + file1 = argv[0] + file2 = argv[1] + difftable = diff(file1, file2) - file1 = argv[0] - file2 = argv[1] - difftable = diff(file1, file2) + detailed = False + if sysargs == 3 and argv[2] == "--detail": + detailed = True - detailed = False - if sysargs == 3 and argv[2] == "--detail": - detailed = True + print_table( + difftable, os.path.splitext(file1)[0], os.path.splitext(file2)[0], detailed + ) - print_table(difftable, os.path.splitext(file1)[0], os.path.splitext(file2)[0], detailed) if __name__ == "__main__": - try: - sys.exit(main(sys.argv[1:])) - finally: - shutil.rmtree(tmpdir) \ No newline at end of file + try: + sys.exit(main(sys.argv[1:])) + finally: + shutil.rmtree(tmpdir) diff --git a/tez-tools/swimlanes/amlogparser.py b/tez-tools/swimlanes/amlogparser.py index 16b82bcf54..17fb8f448f 100644 --- a/tez-tools/swimlanes/amlogparser.py +++ b/tez-tools/swimlanes/amlogparser.py @@ -17,274 +17,311 @@ # under the License. # -import sys,re +import sys +import re from itertools import groupby -from bz2 import BZ2File -from gzip import GzipFile as GZFile -try: - from urllib.request import urlopen -except: - from urllib2 import urlopen as urlopen +import bz2 +import gzip +from urllib.request import urlopen + class AMRawEvent(object): - def __init__(self, ts, dag, event, args): - self.ts = ts - self.dag = dag - self.event = event - self.args = args - def __repr__(self): - return "%s->%s (%s)" % (self.dag, self.event, self.args) + def __init__(self, ts, dag, event, args): + self.ts = ts + self.dag = dag + self.event = event + self.args = args + + def __repr__(self): + return "%s->%s (%s)" % (self.dag, self.event, self.args) + def first(l): - return (l[:1] or [None])[0] + return (l[:1] or [None])[0] + def kv_add(d, k, v): - if(d.has_key(k)): - oldv = d[k] - if(type(oldv) is list): - oldv.append(v) - else: - oldv = [oldv, v] - d[k] = oldv - else: - d[k] = v - + if k in d: + oldv = d[k] + if type(oldv) is list: + oldv.append(v) + else: + oldv = [oldv, v] + d[k] = oldv + else: + d[k] = v + + def csv_kv(args): - kvs = {}; - pairs = [p.strip() for p in args.split(",")] - for kv in pairs: - if(kv.find("=") == -1): - kv_add(kvs, kv, None) - elif(kv.find("=") == kv.rfind("=")): - (k,v) = kv.split("=") - kv_add(kvs, k, v) - return kvs + kvs = {} + pairs = [p.strip() for p in args.split(",")] + for kv in pairs: + if kv.find("=") == -1: + kv_add(kvs, kv, None) + elif kv.find("=") == kv.rfind("="): + (k, v) = kv.split("=") + kv_add(kvs, k, v) + return kvs + class AppMaster(object): - def __init__(self, raw): - self.raw = raw - self.kvs = csv_kv(raw.args) - self.name = self.kvs["appAttemptId"] - self.zero = int(self.kvs["startTime"]) - #self.ready = int(self.kvs["initTime"]) - #self.start = int(self.kvs["appSubmitTime"]) - self.containers = None - self.dags = None - def __repr__(self): - return "[%s started at %d]" % (self.name, self.zero) + def __init__(self, raw): + self.raw = raw + self.kvs = csv_kv(raw.args) + self.name = self.kvs["appAttemptId"] + self.zero = int(self.kvs["startTime"]) + # self.ready = int(self.kvs["initTime"]) + # self.start = int(self.kvs["appSubmitTime"]) + self.containers = None + self.dags = None + + def __repr__(self): + return "[%s started at %d]" % (self.name, self.zero) + class DummyAppMaster(object): - """ magic of duck typing """ - def __init__(self, dag): - self.raw = None - self.kvs = {} - self.name = "Appmaster for %s" % dag.name - self.zero = dag.start - self.containers = None - self.dags = None - + """magic of duck typing""" + + def __init__(self, dag): + self.raw = None + self.kvs = {} + self.name = "Appmaster for %s" % dag.name + self.zero = dag.start + self.containers = None + self.dags = None + + class Container(object): - def __init__(self, raw): - self.raw = raw - self.kvs = csv_kv(raw.args) - self.name = self.kvs["containerId"] - self.start = int(self.kvs["launchTime"]) - self.stop = -1 - self.status = 0 - self.node ="" - def __repr__(self): - return "[%s start=%d]" % (self.name, self.start) + def __init__(self, raw): + self.raw = raw + self.kvs = csv_kv(raw.args) + self.name = self.kvs["containerId"] + self.start = int(self.kvs["launchTime"]) + self.stop = -1 + self.status = 0 + self.node = "" + + def __repr__(self): + return "[%s start=%d]" % (self.name, self.start) + class DummyContainer(object): - def __init__(self, attempt): - self.raw = None - self.kvs = {} - self.name = attempt.container - self.status = 0 - self.start = attempt.start - self.stop = -1 - self.status = 0 - self.node = None + def __init__(self, attempt): + self.raw = None + self.kvs = {} + self.name = attempt.container + self.status = 0 + self.start = attempt.start + self.stop = -1 + self.status = 0 + self.node = None + class DAG(object): - def __init__(self, raw): - self.raw = raw - self.name = raw.dag - self.kvs = csv_kv(raw.args) - self.start = (int)(self.kvs["startTime"]) - self.finish = (int)(self.kvs["finishTime"]) - self.duration = (int)(self.kvs["timeTaken"]) - def structure(self, vertexes): - self.vertexes = [v for v in vertexes if v.dag == self.name] - def attempts(self): - for v in self.vertexes: - for t in v.tasks: - for a in t.attempts: - if(a.dag == self.name): - yield a - def __repr__(self): - return "%s (%d+%d)" % (self.name, self.start, self.duration) + def __init__(self, raw): + self.raw = raw + self.name = raw.dag + self.kvs = csv_kv(raw.args) + self.start = (int)(self.kvs["startTime"]) + self.finish = (int)(self.kvs["finishTime"]) + self.duration = (int)(self.kvs["timeTaken"]) + + def structure(self, vertexes): + self.vertexes = [v for v in vertexes if v.dag == self.name] + + def attempts(self): + for v in self.vertexes: + for t in v.tasks: + for a in t.attempts: + if a.dag == self.name: + yield a + + def __repr__(self): + return "%s (%d+%d)" % (self.name, self.start, self.duration) + class Vertex(object): - def __init__(self, raw): - self.raw = raw - self.dag = raw.dag - self.kvs = csv_kv(raw.args) - self.name = self.kvs["vertexName"] - self.initZero = (int)(self.kvs["initRequestedTime"]) - self.init = (int)(self.kvs["initedTime"]) - self.startZero = (int)(self.kvs["startRequestedTime"]) - self.start = (int)(self.kvs["startedTime"]) - self.finish = (int)(self.kvs["finishTime"]) - self.duration = (int)(self.kvs["timeTaken"]) - def structure(self, tasks): - self.tasks = [t for t in tasks if t.vertex == self.name] - def __repr__(self): - return "%s (%d+%d)" % (self.name, self.start, self.duration) + def __init__(self, raw): + self.raw = raw + self.dag = raw.dag + self.kvs = csv_kv(raw.args) + self.name = self.kvs["vertexName"] + self.initZero = (int)(self.kvs["initRequestedTime"]) + self.init = (int)(self.kvs["initedTime"]) + self.startZero = (int)(self.kvs["startRequestedTime"]) + self.start = (int)(self.kvs["startedTime"]) + self.finish = (int)(self.kvs["finishTime"]) + self.duration = (int)(self.kvs["timeTaken"]) + + def structure(self, tasks): + self.tasks = [t for t in tasks if t.vertex == self.name] + + def __repr__(self): + return "%s (%d+%d)" % (self.name, self.start, self.duration) class Task(object): - def __init__(self, raw): - self.raw = raw - self.dag = raw.dag - self.kvs = csv_kv(raw.args) - self.vertex = self.kvs["vertexName"] - self.name = self.kvs["taskId"] - self.start = (int)(self.kvs["startTime"]) - self.finish = (int)(self.kvs["finishTime"]) - self.duration = (int)(self.kvs["timeTaken"]) - def structure(self, attempts): - self.attempts = [a for a in attempts if a.task == self.name] - def __repr__(self): - return "%s (%d+%d)" % (self.name, self.start, self.duration) + def __init__(self, raw): + self.raw = raw + self.dag = raw.dag + self.kvs = csv_kv(raw.args) + self.vertex = self.kvs["vertexName"] + self.name = self.kvs["taskId"] + self.start = (int)(self.kvs["startTime"]) + self.finish = (int)(self.kvs["finishTime"]) + self.duration = (int)(self.kvs["timeTaken"]) + + def structure(self, attempts): + self.attempts = [a for a in attempts if a.task == self.name] + + def __repr__(self): + return "%s (%d+%d)" % (self.name, self.start, self.duration) + class Attempt(object): - def __init__(self, pair): - start = first(filter(lambda a: a.event == "TASK_ATTEMPT_STARTED", pair)) - finish = first(filter(lambda a: a.event == "TASK_ATTEMPT_FINISHED", pair)) - if start is None or finish is None: - print [start, finish]; - self.raw = finish - self.kvs = csv_kv(start.args) - if finish is not None: - self.dag = finish.dag - self.kvs.update(csv_kv(finish.args)) - self.finish = (int)(self.kvs["finishTime"]) - self.duration = (int)(self.kvs["timeTaken"]) - self.name = self.kvs["taskAttemptId"] - self.task = self.name[:self.name.rfind("_")].replace("attempt","task") - (_, _, amid, dagid, vertexid, taskid, attemptid) = self.name.split("_") - self.tasknum = int(taskid) - self.attemptnum = int(attemptid) - self.vertex = self.kvs["vertexName"] - self.start = (int)(self.kvs["startTime"]) - self.container = self.kvs["containerId"] - self.node = self.kvs["nodeId"] - def __repr__(self): - return "%s (%d+%d)" % (self.name, self.start, self.duration) - + def __init__(self, pair): + # Consuming iterators immediately with list() for Py3 compatibility + start = first(list(filter(lambda a: a.event == "TASK_ATTEMPT_STARTED", pair))) + finish = first(list(filter(lambda a: a.event == "TASK_ATTEMPT_FINISHED", pair))) + if start is None or finish is None: + print([start, finish]) + self.raw = finish + self.kvs = csv_kv(start.args) + if finish is not None: + self.dag = finish.dag + self.kvs.update(csv_kv(finish.args)) + self.finish = (int)(self.kvs["finishTime"]) + self.duration = (int)(self.kvs["timeTaken"]) + self.name = self.kvs["taskAttemptId"] + self.task = self.name[: self.name.rfind("_")].replace("attempt", "task") + (_, _, amid, dagid, vertexid, taskid, attemptid) = self.name.split("_") + self.tasknum = int(taskid) + self.attemptnum = int(attemptid) + self.vertex = self.kvs["vertexName"] + self.start = (int)(self.kvs["startTime"]) + self.container = self.kvs["containerId"] + self.node = self.kvs["nodeId"] + + def __repr__(self): + return "%s (%d+%d)" % (self.name, self.start, self.duration) + def open_file(f): - if(f.endswith(".gz")): - return GZFile(f) - elif(f.endswith(".bz2")): - return BZ2File(f) - elif(f.startswith("http://")): - return urlopen(f) - return open(f) - -class AMLog(object): - def init(self): - ID=r'[^\]]*' - TS=r'[0-9:\-, ]*' - MAIN_RE=r'^(?P%(ts)s) [?INFO]? [(?P%(id)s)] \|?((HistoryEventHandler.criticalEvents)|((org.apache.tez.dag.)?history.HistoryEventHandler))\|?: [HISTORY][DAG:(?P%(id)s)][Event:(?P%(id)s)]: (?P.*)' - MAIN_RE = MAIN_RE.replace('[','\[').replace(']','\]') - MAIN_RE = MAIN_RE % {'ts' : TS, 'id' : ID} - self.MAIN_RE = re.compile(MAIN_RE) - - def __init__(self, f): - fp = open_file(f) - self.init() - self.events = filter(lambda a:a, [self.parse(l.strip()) for l in fp]) - - def structure(self): - am = self.appmaster() # this is a copy - containers = dict([(a.name, a) for a in self.containers()]) - dags = self.dags() - vertexes = self.vertexes() - tasks = self.tasks() - attempts = self.attempts() - for t in tasks: - t.structure(attempts) - for v in vertexes: - v.structure(tasks) - for d in dags: - d.structure(vertexes) - for a in attempts: - if containers.has_key(a.container): - c = containers[a.container] - c.node = a.node - else: - c = DummyContainer(a) - containers[a.container] = c - if not am: - am = DummyAppMaster(first(dags)) - am.containers = containers - am.dags = dags - return am - - def appmaster(self): - return first([AppMaster(ev) for ev in self.events if ev.event == "AM_STARTED"]) - - def containers(self): - containers = [Container(ev) for ev in self.events if ev.event == "CONTAINER_LAUNCHED"] - containermap = dict([(c.name, c) for c in containers]) - for ev in self.events: - if ev.event == "CONTAINER_STOPPED": - kvs = csv_kv(ev.args) - if containermap.has_key(kvs["containerId"]): - containermap[kvs["containerId"]].stop = int(kvs["stoppedTime"]) - containermap[kvs["containerId"]].status = int(kvs["exitStatus"]) - return containers - - - def dags(self): - dags = [DAG(ev) for ev in self.events if ev.event == "DAG_FINISHED"] - return dags - - def vertexes(self): - """ yes, not vertices """ - vertexes = [Vertex(ev) for ev in self.events if ev.event == "VERTEX_FINISHED"] - return vertexes - - def tasks(self): - tasks = [Task(ev) for ev in self.events if ev.event == "TASK_FINISHED"] - return tasks - - def attempts(self): - key = lambda a:a[0] - value = lambda a:a[1] - raw = [(csv_kv(ev.args)["taskAttemptId"], ev) for ev in self.events if ev.event == "TASK_ATTEMPT_STARTED" or ev.event == "TASK_ATTEMPT_FINISHED"] - pairs = groupby(sorted(raw), key = key) - attempts = [Attempt(map(value,p)) for (k,p) in pairs] - return attempts - - def parse(self, l): - if(l.find("[HISTORY]") != -1): - m = self.MAIN_RE.match(l) - ts = m.group("ts") - dag = m.group("dag") - event = m.group("event") - args = m.group("args") - return AMRawEvent(ts, dag, event, args) + if f.endswith(".gz"): + return gzip.open(f, "rt") + elif f.endswith(".bz2"): + return bz2.open(f, "rt") + elif f.startswith("http://"): + return urlopen(f) + return open(f, "r") + + +class AMLog(object): + def init(self): + ID = r"[^\]]*" + TS = r"[0-9:\-, ]*" + MAIN_RE = r"^(?P%(ts)s) [?INFO]? [(?P%(id)s)] \|?((HistoryEventHandler.criticalEvents)|((org.apache.tez.dag.)?history.HistoryEventHandler))\|?: [HISTORY][DAG:(?P%(id)s)][Event:(?P%(id)s)]: (?P.*)" + # Fix for SyntaxWarning: using raw strings + MAIN_RE = MAIN_RE.replace("[", r"\[").replace("]", r"\]") + MAIN_RE = MAIN_RE % {"ts": TS, "id": ID} + self.MAIN_RE = re.compile(MAIN_RE) + + def __init__(self, f): + fp = open_file(f) + self.init() + # Filter returns iterator in Py3, list() ensures immediate execution + self.events = list(filter(lambda a: a, [self.parse(l.strip()) for l in fp])) + + def structure(self): + am = self.appmaster() # this is a copy + containers = dict([(a.name, a) for a in self.containers()]) + dags = self.dags() + vertexes = self.vertexes() + tasks = self.tasks() + attempts = self.attempts() + for t in tasks: + t.structure(attempts) + for v in vertexes: + v.structure(tasks) + for d in dags: + d.structure(vertexes) + for a in attempts: + if a.container in containers: + c = containers[a.container] + c.node = a.node + else: + c = DummyContainer(a) + containers[a.container] = c + if not am: + am = DummyAppMaster(first(dags)) + am.containers = containers + am.dags = dags + return am + + def appmaster(self): + return first([AppMaster(ev) for ev in self.events if ev.event == "AM_STARTED"]) + + def containers(self): + containers = [ + Container(ev) for ev in self.events if ev.event == "CONTAINER_LAUNCHED" + ] + containermap = dict([(c.name, c) for c in containers]) + for ev in self.events: + if ev.event == "CONTAINER_STOPPED": + kvs = csv_kv(ev.args) + if kvs["containerId"] in containermap: + containermap[kvs["containerId"]].stop = int(kvs["stoppedTime"]) + containermap[kvs["containerId"]].status = int(kvs["exitStatus"]) + return containers + + def dags(self): + dags = [DAG(ev) for ev in self.events if ev.event == "DAG_FINISHED"] + return dags + + def vertexes(self): + """yes, not vertices""" + vertexes = [Vertex(ev) for ev in self.events if ev.event == "VERTEX_FINISHED"] + return vertexes + + def tasks(self): + tasks = [Task(ev) for ev in self.events if ev.event == "TASK_FINISHED"] + return tasks + + def attempts(self): + key = lambda a: a[0] + value = lambda a: a[1] + raw = [ + (csv_kv(ev.args)["taskAttemptId"], ev) + for ev in self.events + if ev.event == "TASK_ATTEMPT_STARTED" or ev.event == "TASK_ATTEMPT_FINISHED" + ] + # FIX: explicitly pass key to sorted() to avoid comparing AMRawEvent objects + # which causes TypeError in Python 3 + pairs = groupby(sorted(raw, key=key), key=key) + # Map returns iterator in Py3, list() creates the necessary list + attempts = [Attempt(list(map(value, p))) for (k, p) in pairs] + return attempts + + def parse(self, l): + if l.find("[HISTORY]") != -1: + m = self.MAIN_RE.match(l) + if m: + ts = m.group("ts") + dag = m.group("dag") + event = m.group("event") + args = m.group("args") + return AMRawEvent(ts, dag, event, args) + return None + def main(argv): - tree = AMLog(argv[0]).structure() - # AM -> dag -> vertex -> task -> attempt - # AM -> container - for d in tree.dags: - for a in d.attempts(): - print [a.vertex, a.name, a.container, a.start, a.finish] + tree = AMLog(argv[0]).structure() + # AM -> dag -> vertex -> task -> attempt + # AM -> container + for d in tree.dags: + for a in d.attempts(): + print([a.vertex, a.name, a.container, a.start, a.finish]) + if __name__ == "__main__": - main(sys.argv[1:]) + main(sys.argv[1:]) diff --git a/tez-tools/swimlanes/swimlane.py b/tez-tools/swimlanes/swimlane.py index 11976daab9..f763b871d1 100644 --- a/tez-tools/swimlanes/swimlane.py +++ b/tez-tools/swimlanes/swimlane.py @@ -17,185 +17,363 @@ # under the License. # -import sys,math,os.path -import StringIO +import sys +import io from amlogparser import AMLog from getopt import getopt + class ColourManager(object): - def __init__(self): - # text-printable colours - self.colours = [ - '#E4F5FC', '#62C2A2', '#E2F2D8', '#A9DDB4', '#E2F6E1', '#D8DAD7', '#BBBDBA', '#FEE6CE', '#FFCF9F', - '#FDAE69', '#FDE4DD', '#EDE6F2', '#A5BDDB', '#FDE1EE', '#D8B9D8', '#D7DCEC', '#BABDDA', '#FDC5BF', - '#FC9FB3', '#FDE1D2', '#FBBB9E', '#DBEF9F', '#AADD8E', '#81CDBB', '#C7EDE8', '#96D9C8', '#E3EBF4', - '#BAD3E5', '#9DBDD9', '#8996C8', '#CEEAC6', '#76CCC6', '#C7E9BE', '#9ED99C', '#71C572', '#EFF1EE', - '#949693', '#FD8D3D', '#FFF7ED', '#FED3AE', '#FEBB8F', '#FCE9CA', '#FED49B', '#FBBC85', '#FB8E58', - '#FFEEE8', '#D0D0E8', '#76A9CE', '#FDFFFC', '#E9E2EE', '#64A8D2', '#FAF7FC', '#F6ECF2', '#F8E7F0', - '#C994C6', '#E063B1', '#ECEDF7', '#DDD9EB', '#9B9BCA', '#FEDFDE', '#F8689F', '#FC9273', '#FC6948', - '#F6FDB6', '#78C67B', '#EBF9B0', '#C5E9B0', '#40B7C7', '#FDF7BA', '#FFE392', '#FFC34C', '#FF982A'] - self.i = 0 - def next(self): - self.i += 1 - return self.colours[self.i % len(self.colours)] + def __init__(self): + # text-printable colours + self.colours = [ + "#E4F5FC", + "#62C2A2", + "#E2F2D8", + "#A9DDB4", + "#E2F6E1", + "#D8DAD7", + "#BBBDBA", + "#FEE6CE", + "#FFCF9F", + "#FDAE69", + "#FDE4DD", + "#EDE6F2", + "#A5BDDB", + "#FDE1EE", + "#D8B9D8", + "#D7DCEC", + "#BABDDA", + "#FDC5BF", + "#FC9FB3", + "#FDE1D2", + "#FBBB9E", + "#DBEF9F", + "#AADD8E", + "#81CDBB", + "#C7EDE8", + "#96D9C8", + "#E3EBF4", + "#BAD3E5", + "#9DBDD9", + "#8996C8", + "#CEEAC6", + "#76CCC6", + "#C7E9BE", + "#9ED99C", + "#71C572", + "#EFF1EE", + "#949693", + "#FD8D3D", + "#FFF7ED", + "#FED3AE", + "#FEBB8F", + "#FCE9CA", + "#FED49B", + "#FBBC85", + "#FB8E58", + "#FFEEE8", + "#D0D0E8", + "#76A9CE", + "#FDFFFC", + "#E9E2EE", + "#64A8D2", + "#FAF7FC", + "#F6ECF2", + "#F8E7F0", + "#C994C6", + "#E063B1", + "#ECEDF7", + "#DDD9EB", + "#9B9BCA", + "#FEDFDE", + "#F8689F", + "#FC9273", + "#FC6948", + "#F6FDB6", + "#78C67B", + "#EBF9B0", + "#C5E9B0", + "#40B7C7", + "#FDF7BA", + "#FFE392", + "#FFC34C", + "#FF982A", + ] + self.i = 0 + + def next(self): + self.i += 1 + return self.colours[self.i % len(self.colours)] + def attempts(tree): - for d in tree.dags: - for a in d.attempts(): - yield (a.vertex, a.name, a.container, a.start, a.finish) + for d in tree.dags: + for a in d.attempts(): + yield (a.vertex, a.name, a.container, a.start, a.finish) + def attrs(args): - s = "" - for k in args: - v = args[k] - k = k.replace("_","-") # css - if type(v) is str: - s += "%s='%s' " % (k,v) - else: - s += "%s=%s " % (k,str(v)) - return s + s = "" + for k in args: + v = args[k] + k = k.replace("_", "-") # css + if type(v) is str: + s += "%s='%s' " % (k, v) + else: + s += "%s=%s " % (k, str(v)) + return s + class SVGHelper(object): - def __init__(self, w, h, parent=None): - self.width = w - self.height = h - self.parent = parent - if(not parent): - self.lines = StringIO.StringIO() - self.write(""" + def __init__(self, w, h, parent=None): + self.width = w + self.height = h + self.parent = parent + if not parent: + self.lines = io.StringIO() + self.write(""" """) - else: - self.lines = parent.lines - self.write("""""" % (h, w)) - self.write(""" + else: + self.lines = parent.lines + self.write( + """""" + % (h, w) + ) + self.write(""" """) - def line(self, x1, y1, x2, y2, style="stroke: #000", **kwargs): - self.write("""""" % (x1, y1, x2, y2, style, attrs(kwargs))) - def rect(self, left, top, right, bottom, style="", title="", link=None): - w = (right-left) - h = (bottom-top) - if link: - self.write("" % link) - self.write("""%s""" % (left, top, w, h, style, title)) - if link: - self.write("") - def text(self, x, y, text, style="", transform=""): - self.write("""%s""" % (x, y, style, transform, text)) - def link(self, x, y, text, link, style=""): - self.write("" % link) - self.text(x, y, text, style) - self.write("") - def write(self, s): - self.lines.write(s) - def flush(self): - self.write("") - if(self.parent): - self.parent.flush() - return self.lines.getvalue() + + def line(self, x1, y1, x2, y2, style="stroke: #000", **kwargs): + self.write( + """""" + % (x1, y1, x2, y2, style, attrs(kwargs)) + ) + + def rect(self, left, top, right, bottom, style="", title="", link=None): + w = right - left + h = bottom - top + if link: + self.write("" % link) + self.write( + """%s""" + % (left, top, w, h, style, title) + ) + if link: + self.write("") + + def text(self, x, y, text, style="", transform=""): + self.write( + """%s""" + % (x, y, style, transform, text) + ) + + def link(self, x, y, text, link, style=""): + self.write("" % link) + self.text(x, y, text, style) + self.write("") + + def write(self, s): + self.lines.write(s) + + def flush(self): + self.write("") + if self.parent: + self.parent.flush() + return self.lines.getvalue() + def usage(): - sys.stderr.write(""" + sys.stderr.write(""" usage: swimlane.py [-t ms-per-pixel] [-o outputfile] [-f redline-fraction] Input files for this tool can be prepared by "yarn logs -applicationId | grep HISTORY". """) + def main(argv): - (opts, args) = getopt(argv, "o:t:f:") - out = sys.stdout - ticks = -1 # precision of 1/tick - fraction = -1 - for k,v in opts: - if(k == "-o"): - out = open(v, "w") - if(k == "-t"): - ticks = int(v) - if(k == "-f"): - if(int(v) < 100): - fraction = int(v)/100.0 - if len(args) == 0: - return usage() - log = AMLog(args[0]).structure() - lanes = [c.name for c in sorted(log.containers.values(), key=lambda a: a.start)] - marginTop = 128 - marginRight = 100; - laneSize = 24 - y = len(lanes)*laneSize - items = attempts(log) - maxx = max([a[4] for a in items]) - if ticks == -1: - ticks = min(1000, (maxx - log.zero)/2048) - xdomain = lambda t : (t - log.zero)/ticks - x = xdomain(maxx) - svg = SVGHelper(x+2*marginRight+256, y+2*marginTop) - a = marginTop - svg.text(x/2, 32, log.name, style="font-size: 32px; text-anchor: middle") - containerMap = dict(zip(list(lanes), xrange(len(lanes)))) - svg.text(marginRight - 16, marginTop - 32, "Container ID", "text-anchor:end; font-size: 16px;") - # draw a grid - for l in lanes: - a += laneSize - svg.text(marginRight - 4, a, l, "text-anchor:end; font-size: 16px;") - svg.line(marginRight, a, marginRight+x, a, "stroke: #ccc") - for x1 in set(range(0, x, 10*ticks)) | set([x]): - svg.text(marginRight+x1, marginTop-laneSize/2, "%0.2f s" % ((x1 * ticks)/1000), "text-anchor: middle; font-size: 12px") - svg.line(marginRight+x1, marginTop-laneSize/2, marginRight+x1, marginTop+y, "stroke: #ddd") - svg.line(marginRight, marginTop, marginRight+x, marginTop) - svg.line(marginRight, y+marginTop, marginRight+x, y+marginTop) - svg.line(marginRight, marginTop, marginRight, y+marginTop) - svg.line(marginRight+x, marginTop, marginRight+x, y+marginTop) - - colourman = ColourManager() - for c in log.containers.values(): - y1 = marginTop+(containerMap[c.name]*laneSize) - x1 = marginRight+xdomain(c.start) - svg.line(x1, y1, x1, y1 + laneSize, style="stroke: green") - if c.stop > c.start: - x2 = marginRight+xdomain(c.stop) - if (c.status == 0): - svg.line(x2, y1, x2, y1 + laneSize, style="stroke: green") - else: - svg.line(x2, y1, x2, y1 + laneSize, style="stroke: red") - svg.text(x2, y1, "%d" % (c.status), style="text-anchor: right; font-size: 12px; stroke: red", transform="rotate(90, %d, %d)" % (x2, y1)) - svg.rect(x1, y1, x2, y1 + laneSize, style="fill: #ccc; opacity: 0.3") - elif c.stop == -1: - x2 = marginRight+x - svg.rect(x1, y1, x2, y1 + laneSize, style="fill: #ccc; opacity: 0.3") - for dag in log.dags: - x1 = marginRight+xdomain(dag.start) - svg.line(x1, marginTop-24, x1, marginTop+y, "stroke: black;", stroke_dasharray="8,4") - x2 = marginRight+xdomain(dag.finish) - svg.line(x2, marginTop-24, x2, marginTop+y, "stroke: black;", stroke_dasharray="8,4") - svg.line(x1, marginTop-24, x2, marginTop-24, "stroke: black") - svg.text((x1+x2)/2, marginTop-32, "%s (%0.1f s)" % (dag.name, (dag.finish-dag.start)/1000.0) , "text-anchor: middle; font-size: 12px;") - vertexes = set([v.name for v in dag.vertexes]) - colourmap = dict([(v,colourman.next()) for v in list(vertexes)]) - for c in dag.attempts(): - colour = colourmap[c.vertex] - y1 = marginTop+(containerMap[c.container]*laneSize)+1 - x1 = marginRight+xdomain(c.start) - x2 = marginRight+xdomain(c.finish) - y2 = y1 + laneSize - 2 - locality = (c.kvs.has_key("DATA_LOCAL_TASKS") * 1) + (c.kvs.has_key("RACK_LOCAL_TASKS")*2) - #CompletedLogs may not be present in latest tez logs - link = c.kvs.get("completedLogs", "") - svg.rect(x1, y1, x2, y2, title=c.name, style="fill: %s; stroke: #ccc;" % (colour), link=link) - if locality > 1: # rack-local (no-locality isn't counted) - svg.rect(x1, y2-4, x2, y2, style="fill: #f00; fill-opacity: 0.5;", link=link) - if x2 - x1 > 64: - svg.text((x1+x2)/2, y2-12, "%s (%05d_%d)" % (c.vertex, c.tasknum, c.attemptnum), style="text-anchor: middle; font-size: 9px;") - else: - svg.text((x1+x2)/2, y2-12, "%s" % c.vertex, style="text-anchor: middle; font-size: 9px;") - finishes = sorted([c.finish for c in dag.attempts()]) - if(len(finishes) > 10 and fraction > 0): - percentX = finishes[int(len(finishes)*fraction)] - svg.line(marginRight+xdomain(percentX), marginTop, marginRight+xdomain(percentX), y+marginTop, style="stroke: red") - svg.text(marginRight+xdomain(percentX), y+marginTop+12, "%d%% (%0.1fs)" % (int(fraction*100), (percentX - dag.start)/1000.0), style="font-size:12px; text-anchor: middle") - out.write(svg.flush()) - out.close() - print("Output svg is written into: " + str(out)) + (opts, args) = getopt(argv, "o:t:f:") + out = sys.stdout + out_filename = "stdout" + ticks = -1 # precision of 1/tick + fraction = -1 + for k, v in opts: + if k == "-o": + out = open(v, "w") + out_filename = v + if k == "-t": + ticks = int(v) + if k == "-f": + if int(v) < 100: + fraction = int(v) / 100.0 + if len(args) == 0: + return usage() + log = AMLog(args[0]).structure() + lanes = [c.name for c in sorted(log.containers.values(), key=lambda a: a.start)] + marginTop = 128 + marginRight = 100 + laneSize = 24 + y = len(lanes) * laneSize + items = list(attempts(log)) + maxx = max([a[4] for a in items]) + if ticks == -1: + ticks = min(1000, (maxx - log.zero) / 2048) + xdomain = lambda t: (t - log.zero) / ticks + x = xdomain(maxx) + svg = SVGHelper(x + 2 * marginRight + 256, y + 2 * marginTop) + a = marginTop + svg.text(x / 2, 32, log.name, style="font-size: 32px; text-anchor: middle") + containerMap = dict(zip(list(lanes), range(len(lanes)))) + svg.text( + marginRight - 16, + marginTop - 32, + "Container ID", + "text-anchor:end; font-size: 16px;", + ) + # draw a grid + for l in lanes: + a += laneSize + svg.text(marginRight - 4, a, l, "text-anchor:end; font-size: 16px;") + svg.line(marginRight, a, marginRight + x, a, "stroke: #ccc") + for x1 in set(range(0, int(x), int(10 * ticks))) | set([x]): + svg.text( + marginRight + x1, + marginTop - laneSize / 2, + "%0.2f s" % ((x1 * ticks) / 1000), + "text-anchor: middle; font-size: 12px", + ) + svg.line( + marginRight + x1, + marginTop - laneSize / 2, + marginRight + x1, + marginTop + y, + "stroke: #ddd", + ) + svg.line(marginRight, marginTop, marginRight + x, marginTop) + svg.line(marginRight, y + marginTop, marginRight + x, y + marginTop) + svg.line(marginRight, marginTop, marginRight, y + marginTop) + svg.line(marginRight + x, marginTop, marginRight + x, y + marginTop) + + colourman = ColourManager() + for c in log.containers.values(): + y1 = marginTop + (containerMap[c.name] * laneSize) + x1 = marginRight + xdomain(c.start) + svg.line(x1, y1, x1, y1 + laneSize, style="stroke: green") + if c.stop > c.start: + x2 = marginRight + xdomain(c.stop) + if c.status == 0: + svg.line(x2, y1, x2, y1 + laneSize, style="stroke: green") + else: + svg.line(x2, y1, x2, y1 + laneSize, style="stroke: red") + svg.text( + x2, + y1, + "%d" % (c.status), + style="text-anchor: right; font-size: 12px; stroke: red", + transform="rotate(90, %d, %d)" % (x2, y1), + ) + svg.rect(x1, y1, x2, y1 + laneSize, style="fill: #ccc; opacity: 0.3") + elif c.stop == -1: + x2 = marginRight + x + svg.rect(x1, y1, x2, y1 + laneSize, style="fill: #ccc; opacity: 0.3") + for dag in log.dags: + x1 = marginRight + xdomain(dag.start) + svg.line( + x1, + marginTop - 24, + x1, + marginTop + y, + "stroke: black;", + stroke_dasharray="8,4", + ) + x2 = marginRight + xdomain(dag.finish) + svg.line( + x2, + marginTop - 24, + x2, + marginTop + y, + "stroke: black;", + stroke_dasharray="8,4", + ) + svg.line(x1, marginTop - 24, x2, marginTop - 24, "stroke: black") + svg.text( + (x1 + x2) / 2, + marginTop - 32, + "%s (%0.1f s)" % (dag.name, (dag.finish - dag.start) / 1000.0), + "text-anchor: middle; font-size: 12px;", + ) + vertexes = set([v.name for v in dag.vertexes]) + colourmap = dict([(v, colourman.next()) for v in list(vertexes)]) + for c in dag.attempts(): + colour = colourmap[c.vertex] + y1 = marginTop + (containerMap[c.container] * laneSize) + 1 + x1 = marginRight + xdomain(c.start) + x2 = marginRight + xdomain(c.finish) + y2 = y1 + laneSize - 2 + locality = ("DATA_LOCAL_TASKS" in c.kvs) * 1 + ( + "RACK_LOCAL_TASKS" in c.kvs + ) * 2 + # CompletedLogs may not be present in latest tez logs + link = c.kvs.get("completedLogs", "") + svg.rect( + x1, + y1, + x2, + y2, + title=c.name, + style="fill: %s; stroke: #ccc;" % (colour), + link=link, + ) + if locality > 1: # rack-local (no-locality isn't counted) + svg.rect( + x1, + y2 - 4, + x2, + y2, + style="fill: #f00; fill-opacity: 0.5;", + link=link, + ) + if x2 - x1 > 64: + svg.text( + (x1 + x2) / 2, + y2 - 12, + "%s (%05d_%d)" % (c.vertex, c.tasknum, c.attemptnum), + style="text-anchor: middle; font-size: 9px;", + ) + else: + svg.text( + (x1 + x2) / 2, + y2 - 12, + "%s" % c.vertex, + style="text-anchor: middle; font-size: 9px;", + ) + finishes = sorted([c.finish for c in dag.attempts()]) + if len(finishes) > 10 and fraction > 0: + percentX = finishes[int(len(finishes) * fraction)] + svg.line( + marginRight + xdomain(percentX), + marginTop, + marginRight + xdomain(percentX), + y + marginTop, + style="stroke: red", + ) + svg.text( + marginRight + xdomain(percentX), + y + marginTop + 12, + "%d%% (%0.1fs)" + % (int(fraction * 100), (percentX - dag.start) / 1000.0), + style="font-size:12px; text-anchor: middle", + ) + + out.write(svg.flush()) + + # Do not close sys.stdout as it causes print() to fail afterwards + if out is not sys.stdout: + out.close() + + print("Output svg is written into: " + str(out_filename)) + if __name__ == "__main__": - sys.exit(main(sys.argv[1:])) + sys.exit(main(sys.argv[1:])) diff --git a/tez-tools/swimlanes/yarn-swimlanes.sh b/tez-tools/swimlanes/yarn-swimlanes.sh index 02465b0129..216c757332 100755 --- a/tez-tools/swimlanes/yarn-swimlanes.sh +++ b/tez-tools/swimlanes/yarn-swimlanes.sh @@ -24,7 +24,7 @@ DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" >/dev/null 2>&1 && pwd )" if [[ -f $APPID ]]; then echo "Reading yarn logs from local file: $APPID" - cat "$APPID" | grep HISTORY > "$TMP" + grep "HISTORY" "$APPID" > "$TMP" else YARN=$(which yarn); echo "Fetching yarn logs for $APPID" @@ -32,4 +32,4 @@ else fi echo "History was written into $TMP" -python "$DIR/swimlane.py" -o "$APPID.svg" "$TMP" \ No newline at end of file +python3 "$DIR/swimlane.py" -o "$APPID.svg" "$TMP" diff --git a/tez-tools/tez-log-split/logsplit.py b/tez-tools/tez-log-split/logsplit.py index 47e17da11b..063b7fd90e 100644 --- a/tez-tools/tez-log-split/logsplit.py +++ b/tez-tools/tez-log-split/logsplit.py @@ -23,6 +23,7 @@ from gzip import GzipFile as GZFile from getopt import getopt + def usage(): sys.stderr.write(""" usage: logsplit.py @@ -30,20 +31,24 @@ def usage(): Input files for this tool can be prepared by "yarn logs -applicationId ". """) + def open_file(f): if f.endswith(".gz"): return GZFile(f) return open(f) + class AggregatedLog(object): def __init__(self): self.in_container = False self.in_logfile = False self.current_container_header = None self.current_container_name = None - self.current_host_name = None # as read from log line: "hello.my.host.com_8041" + self.current_host_name = None # as read from log line: "hello.my.host.com_8041" self.current_file = None - self.HEADER_CONTAINER_RE = re.compile("Container: (container_[a-z0-9_]+) on (.*)") + self.HEADER_CONTAINER_RE = re.compile( + "Container: (container_[a-z0-9_]+) on (.*)" + ) self.HEADER_LAST_ROW_RE = re.compile("^LogContents:$") self.HEADER_LOG_TYPE_RE = re.compile("^LogType:(.*)") self.LAST_LOG_LINE_RE = re.compile("^End of LogType:.*") @@ -72,7 +77,9 @@ def parse(self, line): self.create_file_in_current_container(file_name) elif self.HEADER_LAST_ROW_RE.match(line): self.in_logfile = True - self.write_to_current_file(self.current_container_header) #for host reference + self.write_to_current_file( + self.current_container_header + ) # for host reference else: m = self.HEADER_CONTAINER_RE.match(line) self.current_container_header = line @@ -83,12 +90,16 @@ def parse(self, line): self.start_container_folder() def start_container_folder(self): - container_dir = os.path.join(self.output_folder, self.get_current_container_dir_name()) + container_dir = os.path.join( + self.output_folder, self.get_current_container_dir_name() + ) if not os.path.exists(container_dir): os.makedirs(container_dir) def create_file_in_current_container(self, file_name): - file_to_be_created = os.path.join(self.output_folder, self.get_current_container_dir_name(), file_name) + file_to_be_created = os.path.join( + self.output_folder, self.get_current_container_dir_name(), file_name + ) file = open(file_to_be_created, "w+") self.current_file = file @@ -98,14 +109,18 @@ def write_to_current_file(self, line): def get_current_container_dir_name(self): return os.path.join(self.current_host_name, self.current_container_name) + def main(argv): (opts, args) = getopt(argv, "") input_file = args[0] fp = open_file(input_file) aggregated_log = AggregatedLog() aggregated_log.process(fp) - print ("Split application logs was written into folder " + aggregated_log.output_folder) + print( + "Split application logs was written into folder " + aggregated_log.output_folder + ) fp.close() + if __name__ == "__main__": sys.exit(main(sys.argv[1:])) diff --git a/tez-tools/tez-log-split/tez-log-splitter.sh b/tez-tools/tez-log-split/tez-log-splitter.sh index 712e499a4f..cc102ab723 100755 --- a/tez-tools/tez-log-split/tez-log-splitter.sh +++ b/tez-tools/tez-log-split/tez-log-splitter.sh @@ -32,4 +32,4 @@ else echo "Application log was written into $TMP" fi -python "$DIR/logsplit.py" "$TMP" \ No newline at end of file +python3 "$DIR/logsplit.py" "$TMP"