|
| 1 | +import logging |
| 2 | +import numpy as np |
| 3 | + |
| 4 | +from frigate.detectors.detection_api import DetectionApi |
| 5 | +from frigate.detectors.detector_config import BaseDetectorConfig |
| 6 | +from typing import Literal |
| 7 | +from pydantic import Extra, Field |
| 8 | + |
| 9 | +try: |
| 10 | + from tflite_runtime.interpreter import Interpreter, load_delegate |
| 11 | +except ModuleNotFoundError: |
| 12 | + from tensorflow.lite.python.interpreter import Interpreter, load_delegate |
| 13 | + |
| 14 | + |
| 15 | +logger = logging.getLogger(__name__) |
| 16 | + |
| 17 | +DETECTOR_KEY = "vim3" |
| 18 | + |
| 19 | + |
| 20 | +class vim3DetectorConfig(BaseDetectorConfig): |
| 21 | + type: Literal[DETECTOR_KEY] |
| 22 | + device: str = Field(default=None, title="Device Type") |
| 23 | + |
| 24 | + |
| 25 | +class vim3Tfl(DetectionApi): |
| 26 | + type_key = DETECTOR_KEY |
| 27 | + |
| 28 | + def __init__(self, detector_config: vim3DetectorConfig): |
| 29 | + device_config = {"device": "usb"} |
| 30 | + if detector_config.device is not None: |
| 31 | + device_config = {"device": detector_config.device} |
| 32 | + |
| 33 | + edge_tpu_delegate = None |
| 34 | + |
| 35 | + try: |
| 36 | + logger.info(f"Attempting to register VIM3 TPU") |
| 37 | + edge_tpu_delegate = load_delegate("libvx_delegate.so") |
| 38 | + logger.info("TPU found") |
| 39 | + self.interpreter = Interpreter( |
| 40 | + model_path=detector_config.model.path or "/cpu_model.tflite", |
| 41 | + experimental_delegates=[edge_tpu_delegate], |
| 42 | + ) |
| 43 | + except ValueError: |
| 44 | + logger.error( |
| 45 | + "No EdgeTPU was detected. If you do not have a Accelerator (VIM3) device yet, you must configure CPU detectors." |
| 46 | + ) |
| 47 | + raise |
| 48 | + |
| 49 | + self.interpreter.allocate_tensors() |
| 50 | + |
| 51 | + self.tensor_input_details = self.interpreter.get_input_details() |
| 52 | + self.tensor_output_details = self.interpreter.get_output_details() |
| 53 | + |
| 54 | + def detect_raw(self, tensor_input): |
| 55 | + self.interpreter.set_tensor(self.tensor_input_details[0]["index"], tensor_input) |
| 56 | + self.interpreter.invoke() |
| 57 | + |
| 58 | + boxes = self.interpreter.tensor(self.tensor_output_details[0]["index"])()[0] |
| 59 | + class_ids = self.interpreter.tensor(self.tensor_output_details[1]["index"])()[0] |
| 60 | + scores = self.interpreter.tensor(self.tensor_output_details[2]["index"])()[0] |
| 61 | + count = int( |
| 62 | + self.interpreter.tensor(self.tensor_output_details[3]["index"])()[0] |
| 63 | + ) |
| 64 | + |
| 65 | + detections = np.zeros((20, 6), np.float32) |
| 66 | + |
| 67 | + for i in range(count): |
| 68 | + if scores[i] < 0.4 or i == 20: |
| 69 | + break |
| 70 | + detections[i] = [ |
| 71 | + class_ids[i], |
| 72 | + float(scores[i]), |
| 73 | + boxes[i][0], |
| 74 | + boxes[i][1], |
| 75 | + boxes[i][2], |
| 76 | + boxes[i][3], |
| 77 | + ] |
| 78 | + |
| 79 | + return detections |
0 commit comments