Skip to content
This repository has been archived by the owner on Feb 12, 2022. It is now read-only.

[Maulik] optimize heap and cpu utilization #49

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,7 @@ public class ConsumerRecordConverter implements Converter {
public List<Record> convert(final Iterable<ConsumerRecord<byte[], byte[]>> messages) throws InvalidProtocolBufferException {
ArrayList<Record> records = new ArrayList<>();
for (ConsumerRecord<byte[], byte[]> message : messages) {
byte[] value = message.value();
Map<String, Object> columns = rowMapper.map(parser.parse(value));
Map<String, Object> columns = rowMapper.map(parser.parse(message.value()));
OffsetInfo offsetInfo = new OffsetInfo(message.topic(), message.partition(), message.offset(), message.timestamp());
addMetadata(columns, offsetInfo);
records.add(new Record(offsetInfo, columns));
Expand Down
22 changes: 12 additions & 10 deletions src/main/java/com/gojek/beast/converter/FieldFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,20 +14,22 @@
import java.util.Optional;

public class FieldFactory {
private static final List<ProtoField> PROTO_FIELDS = Arrays.asList(
new TimestampField(),
new EnumField(),
new ByteField(),
new StructField(),
new NestedField()
);
private static final DefaultProtoField DEFAULT_PROTO_FIELD = new DefaultProtoField();

public static ProtoField getField(Descriptors.FieldDescriptor descriptor, Object fieldValue) {
List<ProtoField> protoFields = Arrays.asList(
new TimestampField(descriptor, fieldValue),
new EnumField(descriptor, fieldValue),
new ByteField(descriptor, fieldValue),
new StructField(descriptor, fieldValue),
new NestedField(descriptor, fieldValue)
);
Optional<ProtoField> first = protoFields

Optional<ProtoField> first = PROTO_FIELDS
.stream()
.filter(ProtoField::matches)
.filter(field -> field.matches(descriptor, fieldValue))
.findFirst();
return first.orElseGet(() -> new DefaultProtoField(descriptor, fieldValue));
return first.orElse(DEFAULT_PROTO_FIELD);
}

}
4 changes: 2 additions & 2 deletions src/main/java/com/gojek/beast/converter/RowMapper.java
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,10 @@ private Map<String, Object> getMappings(DynamicMessage message, ColumnMapping co
}
Integer protoIndex = Integer.valueOf(column);
Descriptors.FieldDescriptor fieldDesc = descriptorForType.findFieldByNumber(protoIndex);
if (fieldDesc != null && !message.getField(fieldDesc).toString().isEmpty()) {
if (fieldDesc != null && message.getField(fieldDesc) != null) {
Object field = message.getField(fieldDesc);
ProtoField protoField = FieldFactory.getField(fieldDesc, field);
Object fieldValue = protoField.getValue();
Object fieldValue = protoField.getValue(fieldDesc, field);

if (fieldValue instanceof List) {
addRepeatedFields(row, (String) key, value, (List<Object>) fieldValue);
Expand Down
10 changes: 4 additions & 6 deletions src/main/java/com/gojek/beast/converter/fields/ByteField.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,18 +9,16 @@

@AllArgsConstructor
public class ByteField implements ProtoField {

private final Descriptors.FieldDescriptor descriptor;
private final Object fieldValue;
private static final String BYTES = "BYTES";

@Override
public Object getValue() {
public Object getValue(Descriptors.FieldDescriptor fieldDescriptor, Object fieldValue) {
ByteString byteString = (ByteString) fieldValue;
return new String(Base64.getEncoder().encode(byteString.toStringUtf8().getBytes()));
}

@Override
public boolean matches() {
return descriptor.getType().name().equals("BYTES");
public boolean matches(Descriptors.FieldDescriptor fieldDescriptor, Object fieldValue) {
return fieldDescriptor.getType().name().equals(BYTES);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,15 @@

@AllArgsConstructor
public class DefaultProtoField implements ProtoField {
private final Descriptors.FieldDescriptor descriptor;
private final Object fieldValue;

// handles primitives, repeated field
@Override
public Object getValue() {
public Object getValue(Descriptors.FieldDescriptor fieldDescriptor, Object fieldValue) {
return fieldValue;
}

@Override
public boolean matches() {
public boolean matches(Descriptors.FieldDescriptor fieldDescriptor, Object fieldValue) {
return false;
}
}
11 changes: 5 additions & 6 deletions src/main/java/com/gojek/beast/converter/fields/EnumField.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,11 @@

@AllArgsConstructor
public class EnumField implements ProtoField {
private final Descriptors.FieldDescriptor descriptor;
private final Object fieldValue;
private static final String ENUM = "ENUM";

@Override
public Object getValue() {
if (descriptor.isRepeated()) {
public Object getValue(Descriptors.FieldDescriptor fieldDescriptor, Object fieldValue) {
if (fieldDescriptor.isRepeated()) {
List<Descriptors.EnumValueDescriptor> enumValues = ((List<Descriptors.EnumValueDescriptor>) (fieldValue));
List<String> enumStrValues = new ArrayList<>();
for (Descriptors.EnumValueDescriptor enumVal : enumValues) {
Expand All @@ -25,7 +24,7 @@ public Object getValue() {
}

@Override
public boolean matches() {
return descriptor.getJavaType().name().equals("ENUM");
public boolean matches(Descriptors.FieldDescriptor fieldDescriptor, Object fieldValue) {
return fieldDescriptor.getJavaType().name().equals(ENUM);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,16 @@

@AllArgsConstructor
public class NestedField implements ProtoField {
private final Descriptors.FieldDescriptor descriptor;
private final Object fieldValue;
private static final String MESSAGE = "MESSAGE";

@Override
public DynamicMessage getValue() {
public DynamicMessage getValue(Descriptors.FieldDescriptor fieldDescriptor, Object fieldValue) {
return (DynamicMessage) fieldValue;
}

@Override
public boolean matches() {
return descriptor.getJavaType().name().equals("MESSAGE")
public boolean matches(Descriptors.FieldDescriptor fieldDescriptor, Object fieldValue) {
return fieldDescriptor.getJavaType().name().equals(MESSAGE)
&& !(fieldValue instanceof List);
}
}
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
package com.gojek.beast.converter.fields;

import com.google.protobuf.Descriptors;

public interface ProtoField {

Object getValue();
Object getValue(Descriptors.FieldDescriptor fieldDescriptor, Object fieldValue);

boolean matches();
boolean matches(Descriptors.FieldDescriptor descriptor, Object fieldValue);
}
11 changes: 5 additions & 6 deletions src/main/java/com/gojek/beast/converter/fields/StructField.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,10 @@

@AllArgsConstructor
public class StructField implements ProtoField {
private final Descriptors.FieldDescriptor descriptor;
private final Object fieldValue;
private static final String MESSAGE = "MESSAGE";

@Override
public Object getValue() {
public Object getValue(Descriptors.FieldDescriptor fieldDescriptor, Object fieldValue) {
try {
return JsonFormat.printer()
.omittingInsignificantWhitespace()
Expand All @@ -23,8 +22,8 @@ public Object getValue() {
}

@Override
public boolean matches() {
return descriptor.getJavaType().name().equals("MESSAGE")
&& descriptor.getMessageType().getFullName().equals(com.google.protobuf.Struct.getDescriptor().getFullName());
public boolean matches(Descriptors.FieldDescriptor fieldDescriptor, Object fieldValue) {
return fieldDescriptor.getJavaType().name().equals(MESSAGE)
&& fieldDescriptor.getMessageType().getFullName().equals(com.google.protobuf.Struct.getDescriptor().getFullName());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,10 @@

@AllArgsConstructor
public class TimestampField implements ProtoField {
private final Descriptors.FieldDescriptor descriptor;
private final Object fieldValue;
private static final String MESSAGE = "MESSAGE";

@Override
public Object getValue() {
public Object getValue(Descriptors.FieldDescriptor fieldDescriptor, Object fieldValue) {
DynamicMessage dynamicField = (DynamicMessage) fieldValue;
List<Descriptors.FieldDescriptor> descriptors = dynamicField.getDescriptorForType().getFields();
List<Object> timeFields = new ArrayList<>();
Expand All @@ -25,8 +24,8 @@ public Object getValue() {
}

@Override
public boolean matches() {
return descriptor.getJavaType().name().equals("MESSAGE")
&& descriptor.getMessageType().getFullName().equals(com.google.protobuf.Timestamp.getDescriptor().getFullName());
public boolean matches(Descriptors.FieldDescriptor fieldDescriptor, Object fieldValue) {
return fieldDescriptor.getJavaType().name().equals(MESSAGE)
&& fieldDescriptor.getMessageType().getFullName().equals(com.google.protobuf.Timestamp.getDescriptor().getFullName());
}
}
11 changes: 8 additions & 3 deletions src/main/java/com/gojek/beast/models/Record.java
Original file line number Diff line number Diff line change
@@ -1,24 +1,29 @@
package com.gojek.beast.models;

import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.Setter;

import java.nio.charset.StandardCharsets;
import java.util.Map;

@AllArgsConstructor
@Getter
public class Record {
@Setter
private OffsetInfo offsetInfo;
private Map<String, Object> columns;
private long size;

public Record(OffsetInfo offsetInfo, Map<String, Object> columns) {
this.offsetInfo = offsetInfo;
this.columns = columns;
this.size = columns.toString().getBytes(StandardCharsets.UTF_8).length;
}

public String getId() {
return String.format("%s_%d_%d", offsetInfo.getTopic(), offsetInfo.getPartition(), offsetInfo.getOffset());
}

public long getSize() {
return columns.toString().getBytes(StandardCharsets.UTF_8).length;
return size;
}
}
4 changes: 1 addition & 3 deletions src/main/java/com/gojek/beast/models/Records.java
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,7 @@ public Map<TopicPartition, OffsetAndMetadata> getPartitionsCommitOffset() {
}

public long getSize() {
return records.stream().mapToLong(record -> {
return record.getSize();
}).sum();
return records.stream().mapToLong(Record::getSize).sum();
}

@Override
Expand Down
4 changes: 2 additions & 2 deletions src/test/java/com/gojek/beast/converter/FieldFactoryTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ public void shouldReturnByteField() {

assertEquals(ByteField.class.getName(), protoField.getClass().getName());
String encodedToken = new String(Base64.getEncoder().encode("token".getBytes()));
assertEquals(encodedToken, protoField.getValue());
assertEquals(encodedToken, protoField.getValue(byteDesc, message.getField(byteDesc)));
}

@Test
Expand Down Expand Up @@ -103,6 +103,6 @@ public void shouldReturnRepeatedFieldForGivenData() {
ProtoField protoField = FieldFactory.getField(repeatedFieldDesc, message.getField(repeatedFieldDesc));

assertEquals(DefaultProtoField.class.getName(), protoField.getClass().getName());
assertEquals(protoField.getValue(), message.getAliasesList().stream().map(String::toString).collect(Collectors.toList()));
assertEquals(protoField.getValue(repeatedFieldDesc, message.getField(repeatedFieldDesc)), message.getAliasesList().stream().map(String::toString).collect(Collectors.toList()));
}
}