From a92942095f6eb6e21f0cda20f42347717d5b9d90 Mon Sep 17 00:00:00 2001
From: vicky fazlurrahman
Date: Fri, 23 Jul 2021 17:15:49 +0700
Subject: [PATCH 1/6] fix: fix unknown fields error count metric, change exception message

---
 .../com/gojek/beast/converter/RowMapper.java  | 39 ++++++++++++++-----
 .../UnknownProtoFieldFoundException.java      |  5 +--
 2 files changed, 31 insertions(+), 13 deletions(-)

diff --git a/src/main/java/com/gojek/beast/converter/RowMapper.java b/src/main/java/com/gojek/beast/converter/RowMapper.java
index 2f64296..75ca072 100644
--- a/src/main/java/com/gojek/beast/converter/RowMapper.java
+++ b/src/main/java/com/gojek/beast/converter/RowMapper.java
@@ -6,7 +6,6 @@
 import com.gojek.beast.converter.fields.ProtoField;
 import com.gojek.beast.exception.UnknownProtoFieldFoundException;
 import com.gojek.beast.models.ConfigurationException;
-import com.gojek.beast.protomapping.UnknownProtoFields;
 import com.gojek.beast.stats.Stats;
 import com.google.protobuf.Descriptors;
 import com.google.protobuf.DynamicMessage;
@@ -15,8 +14,10 @@
 
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
 
 @Slf4j
 @AllArgsConstructor
@@ -34,22 +35,40 @@ public Map map(DynamicMessage message) {
         if (mapping == null) {
             throw new ConfigurationException("BQ_PROTO_COLUMN_MAPPING is not configured");
         }
+
+        List messageWithUnknownFields = getMessageWithUnknownFields(getDynamicMessageFields(message));
+        if (messageWithUnknownFields.size() > 0) {
+            statsClient.count("kafka.error.records.count,type=unknownfields," + statsClient.getBqTags(), 1);
+            if (failOnUnknownFields) {
+                throw new UnknownProtoFieldFoundException(message.toString());
+            }
+        }
+
         return getMappings(message, mapping);
     }
 
+    private List getDynamicMessageFields(DynamicMessage message) {
+        List fields = new LinkedList<>();
+        fields.add(message);
+
+        List children = message.getAllFields().values().stream().filter(v -> v instanceof DynamicMessage).map(o -> (DynamicMessage) o).collect(Collectors.toList());
+        children.forEach(m -> {
+            List messageFields = getDynamicMessageFields(m);
+            fields.addAll(messageFields);
+        });
+
+        return fields;
+    }
+
+    private List getMessageWithUnknownFields(List messages) {
+        return messages.stream().filter(message -> message.getUnknownFields().asMap().size() > 0).collect(Collectors.toList());
+    }
+
+
     private Map getMappings(DynamicMessage message, ColumnMapping columnMapping) {
         if (message == null || columnMapping == null || columnMapping.isEmpty()) {
             return new HashMap<>();
         }
-        if (message.getUnknownFields().asMap().size() > 0) {
-            statsClient.count("kafka.error.records.count,type=unknownfields," + statsClient.getBqTags(), 1);
-            String serializedUnknownFields = message.getUnknownFields().asMap().keySet().toString();
-            String serializedMessage = UnknownProtoFields.toString(message.toByteArray());
-            log.warn(String.format("[%s] unknown fields found in proto [%s], either update mapped protobuf or disable FAIL_ON_UNKNOWN_FIELDS",
-                    serializedUnknownFields, serializedMessage));
-            if (failOnUnknownFields)
-                throw new UnknownProtoFieldFoundException(serializedUnknownFields, serializedMessage);
-        }
         Descriptors.Descriptor descriptorForType = message.getDescriptorForType();
 
         Map row = new HashMap<>(columnMapping.size());
diff --git a/src/main/java/com/gojek/beast/exception/UnknownProtoFieldFoundException.java b/src/main/java/com/gojek/beast/exception/UnknownProtoFieldFoundException.java
index c7f7c5e..01021ab 100644
--- a/src/main/java/com/gojek/beast/exception/UnknownProtoFieldFoundException.java
+++ b/src/main/java/com/gojek/beast/exception/UnknownProtoFieldFoundException.java
@@ -1,8 +1,7 @@
 package com.gojek.beast.exception;
 
 public class UnknownProtoFieldFoundException extends RuntimeException {
-    public UnknownProtoFieldFoundException(String serializedUnknownFields, String serializedMessage) {
-        super(String.format("[%s] unknown fields found in proto [%s], either update mapped protobuf or disable FAIL_ON_UNKNOWN_FIELDS",
-                serializedUnknownFields, serializedMessage));
+    public UnknownProtoFieldFoundException(String serialisedProtoMessage) {
+        super(String.format("some unknown fields found, please check the published message, update mapped protobuf or disable FAIL_ON_UNKNOWN_FIELDS, full proto message : [%s]", serialisedProtoMessage));
     }
 }

From 6bd9f5a68b0fde5f7c76db10d602ea290b9d6bc5 Mon Sep 17 00:00:00 2001
From: vicky fazlurrahman
Date: Wed, 28 Jul 2021 15:24:08 +0700
Subject: [PATCH 2/6] fix: fix unknown fields checking, remove duplicated nested field list

---
 .../com/gojek/beast/converter/RowMapper.java  | 24 ++------------
 .../java/com/gojek/beast/util/ProtoUtil.java  | 33 +++++++++++++++++++
 2 files changed, 35 insertions(+), 22 deletions(-)
 create mode 100644 src/main/java/com/gojek/beast/util/ProtoUtil.java

diff --git a/src/main/java/com/gojek/beast/converter/RowMapper.java b/src/main/java/com/gojek/beast/converter/RowMapper.java
index 75ca072..5ec3e7c 100644
--- a/src/main/java/com/gojek/beast/converter/RowMapper.java
+++ b/src/main/java/com/gojek/beast/converter/RowMapper.java
@@ -7,6 +7,7 @@
 import com.gojek.beast.exception.UnknownProtoFieldFoundException;
 import com.gojek.beast.models.ConfigurationException;
 import com.gojek.beast.stats.Stats;
+import com.gojek.beast.util.ProtoUtil;
 import com.google.protobuf.Descriptors;
 import com.google.protobuf.DynamicMessage;
 import lombok.AllArgsConstructor;
@@ -14,10 +15,8 @@
 
 import java.util.ArrayList;
 import java.util.HashMap;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
-import java.util.stream.Collectors;
 
 @Slf4j
 @AllArgsConstructor
@@ -36,8 +35,7 @@ public Map map(DynamicMessage message) {
             throw new ConfigurationException("BQ_PROTO_COLUMN_MAPPING is not configured");
         }
 
-        List messageWithUnknownFields = getMessageWithUnknownFields(getDynamicMessageFields(message));
-        if (messageWithUnknownFields.size() > 0) {
+        if (ProtoUtil.isUnknownFieldExist(message)) {
             statsClient.count("kafka.error.records.count,type=unknownfields," + statsClient.getBqTags(), 1);
             if (failOnUnknownFields) {
                 throw new UnknownProtoFieldFoundException(message.toString());
@@ -47,24 +45,6 @@ public Map map(DynamicMessage message) {
         return getMappings(message, mapping);
     }
 
-    private List getDynamicMessageFields(DynamicMessage message) {
-        List fields = new LinkedList<>();
-        fields.add(message);
-
-        List children = message.getAllFields().values().stream().filter(v -> v instanceof DynamicMessage).map(o -> (DynamicMessage) o).collect(Collectors.toList());
-        children.forEach(m -> {
-            List messageFields = getDynamicMessageFields(m);
-            fields.addAll(messageFields);
-        });
-
-        return fields;
-    }
-
-    private List getMessageWithUnknownFields(List messages) {
-        return messages.stream().filter(message -> message.getUnknownFields().asMap().size() > 0).collect(Collectors.toList());
-    }
-
-
     private Map getMappings(DynamicMessage message, ColumnMapping columnMapping) {
         if (message == null || columnMapping == null || columnMapping.isEmpty()) {
             return new HashMap<>();
diff --git a/src/main/java/com/gojek/beast/util/ProtoUtil.java b/src/main/java/com/gojek/beast/util/ProtoUtil.java
new file mode 100644
index 0000000..60eeb17
--- /dev/null
+++ b/src/main/java/com/gojek/beast/util/ProtoUtil.java
@@ -0,0 +1,33 @@
+package com.gojek.beast.util;
+
+import com.google.protobuf.DynamicMessage;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.stream.Collectors;
+
+public class ProtoUtil {
+    public static boolean isUnknownFieldExist(DynamicMessage dynamicMessage) {
+        if (dynamicMessage == null) {
+            return false;
+        }
+        List dynamicMessageFields = new LinkedList<>();
+        collectNestedFields(dynamicMessage, dynamicMessageFields);
+        List messageWithUnknownFields = getMessageWithUnknownFields(dynamicMessageFields);
+        return messageWithUnknownFields.size() > 0;
+    }
+
+    private static void collectNestedFields(DynamicMessage root, List accumulator) {
+        List nestedChild = root.getAllFields().values().stream()
+                .filter(v -> v instanceof DynamicMessage)
+                .map(o -> (DynamicMessage) o)
+                .collect(Collectors.toList());
+
+        nestedChild.forEach(m -> collectNestedFields(m, accumulator));
+        accumulator.add(root);
+    }
+
+    private static List getMessageWithUnknownFields(List messages) {
+        return messages.stream().filter(message -> message.getUnknownFields().asMap().size() > 0).collect(Collectors.toList());
+    }
+}

From eae47fef3b8bfffe6ce45bafb13f2413a53ca007 Mon Sep 17 00:00:00 2001
From: vicky fazlurrahman
Date: Wed, 28 Jul 2021 16:14:44 +0700
Subject: [PATCH 3/6] fix: fix clash class name with testing class

---
 src/main/java/com/gojek/beast/converter/RowMapper.java     | 4 ++--
 .../beast/util/{ProtoUtil.java => DynamicMessageUtil.java} | 2 +-
 2 files changed, 3 insertions(+), 3 deletions(-)
 rename src/main/java/com/gojek/beast/util/{ProtoUtil.java => DynamicMessageUtil.java} (97%)

diff --git a/src/main/java/com/gojek/beast/converter/RowMapper.java b/src/main/java/com/gojek/beast/converter/RowMapper.java
index 5ec3e7c..abe4093 100644
--- a/src/main/java/com/gojek/beast/converter/RowMapper.java
+++ b/src/main/java/com/gojek/beast/converter/RowMapper.java
@@ -7,7 +7,7 @@
 import com.gojek.beast.exception.UnknownProtoFieldFoundException;
 import com.gojek.beast.models.ConfigurationException;
 import com.gojek.beast.stats.Stats;
-import com.gojek.beast.util.ProtoUtil;
+import com.gojek.beast.util.DynamicMessageUtil;
 import com.google.protobuf.Descriptors;
 import com.google.protobuf.DynamicMessage;
 import lombok.AllArgsConstructor;
@@ -35,7 +35,7 @@ public Map map(DynamicMessage message) {
             throw new ConfigurationException("BQ_PROTO_COLUMN_MAPPING is not configured");
         }
 
-        if (ProtoUtil.isUnknownFieldExist(message)) {
+        if (DynamicMessageUtil.isUnknownFieldExist(message)) {
             statsClient.count("kafka.error.records.count,type=unknownfields," + statsClient.getBqTags(), 1);
             if (failOnUnknownFields) {
                 throw new UnknownProtoFieldFoundException(message.toString());
diff --git a/src/main/java/com/gojek/beast/util/ProtoUtil.java b/src/main/java/com/gojek/beast/util/DynamicMessageUtil.java
similarity index 97%
rename from src/main/java/com/gojek/beast/util/ProtoUtil.java
rename to src/main/java/com/gojek/beast/util/DynamicMessageUtil.java
index 60eeb17..79b75b3 100644
--- a/src/main/java/com/gojek/beast/util/ProtoUtil.java
+++ b/src/main/java/com/gojek/beast/util/DynamicMessageUtil.java
@@ -6,7 +6,7 @@
 import java.util.List;
 import java.util.stream.Collectors;
 
-public class ProtoUtil {
+public class DynamicMessageUtil {
     public static boolean isUnknownFieldExist(DynamicMessage dynamicMessage) {
         if (dynamicMessage == null) {
             return false;

From 748b2245373fd394c1a697ed0efd2b45f441170d Mon Sep 17 00:00:00 2001
From: vicky fazlurrahman
Date: Thu, 29 Jul 2021 12:37:47 +0700
Subject: [PATCH 4/6] feat: add log warning when unknown fields found

---
 .../com/gojek/beast/converter/RowMapper.java  |  4 +++-
 .../UnknownProtoFieldFoundException.java      |  2 +-
 .../gojek/beast/util/DynamicMessageUtil.java  | 18 +++++++++---------
 3 files changed, 13 insertions(+), 11 deletions(-)

diff --git a/src/main/java/com/gojek/beast/converter/RowMapper.java b/src/main/java/com/gojek/beast/converter/RowMapper.java
index abe4093..45347d2 100644
--- a/src/main/java/com/gojek/beast/converter/RowMapper.java
+++ b/src/main/java/com/gojek/beast/converter/RowMapper.java
@@ -37,8 +37,10 @@ public Map map(DynamicMessage message) {
 
         if (DynamicMessageUtil.isUnknownFieldExist(message)) {
             statsClient.count("kafka.error.records.count,type=unknownfields," + statsClient.getBqTags(), 1);
+            String serialisedProtoMessage = message.toString();
+            log.warn(String.format("unknown fields found in proto : %s", serialisedProtoMessage));
             if (failOnUnknownFields) {
-                throw new UnknownProtoFieldFoundException(message.toString());
+                throw new UnknownProtoFieldFoundException(serialisedProtoMessage);
             }
         }
 
diff --git a/src/main/java/com/gojek/beast/exception/UnknownProtoFieldFoundException.java b/src/main/java/com/gojek/beast/exception/UnknownProtoFieldFoundException.java
index 01021ab..23bfe2e 100644
--- a/src/main/java/com/gojek/beast/exception/UnknownProtoFieldFoundException.java
+++ b/src/main/java/com/gojek/beast/exception/UnknownProtoFieldFoundException.java
@@ -2,6 +2,6 @@
 
 public class UnknownProtoFieldFoundException extends RuntimeException {
     public UnknownProtoFieldFoundException(String serialisedProtoMessage) {
-        super(String.format("some unknown fields found, please check the published message, update mapped protobuf or disable FAIL_ON_UNKNOWN_FIELDS, full proto message : [%s]", serialisedProtoMessage));
+        super(String.format("some unknown fields found, please check the published message, update mapped protobuf or disable FAIL_ON_UNKNOWN_FIELDS, full proto message : %s", serialisedProtoMessage));
     }
 }
diff --git a/src/main/java/com/gojek/beast/util/DynamicMessageUtil.java b/src/main/java/com/gojek/beast/util/DynamicMessageUtil.java
index 79b75b3..849285e 100644
--- a/src/main/java/com/gojek/beast/util/DynamicMessageUtil.java
+++ b/src/main/java/com/gojek/beast/util/DynamicMessageUtil.java
@@ -7,24 +7,24 @@
 import java.util.stream.Collectors;
 
 public class DynamicMessageUtil {
-    public static boolean isUnknownFieldExist(DynamicMessage dynamicMessage) {
-        if (dynamicMessage == null) {
+    public static boolean isUnknownFieldExist(DynamicMessage root) {
+        if (root == null) {
             return false;
         }
         List dynamicMessageFields = new LinkedList<>();
-        collectNestedFields(dynamicMessage, dynamicMessageFields);
+        collectNestedFields(root, dynamicMessageFields);
         List messageWithUnknownFields = getMessageWithUnknownFields(dynamicMessageFields);
         return messageWithUnknownFields.size() > 0;
     }
 
-    private static void collectNestedFields(DynamicMessage root, List accumulator) {
-        List nestedChild = root.getAllFields().values().stream()
-                .filter(v -> v instanceof DynamicMessage)
-                .map(o -> (DynamicMessage) o)
+    private static void collectNestedFields(DynamicMessage node, List accumulator) {
+        List nestedChildNodes = node.getAllFields().values().stream()
+                .filter(field -> field instanceof DynamicMessage)
+                .map(field -> (DynamicMessage) field)
                 .collect(Collectors.toList());
 
-        nestedChild.forEach(m -> collectNestedFields(m, accumulator));
-        accumulator.add(root);
+        nestedChildNodes.forEach(n -> collectNestedFields(n, accumulator));
+        accumulator.add(node);
     }
 
     private static List getMessageWithUnknownFields(List messages) {

From 302daa7a24b93f6e8de0189aed7831ea173340c9 Mon Sep 17 00:00:00 2001
From: vicky fazlurrahman
Date: Fri, 30 Jul 2021 10:16:07 +0700
Subject: [PATCH 5/6] refactor: change implementation unknown fields checking

---
 .../gojek/beast/util/DynamicMessageUtil.java  | 30 +++++++++++++------
 1 file changed, 21 insertions(+), 9 deletions(-)

diff --git a/src/main/java/com/gojek/beast/util/DynamicMessageUtil.java b/src/main/java/com/gojek/beast/util/DynamicMessageUtil.java
index 849285e..2a9aa98 100644
--- a/src/main/java/com/gojek/beast/util/DynamicMessageUtil.java
+++ b/src/main/java/com/gojek/beast/util/DynamicMessageUtil.java
@@ -2,8 +2,10 @@
 
 import com.google.protobuf.DynamicMessage;
 
+import java.util.Collections;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Queue;
 import java.util.stream.Collectors;
 
 public class DynamicMessageUtil {
@@ -11,20 +13,30 @@ public static boolean isUnknownFieldExist(DynamicMessage root) {
         if (root == null) {
             return false;
         }
-        List dynamicMessageFields = new LinkedList<>();
-        collectNestedFields(root, dynamicMessageFields);
+        List dynamicMessageFields = collectNestedFields(root);
         List messageWithUnknownFields = getMessageWithUnknownFields(dynamicMessageFields);
         return messageWithUnknownFields.size() > 0;
     }
 
-    private static void collectNestedFields(DynamicMessage node, List accumulator) {
-        List nestedChildNodes = node.getAllFields().values().stream()
-                .filter(field -> field instanceof DynamicMessage)
-                .map(field -> (DynamicMessage) field)
-                .collect(Collectors.toList());
+    private static List collectNestedFields(DynamicMessage node) {
+        List output = new LinkedList<>();
+        Queue stack = Collections.asLifoQueue(new LinkedList<>());
+        stack.add(node);
+        while (true) {
+            DynamicMessage current = stack.poll();
+            if (current == null) {
+                break;
+            }
+            List nestedChildNodes = current.getAllFields().values().stream()
+                    .filter(field -> field instanceof DynamicMessage)
+                    .map(field -> (DynamicMessage) field)
+                    .collect(Collectors.toList());
+            stack.addAll(nestedChildNodes);
 
-        nestedChildNodes.forEach(n -> collectNestedFields(n, accumulator));
-        accumulator.add(node);
+            output.add(current);
+        }
+
+        return output;
     }
 
     private static List getMessageWithUnknownFields(List messages) {

From b947b6f6989c9b3efd6590346897478fe4afc9bc Mon Sep 17 00:00:00 2001
From: vicky fazlurrahman
Date: Fri, 30 Jul 2021 13:38:59 +0700
Subject: [PATCH 6/6] refactor: rename function, remove redundant code

---
 src/main/java/com/gojek/beast/converter/RowMapper.java     | 2 +-
 src/main/java/com/gojek/beast/util/DynamicMessageUtil.java | 5 +----
 2 files changed, 2 insertions(+), 5 deletions(-)

diff --git a/src/main/java/com/gojek/beast/converter/RowMapper.java b/src/main/java/com/gojek/beast/converter/RowMapper.java
index 45347d2..24a4859 100644
--- a/src/main/java/com/gojek/beast/converter/RowMapper.java
+++ b/src/main/java/com/gojek/beast/converter/RowMapper.java
@@ -35,7 +35,7 @@ public Map map(DynamicMessage message) {
             throw new ConfigurationException("BQ_PROTO_COLUMN_MAPPING is not configured");
         }
 
-        if (DynamicMessageUtil.isUnknownFieldExist(message)) {
+        if (DynamicMessageUtil.hasUnknownField(message)) {
             statsClient.count("kafka.error.records.count,type=unknownfields," + statsClient.getBqTags(), 1);
             String serialisedProtoMessage = message.toString();
             log.warn(String.format("unknown fields found in proto : %s", serialisedProtoMessage));
diff --git a/src/main/java/com/gojek/beast/util/DynamicMessageUtil.java b/src/main/java/com/gojek/beast/util/DynamicMessageUtil.java
index 2a9aa98..31f4e96 100644
--- a/src/main/java/com/gojek/beast/util/DynamicMessageUtil.java
+++ b/src/main/java/com/gojek/beast/util/DynamicMessageUtil.java
@@ -9,10 +9,7 @@
 import java.util.stream.Collectors;
 
 public class DynamicMessageUtil {
-    public static boolean isUnknownFieldExist(DynamicMessage root) {
-        if (root == null) {
-            return false;
-        }
+    public static boolean hasUnknownField(DynamicMessage root) {
         List dynamicMessageFields = collectNestedFields(root);
         List messageWithUnknownFields = getMessageWithUnknownFields(dynamicMessageFields);
         return messageWithUnknownFields.size() > 0;