|
7 | 7 | import tempfile |
8 | 8 | import time |
9 | 9 | from pathlib import Path |
10 | | -from typing import Any, Mapping, Optional |
| 10 | +from typing import Any, Dict, List, Mapping, Optional, Tuple |
11 | 11 |
|
12 | 12 | import openai |
13 | 13 | import sentry_sdk |
@@ -166,6 +166,91 @@ def format_actions(llm_output: dict) -> str: |
166 | 166 | return "" |
167 | 167 |
|
168 | 168 |
|
| 169 | +def _split_src(src: str) -> List[str]: |
| 170 | + """Split the source string into words, ignoring extra spaces.""" |
| 171 | + return [w for w in src.split() if w] |
| 172 | + |
| 173 | + |
| 174 | +def _find_consecutive_exact( |
| 175 | + tokens: List[str], src_words: List[str] |
| 176 | +) -> List[Tuple[int, int]]: |
| 177 | + """Find all occurrences of src_words as consecutive tokens in the tokens list.""" |
| 178 | + wins: List[Tuple[int, int]] = [] |
| 179 | + if not tokens or not src_words: |
| 180 | + return wins |
| 181 | + n, m = len(tokens), len(src_words) |
| 182 | + i = 0 |
| 183 | + while i <= n - m: |
| 184 | + if tokens[i : i + m] == src_words: |
| 185 | + wins.append((i, i + m - 1)) |
| 186 | + i += m |
| 187 | + else: |
| 188 | + i += 1 |
| 189 | + return wins |
| 190 | + |
| 191 | + |
| 192 | +def _merge_slice(words: List[Dict[str, Any]], i: int, j: int, dst: str) -> None: |
| 193 | + """Merge a slice of word dicts from index i to j into a single word dict.""" |
| 194 | + first, last = words[i], words[j] |
| 195 | + merged = dict(first) |
| 196 | + merged["word"] = dst |
| 197 | + if "start" in first: |
| 198 | + merged["start"] = first["start"] |
| 199 | + if "end" in last: |
| 200 | + merged["end"] = last["end"] |
| 201 | + words[i : j + 1] = [merged] |
| 202 | + |
| 203 | + |
def _apply_seq_replacements_exact(
    words: List[Dict[str, Any]], repls: List[Tuple[str, str]]
) -> None:
    """Apply phrase replacements, one after another, to a list of word dicts.

    Each ``(old, new)`` pair is handled in order: ``old`` is split into words,
    every exact consecutive run of those words is located in the current token
    list, and each run is merged into a single entry whose ``"word"`` is
    ``new``. Later pairs see the result of earlier ones.

    Args:
        words: Word dicts (text read from the ``"word"`` key, defaulting to
            ``""``); modified in place.
        repls: ``(old, new)`` pairs. Pairs whose ``old`` is empty or only
            whitespace are skipped.
    """
    # Parallel list of plain strings, kept in sync with ``words``.
    tokens = [entry.get("word", "") for entry in words]
    for old, new in repls:
        src_words = _split_src(old) if old else []
        if not src_words:
            continue
        # Walk the matches right to left so that merging one run does not
        # shift the indices of the runs still to be processed.
        for start, end in reversed(_find_consecutive_exact(tokens, src_words)):
            _merge_slice(words, start, end, new)
            tokens[start : end + 1] = [new]
| 222 | + |
| 223 | +def replace_in_transcription( |
| 224 | + transcription: Dict[str, Any] | Any, |
| 225 | + replacements: List[Tuple[str, str]], |
| 226 | +) -> Dict[str, Any] | Any: |
| 227 | + """Apply string replacements in the transcription data structure.""" |
| 228 | + segments = ( |
| 229 | + transcription.segments |
| 230 | + if hasattr(transcription, "segments") |
| 231 | + else transcription.get("segments", []) |
| 232 | + ) |
| 233 | + word_segments = ( |
| 234 | + transcription.word_segments |
| 235 | + if hasattr(transcription, "word_segments") |
| 236 | + else transcription.get("word_segments", []) |
| 237 | + ) |
| 238 | + |
| 239 | + for seg in segments: |
| 240 | + if isinstance(seg, dict) and isinstance(seg.get("text"), str): |
| 241 | + txt = seg["text"] |
| 242 | + for old, new in replacements: |
| 243 | + if old and old in txt: |
| 244 | + txt = txt.replace(old, new) |
| 245 | + if isinstance(seg, dict) and isinstance(seg.get("words"), list): |
| 246 | + _apply_seq_replacements_exact(seg["words"], replacements) |
| 247 | + seg["text"] = txt |
| 248 | + if isinstance(word_segments, list): |
| 249 | + _apply_seq_replacements_exact(word_segments, replacements) |
| 250 | + |
| 251 | + return transcription |
| 252 | + |
| 253 | + |
169 | 254 | def format_segments(transcription_data): |
170 | 255 | """Format transcription segments from WhisperX into a readable conversation format. |
171 | 256 |
|
@@ -193,9 +278,6 @@ def format_segments(transcription_data): |
193 | 278 | else: |
194 | 279 | formatted_output += f" {text}" |
195 | 280 | previous_speaker = speaker |
196 | | - formatted_output = formatted_output.replace( |
197 | | - "Vap'n'Roll Thierry", "[texte impossible à transcrire]" |
198 | | - ) |
199 | 281 | return formatted_output |
200 | 282 |
|
201 | 283 |
|
@@ -304,6 +386,11 @@ def process_audio_transcribe_summarize_v2( |
304 | 386 | os.remove(temp_file_path) |
305 | 387 | logger.debug("Temporary file removed: %s", temp_file_path) |
306 | 388 |
|
| 389 | + transcription = replace_in_transcription( |
| 390 | + transcription, replacements=settings.replacement_sequence |
| 391 | + ) |
| 392 | + |
| 393 | + logger.debug("Transcription after replacements: \n %s", transcription) |
307 | 394 | formatted_transcription = ( |
308 | 395 | DEFAULT_EMPTY_TRANSCRIPTION |
309 | 396 | if not transcription.segments |
|
0 commit comments