Skip to content

Commit

Permalink
[AutoParallel] Support multi machine case for the visualize tool (#59179
Browse files Browse the repository at this point in the history
)

* merge from openvino master

* add InterpreterRunTime() to record interpreter's run time

* add profiler helper static to produce json file

* add color map and support perfetto format

* recover codes

* control include env for gpu_timer.h

* fix logic for profiler_helper_static.py

* fix build error

* fix build error

* recover thirdparty

* add flag control: not support new ir now

* set auto_parallel_profiler flag to false

* fix

* add auto_parallel_profiler as command parameter

* fix value name

* support gettimeofday for win env

* fix win build error

* fix win build error

* use job_type_to_id

* Fixed repeatedly timing the same stream

* add step line for timeline

* add step timeline and fix logic when job overlap

* update time record logic

* fix bug when start profile start from none zero step

* fix note

* remove FLAGS_auto_parallel_profiler

* use run config instead FLAGS_auto_parallelxx

* fix color map logic

* fix color map logic

* fix bug when log step does not start from 0

* fix

* fix

* don't use set_enable_auto_parallel_profiler

* fix bug

* disable auto_parallel_profiler when not open flag by command line

* fix bug

* remove resettime

* fix build bug

* fix

* remove set enable

* fix build error

* fix build error

* fix build error

* fix ci error

* fix

* fix run error

* fix

* fix

* fix calculate_stream_timer logic

* remove fluid head

* fix build error

* set default value for enable_job_schedule_profiler

* support multi machine

* fix load dir logic
  • Loading branch information
AndSonder authored Nov 25, 2023
1 parent e19a92d commit 6ab2fce
Showing 1 changed file with 89 additions and 43 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,12 @@ def parse_args():
all_devices = ",".join([str(i) for i in range(device_count)])
parser.add_argument("--devices", type=str, default=all_devices)
parser.add_argument("--log_dir", type=str, required=True)
parser.add_argument("--multi_machine", action="store_true")
args = parser.parse_args()
return args


def process_job_log(log_data, device_id):
def process_job_log(log_data, device_id, multi_machine_idx=-1):
log_pattern = r'.*?Profiler Info: Job \((\d+)\), type = (\w+), micro_batch_id = (\d+), job_start_time = (\d+.\d+), job_end_time = (\d+.\d+)'
matches = re.findall(log_pattern, log_data)
events = []
Expand All @@ -66,21 +67,30 @@ def process_job_log(log_data, device_id):
step_start_time = start_time
step_end_time = end_time

tid_name = (
"GPU" + str(device_id)
if multi_machine_idx == -1
else "GPU"
+ str(device_id)
+ "(machine:"
+ str(multi_machine_idx)
+ ")"
)
event_start = {
"name": job_type + "_" + str(job_id),
"cat": job_type,
"ph": "B",
"ts": start_time,
"pid": 0,
"tid": "GPU" + str(device_id),
"tid": tid_name,
}
event_end = {
"name": job_type + "_" + str(job_id),
"cat": job_type,
"ph": "E",
"pid": 0,
"ts": end_time,
"tid": "GPU" + str(device_id),
"tid": tid_name,
}
if job_type in color_map:
event_start["cname"] = color_map[job_type]
Expand All @@ -100,29 +110,48 @@ def main():
all_events = []
step_infos = []
start_step = 0

for device_id in args.devices.split(","):
_logger.info(f"Process device {device_id}")
device_id = int(device_id)
log_file = os.path.join(args.log_dir, "workerlog." + str(device_id))
with open(log_file, "r") as f:
log_data = f.read()

start_step_pattern = (
r'.*?Schedule Profiler start at step (\d+) and end at step.*'
)
start_step_match = re.findall(start_step_pattern, log_data)
start_step = (
int(start_step_match[0]) if len(start_step_match) > 0 else 0
)

events, step_times = process_job_log(log_data, device_id)
all_events.extend(events)
for i, info in enumerate(step_times):
if len(step_infos) <= i:
step_infos.append([float("inf"), float("-inf")])
step_infos[i][0] = min(step_infos[i][0], info[0])
step_infos[i][1] = max(step_infos[i][1], info[1])
machine_num = 1

def process_one_machine_log(log_dir, multi_machine_idx=-1):
for device_id in args.devices.split(","):
_logger.info(f"Process device {device_id}")
device_id = int(device_id)
log_file = os.path.join(log_dir, "workerlog." + str(device_id))
with open(log_file, "r") as f:
log_data = f.read()

start_step_pattern = (
r'.*?Schedule Profiler start at step (\d+) and end at step.*'
)
start_step_match = re.findall(start_step_pattern, log_data)
start_step = (
int(start_step_match[0]) if len(start_step_match) > 0 else 0
)

events, step_times = process_job_log(
log_data, device_id, multi_machine_idx
)
all_events.extend(events)
for i, info in enumerate(step_times):
if len(step_infos) <= i:
step_infos.append([float("inf"), float("-inf")])
step_infos[i][0] = min(step_infos[i][0], info[0])
step_infos[i][1] = max(step_infos[i][1], info[1])

if args.multi_machine:
multi_machine_dirs = os.listdir(args.log_dir)
multi_machine_dirs = [
os.path.join(args.log_dir, d)
for d in multi_machine_dirs
if d.startswith("machine")
and os.path.isdir(os.path.join(args.log_dir, d))
]
machine_num = len(multi_machine_dirs)
for i, d in enumerate(multi_machine_dirs):
_logger.info(f"Process machine {i}")
process_one_machine_log(d, i)
else:
process_one_machine_log(args.log_dir)

for i, info in enumerate(step_infos):
start_time = info[0]
Expand Down Expand Up @@ -170,24 +199,41 @@ def main():
}
]
)
for i in range(len(args.devices.split(","))):
all_events.extend(
[
{
"args": {"name": f"GPU:{i}"},
"cat": "__metadata",
"name": "thread_name",
"ph": "M",
"pid": 0,
"tid": i + 2334,
"ts": 0,
}
]
)

for i in range(machine_num):
for j in range(len(args.devices.split(","))):
if machine_num > 1:
name = f"GPU:{j}(machine:{i})"
tid = i * len(args.devices.split(",")) + j + 2334
else:
name = f"GPU:{j}"
tid = j + 2334
all_events.extend(
[
{
"args": {"name": name},
"cat": "__metadata",
"name": "thread_name",
"ph": "M",
"pid": 0,
"tid": tid,
"ts": 0,
}
]
)

json_str = json.dumps({"traceEvents": all_events})
for i in range(len(args.devices.split(","))):
json_str = json_str.replace('"Step"', '2333')
json_str = json_str.replace(f'"GPU{i}"', f'{i + 2334}')
json_str = json_str.replace('"Step"', '2333')

for i in range(machine_num):
for j in range(len(args.devices.split(","))):
if machine_num > 1:
json_str = json_str.replace(
f'"GPU{j}(machine:{i})"',
f'{i * len(args.devices.split(",")) + j + 2334}',
)
else:
json_str = json_str.replace(f'"GPU{j}"', f'{j + 2334}')

with open(save_path, "w") as f:
f.write(json_str)
Expand Down

0 comments on commit 6ab2fce

Please sign in to comment.