Merged
@@ -18,7 +18,7 @@ public interface EntityDataClient {

/**
* Gets the entity from the cache if available, otherwise upserts it and returns the result. The
* behavior of this may or may be cached depending on the configuration.
* behavior of this may or may not be cached depending on the configuration.
*
* @param entity
* @return
@@ -12,6 +12,7 @@ service EntityDataService {
}
rpc upsertEntities (Entities) returns (Empty) {
}
rpc getAndUpsertEntities (Entities) returns (stream Entity) {}
Contributor
Suggested change
rpc getAndUpsertEntities (Entities) returns (stream Entity) {}
rpc getAndUpsertEntities (Entities) returns (stream Entity) {
}

As the rest of the methods follow that pattern.

Contributor Author
I think this is very minor. Will avoid one more commit and full build cycle for this :)

rpc delete (ByIdRequest) returns (Empty) {
}
rpc getById (ByIdRequest) returns (Entity) {
@@ -3,8 +3,7 @@
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import io.grpc.Channel;
import io.grpc.ManagedChannelBuilder;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
@@ -20,7 +19,6 @@
import org.hypertrace.entity.data.service.v1.EntityRelationships;
import org.hypertrace.entity.data.service.v1.Query;
import org.hypertrace.entity.service.client.config.EntityServiceClientCacheConfig;
import org.hypertrace.entity.service.client.config.EntityServiceClientConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -90,6 +88,11 @@ public Entity upsert(Entity entity) {
return client.upsert(entity);
}

@Override
public Iterator<Entity> getAndBulkUpsert(String tenantId, Collection<Entity> entities) {
return client.getAndBulkUpsert(tenantId, entities);
}

@Override
public Entity getByTypeAndIdentifyingAttributes(String tenantId,
ByTypeAndIdentifyingAttributes byIdentifyingAttributes) {
@@ -1,5 +1,6 @@
package org.hypertrace.entity.data.service.client;

import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
@@ -18,6 +19,8 @@ public interface EdsClient {

Entity upsert(Entity entity);

Iterator<Entity> getAndBulkUpsert(String tenantId, Collection<Entity> entities);

Entity getByTypeAndIdentifyingAttributes(String tenantId,
ByTypeAndIdentifyingAttributes byIdentifyingAttributes);

@@ -5,6 +5,7 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import io.grpc.Channel;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
@@ -58,6 +59,11 @@ public Entity upsert(Entity entity) {
return result.equals(Entity.getDefaultInstance()) ? null : result;
}

@Override
public Iterator<Entity> getAndBulkUpsert(String tenantId, Collection<Entity> entities) {
return execute(tenantId, () -> blockingStub.getAndUpsertEntities(Entities.newBuilder().addAllEntity(entities).build()));
}

public void bulkUpsert(String tenantId, java.util.Collection<Entity> entities) {
execute(tenantId,
() -> blockingStub.upsertEntities(Entities.newBuilder().addAllEntity(entities).build()));
2 changes: 1 addition & 1 deletion entity-service-impl/build.gradle.kts
@@ -7,7 +7,7 @@ plugins {
dependencies {
api(project(":entity-service-api"))
api("org.hypertrace.core.serviceframework:service-framework-spi:0.1.18")
implementation("org.hypertrace.core.documentstore:document-store:0.4.5")
implementation("org.hypertrace.core.documentstore:document-store:0.4.6")
implementation("org.hypertrace.core.grpcutils:grpc-context-utils:0.3.1")
implementation(project(":entity-type-service-rx-client"))

@@ -127,6 +127,41 @@ public void upsertEntities(Entities request, StreamObserver<Empty> responseObser
}
}

@Override
public void getAndUpsertEntities(Entities request, StreamObserver<Entity> responseObserver) {
String tenantId = RequestContext.CURRENT.get().getTenantId().orElse(null);
if (tenantId == null) {
responseObserver.onError(new ServiceException("Tenant id is missing in the request."));
return;
}

try {
Map<String, Entity> entityMap =
request.getEntityList().stream()
.map(entity -> this.upsertNormalizer.normalize(tenantId, entity))
.collect(Collectors.toUnmodifiableMap(Entity::getEntityId, Function.identity()));

Map<Key, Document> documentMap = new HashMap<>();
for (Map.Entry<String, Entity> entry : entityMap.entrySet()) {
Document doc = convertEntityToDocument(entry.getValue());
SingleValueKey key = new SingleValueKey(tenantId, entry.getKey());
documentMap.put(key, doc);
}

Streams.stream(entitiesCollection.bulkUpsertAndReturnOlderDocuments(documentMap))
.flatMap(document -> PARSER.<Entity>parseOrLog(document, Entity.newBuilder()).stream())
.map(Entity::toBuilder)
.map(builder -> builder.setTenantId(tenantId))
.map(Entity.Builder::build)
.forEach(responseObserver::onNext);
Contributor
This will send one document at a time, right? Should we send them in batch?

Contributor Author
Streaming one at a time allows the client to process each entity and discard it instead of keeping everything in memory. I think that's the pattern we should always follow.
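To illustrate the streaming point above, here is a minimal, hypothetical sketch (not part of this PR) of how a caller could consume the result of the new EdsClient#getAndBulkUpsert method, handling each previous-state entity as it arrives rather than buffering the whole batch. The PreviousStateConsumer class and its println are illustrative only; EdsClient and Entity are the types used in this PR.

import java.util.Collection;
import java.util.Iterator;
import org.hypertrace.entity.data.service.client.EdsClient;
import org.hypertrace.entity.data.service.v1.Entity;

// Hypothetical consumer demonstrating the streaming usage pattern described above.
class PreviousStateConsumer {
  private final EdsClient edsClient;

  PreviousStateConsumer(EdsClient edsClient) {
    this.edsClient = edsClient;
  }

  void upsertAndInspectPreviousState(String tenantId, Collection<Entity> entities) {
    // The server streams back the previous version of each upserted entity one at a
    // time, so every Entity can be processed and discarded without holding the whole
    // response in memory.
    Iterator<Entity> previousStates = edsClient.getAndBulkUpsert(tenantId, entities);
    while (previousStates.hasNext()) {
      Entity previous = previousStates.next();
      System.out.println("Previous state of entity " + previous.getEntityId());
    }
  }
}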


responseObserver.onCompleted();
} catch (IOException e) {
LOG.error("Failed to bulk upsert entities", e);
responseObserver.onError(e);
}
}

/**
* Get an Entity by the EntityId and EntityType
*
4 changes: 2 additions & 2 deletions entity-service/build.gradle.kts
@@ -59,7 +59,7 @@ dependencies {
implementation("org.hypertrace.core.grpcutils:grpc-server-utils:0.3.1")
implementation("org.hypertrace.core.grpcutils:grpc-client-utils:0.3.1")
implementation("org.hypertrace.core.serviceframework:platform-service-framework:0.1.18")
implementation("org.hypertrace.core.documentstore:document-store:0.4.5")
implementation("org.hypertrace.core.documentstore:document-store:0.4.6")

runtimeOnly("io.grpc:grpc-netty:1.33.1")
constraints {
@@ -103,4 +103,4 @@ hypertraceDocker {
port.set(50061)
}
}
}
}
@@ -14,16 +14,12 @@
import io.grpc.Status.Code;
import io.grpc.StatusRuntimeException;
import java.time.Instant;
import java.time.OffsetDateTime;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import org.hypertrace.core.serviceframework.IntegrationTestServerUtil;
import org.hypertrace.entity.constants.v1.ApiAttribute;
import org.hypertrace.entity.constants.v1.BackendAttribute;
import org.hypertrace.entity.constants.v1.CommonAttribute;
import org.hypertrace.entity.data.service.client.EntityDataServiceClient;
@@ -643,6 +639,7 @@ public void whenNNewEntitiesAreUpserted_thenExpectNNewEntities() {
entityDataServiceClient.bulkUpsert(TENANT_ID, externalIdToEntity.values());

// all N entities should have been created
Map<String, Entity> entityMap = new HashMap<>();
for (String id : externalIdToEntity.keySet()) {
List<Entity> readEntity =
entityDataServiceClient.getEntitiesWithGivenAttribute(TENANT_ID,
@@ -652,6 +649,15 @@
// exactly one entity exists
assertEquals(1, readEntity.size());
assertNotNull(readEntity.get(0).getEntityId());
entityMap.put(readEntity.get(0).getEntityId(), readEntity.get(0));
}

// Try getAndBulkUpsert and verify that the returned entities reflect their previous state.
Iterator<Entity> iterator = entityDataServiceClient.getAndBulkUpsert(TENANT_ID, externalIdToEntity.values());
while (iterator.hasNext()) {
Entity entity = iterator.next();
assertNotNull(entityMap.get(entity.getEntityId()));
assertEquals(entityMap.get(entity.getEntityId()), entity);
}
}
