We read every piece of feedback, and take your input very seriously.
To see all available qualifiers, see our documentation.
There was an error while loading. Please reload this page.
1 parent 998de4f commit ca11207Copy full SHA for ca11207
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProvider.java
@@ -196,7 +196,7 @@ public PCollectionRowTuple expand(PCollectionRowTuple input) {
196
PCollection<GenericRecord> kafkaValues =
197
input.getPipeline().apply(kafkaRead.withoutMetadata()).apply(Values.create());
198
199
- assert kafkaValues.getCoder().getClass() == AvroCoder.class;
+ assert kafkaValues.getCoder() instanceof AvroCoder;
200
AvroCoder<GenericRecord> coder = (AvroCoder<GenericRecord>) kafkaValues.getCoder();
201
kafkaValues = kafkaValues.setCoder(AvroUtils.schemaCoder(coder.getSchema()));
202
return PCollectionRowTuple.of("output", kafkaValues.apply(Convert.toRows()));
0 commit comments