Skip to content
This repository was archived by the owner on Apr 20, 2026. It is now read-only.
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions analysis/dashboard/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,14 @@ def render_sidebar(logs_dir, runs):
if run.metadata.gpu_type:
label += f" | {run.metadata.gpu_type}"

# Use folder name suffix (YAML file name) instead of YAML's internal name field
dirname = os.path.basename(run.metadata.path)
if "-" in dirname:
folder_suffix = dirname.split("-", 1)[1]
label += f" | {folder_suffix}"
elif run.metadata.job_name:
label += f" | {run.metadata.job_name}"

run_legend_labels[run_id] = label

# Get dataframe
Expand Down
2 changes: 1 addition & 1 deletion analysis/dashboard/node_metrics_tab.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ def render(filtered_runs: list, logs_dir: str):
if not all_node_metrics:
st.warning("No log files found for the selected runs.")
st.info(
"Node metrics are extracted from files like `*_prefill_*.err`, `*_decode_*.err`, `*_prefill_*.out`, or `*_decode_*.out`"
"Node metrics are extracted from files like `*_prefill_*.out`, `*_decode_*.out`, or `*_agg_*.out`"
)
return

Expand Down
27 changes: 18 additions & 9 deletions analysis/srtlog/log_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ def parse_run_logs(self, run_path: str, return_dicts: bool = False) -> list:
parsed_successfully = 0

for file in os.listdir(run_path):
if (file.endswith(".err") or file.endswith(".out")) and ("prefill" in file or "decode" in file):
if (file.endswith(".err") or file.endswith(".out")) and ("prefill" in file or "decode" in file or "agg" in file):
total_err_files += 1
filepath = os.path.join(run_path, file)
node = self.parse_single_log(filepath)
Expand Down Expand Up @@ -446,10 +446,11 @@ def _deserialize_node_metrics(self, df: pd.DataFrame) -> list:
def _parse_dp_tp_ep_tag(self, line: str) -> tuple[int | None, int | None, int | None, str | None]:
"""Extract DP, TP, EP indices and timestamp from log line.

Supports three formats:
Supports four formats:
- Full: [2025-11-04 05:31:43 DP0 TP0 EP0]
- Simple TP: [2025-11-04 07:05:55 TP0] (defaults DP=0, EP=0)
- Pipeline: [2025-12-08 14:34:44 PP0] (defaults DP=0, EP=0, TP=PP value)
- Agg/ANSI: [2m2026-02-23T22:33:43.664588Z[0m ... (ISO timestamp in ANSI codes, defaults DP=0, TP=0, EP=0)

Args:
line: Log line to parse
Expand All @@ -475,6 +476,14 @@ def _parse_dp_tp_ep_tag(self, line: str) -> tuple[int | None, int | None, int |
timestamp, pp = match.groups()
return 0, int(pp), 0, timestamp # Map PP to TP slot, default DP=0, EP=0

# Try ANSI-escaped ISO timestamp format (agg mode logs)
# Example: [2m2026-02-23T22:33:43.664588Z[0m
match = re.search(r"\[2m(\d{4}-\d{2}-\d{2})T(\d{2}:\d{2}:\d{2})", line)
if match:
date_part, time_part = match.groups()
timestamp = f"{date_part} {time_part}"
return 0, 0, 0, timestamp

return None, None, None, None

def _parse_prefill_batch_line(self, line: str) -> dict | None:
Expand All @@ -491,12 +500,12 @@ def _parse_prefill_batch_line(self, line: str) -> dict | None:

metrics = {"timestamp": timestamp, "dp": dp, "tp": tp, "ep": ep, "type": "prefill"}

# Extract metrics using regex
# Extract metrics using regex (support both disagg and agg log formats)
patterns = {
"new_seq": r"#new-seq:\s*(\d+)",
"new_token": r"#new-token:\s*(\d+)",
"cached_token": r"#cached-token:\s*(\d+)",
"token_usage": r"token usage:\s*([\d.]+)",
"token_usage": r"(?:full )?token usage:\s*([\d.]+)",
"running_req": r"#running-req:\s*(\d+)",
"queue_req": r"#queue-req:\s*(\d+)",
"prealloc_req": r"#prealloc-req:\s*(\d+)",
Expand Down Expand Up @@ -526,11 +535,11 @@ def _parse_decode_batch_line(self, line: str) -> dict | None:

metrics = {"timestamp": timestamp, "dp": dp, "tp": tp, "ep": ep, "type": "decode"}

# Extract metrics using regex
# Extract metrics using regex (support both disagg and agg log formats)
patterns = {
"running_req": r"#running-req:\s*(\d+)",
"num_tokens": r"#token:\s*(\d+)",
"token_usage": r"token usage:\s*([\d.]+)",
"num_tokens": r"#(?:full )?token:\s*(\d+)",
"token_usage": r"(?:full )?token usage:\s*([\d.]+)",
"preallocated_usage": r"pre-allocated usage:\s*([\d.]+)",
"prealloc_req": r"#prealloc-req:\s*(\d+)",
"transfer_req": r"#transfer-req:\s*(\d+)",
Expand Down Expand Up @@ -597,8 +606,8 @@ def _extract_node_info_from_filename(self, filename: str) -> dict | None:
Example: watchtower-navy-cn01_prefill_w0.err or r02-p01-dgx-c11_prefill_w0.out
Returns: {'node': 'watchtower-navy-cn01', 'worker_type': 'prefill', 'worker_id': 'w0'}
"""
# Use greedy match for node name up to _(prefill|decode|frontend)_
match = re.match(r"(.+)_(prefill|decode|frontend)_([^.]+)\.(err|out)", os.path.basename(filename))
# Use greedy match for node name up to _(prefill|decode|frontend|agg)_
match = re.match(r"(.+)_(prefill|decode|frontend|agg)_([^.]+)\.(err|out)", os.path.basename(filename))
if match:
return {
"node": match.group(1),
Expand Down
94 changes: 65 additions & 29 deletions analysis/srtlog/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,43 @@ def from_json(cls, json_data: dict, run_path: str) -> "RunMetadata":
Returns:
RunMetadata instance
"""
# Check if this is the old format (with run_metadata key) or new format (flat)
# Check if this is the new v2.0 format
if json_data.get("version") == "2.0":
resources = json_data.get("resources", {})
model = json_data.get("model", {})
agg_workers = resources.get("agg_workers") or 0

# Determine mode based on agg_workers
mode = "aggregated" if agg_workers > 0 else "disaggregated"

# agg_nodes: fall back to agg_workers if not specified
agg_nodes = (resources.get("agg_nodes") or 0) if agg_workers > 0 else 0
if agg_workers > 0 and agg_nodes == 0:
agg_nodes = agg_workers

return cls(
job_id=json_data.get("job_id", ""),
path=run_path,
run_date=json_data.get("generated_at", ""),
container=model.get("container", ""),
prefill_nodes=resources.get("prefill_nodes") or 0,
decode_nodes=resources.get("decode_nodes") or 0,
prefill_workers=resources.get("prefill_workers") or 0,
decode_workers=resources.get("decode_workers") or 0,
mode=mode,
job_name=json_data.get("job_name", ""),
partition="",
model_dir=model.get("path", ""),
gpus_per_node=resources.get("gpus_per_node") or 0,
gpu_type=resources.get("gpu_type", ""),
enable_multiple_frontends=False,
num_additional_frontends=0,
agg_nodes=agg_nodes,
agg_workers=agg_workers,
)

# Old format with run_metadata key
if "run_metadata" in json_data:
# Old format
run_meta = json_data.get("run_metadata", {})
mode = run_meta.get("mode", "disaggregated")

Expand All @@ -74,35 +108,37 @@ def from_json(cls, json_data: dict, run_path: str) -> "RunMetadata":
agg_nodes=run_meta.get("agg_nodes", 0),
agg_workers=run_meta.get("agg_workers", 0),
)
else:
# New format (flat structure)
model_data = json_data.get("model", {})
resources_data = json_data.get("resources", {})
agg_workers = resources_data.get("agg_workers", 0)

# Determine mode based on agg_workers
mode = "aggregated" if agg_workers > 0 else "disaggregated"
# New format (flat structure, no version field)
model_data = json_data.get("model", {})
resources_data = json_data.get("resources", {})
agg_workers = resources_data.get("agg_workers") or 0
mode = "aggregated" if agg_workers > 0 else "disaggregated"

return cls(
job_id=json_data.get("job_id", ""),
path=run_path,
run_date=json_data.get("generated_at", ""),
container=model_data.get("container", ""),
prefill_nodes=resources_data.get("prefill_nodes", 0),
decode_nodes=resources_data.get("decode_nodes", 0),
prefill_workers=resources_data.get("prefill_workers", 0),
decode_workers=resources_data.get("decode_workers", 0),
mode=mode,
job_name=json_data.get("job_name", ""),
partition="", # Not present in new format
model_dir=model_data.get("path", ""), # Use model path as model_dir
gpus_per_node=resources_data.get("gpus_per_node", 0),
gpu_type=resources_data.get("gpu_type", ""),
enable_multiple_frontends=False, # Not present in new format
num_additional_frontends=0, # Not present in new format
agg_nodes=resources_data.get("agg_nodes", 0), # Not present in new format
agg_workers=agg_workers,
)
agg_nodes = (resources_data.get("agg_nodes") or 0) if agg_workers > 0 else 0
if agg_workers > 0 and agg_nodes == 0:
agg_nodes = agg_workers

return cls(
job_id=json_data.get("job_id", ""),
path=run_path,
run_date=json_data.get("generated_at", ""),
container=model_data.get("container", ""),
prefill_nodes=resources_data.get("prefill_nodes") or 0,
decode_nodes=resources_data.get("decode_nodes") or 0,
prefill_workers=resources_data.get("prefill_workers") or 0,
decode_workers=resources_data.get("decode_workers") or 0,
mode=mode,
job_name=json_data.get("job_name", ""),
partition="",
model_dir=model_data.get("path", ""),
gpus_per_node=resources_data.get("gpus_per_node") or 0,
gpu_type=resources_data.get("gpu_type", ""),
enable_multiple_frontends=False,
num_additional_frontends=0,
agg_nodes=agg_nodes,
agg_workers=agg_workers,
)

@property
def is_aggregated(self) -> bool:
Expand Down
15 changes: 10 additions & 5 deletions analysis/srtlog/run_loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -226,8 +226,12 @@ def _load_benchmark_results(self, run: BenchmarkRun) -> None:
profiler_type = run.profiler.profiler_type
pattern_strs = [f"{profiler_type}_isl_{run.profiler.isl}_osl_{run.profiler.osl}"]

# Define source patterns for cache validation (check all possible patterns)
source_patterns = [f"{pattern}/*.json" for pattern in pattern_strs]
# Define source patterns for cache validation
# Include both direct and logs/ subdirectory paths since results could be in either
source_patterns = []
for pattern in pattern_strs:
source_patterns.append(f"{pattern}/*.json")
source_patterns.append(f"logs/{pattern}/*.json")

# Try to load from cache first
if cache_mgr.is_cache_valid("benchmark_results", source_patterns):
Expand Down Expand Up @@ -531,9 +535,10 @@ def to_dataframe(self, runs: list[BenchmarkRun] | None = None):
rows = []

for run in runs:
run_id = (
f"{run.job_id}_{run.metadata.prefill_workers}P_{run.metadata.decode_workers}D_{run.metadata.run_date}"
)
if run.metadata.is_aggregated:
run_id = f"{run.job_id}_{run.metadata.agg_workers}A_{run.metadata.run_date}"
else:
run_id = f"{run.job_id}_{run.metadata.prefill_workers}P_{run.metadata.decode_workers}D_{run.metadata.run_date}"
total_gpus = run.total_gpus

# Create a row for each concurrency level
Expand Down
Loading