-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathinsert_gps_data.py
executable file
·91 lines (72 loc) · 2.3 KB
/
insert_gps_data.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
#!/usr/bin/env python
import io
import sys
import re
import struct
import pprint
import sys
import time
import logging
import pymysql.cursors
from pymysql import MySQLError
import avro.schema
import avro.io
from kafka import KafkaConsumer
from datetime import datetime
from struct import *
# Timestamp format used when rendering `ts` values for the MySQL insert.
fmt = "%Y-%m-%d %H:%M:%S"

# Read the database password: the first line of the password file, with
# the trailing newline removed.
with open('/home/yang/isoblue-db/passwd') as f:
    _db_passwd = f.readline().strip('\n')

# Connect to the database and create the cursor used for all inserts.
conn = pymysql.connect(host='localhost',
                       user='isoblue',
                       password=_db_passwd,
                       db='isoblueData')
cursor = conn.cursor()

# Set up the Kafka consumer on the 'remote' topic.
## Setting group_id to `None` to force it to consume from the latest, i.e.,
## 'no memory' of what has been consumed.
consumer = KafkaConsumer('remote', group_id=None)

# Path of the Avro schema describing the GPS messages.
schema_path = '/home/yang/isoblue-db/schema/gps.avsc'

# Load the Avro schema. The `with` block closes the schema file as soon as
# it has been read instead of leaving the handle to the garbage collector.
with open(schema_path) as schema_file:
    schema = avro.schema.parse(schema_file.read())
# Main loop: consume Kafka messages, decode the Avro-encoded GPS payloads and
# insert every TPV report that carries a timestamp into the `gps` table.
#
# Relies on the module-level `consumer`, `schema`, `cursor`, `conn` and `fmt`
# set up earlier in this script.

# The datum reader and the SQL text do not depend on the message, so they are
# built once instead of on every iteration.
reader = avro.io.DatumReader(schema)
sql = ('INSERT INTO `gps` '
       '(`isoblue_id`, `ts`, `lat`, `lon`, `alt`, `speed`) '
       'VALUES (%s, %s, %s, %s, %s, %s)')

try:
    for message in consumer:
        # Keys look like '<type>:<isoblue_id>'; skip messages without a key
        # or with a malformed one rather than crashing the consumer.
        if message.key is None:
            continue
        splited_keys = message.key.split(':')
        if len(splited_keys) < 2 or splited_keys[0] != 'gps':
            continue
        isoblue_id = splited_keys[1]

        # Decode the Avro binary payload into a Python object.
        decoder = avro.io.BinaryDecoder(io.BytesIO(message.value))
        gps_msg = reader.read(decoder)

        # Only TPV objects are stored.
        if gps_msg['object_name'] != 'TPV':
            continue

        gps_data = gps_msg['object']
        # Single-argument print(...) behaves the same as the print statement
        # on Python 2 and is also valid Python 3 syntax.
        print(gps_data)

        # A report without a time cannot be stored (the `ts` column needs it).
        if gps_data['time'] is None:
            continue
        t = datetime.utcfromtimestamp(gps_data['time'])

        # Execute the insert. A failed insert is reported and skipped so one
        # bad row does not stop the consumer.
        try:
            cursor.execute(sql,
                           (isoblue_id,
                            t.strftime(fmt),
                            gps_data['lat'],
                            gps_data['lon'],
                            # Fixed: the `alt` column was previously filled
                            # with gps_data['lat'].
                            gps_data['alt'],
                            gps_data['speed']))
            conn.commit()
        except MySQLError as e:
            print('INSERT FAILED')
            # Show why the insert failed instead of discarding the error.
            print(e)
except KeyboardInterrupt:
    # Ctrl-C is the expected way to stop the script; exit quietly.
    pass
finally:
    # Close the database connection however the loop ends.
    conn.close()