diff --git a/plugin/reader/kafkareader/src/main/java/com/wgzhao/addax/plugin/reader/kafkareader/KafkaReader.java b/plugin/reader/kafkareader/src/main/java/com/wgzhao/addax/plugin/reader/kafkareader/KafkaReader.java
index 473f924fb..cc1551848 100644
--- a/plugin/reader/kafkareader/src/main/java/com/wgzhao/addax/plugin/reader/kafkareader/KafkaReader.java
+++ b/plugin/reader/kafkareader/src/main/java/com/wgzhao/addax/plugin/reader/kafkareader/KafkaReader.java
@@ -11,6 +11,8 @@
 import com.wgzhao.addax.common.plugin.RecordSender;
 import com.wgzhao.addax.common.spi.Reader;
 import com.wgzhao.addax.common.util.Configuration;
+import org.apache.commons.lang3.RandomStringUtils;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
@@ -81,7 +83,7 @@ public void init()
             this.columns = configuration.getList(KafkaKey.COLUMN, String.class);
             this.missKeyValue = configuration.getString(KafkaKey.MISSING_KEY_VALUE, null);
             properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokeLists);
-            properties.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
+            properties.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID + "-" + RandomStringUtils.randomAlphanumeric(5));
             properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
             properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
             properties.put(ConsumerConfig.CLIENT_ID_CONFIG, CLIENT_ID);
@@ -96,47 +98,16 @@ public void init()
         @Override
         public void destroy()
         {
-
+            kafkaConsumer.close();
         }
 
         @Override
         public void startRead(RecordSender recordSender)
         {
-            try {
-                while (true) {
-                    ConsumerRecords items = kafkaConsumer.poll(Duration.ofSeconds(100));
-                    for (ConsumerRecord item : items) {
-                        Record record = recordSender.createRecord();
-                        logger.debug("topic = {}, partition = {}, offset = {}, kafkaConsumer = {}, country = {}%n",
-                                item.topic(), item.partition(), item.offset(),
-                                item.key(), item.value());
-                        final JSONObject jsonObject = JSONObject.parseObject(item.value().toString());
-                        if (columns.size() == 1 && "*".equals(columns.get(0))) {
-                            //assume all json value type is string
-                            for (String key : jsonObject.keySet()) {
-                                record.addColumn(new StringColumn(jsonObject.getString(key)));
-                            }
-                        }
-                        else {
-                            for (String col : columns) {
-                                if (!jsonObject.containsKey(col)) {
-                                    if (this.missKeyValue == null) {
-                                        throw AddaxException.asAddaxException(CONFIG_ERROR,
-                                                "The column " + col + " not exists");
-                                    }
-                                    record.addColumn(new StringColumn(this.missKeyValue));
-                                }
-                                else {
-                                    record.addColumn(guessColumnType(jsonObject.get(col)));
-                                }
-                            }
-                        }
-                        recordSender.sendToWriter(record);
-                    }
-                }
-            }
-            finally {
-                kafkaConsumer.close();
-            }
+            while (true) {
+                ConsumerRecords items = kafkaConsumer.poll(Duration.ofSeconds(2));
+                sendData(items, recordSender);
+                recordSender.flush();
+            }
         }
 
@@ -153,5 +124,37 @@ private Column guessColumnType(Object obj)
             }
             return new StringColumn(obj.toString());
         }
+
+        private void sendData(ConsumerRecords items, RecordSender recordSender)
+        {
+            for (ConsumerRecord item : items) {
+                Record record = recordSender.createRecord();
+                logger.debug("topic = {}, partition = {}, offset = {}, kafkaConsumer = {}, country = {}%n",
+                        item.topic(), item.partition(), item.offset(),
+                        item.key(), item.value());
+                final JSONObject jsonObject = JSONObject.parseObject(item.value().toString());
+                if (columns.size() == 1 && "*".equals(columns.get(0))) {
+                    //assume all json value type is string
+                    for (String key : jsonObject.keySet()) {
+                        record.addColumn(new StringColumn(jsonObject.getString(key)));
+                    }
+                }
+                else {
+                    for (String col : columns) {
+                        if (!jsonObject.containsKey(col)) {
+                            if (this.missKeyValue == null) {
+                                throw AddaxException.asAddaxException(CONFIG_ERROR,
+                                        "The column " + col + " not exists");
+                            }
+                            record.addColumn(new StringColumn(this.missKeyValue));
+                        }
+                        else {
+                            record.addColumn(guessColumnType(jsonObject.get(col)));
+                        }
+                    }
+                }
+                recordSender.sendToWriter(record);
+            }
+        }
     }
 }
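
Side note on the behavioral effect of the `GROUP_ID` change above: appending a random alphanumeric suffix gives every job run a consumer group the broker has never seen, so no committed offsets exist for it and the starting position falls back to the consumer's `auto.offset.reset` policy. The sketch below illustrates that behavior in isolation; the broker address, topic name, and the explicit `auto.offset.reset=earliest` setting are illustrative assumptions and are not taken from this diff.

```java
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.commons.lang3.RandomStringUtils;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class RandomGroupIdSketch
{
    public static void main(String[] args)
    {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed broker
        // A never-before-seen group.id has no committed offsets, so the consumer
        // starts wherever auto.offset.reset points (here, the earliest offset).
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "sketch-" + RandomStringUtils.randomAlphanumeric(5));
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

        // try-with-resources closes the consumer on exit, the same cleanup the
        // patch moves from the removed finally block into destroy().
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("sketch-topic")); // assumed topic
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(2));
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("partition=%d offset=%d value=%s%n",
                        record.partition(), record.offset(), record.value());
            }
        }
    }
}
```

One trade-off worth noting: because the suffix changes on every run, offsets committed by a previous run are never reused, so a restarted job re-reads the topic from the reset position rather than resuming where it left off.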