Skip to content

Commit 6d92f51

Browse files
authored
Fixing issues with returning a Flux<JsonNode> in reactive spring. (Azure#33730)
* Fixing issues with returning a Flux<JsonNode> in reactive spring. * Updating ChangeLog * Fixing issues with containerName/finalContainerName usage. * Fixing issues with containerName/finalContainerName usage.
1 parent 7a8c286 commit 6d92f51

File tree

5 files changed

+30
-13
lines changed

5 files changed

+30
-13
lines changed

sdk/cosmos/azure-spring-data-cosmos-test/src/test/java/com/azure/spring/data/cosmos/repository/integration/ReactiveCourseRepositoryIT.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,10 @@
1313
import com.azure.spring.data.cosmos.repository.TestRepositoryConfig;
1414
import com.azure.spring.data.cosmos.repository.repository.ReactiveCourseRepository;
1515
import com.azure.spring.data.cosmos.repository.support.CosmosEntityInformation;
16+
import com.fasterxml.jackson.databind.JsonNode;
1617
import com.fasterxml.jackson.databind.node.ObjectNode;
1718
import org.assertj.core.api.Assertions;
19+
import org.junit.Assert;
1820
import org.junit.Before;
1921
import org.junit.ClassRule;
2022
import org.junit.Test;
@@ -319,6 +321,16 @@ public void testFindByNameOrDepartmentAllIgnoreCase() {
319321
StepVerifier.create(findResult).expectNext(COURSE_1).verifyComplete();
320322
}
321323

324+
@Test
325+
public void testFindByNameJsonNode() {
326+
final Flux<JsonNode> findResult = repository.annotatedFindByName(COURSE_NAME_1);
327+
StepVerifier.create(findResult).consumeNextWith(result -> {
328+
Assert.assertEquals(result.findValue("courseId").asText(), COURSE_1.getCourseId());
329+
Assert.assertEquals(result.findValue("name").asText(), COURSE_1.getName());
330+
Assert.assertEquals(result.findValue("department").asText(), COURSE_1.getDepartment());
331+
}).verifyComplete();
332+
}
333+
322334
@Test
323335
public void testAnnotatedQueries() {
324336
Flux<Course> courseFlux = repository.getCoursesWithNameDepartment(COURSE_NAME_1, DEPARTMENT_NAME_3);

sdk/cosmos/azure-spring-data-cosmos-test/src/test/java/com/azure/spring/data/cosmos/repository/repository/ReactiveCourseRepository.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import com.azure.spring.data.cosmos.domain.Course;
66
import com.azure.spring.data.cosmos.repository.Query;
77
import com.azure.spring.data.cosmos.repository.ReactiveCosmosRepository;
8+
import com.fasterxml.jackson.databind.JsonNode;
89
import com.fasterxml.jackson.databind.node.ObjectNode;
910
import org.springframework.data.repository.query.Param;
1011
import reactor.core.publisher.Flux;
@@ -50,6 +51,9 @@ Flux<Course> findByNameAndDepartmentOrNameAndDepartment(String name,
5051
*/
5152
Flux<Course> findByNameOrDepartmentAllIgnoreCase(String name, String department);
5253

54+
@Query(value = "select c as jsonNode from c where c.name = @name")
55+
Flux<JsonNode> annotatedFindByName(@Param("name") String city);
56+
5357
/**
5458
* Find a single Course list by name
5559
* @param name name

sdk/cosmos/azure-spring-data-cosmos/CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
#### Breaking Changes
88

99
#### Bugs Fixed
10+
* Bug fixed in `ReactiveCosmosTemplate` where returning a Flux<JsonNode> was causing an error - See [PR 33730](https://github.com/Azure/azure-sdk-for-java/pull/33730)
1011

1112
#### Other Changes
1213

sdk/cosmos/azure-spring-data-cosmos/src/main/java/com/azure/spring/data/cosmos/core/CosmosTemplate.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -362,7 +362,7 @@ public <T> T findById(String containerName, Object id, Class<T> domainType) {
362362
options.setResponseContinuationTokenLimitInKb(this.responseContinuationTokenLimitInKb);
363363
return this.getCosmosAsyncClient()
364364
.getDatabase(this.getDatabaseName())
365-
.getContainer(containerName)
365+
.getContainer(finalContainerName)
366366
.queryItems(sqlQuerySpec, options, JsonNode.class)
367367
.byPage()
368368
.publishOn(Schedulers.parallel())
@@ -762,7 +762,7 @@ public <T> Iterable<T> delete(@NonNull CosmosQuery query, @NonNull Class<T> doma
762762
Assert.hasText(containerName, "container should not be null, empty or only whitespaces");
763763
String finalContainerName = getContainerNameOverride(containerName);
764764

765-
final List<JsonNode> results = findItemsAsFlux(query, containerName, domainType).collectList().block();
765+
final List<JsonNode> results = findItemsAsFlux(query, finalContainerName, domainType).collectList().block();
766766
assert results != null;
767767
return results.stream()
768768
.map(item -> deleteItem(item, finalContainerName, domainType))
@@ -1074,7 +1074,7 @@ private <T> Iterable<T> findItems(@NonNull CosmosQuery query,
10741074
@NonNull String containerName,
10751075
@NonNull Class<T> domainType) {
10761076
String finalContainerName = getContainerNameOverride(containerName);
1077-
return findItemsAsFlux(query, containerName, domainType)
1077+
return findItemsAsFlux(query, finalContainerName, domainType)
10781078
.map(jsonNode -> emitOnLoadEventAndConvertToDomainObject(domainType, finalContainerName, jsonNode))
10791079
.toIterable();
10801080
}

sdk/cosmos/azure-spring-data-cosmos/src/main/java/com/azure/spring/data/cosmos/core/ReactiveCosmosTemplate.java

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -294,7 +294,7 @@ public <T> Flux<T> findAll(PartitionKey partitionKey, Class<T> domainType) {
294294
cosmosItemFeedResponse.getCosmosDiagnostics(), cosmosItemFeedResponse);
295295
return Flux.fromIterable(cosmosItemFeedResponse.getResults());
296296
})
297-
.map(cosmosItemProperties -> emitOnLoadEventAndConvertToDomainObject(domainType, cosmosItemProperties))
297+
.map(cosmosItemProperties -> emitOnLoadEventAndConvertToDomainObject(domainType, containerName, cosmosItemProperties))
298298
.onErrorResume(throwable ->
299299
CosmosExceptionUtils.exceptionHandler("Failed to find items", throwable,
300300
this.responseDiagnosticsProcessor));
@@ -325,7 +325,7 @@ public <T> Mono<T> findById(Object id, Class<T> domainType) {
325325
public <T> Mono<T> findById(String containerName, Object id, Class<T> domainType) {
326326
Assert.hasText(containerName, "containerName should not be null, empty or only whitespaces");
327327
Assert.notNull(domainType, "domainType should not be null");
328-
containerName = getContainerNameOverride(containerName);
328+
final String finalContainerName = getContainerNameOverride(containerName);
329329
final String query = "select * from root where root.id = @ROOT_ID";
330330
final SqlParameter param = new SqlParameter("@ROOT_ID", CosmosUtils.getStringIDValue(id));
331331
final SqlQuerySpec sqlQuerySpec = new SqlQuerySpec(query, param);
@@ -336,7 +336,7 @@ public <T> Mono<T> findById(String containerName, Object id, Class<T> domainType
336336
options.setResponseContinuationTokenLimitInKb(this.responseContinuationTokenLimitInKb);
337337

338338
return this.getCosmosAsyncClient().getDatabase(this.getDatabaseName())
339-
.getContainer(containerName)
339+
.getContainer(finalContainerName)
340340
.queryItems(sqlQuerySpec, options, JsonNode.class)
341341
.byPage()
342342
.publishOn(Schedulers.parallel())
@@ -347,7 +347,7 @@ public <T> Mono<T> findById(String containerName, Object id, Class<T> domainType
347347
return Mono.justOrEmpty(cosmosItemFeedResponse
348348
.getResults()
349349
.stream()
350-
.map(cosmosItem -> emitOnLoadEventAndConvertToDomainObject(domainType, cosmosItem))
350+
.map(cosmosItem -> emitOnLoadEventAndConvertToDomainObject(domainType, finalContainerName, cosmosItem))
351351
.findFirst());
352352
})
353353
.onErrorResume(throwable ->
@@ -378,7 +378,7 @@ public <T> Mono<T> findById(Object id, Class<T> domainType, PartitionKey partiti
378378
CosmosUtils.fillAndProcessResponseDiagnostics(this.responseDiagnosticsProcessor,
379379
cosmosItemResponse.getDiagnostics(), null);
380380
return Mono.justOrEmpty(emitOnLoadEventAndConvertToDomainObject(domainType,
381-
cosmosItemResponse.getItem()));
381+
containerName, cosmosItemResponse.getItem()));
382382
})
383383
.onErrorResume(throwable ->
384384
CosmosExceptionUtils.findAPIExceptionHandler("Failed to find item", throwable,
@@ -664,7 +664,7 @@ public <T> Flux<T> delete(CosmosQuery query, Class<T> domainType, String contain
664664
@Override
665665
public <T> Flux<T> find(CosmosQuery query, Class<T> domainType, String containerName) {
666666
return findItems(query, containerName, domainType)
667-
.map(cosmosItemProperties -> emitOnLoadEventAndConvertToDomainObject(domainType, cosmosItemProperties));
667+
.map(cosmosItemProperties -> emitOnLoadEventAndConvertToDomainObject(domainType, containerName, cosmosItemProperties));
668668
}
669669

670670
/**
@@ -744,7 +744,7 @@ public <T> Flux<T> runQuery(SqlQuerySpec querySpec, Class<?> domainType, Class<T
744744
public <T> Flux<T> runQuery(SqlQuerySpec querySpec, Sort sort, Class<?> domainType, Class<T> returnType) {
745745
SqlQuerySpec sortedQuerySpec = NativeQueryGenerator.getInstance().generateSortedQuery(querySpec, sort);
746746
return runQuery(sortedQuerySpec, domainType)
747-
.map(cosmosItemProperties -> emitOnLoadEventAndConvertToDomainObject(returnType, cosmosItemProperties));
747+
.map(cosmosItemProperties -> emitOnLoadEventAndConvertToDomainObject(returnType, getContainerName(domainType), cosmosItemProperties));
748748
}
749749

750750
private Flux<JsonNode> runQuery(SqlQuerySpec querySpec, Class<?> domainType) {
@@ -901,9 +901,9 @@ private <T> Mono<T> deleteItem(@NonNull JsonNode jsonNode,
901901
this.responseDiagnosticsProcessor));
902902
}
903903

904-
private <T> T emitOnLoadEventAndConvertToDomainObject(@NonNull Class<T> domainType, JsonNode responseJsonNode) {
905-
CosmosEntityInformation<?, ?> entityInformation = CosmosEntityInformation.getInstance(domainType);
906-
maybeEmitEvent(new AfterLoadEvent<>(responseJsonNode, domainType, entityInformation.getContainerName()));
904+
private <T> T emitOnLoadEventAndConvertToDomainObject(@NonNull Class<T> domainType, String containerName, JsonNode responseJsonNode) {
905+
containerName = getContainerNameOverride(containerName);
906+
maybeEmitEvent(new AfterLoadEvent<>(responseJsonNode, domainType, containerName));
907907
return toDomainObject(domainType, responseJsonNode);
908908
}
909909

0 commit comments

Comments
 (0)