diff --git a/src/gantry/widgets.py b/src/gantry/widgets.py index f914c01..40cca0d 100644 --- a/src/gantry/widgets.py +++ b/src/gantry/widgets.py @@ -1,12 +1,14 @@ """Custom widgets for Gantry TUI.""" import logging -from typing import Any, Dict, List, Optional, Callable +from datetime import datetime +from typing import Any, Dict, List, Optional, Callable, Tuple from textual.widgets import DataTable, Static, Input from textual.containers import Container, Horizontal, Vertical from textual.message import Message from textual.events import Key from textual.css.query import NoMatches +from rich.text import Text logger = logging.getLogger(__name__) @@ -45,6 +47,100 @@ def __init__(self, *args, **kwargs): self._all_rows: Dict[str, List[Any]] = {} self._search_term: str = "" self._columns: List[str] = [] + self._column_keys: List[str] = [] + # Each entry is (column_index, reverse); index 0 = primary sort + self._sort_columns: List[Tuple[int, bool]] = [] + self._shift_held: bool = False + + def on_mouse_down(self, event) -> None: + """Capture shift modifier state so header clicks can use it.""" + self._shift_held = getattr(event, "shift", False) + + def _compute_next_sort(self, col_idx: int, shift: bool) -> None: + """Update _sort_columns based on a header click. + + Args: + col_idx: Index of the clicked column. + shift: Whether Shift was held (adds secondary sort). + """ + existing_pos = next( + (i for i, (idx, _) in enumerate(self._sort_columns) if idx == col_idx), + None, + ) + if shift: + if existing_pos is not None: + idx, rev = self._sort_columns[existing_pos] + self._sort_columns[existing_pos] = (idx, not rev) + else: + self._sort_columns.append((col_idx, False)) + if len(self._sort_columns) > 3: + self._sort_columns.pop(0) + else: + if existing_pos == 0: + # Clicking current primary sort: toggle direction, clear secondaries + _, rev = self._sort_columns[0] + self._sort_columns = [(col_idx, not rev)] + else: + # New primary sort column: start ascending + self._sort_columns = [(col_idx, False)] + + def on_data_table_header_selected(self, event) -> None: + """Handle column header click to set sort order.""" + ck = event.column_key + col_key_str = str(getattr(ck, "value", ck)) + if col_key_str not in self._column_keys: + return + col_idx = self._column_keys.index(col_key_str) + self._compute_next_sort(col_idx, self._shift_held) + self._shift_held = False + self._update_column_labels() + self._apply_filter(self._search_term) + + def _build_column_label_text(self, col_name: str, col_idx: int) -> str: + """Return column label text with sort indicator appended if applicable.""" + sort_map = {idx: (rank, rev) for rank, (idx, rev) in enumerate(self._sort_columns)} + multi = len(self._sort_columns) > 1 + if col_idx in sort_map: + rank, rev = sort_map[col_idx] + indicator = "▼" if rev else "▲" + suffix = f" {indicator}{rank + 1}" if multi else f" {indicator}" + return col_name + suffix + return col_name + + def _coerce_sort_value(self, value: Any) -> tuple: + """Return a (type_tag, coerced_value) tuple for type-aware, cross-type-safe sorting. + + Ordering: numbers (0) < datetimes (1) < strings (2). + Ensures no TypeError when a column contains mixed types. + """ + if value is None: + return (2, "") + s = str(value).strip() + if not s: + return (2, "") + try: + return (0, float(s)) + except (ValueError, TypeError): + pass + try: + return (1, datetime.fromisoformat(s.replace("Z", "+00:00"))) + except (ValueError, TypeError): + return (2, s.lower()) + + def _update_column_labels(self) -> None: + """Rewrite column header labels in-place to show sort indicators.""" + for col_key_obj, column in self.columns.items(): + key_str = str(getattr(col_key_obj, "value", col_key_obj)) + if not key_str.startswith("col_"): + continue + try: + i = int(key_str.split("_")[1]) + except (ValueError, IndexError): + continue + if i >= len(self._columns): + continue + column.label = Text(self._build_column_label_text(self._columns[i], i)) + self.refresh() def populate_resources( self, @@ -55,29 +151,33 @@ def populate_resources( """ Populate the table with resources. + Maintains current sort when column set is unchanged; resets sort when + columns change (e.g., switching resource type). + Args: resources: List of resource dictionaries from K8s API. columns: List of column headers to display. column_keys: List of keys to extract from each resource dict. """ + old_columns = self._columns[:] self.clear(columns=True) self._all_rows.clear() - self._columns = columns + self._columns = list(columns) + self._column_keys = [f"col_{i}" for i in range(len(columns))] - # Add columns - for col in columns: - self.add_column(col) + if list(columns) != old_columns: + self._sort_columns.clear() + + for i, col in enumerate(columns): + key = f"col_{i}" + self.add_column(self._build_column_label_text(col, i), key=key) - # Add rows for i, resource in enumerate(resources): row_key = f"row-{i}" - row_values = [str(resource.get(key, "")) for key in column_keys] - self.add_row(*row_values, key=row_key) + row_values = [resource.get(k, "") for k in column_keys] self._all_rows[row_key] = row_values - # Apply current search filter - if self._search_term: - self._apply_filter(self._search_term) + self._apply_filter(self._search_term) def filter_by_search(self, search_term: str) -> None: """ @@ -89,19 +189,38 @@ def filter_by_search(self, search_term: str) -> None: self._search_term = search_term.lower() self._apply_filter(self._search_term) + def _sort_items( + self, items: List[Tuple[str, List[Any]]] + ) -> List[Tuple[str, List[Any]]]: + """Return items sorted according to _sort_columns (stable multi-key).""" + if not self._sort_columns: + return items + sorted_items = list(items) + # Stable sort: apply from least to most significant column + for col_idx, reverse in reversed(self._sort_columns): + sorted_items.sort( + key=lambda item, c=col_idx: self._coerce_sort_value(item[1][c]) + if c < len(item[1]) + else (2, ""), + reverse=reverse, + ) + return sorted_items + def _apply_filter(self, search_term: str) -> None: - """Apply the search filter to the table.""" + """Apply the search filter and current sort order to the table.""" self.clear() # Keeps columns; only clears rows if not search_term: - # Show all rows - for row_key, row_values in self._all_rows.items(): - self.add_row(*row_values, key=row_key) + visible_items = list(self._all_rows.items()) else: - # Filter rows by search term - for row_key, row_values in self._all_rows.items(): - if any(search_term in str(val).lower() for val in row_values): - self.add_row(*row_values, key=row_key) + visible_items = [ + (key, vals) + for key, vals in self._all_rows.items() + if any(search_term in str(val).lower() for val in vals) + ] + + for row_key, row_values in self._sort_items(visible_items): + self.add_row(*[str(v) for v in row_values], key=row_key) def on_data_table_row_selected(self, event) -> None: """Handle row selection.""" diff --git a/tests/test_app.py b/tests/test_app.py index 05c63ed..2aba750 100644 --- a/tests/test_app.py +++ b/tests/test_app.py @@ -817,3 +817,229 @@ async def test_yaml_language_preserved_after_toggle(): await pilot.pause() text_area = screen.query_one("#yaml-content", TextArea) assert text_area.language == "yaml" + + +# --- Multi-column Sort Tests --- + +def test_resource_table_initial_sort_state(): + """ResourceTable starts with no sort columns and shift_held=False.""" + table = ResourceTable() + assert table._sort_columns == [] + assert table._column_keys == [] + assert table._shift_held is False + + +def test_resource_table_has_sort_attributes(): + """ResourceTable exposes all sort-related instance attributes.""" + table = ResourceTable() + assert hasattr(table, "_sort_columns") + assert hasattr(table, "_column_keys") + assert hasattr(table, "_shift_held") + + +def test_resource_table_sort_items_no_sort(): + """_sort_items returns items unchanged when no sort columns are set.""" + table = ResourceTable() + table._sort_columns = [] + items = [("row-0", ["b", "2"]), ("row-1", ["a", "1"])] + assert table._sort_items(items) == items + + +def test_resource_table_sort_items_ascending(): + """_sort_items sorts by column ascending.""" + table = ResourceTable() + table._sort_columns = [(0, False)] + items = [ + ("row-0", ["banana", "2"]), + ("row-1", ["apple", "1"]), + ("row-2", ["cherry", "3"]), + ] + result = table._sort_items(items) + assert [v[0] for _, v in result] == ["apple", "banana", "cherry"] + + +def test_resource_table_sort_items_descending(): + """_sort_items sorts by column descending.""" + table = ResourceTable() + table._sort_columns = [(0, True)] + items = [ + ("row-0", ["banana", "2"]), + ("row-1", ["apple", "1"]), + ("row-2", ["cherry", "3"]), + ] + result = table._sort_items(items) + assert [v[0] for _, v in result] == ["cherry", "banana", "apple"] + + +def test_resource_table_sort_items_multicolumn(): + """_sort_items handles multi-column sort with stable ordering.""" + table = ResourceTable() + # Primary sort: col 1 (Status) ascending; secondary: col 0 (Name) ascending + table._sort_columns = [(1, False), (0, False)] + items = [ + ("row-0", ["pod-b", "Running"]), + ("row-1", ["pod-a", "Pending"]), + ("row-2", ["pod-c", "Running"]), + ("row-3", ["pod-d", "Pending"]), + ] + result = table._sort_items(items) + names = [v[0] for _, v in result] + # Pending first (pod-a, pod-d), then Running (pod-b, pod-c) + assert names == ["pod-a", "pod-d", "pod-b", "pod-c"] + + +def test_resource_table_sort_items_case_insensitive(): + """_sort_items uses case-insensitive comparison.""" + table = ResourceTable() + table._sort_columns = [(0, False)] + items = [ + ("row-0", ["Zebra"]), + ("row-1", ["apple"]), + ("row-2", ["Banana"]), + ] + result = table._sort_items(items) + assert [v[0] for _, v in result] == ["apple", "Banana", "Zebra"] + + +def test_compute_next_sort_single_column(): + """First click sets primary sort ascending.""" + table = ResourceTable() + table._compute_next_sort(col_idx=1, shift=False) + assert table._sort_columns == [(1, False)] + + +def test_compute_next_sort_toggle_direction(): + """Clicking the same column twice toggles direction.""" + table = ResourceTable() + table._compute_next_sort(col_idx=1, shift=False) + assert table._sort_columns == [(1, False)] + table._compute_next_sort(col_idx=1, shift=False) + assert table._sort_columns == [(1, True)] + + +def test_compute_next_sort_new_primary(): + """Clicking a different column replaces primary sort.""" + table = ResourceTable() + table._compute_next_sort(col_idx=0, shift=False) + table._compute_next_sort(col_idx=2, shift=False) + assert table._sort_columns == [(2, False)] + + +def test_compute_next_sort_shift_adds_secondary(): + """Shift+click on new column adds it as secondary sort.""" + table = ResourceTable() + table._compute_next_sort(col_idx=0, shift=False) + table._compute_next_sort(col_idx=1, shift=True) + assert len(table._sort_columns) == 2 + assert table._sort_columns[0] == (0, False) + assert table._sort_columns[1] == (1, False) + + +def test_compute_next_sort_shift_toggles_existing(): + """Shift+click on already-sorted column toggles its direction.""" + table = ResourceTable() + table._compute_next_sort(col_idx=0, shift=False) + table._compute_next_sort(col_idx=1, shift=True) + assert table._sort_columns[1] == (1, False) + table._compute_next_sort(col_idx=1, shift=True) + assert table._sort_columns[1] == (1, True) + + +def test_compute_next_sort_max_three_columns(): + """Multi-column sort supports at most 3 columns; oldest is evicted.""" + table = ResourceTable() + table._compute_next_sort(col_idx=0, shift=False) + table._compute_next_sort(col_idx=1, shift=True) + table._compute_next_sort(col_idx=2, shift=True) + assert len(table._sort_columns) == 3 + table._compute_next_sort(col_idx=3, shift=True) + assert len(table._sort_columns) == 3 + # col_idx=0 (oldest) should have been evicted + assert all(idx != 0 for idx, _ in table._sort_columns) + + +@pytest.mark.asyncio +async def test_resource_table_sort_resets_on_column_change(): + """Sort resets when populate_resources is called with different columns.""" + app = GantryApp() + async with app.run_test() as pilot: + screen = app.screen + assert isinstance(screen, ClusterScreen) + table = screen.query_one("#resource-table", ResourceTable) + + table.populate_resources( + [{"name": "pod-a", "status": "Running", "age": "1d"}], + ["Name", "Status", "Age"], + ["name", "status", "age"], + ) + await pilot.pause() + + table._compute_next_sort(col_idx=0, shift=False) + assert len(table._sort_columns) == 1 + + # Repopulate with different columns (simulate resource type switch) + table.populate_resources( + [{"name": "dep-a", "ready": "1/1", "age": "2d"}], + ["Name", "Ready", "Age"], + ["name", "ready", "age"], + ) + await pilot.pause() + + assert table._sort_columns == [] + + +@pytest.mark.asyncio +async def test_resource_table_sort_maintained_on_same_columns(): + """Sort is preserved when populate_resources is called with the same columns.""" + app = GantryApp() + async with app.run_test() as pilot: + screen = app.screen + assert isinstance(screen, ClusterScreen) + table = screen.query_one("#resource-table", ResourceTable) + + table.populate_resources( + [{"name": "pod-b"}, {"name": "pod-a"}], + ["Name"], + ["name"], + ) + await pilot.pause() + + table._compute_next_sort(col_idx=0, shift=False) + assert table._sort_columns == [(0, False)] + + # Refresh with same column layout + table.populate_resources( + [{"name": "pod-z"}, {"name": "pod-a"}], + ["Name"], + ["name"], + ) + await pilot.pause() + + assert table._sort_columns == [(0, False)] + + +@pytest.mark.asyncio +async def test_resource_table_sorted_rows_ascending(): + """Rows should appear in ascending order after sort is applied.""" + app = GantryApp() + async with app.run_test() as pilot: + screen = app.screen + assert isinstance(screen, ClusterScreen) + table = screen.query_one("#resource-table", ResourceTable) + + resources = [ + {"name": "pod-c", "status": "Running"}, + {"name": "pod-a", "status": "Pending"}, + {"name": "pod-b", "status": "Running"}, + ] + table.populate_resources(resources, ["Name", "Status"], ["name", "status"]) + await pilot.pause() + + table._compute_next_sort(col_idx=0, shift=False) + await pilot.pause() + + # Verify sort state is correct and _sort_items returns sorted order + assert table._sort_columns == [(0, False)] + sorted_items = table._sort_items(list(table._all_rows.items())) + sorted_names = [vals[0] for _, vals in sorted_items] + assert sorted_names == ["pod-a", "pod-b", "pod-c"]