Skip to content

Commit

Permalink
[feature] Integrate edge detector
Browse files Browse the repository at this point in the history
Based off https://github.com/wjs018/QuantitativeEditing (thanks @wjs018).

Works well with the referenced material in #35, but many outstanding TODOs
remain with regard to integration with the rest of the application.

Remove support for loading data in ContentDetector from statsfile as this
functionality is now deprecated (stats will always be re-calculated). The
same functionality can be achieved if/when required by implementing another
ContentDetector which reads stats from a file.
  • Loading branch information
Breakthrough committed Aug 1, 2022
1 parent 39b3e1d commit 1d70ed1
Showing 1 changed file with 91 additions and 58 deletions.
149 changes: 91 additions & 58 deletions scenedetect/detectors/content_detector.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,28 @@
from scenedetect.scene_detector import SceneDetector


def calculate_frame_score(current_frame_hsv: Iterable[numpy.ndarray],
last_frame_hsv: Iterable[numpy.ndarray]) -> Tuple[float]:
"""Calculates score between two adjacent frames in the HSV colourspace. Frames should be
split, e.g. cv2.split(cv2.cvtColor(frame_data, cv2.COLOR_BGR2HSV)).
def calculate_frame_components(frame: numpy.ndarray,
                               calculate_edges: bool = True,
                               sigma: float = 1.0 / 3.0):
    """Split a BGR frame into its HSV planes, optionally appending a Canny edge map.

    Arguments:
        frame: Image to decompose; it is converted with cv2.COLOR_BGR2HSV.
        calculate_edges: When False, only the split HSV planes are returned.
        sigma: Fractional spread around the median of the V plane used to derive the
            lower and upper Canny thresholds.

    Returns:
        The result of cv2.split on the HSV image when `calculate_edges` is False,
        otherwise a tuple of those planes followed by the edge map.
    """
    channels = cv2.split(cv2.cvtColor(frame, cv2.COLOR_BGR2HSV))
    if calculate_edges:
        value_plane = channels[2]
        centre = numpy.median(value_plane)
        # TODO: Add config file entries for sigma, aperture size, etc.
        # Thresholds sit symmetrically around the median and are clamped to [0, 255].
        lower = int(max(0, (1.0 - sigma) * centre))
        upper = int(min(255, (1.0 + sigma) * centre))
        # TODO: Use morphological filter to open edges based on resolution, need to increase
        # line width accordingly - just automatically size and allow to be overridden in
        # config file.
        return (*channels, cv2.Canny(value_plane, lower, upper))
    return channels


def calculate_frame_score(current_frame_hsve: Iterable[numpy.ndarray],
                          last_frame_hsve: Iterable[numpy.ndarray],
                          weight_map: Iterable[float]) -> Tuple[float, ...]:
    """Calculates score between two adjacent frames in the HSVE colourspace. Frames should be
    split, e.g. cv2.split(cv2.cvtColor(frame_data, cv2.COLOR_BGR2HSV)), and edge information
    appended.

    Arguments:
        current_frame_hsve: Components (hue, saturation, value, edges) of the current frame.
            The edge component may be omitted when its weight is not positive.
        last_frame_hsve: Components of the previous frame, in the same order.
        weight_map: Four weights, one per component (h, s, v, e), used for the combined
            score. The weights must not sum to zero since the score is divided by their sum.

    Returns:
        Tuple containing the average pixel change for each component as well as the
        weighted average across all components, e.g. (avg_h, avg_s, avg_v, avg_e, avg_all).
        avg_e is 0.0 when the edge weight is not positive.
    """
    # Materialize the weights once so any iterable can be indexed and summed below.
    weights = tuple(weight_map)
    # Widen to a signed type so subtracting unsigned 8-bit planes cannot wrap around.
    current_frame_hsve = [x.astype(numpy.int32) for x in current_frame_hsve]
    last_frame_hsve = [x.astype(numpy.int32) for x in last_frame_hsve]
    delta_hsve = [0.0] * 5
    # The edge delta is only needed when it contributes to the score. This must be decided
    # from the weight: delta_hsve was just zero-initialised, so testing it would always
    # disable the edge component.
    calculate_edge_component: bool = (weights[3] > 0.0)
    # TODO(v0.6.1): This is wrong for edges, just calculate edges separately.
    # Need to multiply both edge masks together.
    for i in range(4 if calculate_edge_component else 3):
        num_pixels = current_frame_hsve[i].shape[0] * current_frame_hsve[i].shape[1]
        delta_hsve[i] = numpy.sum(
            numpy.abs(current_frame_hsve[i] - last_frame_hsve[i])) / float(num_pixels)
    delta_hsve[4] = sum([(delta_hsve[i] * weights[i]) for i in range(4)]) / sum(weights)
    return tuple(delta_hsve)

delta_hsv[3] = sum(delta_hsv[0:3]) / 3.0
return tuple(delta_hsv)

# TODO: May need to create a dataclass of ContentDetector options:
# - threshold, min_scene_len, luma_only, weight_h, weight_l, ....


class ContentDetector(SceneDetector):
Expand All @@ -60,10 +84,20 @@ class ContentDetector(SceneDetector):
"""

FRAME_SCORE_KEY = 'content_val'
DELTA_H_KEY, DELTA_S_KEY, DELTA_V_KEY = ('delta_hue', 'delta_sat', 'delta_lum')
METRIC_KEYS = [FRAME_SCORE_KEY, DELTA_H_KEY, DELTA_S_KEY, DELTA_V_KEY]

def __init__(self, threshold: float = 27.0, min_scene_len: int = 15, luma_only: bool = False):
DELTA_H_KEY, DELTA_S_KEY, DELTA_V_KEY, DELTA_E_KEY = ('delta_hue', 'delta_sat', 'delta_lum',
'delta_edge')
METRIC_KEYS = [FRAME_SCORE_KEY, DELTA_H_KEY, DELTA_S_KEY, DELTA_V_KEY, DELTA_E_KEY]

# TODO: Come up with some good weights for a new default if there is one that can pass
# a wider variety of test cases.
DEFAULT_HSLE_WEIGHT_MAP = (1.0, 1.0, 1.0, 0.0)

def __init__(
self,
threshold: float = 27.0,
min_scene_len: int = 15,
luma_only: bool = False, # TODO: Remove luma_only.
hsle_weights=DEFAULT_HSLE_WEIGHT_MAP):
"""
Arguments:
threshold: Threshold the average change in pixel intensity must exceed to trigger a cut.
Expand All @@ -74,38 +108,51 @@ def __init__(self, threshold: float = 27.0, min_scene_len: int = 15, luma_only:
"""
super().__init__()
self.threshold = threshold
# Minimum length of any given scene, in frames (int) or FrameTimecode
# Minimum length of any given scene, in frames (int) or FrameTimecode
self.min_scene_len = min_scene_len
self.luma_only = luma_only

self.last_frame = None
self.last_scene_cut = None
self.last_hsv = None
self.last_hsve = None
self._hsle_weights = hsle_weights
# TODO: Need to calculate filter sizes based on downscale factor when creating the detector
self._debug_mode = False
self._edge_mask_out: Optional[cv2.VideoWriter] = None

def get_metrics(self):
return ContentDetector.METRIC_KEYS

def is_processing_required(self, frame_num):
if self.stats_manager is None:
return False
# Note this will always return True on the last frame of a video, but that's fine
# as the only side-effect is the frame being decoded. We still don't perform the
# calculations for that frame in `process_frame` if the last frame's metrics exist.
return not self.stats_manager.metrics_exist(frame_num, ContentDetector.METRIC_KEYS) or (
not self.stats_manager.metrics_exist(frame_num + 1, ContentDetector.METRIC_KEYS))

def _calculate_frame_score(self, frame_num: int, curr_hsv: List[numpy.ndarray],
last_hsv: List[numpy.ndarray]) -> float:
delta_h, delta_s, delta_v, delta_content = calculate_frame_score(curr_hsv, last_hsv)
# TODO(v0.6.1): Deprecate this method.
return True

def _calculate_frame_score(self, frame_num: int, curr_hsve: List[numpy.ndarray],
last_hsve: List[numpy.ndarray]) -> float:
delta_h, delta_s, delta_v, delta_e, delta_content = calculate_frame_score(
curr_hsve, last_hsve, self._hsle_weights)
if self.stats_manager is not None:
self.stats_manager.set_metrics(
frame_num, {
self.FRAME_SCORE_KEY: delta_content,
self.DELTA_H_KEY: delta_h,
self.DELTA_S_KEY: delta_s,
self.DELTA_V_KEY: delta_v
self.DELTA_V_KEY: delta_v,
})
return delta_content if not self.luma_only else delta_v
if self._hsle_weights[3] > 0.0:
self.stats_manager.set_metrics(frame_num, {self.DELTA_E_KEY: delta_e})

# TODO: Try to add debug mode params to the config file,
# e.g. allow edge_mask_file = video.avi in [detect-content].
if self._debug_mode:
out_frame = cv2.cvtColor(curr_hsve[3], cv2.COLOR_GRAY2BGR)
if self._edge_mask_out is None:
self._edge_mask_out = cv2.VideoWriter('debug.avi',
cv2.VideoWriter_fourcc('X', 'V', 'I',
'D'), 23.976,
(out_frame.shape[1], out_frame.shape[0]))
self._edge_mask_out.write(out_frame)

return delta_content

def process_frame(self, frame_num: int, frame_img: Optional[numpy.ndarray]) -> List[int]:
""" Similar to ThresholdDetector, but using the HSV colour space DIFFERENCE instead
Expand All @@ -130,23 +177,15 @@ def process_frame(self, frame_num: int, frame_img: Optional[numpy.ndarray]) -> L

# We can only start detecting once we have a frame to compare with.
if self.last_frame is not None:
# We obtain the change in average of HSV (frame_score), (h)ue only,
# (s)aturation only, and (l)uminance only. These are referred to in a statsfile
# as their respective metric keys.
metric_key = (
ContentDetector.DELTA_V_KEY if self.luma_only else ContentDetector.FRAME_SCORE_KEY)
if (self.stats_manager is not None
and self.stats_manager.metrics_exist(frame_num, [metric_key])):
frame_score = self.stats_manager.get_metrics(frame_num, [metric_key])[0]
else:
curr_hsv = cv2.split(cv2.cvtColor(frame_img, cv2.COLOR_BGR2HSV))
last_hsv = self.last_hsv
if not last_hsv:
last_hsv = cv2.split(cv2.cvtColor(self.last_frame, cv2.COLOR_BGR2HSV))

frame_score = self._calculate_frame_score(frame_num, curr_hsv, last_hsv)

self.last_hsv = curr_hsv

calculate_edges: bool = (self._hsle_weights[3] > 0.0) or self._debug_mode
curr_hsve = calculate_frame_components(frame_img, calculate_edges=calculate_edges)
last_hsve = self.last_hsve
if not last_hsve:
last_hsve = calculate_frame_components(
self.last_frame, calculate_edges=calculate_edges)
frame_score = self._calculate_frame_score(frame_num, curr_hsve, last_hsve)
self.last_hsve = curr_hsve

# We consider any frame over the threshold a new scene, but only if
# the minimum scene length has been reached (otherwise it is ignored).
Expand All @@ -158,13 +197,7 @@ def process_frame(self, frame_num: int, frame_img: Optional[numpy.ndarray]) -> L
if self.last_frame is not None and self.last_frame is not _unused:
del self.last_frame

# If we have the next frame computed, don't copy the current frame
# into last_frame since we won't use it on the next call anyways.
if (self.stats_manager is not None
and self.stats_manager.metrics_exist(frame_num + 1, self.get_metrics())):
self.last_frame = _unused
else:
self.last_frame = frame_img.copy()
self.last_frame = frame_img.copy()

return cut_list

Expand Down

0 comments on commit 1d70ed1

Please sign in to comment.