Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@
import org.hypertrace.core.documentstore.Document;
import org.hypertrace.core.documentstore.JSONDocument;
import org.hypertrace.core.documentstore.Key;
import org.hypertrace.core.documentstore.SingleValueKey;
import org.hypertrace.core.documentstore.expression.impl.ConstantExpression;
import org.hypertrace.core.documentstore.expression.impl.IdentifierExpression;
import org.hypertrace.core.documentstore.expression.impl.LogicalExpression;
Expand Down Expand Up @@ -457,14 +456,26 @@ public void bulkUpdateEntityArrayAttribute(
responseObserver.onError(new ServiceException("Tenant id is missing in the request."));
return;
}

if (StringUtils.isBlank(request.getEntityType())) {
LOG.warn("Entity type is missing in bulk update entity array request");
responseObserver.onError(
Status.INVALID_ARGUMENT
.withDescription("Entity type is missing in the request.")
.asException());
return;
}

try {
Set<Key> keys =
request.getEntityIdsList().stream()
.map(entityId -> new SingleValueKey(tenantId, entityId))
.map(
entityId ->
this.entityNormalizer.getEntityDocKey(
tenantId, request.getEntityType(), entityId))
.collect(Collectors.toCollection(LinkedHashSet::new));

String attributeId = request.getAttribute().getColumnName();

String subDocPath =
entityAttributeMapping
.getDocStorePathByAttributeId(requestContext, attributeId)
Expand Down Expand Up @@ -590,7 +601,8 @@ private void doBulkUpdate(
continue;
}
entitiesUpdateMap.put(
new SingleValueKey(requestContext.getTenantId().orElseThrow(), entityId),
this.entityNormalizer.getEntityDocKey(
requestContext.getTenantId().orElseThrow(), entityType, entityId),
transformedUpdateOperations);
boolean shouldSendNotification =
this.entityAttributeChangeEvaluator.shouldSendNotification(
Expand Down Expand Up @@ -692,7 +704,7 @@ public void deleteEntities(
return;
}

if (existingEntities.size() == 0) {
if (existingEntities.isEmpty()) {
LOG.debug("{}. No entities found to delete", request);
responseObserver.onNext(DeleteEntitiesResponse.newBuilder().build());
responseObserver.onCompleted();
Expand Down Expand Up @@ -820,19 +832,17 @@ private BulkUpdateAllMatchingFilterResponse doBulkUpdate(
queryConverter.convert(entityQueryRequest, requestContext);
final List<Entity> existingEntities = entityFetcher.query(updateFilterQuery);

final List<SingleValueKey> keys = getKeysToUpdate(entityType, existingEntities);
final List<UpdatedEntity> updatedEntityResponses = buildUpdatedEntityResponse(keys);
final List<UpdatedEntity> updatedEntityResponses =
buildUpdatedEntityResponse(existingEntities);
responseBuilder.addSummaries(
UpdateSummary.newBuilder().addAllUpdatedEntities(updatedEntityResponses));

if (keys.isEmpty()) {
if (updatedEntityResponses.isEmpty()) {
// Nothing to update
LOG.debug("No entity found with filter {} for updating", update.getFilter());
continue;
}

final List<AttributeUpdateOperation> updateOperations = update.getOperationsList();

final List<SubDocumentUpdate> updates = convertUpdates(requestContext, updateOperations);

final boolean shouldSendNotification =
Expand All @@ -856,9 +866,9 @@ private BulkUpdateAllMatchingFilterResponse doBulkUpdate(
return responseBuilder.build();
}

private List<UpdatedEntity> buildUpdatedEntityResponse(final List<SingleValueKey> keys) {
return keys.stream()
.map(SingleValueKey::getValue)
private List<UpdatedEntity> buildUpdatedEntityResponse(final List<Entity> entities) {
return entities.stream()
.map(Entity::getEntityId)
.map(id -> UpdatedEntity.newBuilder().setId(id))
.map(UpdatedEntity.Builder::build)
.collect(toUnmodifiableList());
Expand All @@ -871,8 +881,10 @@ private Converter<AttributeUpdateOperation, SubDocumentUpdate> getUpdateConverte
new TypeLiteral<Converter<AttributeUpdateOperation, SubDocumentUpdate>>() {}));
}

private List<SingleValueKey> getKeysToUpdate(
final String entityType, final List<Entity> existingEntities) {
private List<Key> getKeysToUpdate(
final RequestContext requestContext,
final String entityType,
final List<Entity> existingEntities) {
final Optional<String> idAttribute =
entityAttributeMapping.getIdentifierAttributeId(entityType);

Expand All @@ -883,7 +895,10 @@ private List<SingleValueKey> getKeysToUpdate(
}

return existingEntities.stream()
.map(entity -> new SingleValueKey(entity.getTenantId(), entity.getEntityId()))
.map(
entity ->
this.entityNormalizer.getEntityDocKey(
requestContext.getTenantId().orElseThrow(), entityType, entity.getEntityId()))
.collect(toUnmodifiableList());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import static org.hypertrace.core.attribute.service.v1.AttributeKind.TYPE_STRING;
import static org.hypertrace.core.attribute.service.v1.AttributeKind.TYPE_STRING_ARRAY;
import static org.hypertrace.core.attribute.service.v1.AttributeKind.TYPE_STRING_MAP;
import static org.hypertrace.core.attribute.service.v1.AttributeKind.TYPE_TIMESTAMP;
import static org.hypertrace.core.documentstore.model.subdoc.UpdateOperator.ADD_TO_LIST_IF_ABSENT;
import static org.hypertrace.core.documentstore.model.subdoc.UpdateOperator.REMOVE_ALL_FROM_LIST;
import static org.hypertrace.core.documentstore.model.subdoc.UpdateOperator.SET;
Expand All @@ -28,8 +29,11 @@
import static org.hypertrace.entity.query.service.v1.ValueType.STRING;
import static org.hypertrace.entity.query.service.v1.ValueType.STRING_ARRAY;
import static org.hypertrace.entity.query.service.v1.ValueType.STRING_MAP;
import static org.hypertrace.entity.query.service.v1.ValueType.TIMESTAMP;

import com.google.common.base.Joiner;
import com.google.common.collect.ImmutableMultimap;
import com.google.common.collect.Multimap;
import java.util.Map;
import java.util.Set;
import javax.inject.Inject;
Expand All @@ -49,19 +53,23 @@

@AllArgsConstructor(onConstructor_ = {@Inject})
public class UpdateConverter implements Converter<AttributeUpdateOperation, SubDocumentUpdate> {

private static final Joiner DOT_JOINER = Joiner.on(".");

private static final Map<ValueType, AttributeKind> VALUE_TYPE_TO_ATTRIBUTE_KIND_MAP =
Map.ofEntries(
entry(STRING, TYPE_STRING),
entry(LONG, TYPE_INT64),
entry(INT, TYPE_INT64),
entry(FLOAT, TYPE_DOUBLE),
entry(DOUBLE, TYPE_DOUBLE),
entry(BYTES, TYPE_BYTES),
entry(BOOL, TYPE_BOOL),
entry(STRING_ARRAY, TYPE_STRING_ARRAY),
entry(STRING_MAP, TYPE_STRING_MAP));
private static final Multimap<ValueType, AttributeKind> VALUE_TYPE_TO_ATTRIBUTE_KIND_MAP =
new ImmutableMultimap.Builder<ValueType, AttributeKind>()
.put(entry(STRING, TYPE_STRING))
.put(entry(LONG, TYPE_INT64))
.put(entry(INT, TYPE_INT64))
.put(entry(FLOAT, TYPE_DOUBLE))
.put(entry(DOUBLE, TYPE_DOUBLE))
.put(entry(BYTES, TYPE_BYTES))
.put(entry(BOOL, TYPE_BOOL))
.put(entry(LONG, TYPE_TIMESTAMP))
.put(entry(TIMESTAMP, TYPE_TIMESTAMP))
.put(entry(STRING_ARRAY, TYPE_STRING_ARRAY))
.put(entry(STRING_MAP, TYPE_STRING_MAP))
.build();

private static final Map<AttributeUpdateOperator, UpdateOperator> OPERATOR_MAP =
Map.ofEntries(
Expand Down Expand Up @@ -139,7 +147,7 @@ private void validateDataType(
return;
}

if (!attributeKind.equals(VALUE_TYPE_TO_ATTRIBUTE_KIND_MAP.get(valueType))) {
if (!VALUE_TYPE_TO_ATTRIBUTE_KIND_MAP.get(valueType).contains(attributeKind)) {
throw new ConversionException(
String.format(
"Mismatching value type (%s) for attribute of type %s", valueType, attributeKind));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import static org.hypertrace.core.attribute.service.v1.AttributeKind.TYPE_INT64;
import static org.hypertrace.core.attribute.service.v1.AttributeKind.TYPE_STRING;
import static org.hypertrace.core.attribute.service.v1.AttributeKind.TYPE_STRING_ARRAY;
import static org.hypertrace.core.attribute.service.v1.AttributeKind.TYPE_TIMESTAMP;
import static org.hypertrace.entity.query.service.v1.AttributeUpdateOperation.AttributeUpdateOperator.ATTRIBUTE_UPDATE_OPERATION_UNSPECIFIED;
import static org.hypertrace.entity.query.service.v1.AttributeUpdateOperation.AttributeUpdateOperator.ATTRIBUTE_UPDATE_OPERATOR_ADD_TO_LIST_IF_ABSENT;
import static org.hypertrace.entity.query.service.v1.AttributeUpdateOperation.AttributeUpdateOperator.ATTRIBUTE_UPDATE_OPERATOR_REMOVE_FROM_LIST;
Expand Down Expand Up @@ -74,6 +75,30 @@ void setUp() {
new UpdateConverter(mockEntityAttributeMapping, new ValueHelper(new ValueOneOfAccessor()));
}

@Test
void testConvert_updateTimestamp_withValueTypeLong() throws Exception {
final AttributeUpdateOperation operation =
AttributeUpdateOperation.newBuilder()
.setAttribute(ColumnIdentifier.newBuilder().setColumnName("columnName"))
.setOperator(ATTRIBUTE_UPDATE_OPERATOR_SET)
.setValue(
LiteralConstant.newBuilder()
.setValue(Value.newBuilder().setValueType(LONG).setLong(123)))
.build();
final RequestContext requestContext = new RequestContext();
final SubDocumentUpdate expectedResult =
SubDocumentUpdate.of(
"attributes.subDocPath",
SubDocumentValue.of(new JSONDocument("{\"value\":{\"long\":123}}")));
when(mockEntityAttributeMapping.getDocStorePathByAttributeId(requestContext, "columnName"))
.thenReturn(Optional.of("attributes.subDocPath"));
when(mockEntityAttributeMapping.getAttributeKind(requestContext, "columnName"))
.thenReturn(Optional.of(TYPE_TIMESTAMP));
final SubDocumentUpdate result = updateConverter.convert(operation, requestContext);

assertEquals(expectedResult, result);
}

@Test
void testConvert() throws Exception {
final AttributeUpdateOperation operation =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,12 @@
import org.hypertrace.entity.type.service.v2.EntityTypeServiceGrpc.EntityTypeServiceStub;
import org.hypertrace.entity.type.service.v2.QueryEntityTypesRequest;
import org.hypertrace.entity.type.service.v2.QueryEntityTypesResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class EntityTypeCachingClient implements EntityTypeClient {

private static final Logger log = LoggerFactory.getLogger(EntityTypeCachingClient.class);
private final EntityTypeServiceStub entityTypeClient;
private final LoadingCache<TenantBasedCacheKey, Single<Map<String, EntityType>>> cache;

Expand Down Expand Up @@ -68,6 +71,7 @@ private Single<Map<String, EntityType>> getOrInvalidate(TenantBasedCacheKey key)
}

private NoSuchElementException buildErrorForMissingType(String name) {
log.error("No entity type available for name '{}'", name);
return new NoSuchElementException(
String.format("No entity type available for name '%s'", name));
}
Expand Down