[bugfix][plugin][kafkareader] Resolve the issue of consumed data not being output by invoking the flush method

Try to fix #1053
wgzhao committed Sep 24, 2024
1 parent d057a05 commit 0750c8b
Showing 1 changed file with 40 additions and 37 deletions.
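
The shape of the fix, shown here as a hedged standalone sketch rather than the plugin code itself: poll a small batch (every two seconds instead of the previous one hundred), hand each record to the writer, then flush so the batch is emitted immediately instead of waiting in the record buffer. Broker address, group id, topic and class name below are illustrative placeholders; the randomized group id added in the second hunk is illustrated separately after the diff.

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class PollAndFlushSketch
{
    public static void main(String[] args)
    {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");   // placeholder broker
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "kafka-reader-sketch");       // placeholder group id
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("demo-topic"));        // placeholder topic
            for (int i = 0; i < 5; i++) {                                        // the plugin loops forever
                // short poll, mirroring Duration.ofSeconds(2) in the patch
                ConsumerRecords<String, String> batch = consumer.poll(Duration.ofSeconds(2));
                for (ConsumerRecord<String, String> r : batch) {
                    // the plugin converts each record and calls recordSender.sendToWriter(record)
                    System.out.printf("offset=%d value=%s%n", r.offset(), r.value());
                }
                // the plugin calls recordSender.flush() at this point after every batch,
                // so records reach the writer even when a single poll returns only a few of them
            }
        }
    }
}
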
@@ -11,6 +11,8 @@
 import com.wgzhao.addax.common.plugin.RecordSender;
 import com.wgzhao.addax.common.spi.Reader;
 import com.wgzhao.addax.common.util.Configuration;
+import org.apache.commons.lang3.RandomStringUtils;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
@@ -81,7 +83,7 @@ public void init()
             this.columns = configuration.getList(KafkaKey.COLUMN, String.class);
             this.missKeyValue = configuration.getString(KafkaKey.MISSING_KEY_VALUE, null);
             properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokeLists);
-            properties.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
+            properties.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID + "-" + RandomStringUtils.randomAlphanumeric(5));
             properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
             properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
             properties.put(ConsumerConfig.CLIENT_ID_CONFIG, CLIENT_ID);
@@ -96,47 +98,16 @@ public void init()
         @Override
         public void destroy()
         {

             kafkaConsumer.close();
         }

         @Override
         public void startRead(RecordSender recordSender)
         {
-            try {
-                while (true) {
-                    ConsumerRecords<String, Object> items = kafkaConsumer.poll(Duration.ofSeconds(100));
-                    for (ConsumerRecord<String, Object> item : items) {
-                        Record record = recordSender.createRecord();
-                        logger.debug("topic = {}, partition = {}, offset = {}, kafkaConsumer = {}, country = {}%n",
-                                item.topic(), item.partition(), item.offset(),
-                                item.key(), item.value());
-                        final JSONObject jsonObject = JSONObject.parseObject(item.value().toString());
-                        if (columns.size() == 1 && "*".equals(columns.get(0))) {
-                            //assume all json value type is string
-                            for (String key : jsonObject.keySet()) {
-                                record.addColumn(new StringColumn(jsonObject.getString(key)));
-                            }
-                        }
-                        else {
-                            for (String col : columns) {
-                                if (!jsonObject.containsKey(col)) {
-                                    if (this.missKeyValue == null) {
-                                        throw AddaxException.asAddaxException(CONFIG_ERROR,
-                                                "The column " + col + " not exists");
-                                    }
-                                    record.addColumn(new StringColumn(this.missKeyValue));
-                                }
-                                else {
-                                    record.addColumn(guessColumnType(jsonObject.get(col)));
-                                }
-                            }
-                        }
-                        recordSender.sendToWriter(record);
-                    }
-                }
-            }
-            finally {
-                kafkaConsumer.close();
+            while (true) {
+                ConsumerRecords<String, Object> items = kafkaConsumer.poll(Duration.ofSeconds(2));
+                sendData(items, recordSender);
+                recordSender.flush();
             }
         }

@@ -153,5 +124,37 @@ private Column guessColumnType(Object obj)
             }
             return new StringColumn(obj.toString());
         }
+
+        private void sendData(ConsumerRecords<String, Object> items, RecordSender recordSender)
+        {
+            for (ConsumerRecord<String, Object> item : items) {
+                Record record = recordSender.createRecord();
+                logger.debug("topic = {}, partition = {}, offset = {}, key = {}, value = {}",
+                        item.topic(), item.partition(), item.offset(),
+                        item.key(), item.value());
+                final JSONObject jsonObject = JSONObject.parseObject(item.value().toString());
+                if (columns.size() == 1 && "*".equals(columns.get(0))) {
+                    // assume every JSON value is a string
+                    for (String key : jsonObject.keySet()) {
+                        record.addColumn(new StringColumn(jsonObject.getString(key)));
+                    }
+                }
+                else {
+                    for (String col : columns) {
+                        if (!jsonObject.containsKey(col)) {
+                            if (this.missKeyValue == null) {
+                                throw AddaxException.asAddaxException(CONFIG_ERROR,
+                                        "The column " + col + " does not exist");
+                            }
+                            record.addColumn(new StringColumn(this.missKeyValue));
+                        }
+                        else {
+                            record.addColumn(guessColumnType(jsonObject.get(col)));
+                        }
+                    }
+                }
+                recordSender.sendToWriter(record);
+            }
+        }
     }
 }
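
A side note on the group-id line in the second hunk, again as a hedged sketch rather than plugin code: appending RandomStringUtils.randomAlphanumeric(5) gives every job run a brand-new consumer group with no committed offsets, so where consumption starts is governed by auto.offset.reset rather than by a previous run's position. The base group id below is a placeholder standing in for the plugin's GROUP_ID constant.

import java.util.Properties;

import org.apache.commons.lang3.RandomStringUtils;
import org.apache.kafka.clients.consumer.ConsumerConfig;

public class GroupIdSuffixSketch
{
    public static void main(String[] args)
    {
        String baseGroupId = "addax-kafka-reader";   // placeholder for the plugin's GROUP_ID constant
        Properties properties = new Properties();
        // five random alphanumeric characters make the consumer group unique per run
        properties.put(ConsumerConfig.GROUP_ID_CONFIG,
                baseGroupId + "-" + RandomStringUtils.randomAlphanumeric(5));
        System.out.println(properties.get(ConsumerConfig.GROUP_ID_CONFIG));
        // prints something like addax-kafka-reader-x7Kq2
    }
}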
