Skip to content

Commit 8432b89

Browse files
authored
fix | fix the updates for timestamp attribute and key for update operations (#294)
fix | fix the updates for timestamp attribute and key for update operations
1 parent b32b68c commit 8432b89

File tree

8 files changed

+148
-69
lines changed

8 files changed

+148
-69
lines changed

entity-service-impl/src/main/java/org/hypertrace/entity/data/service/EntityNormalizer.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@
22

33
import static java.util.function.Predicate.not;
44

5+
import io.grpc.Status;
6+
import io.reactivex.rxjava3.core.Single;
57
import java.util.Optional;
68
import java.util.Set;
79
import java.util.stream.Collectors;
@@ -14,6 +16,7 @@
1416
import org.hypertrace.entity.type.service.v1.AttributeType;
1517

1618
public class EntityNormalizer {
19+
1720
private final EntityTypeClient entityTypeV2Client;
1821
private final EntityIdGenerator idGenerator;
1922
private final IdentifyingAttributeCache identifyingAttributeCache;
@@ -31,8 +34,8 @@ public EntityNormalizer(
3134
* Normalizes the entity to a canonical, ready-to-upsert form
3235
*
3336
* @param receivedEntity
34-
* @throws RuntimeException If entity can not be normalized
3537
* @return
38+
* @throws RuntimeException If entity can not be normalized
3639
*/
3740
Entity normalize(String tenantId, Entity receivedEntity) {
3841
if (StringUtils.isEmpty(receivedEntity.getEntityType())) {
@@ -100,7 +103,11 @@ private boolean isV2Type(String entityType) {
100103
return this.entityTypeV2Client
101104
.get(entityType)
102105
.map(unused -> true)
103-
.onErrorReturnItem(false)
106+
.onErrorResumeNext(
107+
throwable ->
108+
Status.NOT_FOUND.getCode().equals(Status.fromThrowable(throwable).getCode())
109+
? Single.just(false)
110+
: Single.error(throwable))
104111
.blockingGet();
105112
}
106113

entity-service-impl/src/main/java/org/hypertrace/entity/query/service/EntityQueryServiceImpl.java

Lines changed: 35 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,6 @@
4343
import org.hypertrace.core.documentstore.Document;
4444
import org.hypertrace.core.documentstore.JSONDocument;
4545
import org.hypertrace.core.documentstore.Key;
46-
import org.hypertrace.core.documentstore.SingleValueKey;
4746
import org.hypertrace.core.documentstore.expression.impl.ConstantExpression;
4847
import org.hypertrace.core.documentstore.expression.impl.IdentifierExpression;
4948
import org.hypertrace.core.documentstore.expression.impl.LogicalExpression;
@@ -155,7 +154,7 @@ public EntityQueryServiceImpl(
155154
entityChangeEventGenerator,
156155
new EntityAttributeChangeEvaluator(config, entityAttributeMapping),
157156
entityCounterMetricSender,
158-
entityTypeChannel,
157+
EntityTypeClient.builder(entityTypeChannel).build(),
159158
!config.hasPathOrNull(CHUNK_SIZE_CONFIG)
160159
? DEFAULT_CHUNK_SIZE
161160
: config.getInt(CHUNK_SIZE_CONFIG),
@@ -174,7 +173,7 @@ public EntityQueryServiceImpl(
174173
EntityChangeEventGenerator entityChangeEventGenerator,
175174
EntityAttributeChangeEvaluator entityAttributeChangeEvaluator,
176175
EntityCounterMetricSender entityCounterMetricSender,
177-
Channel entityTypeChannel,
176+
EntityTypeClient entityTypeClient,
178177
int chunkSize,
179178
int maxEntitiesToDelete,
180179
int maxStringLengthForUpdate) {
@@ -186,7 +185,7 @@ public EntityQueryServiceImpl(
186185
entityAttributeChangeEvaluator,
187186
entityCounterMetricSender,
188187
new EntityFetcher(entitiesCollection, DOCUMENT_PARSER),
189-
entityTypeChannel,
188+
entityTypeClient,
190189
chunkSize,
191190
maxEntitiesToDelete,
192191
maxStringLengthForUpdate);
@@ -200,7 +199,7 @@ public EntityQueryServiceImpl(
200199
EntityAttributeChangeEvaluator entityAttributeChangeEvaluator,
201200
EntityCounterMetricSender entityCounterMetricSender,
202201
EntityFetcher entityFetcher,
203-
Channel entityTypeChannel,
202+
EntityTypeClient entityTypeClient,
204203
int chunkSize,
205204
int maxEntitiesToDelete,
206205
int maxStringLengthForUpdate) {
@@ -213,7 +212,6 @@ public EntityQueryServiceImpl(
213212
this.entityFetcher = entityFetcher;
214213
this.entityAttributeChangeEvaluator = entityAttributeChangeEvaluator;
215214
this.entityCounterMetricSender = entityCounterMetricSender;
216-
EntityTypeClient entityTypeClient = EntityTypeClient.builder(entityTypeChannel).build();
217215
IdentifyingAttributeCache identifyingAttributeCache = new IdentifyingAttributeCache(datastore);
218216
this.entityNormalizer =
219217
new EntityNormalizer(entityTypeClient, new EntityIdGenerator(), identifyingAttributeCache);
@@ -457,14 +455,26 @@ public void bulkUpdateEntityArrayAttribute(
457455
responseObserver.onError(new ServiceException("Tenant id is missing in the request."));
458456
return;
459457
}
458+
459+
if (StringUtils.isBlank(request.getEntityType())) {
460+
LOG.warn("Entity type is missing in bulk update entity array request");
461+
responseObserver.onError(
462+
Status.INVALID_ARGUMENT
463+
.withDescription("Entity type is missing in the request.")
464+
.asException());
465+
return;
466+
}
467+
460468
try {
461469
Set<Key> keys =
462470
request.getEntityIdsList().stream()
463-
.map(entityId -> new SingleValueKey(tenantId, entityId))
471+
.map(
472+
entityId ->
473+
this.entityNormalizer.getEntityDocKey(
474+
tenantId, request.getEntityType(), entityId))
464475
.collect(Collectors.toCollection(LinkedHashSet::new));
465476

466477
String attributeId = request.getAttribute().getColumnName();
467-
468478
String subDocPath =
469479
entityAttributeMapping
470480
.getDocStorePathByAttributeId(requestContext, attributeId)
@@ -590,7 +600,8 @@ private void doBulkUpdate(
590600
continue;
591601
}
592602
entitiesUpdateMap.put(
593-
new SingleValueKey(requestContext.getTenantId().orElseThrow(), entityId),
603+
this.entityNormalizer.getEntityDocKey(
604+
requestContext.getTenantId().orElseThrow(), entityType, entityId),
594605
transformedUpdateOperations);
595606
boolean shouldSendNotification =
596607
this.entityAttributeChangeEvaluator.shouldSendNotification(
@@ -692,7 +703,7 @@ public void deleteEntities(
692703
return;
693704
}
694705

695-
if (existingEntities.size() == 0) {
706+
if (existingEntities.isEmpty()) {
696707
LOG.debug("{}. No entities found to delete", request);
697708
responseObserver.onNext(DeleteEntitiesResponse.newBuilder().build());
698709
responseObserver.onCompleted();
@@ -820,19 +831,17 @@ private BulkUpdateAllMatchingFilterResponse doBulkUpdate(
820831
queryConverter.convert(entityQueryRequest, requestContext);
821832
final List<Entity> existingEntities = entityFetcher.query(updateFilterQuery);
822833

823-
final List<SingleValueKey> keys = getKeysToUpdate(entityType, existingEntities);
824-
final List<UpdatedEntity> updatedEntityResponses = buildUpdatedEntityResponse(keys);
834+
final List<UpdatedEntity> updatedEntityResponses =
835+
buildUpdatedEntityResponse(existingEntities);
825836
responseBuilder.addSummaries(
826837
UpdateSummary.newBuilder().addAllUpdatedEntities(updatedEntityResponses));
827-
828-
if (keys.isEmpty()) {
838+
if (updatedEntityResponses.isEmpty()) {
829839
// Nothing to update
830840
LOG.debug("No entity found with filter {} for updating", update.getFilter());
831841
continue;
832842
}
833843

834844
final List<AttributeUpdateOperation> updateOperations = update.getOperationsList();
835-
836845
final List<SubDocumentUpdate> updates = convertUpdates(requestContext, updateOperations);
837846

838847
final boolean shouldSendNotification =
@@ -856,9 +865,9 @@ private BulkUpdateAllMatchingFilterResponse doBulkUpdate(
856865
return responseBuilder.build();
857866
}
858867

859-
private List<UpdatedEntity> buildUpdatedEntityResponse(final List<SingleValueKey> keys) {
860-
return keys.stream()
861-
.map(SingleValueKey::getValue)
868+
private List<UpdatedEntity> buildUpdatedEntityResponse(final List<Entity> entities) {
869+
return entities.stream()
870+
.map(Entity::getEntityId)
862871
.map(id -> UpdatedEntity.newBuilder().setId(id))
863872
.map(UpdatedEntity.Builder::build)
864873
.collect(toUnmodifiableList());
@@ -871,8 +880,10 @@ private Converter<AttributeUpdateOperation, SubDocumentUpdate> getUpdateConverte
871880
new TypeLiteral<Converter<AttributeUpdateOperation, SubDocumentUpdate>>() {}));
872881
}
873882

874-
private List<SingleValueKey> getKeysToUpdate(
875-
final String entityType, final List<Entity> existingEntities) {
883+
private List<Key> getKeysToUpdate(
884+
final RequestContext requestContext,
885+
final String entityType,
886+
final List<Entity> existingEntities) {
876887
final Optional<String> idAttribute =
877888
entityAttributeMapping.getIdentifierAttributeId(entityType);
878889

@@ -883,7 +894,10 @@ private List<SingleValueKey> getKeysToUpdate(
883894
}
884895

885896
return existingEntities.stream()
886-
.map(entity -> new SingleValueKey(entity.getTenantId(), entity.getEntityId()))
897+
.map(
898+
entity ->
899+
this.entityNormalizer.getEntityDocKey(
900+
requestContext.getTenantId().orElseThrow(), entityType, entity.getEntityId()))
887901
.collect(toUnmodifiableList());
888902
}
889903

entity-service-impl/src/main/java/org/hypertrace/entity/query/service/converter/UpdateConverter.java

Lines changed: 20 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
import static org.hypertrace.core.attribute.service.v1.AttributeKind.TYPE_STRING;
99
import static org.hypertrace.core.attribute.service.v1.AttributeKind.TYPE_STRING_ARRAY;
1010
import static org.hypertrace.core.attribute.service.v1.AttributeKind.TYPE_STRING_MAP;
11+
import static org.hypertrace.core.attribute.service.v1.AttributeKind.TYPE_TIMESTAMP;
1112
import static org.hypertrace.core.documentstore.model.subdoc.UpdateOperator.ADD_TO_LIST_IF_ABSENT;
1213
import static org.hypertrace.core.documentstore.model.subdoc.UpdateOperator.REMOVE_ALL_FROM_LIST;
1314
import static org.hypertrace.core.documentstore.model.subdoc.UpdateOperator.SET;
@@ -28,8 +29,11 @@
2829
import static org.hypertrace.entity.query.service.v1.ValueType.STRING;
2930
import static org.hypertrace.entity.query.service.v1.ValueType.STRING_ARRAY;
3031
import static org.hypertrace.entity.query.service.v1.ValueType.STRING_MAP;
32+
import static org.hypertrace.entity.query.service.v1.ValueType.TIMESTAMP;
3133

3234
import com.google.common.base.Joiner;
35+
import com.google.common.collect.ImmutableMultimap;
36+
import com.google.common.collect.Multimap;
3337
import java.util.Map;
3438
import java.util.Set;
3539
import javax.inject.Inject;
@@ -49,19 +53,23 @@
4953

5054
@AllArgsConstructor(onConstructor_ = {@Inject})
5155
public class UpdateConverter implements Converter<AttributeUpdateOperation, SubDocumentUpdate> {
56+
5257
private static final Joiner DOT_JOINER = Joiner.on(".");
5358

54-
private static final Map<ValueType, AttributeKind> VALUE_TYPE_TO_ATTRIBUTE_KIND_MAP =
55-
Map.ofEntries(
56-
entry(STRING, TYPE_STRING),
57-
entry(LONG, TYPE_INT64),
58-
entry(INT, TYPE_INT64),
59-
entry(FLOAT, TYPE_DOUBLE),
60-
entry(DOUBLE, TYPE_DOUBLE),
61-
entry(BYTES, TYPE_BYTES),
62-
entry(BOOL, TYPE_BOOL),
63-
entry(STRING_ARRAY, TYPE_STRING_ARRAY),
64-
entry(STRING_MAP, TYPE_STRING_MAP));
59+
private static final Multimap<ValueType, AttributeKind> VALUE_TYPE_TO_ATTRIBUTE_KIND_MULTI_MAP =
60+
new ImmutableMultimap.Builder<ValueType, AttributeKind>()
61+
.put(entry(STRING, TYPE_STRING))
62+
.put(entry(LONG, TYPE_INT64))
63+
.put(entry(INT, TYPE_INT64))
64+
.put(entry(FLOAT, TYPE_DOUBLE))
65+
.put(entry(DOUBLE, TYPE_DOUBLE))
66+
.put(entry(BYTES, TYPE_BYTES))
67+
.put(entry(BOOL, TYPE_BOOL))
68+
.put(entry(LONG, TYPE_TIMESTAMP))
69+
.put(entry(TIMESTAMP, TYPE_TIMESTAMP))
70+
.put(entry(STRING_ARRAY, TYPE_STRING_ARRAY))
71+
.put(entry(STRING_MAP, TYPE_STRING_MAP))
72+
.build();
6573

6674
private static final Map<AttributeUpdateOperator, UpdateOperator> OPERATOR_MAP =
6775
Map.ofEntries(
@@ -139,7 +147,7 @@ private void validateDataType(
139147
return;
140148
}
141149

142-
if (!attributeKind.equals(VALUE_TYPE_TO_ATTRIBUTE_KIND_MAP.get(valueType))) {
150+
if (!VALUE_TYPE_TO_ATTRIBUTE_KIND_MULTI_MAP.get(valueType).contains(attributeKind)) {
143151
throw new ConversionException(
144152
String.format(
145153
"Mismatching value type (%s) for attribute of type %s", valueType, attributeKind));

entity-service-impl/src/test/java/org/hypertrace/entity/data/service/EntityNormalizerTest.java

Lines changed: 19 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import static org.junit.jupiter.api.Assertions.assertThrows;
55
import static org.mockito.Mockito.when;
66

7+
import io.grpc.Status;
78
import io.reactivex.rxjava3.core.Single;
89
import java.util.List;
910
import java.util.Map;
@@ -26,6 +27,7 @@
2627

2728
@ExtendWith(MockitoExtension.class)
2829
class EntityNormalizerTest {
30+
2931
private static final String TENANT_ID = "tenant";
3032
private static final String V1_ENTITY_TYPE = "v1-entity";
3133
private static final String V2_ENTITY_TYPE = "v2-entity";
@@ -59,15 +61,16 @@ void throwsOnV1EntityTypeMissingIdAttr() {
5961
when(this.mockIdAttrCache.getIdentifyingAttributes(TENANT_ID, V1_ENTITY_TYPE))
6062
.thenReturn(List.of(V1_ID_ATTR));
6163
when(this.mockEntityTypeClient.get(V1_ENTITY_TYPE))
62-
.thenReturn(Single.error(new RuntimeException()));
64+
.thenReturn(Single.error(Status.NOT_FOUND.asRuntimeException()));
6365
Entity inputEntity = Entity.newBuilder().setEntityType(V1_ENTITY_TYPE).build();
6466

6567
Exception exception =
6668
assertThrows(
6769
IllegalArgumentException.class,
6870
() -> this.normalizer.normalize(TENANT_ID, inputEntity));
6971
assertEquals(
70-
"Received and expected identifying attributes differ. Received: [] . Expected: [required-attr]",
72+
"Received and expected identifying attributes differ. Received: [] . Expected: "
73+
+ "[required-attr]",
7174
exception.getMessage());
7275
}
7376

@@ -80,7 +83,7 @@ void normalizesV1EntityTypeWithExtraIdAttr() {
8083
when(this.mockIdAttrCache.getIdentifyingAttributes(TENANT_ID, V1_ENTITY_TYPE))
8184
.thenReturn(List.of(V1_ID_ATTR));
8285
when(this.mockEntityTypeClient.get(V1_ENTITY_TYPE))
83-
.thenReturn(Single.error(new RuntimeException()));
86+
.thenReturn(Single.error(Status.NOT_FOUND.asRuntimeException()));
8487
Entity inputEntity =
8588
Entity.newBuilder()
8689
.setEntityType(V1_ENTITY_TYPE)
@@ -109,6 +112,17 @@ void throwsOnV2EntityMissingId() {
109112
assertEquals("Entity ID is empty", exception.getMessage());
110113
}
111114

115+
@Test
116+
void throwsIfEntityTypeClientIsDown() {
117+
when(this.mockEntityTypeClient.get(V2_ENTITY_TYPE))
118+
.thenReturn(Single.error(new RuntimeException()));
119+
Entity inputEntity = Entity.newBuilder().setEntityType(V2_ENTITY_TYPE).build();
120+
121+
Exception exception =
122+
assertThrows(
123+
RuntimeException.class, () -> this.normalizer.normalize(TENANT_ID, inputEntity));
124+
}
125+
112126
@Test
113127
void normalizesV1EntityWithAttrs() {
114128
Map<String, AttributeValue> valueMap = buildValueMap(Map.of(V1_ID_ATTR.getName(), "foo-value"));
@@ -117,7 +131,7 @@ void normalizesV1EntityWithAttrs() {
117131
when(this.mockIdAttrCache.getIdentifyingAttributes(TENANT_ID, V1_ENTITY_TYPE))
118132
.thenReturn(List.of(V1_ID_ATTR));
119133
when(this.mockEntityTypeClient.get(V1_ENTITY_TYPE))
120-
.thenReturn(Single.error(new RuntimeException()));
134+
.thenReturn(Single.error(Status.NOT_FOUND.asRuntimeException()));
121135
Entity inputEntity =
122136
Entity.newBuilder()
123137
.setEntityType(V1_ENTITY_TYPE)
@@ -162,7 +176,7 @@ void returnsV2TypeKeyForV2Entity() {
162176
@Test
163177
void returnsSimpleKeyForV1Entity() {
164178
when(this.mockEntityTypeClient.get(V1_ENTITY_TYPE))
165-
.thenReturn(Single.error(new RuntimeException()));
179+
.thenReturn(Single.error(Status.NOT_FOUND.asRuntimeException()));
166180

167181
// Getting a key for a v1 entity when provided with direct id
168182
assertEquals(

0 commit comments

Comments
 (0)