Skip to content

Commit 7ba8596

Browse files
Close the collection iterators (#270)
1 parent da22ae1 commit 7ba8596

File tree

7 files changed

+286
-204
lines changed

7 files changed

+286
-204
lines changed

entity-service-impl/src/main/java/org/hypertrace/entity/data/service/EntityDataServiceImpl.java

Lines changed: 128 additions & 85 deletions
Original file line numberDiff line numberDiff line change
@@ -11,17 +11,18 @@
1111
import com.google.protobuf.Message;
1212
import com.google.protobuf.ServiceException;
1313
import io.grpc.Channel;
14+
import io.grpc.Status;
1415
import io.grpc.stub.StreamObserver;
1516
import java.io.IOException;
1617
import java.util.ArrayList;
1718
import java.util.Collections;
1819
import java.util.HashMap;
19-
import java.util.Iterator;
2020
import java.util.List;
2121
import java.util.Map;
2222
import java.util.Optional;
2323
import java.util.function.Function;
2424
import java.util.stream.Collectors;
25+
import org.hypertrace.core.documentstore.CloseableIterator;
2526
import org.hypertrace.core.documentstore.Collection;
2627
import org.hypertrace.core.documentstore.Datastore;
2728
import org.hypertrace.core.documentstore.Document;
@@ -188,14 +189,18 @@ public void getAndUpsertEntities(Entities request, StreamObserver<Entity> respon
188189
documentMap.put(key, doc);
189190
}
190191

191-
List<Entity> existingEntities =
192-
Streams.stream(entitiesCollection.bulkUpsertAndReturnOlderDocuments(documentMap))
193-
.flatMap(
194-
document -> PARSER.<Entity>parseOrLog(document, Entity.newBuilder()).stream())
195-
.map(Entity::toBuilder)
196-
.map(builder -> builder.setTenantId(tenantId))
197-
.map(Entity.Builder::build)
198-
.collect(Collectors.toList());
192+
List<Entity> existingEntities;
193+
try (final CloseableIterator<Document> iterator =
194+
entitiesCollection.bulkUpsertAndReturnOlderDocuments(documentMap)) {
195+
existingEntities =
196+
Streams.stream(iterator)
197+
.flatMap(
198+
document -> PARSER.<Entity>parseOrLog(document, Entity.newBuilder()).stream())
199+
.map(Entity::toBuilder)
200+
.map(builder -> builder.setTenantId(tenantId))
201+
.map(Builder::build)
202+
.collect(Collectors.toList());
203+
}
199204

200205
existingEntities.forEach(responseObserver::onNext);
201206
responseObserver.onCompleted();
@@ -299,21 +304,30 @@ public void delete(ByIdRequest request, StreamObserver<Empty> responseObserver)
299304
Key key =
300305
this.entityNormalizer.getEntityDocKey(
301306
tenantId, request.getEntityType(), request.getEntityId());
302-
Optional<Entity> existingEntity =
303-
this.entityFetcher.getEntitiesByEntityIds(tenantId, List.of(key.toString())).stream()
304-
.findFirst();
305307

306-
if (entitiesCollection.delete(key)) {
307-
responseObserver.onNext(Empty.newBuilder().build());
308-
responseObserver.onCompleted();
309-
existingEntity.ifPresent(
310-
entity -> {
311-
this.entityCounterMetricSender.sendEntitiesDeleteMetrics(
312-
requestContext, request.getEntityType(), List.of(entity));
313-
entityChangeEventGenerator.sendDeleteNotification(requestContext, List.of(entity));
314-
});
315-
} else {
316-
responseObserver.onError(new RuntimeException("Could not delete the entity."));
308+
try {
309+
Optional<Entity> existingEntity =
310+
this.entityFetcher.getEntitiesByEntityIds(tenantId, List.of(key.toString())).stream()
311+
.findFirst();
312+
313+
if (entitiesCollection.delete(key)) {
314+
responseObserver.onNext(Empty.newBuilder().build());
315+
responseObserver.onCompleted();
316+
existingEntity.ifPresent(
317+
entity -> {
318+
this.entityCounterMetricSender.sendEntitiesDeleteMetrics(
319+
requestContext, request.getEntityType(), List.of(entity));
320+
entityChangeEventGenerator.sendDeleteNotification(requestContext, List.of(entity));
321+
});
322+
} else {
323+
responseObserver.onError(new RuntimeException("Could not delete the entity"));
324+
}
325+
} catch (final Exception e) {
326+
LOG.error("Unknown error occurred", e);
327+
responseObserver.onError(
328+
Status.INTERNAL
329+
.withDescription("Unknown error occurred")
330+
.asRuntimeException(requestContext.buildTrailers()));
317331
}
318332
}
319333

@@ -326,15 +340,24 @@ public void delete(ByIdRequest request, StreamObserver<Empty> responseObserver)
326340
@Override
327341
public void query(Query request, StreamObserver<Entity> responseObserver) {
328342
logQuery(request);
329-
Optional<String> tenantId = RequestContext.CURRENT.get().getTenantId();
343+
final RequestContext requestContext = RequestContext.CURRENT.get();
344+
Optional<String> tenantId = requestContext.getTenantId();
330345
if (tenantId.isEmpty()) {
331346
responseObserver.onError(new ServiceException("Tenant id is missing in the request."));
332347
return;
333348
}
334349

335-
this.entityFetcher
336-
.query(DocStoreConverter.transform(tenantId.get(), request, Collections.emptyList()))
337-
.forEach(responseObserver::onNext);
350+
try {
351+
this.entityFetcher
352+
.query(DocStoreConverter.transform(tenantId.get(), request, Collections.emptyList()))
353+
.forEach(responseObserver::onNext);
354+
} catch (final Exception e) {
355+
LOG.error("Unknown error occurred", e);
356+
responseObserver.onError(
357+
Status.INTERNAL
358+
.withDescription("Unknown error occurred")
359+
.asRuntimeException(requestContext.buildTrailers()));
360+
}
338361

339362
responseObserver.onCompleted();
340363
}
@@ -545,6 +568,7 @@ public void getEnrichedEntityByTypeAndIdentifyingProps(
545568
String entityId =
546569
this.entityIdGenerator.generateEntityId(
547570
tenantId, request.getEntityType(), request.getIdentifyingAttributesMap());
571+
548572
searchByIdAndStreamSingleResponse(
549573
tenantId,
550574
entityId,
@@ -565,32 +589,32 @@ public void mergeAndUpsertEntity(
565589
return;
566590
}
567591

568-
Entity receivedEntity = this.entityNormalizer.normalize(tenantId, request.getEntity());
569-
Optional<Entity> existingEntity =
570-
getExistingEntity(tenantId, receivedEntity.getEntityType(), receivedEntity.getEntityId());
592+
try {
593+
Entity receivedEntity = this.entityNormalizer.normalize(tenantId, request.getEntity());
594+
Optional<Entity> existingEntity =
595+
getExistingEntity(tenantId, receivedEntity.getEntityType(), receivedEntity.getEntityId());
571596

572-
boolean rejectUpsertForConditionMismatch =
573-
existingEntity
574-
.map(
575-
entity ->
576-
!this.upsertConditionMatcher.matches(entity, request.getUpsertCondition()))
577-
.orElse(false);
578-
579-
if (rejectUpsertForConditionMismatch) {
580-
// There's an existing entity and the update doesn't meet the condition, return existing
581-
responseObserver.onNext(
582-
MergeAndUpsertEntityResponse.newBuilder().setEntity(existingEntity.get()).build());
583-
responseObserver.onCompleted();
584-
} else {
585-
// There's either a new entity or a valid update to upsert
586-
Entity entityToUpsert =
597+
boolean rejectUpsertForConditionMismatch =
587598
existingEntity
588-
.map(Entity::toBuilder)
589-
.map(Entity.Builder::clearCreatedTime)
590-
.map(builder -> builder.mergeFrom(receivedEntity))
591-
.map(Builder::build)
592-
.orElse(receivedEntity);
593-
try {
599+
.map(
600+
entity ->
601+
!this.upsertConditionMatcher.matches(entity, request.getUpsertCondition()))
602+
.orElse(false);
603+
604+
if (rejectUpsertForConditionMismatch) {
605+
// There's an existing entity and the update doesn't meet the condition, return existing
606+
responseObserver.onNext(
607+
MergeAndUpsertEntityResponse.newBuilder().setEntity(existingEntity.get()).build());
608+
responseObserver.onCompleted();
609+
} else {
610+
// There's either a new entity or a valid update to upsert
611+
Entity entityToUpsert =
612+
existingEntity
613+
.map(Entity::toBuilder)
614+
.map(Entity.Builder::clearCreatedTime)
615+
.map(builder -> builder.mergeFrom(receivedEntity))
616+
.map(Builder::build)
617+
.orElse(receivedEntity);
594618
Entity upsertedEntity = this.upsertEntity(tenantId, entityToUpsert);
595619
responseObserver.onNext(
596620
MergeAndUpsertEntityResponse.newBuilder().setEntity(upsertedEntity).build());
@@ -604,9 +628,11 @@ public void mergeAndUpsertEntity(
604628
List.of(upsertedEntity));
605629
entityChangeEventGenerator.sendChangeNotification(
606630
requestContext, existingEntities, List.of(upsertedEntity));
607-
} catch (IOException e) {
608-
responseObserver.onError(e);
609631
}
632+
} catch (final Exception e) {
633+
LOG.error("Unknown error occurred", e);
634+
responseObserver.onError(
635+
Status.INTERNAL.withDescription("Unknown error occurred").asRuntimeException());
610636
}
611637
}
612638

@@ -716,23 +742,32 @@ private <T extends Message> void searchByIdAndStreamSingleResponse(
716742
String docId = this.entityNormalizer.getEntityDocKey(tenantId, entityType, entityId).toString();
717743
query.setFilter(new Filter(Filter.Op.EQ, EntityServiceConstants.ID, docId));
718744

719-
Iterator<Document> result = collection.search(query);
720745
List<T> entities = new ArrayList<>();
721-
while (result.hasNext()) {
722-
PARSER
723-
.<T>parseOrLog(result.next(), builder.clone())
724-
.map(
725-
entity -> {
726-
// Populate the tenant id field with the tenant id that's received for backward
727-
// compatibility.
728-
Descriptors.FieldDescriptor fieldDescriptor =
729-
entity.getDescriptorForType().findFieldByName("tenant_id");
730-
if (fieldDescriptor != null) {
731-
return (T) entity.toBuilder().setField(fieldDescriptor, tenantId).build();
732-
}
733-
return entity;
734-
})
735-
.ifPresent(entities::add);
746+
try (final CloseableIterator<Document> result = collection.search(query)) {
747+
while (result.hasNext()) {
748+
PARSER
749+
.<T>parseOrLog(result.next(), builder.clone())
750+
.map(
751+
entity -> {
752+
// Populate the tenant id field with the tenant id that's received for backward
753+
// compatibility.
754+
Descriptors.FieldDescriptor fieldDescriptor =
755+
entity.getDescriptorForType().findFieldByName("tenant_id");
756+
if (fieldDescriptor != null) {
757+
return (T) entity.toBuilder().setField(fieldDescriptor, tenantId).build();
758+
}
759+
return entity;
760+
})
761+
.ifPresent(entities::add);
762+
}
763+
} catch (final IOException e) {
764+
final String message =
765+
String.format(
766+
"Unable to search for tenant: %s, entityType: %s, entityId: %s",
767+
tenantId, entityType, entityId);
768+
LOG.warn(message, e);
769+
responseObserver.onError(Status.INTERNAL.withDescription(message).asRuntimeException());
770+
return;
736771
}
737772

738773
if (LOG.isDebugEnabled()) {
@@ -754,7 +789,8 @@ private <T extends Message> void searchByIdAndStreamSingleResponse(
754789
}
755790
}
756791

757-
private List<Entity> getExistingEntities(String tenantId, java.util.Collection<Entity> entities) {
792+
private List<Entity> getExistingEntities(String tenantId, java.util.Collection<Entity> entities)
793+
throws IOException {
758794
List<String> docIds = entities.stream().map(this::getDocId).collect(Collectors.toList());
759795
return this.entityFetcher.getEntitiesByDocIds(tenantId, docIds);
760796
}
@@ -765,7 +801,8 @@ private String getDocId(Entity entity) {
765801
.toString();
766802
}
767803

768-
private Optional<Entity> getExistingEntity(String tenantId, String entityType, String entityId) {
804+
private Optional<Entity> getExistingEntity(String tenantId, String entityType, String entityId)
805+
throws IOException {
769806
String docId = this.entityNormalizer.getEntityDocKey(tenantId, entityType, entityId).toString();
770807
return this.entityFetcher.getEntitiesByDocIds(tenantId, List.of(docId)).stream().findFirst();
771808
}
@@ -778,23 +815,29 @@ private void searchByQueryAndStreamRelationships(
778815
org.hypertrace.core.documentstore.Query query,
779816
StreamObserver<EntityRelationship> responseObserver,
780817
String tenantId) {
781-
List<EntityRelationship> relationships =
782-
Streams.stream(relationshipsCollection.search(query))
783-
.flatMap(
784-
document ->
785-
PARSER
786-
.<EntityRelationship>parseOrLog(document, EntityRelationship.newBuilder())
787-
.stream())
788-
.map(EntityRelationship::toBuilder)
789-
.map(builder -> builder.setTenantId(tenantId))
790-
.map(EntityRelationship.Builder::build)
791-
.peek(responseObserver::onNext)
792-
.collect(Collectors.toList());
818+
try (final CloseableIterator<Document> documentIterator =
819+
relationshipsCollection.search(query)) {
820+
List<EntityRelationship> relationships =
821+
Streams.stream(documentIterator)
822+
.flatMap(
823+
document ->
824+
PARSER
825+
.<EntityRelationship>parseOrLog(document, EntityRelationship.newBuilder())
826+
.stream())
827+
.map(EntityRelationship::toBuilder)
828+
.map(builder -> builder.setTenantId(tenantId))
829+
.map(EntityRelationship.Builder::build)
830+
.peek(responseObserver::onNext)
831+
.collect(Collectors.toList());
793832

794-
if (LOG.isDebugEnabled()) {
795833
LOG.debug("Docstore query has returned the result: {}", relationships);
834+
responseObserver.onCompleted();
835+
} catch (final IOException e) {
836+
final String message =
837+
String.format("Unknown error occurred for query: %s on tenant: %s", query, tenantId);
838+
LOG.warn(message, e);
839+
responseObserver.onError(Status.INTERNAL.withDescription(message).asRuntimeException());
796840
}
797-
responseObserver.onCompleted();
798841
}
799842

800843
private void logQuery(Object query) {

entity-service-impl/src/main/java/org/hypertrace/entity/data/service/IdentifyingAttributeCache.java

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -6,12 +6,15 @@
66
import com.google.common.cache.CacheLoader;
77
import com.google.common.cache.LoadingCache;
88
import com.google.common.collect.Streams;
9+
import io.grpc.Status;
10+
import java.io.IOException;
911
import java.util.Collections;
1012
import java.util.List;
1113
import java.util.Map;
1214
import java.util.concurrent.TimeUnit;
1315
import java.util.stream.Collectors;
1416
import java.util.stream.Stream;
17+
import org.hypertrace.core.documentstore.CloseableIterator;
1518
import org.hypertrace.core.documentstore.Collection;
1619
import org.hypertrace.core.documentstore.Datastore;
1720
import org.hypertrace.core.documentstore.Document;
@@ -48,11 +51,17 @@ private Map<String, List<AttributeType>> loadEntityTypes(String tenantId) {
4851
new Filter(
4952
Op.IN, EntityServiceConstants.TENANT_ID, TenantUtils.getTenantHierarchy(tenantId)));
5053

51-
return Streams.stream(entityTypesCollection.search(query))
52-
.flatMap(this::buildEntityType)
53-
.collect(
54-
Collectors.toUnmodifiableMap(
55-
EntityType::getName, this::getIdentifyingAttributesFromType));
54+
try (final CloseableIterator<Document> iterator = entityTypesCollection.search(query)) {
55+
return Streams.stream(iterator)
56+
.flatMap(this::buildEntityType)
57+
.collect(
58+
Collectors.toUnmodifiableMap(
59+
EntityType::getName, this::getIdentifyingAttributesFromType));
60+
} catch (final IOException e) {
61+
throw Status.INTERNAL
62+
.withDescription("Unable to fetch entity types for tenant: " + tenantId)
63+
.asRuntimeException();
64+
}
5665
}
5766

5867
private Stream<EntityType> buildEntityType(Document doc) {

0 commit comments

Comments
 (0)