Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 31 additions & 0 deletions pytrickle/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,9 @@ def __init__(
self.output_queue = asyncio.Queue(maxsize=200)
self.data_queue: Deque[Any] = deque(maxlen=1000)

# Track queue size for clearing
self.max_queue_size = max_queue_size

# Optional frame skipper
if frame_skip_config is not None:
self.frame_skipper = AdaptiveFrameSkipper(
Expand Down Expand Up @@ -137,6 +140,34 @@ async def start(self, request_id: str = "default"):
if self.frame_skipper:
self.frame_skipper.reset()

async def clear_input_queues(self):
"""
Clear pending frames from input queues.

Useful when parameters change and old queued frames become stale.
This prevents processing old frames with new parameters.
"""
# Clear video queue
cleared_video = 0
while not self.video_input_queue.empty():
try:
self.video_input_queue.get_nowait()
cleared_video += 1
except asyncio.QueueEmpty:
break

# Clear audio queue
cleared_audio = 0
while not self.audio_input_queue.empty():
try:
self.audio_input_queue.get_nowait()
cleared_audio += 1
except asyncio.QueueEmpty:
break

if cleared_video or cleared_audio:
logger.info(f"Cleared stale frames from input queues: {cleared_video} video, {cleared_audio} audio")

async def stop(self):
"""Stop the trickle client."""
if not self.running:
Expand Down
16 changes: 16 additions & 0 deletions pytrickle/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ def __init__(
app_kwargs: Optional[Dict[str, Any]] = None,
# Frame skipping configuration
frame_skip_config: Optional[FrameSkipConfig] = None,
clear_queues_on_update: bool = True,

):
"""Initialize StreamServer.
Expand All @@ -85,6 +86,7 @@ def __init__(
on_shutdown: List of shutdown handlers
app_kwargs: Additional kwargs for aiohttp.web.Application
frame_skip_config: Optional frame skipping configuration (None = no frame skipping)
clear_queues_on_update: Whether to clear client input queues before updating params
"""
self.frame_processor = frame_processor
self.port = port
Expand All @@ -109,6 +111,9 @@ def __init__(
# Frame skipping configuration
self.frame_skip_config = frame_skip_config

# Parameter update queue management
self.clear_queues_on_update = clear_queues_on_update

# Stream management - simple and direct
self.current_client: Optional[TrickleClient] = None
self.current_params: Optional[StreamStartRequest] = None
Expand Down Expand Up @@ -349,6 +354,8 @@ async def _handle_start_stream(self, request: web.Request) -> web.Response:
# Set params if provided
if params.params:
try:
# Clear input queues before setting params to avoid stale frames
await self.current_client.clear_input_queues()
await self.frame_processor.update_params(params.params)
except Exception as e:
logger.warning(f"Failed to set params: {e}")
Expand Down Expand Up @@ -423,8 +430,13 @@ async def _handle_control_message(self, control_data: dict):
"""Handle control messages from trickle protocol.

Routes control messages to the frame processor's update_params method.
Clears input queues before updating to avoid processing stale frames.
"""
try:
# Clear input queues to avoid processing stale frames with new parameters
if self.clear_queues_on_update and self.current_client:
await self.current_client.clear_input_queues()

await self.frame_processor.update_params(control_data)
logger.debug("Control message routed to frame processor")
except Exception as e:
Expand All @@ -443,6 +455,10 @@ async def _handle_update_params(self, request: web.Request) -> web.Response:
"message": "No active stream to update"
}, status=400)

# Clear input queues before updating to avoid processing stale frames
if self.clear_queues_on_update:
await self.current_client.clear_input_queues()

# Update frame processor parameters
await self.frame_processor.update_params(data)
logger.info(f"Parameters updated: {data}")
Expand Down
3 changes: 3 additions & 0 deletions pytrickle/state.py
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,10 @@ def set_startup_complete(self) -> None:
self.startup_complete = True
# When startup completes, transition to IDLE (ready state)
if self._state == PipelineState.LOADING:
logger.info("State transition: LOADING → IDLE")
self.set_state(PipelineState.IDLE)
else:
logger.info(f"State already {self._state.name}, not transitioning")

def update_active_streams(self, count: int) -> None:
"""Update number of active streams for health/status reporting."""
Expand Down
6 changes: 6 additions & 0 deletions pytrickle/stream_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ def from_handlers(
port: int = 8000,
frame_skip_config: Optional[FrameSkipConfig] = None,
validate_signature: bool = True,
clear_queues_on_update: bool = True,
**server_kwargs
):
"""Construct a StreamProcessor by discovering handlers on *handler_instance*."""
Expand Down Expand Up @@ -73,6 +74,7 @@ def from_handlers(
port=port,
frame_skip_config=frame_skip_config,
validate_signature=validate_signature,
clear_queues_on_update=clear_queues_on_update,
**server_kwargs
)

Expand All @@ -92,6 +94,7 @@ def __init__(
name: str = "stream-processor",
port: int = 8000,
frame_skip_config: Optional[FrameSkipConfig] = None,
clear_queues_on_update: bool = True,
validate_signature: bool = True,
**server_kwargs
):
Expand All @@ -109,6 +112,7 @@ def __init__(
name: Processor name
port: Server port
frame_skip_config: Optional frame skipping configuration (None = no frame skipping)
clear_queues_on_update: Whether to flush client queues before applying updated params
**server_kwargs: Additional arguments passed to StreamServer
"""
# Validate that processors are async functions
Expand Down Expand Up @@ -140,6 +144,7 @@ def __init__(
self.name = name
self.port = port
self.frame_skip_config = frame_skip_config
self.clear_queues_on_update = clear_queues_on_update
self.server_kwargs = server_kwargs
self._handler_registry: Optional[HandlerRegistry] = None

Expand All @@ -159,6 +164,7 @@ def __init__(
frame_processor=self._frame_processor,
port=port,
frame_skip_config=frame_skip_config,
clear_queues_on_update=clear_queues_on_update,
**server_kwargs
)

Expand Down
Loading