Skip to content

Commit 34dc39b

Browse files
Optimise BulkUpdateAllMatchingFilter (#261)
1 parent 40023b9 commit 34dc39b

File tree

2 files changed

+51
-70
lines changed

2 files changed

+51
-70
lines changed

entity-service-impl/src/main/java/org/hypertrace/entity/query/service/EntityQueryServiceImpl.java

Lines changed: 40 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -7,12 +7,13 @@
77
import static java.util.stream.Collectors.joining;
88
import static java.util.stream.Collectors.toUnmodifiableList;
99
import static org.hypertrace.core.documentstore.expression.operators.RelationalOperator.IN;
10-
import static org.hypertrace.core.documentstore.model.options.ReturnDocumentType.NONE;
10+
import static org.hypertrace.core.documentstore.model.options.UpdateOptions.DEFAULT_UPDATE_OPTIONS;
1111
import static org.hypertrace.entity.data.service.v1.AttributeValue.VALUE_LIST_FIELD_NUMBER;
1212
import static org.hypertrace.entity.data.service.v1.AttributeValueList.VALUES_FIELD_NUMBER;
1313
import static org.hypertrace.entity.service.constants.EntityCollectionConstants.RAW_ENTITIES_COLLECTION;
1414

1515
import com.google.common.collect.Lists;
16+
import com.google.common.collect.Streams;
1617
import com.google.inject.Guice;
1718
import com.google.inject.Injector;
1819
import com.google.inject.TypeLiteral;
@@ -784,10 +785,21 @@ private BulkUpdateAllMatchingFilterResponse doBulkUpdate(
784785
final BulkUpdateAllMatchingFilterResponse.Builder responseBuilder =
785786
BulkUpdateAllMatchingFilterResponse.newBuilder();
786787
final String entityType = request.getEntityType();
787-
final String tenantId = requestContext.getTenantId().orElseThrow();
788+
final Converter<EntityQueryRequest, org.hypertrace.core.documentstore.query.Query>
789+
queryConverter = getQueryConverter();
788790

789791
for (final Update update : request.getUpdatesList()) {
790-
final List<SingleValueKey> keys = getKeysToUpdate(requestContext, entityType, update);
792+
EntityQueryRequest entityQueryRequest =
793+
EntityQueryRequest.newBuilder()
794+
.setEntityType(entityType)
795+
.setFilter(update.getFilter())
796+
.build();
797+
final org.hypertrace.core.documentstore.query.Query updateFilterQuery =
798+
queryConverter.convert(entityQueryRequest, requestContext);
799+
final List<Entity> existingEntities =
800+
entityFetcher.query(updateFilterQuery).collect(Collectors.toUnmodifiableList());
801+
802+
final List<SingleValueKey> keys = getKeysToUpdate(entityType, existingEntities);
791803
final List<UpdatedEntity> updatedEntityResponses = buildUpdatedEntities(keys);
792804
responseBuilder.addSummaries(
793805
UpdateSummary.newBuilder().addAllUpdatedEntities(updatedEntityResponses));
@@ -798,43 +810,44 @@ private BulkUpdateAllMatchingFilterResponse doBulkUpdate(
798810
continue;
799811
}
800812

801-
final List<String> entityIds =
802-
keys.stream().map(SingleValueKey::getValue).collect(toUnmodifiableList());
803813
final List<AttributeUpdateOperation> updateOperations = update.getOperationsList();
804814

805-
final FilterTypeExpression filter = getFilterForKeys(keys);
806-
807-
final org.hypertrace.core.documentstore.query.Query updateQuery =
808-
org.hypertrace.core.documentstore.query.Query.builder().setFilter(filter).build();
809815
final List<SubDocumentUpdate> updates = convertUpdates(requestContext, updateOperations);
810816

811817
final boolean shouldSendNotification =
812818
entityAttributeChangeEvaluator.shouldSendNotificationForAttributeUpdates(
813819
requestContext, entityType, updateOperations);
814-
final Optional<List<Entity>> existingEntities;
820+
final List<Entity> updatedEntities =
821+
bulkUpdateAndGetEntities(updateFilterQuery, updates, DEFAULT_UPDATE_OPTIONS);
815822

816823
if (shouldSendNotification) {
817-
existingEntities = Optional.of(entityFetcher.getEntitiesByEntityIds(tenantId, entityIds));
818-
} else {
819-
existingEntities = Optional.empty();
820-
}
821-
822-
entitiesCollection.bulkUpdate(
823-
updateQuery, updates, UpdateOptions.builder().returnDocumentType(NONE).build());
824-
825-
if (existingEntities.isPresent()) {
826-
final List<Entity> updatedEntities =
827-
entityFetcher.getEntitiesByEntityIds(tenantId, entityIds);
828824
this.entityCounterMetricSender.sendEntitiesMetrics(
829-
requestContext, request.getEntityType(), existingEntities.get(), updatedEntities);
825+
requestContext, request.getEntityType(), existingEntities, updatedEntities);
830826
entityChangeEventGenerator.sendChangeNotification(
831-
requestContext, existingEntities.get(), updatedEntities);
827+
requestContext, existingEntities, updatedEntities);
832828
}
833829
}
834830

835831
return responseBuilder.build();
836832
}
837833

834+
private List<Entity> bulkUpdateAndGetEntities(
835+
org.hypertrace.core.documentstore.query.Query updateFilterQuery,
836+
List<SubDocumentUpdate> updates,
837+
UpdateOptions updateOptions)
838+
throws IOException {
839+
return Streams.stream(entitiesCollection.bulkUpdate(updateFilterQuery, updates, updateOptions))
840+
.map(this::entityFromDocument)
841+
.flatMap(Optional::stream)
842+
.map(Entity::toBuilder)
843+
.map(Entity.Builder::build)
844+
.collect(toUnmodifiableList());
845+
}
846+
847+
private Optional<Entity> entityFromDocument(Document document) {
848+
return DOCUMENT_PARSER.parseOrLog(document, Entity.newBuilder());
849+
}
850+
838851
private FilterTypeExpression getFilterForKeys(final List<SingleValueKey> keys) {
839852
return KeyExpression.of(keys.stream().map(key -> (Key) key).collect(toUnmodifiableList()));
840853
}
@@ -855,8 +868,7 @@ private Converter<AttributeUpdateOperation, SubDocumentUpdate> getUpdateConverte
855868
}
856869

857870
private List<SingleValueKey> getKeysToUpdate(
858-
final RequestContext requestContext, final String entityType, final Update update)
859-
throws ConversionException, IOException {
871+
final String entityType, final List<Entity> existingEntities) {
860872
final Optional<String> idAttribute =
861873
entityAttributeMapping.getIdentifierAttributeId(entityType);
862874

@@ -866,38 +878,9 @@ private List<SingleValueKey> getKeysToUpdate(
866878
.asRuntimeException();
867879
}
868880

869-
final Expression idSelection =
870-
Expression.newBuilder()
871-
.setColumnIdentifier(
872-
ColumnIdentifier.newBuilder().setColumnName(idAttribute.orElseThrow()))
873-
.build();
874-
final EntityQueryRequest entityQueryRequest =
875-
EntityQueryRequest.newBuilder()
876-
.setEntityType(entityType)
877-
.setFilter(update.getFilter())
878-
.addSelection(idSelection)
879-
.build();
880-
final Converter<EntityQueryRequest, org.hypertrace.core.documentstore.query.Query>
881-
queryConverter = getQueryConverter();
882-
883-
final org.hypertrace.core.documentstore.query.Query query =
884-
queryConverter.convert(entityQueryRequest, requestContext);
885-
886-
final CloseableIterator<Document> idsIterator = entitiesCollection.aggregate(query);
887-
final DocumentConverter rowConverter = injector.getInstance(DocumentConverter.class);
888-
final ResultSetMetadata resultSetMetadata =
889-
this.buildMetadataForSelections(List.of(idSelection));
890-
final String tenantId = requestContext.getTenantId().orElseThrow();
891-
final List<SingleValueKey> ids = new ArrayList<>();
892-
893-
while (idsIterator.hasNext()) {
894-
final Row row = rowConverter.convertToRow(idsIterator.next(), resultSetMetadata);
895-
final String id = row.getColumn(0).getString();
896-
ids.add(new SingleValueKey(tenantId, id));
897-
}
898-
899-
idsIterator.close();
900-
return unmodifiableList(ids);
881+
return existingEntities.stream()
882+
.map(entity -> new SingleValueKey(entity.getTenantId(), entity.getEntityId()))
883+
.collect(toUnmodifiableList());
901884
}
902885

903886
private List<SubDocumentUpdate> convertUpdates(

entity-service-impl/src/test/java/org/hypertrace/entity/query/service/EntityQueryServiceImplTest.java

Lines changed: 11 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22

33
import static org.hypertrace.core.attribute.service.v1.AttributeKind.TYPE_STRING;
44
import static org.hypertrace.core.documentstore.expression.impl.LogicalExpression.and;
5-
import static org.hypertrace.core.documentstore.model.options.ReturnDocumentType.NONE;
65
import static org.hypertrace.entity.TestUtils.convertToCloseableIterator;
76
import static org.hypertrace.entity.query.service.v1.AttributeUpdateOperation.AttributeUpdateOperator.ATTRIBUTE_UPDATE_OPERATOR_SET;
87
import static org.hypertrace.entity.service.constants.EntityConstants.ENTITY_ID;
@@ -22,6 +21,7 @@
2221
import io.grpc.Channel;
2322
import io.grpc.Context;
2423
import io.grpc.stub.StreamObserver;
24+
import java.util.Collections;
2525
import java.util.LinkedHashSet;
2626
import java.util.List;
2727
import java.util.Map;
@@ -40,7 +40,6 @@
4040
import org.hypertrace.core.documentstore.SingleValueKey;
4141
import org.hypertrace.core.documentstore.expression.impl.ConstantExpression;
4242
import org.hypertrace.core.documentstore.expression.impl.IdentifierExpression;
43-
import org.hypertrace.core.documentstore.expression.impl.KeyExpression;
4443
import org.hypertrace.core.documentstore.expression.impl.RelationalExpression;
4544
import org.hypertrace.core.documentstore.expression.operators.RelationalOperator;
4645
import org.hypertrace.core.documentstore.model.options.UpdateOptions;
@@ -643,14 +642,18 @@ void testBulkUpdateAllMatchingFilter_success() throws Exception {
643642
final StreamObserver<BulkUpdateAllMatchingFilterResponse> mockResponseObserver =
644643
mock(StreamObserver.class);
645644

646-
final List<Document> documents =
647-
List.of(new JSONDocument("{ \"entityId\": \"" + entityId + "\" }"));
645+
final List<Entity> existingEntities =
646+
Collections.singletonList(
647+
Entity.newBuilder()
648+
.setTenantId("tenant1")
649+
.setEntityId(entityId)
650+
.setEntityType("API")
651+
.build());
648652
when(mockMappingForAttributes().getIdentifierAttributeId(EntityType.API.name()))
649653
.thenReturn(Optional.of(ENTITY_ID));
650654

651655
final org.hypertrace.core.documentstore.query.Query query =
652656
org.hypertrace.core.documentstore.query.Query.builder()
653-
.addSelection(IdentifierExpression.of(ENTITY_ID), ENTITY_ID)
654657
.setFilter(
655658
and(
656659
RelationalExpression.of(
@@ -666,8 +669,7 @@ void testBulkUpdateAllMatchingFilter_success() throws Exception {
666669
RelationalOperator.EQ,
667670
ConstantExpression.of(EntityType.API.name()))))
668671
.build();
669-
when(mockEntitiesCollection.aggregate(query))
670-
.thenReturn(convertToCloseableIterator(documents.iterator()));
672+
when(entityFetcher.query(query)).thenReturn(existingEntities.stream());
671673

672674
Context.current()
673675
.withValue(RequestContext.CURRENT, mockRequestContextWithTenantId())
@@ -692,20 +694,16 @@ void testBulkUpdateAllMatchingFilter_success() throws Exception {
692694
final ArgumentCaptor<List<SubDocumentUpdate>> valueCaptor =
693695
ArgumentCaptor.forClass(List.class);
694696

695-
verify(mockEntitiesCollection, times(1)).aggregate(query);
696697
verify(mockEntitiesCollection, times(1))
697698
.bulkUpdate(
698-
eq(
699-
org.hypertrace.core.documentstore.query.Query.builder()
700-
.setFilter(KeyExpression.of(new SingleValueKey(TENANT_ID, entityId)))
701-
.build()),
699+
eq(query),
702700
eq(
703701
List.of(
704702
SubDocumentUpdate.of(
705703
"attributes.entity_id",
706704
SubDocumentValue.of(
707705
new JSONDocument("{\"value\":{\"string\":\"NEW_STATUS\"}}"))))),
708-
eq(UpdateOptions.builder().returnDocumentType(NONE).build()));
706+
eq(UpdateOptions.DEFAULT_UPDATE_OPTIONS));
709707
}
710708
}
711709

0 commit comments

Comments
 (0)