diff --git a/entity-data-service-rx-client/src/main/java/org/hypertrace/entity/data/service/rxclient/EntityDataClient.java b/entity-data-service-rx-client/src/main/java/org/hypertrace/entity/data/service/rxclient/EntityDataClient.java index 29255e8f..5effef22 100644 --- a/entity-data-service-rx-client/src/main/java/org/hypertrace/entity/data/service/rxclient/EntityDataClient.java +++ b/entity-data-service-rx-client/src/main/java/org/hypertrace/entity/data/service/rxclient/EntityDataClient.java @@ -18,7 +18,7 @@ public interface EntityDataClient { /** * Gets the entity from the cache if available, otherwise upserts it and returns the result. The - * behavior of this may or may be cached depending on the configuration. + * behavior of this may or may not be cached depending on the configuration. * * @param entity * @return diff --git a/entity-service-api/src/main/proto/org/hypertrace/entity/data/service/v1/entity_data_service.proto b/entity-service-api/src/main/proto/org/hypertrace/entity/data/service/v1/entity_data_service.proto index 4410a2b2..8bdd5cfc 100644 --- a/entity-service-api/src/main/proto/org/hypertrace/entity/data/service/v1/entity_data_service.proto +++ b/entity-service-api/src/main/proto/org/hypertrace/entity/data/service/v1/entity_data_service.proto @@ -12,6 +12,7 @@ service EntityDataService { } rpc upsertEntities (Entities) returns (Empty) { } + rpc getAndUpsertEntities (Entities) returns (stream Entity) {} rpc delete (ByIdRequest) returns (Empty) { } rpc getById (ByIdRequest) returns (Entity) { diff --git a/entity-service-client/src/main/java/org/hypertrace/entity/data/service/client/EdsCacheClient.java b/entity-service-client/src/main/java/org/hypertrace/entity/data/service/client/EdsCacheClient.java index 3699bd41..7ecbec47 100644 --- a/entity-service-client/src/main/java/org/hypertrace/entity/data/service/client/EdsCacheClient.java +++ b/entity-service-client/src/main/java/org/hypertrace/entity/data/service/client/EdsCacheClient.java @@ -3,8 +3,7 @@ import com.google.common.cache.CacheBuilder; import com.google.common.cache.CacheLoader; import com.google.common.cache.LoadingCache; -import io.grpc.Channel; -import io.grpc.ManagedChannelBuilder; +import java.util.Collection; import java.util.Iterator; import java.util.List; import java.util.Set; @@ -20,7 +19,6 @@ import org.hypertrace.entity.data.service.v1.EntityRelationships; import org.hypertrace.entity.data.service.v1.Query; import org.hypertrace.entity.service.client.config.EntityServiceClientCacheConfig; -import org.hypertrace.entity.service.client.config.EntityServiceClientConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -90,6 +88,11 @@ public Entity upsert(Entity entity) { return client.upsert(entity); } + @Override + public Iterator getAndBulkUpsert(String tenantId, Collection entities) { + return client.getAndBulkUpsert(tenantId, entities); + } + @Override public Entity getByTypeAndIdentifyingAttributes(String tenantId, ByTypeAndIdentifyingAttributes byIdentifyingAttributes) { diff --git a/entity-service-client/src/main/java/org/hypertrace/entity/data/service/client/EdsClient.java b/entity-service-client/src/main/java/org/hypertrace/entity/data/service/client/EdsClient.java index d72c348b..6586cc88 100644 --- a/entity-service-client/src/main/java/org/hypertrace/entity/data/service/client/EdsClient.java +++ b/entity-service-client/src/main/java/org/hypertrace/entity/data/service/client/EdsClient.java @@ -1,5 +1,6 @@ package org.hypertrace.entity.data.service.client; +import java.util.Collection; import java.util.Iterator; import java.util.List; import java.util.Set; @@ -18,6 +19,8 @@ public interface EdsClient { Entity upsert(Entity entity); + Iterator getAndBulkUpsert(String tenantId, Collection entities); + Entity getByTypeAndIdentifyingAttributes(String tenantId, ByTypeAndIdentifyingAttributes byIdentifyingAttributes); diff --git a/entity-service-client/src/main/java/org/hypertrace/entity/data/service/client/EntityDataServiceClient.java b/entity-service-client/src/main/java/org/hypertrace/entity/data/service/client/EntityDataServiceClient.java index ec98f579..5ef5133a 100644 --- a/entity-service-client/src/main/java/org/hypertrace/entity/data/service/client/EntityDataServiceClient.java +++ b/entity-service-client/src/main/java/org/hypertrace/entity/data/service/client/EntityDataServiceClient.java @@ -5,6 +5,7 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Lists; import io.grpc.Channel; +import java.util.Collection; import java.util.Iterator; import java.util.List; import java.util.Set; @@ -58,6 +59,11 @@ public Entity upsert(Entity entity) { return result.equals(Entity.getDefaultInstance()) ? null : result; } + @Override + public Iterator getAndBulkUpsert(String tenantId, Collection entities) { + return execute(tenantId, () -> blockingStub.getAndUpsertEntities(Entities.newBuilder().addAllEntity(entities).build())); + } + public void bulkUpsert(String tenantId, java.util.Collection entities) { execute(tenantId, () -> blockingStub.upsertEntities(Entities.newBuilder().addAllEntity(entities).build())); diff --git a/entity-service-impl/build.gradle.kts b/entity-service-impl/build.gradle.kts index a864fb4c..229336bc 100644 --- a/entity-service-impl/build.gradle.kts +++ b/entity-service-impl/build.gradle.kts @@ -7,7 +7,7 @@ plugins { dependencies { api(project(":entity-service-api")) api("org.hypertrace.core.serviceframework:service-framework-spi:0.1.18") - implementation("org.hypertrace.core.documentstore:document-store:0.4.5") + implementation("org.hypertrace.core.documentstore:document-store:0.4.6") implementation("org.hypertrace.core.grpcutils:grpc-context-utils:0.3.1") implementation(project(":entity-type-service-rx-client")) diff --git a/entity-service-impl/src/main/java/org/hypertrace/entity/data/service/EntityDataServiceImpl.java b/entity-service-impl/src/main/java/org/hypertrace/entity/data/service/EntityDataServiceImpl.java index fca1451b..8c7310c1 100644 --- a/entity-service-impl/src/main/java/org/hypertrace/entity/data/service/EntityDataServiceImpl.java +++ b/entity-service-impl/src/main/java/org/hypertrace/entity/data/service/EntityDataServiceImpl.java @@ -127,6 +127,41 @@ public void upsertEntities(Entities request, StreamObserver responseObser } } + @Override + public void getAndUpsertEntities(Entities request, StreamObserver responseObserver) { + String tenantId = RequestContext.CURRENT.get().getTenantId().orElse(null); + if (tenantId == null) { + responseObserver.onError(new ServiceException("Tenant id is missing in the request.")); + return; + } + + try { + Map entityMap = + request.getEntityList().stream() + .map(entity -> this.upsertNormalizer.normalize(tenantId, entity)) + .collect(Collectors.toUnmodifiableMap(Entity::getEntityId, Function.identity())); + + Map documentMap = new HashMap<>(); + for (Map.Entry entry : entityMap.entrySet()) { + Document doc = convertEntityToDocument(entry.getValue()); + SingleValueKey key = new SingleValueKey(tenantId, entry.getKey()); + documentMap.put(key, doc); + } + + Streams.stream(entitiesCollection.bulkUpsertAndReturnOlderDocuments(documentMap)) + .flatMap(document -> PARSER.parseOrLog(document, Entity.newBuilder()).stream()) + .map(Entity::toBuilder) + .map(builder -> builder.setTenantId(tenantId)) + .map(Entity.Builder::build) + .forEach(responseObserver::onNext); + + responseObserver.onCompleted(); + } catch (IOException e) { + LOG.error("Failed to bulk upsert entities", e); + responseObserver.onError(e); + } + } + /** * Get an Entity by the EntityId and EntityType * diff --git a/entity-service/build.gradle.kts b/entity-service/build.gradle.kts index f8baf5d4..0912981e 100644 --- a/entity-service/build.gradle.kts +++ b/entity-service/build.gradle.kts @@ -59,7 +59,7 @@ dependencies { implementation("org.hypertrace.core.grpcutils:grpc-server-utils:0.3.1") implementation("org.hypertrace.core.grpcutils:grpc-client-utils:0.3.1") implementation("org.hypertrace.core.serviceframework:platform-service-framework:0.1.18") - implementation("org.hypertrace.core.documentstore:document-store:0.4.5") + implementation("org.hypertrace.core.documentstore:document-store:0.4.6") runtimeOnly("io.grpc:grpc-netty:1.33.1") constraints { @@ -103,4 +103,4 @@ hypertraceDocker { port.set(50061) } } -} \ No newline at end of file +} diff --git a/entity-service/src/integrationTest/java/org/hypertrace/entity/service/service/EntityDataServiceTest.java b/entity-service/src/integrationTest/java/org/hypertrace/entity/service/service/EntityDataServiceTest.java index 17d5f70a..77846794 100644 --- a/entity-service/src/integrationTest/java/org/hypertrace/entity/service/service/EntityDataServiceTest.java +++ b/entity-service/src/integrationTest/java/org/hypertrace/entity/service/service/EntityDataServiceTest.java @@ -14,16 +14,12 @@ import io.grpc.Status.Code; import io.grpc.StatusRuntimeException; import java.time.Instant; -import java.time.OffsetDateTime; -import java.time.ZoneId; -import java.time.ZonedDateTime; -import java.time.format.DateTimeFormatter; import java.util.HashMap; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.UUID; import org.hypertrace.core.serviceframework.IntegrationTestServerUtil; -import org.hypertrace.entity.constants.v1.ApiAttribute; import org.hypertrace.entity.constants.v1.BackendAttribute; import org.hypertrace.entity.constants.v1.CommonAttribute; import org.hypertrace.entity.data.service.client.EntityDataServiceClient; @@ -643,6 +639,7 @@ public void whenNNewEntitiesAreUpserted_thenExpectNNewEntities() { entityDataServiceClient.bulkUpsert(TENANT_ID, externalIdToEntity.values()); // all N entities should have been created + Map entityMap = new HashMap<>(); for (String id : externalIdToEntity.keySet()) { List readEntity = entityDataServiceClient.getEntitiesWithGivenAttribute(TENANT_ID, @@ -652,6 +649,15 @@ public void whenNNewEntitiesAreUpserted_thenExpectNNewEntities() { // exactly one entity exists assertEquals(1, readEntity.size()); assertNotNull(readEntity.get(0).getEntityId()); + entityMap.put(readEntity.get(0).getEntityId(), readEntity.get(0)); + } + + // Try getAndBulkUpsert, verify that the returned entities were in previous state. + Iterator iterator = entityDataServiceClient.getAndBulkUpsert(TENANT_ID, externalIdToEntity.values()); + while (iterator.hasNext()) { + Entity entity = iterator.next(); + assertNotNull(entityMap.get(entity.getEntityId())); + assertEquals(entityMap.get(entity.getEntityId()), entity); } }