From a3d179634a4241be7427a201ad9f2a5b91ccc664 Mon Sep 17 00:00:00 2001 From: mauliksoneji Date: Fri, 24 Apr 2020 15:05:58 +0530 Subject: [PATCH] [Maulik] optimize heap and cpu utilization --- .../converter/ConsumerRecordConverter.java | 3 +-- .../gojek/beast/converter/FieldFactory.java | 22 ++++++++++--------- .../com/gojek/beast/converter/RowMapper.java | 4 ++-- .../beast/converter/fields/ByteField.java | 10 ++++----- .../converter/fields/DefaultProtoField.java | 6 ++--- .../beast/converter/fields/EnumField.java | 11 +++++----- .../beast/converter/fields/NestedField.java | 9 ++++---- .../beast/converter/fields/ProtoField.java | 6 +++-- .../beast/converter/fields/StructField.java | 11 +++++----- .../converter/fields/TimestampField.java | 11 +++++----- .../java/com/gojek/beast/models/Record.java | 11 +++++++--- .../java/com/gojek/beast/models/Records.java | 4 +--- .../beast/converter/FieldFactoryTest.java | 4 ++-- 13 files changed, 55 insertions(+), 57 deletions(-) diff --git a/src/main/java/com/gojek/beast/converter/ConsumerRecordConverter.java b/src/main/java/com/gojek/beast/converter/ConsumerRecordConverter.java index 8e76fd1..26dabb8 100644 --- a/src/main/java/com/gojek/beast/converter/ConsumerRecordConverter.java +++ b/src/main/java/com/gojek/beast/converter/ConsumerRecordConverter.java @@ -23,8 +23,7 @@ public class ConsumerRecordConverter implements Converter { public List convert(final Iterable> messages) throws InvalidProtocolBufferException { ArrayList records = new ArrayList<>(); for (ConsumerRecord message : messages) { - byte[] value = message.value(); - Map columns = rowMapper.map(parser.parse(value)); + Map columns = rowMapper.map(parser.parse(message.value())); OffsetInfo offsetInfo = new OffsetInfo(message.topic(), message.partition(), message.offset(), message.timestamp()); addMetadata(columns, offsetInfo); records.add(new Record(offsetInfo, columns)); diff --git a/src/main/java/com/gojek/beast/converter/FieldFactory.java b/src/main/java/com/gojek/beast/converter/FieldFactory.java index 57cde61..2770b27 100644 --- a/src/main/java/com/gojek/beast/converter/FieldFactory.java +++ b/src/main/java/com/gojek/beast/converter/FieldFactory.java @@ -14,20 +14,22 @@ import java.util.Optional; public class FieldFactory { + private static final List PROTO_FIELDS = Arrays.asList( + new TimestampField(), + new EnumField(), + new ByteField(), + new StructField(), + new NestedField() + ); + private static final DefaultProtoField DEFAULT_PROTO_FIELD = new DefaultProtoField(); public static ProtoField getField(Descriptors.FieldDescriptor descriptor, Object fieldValue) { - List protoFields = Arrays.asList( - new TimestampField(descriptor, fieldValue), - new EnumField(descriptor, fieldValue), - new ByteField(descriptor, fieldValue), - new StructField(descriptor, fieldValue), - new NestedField(descriptor, fieldValue) - ); - Optional first = protoFields + + Optional first = PROTO_FIELDS .stream() - .filter(ProtoField::matches) + .filter(field -> field.matches(descriptor, fieldValue)) .findFirst(); - return first.orElseGet(() -> new DefaultProtoField(descriptor, fieldValue)); + return first.orElse(DEFAULT_PROTO_FIELD); } } diff --git a/src/main/java/com/gojek/beast/converter/RowMapper.java b/src/main/java/com/gojek/beast/converter/RowMapper.java index 57ad80c..6b12d53 100644 --- a/src/main/java/com/gojek/beast/converter/RowMapper.java +++ b/src/main/java/com/gojek/beast/converter/RowMapper.java @@ -44,10 +44,10 @@ private Map getMappings(DynamicMessage message, ColumnMapping co } Integer protoIndex = Integer.valueOf(column); Descriptors.FieldDescriptor fieldDesc = descriptorForType.findFieldByNumber(protoIndex); - if (fieldDesc != null && !message.getField(fieldDesc).toString().isEmpty()) { + if (fieldDesc != null && message.getField(fieldDesc) != null) { Object field = message.getField(fieldDesc); ProtoField protoField = FieldFactory.getField(fieldDesc, field); - Object fieldValue = protoField.getValue(); + Object fieldValue = protoField.getValue(fieldDesc, field); if (fieldValue instanceof List) { addRepeatedFields(row, (String) key, value, (List) fieldValue); diff --git a/src/main/java/com/gojek/beast/converter/fields/ByteField.java b/src/main/java/com/gojek/beast/converter/fields/ByteField.java index 95949b4..ab0551d 100644 --- a/src/main/java/com/gojek/beast/converter/fields/ByteField.java +++ b/src/main/java/com/gojek/beast/converter/fields/ByteField.java @@ -9,18 +9,16 @@ @AllArgsConstructor public class ByteField implements ProtoField { - - private final Descriptors.FieldDescriptor descriptor; - private final Object fieldValue; + private static final String BYTES = "BYTES"; @Override - public Object getValue() { + public Object getValue(Descriptors.FieldDescriptor fieldDescriptor, Object fieldValue) { ByteString byteString = (ByteString) fieldValue; return new String(Base64.getEncoder().encode(byteString.toStringUtf8().getBytes())); } @Override - public boolean matches() { - return descriptor.getType().name().equals("BYTES"); + public boolean matches(Descriptors.FieldDescriptor fieldDescriptor, Object fieldValue) { + return fieldDescriptor.getType().name().equals(BYTES); } } diff --git a/src/main/java/com/gojek/beast/converter/fields/DefaultProtoField.java b/src/main/java/com/gojek/beast/converter/fields/DefaultProtoField.java index d6061fa..b1f27ce 100644 --- a/src/main/java/com/gojek/beast/converter/fields/DefaultProtoField.java +++ b/src/main/java/com/gojek/beast/converter/fields/DefaultProtoField.java @@ -5,17 +5,15 @@ @AllArgsConstructor public class DefaultProtoField implements ProtoField { - private final Descriptors.FieldDescriptor descriptor; - private final Object fieldValue; // handles primitives, repeated field @Override - public Object getValue() { + public Object getValue(Descriptors.FieldDescriptor fieldDescriptor, Object fieldValue) { return fieldValue; } @Override - public boolean matches() { + public boolean matches(Descriptors.FieldDescriptor fieldDescriptor, Object fieldValue) { return false; } } diff --git a/src/main/java/com/gojek/beast/converter/fields/EnumField.java b/src/main/java/com/gojek/beast/converter/fields/EnumField.java index 581ff93..e78dada 100644 --- a/src/main/java/com/gojek/beast/converter/fields/EnumField.java +++ b/src/main/java/com/gojek/beast/converter/fields/EnumField.java @@ -8,12 +8,11 @@ @AllArgsConstructor public class EnumField implements ProtoField { - private final Descriptors.FieldDescriptor descriptor; - private final Object fieldValue; + private static final String ENUM = "ENUM"; @Override - public Object getValue() { - if (descriptor.isRepeated()) { + public Object getValue(Descriptors.FieldDescriptor fieldDescriptor, Object fieldValue) { + if (fieldDescriptor.isRepeated()) { List enumValues = ((List) (fieldValue)); List enumStrValues = new ArrayList<>(); for (Descriptors.EnumValueDescriptor enumVal : enumValues) { @@ -25,7 +24,7 @@ public Object getValue() { } @Override - public boolean matches() { - return descriptor.getJavaType().name().equals("ENUM"); + public boolean matches(Descriptors.FieldDescriptor fieldDescriptor, Object fieldValue) { + return fieldDescriptor.getJavaType().name().equals(ENUM); } } diff --git a/src/main/java/com/gojek/beast/converter/fields/NestedField.java b/src/main/java/com/gojek/beast/converter/fields/NestedField.java index 4c52900..d5928d7 100644 --- a/src/main/java/com/gojek/beast/converter/fields/NestedField.java +++ b/src/main/java/com/gojek/beast/converter/fields/NestedField.java @@ -8,17 +8,16 @@ @AllArgsConstructor public class NestedField implements ProtoField { - private final Descriptors.FieldDescriptor descriptor; - private final Object fieldValue; + private static final String MESSAGE = "MESSAGE"; @Override - public DynamicMessage getValue() { + public DynamicMessage getValue(Descriptors.FieldDescriptor fieldDescriptor, Object fieldValue) { return (DynamicMessage) fieldValue; } @Override - public boolean matches() { - return descriptor.getJavaType().name().equals("MESSAGE") + public boolean matches(Descriptors.FieldDescriptor fieldDescriptor, Object fieldValue) { + return fieldDescriptor.getJavaType().name().equals(MESSAGE) && !(fieldValue instanceof List); } } diff --git a/src/main/java/com/gojek/beast/converter/fields/ProtoField.java b/src/main/java/com/gojek/beast/converter/fields/ProtoField.java index a1390d3..7494ba7 100644 --- a/src/main/java/com/gojek/beast/converter/fields/ProtoField.java +++ b/src/main/java/com/gojek/beast/converter/fields/ProtoField.java @@ -1,8 +1,10 @@ package com.gojek.beast.converter.fields; +import com.google.protobuf.Descriptors; + public interface ProtoField { - Object getValue(); + Object getValue(Descriptors.FieldDescriptor fieldDescriptor, Object fieldValue); - boolean matches(); + boolean matches(Descriptors.FieldDescriptor descriptor, Object fieldValue); } diff --git a/src/main/java/com/gojek/beast/converter/fields/StructField.java b/src/main/java/com/gojek/beast/converter/fields/StructField.java index 9fd538d..c3f8eea 100644 --- a/src/main/java/com/gojek/beast/converter/fields/StructField.java +++ b/src/main/java/com/gojek/beast/converter/fields/StructField.java @@ -8,11 +8,10 @@ @AllArgsConstructor public class StructField implements ProtoField { - private final Descriptors.FieldDescriptor descriptor; - private final Object fieldValue; + private static final String MESSAGE = "MESSAGE"; @Override - public Object getValue() { + public Object getValue(Descriptors.FieldDescriptor fieldDescriptor, Object fieldValue) { try { return JsonFormat.printer() .omittingInsignificantWhitespace() @@ -23,8 +22,8 @@ public Object getValue() { } @Override - public boolean matches() { - return descriptor.getJavaType().name().equals("MESSAGE") - && descriptor.getMessageType().getFullName().equals(com.google.protobuf.Struct.getDescriptor().getFullName()); + public boolean matches(Descriptors.FieldDescriptor fieldDescriptor, Object fieldValue) { + return fieldDescriptor.getJavaType().name().equals(MESSAGE) + && fieldDescriptor.getMessageType().getFullName().equals(com.google.protobuf.Struct.getDescriptor().getFullName()); } } diff --git a/src/main/java/com/gojek/beast/converter/fields/TimestampField.java b/src/main/java/com/gojek/beast/converter/fields/TimestampField.java index e73c032..c4ae3d6 100644 --- a/src/main/java/com/gojek/beast/converter/fields/TimestampField.java +++ b/src/main/java/com/gojek/beast/converter/fields/TimestampField.java @@ -11,11 +11,10 @@ @AllArgsConstructor public class TimestampField implements ProtoField { - private final Descriptors.FieldDescriptor descriptor; - private final Object fieldValue; + private static final String MESSAGE = "MESSAGE"; @Override - public Object getValue() { + public Object getValue(Descriptors.FieldDescriptor fieldDescriptor, Object fieldValue) { DynamicMessage dynamicField = (DynamicMessage) fieldValue; List descriptors = dynamicField.getDescriptorForType().getFields(); List timeFields = new ArrayList<>(); @@ -25,8 +24,8 @@ public Object getValue() { } @Override - public boolean matches() { - return descriptor.getJavaType().name().equals("MESSAGE") - && descriptor.getMessageType().getFullName().equals(com.google.protobuf.Timestamp.getDescriptor().getFullName()); + public boolean matches(Descriptors.FieldDescriptor fieldDescriptor, Object fieldValue) { + return fieldDescriptor.getJavaType().name().equals(MESSAGE) + && fieldDescriptor.getMessageType().getFullName().equals(com.google.protobuf.Timestamp.getDescriptor().getFullName()); } } diff --git a/src/main/java/com/gojek/beast/models/Record.java b/src/main/java/com/gojek/beast/models/Record.java index 5da87ae..622ba80 100644 --- a/src/main/java/com/gojek/beast/models/Record.java +++ b/src/main/java/com/gojek/beast/models/Record.java @@ -1,24 +1,29 @@ package com.gojek.beast.models; -import lombok.AllArgsConstructor; import lombok.Getter; import lombok.Setter; import java.nio.charset.StandardCharsets; import java.util.Map; -@AllArgsConstructor @Getter public class Record { @Setter private OffsetInfo offsetInfo; private Map columns; + private long size; + + public Record(OffsetInfo offsetInfo, Map columns) { + this.offsetInfo = offsetInfo; + this.columns = columns; + this.size = columns.toString().getBytes(StandardCharsets.UTF_8).length; + } public String getId() { return String.format("%s_%d_%d", offsetInfo.getTopic(), offsetInfo.getPartition(), offsetInfo.getOffset()); } public long getSize() { - return columns.toString().getBytes(StandardCharsets.UTF_8).length; + return size; } } diff --git a/src/main/java/com/gojek/beast/models/Records.java b/src/main/java/com/gojek/beast/models/Records.java index b9c877f..ab10d26 100644 --- a/src/main/java/com/gojek/beast/models/Records.java +++ b/src/main/java/com/gojek/beast/models/Records.java @@ -45,9 +45,7 @@ public Map getPartitionsCommitOffset() { } public long getSize() { - return records.stream().mapToLong(record -> { - return record.getSize(); - }).sum(); + return records.stream().mapToLong(Record::getSize).sum(); } @Override diff --git a/src/test/java/com/gojek/beast/converter/FieldFactoryTest.java b/src/test/java/com/gojek/beast/converter/FieldFactoryTest.java index 20581cc..554687f 100644 --- a/src/test/java/com/gojek/beast/converter/FieldFactoryTest.java +++ b/src/test/java/com/gojek/beast/converter/FieldFactoryTest.java @@ -71,7 +71,7 @@ public void shouldReturnByteField() { assertEquals(ByteField.class.getName(), protoField.getClass().getName()); String encodedToken = new String(Base64.getEncoder().encode("token".getBytes())); - assertEquals(encodedToken, protoField.getValue()); + assertEquals(encodedToken, protoField.getValue(byteDesc, message.getField(byteDesc))); } @Test @@ -103,6 +103,6 @@ public void shouldReturnRepeatedFieldForGivenData() { ProtoField protoField = FieldFactory.getField(repeatedFieldDesc, message.getField(repeatedFieldDesc)); assertEquals(DefaultProtoField.class.getName(), protoField.getClass().getName()); - assertEquals(protoField.getValue(), message.getAliasesList().stream().map(String::toString).collect(Collectors.toList())); + assertEquals(protoField.getValue(repeatedFieldDesc, message.getField(repeatedFieldDesc)), message.getAliasesList().stream().map(String::toString).collect(Collectors.toList())); } }