Skip to content

Commit d9fc6a9

Browse files
committed
Adds processor
1 parent 762df5c commit d9fc6a9

File tree

3 files changed

+164
-0
lines changed

3 files changed

+164
-0
lines changed

README.md

+3
Original file line numberDiff line numberDiff line change
@@ -12,3 +12,6 @@ ullala is a demo that showcase using kafka as a data pipeline and log aggregatio
1212
5. Symlink opencv into capturer, for mac you can run `scripts/symlink-opencv-bew.sh`
1313
6. Install capturer dependencies `pip install -r capturer/requirements.txt`
1414
6. Run Capturer: `KAFKA_HOSTS=KAFKA:9092 capturer/capturer.py`
15+
7. Install processor dependencies `cd processor && npm insall`
16+
8. Run processor: `cd processor && KAFKA_HOSTS=kafka:9092 node index.js`
17+

processor/index.js

+135
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,135 @@
1+
'use strict';
2+
3+
const fs = require('fs');
4+
const kafka = require('no-kafka');
5+
const jimp = require('jimp');
6+
const avro = require('avsc');
7+
const kafka_hosts = process.env.KAFKA_HOSTS;
8+
9+
if (!kafka_hosts) {
10+
console.error('No $KAFKA_HOSTS environment variable set');
11+
process.exit(1);
12+
}
13+
14+
const avroType = avro.parse({
15+
name: 'Image',
16+
type: 'record',
17+
fields: [
18+
{name: 'image', type: 'bytes'},
19+
{name: 'capture_timestamp', type: 'int'}
20+
]
21+
});
22+
23+
const workerId = Math.floor(Math.random() * 10000) + 1;
24+
25+
const consumer = new kafka.GroupConsumer({
26+
connectionString: kafka_hosts,
27+
groupId: 'IMAGE_PROCESSOR_I',
28+
clientId: `IMAGE_PROCESSOR_${workerId}`,
29+
logger: {
30+
logFunction: info
31+
}
32+
});
33+
34+
const producer = new kafka.Producer({
35+
connectionString: kafka_hosts
36+
});
37+
38+
function init() {
39+
producer.init();
40+
consumer.init([{
41+
strategy: 'DEFAULT_STRATEGY',
42+
subscriptions: ['CAMERA_FEED'],
43+
handler: handleMessages
44+
}]);
45+
}
46+
47+
function processImage(packedImage, callback) {
48+
jimp.read(packedImage.image, (error, image) => {
49+
if (error) {
50+
error(`ERROR: ${error}`);
51+
return;
52+
}
53+
image.scale(0.25)
54+
.blur(3)
55+
.sepia()
56+
.getBuffer(jimp.MIME_JPEG, (error, image) => {
57+
if (error) {
58+
callback(error);
59+
} else {
60+
packedImage.image = image;
61+
callback(null, packedImage);
62+
}
63+
});
64+
});
65+
}
66+
67+
function handleMessages(messageSet, topic, partition) {
68+
for (const message of messageSet) {
69+
processImage(avroType.fromBuffer(message.message.value), (error, processedImage) => {
70+
if (error) {
71+
error(`ERROR: ${error}`);
72+
return;
73+
}
74+
outputImage(processedImage, partition, message.offset);
75+
consumer.commitOffset({topic: topic, partition: partition, offset: message.offset});
76+
});
77+
}
78+
}
79+
80+
function getConsumerLag(captureTimestamp) {
81+
const captureDate = new Date(0);
82+
captureDate.setUTCSeconds(captureTimestamp);
83+
return (new Date() - captureDate) / 1000;
84+
}
85+
86+
function outputImage(packedImage, partition, offset) {
87+
const consumerLag = getConsumerLag(packedImage.capture_timestamp)
88+
producer.send({
89+
topic: 'PROCESSED_FEED',
90+
message: {
91+
value: avroType.toBuffer(packedImage)
92+
},
93+
partition: 0
94+
});
95+
logConsumerLag(`Image ${partition}_${offset} written with consumer lag: ${consumerLag}`, consumerLag);
96+
}
97+
98+
function info() {
99+
const argumentList = Array.prototype.slice.call(arguments);
100+
console.log.apply(console, argumentList);
101+
log('INFO', argumentList.join(' '));
102+
}
103+
104+
function error(string) {
105+
const argumentList = Array.prototype.slice.call(arguments);
106+
console.error.apply(console, argumentList);
107+
log('ERROR', argumentList.join(' '));
108+
}
109+
110+
function logConsumerLag(string, consumerLag) {
111+
console.log(string);
112+
log('INFO', string, consumerLag);
113+
}
114+
115+
function log(level, string, consumerLag) {
116+
const logPayload = {
117+
'@timestamp': new Date().toISOString(),
118+
'level': level,
119+
'message': string,
120+
'application': 'IMAGE_PROCESSOR',
121+
'worker': workerId
122+
};
123+
if (consumerLag) {
124+
logPayload.consumerLag = consumerLag;
125+
}
126+
producer.send({
127+
topic: 'APPLICATION_LOGS',
128+
message: {
129+
value: JSON.stringify(logPayload)
130+
},
131+
partition: 0
132+
});
133+
}
134+
135+
init();

processor/package.json

+26
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
{
2+
"name": "processor",
3+
"version": "0.0.1",
4+
"description": "Processes images from kafka and outputs them back into kafka",
5+
"main": "index.js",
6+
"scripts": {
7+
"start": "node index.js"
8+
},
9+
"author": {
10+
"name": "Alberto Avila",
11+
"email": "[email protected]"
12+
},
13+
"license": "MIT",
14+
"private": false,
15+
"repository": {
16+
"type": "git",
17+
"url": "[email protected]:albertein/pictofun.git"
18+
},
19+
"devDependencies": {
20+
},
21+
"dependencies": {
22+
"no-kafka": "^2.5.6",
23+
"jimp": "^0.2.24",
24+
"avsc": "^4.1.6"
25+
}
26+
}

0 commit comments

Comments
 (0)