Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,10 @@
import com.azure.spring.data.cosmos.repository.TestRepositoryConfig;
import com.azure.spring.data.cosmos.repository.repository.ReactiveCourseRepository;
import com.azure.spring.data.cosmos.repository.support.CosmosEntityInformation;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.assertj.core.api.Assertions;
import org.junit.Assert;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
Expand Down Expand Up @@ -319,6 +321,16 @@ public void testFindByNameOrDepartmentAllIgnoreCase() {
StepVerifier.create(findResult).expectNext(COURSE_1).verifyComplete();
}

@Test
public void testFindByNameJsonNode() {
final Flux<JsonNode> findResult = repository.annotatedFindByName(COURSE_NAME_1);
StepVerifier.create(findResult).consumeNextWith(result -> {
Assert.assertEquals(result.findValue("courseId").asText(), COURSE_1.getCourseId());
Assert.assertEquals(result.findValue("name").asText(), COURSE_1.getName());
Assert.assertEquals(result.findValue("department").asText(), COURSE_1.getDepartment());
}).verifyComplete();
}

@Test
public void testAnnotatedQueries() {
Flux<Course> courseFlux = repository.getCoursesWithNameDepartment(COURSE_NAME_1, DEPARTMENT_NAME_3);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import com.azure.spring.data.cosmos.domain.Course;
import com.azure.spring.data.cosmos.repository.Query;
import com.azure.spring.data.cosmos.repository.ReactiveCosmosRepository;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.springframework.data.repository.query.Param;
import reactor.core.publisher.Flux;
Expand Down Expand Up @@ -50,6 +51,9 @@ Flux<Course> findByNameAndDepartmentOrNameAndDepartment(String name,
*/
Flux<Course> findByNameOrDepartmentAllIgnoreCase(String name, String department);

@Query(value = "select c as jsonNode from c where c.name = @name")
Flux<JsonNode> annotatedFindByName(@Param("name") String city);

/**
* Find a single Course list by name
* @param name name
Expand Down
1 change: 1 addition & 0 deletions sdk/cosmos/azure-spring-data-cosmos/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#### Breaking Changes

#### Bugs Fixed
* Bug fixed in `ReactiveCosmosTemplate` where returning a Flux<JsonNode> was causing an error - See [PR 33730](https://github.com/Azure/azure-sdk-for-java/pull/33730)

#### Other Changes

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -362,7 +362,7 @@ public <T> T findById(String containerName, Object id, Class<T> domainType) {
options.setResponseContinuationTokenLimitInKb(this.responseContinuationTokenLimitInKb);
return this.getCosmosAsyncClient()
.getDatabase(this.getDatabaseName())
.getContainer(containerName)
.getContainer(finalContainerName)
.queryItems(sqlQuerySpec, options, JsonNode.class)
.byPage()
.publishOn(Schedulers.parallel())
Expand Down Expand Up @@ -762,7 +762,7 @@ public <T> Iterable<T> delete(@NonNull CosmosQuery query, @NonNull Class<T> doma
Assert.hasText(containerName, "container should not be null, empty or only whitespaces");
String finalContainerName = getContainerNameOverride(containerName);

final List<JsonNode> results = findItemsAsFlux(query, containerName, domainType).collectList().block();
final List<JsonNode> results = findItemsAsFlux(query, finalContainerName, domainType).collectList().block();
assert results != null;
return results.stream()
.map(item -> deleteItem(item, finalContainerName, domainType))
Expand Down Expand Up @@ -1074,7 +1074,7 @@ private <T> Iterable<T> findItems(@NonNull CosmosQuery query,
@NonNull String containerName,
@NonNull Class<T> domainType) {
String finalContainerName = getContainerNameOverride(containerName);
return findItemsAsFlux(query, containerName, domainType)
return findItemsAsFlux(query, finalContainerName, domainType)
.map(jsonNode -> emitOnLoadEventAndConvertToDomainObject(domainType, finalContainerName, jsonNode))
.toIterable();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,7 @@ public <T> Flux<T> findAll(PartitionKey partitionKey, Class<T> domainType) {
cosmosItemFeedResponse.getCosmosDiagnostics(), cosmosItemFeedResponse);
return Flux.fromIterable(cosmosItemFeedResponse.getResults());
})
.map(cosmosItemProperties -> emitOnLoadEventAndConvertToDomainObject(domainType, cosmosItemProperties))
.map(cosmosItemProperties -> emitOnLoadEventAndConvertToDomainObject(domainType, containerName, cosmosItemProperties))
.onErrorResume(throwable ->
CosmosExceptionUtils.exceptionHandler("Failed to find items", throwable,
this.responseDiagnosticsProcessor));
Expand Down Expand Up @@ -325,7 +325,7 @@ public <T> Mono<T> findById(Object id, Class<T> domainType) {
public <T> Mono<T> findById(String containerName, Object id, Class<T> domainType) {
Assert.hasText(containerName, "containerName should not be null, empty or only whitespaces");
Assert.notNull(domainType, "domainType should not be null");
containerName = getContainerNameOverride(containerName);
final String finalContainerName = getContainerNameOverride(containerName);
final String query = "select * from root where root.id = @ROOT_ID";
final SqlParameter param = new SqlParameter("@ROOT_ID", CosmosUtils.getStringIDValue(id));
final SqlQuerySpec sqlQuerySpec = new SqlQuerySpec(query, param);
Expand All @@ -336,7 +336,7 @@ public <T> Mono<T> findById(String containerName, Object id, Class<T> domainType
options.setResponseContinuationTokenLimitInKb(this.responseContinuationTokenLimitInKb);

return this.getCosmosAsyncClient().getDatabase(this.getDatabaseName())
.getContainer(containerName)
.getContainer(finalContainerName)
.queryItems(sqlQuerySpec, options, JsonNode.class)
.byPage()
.publishOn(Schedulers.parallel())
Expand All @@ -347,7 +347,7 @@ public <T> Mono<T> findById(String containerName, Object id, Class<T> domainType
return Mono.justOrEmpty(cosmosItemFeedResponse
.getResults()
.stream()
.map(cosmosItem -> emitOnLoadEventAndConvertToDomainObject(domainType, cosmosItem))
.map(cosmosItem -> emitOnLoadEventAndConvertToDomainObject(domainType, finalContainerName, cosmosItem))
.findFirst());
})
.onErrorResume(throwable ->
Expand Down Expand Up @@ -378,7 +378,7 @@ public <T> Mono<T> findById(Object id, Class<T> domainType, PartitionKey partiti
CosmosUtils.fillAndProcessResponseDiagnostics(this.responseDiagnosticsProcessor,
cosmosItemResponse.getDiagnostics(), null);
return Mono.justOrEmpty(emitOnLoadEventAndConvertToDomainObject(domainType,
cosmosItemResponse.getItem()));
containerName, cosmosItemResponse.getItem()));
})
.onErrorResume(throwable ->
CosmosExceptionUtils.findAPIExceptionHandler("Failed to find item", throwable,
Expand Down Expand Up @@ -664,7 +664,7 @@ public <T> Flux<T> delete(CosmosQuery query, Class<T> domainType, String contain
@Override
public <T> Flux<T> find(CosmosQuery query, Class<T> domainType, String containerName) {
return findItems(query, containerName, domainType)
.map(cosmosItemProperties -> emitOnLoadEventAndConvertToDomainObject(domainType, cosmosItemProperties));
.map(cosmosItemProperties -> emitOnLoadEventAndConvertToDomainObject(domainType, containerName, cosmosItemProperties));
}

/**
Expand Down Expand Up @@ -744,7 +744,7 @@ public <T> Flux<T> runQuery(SqlQuerySpec querySpec, Class<?> domainType, Class<T
public <T> Flux<T> runQuery(SqlQuerySpec querySpec, Sort sort, Class<?> domainType, Class<T> returnType) {
SqlQuerySpec sortedQuerySpec = NativeQueryGenerator.getInstance().generateSortedQuery(querySpec, sort);
return runQuery(sortedQuerySpec, domainType)
.map(cosmosItemProperties -> emitOnLoadEventAndConvertToDomainObject(returnType, cosmosItemProperties));
.map(cosmosItemProperties -> emitOnLoadEventAndConvertToDomainObject(returnType, getContainerName(domainType), cosmosItemProperties));
}

private Flux<JsonNode> runQuery(SqlQuerySpec querySpec, Class<?> domainType) {
Expand Down Expand Up @@ -901,9 +901,9 @@ private <T> Mono<T> deleteItem(@NonNull JsonNode jsonNode,
this.responseDiagnosticsProcessor));
}

private <T> T emitOnLoadEventAndConvertToDomainObject(@NonNull Class<T> domainType, JsonNode responseJsonNode) {
CosmosEntityInformation<?, ?> entityInformation = CosmosEntityInformation.getInstance(domainType);
maybeEmitEvent(new AfterLoadEvent<>(responseJsonNode, domainType, entityInformation.getContainerName()));
private <T> T emitOnLoadEventAndConvertToDomainObject(@NonNull Class<T> domainType, String containerName, JsonNode responseJsonNode) {
containerName = getContainerNameOverride(containerName);
maybeEmitEvent(new AfterLoadEvent<>(responseJsonNode, domainType, containerName));
return toDomainObject(domainType, responseJsonNode);
}

Expand Down