Skip to content

Commit

Permalink
Fix wrong offset being assigned to compressed messages closes SOHU-Co…
Browse files Browse the repository at this point in the history
  • Loading branch information
hyperlink authored Apr 6, 2019
1 parent 0462227 commit 51641a0
Show file tree
Hide file tree
Showing 3 changed files with 74 additions and 21 deletions.
27 changes: 17 additions & 10 deletions docker/createTopic.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,26 @@
const execa = require('execa');
const assert = require('assert');

/**
 * Create a Kafka topic inside the docker-compose `kafka` container.
 *
 * @param {string} topicName - name of the topic to create (required)
 * @param {number} partitions - number of partitions, must be > 0
 * @param {number} replicas - replication factor, must be > 0
 * @param {string} [config=''] - optional topic-level config entry,
 *   e.g. 'compression.type=gzip'; ignored on Kafka 0.9 images
 * @returns {Promise} execa result promise for the `docker-compose exec` call
 * @throws {AssertionError} when any required argument is missing or non-positive
 */
function createTopic (topicName, partitions, replicas, config = '') {
  assert(topicName);
  assert(partitions && partitions > 0);
  assert(replicas && replicas > 0);

  const args = ['exec', '-T', 'kafka', 'bash', '-c'];

  if (process.env.KAFKA_VERSION === '0.9') {
    // 0.9 images only ship a helper script driven by environment variables,
    // which offers no way to pass per-topic config.
    const topic = `${topicName}:${partitions}:${replicas}`;
    args.push(`KAFKA_CREATE_TOPICS=${topic} KAFKA_PORT=9092 /usr/bin/create-topics.sh`);
  } else {
    // Newer images expose kafka-topics.sh directly. Build the optional
    // --config suffix in a local instead of mutating the `config` parameter,
    // and avoid leaving a trailing space when no config is given.
    const configArg = config ? ` --config ${config}` : '';
    args.push(
      `/opt/kafka/bin/kafka-topics.sh --create --zookeeper zookeeper:2181 --topic ${topicName} --partitions ${partitions} --replication-factor ${replicas}${configArg}`
    );
  }

  const createResult = execa('docker-compose', args);
  // createResult.stdout.pipe(process.stdout);
  return createResult;
}
Expand Down
23 changes: 12 additions & 11 deletions lib/protocol/protocol.js
Original file line number Diff line number Diff line change
Expand Up @@ -311,14 +311,13 @@ function _decodeFetchResponse (resp, cb, maxTickMessages, version) {
}
},
highWaterOffset,
topics,
0 // base offset
topics
);
});
}
}

function decodeMessageSet (topic, partition, messageSet, enqueue, emit, highWaterOffset, topics, base) {
function decodeMessageSet (topic, partition, messageSet, enqueue, emit, highWaterOffset, topics) {
const messageSetSize = messageSet.length;
// TODO: this is broken logic. It overwrites previous partitions HWO.
// Need to refactor this on next major API bump
Expand Down Expand Up @@ -363,19 +362,21 @@ function decodeMessageSet (topic, partition, messageSet, enqueue, emit, highWate
}

if (vars.attributes === 0 && vars.messageSize > messageSetSize) {
const offset = vars.offset + (base || 0);
return enqueue((next) => {
emit(new MessageSizeTooLarge({
topic: topic,
offset: offset,
partition: partition
}));
const offset = vars.offset;
return enqueue(next => {
emit(
new MessageSizeTooLarge({
topic: topic,
offset: offset,
partition: partition
})
);
next(null);
});
}

if (!partial && vars.offset !== null) {
const offset = vars.offset + (base || 0);
const offset = vars.offset;
const value = vars.value;
const key = vars.key;
var codec = getCodec(vars.attributes);
Expand Down
45 changes: 45 additions & 0 deletions test/test.consumer.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,10 @@ var TopicsNotExistError = require(libPath + 'errors').TopicsNotExistError;
var FakeClient = require('./mocks/mockClient');
var InvalidConfigError = require('../lib/errors/InvalidConfigError');

const createTopic = require('../docker/createTopic');
const sendMessage = require('./helpers/sendMessage');
const _ = require('lodash');

var client, producer, offset;

var TOPIC_POSTFIX = '_test_' + Date.now();
Expand Down Expand Up @@ -130,6 +134,47 @@ describe('Consumer', function () {
});
});

describe('Compression', function () {
  // Topic under test and the payloads published to it (populated in `before`).
  let topic;
  let messages;

  before(function () {
    // Per-topic compression config is not supported by the 0.9 setup.
    if (process.env.KAFKA_VERSION === '0.9') {
      this.skip();
    }

    topic = uuid.v4();
    messages = _.times(10, uuid.v4);

    // Create a gzip-compressed topic, then publish the test payloads to it.
    return createTopic(topic, 1, 1, 'compression.type=gzip').then(
      () =>
        new Promise((resolve, reject) => {
          sendMessage(messages, topic, (error) => (error ? reject(error) : resolve()));
        })
    );
  });

  it('should not throw offsetOutOfRange error', function (done) {
    const client = new Client({ kafkaHost: '127.0.0.1:9092' });
    const consumer = new Consumer(client, [{ topic, partition: 0 }]);

    // An offsetOutOfRange event fails the test by invoking done(error).
    consumer.on('offsetOutOfRange', done);

    consumer.on('message', (message) => {
      // Cross each received payload off the list; once every message has
      // arrived, give the consumer a beat to settle and shut down cleanly.
      if (_.pull(messages, message.value).length === 0) {
        setTimeout(() => consumer.close(done), 50);
      }
    });
  });
});

describe('validate groupId', function () {
function validateThrowsInvalidConfigError (groupId) {
should.throws(function () {
Expand Down

0 comments on commit 51641a0

Please sign in to comment.