Skip to content

Commit 9067657

Browse files
authored
[HUDI-2487] Fix JsonKafkaSource cannot filter empty messages from kafka (apache#3715)
1 parent 36be287 commit 9067657

File tree

2 files changed

+27
-1
lines changed

2 files changed

+27
-1
lines changed

hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonKafkaSource.java

+5-1
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,11 @@ protected InputBatch<JavaRDD<String>> fetchNewData(Option<String> lastCheckpoint
6969

7070
private JavaRDD<String> toRDD(OffsetRange[] offsetRanges) {
7171
return KafkaUtils.createRDD(sparkContext, offsetGen.getKafkaParams(), offsetRanges,
72-
LocationStrategies.PreferConsistent()).map(x -> (String) x.value());
72+
LocationStrategies.PreferConsistent()).filter(x -> {
73+
String msgValue = (String) x.value();
74+
// Filter out Kafka messages whose value is null to prevent exceptions
75+
return msgValue != null;
76+
}).map(x -> (String) x.value());
7377
}
7478

7579
@Override

hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonKafkaSource.java

+22
Original file line numberDiff line numberDiff line change
@@ -151,6 +151,28 @@ public void testJsonKafkaSource() {
151151
assertEquals(Option.empty(), fetch4AsRows.getBatch());
152152
}
153153

154+
// test whether messages with null values are filtered out
155+
@Test
156+
public void testJsonKafkaSourceFilterNullMsg() {
157+
// topic setup.
158+
testUtils.createTopic(TEST_TOPIC_NAME, 2);
159+
HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
160+
TypedProperties props = createPropsForJsonSource(null, "earliest");
161+
162+
Source jsonSource = new JsonKafkaSource(props, jsc, sparkSession, schemaProvider, metrics);
163+
SourceFormatAdapter kafkaSource = new SourceFormatAdapter(jsonSource);
164+
165+
// 1. Extract without any checkpoint before any messages are sent => expect an empty batch
166+
assertEquals(Option.empty(), kafkaSource.fetchNewDataInAvroFormat(Option.empty(), Long.MAX_VALUE).getBatch());
167+
// Send 1000 non-null messages to Kafka
168+
testUtils.sendMessages(TEST_TOPIC_NAME, Helpers.jsonifyRecords(dataGenerator.generateInserts("000", 1000)));
169+
// Send 100 null messages to Kafka
170+
testUtils.sendMessages(TEST_TOPIC_NAME,new String[100]);
171+
InputBatch<JavaRDD<GenericRecord>> fetch1 = kafkaSource.fetchNewDataInAvroFormat(Option.empty(), Long.MAX_VALUE);
172+
// Verify that messages with null values are filtered
173+
assertEquals(1000, fetch1.getBatch().get().count());
174+
}
175+
154176
// test case with kafka offset reset strategy
155177
@Test
156178
public void testJsonKafkaSourceResetStrategy() {

0 commit comments

Comments
 (0)