Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions services/analysis-engine/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,12 @@ version = "0.1.0"
description = "BandScope local-first analysis engine"
requires-python = ">=3.12"
dependencies = [
"demucs>=4.0.1",
"librosa>=0.11.0",
"numba<0.63.0",
"soundfile>=0.13.1",
"torch>=2.11.0,<2.12.0",
"torchaudio>=2.11.0,<2.12.0",
"urllib3>=2.7.0",
"yt-dlp>=2026.3.17",
]
Expand Down
4 changes: 3 additions & 1 deletion services/analysis-engine/src/bandscope_analysis/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,9 @@ def build_demo_rehearsal_song() -> RehearsalSong:
}


def run_analysis_job(job_id: str, payload: object, requested_at: str) -> AnalysisJobStatus:
def run_analysis_job(
job_id: str, payload: object, requested_at: str, stems: dict[str, Any] | None = None
) -> AnalysisJobStatus:
"""Return a structured orchestration response for a validated analysis job."""
try:
request = validate_analysis_job_request(payload)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
"""Chord recognizer using librosa's chromagrams."""

from typing import TypedDict
from typing import Any, TypedDict

import librosa
import numpy as np
from numpy.typing import NDArray


class TrackedChord(TypedDict):
Expand All @@ -24,7 +25,7 @@ def __init__(self) -> None:
self.templates = self._build_templates()
self.chord_labels = self._build_labels()

def _build_templates(self) -> np.ndarray:
def _build_templates(self) -> NDArray[np.floating[Any]]:
"""Build chromagram templates for 24 major and minor chords."""
templates = np.zeros((24, 12))
for i in range(12):
Expand Down Expand Up @@ -53,7 +54,7 @@ def _build_labels(self) -> list[str]:
labels.append(f"{note}m") # Minor
return labels

def recognize(self, y: np.ndarray, sr: int = 22050) -> list[TrackedChord]:
def recognize(self, y: NDArray[np.floating[Any]], sr: int = 22050) -> list[TrackedChord]:
"""
Recognize chords in an audio array using chromagrams.

Expand Down
59 changes: 41 additions & 18 deletions services/analysis-engine/src/bandscope_analysis/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ def main() -> int:
input_data = sys.stdin.read().strip()

# Check if there are command line arguments (fallback for manual testing)
separate_stems = "--separate-stems" in sys.argv
if len(sys.argv) > 1:
if sys.argv[1] == "--status":
json.dump(get_analysis_status(), sys.stdout)
Expand Down Expand Up @@ -74,25 +75,47 @@ def main() -> int:

request = payload.get("request")

# Temporary: Inject temporal analyzer call if it's a local file, just to prove it works
# before full orchestrator integration
if (
isinstance(request, dict)
and request.get("sourceKind") == "local_audio"
and "localSource" in request
):
audio_path = request["localSource"].get("sourcePath")
if audio_path:
logging.info(f"Extracting temporal features from {audio_path}...")
try:
temporal_analyzer = TemporalAnalyzer()
features = temporal_analyzer.analyze(audio_path)
logging.info(f"Extracted BPM: {features['bpm']}")
except Exception as e:
logging.warning(f"Temporal analysis failed, continuing with mock: {e}")

requested_at = datetime.now(UTC).isoformat().replace("+00:00", "Z")
response = run_analysis_job(job_id, request, requested_at)

# Validate request
try:
from bandscope_analysis.api import validate_analysis_job_request

validate_analysis_job_request(request)
is_valid = True
except Exception:
is_valid = False

stems = None
if is_valid and separate_stems:
if (
isinstance(request, dict)
and request.get("sourceKind") == "local_audio"
and "localSource" in request
):
audio_path = request["localSource"].get("sourcePath")
if audio_path:
logging.info(f"Extracting temporal features from {audio_path}...")
try:
temporal_analyzer = TemporalAnalyzer()
features = temporal_analyzer.analyze(audio_path)
logging.info(f"Extracted BPM: {features['bpm']}")
except Exception as e:
logging.warning(f"Temporal analysis failed, continuing with mock: {e}")

logging.info(f"Performing stem separation on {audio_path}...")
# We do not swallow exceptions here per code review
import librosa

from bandscope_analysis.separation.audio_separator import AudioStemSeparator

# Load only the first 10 seconds for the CLI proof to prevent hanging
y, sr = librosa.load(audio_path, sr=44100, mono=False, duration=10.0)
separator = AudioStemSeparator()
stems = separator.separate_audio(y, sample_rate=int(sr), segment_seconds=2.0)
logging.info(f"Successfully extracted {len(stems)} stems: {list(stems.keys())}")

response = run_analysis_job(job_id, request, requested_at, stems=stems)
json.dump(response, sys.stdout)
return 0

Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
"""Pitch tracker using librosa's pYIN or YIN algorithm."""

import logging
from typing import Optional, TypedDict
from typing import Any, Optional, TypedDict

import librosa
import numpy as np
from numpy.typing import NDArray

logger = logging.getLogger(__name__)

Expand All @@ -20,7 +21,7 @@ class TrackedPitchRange(TypedDict):
class PitchTracker:
"""Extracts lowest and highest notes from audio data."""

def track(self, y: np.ndarray, sr: int = 22050) -> TrackedPitchRange:
def track(self, y: NDArray[np.floating[Any]], sr: int = 22050) -> TrackedPitchRange:
"""
Track pitch in an audio array and return the lowest/highest note.

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
"""Audio source separation using Demucs."""

from __future__ import annotations

import logging
from typing import Any

import numpy as np
from numpy.typing import NDArray

logger = logging.getLogger(__name__)


class AudioStemSeparator:
    """Isolates standard stems from an audio mix using Demucs.

    Security Notes:
    - Trust boundary: Audio input is passed as raw numpy arrays from a prior decoding step
      (e.g. librosa), reducing the risk of codec-based exploitation within Demucs itself.
    - Integrity: The checkpoint's SHA-256 digest is compared against the checksum prefix
      embedded in its file name before the file is handed to the Demucs loader.
    - Limits: Employs chunked inference (split=True) so the model only sees one segment at a
      time. The full mix and the full result are still materialised in memory, so callers
      remain responsible for bounding the input duration.
    - Network: Requires pre-provisioned model weights in the local model cache; it does not
      download model artifacts at runtime.
    """

    # Checkpoint file name is "<signature>-<checksum>.th". The part after the dash is the
    # leading hex digits of the file's SHA-256 digest (the torch.hub / Demucs convention);
    # the part before the dash only identifies the model and is not a hash of the file.
    # NOTE(review): the checkpoint is fixed and is not derived from ``model_name`` --
    # confirm this signature really is the intended weights for the default "htdemucs".
    _CHECKPOINT_SIGNATURE = "f7e0c4bc"
    _CHECKPOINT_CHECKSUM = "ba3fe64a"
    # Read size used while hashing, so large checkpoints are never loaded in one piece.
    _HASH_CHUNK_BYTES = 4096 * 1024

    def __init__(self, model_name: str = "htdemucs") -> None:
        """Initialize the audio stem separator.

        Args:
            model_name: The name of the pretrained Demucs model to use. Currently only
                used in log and error messages.
        """
        self.model_name = model_name
        # Populated lazily by _load_model() on first use.
        self._model: Any = None

    @staticmethod
    def _verify_checksum(model_file: Any, expected_prefix: str) -> None:
        """Raise unless the SHA-256 digest of ``model_file`` starts with ``expected_prefix``.

        Args:
            model_file: Path (str or path-like) of the file to hash.
            expected_prefix: Leading hex digits the digest must start with.

        Raises:
            ValueError: If ``expected_prefix`` is empty (the check would be vacuous).
            RuntimeError: If the digest does not start with ``expected_prefix``.
        """
        import hashlib

        if not expected_prefix:
            raise ValueError("expected_prefix must not be empty")

        digest = hashlib.sha256()
        with open(model_file, "rb") as f:
            while chunk := f.read(AudioStemSeparator._HASH_CHUNK_BYTES):
                digest.update(chunk)

        if not digest.hexdigest().startswith(expected_prefix):
            raise RuntimeError("Model checksum mismatch")

    @staticmethod
    def _select_device() -> str:
        """Return the preferred torch device name: CUDA, then MPS, then CPU."""
        import torch

        if torch.cuda.is_available():
            return "cuda"
        if torch.backends.mps.is_available():
            return "mps"
        return "cpu"

    def _load_model(self) -> Any:
        """Return the Demucs model, loading and verifying it on first use.

        Returns:
            The loaded model, switched to eval mode. Later calls return the cached instance.

        Raises:
            RuntimeError: If the pre-provisioned checkpoint is missing or its checksum
                does not match.
        """
        if self._model is not None:
            return self._model

        from pathlib import Path

        cache_dir = Path.home() / ".cache" / "torch" / "hub" / "checkpoints"
        model_file = cache_dir / f"{self._CHECKPOINT_SIGNATURE}-{self._CHECKPOINT_CHECKSUM}.th"

        if not model_file.is_file():
            raise RuntimeError(f"Pre-provisioned model {self.model_name} not found at {model_file}")

        # Verify integrity before the file is deserialized by the loader.
        self._verify_checksum(model_file, self._CHECKPOINT_CHECKSUM)

        # Imported late so the cached path and the failure paths above do not need Demucs.
        from demucs.states import load_model

        logger.info("Loading demucs model '%s'...", self.model_name)
        model = load_model(model_file)  # type: ignore[no-untyped-call]
        model.eval()
        # Cache only once the model is fully ready.
        self._model = model
        return model

    def separate_audio(
        self,
        audio_data: NDArray[np.floating[Any]],
        sample_rate: int,
        segment_seconds: float = 10.0,
    ) -> dict[str, NDArray[np.floating[Any]]]:
        """Perform source separation on the given audio array.

        Args:
            audio_data: The input audio waveform, shape (channels, samples).
                If mono (samples,), a channel axis is added and the channel count is
                then adapted to what the model expects.
            sample_rate: The sample rate of the input audio.
            segment_seconds: The length of each chunk for chunked processing.
                NOTE(review): transformer-based Demucs models may reject segments longer
                than the one they were trained with -- confirm the default is valid for
                the provisioned checkpoint.

        Returns:
            A dictionary mapping each name in the model's ``sources`` (e.g. 'vocals', 'bass',
            'drums', 'other') to its separated waveform (channels, samples). The input is
            converted to the model's sample rate and channel count and is not converted
            back, so the stems are presumably at the model's rate, not ``sample_rate``.

        Raises:
            ValueError: If ``audio_data`` is not 1-D or 2-D, contains no samples, or if
                ``sample_rate`` or ``segment_seconds`` is not positive.
            RuntimeError: If the model checkpoint is missing or fails verification.
        """
        # Validate first so bad input fails fast, before heavy imports or model loading.
        if audio_data.ndim not in (1, 2):
            raise ValueError(
                f"audio_data must be 1-D or 2-D (channels, samples), got {audio_data.ndim}-D"
            )
        if audio_data.shape[-1] == 0:
            raise ValueError("audio_data contains no samples")
        if sample_rate <= 0:
            raise ValueError(f"sample_rate must be positive, got {sample_rate}")
        if segment_seconds <= 0:
            raise ValueError(f"segment_seconds must be positive, got {segment_seconds}")

        import torch
        from demucs.apply import apply_model
        from demucs.audio import convert_audio

        model = self._load_model()

        # Ensure 2D (channels, samples)
        if audio_data.ndim == 1:
            audio_data = np.expand_dims(audio_data, axis=0)

        # torch.from_numpy cannot wrap negatively-strided views, so force a contiguous
        # buffer before converting.
        mix = torch.from_numpy(np.ascontiguousarray(audio_data)).float()

        # Convert audio to match model expectations
        mix = convert_audio(  # type: ignore
            mix,
            sample_rate,
            model.samplerate,
            model.audio_channels,
        )

        # Add batch dimension: (1, channels, samples)
        mix = mix.unsqueeze(0)

        device = self._select_device()
        model.to(device)
        mix = mix.to(device)

        logger.info("Applying model to mix using device %s...", device)
        # Apply model with chunking
        with torch.no_grad():
            stems = apply_model(
                model,
                mix,
                shifts=1,
                split=True,
                overlap=0.25,
                segment=segment_seconds,
                progress=False,
            )

        # stems shape: [batch, sources, channels, samples]; drop the batch dimension.
        stems_np: NDArray[np.floating[Any]] = stems[0].cpu().numpy()

        return {source_name: stems_np[idx] for idx, source_name in enumerate(model.sources)}
Loading
Loading