Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Philip/fastapi updates #59

Open
wants to merge 17 commits into
base: jon/fastapi
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions skellycam/api/routes/http/connect.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ async def test_camera_connection(number_of_frames: int = 10) -> BaseResponse:
try:
# Record for the specified number of frames
connected_cameras = await controller.connect(number_of_frames=number_of_frames)
# await controller.close() # this isn't correct, but when connect returns we do want to call close for everything to work properly
logger.api("`/connect/test` GET request handled successfully.")
return CamerasConnectedResponse(connected_cameras=connected_cameras)
except Exception as e:
Expand Down
42 changes: 36 additions & 6 deletions skellycam/core/cameras/config/apply_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@ class FailedToApplyCameraConfigurationError(Exception):
pass


def apply_camera_configuration(cv2_vid_capture: cv2.VideoCapture, config: CameraConfig) -> CameraConfig:
def apply_camera_configuration(
cv2_vid_capture: cv2.VideoCapture, config: CameraConfig
) -> CameraConfig:
# set camera stream parameters

logger.info(
Expand All @@ -34,12 +36,16 @@ def apply_camera_configuration(cv2_vid_capture: cv2.VideoCapture, config: Camera
cv2_vid_capture.set(cv2.CAP_PROP_FRAME_HEIGHT, config.resolution.height)
cv2_vid_capture.set(cv2.CAP_PROP_FPS, config.frame_rate)
cv2_vid_capture.set(
cv2.CAP_PROP_FOURCC, cv2.VideoWriter_fourcc(*config.capture_fourcc)
cv2.CAP_PROP_FOURCC, cv2.VideoWriter.fourcc(*config.capture_fourcc)
)
extracted_config = extract_config_from_cv2_capture(camera_id=config.camera_id,
cv2_capture=cv2_vid_capture,
rotation=config.rotation,
use_this_camera=config.use_this_camera)
extracted_config = extract_config_from_cv2_capture(
camera_id=config.camera_id,
cv2_capture=cv2_vid_capture,
rotation=config.rotation,
use_this_camera=config.use_this_camera,
)

verify_applied_config(provided_config=config, extracted_config=extracted_config) #TODO: we might not want to error out here, although a mismatch in configs could cause problems elsewhere

return extracted_config
except Exception as e:
Expand All @@ -48,3 +54,27 @@ def apply_camera_configuration(cv2_vid_capture: cv2.VideoCapture, config: Camera
raise FailedToApplyCameraConfigurationError(
f"Failed to apply configuration to Camera {config.camera_id} - {type(e).__name__} - {e}"
)


def verify_applied_config(
provided_config: CameraConfig, extracted_config: CameraConfig
) -> None:
# TODO: the __eq__ method in Camera Config achieves this, but without good reporting on where they aren't equal
assert (
extracted_config.camera_id == provided_config.camera_id
), f"Provided camera id {provided_config.camera_id} does not match extracted camera id {extracted_config.camera_id}"
assert (
extracted_config.exposure == provided_config.exposure
), f"Provided camera exposure {provided_config.exposure} does not match extracted camera exposure {extracted_config.exposure}"
assert (
extracted_config.resolution.height == provided_config.resolution.height
), f"Provided height {provided_config.resolution.height} does not match extracted height {extracted_config.resolution.height}"
assert (
extracted_config.resolution.width == provided_config.resolution.width
), f"Provided width {provided_config.resolution.width} does not match extracted width {extracted_config.resolution.width}"
assert (
extracted_config.frame_rate == provided_config.frame_rate
), f"Provided framerate {provided_config.frame_rate} does not match extracted framerate {extracted_config.frame_rate}"
assert (
extracted_config.capture_fourcc == provided_config.capture_fourcc
), f"Provided fourcc {provided_config.capture_fourcc} does not match extracted fourcc {extracted_config.capture_fourcc}"
4 changes: 2 additions & 2 deletions skellycam/core/cameras/config/default_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from skellycam.core.detection.image_resolution import ImageResolution
from skellycam.core.detection.image_rotation_types import RotationTypes

DEFAULT_EXPOSURE = -7
DEFAULT_EXPOSURE = 0 # The -7 was breaking my integrated webcam, and there's no easy way to change this on the swaggerui

DEFAULT_IMAGE_HEIGHT: int = 1080
DEFAULT_IMAGE_WIDTH: int = 1920
Expand All @@ -24,4 +24,4 @@ class DefaultCameraConfig(Enum):
FRAMERATE: float = DEFAULT_FRAME_RATE
ROTATION: RotationTypes = RotationTypes.NO_ROTATION
CAPTURE_FOURCC: str = "MJPG" # TODO - consider other capture codecs
WRITER_FOURCC: str = "MP4V" # TODO - consider other writer codecs
WRITER_FOURCC: str = "mp4v" # TODO - consider other writer codecs
11 changes: 9 additions & 2 deletions skellycam/core/cameras/group/camera_group.py
Original file line number Diff line number Diff line change
@@ -1,20 +1,26 @@
import logging
import multiprocessing
from multiprocessing import Queue
from multiprocessing.synchronize import Event as MultiprocessingEvent
from typing import Optional

from skellycam.core.cameras.config.camera_config import CameraConfigs
from skellycam.core.cameras.group.camera_group_process import CameraGroupProcess
from skellycam.core.consumers.frame_consumer_process import FrameConsumerProcess

logger = logging.getLogger(__name__)


class CameraGroup:
def __init__(
self,
consumer_queue: Queue, # TODO: include in tests
exit_event: MultiprocessingEvent,
):
self._exit_event = multiprocessing.Event()
self._exit_event = exit_event
self._process: Optional[CameraGroupProcess] = None

self._consumer_queue = consumer_queue

@property
def camera_ids(self):
if self._process is None:
Expand All @@ -24,6 +30,7 @@ def camera_ids(self):
def set_camera_configs(self, configs: CameraConfigs):
logger.debug(f"Setting camera configs to {configs}")
self._process = CameraGroupProcess(camera_configs=configs,
consumer_queue=self._consumer_queue,
exit_event=self._exit_event, )

async def start(self, number_of_frames: Optional[int] = None):
Expand Down
8 changes: 6 additions & 2 deletions skellycam/core/cameras/group/camera_group_loop.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ def camera_group_trigger_loop(
group_orchestrator.trigger_multi_frame_read()

if number_of_frames is not None:
check_loop_count(number_of_frames, loop_count, exit_event)
check_loop_count(number_of_frames, loop_count, exit_event, group_orchestrator)

if loop_count > 0:
elapsed_per_loop_ns.append((time.perf_counter_ns() - tik))
Expand Down Expand Up @@ -81,8 +81,12 @@ def log_time_stats(camera_configs: CameraConfigs,
)


def check_loop_count(number_of_frames: int, loop_count: int, exit_event: multiprocessing.Event):
def check_loop_count(number_of_frames: int, loop_count: int, exit_event: multiprocessing.Event, group_orchestrator: CameraGroupOrchestrator):
if number_of_frames is not None:
if loop_count + 1 >= number_of_frames:
# TODO: we were setting the exit event immediately after triggering the cameras, causing issues downstream
# in particular, sometimes we stop listening for frames before the last frame is captured
# is there a way to set the exit event at the end of the camera loop/once frames are put into shared memory?
# waiting 30 ms seems to do the trick, but doesn't seem reliable
logger.trace(f"Reached number of frames: {number_of_frames} - setting `exit` event")
exit_event.set()
4 changes: 2 additions & 2 deletions skellycam/core/cameras/group/camera_group_orchestrator.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ def from_camera_configs(cls,
camera_triggers={
camera_id: CameraTriggers.from_camera_id(camera_id=camera_id,
exit_event=exit_event)
for camera_id, camera_config in camera_configs.items()
for camera_id in camera_configs.keys()
},
_exit_event=exit_event
)
Expand Down Expand Up @@ -136,7 +136,7 @@ def _clear_retrieve_frames_triggers(self):
def _await_frames_copied(self):
while self.new_frames_available and self.should_continue:
wait_1us()
self._clear_retrieve_frames_triggers()
self._clear_retrieve_frames_triggers() # TODO: we're already clearing these triggers in await_new_frames_available

def _fire_grab_trigger(self):
logger.loop("Triggering all cameras to `grab` a frame...")
Expand Down
22 changes: 15 additions & 7 deletions skellycam/core/cameras/group/camera_group_process.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
import logging
import multiprocessing
from multiprocessing import Process
from typing import Optional
from multiprocessing import Process, Queue
from multiprocessing.synchronize import Event as MultiprocessingEvent
from typing import List, Optional

from skellycam.core import CameraId
from skellycam.core.cameras.config.camera_config import CameraConfigs
from skellycam.core.cameras.group.camera_group_loop import camera_group_trigger_loop
from skellycam.core.cameras.group.camera_group_orchestrator import CameraGroupOrchestrator

from skellycam.core.frames.frame_wrangler import FrameWrangler
from skellycam.core.memory.camera_shared_memory_manager import CameraGroupSharedMemory

Expand All @@ -17,26 +19,31 @@ class CameraGroupProcess:
def __init__(
self,
camera_configs: CameraConfigs,
exit_event: multiprocessing.Event,
consumer_queue: Queue, # TODO: include in tests
exit_event: MultiprocessingEvent,
):
self._camera_configs = camera_configs
self._exit_event = exit_event

self._process: Optional[Process] = None

def _create_process(self, number_of_frames: Optional[int] = None):
self._consumer_queue = consumer_queue

def _create_process(self, number_of_frames: Optional[int] = None): # TODO: this process does not seem to close properly on shutdown
self._process = Process(
name="MultiCameraTriggerProcess",
target=CameraGroupProcess._run_process,
args=(self._camera_configs,
self._consumer_queue,
self._exit_event,
number_of_frames
)
)

@staticmethod
def _run_process(configs: CameraConfigs,
exit_event: multiprocessing.Event,
consumer_queue: multiprocessing.Queue,
exit_event: MultiprocessingEvent,
number_of_frames: Optional[int] = None
):
group_orchestrator = CameraGroupOrchestrator.from_camera_configs(camera_configs=configs,
Expand All @@ -47,7 +54,8 @@ def _run_process(configs: CameraConfigs,
frame_wrangler = FrameWrangler(exit_event=exit_event,
camera_configs=configs,
group_shm_names=group_shm.shared_memory_names,
group_orchestrator=group_orchestrator)
group_orchestrator=group_orchestrator,
consumer_queue=consumer_queue)
try:
logger.debug(f"CameraGroupProcess started")
frame_wrangler.start()
Expand Down Expand Up @@ -80,5 +88,5 @@ def close(self):
logger.debug("CameraTriggerProcess closed")

@property
def camera_ids(self) -> [CameraId]:
def camera_ids(self) -> List[CameraId]:
return [CameraId(camera_id) for camera_id in self._camera_configs.keys()]
70 changes: 70 additions & 0 deletions skellycam/core/consumers/consumer_manager.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
import logging
from multiprocessing import Queue
import multiprocessing
from multiprocessing.synchronize import Event as MultiprocessingEvent
from typing import Optional

from skellycam.core.consumers.frame_consumer_process import FrameConsumerProcess

logger = logging.getLogger(__name__)


class ConsumerManager:
"""I broke the recorder with this. The point of it is to separate the running of the process from the control of it,
so we can close the process from outside the process. But it's not closing properly, and the camera doesn't even disconnect now (so something is blocking)
"""

def __init__(
self,
exit_event: MultiprocessingEvent,
recording_event: MultiprocessingEvent,
display_queue: Optional[Queue] = Queue(),
recording_queue: Optional[Queue] = Queue(),
output_queue: Optional[Queue] = Queue(),
):
self.exit_event = exit_event
self.recording_event = recording_event
self.display_queue = display_queue
self.recording_queue = recording_queue
self.output_queue = output_queue

self.consumer_queue = Queue()

self._process: Optional[multiprocessing.Process] = None

def start_process(self):
if self._process is not None and self._process.is_alive():
self.close_process()
# raise RuntimeError("Process is already running") # TODO: we might not want to error here, or we need to be more careful to avoid states where this happens
# I ran into this after "ensure cameras are ready" errored - ideally that error stops the execution and we don't get to here.

if self.exit_event.is_set():
self.exit_event.clear()

self._process = self._setup_process()
try:
logger.debug("Frame Consumer Process starting")
self._process.start()
finally:
logger.debug("Closing Frame Consumer Process")
self.close_process()
logger.debug("Frame Consumer Process closed")

def close_process(self):
if self._process and self._process.is_alive():
self._process.join()

def _setup_process(self) -> multiprocessing.Process:
frame_consumer_process = FrameConsumerProcess(
exit_event=self.exit_event,
recording_event=self.recording_event,
consumer_queue=self.consumer_queue,
display_queue=self.display_queue,
recording_queue=self.recording_queue,
output_queue=self.output_queue,
)

return multiprocessing.Process(
target=frame_consumer_process.run_process,
name=f"FrameConsumerProcess",
)
Loading