-
Notifications
You must be signed in to change notification settings - Fork 656
/
Copy pathloaders.py
285 lines (241 loc) · 12.6 KB
/
loaders.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
# nuScenes dev-kit.
# Code written by Oscar Beijbom, 2019.
import json
from typing import Dict, Tuple
import numpy as np
import tqdm
from pyquaternion import Quaternion
from nuscenes import NuScenes
from nuscenes.eval.common.data_classes import EvalBoxes
from nuscenes.eval.detection.data_classes import DetectionBox
from nuscenes.eval.detection.utils import category_to_detection_name
from nuscenes.eval.tracking.data_classes import TrackingBox
from nuscenes.utils.data_classes import Box
from nuscenes.utils.geometry_utils import points_in_box
from nuscenes.utils.splits import create_splits_scenes
def load_prediction(result_path: str, max_boxes_per_sample: int, box_cls, verbose: bool = False) \
        -> Tuple[EvalBoxes, Dict]:
    """
    Loads object predictions from file.
    :param result_path: Path to the .json result file provided by the user.
    :param max_boxes_per_sample: Maximum number of boxes allowed per sample.
    :param box_cls: Type of box to load, e.g. DetectionBox or TrackingBox.
    :param verbose: Whether to print messages to stdout.
    :return: The deserialized results and meta data.
    """

    # Load from file and check that the format is correct.
    with open(result_path) as f:
        data = json.load(f)
    # NOTE: trailing space before the continuation keeps the two message parts
    # from running together ("...changed.See...") when the assert fires.
    assert 'results' in data, 'Error: No field `results` in result file. Please note that the result format changed. ' \
                              'See https://www.nuscenes.org/object-detection for more information.'

    # Deserialize results and get meta data.
    all_results = EvalBoxes.deserialize(data['results'], box_cls)
    meta = data['meta']
    if verbose:
        print("Loaded results from {}. Found detections for {} samples."
              .format(result_path, len(all_results.sample_tokens)))

    # Check that each sample has no more than x predicted boxes.
    for sample_token in all_results.sample_tokens:
        assert len(all_results.boxes[sample_token]) <= max_boxes_per_sample, \
            "Error: Only <= %d boxes per sample allowed!" % max_boxes_per_sample

    return all_results, meta
def load_gt(nusc: NuScenes, eval_split: str, box_cls, verbose: bool = False) -> EvalBoxes:
    """
    Loads ground truth boxes from DB.
    :param nusc: A NuScenes instance.
    :param eval_split: The evaluation split for which we load GT boxes.
    :param box_cls: Type of box to load, e.g. DetectionBox or TrackingBox.
    :param verbose: Whether to print messages to stdout.
    :return: The GT boxes.
    """
    # Init.
    if box_cls == DetectionBox:
        # Map attribute tokens to human-readable names (only needed for detection).
        attribute_map = {a['token']: a['name'] for a in nusc.attribute}

    if verbose:
        print('Loading annotations for {} split from nuScenes version: {}'.format(eval_split, nusc.version))
    # Read out all sample_tokens in DB.
    sample_tokens_all = [s['token'] for s in nusc.sample]
    assert len(sample_tokens_all) > 0, "Error: Database has no samples!"

    # Only keep samples from this split.
    splits = create_splits_scenes()

    # Check compatibility of split with nusc_version.
    version = nusc.version
    if eval_split in {'train', 'val', 'train_detect', 'train_track'}:
        assert version.endswith('trainval'), \
            'Error: Requested split {} which is not compatible with NuScenes version {}'.format(eval_split, version)
    elif eval_split in {'mini_train', 'mini_val'}:
        assert version.endswith('mini'), \
            'Error: Requested split {} which is not compatible with NuScenes version {}'.format(eval_split, version)
    elif eval_split == 'test':
        assert version.endswith('test'), \
            'Error: Requested split {} which is not compatible with NuScenes version {}'.format(eval_split, version)
    else:
        raise ValueError('Error: Requested split {} which this function cannot map to the correct NuScenes version.'
                         .format(eval_split))
    if eval_split == 'test':
        # Check that you aren't trying to cheat :).
        assert len(nusc.sample_annotation) > 0, \
            'Error: You are trying to evaluate on the test set but you do not have the annotations!'

    # Keep only samples whose scene belongs to the requested split.
    sample_tokens = []
    for sample_token in sample_tokens_all:
        scene_token = nusc.get('sample', sample_token)['scene_token']
        scene_record = nusc.get('scene', scene_token)
        if scene_record['name'] in splits[eval_split]:
            sample_tokens.append(sample_token)

    all_annotations = EvalBoxes()

    # Load annotations and filter predictions and annotations.
    for sample_token in tqdm.tqdm(sample_tokens, leave=verbose):

        sample = nusc.get('sample', sample_token)
        sample_annotation_tokens = sample['anns']

        sample_boxes = []
        for sample_annotation_token in sample_annotation_tokens:

            sample_annotation = nusc.get('sample_annotation', sample_annotation_token)
            if box_cls == DetectionBox:
                # Get label name in detection task and filter unused labels.
                detection_name = category_to_detection_name(sample_annotation['category_name'])
                if detection_name is None:
                    continue

                # Get attribute_name. GT annotations carry at most one attribute.
                attr_tokens = sample_annotation['attribute_tokens']
                attr_count = len(attr_tokens)
                if attr_count == 0:
                    attribute_name = ''
                elif attr_count == 1:
                    attribute_name = attribute_map[attr_tokens[0]]
                else:
                    raise Exception('Error: GT annotations must not have more than one attribute!')

                sample_boxes.append(
                    box_cls(
                        sample_token=sample_token,
                        translation=sample_annotation['translation'],
                        size=sample_annotation['size'],
                        rotation=sample_annotation['rotation'],
                        velocity=nusc.box_velocity(sample_annotation['token'])[:2],
                        num_pts=sample_annotation['num_lidar_pts'] + sample_annotation['num_radar_pts'],
                        detection_name=detection_name,
                        detection_score=-1.0,  # GT samples do not have a score.
                        attribute_name=attribute_name
                    )
                )
            elif box_cls == TrackingBox:
                # Use nuScenes token as tracking id.
                tracking_id = sample_annotation['instance_token']

                # Get label name in detection task and filter unused labels.
                # Import locally to avoid errors when motmetrics package is not installed.
                from nuscenes.eval.tracking.utils import category_to_tracking_name
                tracking_name = category_to_tracking_name(sample_annotation['category_name'])
                if tracking_name is None:
                    continue

                sample_boxes.append(
                    box_cls(
                        sample_token=sample_token,
                        translation=sample_annotation['translation'],
                        size=sample_annotation['size'],
                        rotation=sample_annotation['rotation'],
                        velocity=nusc.box_velocity(sample_annotation['token'])[:2],
                        num_pts=sample_annotation['num_lidar_pts'] + sample_annotation['num_radar_pts'],
                        tracking_id=tracking_id,
                        tracking_name=tracking_name,
                        tracking_score=-1.0  # GT samples do not have a score.
                    )
                )
            else:
                raise NotImplementedError('Error: Invalid box_cls %s!' % box_cls)

        all_annotations.add_boxes(sample_token, sample_boxes)

    if verbose:
        print("Loaded ground truth annotations for {} samples.".format(len(all_annotations.sample_tokens)))

    return all_annotations
def add_center_dist(nusc: NuScenes,
                    eval_boxes: EvalBoxes):
    """
    Adds the cylindrical (xy) center distance from ego vehicle to each box.
    :param nusc: The NuScenes instance.
    :param eval_boxes: A set of boxes, either GT or predictions.
    :return: eval_boxes augmented with center distances.
    :raises NotImplementedError: If a box is neither a DetectionBox nor a TrackingBox.
    """
    for sample_token in eval_boxes.sample_tokens:
        # Look up the ego pose at the timestamp of the lidar keyframe for this sample.
        sample_rec = nusc.get('sample', sample_token)
        sd_record = nusc.get('sample_data', sample_rec['data']['LIDAR_TOP'])
        pose_record = nusc.get('ego_pose', sd_record['ego_pose_token'])

        for box in eval_boxes[sample_token]:
            # Both boxes and ego pose are given in global coord system, so distance can be calculated directly.
            # Note that the z component of the ego pose is 0.
            ego_translation = (box.translation[0] - pose_record['translation'][0],
                               box.translation[1] - pose_record['translation'][1],
                               box.translation[2] - pose_record['translation'][2])
            if isinstance(box, (DetectionBox, TrackingBox)):
                box.ego_translation = ego_translation
            else:
                raise NotImplementedError('Error: Invalid box type: %s!' % type(box))

    return eval_boxes
def filter_eval_boxes(nusc: NuScenes,
                      eval_boxes: EvalBoxes,
                      max_dist: Dict[str, float],
                      verbose: bool = False) -> EvalBoxes:
    """
    Applies filtering to boxes. Distance, bike-racks and points per box.
    :param nusc: An instance of the NuScenes class.
    :param eval_boxes: An instance of the EvalBoxes class.
    :param max_dist: Maps the detection name to the eval distance threshold for that class.
    :param verbose: Whether to print to stdout.
    :return: The filtered eval_boxes (also modified in place).
    """
    # Retrieve box type for detection/tracking boxes.
    class_field = _get_box_class_field(eval_boxes)

    # Accumulators for number of filtered boxes.
    total, dist_filter, point_filter, bike_rack_filter = 0, 0, 0, 0
    for sample_token in eval_boxes.sample_tokens:

        # Filter on distance first.
        total += len(eval_boxes[sample_token])
        eval_boxes.boxes[sample_token] = [box for box in eval_boxes[sample_token] if
                                          box.ego_dist < max_dist[getattr(box, class_field)]]
        dist_filter += len(eval_boxes[sample_token])

        # Then remove boxes with zero points in them. Eval boxes have -1 points by default.
        eval_boxes.boxes[sample_token] = [box for box in eval_boxes[sample_token] if box.num_pts != 0]
        point_filter += len(eval_boxes[sample_token])

        # Perform bike-rack filtering. Fetch each annotation record once, then select the bike racks.
        sample_anns = nusc.get('sample', sample_token)['anns']
        ann_recs = [nusc.get('sample_annotation', ann) for ann in sample_anns]
        bikerack_recs = [rec for rec in ann_recs if rec['category_name'] == 'static_object.bicycle_rack']
        bikerack_boxes = [Box(rec['translation'], rec['size'], Quaternion(rec['rotation'])) for rec in bikerack_recs]

        filtered_boxes = []
        for box in eval_boxes[sample_token]:
            if getattr(box, class_field) in ['bicycle', 'motorcycle']:
                # Drop bicycles/motorcycles whose center lies inside any bike rack.
                center = np.expand_dims(np.array(box.translation), axis=1)
                if not any(np.sum(points_in_box(rack_box, center)) > 0 for rack_box in bikerack_boxes):
                    filtered_boxes.append(box)
            else:
                filtered_boxes.append(box)
        eval_boxes.boxes[sample_token] = filtered_boxes
        bike_rack_filter += len(eval_boxes.boxes[sample_token])

    if verbose:
        print("=> Original number of boxes: %d" % total)
        print("=> After distance based filtering: %d" % dist_filter)
        print("=> After LIDAR and RADAR points based filtering: %d" % point_filter)
        print("=> After bike rack filtering: %d" % bike_rack_filter)

    return eval_boxes
def _get_box_class_field(eval_boxes: EvalBoxes) -> str:
    """
    Retrieve the name of the class field in the boxes.
    This parses through all boxes until it finds a valid box.
    If there are no valid boxes, this function throws an exception.
    :param eval_boxes: The EvalBoxes used for evaluation.
    :return: The name of the class field in the boxes, e.g. detection_name or tracking_name.
    """
    assert len(eval_boxes.boxes) > 0
    # Take the first box of the first non-empty sample; None if every sample is empty.
    first_box = next((sample_boxes[0] for sample_boxes in eval_boxes.boxes.values() if sample_boxes), None)

    if isinstance(first_box, DetectionBox):
        return 'detection_name'
    if isinstance(first_box, TrackingBox):
        return 'tracking_name'
    raise Exception('Error: Invalid box type: %s' % first_box)