Skip to content
This repository was archived by the owner on Feb 12, 2022. It is now read-only.

Commit 6266f8f

Browse files
Increment the error record count for messages with unknownfields
1 parent bd3e0a1 commit 6266f8f

File tree

1 file changed

+6
-3
lines changed

1 file changed

+6
-3
lines changed

src/main/java/com/gojek/beast/converter/RowMapper.java

+6-3
Original file line numberDiff line numberDiff line change
@@ -41,11 +41,14 @@ private Map<String, Object> getMappings(DynamicMessage message, ColumnMapping co
4141
if (message == null || columnMapping == null || columnMapping.isEmpty()) {
4242
return new HashMap<>();
4343
}
44-
if (failOnUnknownFields && message.getUnknownFields().asMap().size() > 0) {
45-
statsClient.increment("kafka.protobuf.unknownfields.errors");
44+
if (message.getUnknownFields().asMap().size() > 0) {
45+
statsClient.count("kafka.error.records.count,type=unknownfields," + statsClient.getBqTags(), 1);
4646
String serializedUnknownFields = message.getUnknownFields().asMap().keySet().toString();
4747
String serializedMessage = UnknownProtoFields.toString(message.toByteArray());
48-
throw new UnknownProtoFieldFoundException(serializedUnknownFields, serializedMessage);
48+
log.warn(String.format("[%s] unknown fields found in proto [%s], either update mapped protobuf or disable FAIL_ON_UNKNOWN_FIELDS",
49+
serializedUnknownFields, serializedMessage));
50+
if (failOnUnknownFields)
51+
throw new UnknownProtoFieldFoundException(serializedUnknownFields, serializedMessage);
4952
}
5053
Descriptors.Descriptor descriptorForType = message.getDescriptorForType();
5154

0 commit comments

Comments
 (0)