mass object detection
This is an experiment for scottlamb/moonfire-nvr#30. I'm reasonably
happy with the schema for representing detected objects.
scottlamb committed Apr 14, 2020
1 parent cd7e935 commit 9ff4646
Showing 4 changed files with 197 additions and 13 deletions.
49 changes: 49 additions & 0 deletions nvr-motion/Cargo.lock


1 change: 1 addition & 0 deletions nvr-motion/Cargo.toml
@@ -38,6 +38,7 @@ structopt = "0.3.12"
tokio = { version = "0.2.0", features = ["blocking", "macros", "rt-threaded", "signal"] }
tonic = "0.1"
uuid = "0.8.1"
zstd = "0.5.1"

[build-dependencies]
prost-build = "0.6.1"
112 changes: 109 additions & 3 deletions nvr-motion/src/bin/backfill.rs
@@ -29,6 +29,12 @@ struct Opt {

#[structopt(short, long, parse(try_from_str))]
end: Option<moonfire_nvr_client::Time>,

#[structopt(short="f", long)]
fps: Option<f32>,

#[structopt(short="C", long, use_delimiter=true)]
cameras: Option<Vec<String>>,
}

struct Context<'a> {
@@ -38,6 +44,7 @@ struct Context<'a> {
client: moonfire_nvr_client::Client,
start: Option<moonfire_nvr_client::Time>,
end: Option<moonfire_nvr_client::Time>,
cameras: Option<Vec<String>>,

// Stuff for processing recordings.
// This supports using multiple interpreters, one per Edge TPU device.
@@ -46,6 +53,7 @@ struct Context<'a> {
interpreter_rx: crossbeam::channel::Receiver<moonfire_tflite::Interpreter<'a>>,
width: usize,
height: usize,
min_interval_90k: i32,
}
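
The channel pair above acts as a simple checkout pool: a worker receives an interpreter, runs inference, and sends it back, so each Edge TPU is driven by at most one worker at a time. A minimal sketch of the pattern in isolation, with a stand-in `Interpreter` type (illustrative only, not part of this commit):

struct Interpreter; // stand-in for moonfire_tflite::Interpreter<'_>

fn pool_demo() {
    let num_devices = 2;
    let (tx, rx) = crossbeam::channel::bounded(num_devices);
    for _ in 0..num_devices {
        tx.try_send(Interpreter).unwrap(); // pre-fill the pool
    }
    // Per work item: check out, use, return. The send can't fail because
    // the capacity equals the number of interpreters in circulation.
    let interpreter = rx.recv().unwrap();
    // ... run inference with `interpreter` ...
    tx.try_send(interpreter).unwrap();
}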

/// Gets the id range of committed recordings indicated by `r`.
@@ -80,6 +88,11 @@ async fn list_recordings(ctx: &Context<'_>) -> Result<Vec<Option<(Stream, Vec<i3
async fn process_camera(ctx: &Context<'_>, camera: &moonfire_nvr_client::Camera)
-> Result<Option<(Stream, Vec<i32>)>, Error> {
const DESIRED_STREAM: &str = "sub";
if let Some(cameras) = &ctx.cameras {
if !cameras.contains(&camera.short_name) {
return Ok(None);
}
}
if !camera.streams.contains_key(DESIRED_STREAM) {
return Ok(None);
}
@@ -156,6 +169,63 @@ async fn fetch_recording(ctx: &Context<'_>, stream: &Stream, id: i32)
Ok(resp.bytes().await?)
}

pub fn zigzag32(i: i32) -> u32 { ((i << 1) as u32) ^ ((i >> 31) as u32) }

pub fn append_varint32(i: u32, data: &mut Vec<u8>) {
if i < 1u32 << 7 {
data.push(i as u8);
} else if i < 1u32 << 14 {
data.extend_from_slice(&[(( i & 0x7F) | 0x80) as u8,
(i >> 7) as u8]);
} else if i < 1u32 << 21 {
data.extend_from_slice(&[(( i & 0x7F) | 0x80) as u8,
(((i >> 7) & 0x7F) | 0x80) as u8,
(i >> 14) as u8]);
} else if i < 1u32 << 28 {
data.extend_from_slice(&[(( i & 0x7F) | 0x80) as u8,
(((i >> 7) & 0x7F) | 0x80) as u8,
(((i >> 14) & 0x7F) | 0x80) as u8,
(i >> 21) as u8]);
} else {
data.extend_from_slice(&[(( i & 0x7F) | 0x80) as u8,
(((i >> 7) & 0x7F) | 0x80) as u8,
(((i >> 14) & 0x7F) | 0x80) as u8,
(((i >> 21) & 0x7F) | 0x80) as u8,
(i >> 28) as u8]);
}
}
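
As a sanity check on the encoding above, here is a sketch of the matching decoder; `unzigzag32` and `read_varint32` are hypothetical helper names for illustration, not part of this commit:

/// Hypothetical inverse of `zigzag32`.
fn unzigzag32(u: u32) -> i32 {
    ((u >> 1) as i32) ^ -((u & 1) as i32)
}

/// Hypothetical inverse of `append_varint32`: decodes one varint from the
/// front of `data`, returning the value and the remaining bytes. Assumes
/// well-formed input (at most 5 bytes, as produced by `append_varint32`).
fn read_varint32(data: &[u8]) -> Option<(u32, &[u8])> {
    let mut value = 0u32;
    for (i, &b) in data.iter().enumerate().take(5) {
        value |= u32::from(b & 0x7F) << (7 * i);
        if b & 0x80 == 0 {
            return Some((value, &data[i + 1..]));
        }
    }
    None // truncated input
}

#[test]
fn varint_zigzag_round_trip() {
    for &i in &[0i32, 1, -1, 300, -300, i32::MAX, i32::MIN] {
        let mut buf = Vec::new();
        append_varint32(zigzag32(i), &mut buf);
        let (u, rest) = read_varint32(&buf).unwrap();
        assert!(rest.is_empty());
        assert_eq!(unzigzag32(u), i);
    }
}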

const SCORE_THRESHOLD: f32 = 0.5;

fn normalize(v: f32) -> u8 {
(v.max(0.).min(1.0) * 255.) as u8
}

pub fn append_frame(interpreter: &moonfire_tflite::Interpreter<'_>, data: &mut Vec<u8>) {
let outputs = interpreter.outputs();
let boxes = outputs[0].f32s();
let classes = outputs[1].f32s();
let scores = outputs[2].f32s();
let num_labels = scores.iter().filter(|&&s| s >= SCORE_THRESHOLD).count();
append_varint32(u32::try_from(num_labels).unwrap(), data);
for (i, &score) in scores.iter().enumerate() {
if score < SCORE_THRESHOLD {
continue;
}
let box_ = &boxes[4*i..4*i+4];
let y = normalize(box_[0]);
let x = normalize(box_[1]);
let h = normalize(box_[2] - box_[0]);
let w = normalize(box_[3] - box_[1]);
append_varint32(classes[i] as u32, data);
data.push(x);
data.push(w);
data.push(y);
data.push(h);
data.push(normalize(scores[i]));
}
}
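
The `y, x` ordering above reflects the usual TFLite SSD post-processing output of `[ymin, xmin, ymax, xmax]` per box. As an illustration of the resulting record layout, a hand-built single-detection frame record can be checked against the helpers above; this test is a sketch, not part of the commit:

#[test]
fn frame_record_layout() {
    // One detection: class 1, a box covering the top-left quadrant, score 0.9.
    let mut data = Vec::new();
    append_varint32(1, &mut data); // detection count
    append_varint32(1, &mut data); // class id
    data.push(normalize(0.0));     // xmin
    data.push(normalize(0.5));     // width
    data.push(normalize(0.0));     // ymin
    data.push(normalize(0.5));     // height
    data.push(normalize(0.9));     // score
    assert_eq!(data, [1, 1, 0, 127, 0, 127, 229]);
}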

fn process_recording(ctx: &Context<'_>, streams: &Vec<&Stream>, recording: &Recording)
-> Result<(), Error> {
let mut open_options = moonfire_ffmpeg::avutil::Dictionary::new();
@@ -182,6 +252,11 @@ fn process_recording(ctx: &Context<'_>, streams: &Vec<&Stream>, recording: &Reco
let mut f = VideoFrame::empty().unwrap();
let mut s = moonfire_ffmpeg::swscale::Scaler::new(par.dims(), scaled.dims()).unwrap();

let mut frame_data = Vec::with_capacity(4096);
let mut durations = Vec::with_capacity(4096);
let mut last_duration = 0;
let mut next_pts = 0;
append_varint32(u32::try_from(ctx.min_interval_90k).unwrap(), &mut frame_data);
loop {
let pkt = match input.read_frame() {
Ok(p) => p,
@@ -194,21 +269,43 @@ fn process_recording(ctx: &Context<'_>, streams: &Vec<&Stream>, recording: &Reco
if !d.decode_video(&pkt, &mut f).unwrap() {
continue;
}
let d = pkt.duration();
append_varint32(zigzag32(d.checked_sub(last_duration).unwrap()), &mut durations);
last_duration = d;
let pts = pkt.pts().unwrap();
match pts.cmp(&next_pts) {
std::cmp::Ordering::Less => continue,
std::cmp::Ordering::Equal => {},
std::cmp::Ordering::Greater => {
// works for non-negative values.
fn ceil_div(a: i64, b: i64) -> i64 { (a + b - 1) / b }
let i = i64::from(ctx.min_interval_90k);
let before = next_pts;
next_pts = ceil_div(pts, i) * i;
assert!(next_pts >= pts, "next_pts {}->{} pts {} interval {}",
before, next_pts, pts, i);
},
}

// Perform object detection on the frame.
s.scale(&f, &mut scaled);
let mut interpreter = ctx.interpreter_rx.recv().unwrap();
moonfire_motion::copy(&scaled, &mut interpreter.inputs()[0]);
interpreter.invoke().unwrap();
append_frame(&interpreter, &mut frame_data);
ctx.interpreter_tx.try_send(interpreter).unwrap();
}
let compressed = zstd::stream::encode_all(&frame_data[..], 22)?;

let conn = ctx.conn.lock();
let mut stmt = conn.prepare_cached(r#"
insert into recording_object_detection (camera_uuid, stream_name, recording_id, frame_data)
values (?, ?, ?, ?)
insert into recording_object_detection (camera_uuid, stream_name, recording_id, frame_data,
durations)
values (?, ?, ?, ?, ?)
"#)?;
let stream = streams[usize::try_from(recording.stream_i).unwrap()];
let u = stream.camera_uuid.as_bytes();
stmt.execute(params![&u[..], &stream.stream_name, &recording.id, &b""[..]])?;
stmt.execute(params![&u[..], &stream.stream_name, &recording.id, &compressed, &durations])?;
Ok(())
}
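
Reading a row back reverses these steps. A sketch, using `zstd::stream::decode_all` (the counterpart of `encode_all`) and the hypothetical `read_varint32`/`unzigzag32` helpers sketched earlier; error handling is elided with `unwrap`:

fn read_row(compressed_frame_data: &[u8], durations_blob: &[u8]) -> (Vec<u8>, Vec<i32>) {
    // Undo the zstd compression applied before the insert above.
    let frame_data = zstd::stream::decode_all(compressed_frame_data).unwrap();

    // `durations` holds each frame's duration as a zigzag varint delta from
    // the previous frame's duration; decoding mirrors the encoding loop above.
    let mut durations = Vec::new();
    let (mut rest, mut last) = (durations_blob, 0i32);
    while !rest.is_empty() {
        let (u, r) = read_varint32(rest).unwrap();
        last += unzigzag32(u);
        durations.push(last);
        rest = r;
    }
    (frame_data, durations)
}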

@@ -256,6 +353,13 @@ fn main() -> Result<(), Error> {
interpreter_tx.try_send(i).unwrap();
}

let min_interval_90k = match opt.fps {
None => 1,
Some(f) if f > 0. => (90000. / f) as i32,
Some(_) => panic!("invalid fps; must be positive"),
};
assert!(min_interval_90k > 0);

let ctx = Context {
client: moonfire_nvr_client::Client::new(opt.nvr, opt.cookie),
conn,
@@ -265,6 +369,8 @@
height,
start: opt.start,
end: opt.end,
cameras: opt.cameras,
min_interval_90k,
};

let _ffmpeg = moonfire_ffmpeg::Ffmpeg::new();
48 changes: 38 additions & 10 deletions nvr-motion/src/schema.sql
@@ -4,7 +4,9 @@ create table object_detection_model (
name text not null,

-- The actual model and label mappings, in a tbd protocol buffer message
-- format.
-- format. Currently one model is hardcoded so this column is left null.
-- The format should include both the actual model and a mapping
-- from the model's label ids to object_detection_label.uuid.
data blob
);

@@ -105,17 +107,43 @@ create table recording_object_detection (
stream_name not null check (stream_name in ('main', 'sub')),
recording_id integer not null,

-- repeated:
-- * frame delta unsigned varint
-- * label unsigned varint
-- * xmin, xmax, ymin, ymax as fixed 8-bit numbers
-- (any value from knowing xmin <= xmax, ymin <= ymax?
-- probably not a whole byte anyway.)
-- although 256/300 or 256/320 is not super clean. awkward.
-- * score/probability/whatever-it's-called as fixed 8-bit number
-- linear scale?
-- zstd-compressed:
--
-- * non-zero pts interval in 90k units, as an unsigned varint. For all
-- multiples of this number in [0, recording duration) the frame displayed
-- at that pts (relative to start of the recording) will be processed;
-- duplicates are suppressed.
--
-- Consider a 2-second recording at 4 fps with no jitter:
-- frame [0, 1, 2, 3, 4, 5, 6, 7]
-- pts [0, 22500, 45000, 67500, 90000, 112500, 135000, 157500].
--
-- A pts interval of 30000 means process at 3 fps (90000 / 30000):
-- target pts [0, 30000, 60000, 90000, 120000, 150000]
-- frame [0, 1, 2, 4, 5, 6]
-- frame pts [0, 22500, 45000, 90000, 112500, 135000]
--
-- A pts interval of 18000 means process at 5 fps (90000 / 18000):
-- target pts [0, 18000, 36000, 54000, 72000, 90000, 108000, 126000, 144000, 162000]
-- frame [0, *0, 1, 2, 3, 4, *4, 5, 6, 7]
-- frame pts [0, *0, 22500, 45000, 67500, 90000, *90000, 112500, 135000, 157500]
-- * indicates duplicate frames (suppressed).
-- In this case, every frame is processed. However, in real examples with
-- jitter, frames may be skipped even if the desired frame rate exceeds
-- the input data's frame rate.
--
-- A pts interval of 1 guarantees every frame is processed.
--
-- * one per processed frame (even if zero objects were detected):
-- * label count unsigned varint; for each:
-- * label unsigned varint, referencing object_detection_label.id.
-- * xmin, width, ymin, height as fixed 8-bit numbers
-- * score as fixed 8-bit number
frame_data blob not null,

-- repeated delta of duration 90k, one per frame.
durations blob not null,

-- Operations are almost always done on a bounded set of recordings, and
-- perhaps on all models. Use composite_id as the prefix of the primary
-- key to make these efficient.
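
A sketch of a reader for the frame_data format described above, reusing the hypothetical `read_varint32` helper from the backfill sketch (all names here are illustrative, not part of the commit):

#[derive(Debug)]
struct Detection {
    label: u32,
    xmin: u8,
    width: u8,
    ymin: u8,
    height: u8,
    score: u8,
}

/// Parses a decompressed frame_data blob into (pts interval, per-frame
/// detection lists), per the column comment above.
fn parse_frame_data(data: &[u8]) -> Option<(u32, Vec<Vec<Detection>>)> {
    let (interval, mut rest) = read_varint32(data)?;
    let mut frames = Vec::new();
    while !rest.is_empty() {
        let (count, mut r) = read_varint32(rest)?;
        let mut dets = Vec::with_capacity(count as usize);
        for _ in 0..count {
            let (label, after_label) = read_varint32(r)?;
            let b = after_label.get(..5)?; // xmin, width, ymin, height, score
            dets.push(Detection {
                label,
                xmin: b[0],
                width: b[1],
                ymin: b[2],
                height: b[3],
                score: b[4],
            });
            r = &after_label[5..];
        }
        frames.push(dets);
        rest = r;
    }
    Some((interval, frames))
}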
