diff --git a/pytrickle/client.py b/pytrickle/client.py index 9d67e49..3d0aedd 100644 --- a/pytrickle/client.py +++ b/pytrickle/client.py @@ -64,6 +64,9 @@ def __init__( self.output_queue = asyncio.Queue(maxsize=200) self.data_queue: Deque[Any] = deque(maxlen=1000) + # Track queue size for clearing + self.max_queue_size = max_queue_size + # Optional frame skipper if frame_skip_config is not None: self.frame_skipper = AdaptiveFrameSkipper( @@ -137,6 +140,34 @@ async def start(self, request_id: str = "default"): if self.frame_skipper: self.frame_skipper.reset() + async def clear_input_queues(self): + """ + Clear pending frames from input queues. + + Useful when parameters change and old queued frames become stale. + This prevents processing old frames with new parameters. + """ + # Clear video queue + cleared_video = 0 + while not self.video_input_queue.empty(): + try: + self.video_input_queue.get_nowait() + cleared_video += 1 + except asyncio.QueueEmpty: + break + + # Clear audio queue + cleared_audio = 0 + while not self.audio_input_queue.empty(): + try: + self.audio_input_queue.get_nowait() + cleared_audio += 1 + except asyncio.QueueEmpty: + break + + if cleared_video or cleared_audio: + logger.info(f"Cleared stale frames from input queues: {cleared_video} video, {cleared_audio} audio") + async def stop(self): """Stop the trickle client.""" if not self.running: diff --git a/pytrickle/server.py b/pytrickle/server.py index a14823b..de0bcb3 100644 --- a/pytrickle/server.py +++ b/pytrickle/server.py @@ -63,6 +63,7 @@ def __init__( app_kwargs: Optional[Dict[str, Any]] = None, # Frame skipping configuration frame_skip_config: Optional[FrameSkipConfig] = None, + clear_queues_on_update: bool = True, ): """Initialize StreamServer. @@ -85,6 +86,7 @@ def __init__( on_shutdown: List of shutdown handlers app_kwargs: Additional kwargs for aiohttp.web.Application frame_skip_config: Optional frame skipping configuration (None = no frame skipping) + clear_queues_on_update: Whether to clear client input queues before updating params """ self.frame_processor = frame_processor self.port = port @@ -109,6 +111,9 @@ def __init__( # Frame skipping configuration self.frame_skip_config = frame_skip_config + # Parameter update queue management + self.clear_queues_on_update = clear_queues_on_update + # Stream management - simple and direct self.current_client: Optional[TrickleClient] = None self.current_params: Optional[StreamStartRequest] = None @@ -349,6 +354,8 @@ async def _handle_start_stream(self, request: web.Request) -> web.Response: # Set params if provided if params.params: try: + # Clear input queues before setting params to avoid stale frames + await self.current_client.clear_input_queues() await self.frame_processor.update_params(params.params) except Exception as e: logger.warning(f"Failed to set params: {e}") @@ -423,8 +430,13 @@ async def _handle_control_message(self, control_data: dict): """Handle control messages from trickle protocol. Routes control messages to the frame processor's update_params method. + Clears input queues before updating to avoid processing stale frames. """ try: + # Clear input queues to avoid processing stale frames with new parameters + if self.clear_queues_on_update and self.current_client: + await self.current_client.clear_input_queues() + await self.frame_processor.update_params(control_data) logger.debug("Control message routed to frame processor") except Exception as e: @@ -443,6 +455,10 @@ async def _handle_update_params(self, request: web.Request) -> web.Response: "message": "No active stream to update" }, status=400) + # Clear input queues before updating to avoid processing stale frames + if self.clear_queues_on_update: + await self.current_client.clear_input_queues() + # Update frame processor parameters await self.frame_processor.update_params(data) logger.info(f"Parameters updated: {data}") diff --git a/pytrickle/state.py b/pytrickle/state.py index 2781a52..b881373 100644 --- a/pytrickle/state.py +++ b/pytrickle/state.py @@ -149,7 +149,10 @@ def set_startup_complete(self) -> None: self.startup_complete = True # When startup completes, transition to IDLE (ready state) if self._state == PipelineState.LOADING: + logger.info("State transition: LOADING → IDLE") self.set_state(PipelineState.IDLE) + else: + logger.info(f"State already {self._state.name}, not transitioning") def update_active_streams(self, count: int) -> None: """Update number of active streams for health/status reporting.""" diff --git a/pytrickle/stream_processor.py b/pytrickle/stream_processor.py index 3686c39..6094dd3 100644 --- a/pytrickle/stream_processor.py +++ b/pytrickle/stream_processor.py @@ -35,6 +35,7 @@ def from_handlers( port: int = 8000, frame_skip_config: Optional[FrameSkipConfig] = None, validate_signature: bool = True, + clear_queues_on_update: bool = True, **server_kwargs ): """Construct a StreamProcessor by discovering handlers on *handler_instance*.""" @@ -73,6 +74,7 @@ def from_handlers( port=port, frame_skip_config=frame_skip_config, validate_signature=validate_signature, + clear_queues_on_update=clear_queues_on_update, **server_kwargs ) @@ -92,6 +94,7 @@ def __init__( name: str = "stream-processor", port: int = 8000, frame_skip_config: Optional[FrameSkipConfig] = None, + clear_queues_on_update: bool = True, validate_signature: bool = True, **server_kwargs ): @@ -109,6 +112,7 @@ def __init__( name: Processor name port: Server port frame_skip_config: Optional frame skipping configuration (None = no frame skipping) + clear_queues_on_update: Whether to flush client queues before applying updated params **server_kwargs: Additional arguments passed to StreamServer """ # Validate that processors are async functions @@ -140,6 +144,7 @@ def __init__( self.name = name self.port = port self.frame_skip_config = frame_skip_config + self.clear_queues_on_update = clear_queues_on_update self.server_kwargs = server_kwargs self._handler_registry: Optional[HandlerRegistry] = None @@ -159,6 +164,7 @@ def __init__( frame_processor=self._frame_processor, port=port, frame_skip_config=frame_skip_config, + clear_queues_on_update=clear_queues_on_update, **server_kwargs )