Skip to content
This repository has been archived by the owner on Feb 12, 2022. It is now read-only.

Commit

Permalink
Merge pull request #79 from odpf/unknown_fields_add_metrics
Browse files Browse the repository at this point in the history
Increment the error record count for messages with unknownfields
  • Loading branch information
fzrvic authored Jul 22, 2021
2 parents 954e3d1 + 6266f8f commit 002ef57
Showing 1 changed file with 6 additions and 3 deletions.
9 changes: 6 additions & 3 deletions src/main/java/com/gojek/beast/converter/RowMapper.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,14 @@ private Map<String, Object> getMappings(DynamicMessage message, ColumnMapping co
if (message == null || columnMapping == null || columnMapping.isEmpty()) {
return new HashMap<>();
}
if (failOnUnknownFields && message.getUnknownFields().asMap().size() > 0) {
statsClient.increment("kafka.protobuf.unknownfields.errors");
if (message.getUnknownFields().asMap().size() > 0) {
statsClient.count("kafka.error.records.count,type=unknownfields," + statsClient.getBqTags(), 1);
String serializedUnknownFields = message.getUnknownFields().asMap().keySet().toString();
String serializedMessage = UnknownProtoFields.toString(message.toByteArray());
throw new UnknownProtoFieldFoundException(serializedUnknownFields, serializedMessage);
log.warn(String.format("[%s] unknown fields found in proto [%s], either update mapped protobuf or disable FAIL_ON_UNKNOWN_FIELDS",
serializedUnknownFields, serializedMessage));
if (failOnUnknownFields)
throw new UnknownProtoFieldFoundException(serializedUnknownFields, serializedMessage);
}
Descriptors.Descriptor descriptorForType = message.getDescriptorForType();

Expand Down

0 comments on commit 002ef57

Please sign in to comment.