Skip to content

Commit

Permalink
Fix create topics sync vs async (SOHU-Co#519)
Browse files Browse the repository at this point in the history
* Add createTopics tests

* Fixes SOHU-Co#518
  • Loading branch information
hyperlink authored Nov 18, 2016
1 parent b301908 commit d0c1ffb
Show file tree
Hide file tree
Showing 8 changed files with 108 additions and 130 deletions.
67 changes: 36 additions & 31 deletions lib/client.js
Original file line number Diff line number Diff line change
Expand Up @@ -357,48 +357,53 @@ Client.prototype.createTopics = function (topics, isAsync, cb) {
cb = isAsync;
isAsync = true;
}

try {
validateKafkaTopics(topics);
} catch (e) {
if (isAsync) return cb(e);
throw e;
}
var self = this;
// first, load metadata to create topics
this.loadMetadataForTopics(topics, function (err, resp) {
if (err) return cb(err);
if (isAsync) return cb(null, 'All requests sent');
var topicMetadata = resp[1].metadata;
// ommit existed topics
var existed = Object.keys(topicMetadata);
var topicsNotExists = topics.filter(function (topic) {
return !~existed.indexOf(topic);

cb = _.once(cb);

const getTopicsFromKafka = (topics, callback) => {
this.loadMetadataForTopics(topics, function (error, resp) {
if (error) {
return callback(error);
}
callback(null, Object.keys(resp[1].metadata));
});
};

function attemptCreateTopics (topics, cb) {
var operation = retry.operation({ minTimeout: 200, maxTimeout: 2000 });
operation.attempt(function (currentAttempt) {
debug('create topics currentAttempt', currentAttempt);
self.loadMetadataForTopics(topics, function (err, resp) {
if (resp) {
var topicMetadata = resp[1].metadata;
var created = Object.keys(topicMetadata).length === topics.length;
if (!created) err = new Error('Topic creation pending');
}
if (operation.retry(err)) {
return;
}

cb(err, 'All created');
});
});
}
const operation = retry.operation({ minTimeout: 200, maxTimeout: 2000 });

if (!topicsNotExists.length) return cb(null, 'All created');
operation.attempt(currentAttempt => {
debug('create topics currentAttempt', currentAttempt);
getTopicsFromKafka(topics, function (error, kafkaTopics) {
if (error) {
if (operation.retry(error)) {
return;
}
}

debug('kafka reported topics', kafkaTopics);
const left = _.difference(topics, kafkaTopics);
if (left.length === 0) {
debug(`Topics created ${kafkaTopics}`);
return cb(null);
}

debug('create topic by sending metadata request');
attemptCreateTopics(topicsNotExists, cb);
debug(`Topics left ${left.join(', ')}`);
if (!operation.retry(new Error(`Topics not created ${left}`))) {
cb(operation.mainError());
}
});
});

if (!isAsync) {
cb(null);
}
};

/**
Expand Down
2 changes: 1 addition & 1 deletion run-tests.sh
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,4 @@
source start-docker.sh
export KAFKA_TEST_HOST=$DOCKER_VM_IP
echo "KAFKA_TEST_HOST: $KAFKA_TEST_HOST"
./node_modules/.bin/istanbul cover _mocha -- -t 15000 test/**/test.*js test/test.*js
./node_modules/.bin/istanbul cover _mocha -- -t 20000 test/**/test.*js test/test.*js
58 changes: 58 additions & 0 deletions test/test.client.js
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
'use strict';

var host = process.env['KAFKA_TEST_HOST'] || '';
var kafka = require('..');
var Client = kafka.Client;
Expand All @@ -9,6 +11,8 @@ var InvalidConfigError = require('../lib/errors/InvalidConfigError');
var proxyquire = require('proxyquire').noCallThru();
var sinon = require('sinon');
var retry = require('retry');
const _ = require('lodash');
const async = require('async');

describe('Client', function () {
var client = null;
Expand Down Expand Up @@ -368,6 +372,60 @@ describe('Client', function () {
});
});

describe('#createTopics', function () {
function verifyTopics (topics, callback) {
async.each(topics, function (topic, callback) {
client.zk.topicExists(topic, function (error, exists, topic) {
if (error) {
return callback(error);
}
exists.should.be.true;
callback();
});
}, callback);
}

it('should create given kafka topics', function (done) {
const topics = _.times(3, uuid.v4);
client.createTopics(topics, true, function (error) {
if (error) {
return done(error);
}
verifyTopics(topics, done);
});
});

it('should yield synchronously', function (done) {
let called = false;
const topics = _.times(3, uuid.v4);
client.createTopics(topics, false, function (error) {
if (error) {
return done(error);
}
if (!called) {
return done();
}
done('Called asynchronously');
});
called = true;
});

it('should yield asynchronously', function (done) {
let called = false;
const topics = _.times(3, uuid.v4);
client.createTopics(topics, true, function (error) {
if (error) {
return done(error);
}
if (called) {
return done();
}
done('Called synchronously');
});
called = true;
});
});

describe('#reconnectBroker', function () {
var emptyFn = function () {};

Expand Down
17 changes: 6 additions & 11 deletions test/test.consumer.js
Original file line number Diff line number Diff line change
Expand Up @@ -204,18 +204,13 @@ describe('Consumer', function () {
EXISTS_TOPIC_2,
EXISTS_GZIP,
EXISTS_SNAPPY
], false, function (err, created) {
], true, function (err) {
if (err) return done(err);

function useNewTopics () {
producer.send([
{ topic: EXISTS_TOPIC_2, messages: 'hello kafka' },
{ topic: EXISTS_GZIP, messages: 'hello gzip', attributes: 1 },
{ topic: EXISTS_SNAPPY, messages: SNAPPY_MESSAGE, attributes: 2 }
], done);
}
// Ensure leader selection happened
setTimeout(useNewTopics, 1000);
producer.send([
{ topic: EXISTS_TOPIC_2, messages: 'hello kafka' },
{ topic: EXISTS_GZIP, messages: 'hello gzip', attributes: 1 },
{ topic: EXISTS_SNAPPY, messages: SNAPPY_MESSAGE, attributes: 2 }
], done);
});
});
});
Expand Down
44 changes: 2 additions & 42 deletions test/test.highlevelProducer.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,6 @@ var client, producer, noAckProducer, producerKeyed;

var host = process.env['KAFKA_TEST_HOST'] || '';

// Helper method
function randomId () {
return Math.floor(Math.random() * 10000);
}

[
{
name: 'PLAINTEXT HighLevelProducer'
Expand Down Expand Up @@ -45,9 +40,9 @@ function randomId () {

producer.on('ready', function () {
producerKeyed.on('ready', function () {
producer.createTopics([EXISTS_TOPIC_3], false, function (err, created) {
producer.createTopics([EXISTS_TOPIC_3], true, function (err) {
if (err) return done(err);
setTimeout(done, 500);
done();
});
});
});
Expand Down Expand Up @@ -151,41 +146,6 @@ function randomId () {
});
});

describe('#createTopics', function () {
var client, producer;

before(function (done) {
client = new Client(host);
producer = new HighLevelProducer(client);
producer.on('ready', done);
});

after(function (done) {
producer.close(done);
});

it('should return All requests sent when async is true', function (done) {
producer.createTopics(['_exist_topic_' + randomId() + '_test'], true, function (err, data) {
data.should.equal('All requests sent');
done(err);
});
});

it('async should be true if not present', function (done) {
producer.createTopics(['_exist_topic_' + randomId() + '_test'], function (err, data) {
data.should.equal('All requests sent');
done(err);
});
});

it('should return All created when async is false', function (done) {
producer.createTopics(['_exist_topic_' + randomId() + '_test'], false, function (err, data) {
data.should.equal('All created');
done(err);
});
});
});

describe('#close', function () {
var client, producer;

Expand Down
2 changes: 1 addition & 1 deletion test/test.offset.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ describe('Offset', function () {
client = new Client(host);
producer = new Producer(client);
producer.on('ready', function () {
producer.createTopics(['_exist_topic_3_test'], false, function (err, created) {
producer.createTopics(['_exist_topic_3_test'], true, function (err) {
done(err);
});
});
Expand Down
44 changes: 2 additions & 42 deletions test/test.producer.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,6 @@ var client, producer, noAckProducer, producerKeyed;

var host = process.env['KAFKA_TEST_HOST'] || '';

// Helper method
function randomId () {
return Math.floor(Math.random() * 10000);
}

[
{
name: 'PLAINTEXT Producer'
Expand Down Expand Up @@ -45,9 +40,9 @@ function randomId () {

producer.on('ready', function () {
producerKeyed.on('ready', function () {
producer.createTopics([EXISTS_TOPIC_3], false, function (err, created) {
producer.createTopics([EXISTS_TOPIC_3], true, function (err) {
if (err) return done(err);
setTimeout(done, 500);
done();
});
});
});
Expand Down Expand Up @@ -198,41 +193,6 @@ function randomId () {
});
});

describe('#createTopics', function () {
var client, producer;

before(function (done) {
client = new Client(host);
producer = new Producer(client);
producer.on('ready', done);
});

after(function (done) {
producer.close(done);
});

it('should return All requests sent when async is true', function (done) {
producer.createTopics(['_exist_topic_' + randomId() + '_test'], true, function (err, data) {
data.should.equal('All requests sent');
done(err);
});
});

it('async should be true if not present', function (done) {
producer.createTopics(['_exist_topic_' + randomId() + '_test'], function (err, data) {
data.should.equal('All requests sent');
done(err);
});
});

it('should return All created when async is false', function (done) {
producer.createTopics(['_exist_topic_' + randomId() + '_test'], false, function (err, data) {
data.should.equal('All created');
done(err);
});
});
});

describe('#close', function () {
var client, producer;

Expand Down
4 changes: 2 additions & 2 deletions test/test.producerBatch.js
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,9 @@ describe('No Ack Producer', function () {
producer = new Producer(client);
batchProducer = new Producer(batchClient);
producer.on('ready', function () {
producer.createTopics([EXISTS_TOPIC_4], false, function (err, created) {
producer.createTopics([EXISTS_TOPIC_4], true, function (err) {
if (err) return callback(err);
setTimeout(callback, 500);
callback();
});
broker = Object.keys(client.brokers)[0];
});
Expand Down

0 comments on commit d0c1ffb

Please sign in to comment.