Skip to content

Commit

Permalink
refactor: 0.1.1
Browse files Browse the repository at this point in the history
  • Loading branch information
zaigie committed Dec 7, 2023
1 parent def95f2 commit ef4008f
Show file tree
Hide file tree
Showing 16 changed files with 571 additions and 477 deletions.
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -162,4 +162,5 @@ cython_debug/
# option (not recommended) you can uncomment the following to ignore the entire idea folder.
#.idea/

test.py
test.py
run.py
339 changes: 184 additions & 155 deletions README.md

Large diffs are not rendered by default.

313 changes: 171 additions & 142 deletions README.zh.md

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion docs/img/flow.svg
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
File renamed without changes
4 changes: 2 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ build-backend = "setuptools.build_meta"

[project]
name = "stream_infer"
version = "0.0.4"
description = "Real-time/offline analytics framework for video and streaming media"
version = "0.1.1"
description = "Real-time/offline inference framework for video and streaming media"
readme = "README.md"
license = { file = "LICENSE" }
authors = [
Expand Down
4 changes: 2 additions & 2 deletions src/stream_infer/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
__version__ = "0.0.4"
__version__ = "0.1.1"

from .inference import Inference
from .trackers import FrameTracker, TrackerManager
from .dispatcher import Dispatcher, DispatcherManager
from .player import Player
1 change: 0 additions & 1 deletion src/stream_infer/collector/__init__.py

This file was deleted.

21 changes: 0 additions & 21 deletions src/stream_infer/collector/base.py

This file was deleted.

66 changes: 66 additions & 0 deletions src/stream_infer/dispatcher.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
from multiprocessing.managers import BaseManager
from collections import deque
from typing import List, Any


class Dispatcher:
    """Shared frame buffer plus result sink for the inference pipeline.

    Frames are kept in a bounded deque (oldest dropped first); inference
    results are aggregated per algorithm name, keyed by the playback
    second at which they were produced.
    """

    def __init__(self, max_size: int = 120):
        # Bounded buffer: once full, appending silently evicts the oldest frame.
        self.queue = deque(maxlen=max_size)
        self.current_time = 0
        self.current_frame = 0
        self.collect_results = {}

    def get_size(self) -> int:
        """Number of frames currently buffered."""
        return len(self.queue)

    def add_frame(self, frame):
        """Buffer a frame and advance the running frame counter."""
        self.queue.append(frame)
        self.current_frame += 1

    def get_frames(self, count: int, step: int) -> List[Any]:
        """Return `count` frames sampled every `step` frames, oldest first.

        Sampling is anchored at the newest frame and walks backwards, so
        the newest frame is always included. Returns [] when the buffer
        does not yet hold the required `count * step` frames.
        """
        needed = count * step
        if len(self.queue) < needed:
            return []
        # Take the newest `needed` frames, then every step-th one;
        # starting at step-1 keeps the newest frame in the sample.
        window = list(self.queue)[-needed:]
        return window[step - 1 :: step]

    def collect_result(self, inference_result):
        """Store one (time, algo_name, data) result; None is ignored."""
        if inference_result is None:
            return
        # Time keys are stringified so the nested dict stays JSON-friendly.
        time_key = str(inference_result[0])
        algo_name = inference_result[1]
        data = inference_result[2]
        self.collect_results.setdefault(algo_name, {})[time_key] = data

    def clear(self):
        """Reset the dispatcher to its freshly-constructed state."""
        self.current_time = 0
        self.current_frame = 0
        self.queue.clear()
        self.collect_results.clear()

    def increase_current_time(self):
        """Advance the playback clock by one second."""
        self.current_time += 1

    def get_current_time(self) -> int:
        """Playback clock in whole seconds."""
        return self.current_time

    def get_current_frame(self) -> int:
        """Total frames ever added (not the buffered count)."""
        return self.current_frame


class _DispatcherManager(BaseManager):
    """Private manager subclass so registrations stay local to this module.

    Registering directly on BaseManager (as the previous version did)
    mutates the shared class-level registry, clobbering or leaking a
    "Dispatcher" registration into every other BaseManager user in the
    process. A module-level subclass keeps spawn-compatibility (the class
    is picklable by reference) while isolating our registry.
    """


class DispatcherManager:
    """Hosts a Dispatcher in a separate manager process.

    The object returned by :meth:`create` is a multiprocessing proxy, so
    it can be shared safely between the player process and the
    inference process.
    """

    def __init__(self, obj=None):
        self._manager = None      # manager process, started lazily
        self._dispatcher = None   # proxy to the remote dispatcher
        # Fall back to this module's Dispatcher when no class is supplied.
        self._obj = Dispatcher if obj is None else obj

    def create(self, max_size: int = 120):
        """Return the dispatcher proxy, starting the manager on first call.

        Subsequent calls return the same proxy; `max_size` is only
        honored on the first call.
        """
        if self._manager is None:
            self._initialize_manager(max_size)
        return self._dispatcher

    def _initialize_manager(self, max_size):
        # Register on the private subclass, not BaseManager, to avoid
        # mutating global state shared with unrelated manager users.
        _DispatcherManager.register("Dispatcher", self._obj)
        self._manager = _DispatcherManager()
        self._manager.start()
        self._dispatcher = self._manager.Dispatcher(max_size)
63 changes: 28 additions & 35 deletions src/stream_infer/inference.py
Original file line number Diff line number Diff line change
@@ -1,66 +1,59 @@
import threading as th

from .timer import Timer
from .log import logger


class Inference:
def __init__(self, frame_tracker, collector=None):
self.frame_tracker = frame_tracker
self.collector = collector
def __init__(self, dispatcher):
self.dispatcher = dispatcher
self.inferences_info = []
self.timers = {}
self.is_stop = False

def load_algo(self, algo_instance, frame_count, frame_step, interval):
self.inferences_info.append((algo_instance, frame_count, frame_step, interval))
self.timers[algo_instance.name] = Timer(interval)
algo_instance.init()

def start(self, player, fps: int = None, is_offline: bool = False):
"""
Easy to use function to start inference with realtime mode.
"""
if is_offline:
for _, current_frame in player.play(fps):
self.auto_run_specific_inference(player.fps, current_frame)
else:
player.play_realtime(fps)
while player.is_active():
self.run_inference()

def run_inference(self):
# start_time = time.time()

def run(self):
for inference_info in self.inferences_info:
algo_instance, _, _, _ = inference_info
timer = self.timers[algo_instance.name]
if timer.is_time():
self._inference_task(inference_info)
self._infer(inference_info)

# end_time = time.time()
# elapsed_time = end_time - start_time
def run_loop(self):
while not self.is_stop:
self.run()

def auto_run_specific_inference(self, fps, current_frame) -> str:
def run_async(self):
thread = th.Thread(target=self.run_loop)
thread.start()
return thread

def stop(self):
self.is_stop = True

def auto_run_specific(self, fps, current_frame) -> str:
for algo_instance, _, _, frequency in self.inferences_info:
if current_frame % int(frequency * fps) == 0:
self.run_specific_inference(algo_instance.name)
self.run_specific(algo_instance.name)
return algo_instance.name

def run_specific_inference(self, algo_name):
def run_specific(self, algo_name):
for inference_info in self.inferences_info:
algo_instance, _, _, _ = inference_info
if algo_instance.name == algo_name:
self._inference_task(inference_info)

def run_inference_loop(self):
while True:
self.run_inference()
self._infer(inference_info)

def _inference_task(self, inference_info):
def _infer(self, inference_info):
algo_instance, frame_count, frame_step, _ = inference_info
frames = self.frame_tracker.get_frames(frame_count, frame_step)
frames = self.dispatcher.get_frames(frame_count, frame_step)
if not frames:
return -1
result = algo_instance.run(frames)
if self.collector:
self.collector.collect(
(self.frame_tracker.get_current_time(), algo_instance.name, result)
)
self.dispatcher.collect_result(
(self.dispatcher.get_current_time(), algo_instance.name, result)
)
return result
116 changes: 61 additions & 55 deletions src/stream_infer/player.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,106 +7,112 @@


class Player:
def __init__(self, producer, frame_tracker, path):
self.frame_tracker = frame_tracker
def __init__(self, dispatcher, producer, path):
self.dispatcher = dispatcher
self.producer = producer
self.path = path
self.process = None
self.fps = None
self.current_frame = 0

def play(self, fps=None):
try:
original_fps = self.producer.get_info(self.path)["fps"]
if fps is None:
fps = original_fps
self.fps = fps
self.info = self.producer.get_info(self.path)
except Exception as e:
raise ValueError(f"Error getting fps: {e}")
raise ValueError(f"Error getting info: {e}")
self.fps = self.info["fps"]
self.play_fps = self.fps
self.frame_count = self.info["frame_count"]
self.process = None
self.is_end = mp.Value("b", False)

def play(self, fps=None):
fps = self.fps if fps is None else fps
self.play_fps = fps
interval_count = 0

for frame in self.producer.read(self.path, fps):
self.frame_tracker.add_frame(frame)
for idx, frame in enumerate(self.producer.read(self.path, fps)):
self.dispatcher.add_frame(frame)
interval_count += 1
if interval_count >= fps:
interval_count = 0
self.frame_tracker.increase_current_time()
logger.debug(f"current time: {self.get_current_time_str()}")
self.current_frame += 1
yield frame, self.current_frame
self.dispatcher.increase_current_time()
logger.debug(f"current time: {self.get_play_time()}")
yield frame, self.dispatcher.get_current_frame()

def play_realtime(self, fps=None):
def play_async(self, fps=None):
"""
Starts the appropriate streaming process based on the frame count.
"""
if not isinstance(self.frame_tracker, BaseProxy):
if not isinstance(self.dispatcher, BaseProxy):
logger.error(
f"Frame tracker is not an proxy: {type(self.frame_tracker)}, use TrackerManager().create() to create one"
f"Dispatcher is not an proxy: {type(self.dispatcher)}, use DispatcherManager().create() to create one"
)
raise ValueError(
f"Frame tracker is not an proxy: {type(self.frame_tracker)}, use TrackerManager().create() to create one"
f"Dispatcher is not an proxy: {type(self.dispatcher)}, use DispatcherManager().create() to create one"
)
try:
info = self.producer.get_info(self.path)
frame_count = info["frame_count"]
original_fps = info["fps"]
if fps is None or fps >= original_fps:
fps = original_fps
if fps > 30:
logger.warning(
f"FPS {fps} is too high, if your player is playing more slowly than the actual time, set a lower fps"
)
self.fps = fps
except Exception as e:
raise ValueError(f"Error getting info: {e}")

if frame_count == -1:
self.rt_frames = mp.Queue()
if fps is None or fps >= self.fps:
fps = self.fps
if fps > 30:
logger.warning(
f"FPS {fps} is too high, if your player is playing more slowly than the actual time, set a lower play fps"
)
self.play_fps = fps

if self.frame_count == -1:
target = self.normal_stream
else:
target = self.video_stream

self.process = mp.Process(
target=target, args=(self.frame_tracker, self.producer, self.path)
)
self.process = mp.Process(target=target)
self.process.start()
return self.process

def stop(self):
if self.process:
self.is_end.value = True
self.process.terminate()

def is_active(self) -> bool:
"""
Checks if the streaming process is still running.
"""
return self.process.is_alive() if self.process else False
return (
self.process.is_alive() and not self.is_end.value if self.process else False
)

def get_current_time_str(self) -> str:
current_time = self.frame_tracker.get_current_time()
def get_play_time(self) -> str:
current_time = self.dispatcher.get_current_time()
return f"{current_time // 3600:02d}:{current_time // 60 % 60:02d}:{current_time % 60:02d}"

def video_stream(self, frame_tracker, producer, path):
def video_stream(self):
"""
Handles streaming for video files. Frames are processed at a rate determined by the video's FPS.
"""
base_interval = 1 / self.fps
base_interval = 1 / self.play_fps
start_time = time.time()
interval_count = 0

for idx, frame in enumerate(producer.read(path, self.fps)):
for idx, frame in enumerate(self.producer.read(self.path, self.play_fps)):
target_time = start_time + (idx * base_interval)
time.sleep(max(0, target_time - time.time()))

frame_tracker.add_frame(frame)
self.dispatcher.add_frame(frame)
self.rt_frames.put(frame)
interval_count += 1
if interval_count >= self.fps:
if interval_count >= self.play_fps:
interval_count = 0
frame_tracker.increase_current_time()
logger.debug(f"current time: {self.get_current_time_str()}")
self.dispatcher.increase_current_time()
logger.debug(f"current time: {self.get_play_time()}")

def normal_stream(self, frame_tracker, producer, path):
self.is_end.value = True

def normal_stream(self):
"""
Handles streaming for non-video files. Frames are processed at regular intervals.
"""
timer = Timer(interval=1)
for frame in producer.read(path, self.fps):
for frame in self.producer.read(self.path, self.play_fps):
if timer.is_time():
frame_tracker.increase_current_time()
logger.debug(f"current time: {self.get_current_time_str()}")
frame_tracker.add_frame(frame)
self.dispatcher.increase_current_time()
logger.debug(f"current time: {self.get_play_time()}")
self.dispatcher.add_frame(frame)
self.rt_frames.put(frame)

self.is_end.value = True
Loading

0 comments on commit ef4008f

Please sign in to comment.