Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
131 changes: 102 additions & 29 deletions .github/scripts/triage/embedding_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
import numpy as np
from numpy.typing import NDArray
from fastembed import TextEmbedding
from scipy.stats import chi2
from sklearn.decomposition import PCA
from sklearn.covariance import EllipticEnvelope
from sklearn.metrics.pairwise import cosine_similarity
Expand Down Expand Up @@ -78,19 +77,25 @@ def reduce_dimensions(

def detect_outliers(
matrix: NDArray[np.float32],
percentile: float = 0.997,
contamination: float = 0.1,
iqr_multiplier: float = 3.0,
max_outlier_pct: float = 0.05,
) -> list[tuple[int, float]]:
"""Flag items whose Mahalanobis distance exceeds a dimension-aware cutoff.
"""Flag items whose Mahalanobis distance exceeds an IQR-based cutoff.

Uses EllipticEnvelope (robust covariance via MCD) to estimate the
multivariate Gaussian, then computes sqrt(squared Mahalanobis distance)
for each sample. The cutoff is derived from the chi2 distribution with
k degrees of freedom (k = number of features), so it scales correctly
regardless of dimensionality. Returns (index, distance) tuples sorted
by index ascending.
for each sample. The cutoff is Q75 + iqr_multiplier * IQR, which
adapts to the actual distribution of distances.

A hard cap ensures no more than max_outlier_pct * n items are flagged;
when the cap is hit, only the most extreme items (sorted by distance
descending) are kept.

Returns (index, distance) tuples sorted by index ascending, along with
the cutoff value stored as an attribute on the returned list.
"""
n, k = matrix.shape
n = matrix.shape[0]
if n < 2:
return []

Expand All @@ -100,11 +105,34 @@ def detect_outliers(
# .mahalanobis() returns squared Mahalanobis distances
distances = np.sqrt(envelope.mahalanobis(matrix))

# Dimension-aware cutoff: sqrt(chi2.ppf(percentile, df=k))
cutoff = np.sqrt(chi2.ppf(percentile, df=k))
# IQR-based cutoff
q25, q75 = np.percentile(distances, [25, 75])
iqr = q75 - q25
cutoff = q75 + iqr_multiplier * iqr

outlier_mask = distances > cutoff
indices = np.where(outlier_mask)[0]
return [(int(idx), float(distances[idx])) for idx in indices]

# Hard cap: keep at most max_outlier_pct * n items
max_count = max(1, int(max_outlier_pct * n))
if len(indices) > max_count:
# Sort by distance descending, take the most extreme
sorted_by_dist = sorted(indices, key=lambda i: distances[i], reverse=True)
indices = np.array(sorted_by_dist[:max_count])

# Sort by index ascending for stable output
indices = np.sort(indices)
result = [(int(idx), float(distances[idx])) for idx in indices]

# Attach cutoff as metadata so the report can use it
result = _OutlierResult(result) # type: ignore[assignment]
result.cutoff = float(cutoff) # type: ignore[attr-defined]
return result # type: ignore[return-value]


class _OutlierResult(list):
    """List of ``(index, distance)`` tuples that also carries a cutoff.

    Behaves exactly like a plain ``list``; the only addition is the
    ``cutoff`` attribute, which holds the distance threshold that was
    used to decide which items were flagged.

    Args:
        iterable: Initial items, forwarded to ``list``.
        cutoff: Threshold to record on the instance. Keyword-only so the
            one-argument ``_OutlierResult(items)`` form keeps working.
    """

    # Class-level default so the attribute always exists, even on
    # instances created without an explicit cutoff.
    cutoff: float = 0.0

    def __init__(self, iterable=(), *, cutoff: float = 0.0) -> None:
        super().__init__(iterable)
        self.cutoff = float(cutoff)


def find_duplicate_pairs(
Expand Down Expand Up @@ -134,11 +162,22 @@ def find_duplicate_pairs(
return pairs


# ── Label suggestion via z-score normalized embedding similarity ──────
# Tunables for suggest_labels(). Z-scores are computed per label (per
# column of the item x label similarity matrix), across all items.

# Z-score threshold: a label must be this many standard deviations above
# the column mean to be considered a match.
LABEL_Z_THRESHOLD: float = 1.5

# Minimum raw similarity between an item and a label to suggest it.
# 0.4 is intentionally permissive — the report is for human review.
# NOTE(review): this looks like the pre-change (removed) side of the diff;
# the z-score settings below appear to supersede it — confirm before use.
LABEL_SIMILARITY_THRESHOLD: float = 0.4
# Margin gate: the top-1 label must beat the second-best by at least this
# many z-score units. When it does not, the assignment is treated as
# ambiguous and the item receives no suggestions at all.
LABEL_Z_MARGIN: float = 0.5

# Floor for per-column standard deviation to avoid division by near-zero.
LABEL_Z_STD_FLOOR: float = 0.01

# Minimum raw cosine similarity required even if z-score is high.
# Prevents suggesting labels that are "relatively best" but still poor.
MIN_RAW_SIMILARITY: float = 0.3

# Maximum number of labels to suggest per item.
MAX_LABELS_PER_ITEM: int = 3
def suggest_labels(
    item_embeddings: NDArray[np.float32],
    label_embeddings: NDArray[np.float32],
    label_names: list[str],
    z_threshold: float = LABEL_Z_THRESHOLD,
    z_margin: float = LABEL_Z_MARGIN,
    std_floor: float = LABEL_Z_STD_FLOOR,
    min_raw_sim: float = MIN_RAW_SIMILARITY,
    max_per_item: int = MAX_LABELS_PER_ITEM,
) -> list[list[tuple[str, float]]]:
    """Suggest labels for each item using z-score normalized similarity.

    1. Compute the raw cosine similarity matrix (n items x m labels).
    2. Column-wise z-score: for each label, normalize across all items,
       flooring the standard deviation at ``std_floor``.
    3. For each item, rank labels by z-score descending.
    4. Margin gate: when there are at least two labels, the top-ranked
       label must lead the runner-up by ``z_margin`` z-score units;
       otherwise the item is ambiguous and gets no suggestions at all.
    5. Accept a label only if z >= ``z_threshold`` AND raw similarity
       >= ``min_raw_sim``.
    6. Cap at ``max_per_item`` labels.

    Note: z-scores use the population standard deviation over the items
    in this call, so a single value can be at most sqrt(n - 1) deviations
    above its column mean. With the default ``z_threshold`` of 1.5, calls
    with fewer than four items therefore never produce suggestions.

    Args:
        item_embeddings: One row per item.
        label_embeddings: One row per label, same width as the items.
        label_names: Name for each row of ``label_embeddings``.
        z_threshold: Minimum z-score for a label to be accepted.
        z_margin: Required z-score lead of the best label over the second.
        std_floor: Lower bound applied to each label's standard deviation.
        min_raw_sim: Minimum raw cosine similarity for a label.
        max_per_item: Maximum number of labels returned per item.

    Returns:
        A list of length n, where each element is a list of
        (label_name, raw_similarity) tuples ordered by z-score descending.
        The inner list is empty if nothing qualifies.
    """
    n = item_embeddings.shape[0]
    m = label_embeddings.shape[0]
    if n == 0 or m == 0:
        return [[] for _ in range(n)]

    # (n, m) raw similarity matrix: each row is one item vs all labels.
    sim_matrix = cosine_similarity(item_embeddings, label_embeddings)

    # Column-wise z-score normalization; the floor keeps labels with
    # near-constant similarity from producing huge z-scores.
    col_means = sim_matrix.mean(axis=0)
    col_stds = np.maximum(sim_matrix.std(axis=0), std_floor)
    z_matrix = (sim_matrix - col_means) / col_stds

    suggestions: list[list[tuple[str, float]]] = []
    for z_row, raw_row in zip(z_matrix, sim_matrix):
        # Label indices ordered by z-score, best first.
        ranked = np.argsort(z_row)[::-1]
        item_labels: list[tuple[str, float]] = []

        # Margin gate: an item whose two best labels are too close is
        # ambiguous, so it is recorded with no suggestions.
        if len(ranked) > 1:
            top1_z = float(z_row[ranked[0]])
            top2_z = float(z_row[ranked[1]])
            if top1_z - top2_z < z_margin:
                suggestions.append(item_labels)
                continue

        for idx in ranked:
            if len(item_labels) >= max_per_item:
                break

            # Labels are visited in descending z order, so once one falls
            # below the threshold none of the remaining ones can pass.
            if float(z_row[idx]) < z_threshold:
                break

            raw_val = float(raw_row[idx])
            # A relatively-best label is still rejected if it is poor in
            # absolute terms; later labels may nevertheless qualify.
            if raw_val < min_raw_sim:
                continue

            item_labels.append((label_names[idx], raw_val))

        suggestions.append(item_labels)

    return suggestions
Loading
Loading