Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

import static java.util.function.Predicate.not;

import io.grpc.Status;
import io.reactivex.rxjava3.core.Single;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
Expand All @@ -14,6 +16,7 @@
import org.hypertrace.entity.type.service.v1.AttributeType;

public class EntityNormalizer {

private final EntityTypeClient entityTypeV2Client;
private final EntityIdGenerator idGenerator;
private final IdentifyingAttributeCache identifyingAttributeCache;
Expand All @@ -31,8 +34,8 @@ public EntityNormalizer(
* Normalizes the entity to a canonical, ready-to-upsert form
*
* @param receivedEntity
* @throws RuntimeException If entity can not be normalized
* @return
* @throws RuntimeException If entity can not be normalized
*/
Entity normalize(String tenantId, Entity receivedEntity) {
if (StringUtils.isEmpty(receivedEntity.getEntityType())) {
Expand Down Expand Up @@ -100,7 +103,11 @@ private boolean isV2Type(String entityType) {
return this.entityTypeV2Client
.get(entityType)
.map(unused -> true)
.onErrorReturnItem(false)
.onErrorResumeNext(
throwable ->
Status.NOT_FOUND.getCode().equals(Status.fromThrowable(throwable).getCode())
? Single.just(false)
: Single.error(throwable))
.blockingGet();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@
import org.hypertrace.core.documentstore.Document;
import org.hypertrace.core.documentstore.JSONDocument;
import org.hypertrace.core.documentstore.Key;
import org.hypertrace.core.documentstore.SingleValueKey;
import org.hypertrace.core.documentstore.expression.impl.ConstantExpression;
import org.hypertrace.core.documentstore.expression.impl.IdentifierExpression;
import org.hypertrace.core.documentstore.expression.impl.LogicalExpression;
Expand Down Expand Up @@ -155,7 +154,7 @@ public EntityQueryServiceImpl(
entityChangeEventGenerator,
new EntityAttributeChangeEvaluator(config, entityAttributeMapping),
entityCounterMetricSender,
entityTypeChannel,
EntityTypeClient.builder(entityTypeChannel).build(),
!config.hasPathOrNull(CHUNK_SIZE_CONFIG)
? DEFAULT_CHUNK_SIZE
: config.getInt(CHUNK_SIZE_CONFIG),
Expand All @@ -174,7 +173,7 @@ public EntityQueryServiceImpl(
EntityChangeEventGenerator entityChangeEventGenerator,
EntityAttributeChangeEvaluator entityAttributeChangeEvaluator,
EntityCounterMetricSender entityCounterMetricSender,
Channel entityTypeChannel,
EntityTypeClient entityTypeClient,
int chunkSize,
int maxEntitiesToDelete,
int maxStringLengthForUpdate) {
Expand All @@ -186,7 +185,7 @@ public EntityQueryServiceImpl(
entityAttributeChangeEvaluator,
entityCounterMetricSender,
new EntityFetcher(entitiesCollection, DOCUMENT_PARSER),
entityTypeChannel,
entityTypeClient,
chunkSize,
maxEntitiesToDelete,
maxStringLengthForUpdate);
Expand All @@ -200,7 +199,7 @@ public EntityQueryServiceImpl(
EntityAttributeChangeEvaluator entityAttributeChangeEvaluator,
EntityCounterMetricSender entityCounterMetricSender,
EntityFetcher entityFetcher,
Channel entityTypeChannel,
EntityTypeClient entityTypeClient,
int chunkSize,
int maxEntitiesToDelete,
int maxStringLengthForUpdate) {
Expand All @@ -213,7 +212,6 @@ public EntityQueryServiceImpl(
this.entityFetcher = entityFetcher;
this.entityAttributeChangeEvaluator = entityAttributeChangeEvaluator;
this.entityCounterMetricSender = entityCounterMetricSender;
EntityTypeClient entityTypeClient = EntityTypeClient.builder(entityTypeChannel).build();
IdentifyingAttributeCache identifyingAttributeCache = new IdentifyingAttributeCache(datastore);
this.entityNormalizer =
new EntityNormalizer(entityTypeClient, new EntityIdGenerator(), identifyingAttributeCache);
Expand Down Expand Up @@ -457,14 +455,26 @@ public void bulkUpdateEntityArrayAttribute(
responseObserver.onError(new ServiceException("Tenant id is missing in the request."));
return;
}

if (StringUtils.isBlank(request.getEntityType())) {
LOG.warn("Entity type is missing in bulk update entity array request");
responseObserver.onError(
Status.INVALID_ARGUMENT
.withDescription("Entity type is missing in the request.")
.asException());
return;
}

try {
Set<Key> keys =
request.getEntityIdsList().stream()
.map(entityId -> new SingleValueKey(tenantId, entityId))
.map(
entityId ->
this.entityNormalizer.getEntityDocKey(
tenantId, request.getEntityType(), entityId))
.collect(Collectors.toCollection(LinkedHashSet::new));

String attributeId = request.getAttribute().getColumnName();

String subDocPath =
entityAttributeMapping
.getDocStorePathByAttributeId(requestContext, attributeId)
Expand Down Expand Up @@ -590,7 +600,8 @@ private void doBulkUpdate(
continue;
}
entitiesUpdateMap.put(
new SingleValueKey(requestContext.getTenantId().orElseThrow(), entityId),
this.entityNormalizer.getEntityDocKey(
requestContext.getTenantId().orElseThrow(), entityType, entityId),
transformedUpdateOperations);
boolean shouldSendNotification =
this.entityAttributeChangeEvaluator.shouldSendNotification(
Expand Down Expand Up @@ -692,7 +703,7 @@ public void deleteEntities(
return;
}

if (existingEntities.size() == 0) {
if (existingEntities.isEmpty()) {
LOG.debug("{}. No entities found to delete", request);
responseObserver.onNext(DeleteEntitiesResponse.newBuilder().build());
responseObserver.onCompleted();
Expand Down Expand Up @@ -820,19 +831,17 @@ private BulkUpdateAllMatchingFilterResponse doBulkUpdate(
queryConverter.convert(entityQueryRequest, requestContext);
final List<Entity> existingEntities = entityFetcher.query(updateFilterQuery);

final List<SingleValueKey> keys = getKeysToUpdate(entityType, existingEntities);
final List<UpdatedEntity> updatedEntityResponses = buildUpdatedEntityResponse(keys);
final List<UpdatedEntity> updatedEntityResponses =
buildUpdatedEntityResponse(existingEntities);
responseBuilder.addSummaries(
UpdateSummary.newBuilder().addAllUpdatedEntities(updatedEntityResponses));

if (keys.isEmpty()) {
if (updatedEntityResponses.isEmpty()) {
// Nothing to update
LOG.debug("No entity found with filter {} for updating", update.getFilter());
continue;
}

final List<AttributeUpdateOperation> updateOperations = update.getOperationsList();

final List<SubDocumentUpdate> updates = convertUpdates(requestContext, updateOperations);

final boolean shouldSendNotification =
Expand All @@ -856,9 +865,9 @@ private BulkUpdateAllMatchingFilterResponse doBulkUpdate(
return responseBuilder.build();
}

private List<UpdatedEntity> buildUpdatedEntityResponse(final List<SingleValueKey> keys) {
return keys.stream()
.map(SingleValueKey::getValue)
private List<UpdatedEntity> buildUpdatedEntityResponse(final List<Entity> entities) {
return entities.stream()
.map(Entity::getEntityId)
.map(id -> UpdatedEntity.newBuilder().setId(id))
.map(UpdatedEntity.Builder::build)
.collect(toUnmodifiableList());
Expand All @@ -871,8 +880,10 @@ private Converter<AttributeUpdateOperation, SubDocumentUpdate> getUpdateConverte
new TypeLiteral<Converter<AttributeUpdateOperation, SubDocumentUpdate>>() {}));
}

private List<SingleValueKey> getKeysToUpdate(
final String entityType, final List<Entity> existingEntities) {
private List<Key> getKeysToUpdate(
final RequestContext requestContext,
final String entityType,
final List<Entity> existingEntities) {
final Optional<String> idAttribute =
entityAttributeMapping.getIdentifierAttributeId(entityType);

Expand All @@ -883,7 +894,10 @@ private List<SingleValueKey> getKeysToUpdate(
}

return existingEntities.stream()
.map(entity -> new SingleValueKey(entity.getTenantId(), entity.getEntityId()))
.map(
entity ->
this.entityNormalizer.getEntityDocKey(
requestContext.getTenantId().orElseThrow(), entityType, entity.getEntityId()))
.collect(toUnmodifiableList());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import static org.hypertrace.core.attribute.service.v1.AttributeKind.TYPE_STRING;
import static org.hypertrace.core.attribute.service.v1.AttributeKind.TYPE_STRING_ARRAY;
import static org.hypertrace.core.attribute.service.v1.AttributeKind.TYPE_STRING_MAP;
import static org.hypertrace.core.attribute.service.v1.AttributeKind.TYPE_TIMESTAMP;
import static org.hypertrace.core.documentstore.model.subdoc.UpdateOperator.ADD_TO_LIST_IF_ABSENT;
import static org.hypertrace.core.documentstore.model.subdoc.UpdateOperator.REMOVE_ALL_FROM_LIST;
import static org.hypertrace.core.documentstore.model.subdoc.UpdateOperator.SET;
Expand All @@ -28,8 +29,11 @@
import static org.hypertrace.entity.query.service.v1.ValueType.STRING;
import static org.hypertrace.entity.query.service.v1.ValueType.STRING_ARRAY;
import static org.hypertrace.entity.query.service.v1.ValueType.STRING_MAP;
import static org.hypertrace.entity.query.service.v1.ValueType.TIMESTAMP;

import com.google.common.base.Joiner;
import com.google.common.collect.ImmutableMultimap;
import com.google.common.collect.Multimap;
import java.util.Map;
import java.util.Set;
import javax.inject.Inject;
Expand All @@ -49,19 +53,23 @@

@AllArgsConstructor(onConstructor_ = {@Inject})
public class UpdateConverter implements Converter<AttributeUpdateOperation, SubDocumentUpdate> {

private static final Joiner DOT_JOINER = Joiner.on(".");

private static final Map<ValueType, AttributeKind> VALUE_TYPE_TO_ATTRIBUTE_KIND_MAP =
Map.ofEntries(
entry(STRING, TYPE_STRING),
entry(LONG, TYPE_INT64),
entry(INT, TYPE_INT64),
entry(FLOAT, TYPE_DOUBLE),
entry(DOUBLE, TYPE_DOUBLE),
entry(BYTES, TYPE_BYTES),
entry(BOOL, TYPE_BOOL),
entry(STRING_ARRAY, TYPE_STRING_ARRAY),
entry(STRING_MAP, TYPE_STRING_MAP));
private static final Multimap<ValueType, AttributeKind> VALUE_TYPE_TO_ATTRIBUTE_KIND_MULTI_MAP =
new ImmutableMultimap.Builder<ValueType, AttributeKind>()
.put(entry(STRING, TYPE_STRING))
.put(entry(LONG, TYPE_INT64))
.put(entry(INT, TYPE_INT64))
.put(entry(FLOAT, TYPE_DOUBLE))
.put(entry(DOUBLE, TYPE_DOUBLE))
.put(entry(BYTES, TYPE_BYTES))
.put(entry(BOOL, TYPE_BOOL))
.put(entry(LONG, TYPE_TIMESTAMP))
.put(entry(TIMESTAMP, TYPE_TIMESTAMP))
.put(entry(STRING_ARRAY, TYPE_STRING_ARRAY))
.put(entry(STRING_MAP, TYPE_STRING_MAP))
.build();

private static final Map<AttributeUpdateOperator, UpdateOperator> OPERATOR_MAP =
Map.ofEntries(
Expand Down Expand Up @@ -139,7 +147,7 @@ private void validateDataType(
return;
}

if (!attributeKind.equals(VALUE_TYPE_TO_ATTRIBUTE_KIND_MAP.get(valueType))) {
if (!VALUE_TYPE_TO_ATTRIBUTE_KIND_MULTI_MAP.get(valueType).contains(attributeKind)) {
throw new ConversionException(
String.format(
"Mismatching value type (%s) for attribute of type %s", valueType, attributeKind));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.Mockito.when;

import io.grpc.Status;
import io.reactivex.rxjava3.core.Single;
import java.util.List;
import java.util.Map;
Expand All @@ -26,6 +27,7 @@

@ExtendWith(MockitoExtension.class)
class EntityNormalizerTest {

private static final String TENANT_ID = "tenant";
private static final String V1_ENTITY_TYPE = "v1-entity";
private static final String V2_ENTITY_TYPE = "v2-entity";
Expand Down Expand Up @@ -59,15 +61,16 @@ void throwsOnV1EntityTypeMissingIdAttr() {
when(this.mockIdAttrCache.getIdentifyingAttributes(TENANT_ID, V1_ENTITY_TYPE))
.thenReturn(List.of(V1_ID_ATTR));
when(this.mockEntityTypeClient.get(V1_ENTITY_TYPE))
.thenReturn(Single.error(new RuntimeException()));
.thenReturn(Single.error(Status.NOT_FOUND.asRuntimeException()));
Entity inputEntity = Entity.newBuilder().setEntityType(V1_ENTITY_TYPE).build();

Exception exception =
assertThrows(
IllegalArgumentException.class,
() -> this.normalizer.normalize(TENANT_ID, inputEntity));
assertEquals(
"Received and expected identifying attributes differ. Received: [] . Expected: [required-attr]",
"Received and expected identifying attributes differ. Received: [] . Expected: "
+ "[required-attr]",
exception.getMessage());
}

Expand All @@ -80,7 +83,7 @@ void normalizesV1EntityTypeWithExtraIdAttr() {
when(this.mockIdAttrCache.getIdentifyingAttributes(TENANT_ID, V1_ENTITY_TYPE))
.thenReturn(List.of(V1_ID_ATTR));
when(this.mockEntityTypeClient.get(V1_ENTITY_TYPE))
.thenReturn(Single.error(new RuntimeException()));
.thenReturn(Single.error(Status.NOT_FOUND.asRuntimeException()));
Entity inputEntity =
Entity.newBuilder()
.setEntityType(V1_ENTITY_TYPE)
Expand Down Expand Up @@ -109,6 +112,17 @@ void throwsOnV2EntityMissingId() {
assertEquals("Entity ID is empty", exception.getMessage());
}

@Test
void throwsIfEntityTypeClientIsDown() {
when(this.mockEntityTypeClient.get(V2_ENTITY_TYPE))
.thenReturn(Single.error(new RuntimeException()));
Entity inputEntity = Entity.newBuilder().setEntityType(V2_ENTITY_TYPE).build();

Exception exception =
assertThrows(
RuntimeException.class, () -> this.normalizer.normalize(TENANT_ID, inputEntity));
}

@Test
void normalizesV1EntityWithAttrs() {
Map<String, AttributeValue> valueMap = buildValueMap(Map.of(V1_ID_ATTR.getName(), "foo-value"));
Expand All @@ -117,7 +131,7 @@ void normalizesV1EntityWithAttrs() {
when(this.mockIdAttrCache.getIdentifyingAttributes(TENANT_ID, V1_ENTITY_TYPE))
.thenReturn(List.of(V1_ID_ATTR));
when(this.mockEntityTypeClient.get(V1_ENTITY_TYPE))
.thenReturn(Single.error(new RuntimeException()));
.thenReturn(Single.error(Status.NOT_FOUND.asRuntimeException()));
Entity inputEntity =
Entity.newBuilder()
.setEntityType(V1_ENTITY_TYPE)
Expand Down Expand Up @@ -162,7 +176,7 @@ void returnsV2TypeKeyForV2Entity() {
@Test
void returnsSimpleKeyForV1Entity() {
when(this.mockEntityTypeClient.get(V1_ENTITY_TYPE))
.thenReturn(Single.error(new RuntimeException()));
.thenReturn(Single.error(Status.NOT_FOUND.asRuntimeException()));

// Getting a key for a v1 entity when provided with direct id
assertEquals(
Expand Down
Loading