-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathxirec.py
140 lines (100 loc) · 5.05 KB
/
xirec.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
import argparse
import ctypes
import json
import sys
import time
from ctypes import sizeof
from datetime import datetime
from functools import partial
from itertools import starmap
from os import makedirs
from os.path import join

import imageio
from ximea import xiapi

from recorder import *
from utilities import *
def parse_camera_arg(s):
    """Parse one ``camera-sn[:parameter-file]`` command-line value.

    Only the first ``:`` separates the serial number from the parameter
    file, so the file path itself may contain colons (for example a Windows
    drive letter such as ``C:\\cfg\\cam.xicamera``).

    Returns:
        ``[serial_number]`` or ``[serial_number, parameter_file]`` where the
        serial number has been converted to ``int``.

    Raises:
        ValueError: if the serial number part is not an integer. When used
            as an argparse ``type=`` callable this becomes a usage error.
    """
    parts = s.split(':', 1)
    parts[0] = int(parts[0])
    return parts
def camera_from_args(arg) -> xiapi.Camera:
    """Open the camera described by one parsed command-line argument.

    ``arg[0]`` is handed to ``open_camera_by_sn``. When ``arg`` has a second
    element, it is passed together with the opened camera to
    ``apply_camtool_parameters``. The opened camera is returned.
    """
    camera = open_camera_by_sn(arg[0])
    has_parameter_file = len(arg) > 1
    if has_parameter_file:
        apply_camtool_parameters(camera, arg[1])
    return camera
def save_camera_parameters(parameters, data_dir: str):
    """Write ``parameters`` as JSON to ``<data_dir>/camera_parameters.json``.

    The file is overwritten if it already exists; ``data_dir`` must exist.
    """
    target_path = join(data_dir, 'camera_parameters.json')
    with open(target_path, 'w') as out:
        json.dump(parameters, out, indent=1)
def save_recording(buf: RecordingBuffers, data_dir: str, file_format='tiff'):
    """Store one camera's recording below ``data_dir``.

    Writes ``frames_metadata.json`` (one ``frame_metadata_as_dict`` entry per
    element of ``buf.meta_buffer``) and one image file per frame into
    ``<data_dir>/frames``. Frame files are named by their zero-padded index
    with ``file_format`` as the extension.
    """
    frames_dir = join(data_dir, 'frames')
    makedirs(frames_dir, exist_ok=True)

    with open(join(data_dir, 'frames_metadata.json'), 'w') as metadata_file:
        json.dump(
            [frame_metadata_as_dict(frame) for frame in buf.meta_buffer],
            metadata_file,
            indent=1,
        )

    # Pad every index to the width of the frame count so names sort correctly.
    index_width = len(str(len(buf.meta_buffer)))
    for index, (image, pixels) in enumerate(zip(buf.meta_buffer, buf.video_buffer)):
        # Point the image header at its pixel buffer before converting it.
        image.bp = ctypes.addressof(pixels)
        frame_array = image.get_image_data_numpy()
        frame_name = f'{index:0{index_width}}' + f'.{file_format}'
        imageio.imwrite(join(frames_dir, frame_name), frame_array)
# Command-line interface: one or more cameras plus the recording options.
argparser = argparse.ArgumentParser()
argparser.add_argument('camera-sn:parameter-file', nargs='+', type=parse_camera_arg,
                       help='The parameter file as saved by XiCamtool is optional')
argparser.add_argument('-fc', '--frame_count', default=0, type=int, help='number of frames to record')
argparser.add_argument('-d', '--duration', default=0, type=float, help='number of seconds to record')
argparser.add_argument('-f', '--format', default='tiff', choices=['tiff', 'bmp', 'jpg', 'png'])
argparser.add_argument('-w', '--wait', default='no',
                       help='Wait with recording until the trigger input of specified camera goes high')
# type=int is required here: argparse checks the *converted* value against
# `choices`, so without it every value typed on the command line (a str)
# would be rejected because the choices are ints.
argparser.add_argument('--wait-gpi', default=1, type=int, choices=list(range(1, 13)),
                       help='In case of waiting, selects which GPI port to use as the trigger. Defaults to 1')
args = argparser.parse_args()

# Exactly one of --frame_count / --duration must be set (0 means "not set"),
# so both-zero and both-non-zero are rejected.
if (args.duration == 0) == (args.frame_count == 0):
    print('Either define a number of frames or duration to record, but not both or none')
    sys.exit(1)
# The positional argument's dest contains '-' and ':', so it can only be
# reached through getattr.
camera_args = getattr(args, 'camera-sn:parameter-file')
no_cameras = len(camera_args)
saving_format = args.format

print('opening cameras')
cameras = [camera_from_args(arg) for arg in camera_args]
cameras_sn_str = [cam.get_device_sn().decode() for cam in cameras]

# Query each camera's framerate once and reuse it for both the frame-count
# computation and the summary printout.
framerates = [cam.get_framerate() for cam in cameras]
if args.duration == 0:
    no_frames = [args.frame_count] * no_cameras
else:
    # Truncate, then add one frame so the requested duration is fully covered.
    no_frames = [int(framerate * args.duration) + 1 for framerate in framerates]

for sn, framerate, fcount in zip(cameras_sn_str, framerates, no_frames):
    print(f'[{sn}] framerate: {framerate} | no frames: {fcount}')

memory_requirement_per_frame = list(map(probe_memory_requirements, cameras))
# 1024 extra bytes are budgeted per camera on top of the raw frame data
# (presumably headroom for the JSON side files - TODO confirm).
total_storage_requirement = sum(
    1024 + frame_size * frame_count
    for frame_size, frame_count in zip(memory_requirement_per_frame, no_frames)
)

print('checking disk space')
if free_disk_space('.') < total_storage_requirement:
    print('ERROR: not enough disk space for storing video')
    # Release the devices that were already opened before bailing out.
    for cam in cameras:
        cam.close_device()
    sys.exit(1)

camera_buffers = list(starmap(allocate_recording_buffers, zip(memory_requirement_per_frame, no_frames)))
print(f'allocated {sum(sizeof(b.video_buffer) for b in camera_buffers) / 1024**3:.2f} gigabyte for video')

print('storing all camera parameters')
cameras_parameter_dump = [get_all_camera_parameters(cam) for cam in cameras]
# Optionally hold off recording until the selected GPI of one of the opened
# cameras reads high. args.wait is the serial number of that camera, or 'no'.
if args.wait != 'no':
    try:
        cam_to_wait_for_idx = cameras_sn_str.index(args.wait)
    except ValueError:
        print(f'Can not wait for camera with serial number {args.wait} because it is not opened for recording')
        # Without a valid camera there is nothing to wait on; falling through
        # would crash on the undefined index, so release the devices and stop.
        for cam in cameras:
            cam.close_device()
        sys.exit(1)
    cam_to_wait_for = cameras[cam_to_wait_for_idx]
    selected_gpi = f'XI_GPI_PORT{args.wait_gpi}'
    print(f'Waiting on GPI {args.wait_gpi} of camera {args.wait}')
    # Poll the input level every 100 ms until it becomes truthy.
    while True:
        cam_to_wait_for.set_gpi_selector(selected_gpi)
        if cam_to_wait_for.get_gpi_level():
            break
        time.sleep(0.1)
# Record into the pre-allocated buffers, write everything to disk, report
# skipped frames, and always close the cameras afterwards.
print('recording')
recording_datetime = datetime.now()
try:
    record_cameras(cameras, camera_buffers, no_frames)

    print('saving')
    # One directory per recording (timestamp with ':' replaced, since ':' is
    # not allowed in file names on some platforms), one sub-directory per camera.
    session_dir = f'{recording_datetime.isoformat().replace(":", "_")}'
    recording_dirs = [join(session_dir, f'{sn}') for sn in cameras_sn_str]
    for recording_dir in recording_dirs:
        makedirs(recording_dir)
    for buffers, recording_dir in zip(camera_buffers, recording_dirs):
        save_recording(buffers, recording_dir, file_format=saving_format)
    for parameters, recording_dir in zip(cameras_parameter_dump, recording_dirs):
        save_camera_parameters(parameters, recording_dir)

    print('analyzing skipped frames')
    skipped_frames = [detect_skipped_frames(buffers) for buffers in camera_buffers]
    if sum(skipped_frames) > 0:
        for count, camera_sn in zip(skipped_frames, cameras_sn_str):
            print(f'\t[{camera_sn}]: skipped frames: {count}')
    else:
        print('\tno skipped frames')
finally:
    # Release the devices even if recording or saving failed.
    for cam in cameras:
        cam.close_device()
print('done')