From 6266f8ffbe944fc168ee6223a18fcf28e05a49a1 Mon Sep 17 00:00:00 2001 From: Sravan Date: Thu, 22 Jul 2021 16:09:17 +0530 Subject: [PATCH] Increment the error record count for messages with unknownfields --- src/main/java/com/gojek/beast/converter/RowMapper.java | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/src/main/java/com/gojek/beast/converter/RowMapper.java b/src/main/java/com/gojek/beast/converter/RowMapper.java index caa8976..2f64296 100644 --- a/src/main/java/com/gojek/beast/converter/RowMapper.java +++ b/src/main/java/com/gojek/beast/converter/RowMapper.java @@ -41,11 +41,14 @@ private Map getMappings(DynamicMessage message, ColumnMapping co if (message == null || columnMapping == null || columnMapping.isEmpty()) { return new HashMap<>(); } - if (failOnUnknownFields && message.getUnknownFields().asMap().size() > 0) { - statsClient.increment("kafka.protobuf.unknownfields.errors"); + if (message.getUnknownFields().asMap().size() > 0) { + statsClient.count("kafka.error.records.count,type=unknownfields," + statsClient.getBqTags(), 1); String serializedUnknownFields = message.getUnknownFields().asMap().keySet().toString(); String serializedMessage = UnknownProtoFields.toString(message.toByteArray()); - throw new UnknownProtoFieldFoundException(serializedUnknownFields, serializedMessage); + log.warn(String.format("[%s] unknown fields found in proto [%s], either update mapped protobuf or disable FAIL_ON_UNKNOWN_FIELDS", + serializedUnknownFields, serializedMessage)); + if (failOnUnknownFields) + throw new UnknownProtoFieldFoundException(serializedUnknownFields, serializedMessage); } Descriptors.Descriptor descriptorForType = message.getDescriptorForType();