Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix/byte track 0 fps with webrtc #743

Merged
merged 10 commits into from
Oct 11, 2024
61 changes: 0 additions & 61 deletions inference/core/interfaces/camera/entities.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,11 @@
import logging
import time
from collections import deque
from dataclasses import dataclass
from datetime import datetime
from enum import Enum
from threading import Event, Lock
from typing import Callable, Dict, Optional, Tuple, Union

import numpy as np

from inference.core import logger
from inference.core.utils.function import experimental

FrameTimestamp = datetime
FrameID = int

Expand Down Expand Up @@ -103,59 +97,4 @@ def initialize_source_properties(self, properties: Dict[str, float]):
pass


class WebRTCVideoFrameProducer(VideoFrameProducer):
@experimental(
reason="Usage of WebRTCVideoFrameProducer with `InferencePipeline` is an experimental feature."
"Please report any issues here: https://github.com/roboflow/inference/issues"
)
def __init__(
self, to_inference_queue: deque, to_inference_lock: Lock, stop_event: Event
):
self.to_inference_queue: deque = to_inference_queue
self.to_inference_lock: Lock = to_inference_lock
self._stop_event = stop_event
self._w: Optional[int] = None
self._h: Optional[int] = None
self._fps_buff = []
self._is_opened = True

def grab(self) -> bool:
return self._is_opened

def retrieve(self) -> Tuple[bool, np.ndarray]:
while not self._stop_event.is_set() and not self.to_inference_queue:
time.sleep(0.1)
if self._stop_event.is_set():
logger.info("Received termination signal, closing.")
self._is_opened = False
return False, None
with self.to_inference_lock:
img = self.to_inference_queue.pop()
return True, img

def release(self):
self._is_opened = False

def isOpened(self) -> bool:
return self._is_opened

def discover_source_properties(self) -> SourceProperties:
max_ts = max(self._fps_buff, key=lambda x: x["ts"]) if self._fps_buff else 0
min_ts = min(self._fps_buff, key=lambda x: x["ts"]) if self._fps_buff else 0
if max_ts == min_ts:
max_ts += 0.1
fps = len(self._fps_buff) / (max_ts - min_ts)
return SourceProperties(
width=self._w,
height=self._h,
total_frames=-1,
is_file=False,
fps=fps,
is_reconnectable=False,
)

def initialize_source_properties(self, properties: Dict[str, float]):
pass


VideoSourceIdentifier = Union[str, int, Callable[[], VideoFrameProducer]]
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ class InitialiseWebRTCPipelinePayload(InitialisePipelinePayload):
stream_output: Optional[List[str]] = Field(default_factory=list)
data_output: Optional[List[str]] = Field(default_factory=list)
webrtc_peer_timeout: float = 1
webcam_fps: Optional[float] = None


class ConsumeResultsPayload(BaseModel):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import os
import signal
import threading
import time
from collections import deque
from dataclasses import asdict
from functools import partial
Expand All @@ -10,7 +11,6 @@
from types import FrameType
from typing import Deque, Dict, Optional, Tuple

from aiortc import RTCPeerConnection
from pydantic import ValidationError

from inference.core import logger
Expand All @@ -19,10 +19,7 @@
RoboflowAPINotAuthorizedError,
RoboflowAPINotNotFoundError,
)
from inference.core.interfaces.camera.entities import (
VideoFrame,
WebRTCVideoFrameProducer,
)
from inference.core.interfaces.camera.entities import VideoFrame
from inference.core.interfaces.camera.exceptions import StreamOperationNotAllowedError
from inference.core.interfaces.http.orjson_utils import (
serialise_single_workflow_result_element,
Expand All @@ -41,12 +38,13 @@
InitialisePipelinePayload,
InitialiseWebRTCPipelinePayload,
OperationStatus,
WebRTCOffer,
)
from inference.core.interfaces.stream_manager.manager_app.serialisation import (
describe_error,
)
from inference.core.interfaces.stream_manager.manager_app.webrtc import (
RTCPeerConnectionWithFPS,
WebRTCVideoFrameProducer,
init_rtc_peer_connection,
)
from inference.core.workflows.execution_engine.entities.base import WorkflowImageData
Expand Down Expand Up @@ -202,18 +200,13 @@ def _start_webrtc(self, request_id: str, payload: dict):
watchdog = BasePipelineWatchDog()

webrtc_offer = parsed_payload.webrtc_offer
webcam_fps = parsed_payload.webcam_fps
to_inference_queue = deque()
to_inference_lock = Lock()
from_inference_queue = deque()
from_inference_lock = Lock()

stop_event = Event()
webrtc_producer = partial(
WebRTCVideoFrameProducer,
to_inference_lock=to_inference_lock,
to_inference_queue=to_inference_queue,
stop_event=stop_event,
)

def start_loop(loop: asyncio.AbstractEventLoop):
asyncio.set_event_loop(loop)
Expand All @@ -232,10 +225,19 @@ def start_loop(loop: asyncio.AbstractEventLoop):
from_inference_lock=from_inference_lock,
webrtc_peer_timeout=parsed_payload.webrtc_peer_timeout,
feedback_stop_event=stop_event,
webcam_fps=webcam_fps,
),
loop,
)
peer_connection = future.result()
peer_connection: RTCPeerConnectionWithFPS = future.result()

webrtc_producer = partial(
WebRTCVideoFrameProducer,
to_inference_lock=to_inference_lock,
to_inference_queue=to_inference_queue,
stop_event=stop_event,
webrtc_video_transform_track=peer_connection.video_transform_track,
)

def webrtc_sink(
prediction: Dict[str, WorkflowImageData], video_frame: VideoFrame
Expand Down
Loading
Loading