|
| 1 | +from __future__ import print_function |
| 2 | + |
| 3 | +import io |
| 4 | +import sys |
| 5 | +import logging |
| 6 | +import calendar |
| 7 | +from os import getenv |
| 8 | +from time import sleep |
| 9 | +from datetime import datetime |
| 10 | + |
| 11 | +import avro.io |
| 12 | +import avro.schema |
| 13 | +from SimpleCV import Camera |
| 14 | +from kafka import KafkaProducer |
| 15 | + |
| 16 | +def get_timestamp(): |
| 17 | + timestamp = datetime.now() |
| 18 | + return '{0}:{1}:{2}'.format(timestamp.hour, timestamp.minute, timestamp.second) |
| 19 | + |
| 20 | +def run_capturer(kafka_hosts, fps=24): |
| 21 | + producer = KafkaProducer(bootstrap_servers=kafka_hosts) |
| 22 | + cam = Camera() |
| 23 | + while True: |
| 24 | + img = cam.getImage() |
| 25 | + img.drawText(get_timestamp(), fontsize=160) |
| 26 | + img.save('tmp.jpg') |
| 27 | + with open('tmp.jpg', mode='rb') as file: |
| 28 | + content = file.read() |
| 29 | + producer.send('CAMERA_FEED', pack_image(content)) |
| 30 | + print('Got an image') |
| 31 | + sleep(0.4) |
| 32 | + |
| 33 | +def pack_image(image): |
| 34 | + schema_string = """ |
| 35 | + { |
| 36 | + "namespace": "ullala", |
| 37 | + "type": "record", |
| 38 | + "name": "Image", |
| 39 | + "fields": [ |
| 40 | + {"name": "image", "type": "bytes"}, |
| 41 | + {"name": "capture_timestamp", "type": "int"} |
| 42 | + ] |
| 43 | + } |
| 44 | + """ |
| 45 | + schema = avro.schema.parse(schema_string) |
| 46 | + capture_timestamp = calendar.timegm(datetime.utcnow().utctimetuple()) |
| 47 | + |
| 48 | + writer = avro.io.DatumWriter(schema) |
| 49 | + |
| 50 | + bytes_writer = io.BytesIO() |
| 51 | + encoder = avro.io.BinaryEncoder(bytes_writer) |
| 52 | + |
| 53 | + writer.write({"image": image, "capture_timestamp": capture_timestamp}, encoder) |
| 54 | + |
| 55 | + return bytes_writer.getvalue() |
| 56 | + |
| 57 | + |
| 58 | +if __name__ == '__main__': |
| 59 | + logging.getLogger().setLevel(logging.DEBUG) |
| 60 | + kafka_hosts = getenv('KAFKA_HOSTS') |
| 61 | + if not kafka_hosts: |
| 62 | + print('You need to set $KAFKA_HOSTS environment variable', file=sys.stderr) |
| 63 | + exit(1) |
| 64 | + run_capturer(kafka_hosts, fps=24) |
0 commit comments