Skip to content

Commit 651eafc

Browse files
#60 Adding a new API which returns the previous entities before bulk upsert (#61)
* #60 Adding a new API which returns the previous entities before bulk upsert. This lets the clients to easily check if an entity has changed as part of an upsert, without making an extra round trip to the service.
1 parent d0a1f9c commit 651eafc

File tree

9 files changed

+66
-12
lines changed

9 files changed

+66
-12
lines changed

entity-data-service-rx-client/src/main/java/org/hypertrace/entity/data/service/rxclient/EntityDataClient.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ public interface EntityDataClient {
1818

1919
/**
2020
* Gets the entity from the cache if available, otherwise upserts it and returns the result. The
21-
* behavior of this may or may be cached depending on the configuration.
21+
* behavior of this may or may not be cached depending on the configuration.
2222
*
2323
* @param entity
2424
* @return

entity-service-api/src/main/proto/org/hypertrace/entity/data/service/v1/entity_data_service.proto

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ service EntityDataService {
1212
}
1313
rpc upsertEntities (Entities) returns (Empty) {
1414
}
15+
rpc getAndUpsertEntities (Entities) returns (stream Entity) {}
1516
rpc delete (ByIdRequest) returns (Empty) {
1617
}
1718
rpc getById (ByIdRequest) returns (Entity) {

entity-service-client/src/main/java/org/hypertrace/entity/data/service/client/EdsCacheClient.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,7 @@
33
import com.google.common.cache.CacheBuilder;
44
import com.google.common.cache.CacheLoader;
55
import com.google.common.cache.LoadingCache;
6-
import io.grpc.Channel;
7-
import io.grpc.ManagedChannelBuilder;
6+
import java.util.Collection;
87
import java.util.Iterator;
98
import java.util.List;
109
import java.util.Set;
@@ -20,7 +19,6 @@
2019
import org.hypertrace.entity.data.service.v1.EntityRelationships;
2120
import org.hypertrace.entity.data.service.v1.Query;
2221
import org.hypertrace.entity.service.client.config.EntityServiceClientCacheConfig;
23-
import org.hypertrace.entity.service.client.config.EntityServiceClientConfig;
2422
import org.slf4j.Logger;
2523
import org.slf4j.LoggerFactory;
2624

@@ -90,6 +88,11 @@ public Entity upsert(Entity entity) {
9088
return client.upsert(entity);
9189
}
9290

91+
@Override
92+
public Iterator<Entity> getAndBulkUpsert(String tenantId, Collection<Entity> entities) {
93+
return client.getAndBulkUpsert(tenantId, entities);
94+
}
95+
9396
@Override
9497
public Entity getByTypeAndIdentifyingAttributes(String tenantId,
9598
ByTypeAndIdentifyingAttributes byIdentifyingAttributes) {

entity-service-client/src/main/java/org/hypertrace/entity/data/service/client/EdsClient.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package org.hypertrace.entity.data.service.client;
22

3+
import java.util.Collection;
34
import java.util.Iterator;
45
import java.util.List;
56
import java.util.Set;
@@ -18,6 +19,8 @@ public interface EdsClient {
1819

1920
Entity upsert(Entity entity);
2021

22+
Iterator<Entity> getAndBulkUpsert(String tenantId, Collection<Entity> entities);
23+
2124
Entity getByTypeAndIdentifyingAttributes(String tenantId,
2225
ByTypeAndIdentifyingAttributes byIdentifyingAttributes);
2326

entity-service-client/src/main/java/org/hypertrace/entity/data/service/client/EntityDataServiceClient.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import com.google.common.annotations.VisibleForTesting;
66
import com.google.common.collect.Lists;
77
import io.grpc.Channel;
8+
import java.util.Collection;
89
import java.util.Iterator;
910
import java.util.List;
1011
import java.util.Set;
@@ -58,6 +59,11 @@ public Entity upsert(Entity entity) {
5859
return result.equals(Entity.getDefaultInstance()) ? null : result;
5960
}
6061

62+
@Override
63+
public Iterator<Entity> getAndBulkUpsert(String tenantId, Collection<Entity> entities) {
64+
return execute(tenantId, () -> blockingStub.getAndUpsertEntities(Entities.newBuilder().addAllEntity(entities).build()));
65+
}
66+
6167
public void bulkUpsert(String tenantId, java.util.Collection<Entity> entities) {
6268
execute(tenantId,
6369
() -> blockingStub.upsertEntities(Entities.newBuilder().addAllEntity(entities).build()));

entity-service-impl/build.gradle.kts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ plugins {
77
dependencies {
88
api(project(":entity-service-api"))
99
api("org.hypertrace.core.serviceframework:service-framework-spi:0.1.18")
10-
implementation("org.hypertrace.core.documentstore:document-store:0.4.5")
10+
implementation("org.hypertrace.core.documentstore:document-store:0.4.6")
1111
implementation("org.hypertrace.core.grpcutils:grpc-context-utils:0.3.1")
1212
implementation(project(":entity-type-service-rx-client"))
1313

entity-service-impl/src/main/java/org/hypertrace/entity/data/service/EntityDataServiceImpl.java

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,41 @@ public void upsertEntities(Entities request, StreamObserver<Empty> responseObser
127127
}
128128
}
129129

130+
@Override
131+
public void getAndUpsertEntities(Entities request, StreamObserver<Entity> responseObserver) {
132+
String tenantId = RequestContext.CURRENT.get().getTenantId().orElse(null);
133+
if (tenantId == null) {
134+
responseObserver.onError(new ServiceException("Tenant id is missing in the request."));
135+
return;
136+
}
137+
138+
try {
139+
Map<String, Entity> entityMap =
140+
request.getEntityList().stream()
141+
.map(entity -> this.upsertNormalizer.normalize(tenantId, entity))
142+
.collect(Collectors.toUnmodifiableMap(Entity::getEntityId, Function.identity()));
143+
144+
Map<Key, Document> documentMap = new HashMap<>();
145+
for (Map.Entry<String, Entity> entry : entityMap.entrySet()) {
146+
Document doc = convertEntityToDocument(entry.getValue());
147+
SingleValueKey key = new SingleValueKey(tenantId, entry.getKey());
148+
documentMap.put(key, doc);
149+
}
150+
151+
Streams.stream(entitiesCollection.bulkUpsertAndReturnOlderDocuments(documentMap))
152+
.flatMap(document -> PARSER.<Entity>parseOrLog(document, Entity.newBuilder()).stream())
153+
.map(Entity::toBuilder)
154+
.map(builder -> builder.setTenantId(tenantId))
155+
.map(Entity.Builder::build)
156+
.forEach(responseObserver::onNext);
157+
158+
responseObserver.onCompleted();
159+
} catch (IOException e) {
160+
LOG.error("Failed to bulk upsert entities", e);
161+
responseObserver.onError(e);
162+
}
163+
}
164+
130165
/**
131166
* Get an Entity by the EntityId and EntityType
132167
*

entity-service/build.gradle.kts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ dependencies {
5959
implementation("org.hypertrace.core.grpcutils:grpc-server-utils:0.3.1")
6060
implementation("org.hypertrace.core.grpcutils:grpc-client-utils:0.3.1")
6161
implementation("org.hypertrace.core.serviceframework:platform-service-framework:0.1.18")
62-
implementation("org.hypertrace.core.documentstore:document-store:0.4.5")
62+
implementation("org.hypertrace.core.documentstore:document-store:0.4.6")
6363

6464
runtimeOnly("io.grpc:grpc-netty:1.33.1")
6565
constraints {
@@ -103,4 +103,4 @@ hypertraceDocker {
103103
port.set(50061)
104104
}
105105
}
106-
}
106+
}

entity-service/src/integrationTest/java/org/hypertrace/entity/service/service/EntityDataServiceTest.java

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -14,16 +14,12 @@
1414
import io.grpc.Status.Code;
1515
import io.grpc.StatusRuntimeException;
1616
import java.time.Instant;
17-
import java.time.OffsetDateTime;
18-
import java.time.ZoneId;
19-
import java.time.ZonedDateTime;
20-
import java.time.format.DateTimeFormatter;
2117
import java.util.HashMap;
18+
import java.util.Iterator;
2219
import java.util.List;
2320
import java.util.Map;
2421
import java.util.UUID;
2522
import org.hypertrace.core.serviceframework.IntegrationTestServerUtil;
26-
import org.hypertrace.entity.constants.v1.ApiAttribute;
2723
import org.hypertrace.entity.constants.v1.BackendAttribute;
2824
import org.hypertrace.entity.constants.v1.CommonAttribute;
2925
import org.hypertrace.entity.data.service.client.EntityDataServiceClient;
@@ -643,6 +639,7 @@ public void whenNNewEntitiesAreUpserted_thenExpectNNewEntities() {
643639
entityDataServiceClient.bulkUpsert(TENANT_ID, externalIdToEntity.values());
644640

645641
// all N entities should have been created
642+
Map<String, Entity> entityMap = new HashMap<>();
646643
for (String id : externalIdToEntity.keySet()) {
647644
List<Entity> readEntity =
648645
entityDataServiceClient.getEntitiesWithGivenAttribute(TENANT_ID,
@@ -652,6 +649,15 @@ public void whenNNewEntitiesAreUpserted_thenExpectNNewEntities() {
652649
// exactly one entity exists
653650
assertEquals(1, readEntity.size());
654651
assertNotNull(readEntity.get(0).getEntityId());
652+
entityMap.put(readEntity.get(0).getEntityId(), readEntity.get(0));
653+
}
654+
655+
// Try getAndBulkUpsert, verify that the returned entities were in previous state.
656+
Iterator<Entity> iterator = entityDataServiceClient.getAndBulkUpsert(TENANT_ID, externalIdToEntity.values());
657+
while (iterator.hasNext()) {
658+
Entity entity = iterator.next();
659+
assertNotNull(entityMap.get(entity.getEntityId()));
660+
assertEquals(entityMap.get(entity.getEntityId()), entity);
655661
}
656662
}
657663

0 commit comments

Comments
 (0)