Draft · Changes from 3 commits
2 changes: 1 addition & 1 deletion src/agents/Dockerfile
@@ -21,4 +21,4 @@ COPY --from=builder /install /usr/local

COPY . .

-CMD ["python", "multi-user-transcriber.py", "start"]
+CMD ["python", "multi_user_transcriber.py", "start"]
⚠️ Potential issue | 🟠 Major

Inconsistent startup command: Dockerfile vs. Helm configuration.

The Dockerfile invokes multi_user_transcriber.py start, but the Helm values (src/helm/env.d/dev-keycloak/values.meet.yaml.gotmpl, lines 276-279) override this with metadata_extractor.py start. This creates confusion about which script is the intended entrypoint for the metadata extraction agent.

For consistency and clarity, align the Dockerfile CMD with the intended agent script.

If metadata_extractor.py is the correct entrypoint, apply this diff:

-CMD ["python", "multi_user_transcriber.py", "start"]
+CMD ["python", "metadata_extractor.py", "start"]
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
CMD ["python", "multi_user_transcriber.py", "start"]
CMD ["python", "metadata_extractor.py", "start"]
🤖 Prompt for AI Agents
In src/agents/Dockerfile around line 24, the Dockerfile CMD currently starts the
wrong script ("multi_user_transcriber.py start") which conflicts with the Helm
values that expect "metadata_extractor.py start"; update the Dockerfile CMD to
invoke "metadata_extractor.py start" so the container entrypoint matches the
Helm configuration and avoid startup inconsistencies.
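Beyond the one-off fix, this kind of drift can be caught in CI. A minimal sketch (the parsing helper and the inline samples are illustrative, not part of this PR) that extracts the exec-form CMD from a Dockerfile and compares it against the Helm command list:

```python
import json
import re

def parse_dockerfile_cmd(dockerfile_text: str) -> list:
    """Extract the exec-form CMD (a JSON array) from Dockerfile text."""
    match = re.search(r'^CMD\s+(\[.*\])\s*$', dockerfile_text, re.MULTILINE)
    if match is None:
        raise ValueError("no exec-form CMD found")
    return json.loads(match.group(1))

# Inline samples stand in for reading the real Dockerfile and Helm values.
dockerfile = 'FROM python:3.12\nCMD ["python", "metadata_extractor.py", "start"]\n'
helm_command = ["python", "metadata_extractor.py", "start"]

assert parse_dockerfile_cmd(dockerfile) == helm_command
```

A check like this fails the build the moment the Dockerfile and the Helm override diverge again.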

314 changes: 314 additions & 0 deletions src/agents/metadata_extractor.py
@@ -0,0 +1,314 @@
"""Metadata agent that extracts metadata from active room."""

import asyncio
import json
import logging
import os
import uuid
from dataclasses import asdict, dataclass
from datetime import datetime
from io import BytesIO
from typing import List

from dotenv import load_dotenv
from livekit import rtc
from livekit.agents import (
    Agent,
    AgentSession,
    AutoSubscribe,
    JobContext,
    JobProcess,
    RoomInputOptions,
    RoomIO,
    RoomOutputOptions,
    WorkerOptions,
    cli,
    utils,
)
from livekit.plugins import silero
from minio import Minio
from minio.error import S3Error

load_dotenv()

logger = logging.getLogger("metadata-extractor")

AGENT_NAME = os.getenv("ROOM_METADATA_AGENT_NAME", "metadata-extractor")

🛠️ Refactor suggestion | 🟠 Major

Remove unused AGENT_NAME constant.

Line 36 defines AGENT_NAME, but it is never referenced. Please drop it to avoid shipping dead code.

🤖 Prompt for AI Agents
In src/agents/metadata_extractor.py around lines 36 to 37, remove the unused
AGENT_NAME constant declaration (AGENT_NAME =
os.getenv("ROOM_METADATA_AGENT_NAME", "metadata-extractor")) to eliminate dead
code; ensure no other references exist to that name (search file/project) before
deleting, then run tests/lint to confirm there are no remaining usages or
import-order changes required.


@dataclass
class MetadataEvent:
    """A timestamped room event attributed to a single participant."""

    participant_id: str
    type: str
    timestamp: datetime

    def to_dict(self) -> dict:
        """Return a JSON-serializable dictionary representation of the event."""
        data = asdict(self)
        data["timestamp"] = self.timestamp.isoformat()
        return data


class VADAgent(Agent):
    """Agent that monitors voice activity for a specific participant."""

    def __init__(self, participant_identity: str, events: List):
        """Initialize the VAD agent for one participant, sharing the event list."""
        super().__init__(
            instructions="not-needed",
        )
        self.participant_identity = participant_identity
        self.events = events

    async def on_enter(self) -> None:
        """Initialize VAD monitoring for this participant."""

        @self.session.on("user_state_changed")
        def on_user_state(event):
            timestamp = datetime.now()

            if event.new_state == "speaking":
                self.events.append(
                    MetadataEvent(
                        participant_id=self.participant_identity,
                        type="speech_start",
                        timestamp=timestamp,
                    )
                )

            elif event.old_state == "speaking":
                self.events.append(
                    MetadataEvent(
                        participant_id=self.participant_identity,
                        type="speech_end",
                        timestamp=timestamp,
                    )
                )


class MetadataAgent:
    """Monitor and manage real-time metadata extraction from meeting rooms.

    Oversees VAD (Voice Activity Detection) and participant metadata streams
    to track and analyze real-time events, coordinating data collection across
    participants for insights like speaking activity and engagement.
    """

    def __init__(self, ctx: JobContext):
        """Initialize metadata agent."""
        self.minio_client = Minio(
            endpoint=os.getenv("AWS_S3_ENDPOINT_URL"),
            access_key=os.getenv("AWS_S3_ACCESS_KEY_ID"),
            secret_key=os.getenv("AWS_S3_SECRET_ACCESS_KEY"),
            secure=os.getenv("AWS_S3_SECURE_ACCESS", "False").lower() == "true",
        )

        # todo - raise error if none
        self.bucket_name = os.getenv("AWS_STORAGE_BUCKET_NAME")

⚠️ Potential issue | 🟠 Major

Raise when S3 endpoint or bucket env vars are missing.

Lines 98-107 build the MinIO client before verifying the required configuration. If AWS_S3_ENDPOINT_URL or AWS_STORAGE_BUCKET_NAME is absent (the TODO on Line 105), the agent survives initialization but crashes at shutdown when save() calls put_object, making the failure harder to diagnose. Fail fast by validating these env vars upfront.

-        self.minio_client = Minio(
-            endpoint=os.getenv("AWS_S3_ENDPOINT_URL"),
+        endpoint = os.getenv("AWS_S3_ENDPOINT_URL")
+        if not endpoint:
+            raise RuntimeError(
+                "AWS_S3_ENDPOINT_URL is required for metadata persistence."
+            )
+
+        self.bucket_name = os.getenv("AWS_STORAGE_BUCKET_NAME")
+        if not self.bucket_name:
+            raise RuntimeError(
+                "AWS_STORAGE_BUCKET_NAME is required for metadata persistence."
+            )
+
+        self.minio_client = Minio(
+            endpoint=endpoint,
             access_key=os.getenv("AWS_S3_ACCESS_KEY_ID"),
             secret_key=os.getenv("AWS_S3_SECRET_ACCESS_KEY"),
             secure=os.getenv("AWS_S3_SECURE_ACCESS", "False").lower() == "true",
         )
-
-        # todo - raise error if none
-        self.bucket_name = os.getenv("AWS_STORAGE_BUCKET_NAME")
📝 Committable suggestion


Suggested change
    def __init__(self, ctx: JobContext):
        """Initialize metadata agent."""
        self.minio_client = Minio(
            endpoint=os.getenv("AWS_S3_ENDPOINT_URL"),
            access_key=os.getenv("AWS_S3_ACCESS_KEY_ID"),
            secret_key=os.getenv("AWS_S3_SECRET_ACCESS_KEY"),
            secure=os.getenv("AWS_S3_SECURE_ACCESS", "False").lower() == "true",
        )
        # todo - raise error if none
        self.bucket_name = os.getenv("AWS_STORAGE_BUCKET_NAME")
    def __init__(self, ctx: JobContext):
        """Initialize metadata agent."""
        endpoint = os.getenv("AWS_S3_ENDPOINT_URL")
        if not endpoint:
            raise RuntimeError(
                "AWS_S3_ENDPOINT_URL is required for metadata persistence."
            )
        self.bucket_name = os.getenv("AWS_STORAGE_BUCKET_NAME")
        if not self.bucket_name:
            raise RuntimeError(
                "AWS_STORAGE_BUCKET_NAME is required for metadata persistence."
            )
        self.minio_client = Minio(
            endpoint=endpoint,
            access_key=os.getenv("AWS_S3_ACCESS_KEY_ID"),
            secret_key=os.getenv("AWS_S3_SECRET_ACCESS_KEY"),
            secure=os.getenv("AWS_S3_SECURE_ACCESS", "False").lower() == "true",
        )
🤖 Prompt for AI Agents
In src/agents/metadata_extractor.py around lines 96 to 107, the Minio client and
bucket_name are created without validating required environment variables; add
upfront validation to fail fast: check that AWS_S3_ENDPOINT_URL and
AWS_STORAGE_BUCKET_NAME (and optionally
AWS_S3_ACCESS_KEY_ID/AWS_S3_SECRET_ACCESS_KEY if required) are present and raise
a clear exception (ValueError or custom) if missing before constructing Minio,
so initialization fails immediately with an explanatory message rather than
crashing later during save().
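The fail-fast pattern described above can also be factored into a small helper so every required variable gets the same treatment. A sketch (the helper name `require_env` is hypothetical, not from this PR):

```python
import os

def require_env(name: str) -> str:
    """Return the environment variable's value, or raise immediately if unset/empty."""
    value = os.getenv(name)
    if not value:
        raise RuntimeError(f"{name} is required for metadata persistence.")
    return value

# Usage inside __init__ (sketch):
#   endpoint = require_env("AWS_S3_ENDPOINT_URL")
#   self.bucket_name = require_env("AWS_STORAGE_BUCKET_NAME")
```

This keeps the constructor readable and guarantees a clear error message at startup rather than an opaque failure during shutdown.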

        self.ctx = ctx
        self._sessions: dict[str, AgentSession] = {}
        self._tasks: set[asyncio.Task] = set()

        # Storage for events
        self.events = []
        self.participants_seen = {}

        logger.info("MetadataAgent initialized")

    def start(self):
        """Start listening for participant connection events."""
        self.ctx.room.on("participant_connected", self.on_participant_connected)
        self.ctx.room.on("participant_disconnected", self.on_participant_disconnected)

        logger.info("Started listening for participant events")

    def save(self):
        """Persist the collected metadata to object storage."""
        logger.info("Persisting processed metadata output to object storage…")

        participants = []
        for k, v in self.participants_seen.items():
            participants.append({"participantId": k, "name": v})

        sorted_events = sorted(self.events, key=lambda e: e.timestamp)

        payload = {
            "events": [event.to_dict() for event in sorted_events],
            "participants": participants,
        }

        object_name = f"speaker_logs/{uuid.uuid4()}.json"
        data = json.dumps(payload, indent=2).encode("utf-8")

        stream = BytesIO(data)

        try:
            self.minio_client.put_object(
                self.bucket_name,
                object_name,
                stream,
                length=len(data),
                content_type="application/json",
            )
            logger.info("Uploaded speaker meeting metadata")
        except S3Error:
            logger.exception("Failed to upload meeting metadata")
Comment on lines 179 to 193

🧹 Nitpick | 🔵 Trivial

Broaden exception handling to prevent uncaught errors.

Line 189 catches only S3Error, but other exceptions (e.g., network errors, credential issues, or SDK bugs) may occur during the upload. Uncaught exceptions could crash the agent during cleanup.

Apply this diff to catch all exceptions:

         try:
             self.minio_client.put_object(
                 self.bucket_name,
                 object_name,
                 stream,
                 length=len(data),
                 content_type="application/json",
             )
             logger.info(
                 "Uploaded speaker meeting metadata",
             )
-        except S3Error:
+        except Exception:
             logger.exception(
                 "Failed to upload meeting metadata",
             )
📝 Committable suggestion


Suggested change
        try:
            self.minio_client.put_object(
                self.bucket_name,
                object_name,
                stream,
                length=len(data),
                content_type="application/json",
            )
            logger.info(
                "Uploaded speaker meeting metadata",
            )
        except S3Error:
            logger.exception(
                "Failed to upload meeting metadata",
            )
        try:
            self.minio_client.put_object(
                self.bucket_name,
                object_name,
                stream,
                length=len(data),
                content_type="application/json",
            )
            logger.info(
                "Uploaded speaker meeting metadata",
            )
        except Exception:
            logger.exception(
                "Failed to upload meeting metadata",
            )
🤖 Prompt for AI Agents
In src/agents/metadata_extractor.py around lines 178 to 192, the current upload
block only catches S3Error which can leave other failures uncaught; change the
exception handling to catch all exceptions (except Exception as e) so
network/credential/SDK errors are handled, and log the full exception/traceback
(use logger.exception with a descriptive message) to avoid crashing during
cleanup while keeping the existing success log; do not re-raise so the agent can
continue cleanup.
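The broadened handler can also be isolated so that every persistence path shares it. A sketch (the `safe_upload` helper and the stub callables are illustrative, not from the PR):

```python
import logging

logger = logging.getLogger("metadata-extractor")

def safe_upload(put_object, *args, **kwargs) -> bool:
    """Run an upload callable; log any failure instead of propagating it."""
    try:
        put_object(*args, **kwargs)
        return True
    except Exception:
        logger.exception("Failed to upload meeting metadata")
        return False

def flaky_put(*_args, **_kwargs):
    # Not an S3Error, but still swallowed and logged by the wrapper.
    raise ConnectionError("network down")

assert safe_upload(flaky_put) is False
assert safe_upload(lambda: None) is True
```

Returning a boolean lets callers decide whether a failed upload should, for example, trigger a local fallback dump, while guaranteeing cleanup never crashes.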


    async def aclose(self):
        """Close all sessions and cleanup resources."""
        logger.info("Closing all VAD monitoring sessions…")

        await utils.aio.cancel_and_wait(*self._tasks)

        await asyncio.gather(
            *[self._close_session(session) for session in self._sessions.values()],
            return_exceptions=True,
        )

        self.ctx.room.off("participant_connected", self.on_participant_connected)
        self.ctx.room.off("participant_disconnected", self.on_participant_disconnected)

        logger.info("All VAD sessions closed")
        self.save()

    def on_participant_connected(self, participant: rtc.RemoteParticipant):
        """Handle new participant connection by starting VAD monitoring."""
        if participant.identity in self._sessions:
            logger.debug("Session already exists for %s", participant.identity)
            return

        self.events.append(
            MetadataEvent(
                participant_id=participant.identity,
                type="participant_connected",
                timestamp=datetime.now(),
            )
        )

        self.participants_seen[participant.identity] = participant.name

        logger.info("New participant connected: %s", participant.identity)
        task = asyncio.create_task(self._start_session(participant))
        self._tasks.add(task)

        def on_task_done(task: asyncio.Task):
            try:
                self._sessions[participant.identity] = task.result()
            except Exception:
                logger.exception("Failed to start session for %s", participant.identity)
            finally:
                self._tasks.discard(task)

        task.add_done_callback(on_task_done)

    def on_participant_disconnected(self, participant: rtc.RemoteParticipant):
        """Handle participant disconnection by closing VAD monitoring."""
        self.events.append(
            MetadataEvent(
                participant_id=participant.identity,
                type="participant_disconnected",
                timestamp=datetime.now(),
            )
        )

        session = self._sessions.pop(participant.identity, None)
        if session is None:
            logger.debug("No session found for %s", participant.identity)
            return

        logger.info("Participant disconnected: %s", participant.identity)
        task = asyncio.create_task(self._close_session(session))
        self._tasks.add(task)

        def on_close_done(_):
            self._tasks.discard(task)
            logger.info(
                "VAD session closed for %s (remaining sessions: %d)",
                participant.identity,
                len(self._sessions),
            )

        task.add_done_callback(on_close_done)

    async def _start_session(self, participant: rtc.RemoteParticipant) -> AgentSession:
        """Create and start VAD monitoring session for participant."""
        if participant.identity in self._sessions:
            return self._sessions[participant.identity]

        # Create session with VAD only - no STT, LLM, or TTS
        session = AgentSession(
            vad=self.ctx.proc.userdata["vad"],
            turn_detection="vad",
            user_away_timeout=30.0,
        )

        # Set up room IO to receive audio from this specific participant
        room_io = RoomIO(
            agent_session=session,
            room=self.ctx.room,
            participant=participant,
            input_options=RoomInputOptions(
                audio_enabled=True,
                text_enabled=False,
            ),
            output_options=RoomOutputOptions(
                audio_enabled=False,
                transcription_enabled=False,
            ),
        )

        await room_io.start()
        await session.start(
            agent=VADAgent(
                participant_identity=participant.identity, events=self.events
            )
        )

        return session

    async def _close_session(self, session: AgentSession) -> None:
        """Close and cleanup VAD monitoring session."""
        try:
            await session.drain()
            await session.aclose()
        except Exception:
            logger.exception("Error closing session")


async def entrypoint(ctx: JobContext):
    """Initialize and run the multi-user VAD monitor."""
    logger.info("Starting metadata agent in room: %s", ctx.room.name)

    vad_monitor = MetadataAgent(ctx)
    vad_monitor.start()

    # Connect to room and subscribe to audio only
    await ctx.connect(auto_subscribe=AutoSubscribe.AUDIO_ONLY)

    existing_participants = list(ctx.room.remote_participants.values())
    for participant in existing_participants:
        vad_monitor.on_participant_connected(participant)

    async def cleanup():
        logger.info("Shutting down VAD monitor...")
        await vad_monitor.aclose()

    ctx.add_shutdown_callback(cleanup)


def prewarm(proc: JobProcess):
    """Preload voice activity detection model."""
    proc.userdata["vad"] = silero.VAD.load()


if __name__ == "__main__":
    cli.run_app(
        WorkerOptions(
            entrypoint_fnc=entrypoint,
            prewarm_fnc=prewarm,
        )
    )
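For reference, the JSON payload that `save()` uploads has the shape sketched below, with events sorted chronologically. The sample events and identities are fabricated for illustration:

```python
import json
from dataclasses import asdict, dataclass
from datetime import datetime

@dataclass
class MetadataEvent:
    participant_id: str
    type: str
    timestamp: datetime

    def to_dict(self) -> dict:
        data = asdict(self)
        data["timestamp"] = self.timestamp.isoformat()
        return data

# Events arrive out of order; save() sorts them by timestamp first.
events = [
    MetadataEvent("alice", "speech_end", datetime(2024, 1, 1, 10, 0, 5)),
    MetadataEvent("alice", "speech_start", datetime(2024, 1, 1, 10, 0, 1)),
]
payload = {
    "events": [e.to_dict() for e in sorted(events, key=lambda e: e.timestamp)],
    "participants": [{"participantId": "alice", "name": "Alice"}],
}

assert payload["events"][0]["type"] == "speech_start"  # chronological order
data = json.dumps(payload, indent=2).encode("utf-8")   # bytes handed to put_object
```

Consumers reading `speaker_logs/*.json` objects can rely on ISO-8601 timestamps and the ordered `events` list.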
3 changes: 2 additions & 1 deletion src/agents/pyproject.toml
@@ -7,7 +7,8 @@ dependencies = [
     "livekit-agents==1.2.6",
     "livekit-plugins-deepgram==1.2.6",
     "livekit-plugins-silero==1.2.6",
-    "python-dotenv==1.1.1"
+    "python-dotenv==1.1.1",
+    "minio==7.2.15"

🧹 Nitpick | 🔵 Trivial

🧩 Analysis chain

Verify the minio package version for validity and security.

Since my knowledge cutoff is outdated, please verify that minio==7.2.15 is a valid, current, and secure version.

Run the following script to check for the latest version and any security advisories:


🏁 Script executed:

#!/bin/bash
# Description: Check PyPI for latest minio version and GitHub security advisories.

# Check PyPI for latest version
echo "=== Latest minio version on PyPI ==="
curl -s https://pypi.org/pypi/minio/json | jq -r '.info.version'

# Check for security advisories
echo -e "\n=== Security advisories for minio (Python) ==="
gh api graphql -f query='
{
  securityVulnerabilities(first: 10, ecosystem: PIP, package: "minio") {
    nodes {
      advisory {
        summary
        severity
        publishedAt
      }
      vulnerableVersionRange
      firstPatchedVersion {
        identifier
      }
    }
  }
}'

Length of output: 141


Upgrade minio dependency to 7.2.18
No security advisories were found; bump to the latest patch for bug fixes.

🤖 Prompt for AI Agents
In src/agents/pyproject.toml at line 11, the minio dependency is pinned to
"minio==7.2.15"; update this entry to "minio==7.2.18" to bump to the latest
patch release, then regenerate/update the lockfile (e.g., run your dependency
tool's lock/install command) to ensure the new version is reflected in the
lockfile and CI.

]

[project.optional-dependencies]
10 changes: 10 additions & 0 deletions src/helm/env.d/dev-keycloak/values.meet.yaml.gotmpl
@@ -262,12 +262,22 @@ agents:
LIVEKIT_API_KEY: {{ $key }}
{{- end }}
{{- end }}
    AWS_S3_ENDPOINT_URL: minio.meet.svc.cluster.local:9000

⚠️ Potential issue | 🟠 Major

Inconsistent S3 endpoint URL format — but note how the MinIO Python client parses endpoints.

Line 265 specifies AWS_S3_ENDPOINT_URL: minio.meet.svc.cluster.local:9000 without a protocol prefix, while other sections (lines 60, 151, 187, 224) use http://minio.meet.svc.cluster.local:9000. The bare host:port form is what the MinIO Python client expects: Minio(endpoint=...) derives the scheme from its secure flag and does not accept an endpoint that already carries one, so prefixing http:// here would break metadata_extractor.py as written. If a single shared value is preferred across sections, keep the http://-prefixed form and strip the scheme in the agent before constructing the client; otherwise, add a comment next to this entry explaining why it intentionally differs from the URL-shaped values used elsewhere.

🤖 Prompt for AI Agents
In src/helm/env.d/dev-keycloak/values.meet.yaml.gotmpl around line 265, the
AWS_S3_ENDPOINT_URL value omits the protocol because the MinIO Python client in
src/agents/metadata_extractor.py expects a bare host:port; either document this
divergence with a YAML comment, or unify on the http://-prefixed form used in
the other sections and strip the scheme in the agent code before constructing
the Minio client.
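If the chart does standardize on the http://-prefixed form, the agent can normalize either shape before constructing the client. A sketch (the helper name `split_s3_endpoint` is hypothetical) using only the standard library:

```python
from urllib.parse import urlsplit

def split_s3_endpoint(raw: str) -> tuple:
    """Return (host_and_port, secure) from a bare or scheme-prefixed endpoint."""
    if "://" in raw:
        parts = urlsplit(raw)
        return parts.netloc, parts.scheme == "https"
    # Bare host:port - assume insecure unless configured otherwise.
    return raw, False

assert split_s3_endpoint("minio.meet.svc.cluster.local:9000") == (
    "minio.meet.svc.cluster.local:9000",
    False,
)
assert split_s3_endpoint("http://minio.meet.svc.cluster.local:9000") == (
    "minio.meet.svc.cluster.local:9000",
    False,
)
assert split_s3_endpoint("https://minio.example.com")[1] is True
```

The resulting pair maps directly onto the `endpoint=` and `secure=` arguments of the Minio constructor, making the Helm value's format a non-issue for the agent.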

    AWS_S3_ACCESS_KEY_ID: meet
    AWS_S3_SECRET_ACCESS_KEY: password
    AWS_S3_SECURE_ACCESS: False
    AWS_STORAGE_BUCKET_NAME: meet-media-storage

  image:
    repository: localhost:5001/meet-agents
    pullPolicy: Always
    tag: "latest"

  command:
    - "python"
    - "metadata_extractor.py"
    - "start"

  # Extra volume mounts to manage our local custom CA and avoid to disable ssl
  extraVolumeMounts:
    - name: certs