diff --git a/scenedetect/detectors/content_detector.py b/scenedetect/detectors/content_detector.py
index d651a800..6a2b6699 100644
--- a/scenedetect/detectors/content_detector.py
+++ b/scenedetect/detectors/content_detector.py
@@ -24,10 +24,28 @@ from scenedetect.scene_detector import SceneDetector
 
 
-def calculate_frame_score(current_frame_hsv: Iterable[numpy.ndarray],
-                          last_frame_hsv: Iterable[numpy.ndarray]) -> Tuple[float]:
-    """Calculates score between two adjacent frames in the HSV colourspace. Frames should be
-    split, e.g. cv2.split(cv2.cvtColor(frame_data, cv2.COLOR_BGR2HSV)).
+def calculate_frame_components(frame: numpy.ndarray,
+                               calculate_edges: bool = True,
+                               sigma: float = 1.0 / 3.0):
+    """Splits a BGR frame into its H, S, and V planes, optionally appending a Canny edge mask
+    computed from the V channel with thresholds of (1 - sigma) and (1 + sigma) times the median."""
+    hsv = cv2.split(cv2.cvtColor(frame, cv2.COLOR_BGR2HSV))
+    if not calculate_edges:
+        return hsv
+    median = numpy.median(hsv[2])
+    # TODO: Add config file entries for sigma, aperture size, etc.
+    low = int(max(0, (1.0 - sigma) * median))
+    high = int(min(255, (1.0 + sigma) * median))
+    edge = cv2.Canny(hsv[2], low, high)
+    # TODO: Use morphological filter to open edges based on resolution, need to increase line width
+    # accordingly - just automatically size and allow to be overridden in config file.
+    return (*hsv, edge)
+
+
+def calculate_frame_score(current_frame_hsve: Iterable[numpy.ndarray],
+                          last_frame_hsve: Iterable[numpy.ndarray],
+                          weight_map: Iterable[float]) -> Tuple[float]:
+    """Calculates score between two adjacent frames in the HSVE colourspace. Frames should be
+    split, e.g. cv2.split(cv2.cvtColor(frame_data, cv2.COLOR_BGR2HSV)), and edge information
+    appended.
 
     Arguments:
         curr_frame_hsv: Current frame.
@@ -36,19 +54,25 @@ def calculate_frame_score(current_frame_hsv: Iterable[numpy.ndarray],
     Returns:
         Tuple containing the average pixel change for each component as well as the average
-        across all components, e.g. (avg_h, avg_s, avg_v, avg_all).
+        across all components, e.g. (avg_h, avg_s, avg_v, avg_e, avg_all).
     """
-    current_frame_hsv = [x.astype(numpy.int32) for x in current_frame_hsv]
-    last_frame_hsv = [x.astype(numpy.int32) for x in last_frame_hsv]
-    delta_hsv = [0, 0, 0, 0]
-    for i in range(3):
-        num_pixels = current_frame_hsv[i].shape[0] * current_frame_hsv[i].shape[1]
-        delta_hsv[i] = numpy.sum(
-            numpy.abs(current_frame_hsv[i] - last_frame_hsv[i])) / float(num_pixels)
+    current_frame_hsve = [x.astype(numpy.int32) for x in current_frame_hsve]
+    last_frame_hsve = [x.astype(numpy.int32) for x in last_frame_hsve]
+    delta_hsve = [0.0] * 5
+    calculate_edge_component: bool = (weight_map[3] > 0.0)
+    # TODO(v0.6.1): This is wrong for edges, just calculate edges separately.
+    # Need to multiply both edge masks together.
+    for i in range(4 if calculate_edge_component else 3):
+        num_pixels = current_frame_hsve[i].shape[0] * current_frame_hsve[i].shape[1]
+        delta_hsve[i] = numpy.sum(
+            numpy.abs(current_frame_hsve[i] - last_frame_hsve[i])) / float(num_pixels)
+    delta_hsve[4] = sum([(delta_hsve[i] * weight_map[i]) for i in range(4)]) / sum(weight_map)
+    return tuple(delta_hsve)
 
-    delta_hsv[3] = sum(delta_hsv[0:3]) / 3.0
-    return tuple(delta_hsv)
+
+# TODO: May need to create a dataclass of ContentDetector options:
+#  - threshold, min_scene_len, luma_only, weight_h, weight_l, ....
 class ContentDetector(SceneDetector):
@@ -60,10 +84,20 @@ class ContentDetector(SceneDetector):
     """
 
     FRAME_SCORE_KEY = 'content_val'
-    DELTA_H_KEY, DELTA_S_KEY, DELTA_V_KEY = ('delta_hue', 'delta_sat', 'delta_lum')
-    METRIC_KEYS = [FRAME_SCORE_KEY, DELTA_H_KEY, DELTA_S_KEY, DELTA_V_KEY]
-
-    def __init__(self, threshold: float = 27.0, min_scene_len: int = 15, luma_only: bool = False):
+    DELTA_H_KEY, DELTA_S_KEY, DELTA_V_KEY, DELTA_E_KEY = ('delta_hue', 'delta_sat', 'delta_lum',
+                                                          'delta_edge')
+    METRIC_KEYS = [FRAME_SCORE_KEY, DELTA_H_KEY, DELTA_S_KEY, DELTA_V_KEY, DELTA_E_KEY]
+
+    # TODO: Come up with some good weights for a new default if there is one that can pass
+    # a wider variety of test cases.
+    DEFAULT_HSLE_WEIGHT_MAP = (1.0, 1.0, 1.0, 0.0)
+
+    def __init__(
+            self,
+            threshold: float = 27.0,
+            min_scene_len: int = 15,
+            luma_only: bool = False,  # TODO: Remove luma_only.
+            hsle_weights=DEFAULT_HSLE_WEIGHT_MAP):
         """
         Arguments:
             threshold: Threshold the average change in pixel intensity must exceed to trigger a cut.
@@ -74,38 +108,51 @@ def __init__(self, threshold: float = 27.0, min_scene_len: int = 15, luma_only:
         """
         super().__init__()
         self.threshold = threshold
-        # Minimum length of any given scene, in frames (int) or FrameTimecode
+        # Minimum length of any given scene, in frames (int) or FrameTimecode
         self.min_scene_len = min_scene_len
-        self.luma_only = luma_only
+
         self.last_frame = None
         self.last_scene_cut = None
-        self.last_hsv = None
+        self.last_hsve = None
+        self._hsle_weights = hsle_weights
+        # TODO: Need to calculate filter sizes based on downscale factor when creating the detector
+        self._debug_mode = False
+        self._edge_mask_out: Optional[cv2.VideoWriter] = None
 
     def get_metrics(self):
        return ContentDetector.METRIC_KEYS
 
     def is_processing_required(self, frame_num):
-        if self.stats_manager is None:
-            return False
-        # Note this will always return True on the last frame of a video, but that's fine
-        # as the only side-effect is the frame being decoded. We still don't perform the
-        # calculations for that frame in `process_frame` if the last frame's metrics exist.
-        return not self.stats_manager.metrics_exist(frame_num, ContentDetector.METRIC_KEYS) or (
-            not self.stats_manager.metrics_exist(frame_num + 1, ContentDetector.METRIC_KEYS))
-
-    def _calculate_frame_score(self, frame_num: int, curr_hsv: List[numpy.ndarray],
-                               last_hsv: List[numpy.ndarray]) -> float:
-        delta_h, delta_s, delta_v, delta_content = calculate_frame_score(curr_hsv, last_hsv)
+        # TODO(v0.6.1): Deprecate this method.
+        return True
+
+    def _calculate_frame_score(self, frame_num: int, curr_hsve: List[numpy.ndarray],
+                               last_hsve: List[numpy.ndarray]) -> float:
+        delta_h, delta_s, delta_v, delta_e, delta_content = calculate_frame_score(
+            curr_hsve, last_hsve, self._hsle_weights)
         if self.stats_manager is not None:
             self.stats_manager.set_metrics(
                 frame_num, {
                     self.FRAME_SCORE_KEY: delta_content,
                     self.DELTA_H_KEY: delta_h,
                     self.DELTA_S_KEY: delta_s,
-                    self.DELTA_V_KEY: delta_v
+                    self.DELTA_V_KEY: delta_v,
                 })
-        return delta_content if not self.luma_only else delta_v
+            if self._hsle_weights[3] > 0.0:
+                self.stats_manager.set_metrics(frame_num, {self.DELTA_E_KEY: delta_e})
+
+        # TODO: Try to add debug mode params to the config file,
+        # e.g. allow edge_mask_file = video.avi in [detect-content].
+        if self._debug_mode:
+            out_frame = cv2.cvtColor(curr_hsve[3], cv2.COLOR_GRAY2BGR)
+            if self._edge_mask_out is None:
+                self._edge_mask_out = cv2.VideoWriter('debug.avi',
+                                                      cv2.VideoWriter_fourcc('X', 'V', 'I', 'D'),
+                                                      23.976,
+                                                      (out_frame.shape[1], out_frame.shape[0]))
+            self._edge_mask_out.write(out_frame)
+
+        return delta_content
 
     def process_frame(self, frame_num: int, frame_img: Optional[numpy.ndarray]) -> List[int]:
         """ Similar to ThresholdDetector, but using the HSV colour space DIFFERENCE instead
@@ -130,23 +177,15 @@ def process_frame(self, frame_num: int, frame_img: Optional[numpy.ndarray]) -> L
 
         # We can only start detecting once we have a frame to compare with.
         if self.last_frame is not None:
-            # We obtain the change in average of HSV (frame_score), (h)ue only,
-            # (s)aturation only, and (l)uminance only. These are refered to in a statsfile
-            # as their respective metric keys.
-            metric_key = (
-                ContentDetector.DELTA_V_KEY if self.luma_only else ContentDetector.FRAME_SCORE_KEY)
-            if (self.stats_manager is not None
-                    and self.stats_manager.metrics_exist(frame_num, [metric_key])):
-                frame_score = self.stats_manager.get_metrics(frame_num, [metric_key])[0]
-            else:
-                curr_hsv = cv2.split(cv2.cvtColor(frame_img, cv2.COLOR_BGR2HSV))
-                last_hsv = self.last_hsv
-                if not last_hsv:
-                    last_hsv = cv2.split(cv2.cvtColor(self.last_frame, cv2.COLOR_BGR2HSV))
-
-                frame_score = self._calculate_frame_score(frame_num, curr_hsv, last_hsv)
-
-                self.last_hsv = curr_hsv
+
+            calculate_edges: bool = (self._hsle_weights[3] > 0.0) or self._debug_mode
+            curr_hsve = calculate_frame_components(frame_img, calculate_edges=calculate_edges)
+            last_hsve = self.last_hsve
+            if not last_hsve:
+                last_hsve = calculate_frame_components(
+                    self.last_frame, calculate_edges=calculate_edges)
+            frame_score = self._calculate_frame_score(frame_num, curr_hsve, last_hsve)
+            self.last_hsve = curr_hsve
 
             # We consider any frame over the threshold a new scene, but only if
             # the minimum scene length has been reached (otherwise it is ignored).
@@ -158,13 +197,7 @@ def process_frame(self, frame_num: int, frame_img: Optional[numpy.ndarray]) -> L
 
         if self.last_frame is not None and self.last_frame is not _unused:
             del self.last_frame
-        # If we have the next frame computed, don't copy the current frame
-        # into last_frame since we won't use it on the next call anyways.
-        if (self.stats_manager is not None
-                and self.stats_manager.metrics_exist(frame_num + 1, self.get_metrics())):
-            self.last_frame = _unused
-        else:
-            self.last_frame = frame_img.copy()
+        self.last_frame = frame_img.copy()
 
         return cut_list
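
Example usage: a sketch, not part of this diff. It assumes the v0.6 open_video/SceneManager API, that hsle_weights is ordered (hue, saturation, luma, edge), and that a non-zero edge weight enables the experimental Canny-based component; the default (1.0, 1.0, 1.0, 0.0) keeps the existing HSV-only scoring. The weight values and file name below are illustrative only.

    # Sketch only: hsle_weights comes from this patch, everything else is the
    # standard v0.6 detection workflow; input path and weights are hypothetical.
    from scenedetect import SceneManager, open_video
    from scenedetect.detectors import ContentDetector

    video = open_video('my_video.mp4')
    scene_manager = SceneManager()
    # Non-zero final weight turns on the experimental edge component.
    scene_manager.add_detector(ContentDetector(hsle_weights=(1.0, 0.5, 1.0, 0.2)))
    scene_manager.detect_scenes(video)
    print(scene_manager.get_scene_list())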