forked from ArashJavan/veloparser
-
Notifications
You must be signed in to change notification settings - Fork 0
/
lidar.py
242 lines (189 loc) · 8.35 KB
/
lidar.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
import os
import datetime
import argparse
import dpkt
import math
import numpy as np
from gps import GprmcMessage, utc_to_weekseconds
def read_uint32(data, idx):
return data[idx] + data[idx+1]*256 + data[idx+2]*256*256 + data[idx+3]*256*256*256
class Lidar():
def __init__(self):
self.point_cloud = []
def process_data_frame(self, data, frame_idx):
raise NotImplementedError("Subclasses should implement this!")
def process_position_frame(self, data, frame_idx):
raise NotImplementedError("Subclasses should implement this!")
class VelodyneVLP16(Lidar):
# factor distance centimeter value to meter
FACTOR_CM2M = 0.01
# factor distance value to cm, each velodyne distance unit is 2 mm
FACTOR_MM2CM = 0.2
def __init__(self, dual_mode=False):
super(VelodyneVLP16, self).__init__()
self.dual_mode = dual_mode
self.timing_offsets = self.calc_timing_offsets()
self.omega = np.array([-15, 1, -13, 3, -11, 5, -9, 7, -7, 9, -5, 11, -3, 13, -1, 15])
# Vertical correction (mm) - VLP-16 User Manual Table 9-1 (numbers are specific to VLP16)
self.vcorr_mm = np.array([11.2, -0.7, 9.7, -2.2, 8.1, -3.7, 6.6, -5.1, 5.1, -6.6, 3.7, -8.1, 2.2, -9.7, 0.7, -11.2])
self.count_lasers = 16
def calc_timing_offsets(self):
timing_offsets = [[0.0 for x in range(12)] for y in range(32)] # Init matrix
# constants
full_firing_cycle = 55.296 # μs
single_firing = 2.304 # μs
# compute timing offsets
for x in range(12):
for y in range(32):
if self.dual_mode:
dataBlockIndex = (x - (x % 2)) + int((y / 16))
else:
dataBlockIndex = (x * 2) + int((y / 16))
dataPointIndex = y % 16
timing_offsets[y][x] = \
(full_firing_cycle * dataBlockIndex) + (single_firing * dataPointIndex)
return np.array(timing_offsets).T
def process_data_frame(self, data, frame_idx):
"""
:param data: A velodyne packet consisting of 12 (n) blocks and 24 (m) sequences and 16 firing pre sequence
:param frame_idx:
:return: X,Y,Z-coordinate, azimuth, intensitiy, timestamp of each firing, sequence ordered, shape of each = [384x1]
"""
data = np.frombuffer(data, dtype=np.uint8).astype(np.uint32)
# frame length = 1206
timestamp = read_uint32(data, 1200)
factory = data[1204:]
# veldyne has 12 blocks each 100 bytes data
# data-legth = 1206 bytes
blocks = data[0:1200].reshape(12, 100)
distances = []
intensities = []
azimuth_per_block = []
# iteratie through each block
for i, blk in enumerate(blocks):
dists, intens, angles = self.read_firing_data(blk)
distances.append(dists)
intensities.append(intens)
azimuth_per_block.append(angles)
azimuth_per_block = np.array(azimuth_per_block)
## Note: all these arrray have th same size, number of firing in one packet
azimuth = self.calc_precise_azimuth_2(azimuth_per_block).reshape(12, 32)
distances = np.array(distances)
intensities = np.array(intensities)
# now calculate the cartesian coordinate of each point
X, Y, Z = self.calc_cart_coord(distances, azimuth)
# calculating timestamp [microsec] of each firing
timestamps = timestamp + self.timing_offsets
X = X.flatten()
Y = Y.flatten()
Z = Z.flatten()
intensities = intensities.flatten()
azimuth = azimuth.flatten()
timestamps = timestamps.flatten()
distances = distances.flatten() * self.FACTOR_MM2CM * self.FACTOR_CM2M
# remeber the last azimuth for roll over checking
self.cur_azimuth = azimuth
return X, Y, Z, intensities, azimuth, timestamps, distances
def process_position_frame(self, data, frame_idx):
timestamp = data[198:202]
timestamp = read_uint32(timestamp, 0)
ppps_status = data[202]
gprmc = data[206:]
gprmc = gprmc.split()[0] # filter out gprmc message, remaining are zeros
gprmc = gprmc.decode('ascii').split(',') # convert bytes array to string
#print(gprmc)
gps_msg = GprmcMessage()
time = gprmc[1]
date = gprmc[9]
gps_msg.datetime = datetime.datetime.strptime(date + time, '%d%m%y%H%M%S')
gps_msg.status = gprmc[2]
gps_msg.lat = float(gprmc[3])
gps_msg.lat_ori = gprmc[4]
gps_msg.long = float(gprmc[5])
gps_msg.long_ori = gprmc[6]
gps_msg.velocity = float(gprmc[7])
gps_msg.course = float(gprmc[8])
gps_msg.mag = float(gprmc[10])
gps_msg.mag_sign = gprmc[11]
gps_msg.singularity = gprmc[12]
gps_msg.weeks, _, gps_msg.seconds, _ = utc_to_weekseconds(gps_msg.datetime, leapseconds=0)
return gps_msg
def calc_precise_azimuth(self, azimuth):
"""
Linear interpolation of azimuth values
:param azimuth:
:return:
"""
org_azi = azimuth.copy()
precision_azimuth = []
# iterate through each block
for n in range(12): # n=0..11
azimuth = org_azi.copy()
try:
# First, adjust for an Azimuth rollover from 359.99° to 0°
if azimuth[n + 1] < azimuth[n]:
azimuth[n + 1] += 360.
# Determine the azimuth Gap between data blocks
azimuth_gap = azimuth[n + 1] - azimuth[n]
except:
if azimuth[n] < azimuth[n-1]:
azimuth[n] += 360
azimuth_gap = azimuth[n] - azimuth[n-1]
factor = azimuth_gap / 32.
k = np.arange(32)
precise_azimuth = azimuth[n] + factor * k
precision_azimuth.append(precise_azimuth)
precision_azimuth = np.array(precision_azimuth)
return precision_azimuth
def calc_precise_azimuth_2(self, azimuth):
org_azi = azimuth.copy()
precision_azimuth = []
# iterate through each block
for n in range(12): # n=0..11
azimuth = org_azi.copy()
# Determine the azimuth Gap between data blocks
if n < 11:
# First, adjust for an Azimuth rollover from 359.99° to 0°
if azimuth[n + 1] < azimuth[n]:
azimuth[n + 1] += 360.
azimuth_gap = azimuth[n + 1] - azimuth[n]
else:
# Last block. Simply use last iteration `azimuth_gap`.
pass
# iterate through each firing
for k in range(32):
# Determine if you’re in the first or second firing sequence of the data block
if k < 16:
# Interpolate
precise_azimuth = azimuth[n] + (azimuth_gap * 2.304 * k) / (2 * 55.296)
else:
# Interpolate, but 55.296 us has passed.
precise_azimuth = azimuth[n] + (azimuth_gap * (55.296 + 2.304*(k - 16))) / (2 * 55.296)
if precise_azimuth >= 360.:
precise_azimuth -= 360.
#print(precise_azimuth)
precision_azimuth.append(precise_azimuth)
precision_azimuth = np.array(precision_azimuth)
return precision_azimuth
def read_firing_data(self, data):
block_id = data[0] + data[1]*256
# 0xeeff is upper block
assert block_id == 0xeeff
azimuth = (data[2] + data[3] * 256) / 100
firings = data[4:].reshape(32, 3)
distances = firings[:, 0] + firings[:, 1] * 256
intensities = firings[:, 2]
return distances, intensities, azimuth
def calc_cart_coord(self, distances, azimuth):
# convert distances to meters
distances = distances * self.FACTOR_MM2CM * self.FACTOR_CM2M
# convert deg to rad
longitudes = np.tile(self.omega * np.pi / 180., 2)
latitudes = azimuth * np.pi / 180.
hypotenuses = distances * np.cos(longitudes)
X = hypotenuses * np.sin(latitudes)
Y = hypotenuses * np.cos(latitudes)
Z = distances * np.sin(longitudes)
# Apply vertical correction for laser positions
Z += 1e-3 * np.tile(self.vcorr_mm, 2)
return X, Y, Z