diff --git a/sdk/cosmos/azure-cosmos/pom.xml b/sdk/cosmos/azure-cosmos/pom.xml index 65a40da6dbe7..ae8dee16f2b3 100644 --- a/sdk/cosmos/azure-cosmos/pom.xml +++ b/sdk/cosmos/azure-cosmos/pom.xml @@ -127,7 +127,18 @@ Licensed under the MIT License. - + + com.azure + azure-core-tracing-opentelemetry + test + 1.0.0-beta.5 + + + io.netty + netty-tcnative-boringssl-static + + + com.fasterxml.jackson.module jackson-module-afterburner diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/CosmosAsyncClient.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/CosmosAsyncClient.java index bbe989db8e80..473c45d6d25d 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/CosmosAsyncClient.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/CosmosAsyncClient.java @@ -4,16 +4,19 @@ import com.azure.core.annotation.ServiceClient; import com.azure.core.credential.AzureKeyCredential; +import com.azure.core.util.Context; +import com.azure.core.util.tracing.Tracer; import com.azure.cosmos.implementation.AsyncDocumentClient; import com.azure.cosmos.implementation.Configs; import com.azure.cosmos.implementation.ConnectionPolicy; import com.azure.cosmos.implementation.CosmosAuthorizationTokenResolver; import com.azure.cosmos.implementation.Database; import com.azure.cosmos.implementation.HttpConstants; +import com.azure.cosmos.implementation.TracerProvider; import com.azure.cosmos.implementation.directconnectivity.rntbd.RntbdMetrics; -import com.azure.cosmos.models.CosmosDatabaseResponse; import com.azure.cosmos.models.CosmosDatabaseProperties; import com.azure.cosmos.models.CosmosDatabaseRequestOptions; +import com.azure.cosmos.models.CosmosDatabaseResponse; import com.azure.cosmos.models.CosmosPermissionProperties; import com.azure.cosmos.models.CosmosQueryRequestOptions; import com.azure.cosmos.models.ModelBridgeInternal; @@ -27,7 +30,9 @@ import java.io.Closeable; import java.util.List; +import java.util.ServiceLoader; +import static com.azure.core.util.FluxUtil.withContext; import static com.azure.cosmos.implementation.Utils.setContinuationTokenAndMaxItemCount; /** @@ -51,6 +56,7 @@ public final class CosmosAsyncClient implements Closeable { private final AzureKeyCredential credential; private final boolean sessionCapturingOverride; private final boolean enableTransportClientSharing; + private final TracerProvider tracerProvider; private final boolean contentResponseOnWriteEnabled; CosmosAsyncClient(CosmosClientBuilder builder) { @@ -65,6 +71,7 @@ public final class CosmosAsyncClient implements Closeable { this.sessionCapturingOverride = builder.isSessionCapturingOverrideEnabled(); this.enableTransportClientSharing = builder.isConnectionSharingAcrossClientsEnabled(); this.contentResponseOnWriteEnabled = builder.isContentResponseOnWriteEnabled(); + this.tracerProvider = new TracerProvider(ServiceLoader.load(Tracer.class)); this.asyncDocumentClient = new AsyncDocumentClient.Builder() .withServiceEndpoint(this.serviceEndpoint) .withMasterKeyOrResourceToken(this.keyOrResourceToken) @@ -194,8 +201,14 @@ boolean isContentResponseOnWriteEnabled() { * @return a {@link Mono} containing the cosmos database response with the created or existing database or * an error. */ - Mono createDatabaseIfNotExists(CosmosDatabaseProperties databaseProperties) { - return createDatabaseIfNotExistsInternal(getDatabase(databaseProperties.getId())); + public Mono createDatabaseIfNotExists(CosmosDatabaseProperties databaseProperties) { + if(!getTracerProvider().isEnabled()) { + CosmosAsyncDatabase database = getDatabase(databaseProperties.getId()); + return createDatabaseIfNotExistsInternal(database.read(), database, null, null); + } + + return withContext(context -> createDatabaseIfNotExistsInternal(getDatabase(databaseProperties.getId()), + null, context)); } /** @@ -209,21 +222,12 @@ Mono createDatabaseIfNotExists(CosmosDatabaseProperties * an error. */ public Mono createDatabaseIfNotExists(String id) { - return createDatabaseIfNotExistsInternal(getDatabase(id)); - } + if(!getTracerProvider().isEnabled()) { + CosmosAsyncDatabase database = getDatabase(id); + return createDatabaseIfNotExistsInternal(database.read(), database, null,null); + } - private Mono createDatabaseIfNotExistsInternal(CosmosAsyncDatabase database) { - return database.read().onErrorResume(exception -> { - final Throwable unwrappedException = Exceptions.unwrap(exception); - if (unwrappedException instanceof CosmosException) { - final CosmosException cosmosException = (CosmosException) unwrappedException; - if (cosmosException.getStatusCode() == HttpConstants.StatusCodes.NOTFOUND) { - return createDatabase(new CosmosDatabaseProperties(database.getId()), - new CosmosDatabaseRequestOptions()); - } - } - return Mono.error(unwrappedException); - }); + return withContext(context -> createDatabaseIfNotExistsInternal(getDatabase(id), null, context)); } /** @@ -240,19 +244,13 @@ private Mono createDatabaseIfNotExistsInternal(CosmosAsy * @return the mono. */ public Mono createDatabaseIfNotExists(String id, ThroughputProperties throughputProperties) { - return this.getDatabase(id).read().onErrorResume(exception -> { - final Throwable unwrappedException = Exceptions.unwrap(exception); - if (unwrappedException instanceof CosmosException) { - final CosmosException cosmosException = (CosmosException) unwrappedException; - if (cosmosException.getStatusCode() == HttpConstants.StatusCodes.NOTFOUND) { - CosmosDatabaseRequestOptions options = new CosmosDatabaseRequestOptions(); - ModelBridgeInternal.setThroughputProperties(options, throughputProperties); - return createDatabase(new CosmosDatabaseProperties(id), - options); - } - } - return Mono.error(unwrappedException); - }); + if(!getTracerProvider().isEnabled()) { + CosmosAsyncDatabase database = getDatabase(id); + return createDatabaseIfNotExistsInternal(database.read(), database, throughputProperties, null); + } + + return withContext(context -> createDatabaseIfNotExistsInternal(getDatabase(id), + throughputProperties, context)); } /** @@ -274,9 +272,12 @@ public Mono createDatabase(CosmosDatabaseProperties data } Database wrappedDatabase = new Database(); wrappedDatabase.setId(databaseProperties.getId()); - return asyncDocumentClient.createDatabase(wrappedDatabase, ModelBridgeInternal.toRequestOptions(options)) - .map(databaseResourceResponse -> ModelBridgeInternal.createCosmosDatabaseResponse(databaseResourceResponse)) - .single(); + if(!getTracerProvider().isEnabled()) { + return createDatabaseInternal(wrappedDatabase, options); + } + + final CosmosDatabaseRequestOptions requestOptions = options; + return withContext(context -> createDatabaseInternal(wrappedDatabase, requestOptions, context)); } /** @@ -331,9 +332,13 @@ public Mono createDatabase(CosmosDatabaseProperties data ModelBridgeInternal.setThroughputProperties(options, throughputProperties); Database wrappedDatabase = new Database(); wrappedDatabase.setId(databaseProperties.getId()); - return asyncDocumentClient.createDatabase(wrappedDatabase, ModelBridgeInternal.toRequestOptions(options)) - .map(databaseResourceResponse -> ModelBridgeInternal.createCosmosDatabaseResponse(databaseResourceResponse)) - .single(); + if (!getTracerProvider().isEnabled()) { + return createDatabaseInternal(wrappedDatabase, options); + } + + + final CosmosDatabaseRequestOptions requestOptions = options; + return withContext(context -> createDatabaseInternal(wrappedDatabase, requestOptions, context)); } /** @@ -397,13 +402,15 @@ public Mono createDatabase(String id, ThroughputProperti */ CosmosPagedFlux readAllDatabases(CosmosQueryRequestOptions options) { return UtilBridgeInternal.createCosmosPagedFlux(pagedFluxOptions -> { + String spanName = "readAllDatabases"; + pagedFluxOptions.setTracerInformation(this.tracerProvider, spanName, this.serviceEndpoint, null); setContinuationTokenAndMaxItemCount(pagedFluxOptions, options); return getDocClientWrapper().readDatabases(options) .map(response -> BridgeInternal.createFeedResponse( ModelBridgeInternal.getCosmosDatabasePropertiesFromV2Results(response.getResults()), response.getResponseHeaders())); - }); + }, this.tracerProvider.isEnabled()); } /** @@ -432,7 +439,7 @@ public CosmosPagedFlux readAllDatabases() { * @return a {@link CosmosPagedFlux} containing one or several feed response pages of read databases or an error. */ public CosmosPagedFlux queryDatabases(String query, CosmosQueryRequestOptions options) { - return queryDatabases(new SqlQuerySpec(query), options); + return queryDatabasesInternal(new SqlQuerySpec(query), options); } /** @@ -447,13 +454,7 @@ public CosmosPagedFlux queryDatabases(String query, Co * @return a {@link CosmosPagedFlux} containing one or several feed response pages of read databases or an error. */ public CosmosPagedFlux queryDatabases(SqlQuerySpec querySpec, CosmosQueryRequestOptions options) { - return UtilBridgeInternal.createCosmosPagedFlux(pagedFluxOptions -> { - setContinuationTokenAndMaxItemCount(pagedFluxOptions, options); - return getDocClientWrapper().queryDatabases(querySpec, options) - .map(response -> BridgeInternal.createFeedResponse( - ModelBridgeInternal.getCosmosDatabasePropertiesFromV2Results(response.getResults()), - response.getResponseHeaders())); - }); + return queryDatabasesInternal(querySpec, options); } /** @@ -473,4 +474,77 @@ public CosmosAsyncDatabase getDatabase(String id) { public void close() { asyncDocumentClient.close(); } + + TracerProvider getTracerProvider(){ + return this.tracerProvider; + } + + private CosmosPagedFlux queryDatabasesInternal(SqlQuerySpec querySpec, CosmosQueryRequestOptions options){ + return UtilBridgeInternal.createCosmosPagedFlux(pagedFluxOptions -> { + String spanName = "queryDatabases"; + pagedFluxOptions.setTracerInformation(this.tracerProvider, spanName, this.serviceEndpoint, null); + setContinuationTokenAndMaxItemCount(pagedFluxOptions, options); + return getDocClientWrapper().queryDatabases(querySpec, options) + .map(response -> BridgeInternal.createFeedResponse( + ModelBridgeInternal.getCosmosDatabasePropertiesFromV2Results(response.getResults()), + response.getResponseHeaders())); + }, this.tracerProvider.isEnabled()); + } + + + private Mono createDatabaseIfNotExistsInternal(CosmosAsyncDatabase database, + ThroughputProperties throughputProperties, Context context) { + String spanName = "createDatabaseIfNotExists." + database.getId(); + Context nestedContext = context.addData(TracerProvider.COSMOS_CALL_DEPTH, TracerProvider.COSMOS_CALL_DEPTH_VAL); + Mono responseMono = createDatabaseIfNotExistsInternal(database.readInternal(new CosmosDatabaseRequestOptions(), nestedContext), database, throughputProperties, nestedContext); + return tracerProvider.traceEnabledCosmosResponsePublisher(responseMono, + context, + spanName, + database.getId(), + this.serviceEndpoint); + } + + private Mono createDatabaseIfNotExistsInternal(Mono responseMono, CosmosAsyncDatabase database, ThroughputProperties throughputProperties, Context context) { + return responseMono.onErrorResume(exception -> { + final Throwable unwrappedException = Exceptions.unwrap(exception); + if (unwrappedException instanceof CosmosException) { + final CosmosException cosmosException = (CosmosException) unwrappedException; + if (cosmosException.getStatusCode() == HttpConstants.StatusCodes.NOTFOUND) { + CosmosDatabaseRequestOptions requestOptions = new CosmosDatabaseRequestOptions(); + if(throughputProperties != null) { + ModelBridgeInternal.setThroughputProperties(requestOptions, throughputProperties); + } + + if (context != null) { + Database wrappedDatabase = new Database(); + wrappedDatabase.setId(database.getId()); + return createDatabaseInternal(wrappedDatabase, + requestOptions, context); + } + + return createDatabase(new CosmosDatabaseProperties(database.getId()), + requestOptions); + } + } + return Mono.error(unwrappedException); + }); + } + + + private Mono createDatabaseInternal(Database database, CosmosDatabaseRequestOptions options, + Context context) { + String spanName = "createDatabase." + database.getId(); + Mono responseMono = createDatabaseInternal(database, options); + return tracerProvider.traceEnabledCosmosResponsePublisher(responseMono, + context, + spanName, + database.getId(), + this.serviceEndpoint); + } + + private Mono createDatabaseInternal(Database database, CosmosDatabaseRequestOptions options) { + return asyncDocumentClient.createDatabase(database, ModelBridgeInternal.toRequestOptions(options)) + .map(databaseResourceResponse -> ModelBridgeInternal.createCosmosDatabaseResponse(databaseResourceResponse)) + .single(); + } } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/CosmosAsyncConflict.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/CosmosAsyncConflict.java index 9e7a336e782f..96f1edaaa7ed 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/CosmosAsyncConflict.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/CosmosAsyncConflict.java @@ -2,6 +2,7 @@ // Licensed under the MIT License. package com.azure.cosmos; +import com.azure.core.util.Context; import com.azure.cosmos.implementation.Paths; import com.azure.cosmos.implementation.RequestOptions; import com.azure.cosmos.models.CosmosConflictResponse; @@ -9,6 +10,8 @@ import com.azure.cosmos.models.ModelBridgeInternal; import reactor.core.publisher.Mono; +import static com.azure.core.util.FluxUtil.withContext; + /** * Read and delete conflicts */ @@ -64,9 +67,11 @@ public Mono read(CosmosConflictRequestOptions options) { options = new CosmosConflictRequestOptions(); } RequestOptions requestOptions = ModelBridgeInternal.toRequestOptions(options); - return this.container.getDatabase().getDocClientWrapper().readConflict(getLink(), requestOptions) - .map(response -> ModelBridgeInternal.createCosmosConflictResponse(response)).single(); + if (!this.container.getDatabase().getClient().getTracerProvider().isEnabled()) { + return readInternal(requestOptions); + } + return withContext(context -> readInternal(requestOptions, context)); } /** @@ -85,8 +90,11 @@ public Mono delete(CosmosConflictRequestOptions options) options = new CosmosConflictRequestOptions(); } RequestOptions requestOptions = ModelBridgeInternal.toRequestOptions(options); - return this.container.getDatabase().getDocClientWrapper().deleteConflict(getLink(), requestOptions) - .map(response -> ModelBridgeInternal.createCosmosConflictResponse(response)).single(); + if (!this.container.getDatabase().getClient().getTracerProvider().isEnabled()) { + return deleteInternal(requestOptions); + } + + return withContext(context -> deleteInternal(requestOptions, context)); } String getURIPathSegment() { @@ -106,4 +114,33 @@ String getLink() { builder.append(getId()); return builder.toString(); } + + private Mono readInternal(RequestOptions options, Context context) { + String spanName = "readConflict." + getId(); + Mono responseMono = this.readInternal(options); + return this.container.getDatabase().getClient().getTracerProvider().traceEnabledCosmosResponsePublisher(responseMono, context, + spanName, + this.container.getDatabase().getId(), + this.container.getDatabase().getClient().getServiceEndpoint()); + + } + + private Mono readInternal(RequestOptions options) { + return this.container.getDatabase().getDocClientWrapper().readConflict(getLink(), options) + .map(response -> ModelBridgeInternal.createCosmosConflictResponse(response)).single(); + } + + private Mono deleteInternal(RequestOptions options, Context context) { + String spanName = "deleteConflict." + getId(); + Mono responseMono = deleteInternal(options); + return this.container.getDatabase().getClient().getTracerProvider().traceEnabledCosmosResponsePublisher(responseMono, context, + spanName, + this.container.getDatabase().getId(), + this.container.getDatabase().getClient().getServiceEndpoint()); + } + + private Mono deleteInternal(RequestOptions options) { + return this.container.getDatabase().getDocClientWrapper().deleteConflict(getLink(), options) + .map(response -> ModelBridgeInternal.createCosmosConflictResponse(response)).single(); + } } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/CosmosAsyncContainer.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/CosmosAsyncContainer.java index 43812d58a822..81055b0dc932 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/CosmosAsyncContainer.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/CosmosAsyncContainer.java @@ -2,6 +2,7 @@ // Licensed under the MIT License. package com.azure.cosmos; +import com.azure.core.util.Context; import com.azure.cosmos.implementation.Constants; import com.azure.cosmos.implementation.CosmosItemProperties; import com.azure.cosmos.implementation.Document; @@ -9,14 +10,15 @@ import com.azure.cosmos.implementation.Offer; import com.azure.cosmos.implementation.Paths; import com.azure.cosmos.implementation.RequestOptions; +import com.azure.cosmos.implementation.TracerProvider; import com.azure.cosmos.implementation.Utils; import com.azure.cosmos.implementation.query.QueryInfo; -import com.azure.cosmos.models.CosmosContainerResponse; -import com.azure.cosmos.models.CosmosItemResponse; import com.azure.cosmos.models.CosmosConflictProperties; import com.azure.cosmos.models.CosmosContainerProperties; import com.azure.cosmos.models.CosmosContainerRequestOptions; +import com.azure.cosmos.models.CosmosContainerResponse; import com.azure.cosmos.models.CosmosItemRequestOptions; +import com.azure.cosmos.models.CosmosItemResponse; import com.azure.cosmos.models.CosmosQueryRequestOptions; import com.azure.cosmos.models.FeedResponse; import com.azure.cosmos.models.ModelBridgeInternal; @@ -32,6 +34,7 @@ import java.util.List; import java.util.stream.Collectors; +import static com.azure.core.util.FluxUtil.withContext; import static com.azure.cosmos.implementation.Utils.setContinuationTokenAndMaxItemCount; /** @@ -43,12 +46,40 @@ public class CosmosAsyncContainer { private final CosmosAsyncDatabase database; private final String id; private final String link; + private final String replaceContainerSpanName; + private final String deleteContainerSpanName; + private final String replaceThroughputSpanName; + private final String readThroughputSpanName; + private final String readContainerSpanName; + private final String readItemSpanName; + private final String upsertItemSpanName; + private final String deleteItemSpanName; + private final String replaceItemSpanName; + private final String createItemSpanName; + private final String readAllItemsSpanName; + private final String queryItemsSpanName; + private final String readAllConflictsSpanName; + private final String queryConflictsSpanName; private CosmosAsyncScripts scripts; CosmosAsyncContainer(String id, CosmosAsyncDatabase database) { this.id = id; this.database = database; this.link = getParentLink() + "/" + getURIPathSegment() + "/" + getId(); + this.replaceContainerSpanName = "replaceContainer." + this.id; + this.deleteContainerSpanName = "deleteContainer." + this.id; + this.replaceThroughputSpanName = "replaceThroughput." + this.id; + this.readThroughputSpanName = "readThroughput." + this.id; + this.readContainerSpanName = "readContainer." + this.id; + this.readItemSpanName = "readItem." + this.id; + this.upsertItemSpanName = "upsertItem." + this.id; + this.deleteItemSpanName = "deleteItem." + this.id; + this.replaceItemSpanName = "replaceItem." + this.id; + this.createItemSpanName = "createItem." + this.id; + this.readAllItemsSpanName = "readAllItems." + this.id; + this.queryItemsSpanName = "queryItems." + this.id; + this.readAllConflictsSpanName = "readAllConflicts." + this.id; + this.queryConflictsSpanName = "queryConflicts." + this.id; } /** @@ -89,8 +120,13 @@ public Mono read(CosmosContainerRequestOptions options) if (options == null) { options = new CosmosContainerRequestOptions(); } - return database.getDocClientWrapper().readCollection(getLink(), ModelBridgeInternal.toRequestOptions(options)) - .map(response -> ModelBridgeInternal.createCosmosContainerResponse(response)).single(); + + final CosmosContainerRequestOptions requestOptions = options; + if(!database.getClient().getTracerProvider().isEnabled()){ + return readInternal(options); + } + + return withContext(context -> read(requestOptions, context)); } /** @@ -108,8 +144,13 @@ public Mono delete(CosmosContainerRequestOptions option if (options == null) { options = new CosmosContainerRequestOptions(); } - return database.getDocClientWrapper().deleteCollection(getLink(), ModelBridgeInternal.toRequestOptions(options)) - .map(response -> ModelBridgeInternal.createCosmosContainerResponse(response)).single(); + + if (!database.getClient().getTracerProvider().isEnabled()) { + return deleteInternal(options); + } + + final CosmosContainerRequestOptions requestOptions = options; + return withContext(context -> deleteInternal(requestOptions, context)); } /** @@ -161,9 +202,13 @@ public Mono replace( if (options == null) { options = new CosmosContainerRequestOptions(); } - return database.getDocClientWrapper() - .replaceCollection(ModelBridgeInternal.getV2Collection(containerProperties), ModelBridgeInternal.toRequestOptions(options)) - .map(response -> ModelBridgeInternal.createCosmosContainerResponse(response)).single(); + + if(!database.getClient().getTracerProvider().isEnabled()) { + return replaceInternal(containerProperties, options); + } + + final CosmosContainerRequestOptions requestOptions = options; + return withContext(context -> replaceInternal(containerProperties, requestOptions, context)); } /* CosmosAsyncItem operations */ @@ -208,7 +253,6 @@ public Mono> createItem( return createItem(item, options); } - /** * Creates a Cosmos item. * @@ -221,16 +265,35 @@ public Mono> createItem(T item, CosmosItemRequestOptio if (options == null) { options = new CosmosItemRequestOptions(); } + + if (!database.getClient().getTracerProvider().isEnabled()) { + return createItemInternal(item, options); + } + + final CosmosItemRequestOptions requestOptions = options; + return withContext(context -> createItemInternal(item, requestOptions, context)); + } + + private Mono> createItemInternal(T item, CosmosItemRequestOptions options, Context context) { + Mono> responseMono = createItemInternal(item, options); + return database.getClient().getTracerProvider().traceEnabledCosmosItemResponsePublisher(responseMono, + context, + this.createItemSpanName, + database.getId(), + database.getClient().getServiceEndpoint()); + } + + private Mono> createItemInternal(T item, CosmosItemRequestOptions options) { @SuppressWarnings("unchecked") Class itemType = (Class) item.getClass(); RequestOptions requestOptions = ModelBridgeInternal.toRequestOptions(options); return database.getDocClientWrapper() - .createDocument(getLink(), - item, - requestOptions, - true) - .map(response -> ModelBridgeInternal.createCosmosAsyncItemResponse(response, itemType)) - .single(); + .createDocument(getLink(), + item, + requestOptions, + true) + .map(response -> ModelBridgeInternal.createCosmosAsyncItemResponse(response, itemType)) + .single(); } /** @@ -264,14 +327,13 @@ public Mono> upsertItem(T item, CosmosItemRequestOptio if (options == null) { options = new CosmosItemRequestOptions(); } - @SuppressWarnings("unchecked") - Class itemType = (Class) item.getClass(); - return this.getDatabase().getDocClientWrapper() - .upsertDocument(this.getLink(), item, - ModelBridgeInternal.toRequestOptions(options), - true) - .map(response -> ModelBridgeInternal.createCosmosAsyncItemResponse(response, itemType)) - .single(); + + if(!database.getClient().getTracerProvider().isEnabled()) { + return upsertItemInternal(item, options); + } + + final CosmosItemRequestOptions requestOptions = options; + return withContext(context -> upsertItemInternal(item, requestOptions, context)); } /** @@ -305,10 +367,12 @@ CosmosPagedFlux readAllItems(Class classType) { */ CosmosPagedFlux readAllItems(CosmosQueryRequestOptions options, Class classType) { return UtilBridgeInternal.createCosmosPagedFlux(pagedFluxOptions -> { + pagedFluxOptions.setTracerInformation(this.getDatabase().getClient().getTracerProvider(), this.readAllItemsSpanName, + this.getDatabase().getClient().getServiceEndpoint(), database.getId()); setContinuationTokenAndMaxItemCount(pagedFluxOptions, options); return getDatabase().getDocClientWrapper().readDocuments(getLink(), options).map( response -> prepareFeedResponse(response, classType)); - }); + }, this.getDatabase().getClient().getTracerProvider().isEnabled()); } /** @@ -325,7 +389,7 @@ CosmosPagedFlux readAllItems(CosmosQueryRequestOptions options, Class * error. */ public CosmosPagedFlux queryItems(String query, Class classType) { - return queryItems(new SqlQuerySpec(query), classType); + return queryItemsInternal(new SqlQuerySpec(query), new CosmosQueryRequestOptions(), classType); } /** @@ -343,7 +407,7 @@ public CosmosPagedFlux queryItems(String query, Class classType) { * error. */ public CosmosPagedFlux queryItems(String query, CosmosQueryRequestOptions options, Class classType) { - return queryItems(new SqlQuerySpec(query), options, classType); + return queryItemsInternal(new SqlQuerySpec(query), options, classType); } /** @@ -360,7 +424,7 @@ public CosmosPagedFlux queryItems(String query, CosmosQueryRequestOptions * error. */ public CosmosPagedFlux queryItems(SqlQuerySpec querySpec, Class classType) { - return queryItems(querySpec, new CosmosQueryRequestOptions(), classType); + return queryItemsInternal(querySpec, new CosmosQueryRequestOptions(), classType); } /** @@ -382,14 +446,17 @@ public CosmosPagedFlux queryItems(SqlQuerySpec querySpec, CosmosQueryRequ } private CosmosPagedFlux queryItemsInternal( - SqlQuerySpec sqlQuerySpec, CosmosQueryRequestOptions cosmosQueryRequestOptions, Class classType) { + SqlQuerySpec sqlQuerySpec, CosmosQueryRequestOptions cosmosQueryRequestOptions, Class classType) { return UtilBridgeInternal.createCosmosPagedFlux(pagedFluxOptions -> { + String spanName = this.queryItemsSpanName; + pagedFluxOptions.setTracerInformation(this.getDatabase().getClient().getTracerProvider(), spanName, + this.getDatabase().getClient().getServiceEndpoint(), database.getId()); setContinuationTokenAndMaxItemCount(pagedFluxOptions, cosmosQueryRequestOptions); return getDatabase().getDocClientWrapper() .queryDocuments(CosmosAsyncContainer.this.getLink(), sqlQuerySpec, cosmosQueryRequestOptions) .map(response -> prepareFeedResponse(response, classType)); - }); + }, this.getDatabase().getClient().getTracerProvider().isEnabled()); } private FeedResponse prepareFeedResponse(FeedResponse response, Class classType) { @@ -452,12 +519,14 @@ public Mono> readItem( if (options == null) { options = new CosmosItemRequestOptions(); } + ModelBridgeInternal.setPartitionKey(options, partitionKey); RequestOptions requestOptions = ModelBridgeInternal.toRequestOptions(options); - return this.getDatabase().getDocClientWrapper() - .readDocument(getItemLink(itemId), requestOptions) - .map(response -> ModelBridgeInternal.createCosmosAsyncItemResponse(response, itemType)) - .single(); + if(!database.getClient().getTracerProvider().isEnabled()) { + return readItemInternal(itemId, requestOptions, itemType); + } + + return withContext(context -> readItemInternal(itemId, requestOptions, itemType, context)); } /** @@ -499,11 +568,12 @@ public Mono> replaceItem( ModelBridgeInternal.setPartitionKey(options, partitionKey); @SuppressWarnings("unchecked") Class itemType = (Class) item.getClass(); - return this.getDatabase() - .getDocClientWrapper() - .replaceDocument(getItemLink(itemId), doc, ModelBridgeInternal.toRequestOptions(options)) - .map(response -> ModelBridgeInternal.createCosmosAsyncItemResponse(response, itemType)) - .single(); + if(!database.getClient().getTracerProvider().isEnabled()){ + return replaceItemInternal(itemType, itemId, doc, options); + } + + final CosmosItemRequestOptions requestOptions = options; + return withContext(context -> replaceItemInternal(itemType, itemId, doc, requestOptions,context)); } /** @@ -539,11 +609,11 @@ public Mono> deleteItem( } ModelBridgeInternal.setPartitionKey(options, partitionKey); RequestOptions requestOptions = ModelBridgeInternal.toRequestOptions(options); - return this.getDatabase() - .getDocClientWrapper() - .deleteDocument(getItemLink(itemId), requestOptions) - .map(response -> ModelBridgeInternal.createCosmosAsyncItemResponseWithObjectType(response)) - .single(); + if(!database.getClient().getTracerProvider().isEnabled()) { + return deleteItemInternal(itemId, requestOptions); + } + + return withContext(context -> deleteItemInternal(itemId, requestOptions, context)); } private String getItemLink(String itemId) { @@ -579,12 +649,14 @@ public CosmosAsyncScripts getScripts() { */ public CosmosPagedFlux readAllConflicts(CosmosQueryRequestOptions options) { return UtilBridgeInternal.createCosmosPagedFlux(pagedFluxOptions -> { + pagedFluxOptions.setTracerInformation(this.getDatabase().getClient().getTracerProvider(), this.readAllConflictsSpanName, + this.getDatabase().getClient().getServiceEndpoint(), database.getId()); setContinuationTokenAndMaxItemCount(pagedFluxOptions, options); return database.getDocClientWrapper().readConflicts(getLink(), options) .map(response -> BridgeInternal.createFeedResponse( ModelBridgeInternal.getCosmosConflictPropertiesFromV2Results(response.getResults()), response.getResponseHeaders())); - }); + }, this.getDatabase().getClient().getTracerProvider().isEnabled()); } /** @@ -608,12 +680,14 @@ public CosmosPagedFlux queryConflicts(String query) { */ public CosmosPagedFlux queryConflicts(String query, CosmosQueryRequestOptions options) { return UtilBridgeInternal.createCosmosPagedFlux(pagedFluxOptions -> { + pagedFluxOptions.setTracerInformation(this.getDatabase().getClient().getTracerProvider(), this.queryConflictsSpanName, + this.getDatabase().getClient().getServiceEndpoint(), database.getId()); setContinuationTokenAndMaxItemCount(pagedFluxOptions, options); return database.getDocClientWrapper().queryConflicts(getLink(), query, options) .map(response -> BridgeInternal.createFeedResponse( ModelBridgeInternal.getCosmosConflictPropertiesFromV2Results(response.getResults()), response.getResponseHeaders())); - }); + }, this.getDatabase().getClient().getTracerProvider().isEnabled()); } /** @@ -627,35 +701,17 @@ public CosmosAsyncConflict getConflict(String id) { } /** - * Replace the throughput provisioned for the current container. + * Replace the throughput . * - * @param throughputProperties the throughput properties. - * @return the mono containing throughput response. + * @param throughputProperties the throughput properties + * @return the mono containing throughput response */ public Mono replaceThroughput(ThroughputProperties throughputProperties) { - return this.read() - .flatMap(response -> this.database.getDocClientWrapper() - .queryOffers(database.getOfferQuerySpecFromResourceId(response.getProperties() - .getResourceId()) - , new CosmosQueryRequestOptions()) - .single() - .flatMap(offerFeedResponse -> { - if (offerFeedResponse.getResults().isEmpty()) { - return Mono.error(BridgeInternal - .createCosmosException( - HttpConstants.StatusCodes.BADREQUEST, - "No offers found for the " + - "resource " + this.getId())); - } - - Offer existingOffer = offerFeedResponse.getResults().get(0); - Offer updatedOffer = - ModelBridgeInternal.updateOfferFromProperties(existingOffer, - throughputProperties); - return this.database.getDocClientWrapper() - .replaceOffer(updatedOffer) - .single(); - }).map(ModelBridgeInternal::createThroughputRespose)); + if (!this.database.getClient().getTracerProvider().isEnabled()) { + return replaceThroughputInternal(this.read(), throughputProperties); + } + + return withContext(context -> replaceThroughputInternal(throughputProperties, context)); } /** @@ -664,29 +720,12 @@ public Mono replaceThroughput(ThroughputProperties throughpu * @return the mono containing throughput response. */ public Mono readThroughput() { - return this.read() - .flatMap(response -> this.database.getDocClientWrapper() - .queryOffers(database.getOfferQuerySpecFromResourceId(response.getProperties() - .getResourceId()) - , new CosmosQueryRequestOptions()) - .single() - .flatMap(offerFeedResponse -> { - if (offerFeedResponse.getResults().isEmpty()) { - return Mono.error(BridgeInternal - .createCosmosException( - HttpConstants.StatusCodes.BADREQUEST, - "No offers found for the resource " - + this.getId())); - } - return this.database.getDocClientWrapper() - .readOffer(offerFeedResponse.getResults() - .get(0) - .getSelfLink()) - .single(); - }) - .map(ModelBridgeInternal::createThroughputRespose)); - } + if (!this.database.getClient().getTracerProvider().isEnabled()) { + return readThroughputInternal(this.read()); + } + return withContext(context -> readThroughputInternal(context)); + } /** * Gets the parent {@link CosmosAsyncDatabase} for the current container. @@ -708,4 +747,214 @@ String getParentLink() { String getLink() { return this.link; } + + private Mono> deleteItemInternal( + String itemId, + RequestOptions requestOptions, + Context context) { + Mono> responseMono = deleteItemInternal(itemId, requestOptions); + return database.getClient().getTracerProvider().traceEnabledCosmosItemResponsePublisher(responseMono, + context, + this.deleteItemSpanName, + database.getId(), + database.getClient().getServiceEndpoint()); + } + + private Mono> deleteItemInternal( + String itemId, + RequestOptions requestOptions) { + return this.getDatabase() + .getDocClientWrapper() + .deleteDocument(getItemLink(itemId), requestOptions) + .map(response -> ModelBridgeInternal.createCosmosAsyncItemResponseWithObjectType(response)) + .single(); + } + + private Mono> replaceItemInternal( + Class itemType, + String itemId, + Document doc, + CosmosItemRequestOptions options, + Context context) { + Mono> responseMono = replaceItemInternal(itemType, itemId, doc, options); + return database.getClient().getTracerProvider().traceEnabledCosmosItemResponsePublisher(responseMono, + context, this.replaceItemSpanName, database.getId(), database.getClient().getServiceEndpoint()); + } + + private Mono> replaceItemInternal( + Class itemType, + String itemId, + Document doc, + CosmosItemRequestOptions options) { + return this.getDatabase() + .getDocClientWrapper() + .replaceDocument(getItemLink(itemId), doc, ModelBridgeInternal.toRequestOptions(options)) + .map(response -> ModelBridgeInternal.createCosmosAsyncItemResponse(response, itemType)) + .single(); + } + + private Mono> upsertItemInternal(T item, CosmosItemRequestOptions options, Context context) { + Mono> responseMono = upsertItemInternal(item, options); + return database.getClient().getTracerProvider().traceEnabledCosmosItemResponsePublisher(responseMono, + context, + this.upsertItemSpanName, + database.getId(), + database.getClient().getServiceEndpoint()); + } + + private Mono> upsertItemInternal(T item, CosmosItemRequestOptions options) { + @SuppressWarnings("unchecked") + Class itemType = (Class) item.getClass(); + return this.getDatabase().getDocClientWrapper() + .upsertDocument(this.getLink(), item, + ModelBridgeInternal.toRequestOptions(options), + true) + .map(response -> ModelBridgeInternal.createCosmosAsyncItemResponse(response, itemType)) + .single(); + } + + private Mono> readItemInternal( + String itemId, + RequestOptions requestOptions, Class itemType, + Context context) { + Mono> responseMono = readItemInternal(itemId, requestOptions, itemType); + return database.getClient().getTracerProvider().traceEnabledCosmosItemResponsePublisher(responseMono, + context, + this.readItemSpanName, + database.getId(), + database.getClient().getServiceEndpoint()); + } + + private Mono> readItemInternal( + String itemId, + RequestOptions requestOptions, Class itemType) { + return this.getDatabase().getDocClientWrapper() + .readDocument(getItemLink(itemId), requestOptions) + .map(response -> ModelBridgeInternal.createCosmosAsyncItemResponse(response, itemType)) + .single(); + } + + Mono read(CosmosContainerRequestOptions options, Context context) { + Mono responseMono = readInternal(options); + return database.getClient().getTracerProvider().traceEnabledCosmosResponsePublisher(responseMono, + context, + this.readContainerSpanName, + database.getId(), + database.getClient().getServiceEndpoint()); + } + + private Mono readInternal(CosmosContainerRequestOptions options) { + return database.getDocClientWrapper().readCollection(getLink(), + ModelBridgeInternal.toRequestOptions(options)) + .map(response -> ModelBridgeInternal.createCosmosContainerResponse(response)).single(); + } + + private Mono deleteInternal(CosmosContainerRequestOptions options, Context context) { + Mono responseMono = deleteInternal(options); + return database.getClient().getTracerProvider().traceEnabledCosmosResponsePublisher(responseMono, + context, + this.deleteContainerSpanName, + database.getId(), + database.getClient().getServiceEndpoint()); + } + + private Mono deleteInternal(CosmosContainerRequestOptions options) { + return database.getDocClientWrapper().deleteCollection(getLink(), + ModelBridgeInternal.toRequestOptions(options)) + .map(response -> ModelBridgeInternal.createCosmosContainerResponse(response)).single(); + } + + private Mono replaceInternal(CosmosContainerProperties containerProperties, + CosmosContainerRequestOptions options, + Context context) { + Mono responseMono = replaceInternal(containerProperties, options); + return database.getClient().getTracerProvider().traceEnabledCosmosResponsePublisher(responseMono, + context, + this.replaceContainerSpanName, + database.getId(), + database.getClient().getServiceEndpoint()); + } + + private Mono replaceInternal(CosmosContainerProperties containerProperties, + CosmosContainerRequestOptions options) { + return database.getDocClientWrapper() + .replaceCollection(ModelBridgeInternal.getV2Collection(containerProperties), + ModelBridgeInternal.toRequestOptions(options)) + .map(response -> ModelBridgeInternal.createCosmosContainerResponse(response)).single(); + } + + private Mono readThroughputInternal(Context context) { + Context nestedContext = context.addData(TracerProvider.COSMOS_CALL_DEPTH, TracerProvider.COSMOS_CALL_DEPTH_VAL); + Mono responseMono = readThroughputInternal(this.read(new CosmosContainerRequestOptions(), + nestedContext)); + return this.getDatabase().getClient().getTracerProvider().traceEnabledCosmosResponsePublisher(responseMono, + context, + this.readThroughputSpanName, + database.getId(), + database.getClient().getServiceEndpoint()); + } + + private Mono readThroughputInternal(Mono responseMono) { + return responseMono + .flatMap(response -> this.database.getDocClientWrapper() + .queryOffers(database.getOfferQuerySpecFromResourceId(response.getProperties() + .getResourceId()) + , new CosmosQueryRequestOptions()) + .single() + .flatMap(offerFeedResponse -> { + if (offerFeedResponse.getResults().isEmpty()) { + return Mono.error(BridgeInternal + .createCosmosException( + HttpConstants.StatusCodes.BADREQUEST, + "No offers found for the resource " + + this.getId())); + } + return this.database.getDocClientWrapper() + .readOffer(offerFeedResponse.getResults() + .get(0) + .getSelfLink()) + .single(); + }) + .map(ModelBridgeInternal::createThroughputRespose)); + } + + private Mono replaceThroughputInternal(ThroughputProperties throughputProperties, + Context context) { + Context nestedContext = context.addData(TracerProvider.COSMOS_CALL_DEPTH, TracerProvider.COSMOS_CALL_DEPTH_VAL); + Mono responseMono = + replaceThroughputInternal(this.read(new CosmosContainerRequestOptions(), nestedContext), + throughputProperties); + return this.getDatabase().getClient().getTracerProvider().traceEnabledCosmosResponsePublisher(responseMono, + context, + this.replaceThroughputSpanName, + database.getId(), + database.getClient().getServiceEndpoint()); + } + + private Mono replaceThroughputInternal(Mono responseMono, + ThroughputProperties throughputProperties) { + return responseMono + .flatMap(response -> this.database.getDocClientWrapper() + .queryOffers(database.getOfferQuerySpecFromResourceId(response.getProperties() + .getResourceId()) + , new CosmosQueryRequestOptions()) + .single() + .flatMap(offerFeedResponse -> { + if (offerFeedResponse.getResults().isEmpty()) { + return Mono.error(BridgeInternal + .createCosmosException( + HttpConstants.StatusCodes.BADREQUEST, + "No offers found for the " + + "resource " + this.getId())); + } + + Offer existingOffer = offerFeedResponse.getResults().get(0); + Offer updatedOffer = + ModelBridgeInternal.updateOfferFromProperties(existingOffer, + throughputProperties); + return this.database.getDocClientWrapper() + .replaceOffer(updatedOffer) + .single(); + }).map(ModelBridgeInternal::createThroughputRespose)); + } } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/CosmosAsyncDatabase.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/CosmosAsyncDatabase.java index 7d495d715b45..1da2affd0916 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/CosmosAsyncDatabase.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/CosmosAsyncDatabase.java @@ -2,22 +2,24 @@ // Licensed under the MIT License. package com.azure.cosmos; +import com.azure.core.util.Context; import com.azure.cosmos.implementation.AsyncDocumentClient; import com.azure.cosmos.implementation.HttpConstants; import com.azure.cosmos.implementation.Offer; import com.azure.cosmos.implementation.Paths; -import com.azure.cosmos.models.CosmosContainerResponse; -import com.azure.cosmos.models.CosmosDatabaseResponse; -import com.azure.cosmos.models.CosmosUserResponse; +import com.azure.cosmos.implementation.TracerProvider; +import com.azure.cosmos.implementation.apachecommons.lang.StringUtils; import com.azure.cosmos.models.CosmosContainerProperties; import com.azure.cosmos.models.CosmosContainerRequestOptions; +import com.azure.cosmos.models.CosmosContainerResponse; import com.azure.cosmos.models.CosmosDatabaseRequestOptions; -import com.azure.cosmos.models.CosmosUserProperties; +import com.azure.cosmos.models.CosmosDatabaseResponse; import com.azure.cosmos.models.CosmosQueryRequestOptions; +import com.azure.cosmos.models.CosmosUserProperties; +import com.azure.cosmos.models.CosmosUserResponse; import com.azure.cosmos.models.ModelBridgeInternal; import com.azure.cosmos.models.SqlParameter; import com.azure.cosmos.models.SqlQuerySpec; -import com.azure.cosmos.implementation.apachecommons.lang.StringUtils; import com.azure.cosmos.models.ThroughputProperties; import com.azure.cosmos.models.ThroughputResponse; import com.azure.cosmos.util.CosmosPagedFlux; @@ -28,6 +30,7 @@ import java.util.Collections; import java.util.List; +import static com.azure.core.util.FluxUtil.withContext; import static com.azure.cosmos.implementation.Utils.setContinuationTokenAndMaxItemCount; /** @@ -82,8 +85,12 @@ public Mono read(CosmosDatabaseRequestOptions options) { if (options == null) { options = new CosmosDatabaseRequestOptions(); } - return getDocClientWrapper().readDatabase(getLink(), ModelBridgeInternal.toRequestOptions(options)) - .map(response -> ModelBridgeInternal.createCosmosDatabaseResponse(response)).single(); + + if (!client.getTracerProvider().isEnabled()) { + return readInternal(options); + } + final CosmosDatabaseRequestOptions requestOptions = options; + return withContext(context -> readInternal(requestOptions, context)); } /** @@ -113,8 +120,12 @@ public Mono delete(CosmosDatabaseRequestOptions options) if (options == null) { options = new CosmosDatabaseRequestOptions(); } - return getDocClientWrapper().deleteDatabase(getLink(), ModelBridgeInternal.toRequestOptions(options)) - .map(response -> ModelBridgeInternal.createCosmosDatabaseResponse(response)).single(); + if (!client.getTracerProvider().isEnabled()) { + return deleteInternal(options); + } + + final CosmosDatabaseRequestOptions requestOptions = options; + return withContext(context -> deleteInternal(requestOptions, context)); } /* CosmosAsyncContainer operations */ @@ -197,10 +208,13 @@ public Mono createContainer( if (options == null) { options = new CosmosContainerRequestOptions(); } - return getDocClientWrapper() - .createCollection(this.getLink(), ModelBridgeInternal.getV2Collection(containerProperties), - ModelBridgeInternal.toRequestOptions(options)) - .map(response -> ModelBridgeInternal.createCosmosContainerResponse(response)).single(); + + if (!client.getTracerProvider().isEnabled()) { + return createContainerInternal(containerProperties, options); + } + + final CosmosContainerRequestOptions requestOptions = options; + return withContext(context -> createContainerInternal(containerProperties, requestOptions, context)); } /** @@ -278,7 +292,12 @@ public Mono createContainer(String id, String partition public Mono createContainerIfNotExists( CosmosContainerProperties containerProperties) { CosmosAsyncContainer container = getContainer(containerProperties.getId()); - return createContainerIfNotExistsInternal(containerProperties, container, null); + if (!client.getTracerProvider().isEnabled()) { + return createContainerIfNotExistsInternal(container.read(), containerProperties, null, null); + } + + return withContext(context -> createContainerIfNotExistsInternal(containerProperties, container, null, + context)); } /** @@ -303,7 +322,12 @@ Mono createContainerIfNotExists( CosmosContainerRequestOptions options = new CosmosContainerRequestOptions(); ModelBridgeInternal.setThroughputProperties(options, ThroughputProperties.createManualThroughput(throughput)); CosmosAsyncContainer container = getContainer(containerProperties.getId()); - return createContainerIfNotExistsInternal(containerProperties, container, options); + if (!client.getTracerProvider().isEnabled()) { + return createContainerIfNotExistsInternal(container.read(), containerProperties, options, null); + } + + return withContext(context -> createContainerIfNotExistsInternal(containerProperties, container, options, + context)); } /** @@ -328,7 +352,12 @@ public Mono createContainerIfNotExists( CosmosContainerRequestOptions options = new CosmosContainerRequestOptions(); ModelBridgeInternal.setThroughputProperties(options, throughputProperties); CosmosAsyncContainer container = getContainer(containerProperties.getId()); - return createContainerIfNotExistsInternal(containerProperties, container, options); + if (!client.getTracerProvider().isEnabled()) { + return createContainerIfNotExistsInternal(container.read(), containerProperties, options, null); + } + + return withContext(context -> createContainerIfNotExistsInternal(containerProperties, container, options, + context)); } /** @@ -345,16 +374,21 @@ public Mono createContainerIfNotExists( */ public Mono createContainerIfNotExists(String id, String partitionKeyPath) { CosmosAsyncContainer container = getContainer(id); - return createContainerIfNotExistsInternal(new CosmosContainerProperties(id, partitionKeyPath), - container, - null); + if (!client.getTracerProvider().isEnabled()) { + return createContainerIfNotExistsInternal(container.read(), new CosmosContainerProperties(id, + partitionKeyPath), null, null); + } + + return withContext(context -> createContainerIfNotExistsInternal(new CosmosContainerProperties(id, + partitionKeyPath), container, null, + context)); } /** * Creates a Cosmos container if it does not exist on the service. *

- * The throughput setting will only be used if the specified container - * does not exist and a new container will be created. + * The throughput properties will only be used if the specified container + * does not exist and therefor a new container will be created. * * After subscription the operation will be performed. The {@link Mono} upon * successful completion will contain a cosmos container response with the @@ -362,25 +396,29 @@ public Mono createContainerIfNotExists(String id, Strin * * @param id the cosmos container id. * @param partitionKeyPath the partition key path. - * @param throughput the throughput for the container. + * @param throughputProperties the throughput properties for the container. * @return a {@link Mono} containing the cosmos container response with the * created container or an error. */ - Mono createContainerIfNotExists( + public Mono createContainerIfNotExists( String id, String partitionKeyPath, - int throughput) { + ThroughputProperties throughputProperties) { CosmosContainerRequestOptions options = new CosmosContainerRequestOptions(); - ModelBridgeInternal.setThroughputProperties(options, ThroughputProperties.createManualThroughput(throughput)); + ModelBridgeInternal.setThroughputProperties(options, throughputProperties); CosmosAsyncContainer container = getContainer(id); - return createContainerIfNotExistsInternal(new CosmosContainerProperties(id, partitionKeyPath), container, - options); + if (!client.getTracerProvider().isEnabled()) { + return createContainerIfNotExistsInternal(container.read(), new CosmosContainerProperties(id, partitionKeyPath), options, null); + } + + return withContext(context -> createContainerIfNotExistsInternal(new CosmosContainerProperties(id, + partitionKeyPath), container, options, context)); } /** * Creates a Cosmos container if it does not exist on the service. *

- * The throughput properties will only be used if the specified container - * does not exist and therefor a new container will be created. + * The throughput setting will only be used if the specified container + * does not exist and a new container will be created. * * After subscription the operation will be performed. The {@link Mono} upon * successful completion will contain a cosmos container response with the @@ -388,33 +426,24 @@ Mono createContainerIfNotExists( * * @param id the cosmos container id. * @param partitionKeyPath the partition key path. - * @param throughputProperties the throughput properties for the container. + * @param throughput the throughput for the container. * @return a {@link Mono} containing the cosmos container response with the * created container or an error. */ - public Mono createContainerIfNotExists( + Mono createContainerIfNotExists( String id, String partitionKeyPath, - ThroughputProperties throughputProperties) { + int throughput) { CosmosContainerRequestOptions options = new CosmosContainerRequestOptions(); - ModelBridgeInternal.setThroughputProperties(options, throughputProperties); + ModelBridgeInternal.setThroughputProperties(options, ThroughputProperties.createManualThroughput(throughput)); CosmosAsyncContainer container = getContainer(id); - return createContainerIfNotExistsInternal(new CosmosContainerProperties(id, partitionKeyPath), container, - options); - } + if (!client.getTracerProvider().isEnabled()) { + return createContainerIfNotExistsInternal(container.read(), new CosmosContainerProperties(id, + partitionKeyPath), + options, null); + } - private Mono createContainerIfNotExistsInternal( - CosmosContainerProperties containerProperties, CosmosAsyncContainer container, - CosmosContainerRequestOptions options) { - return container.read(options).onErrorResume(exception -> { - final Throwable unwrappedException = Exceptions.unwrap(exception); - if (unwrappedException instanceof CosmosException) { - final CosmosException cosmosException = (CosmosException) unwrappedException; - if (cosmosException.getStatusCode() == HttpConstants.StatusCodes.NOTFOUND) { - return createContainer(containerProperties, options); - } - } - return Mono.error(unwrappedException); - }); + return withContext(context -> createContainerIfNotExistsInternal(new CosmosContainerProperties(id, + partitionKeyPath), container, options, context)); } /** @@ -430,12 +459,15 @@ private Mono createContainerIfNotExistsInternal( */ public CosmosPagedFlux readAllContainers(CosmosQueryRequestOptions options) { return UtilBridgeInternal.createCosmosPagedFlux(pagedFluxOptions -> { + String spanName = "readAllContainers." + this.getId(); + pagedFluxOptions.setTracerInformation(this.getClient().getTracerProvider(), spanName, + this.getClient().getServiceEndpoint(), getId()); setContinuationTokenAndMaxItemCount(pagedFluxOptions, options); return getDocClientWrapper().readCollections(getLink(), options) .map(response -> BridgeInternal.createFeedResponse( ModelBridgeInternal.getCosmosContainerPropertiesFromV2Results(response.getResults()), response.getResponseHeaders())); - }); + }, this.getClient().getTracerProvider().isEnabled()); } /** @@ -464,7 +496,7 @@ public CosmosPagedFlux readAllContainers() { * obtained containers or an error. */ public CosmosPagedFlux queryContainers(String query) { - return queryContainers(new SqlQuerySpec(query)); + return queryContainersInternal(new SqlQuerySpec(query), new CosmosQueryRequestOptions()); } /** @@ -480,7 +512,7 @@ public CosmosPagedFlux queryContainers(String query) * obtained containers or an error. */ public CosmosPagedFlux queryContainers(String query, CosmosQueryRequestOptions options) { - return queryContainers(new SqlQuerySpec(query), options); + return queryContainersInternal(new SqlQuerySpec(query), options); } /** @@ -495,7 +527,7 @@ public CosmosPagedFlux queryContainers(String query, * obtained containers or an error. */ public CosmosPagedFlux queryContainers(SqlQuerySpec querySpec) { - return queryContainers(querySpec, new CosmosQueryRequestOptions()); + return queryContainersInternal(querySpec, new CosmosQueryRequestOptions()); } /** @@ -510,14 +542,9 @@ public CosmosPagedFlux queryContainers(SqlQuerySpec q * @return a {@link CosmosPagedFlux} containing one or several feed response pages of the * obtained containers or an error. */ - public CosmosPagedFlux queryContainers(SqlQuerySpec querySpec, CosmosQueryRequestOptions options) { - return UtilBridgeInternal.createCosmosPagedFlux(pagedFluxOptions -> { - setContinuationTokenAndMaxItemCount(pagedFluxOptions, options); - return getDocClientWrapper().queryCollections(getLink(), querySpec, options) - .map(response -> BridgeInternal.createFeedResponse( - ModelBridgeInternal.getCosmosContainerPropertiesFromV2Results(response.getResults()), - response.getResponseHeaders())); - }); + public CosmosPagedFlux queryContainers(SqlQuerySpec querySpec + , CosmosQueryRequestOptions options) { + return queryContainersInternal(querySpec, options); } /** @@ -530,8 +557,6 @@ public CosmosAsyncContainer getContainer(String id) { return new CosmosAsyncContainer(id, this); } - /** User operations **/ - /** * Creates a user After subscription the operation will be performed. The * {@link Mono} upon successful completion will contain a single resource @@ -543,11 +568,12 @@ public CosmosAsyncContainer getContainer(String id) { * created cosmos user or an error. */ public Mono createUser(CosmosUserProperties userProperties) { - return getDocClientWrapper().createUser(this.getLink(), ModelBridgeInternal.getV2User(userProperties), null) - .map(response -> ModelBridgeInternal.createCosmosUserResponse(response)).single(); + if (!client.getTracerProvider().isEnabled()) { + return createUserInternal(userProperties); + } + return withContext(context -> createUserInternal(userProperties, context)); } - /** * Upsert a user. Upsert will create a new user if it doesn't exist, or replace * the existing one if it does. After subscription the operation will be @@ -560,8 +586,11 @@ public Mono createUser(CosmosUserProperties userProperties) * upserted user or an error. */ public Mono upsertUser(CosmosUserProperties userProperties) { - return getDocClientWrapper().upsertUser(this.getLink(), ModelBridgeInternal.getV2User(userProperties), null) - .map(response -> ModelBridgeInternal.createCosmosUserResponse(response)).single(); + if (!client.getTracerProvider().isEnabled()) { + return upsertUserInternal(userProperties); + } + + return withContext(context -> upsertUserInternal(userProperties, context)); } /** @@ -591,12 +620,15 @@ public CosmosPagedFlux readAllUsers() { */ CosmosPagedFlux readAllUsers(CosmosQueryRequestOptions options) { return UtilBridgeInternal.createCosmosPagedFlux(pagedFluxOptions -> { + String spanName = "readAllUsers." + this.getId(); + pagedFluxOptions.setTracerInformation(this.getClient().getTracerProvider(), spanName, + this.getClient().getServiceEndpoint(), getId()); setContinuationTokenAndMaxItemCount(pagedFluxOptions, options); return getDocClientWrapper().readUsers(getLink(), options) .map(response -> BridgeInternal.createFeedResponse( ModelBridgeInternal.getCosmosUserPropertiesFromV2Results(response.getResults()), response .getResponseHeaders())); - }); + }, this.getClient().getTracerProvider().isEnabled()); } /** @@ -627,7 +659,7 @@ public CosmosPagedFlux queryUsers(String query) { * obtained users or an error. */ public CosmosPagedFlux queryUsers(String query, CosmosQueryRequestOptions options) { - return queryUsers(new SqlQuerySpec(query), options); + return queryUsersInternal(new SqlQuerySpec(query), options); } /** @@ -642,7 +674,7 @@ public CosmosPagedFlux queryUsers(String query, CosmosQuer * obtained users or an error. */ public CosmosPagedFlux queryUsers(SqlQuerySpec querySpec) { - return queryUsers(querySpec, new CosmosQueryRequestOptions()); + return queryUsersInternal(querySpec, new CosmosQueryRequestOptions()); } /** @@ -658,13 +690,7 @@ public CosmosPagedFlux queryUsers(SqlQuerySpec querySpec) * obtained users or an error. */ public CosmosPagedFlux queryUsers(SqlQuerySpec querySpec, CosmosQueryRequestOptions options) { - return UtilBridgeInternal.createCosmosPagedFlux(pagedFluxOptions -> { - setContinuationTokenAndMaxItemCount(pagedFluxOptions, options); - return getDocClientWrapper().queryUsers(getLink(), querySpec, options) - .map(response -> BridgeInternal.createFeedResponseWithQueryMetrics( - ModelBridgeInternal.getCosmosUserPropertiesFromV2Results(response.getResults()), response.getResponseHeaders(), - ModelBridgeInternal.queryMetrics(response))); - }); + return queryUsersInternal(querySpec, options); } /** @@ -685,30 +711,11 @@ public CosmosAsyncUser getUser(String id) { * @return the mono. */ public Mono replaceThroughput(ThroughputProperties throughputProperties) { - return this.read() - .flatMap(response -> this.getDocClientWrapper() - .queryOffers(getOfferQuerySpecFromResourceId(response.getProperties().getResourceId()), - new CosmosQueryRequestOptions()) - .single() - .flatMap(offerFeedResponse -> { - if (offerFeedResponse.getResults().isEmpty()) { - return Mono.error(BridgeInternal - .createCosmosException( - HttpConstants.StatusCodes.BADREQUEST, - "No offers found for the " + - "resource " + this.getId())); - } - - Offer existingOffer = offerFeedResponse.getResults().get(0); - Offer updatedOffer = - ModelBridgeInternal.updateOfferFromProperties(existingOffer, - throughputProperties); - - return this.getDocClientWrapper() - .replaceOffer(updatedOffer) - .single(); - }) - .map(ModelBridgeInternal::createThroughputRespose)); + if(!this.client.getTracerProvider().isEnabled()) { + return replaceThroughputInternal(this.read(), throughputProperties); + } + + return withContext(context -> replaceThroughputInternal(throughputProperties, context)); } /** @@ -717,26 +724,11 @@ public Mono replaceThroughput(ThroughputProperties throughpu * @return the mono containing throughput response. */ public Mono readThroughput() { - return this.read() - .flatMap(response -> getDocClientWrapper() - .queryOffers(getOfferQuerySpecFromResourceId(response.getProperties().getResourceId()), - new CosmosQueryRequestOptions()) - .single() - .flatMap(offerFeedResponse -> { - if (offerFeedResponse.getResults().isEmpty()) { - return Mono.error(BridgeInternal - .createCosmosException( - HttpConstants.StatusCodes.BADREQUEST, - "No offers found for the " + - "resource " + this.getId())); - } - return getDocClientWrapper() - .readOffer(offerFeedResponse.getResults() - .get(0) - .getSelfLink()) - .single(); - }) - .map(ModelBridgeInternal::createThroughputRespose)); + if(!this.client.getTracerProvider().isEnabled()) { + return readThroughputInternal(this.read()); + } + + return withContext(context -> readThroughputInternal(context)); } SqlQuerySpec getOfferQuerySpecFromResourceId(String resourceId) { @@ -768,4 +760,218 @@ String getLink() { return this.link; } + private CosmosPagedFlux queryContainersInternal(SqlQuerySpec querySpec + , CosmosQueryRequestOptions options) { + return UtilBridgeInternal.createCosmosPagedFlux(pagedFluxOptions -> { + String spanName = "queryContainers." + this.getId(); + pagedFluxOptions.setTracerInformation(this.getClient().getTracerProvider(), spanName, + this.getClient().getServiceEndpoint(), getId()); + setContinuationTokenAndMaxItemCount(pagedFluxOptions, options); + return getDocClientWrapper().queryCollections(getLink(), querySpec, options) + .map(response -> BridgeInternal.createFeedResponse( + ModelBridgeInternal.getCosmosContainerPropertiesFromV2Results(response.getResults()), + response.getResponseHeaders())); + }, this.getClient().getTracerProvider().isEnabled()); + } + + private CosmosPagedFlux queryUsersInternal(SqlQuerySpec querySpec, CosmosQueryRequestOptions options) { + return UtilBridgeInternal.createCosmosPagedFlux(pagedFluxOptions -> { + String spanName = "queryUsers." + this.getId(); + pagedFluxOptions.setTracerInformation(this.getClient().getTracerProvider(), spanName, + this.getClient().getServiceEndpoint(), getId()); + setContinuationTokenAndMaxItemCount(pagedFluxOptions, options); + return getDocClientWrapper().queryUsers(getLink(), querySpec, options) + .map(response -> BridgeInternal.createFeedResponseWithQueryMetrics( + ModelBridgeInternal.getCosmosUserPropertiesFromV2Results(response.getResults()), response.getResponseHeaders(), + ModelBridgeInternal.queryMetrics(response))); + }, this.getClient().getTracerProvider().isEnabled()); + } + + private Mono createContainerIfNotExistsInternal( + CosmosContainerProperties containerProperties, + CosmosAsyncContainer container, + CosmosContainerRequestOptions options, + Context context) { + String spanName = "createContainerIfNotExistsInternal." + containerProperties.getId(); + Context nestedContext = context.addData(TracerProvider.COSMOS_CALL_DEPTH, TracerProvider.COSMOS_CALL_DEPTH_VAL); + if (options == null) { + options = new CosmosContainerRequestOptions(); + } + + Mono responseMono = createContainerIfNotExistsInternal(container.read(options, nestedContext), containerProperties, options, nestedContext); + return this.client.getTracerProvider().traceEnabledCosmosResponsePublisher(responseMono, context, + spanName, + getId(), + getClient().getServiceEndpoint()); + } + + private Mono createContainerIfNotExistsInternal( + Mono responseMono, + CosmosContainerProperties containerProperties, + CosmosContainerRequestOptions options, + Context context) { + return responseMono.onErrorResume(exception -> { + final Throwable unwrappedException = Exceptions.unwrap(exception); + if (unwrappedException instanceof CosmosException) { + final CosmosException cosmosException = (CosmosException) unwrappedException; + if (cosmosException.getStatusCode() == HttpConstants.StatusCodes.NOTFOUND) { + if(context != null) { + return createContainerInternal(containerProperties, options, context); + } + + return createContainer(containerProperties, options); + } + } + return Mono.error(unwrappedException); + }); + } + + private Mono createContainerInternal( + CosmosContainerProperties containerProperties, + CosmosContainerRequestOptions options, + Context context) { + String spanName = "createContainer." + containerProperties.getId(); + Mono responseMono = createContainerInternal(containerProperties, options); + return this.client.getTracerProvider().traceEnabledCosmosResponsePublisher(responseMono, context, + spanName, + getId(), + getClient().getServiceEndpoint()); + } + + private Mono createContainerInternal( + CosmosContainerProperties containerProperties, + CosmosContainerRequestOptions options) { + return getDocClientWrapper() + .createCollection(this.getLink(), ModelBridgeInternal.getV2Collection(containerProperties), + ModelBridgeInternal.toRequestOptions(options)) + .map(response -> ModelBridgeInternal.createCosmosContainerResponse(response)).single(); + } + + Mono readInternal(CosmosDatabaseRequestOptions options, Context context) { + String spanName = "readDatabase." + this.getId(); + Mono responseMono = readInternal(options); + return this.client.getTracerProvider().traceEnabledCosmosResponsePublisher(responseMono, context, + spanName, + getId(), + getClient().getServiceEndpoint()); + } + + private Mono readInternal(CosmosDatabaseRequestOptions options) { + return getDocClientWrapper().readDatabase(getLink(), + ModelBridgeInternal.toRequestOptions(options)) + .map(response -> ModelBridgeInternal.createCosmosDatabaseResponse(response)).single(); + } + + private Mono deleteInternal(CosmosDatabaseRequestOptions options, Context context) { + String spanName = "deleteDatabase." + this.getId(); + Mono responseMono = deleteInternal(options); + return this.client.getTracerProvider().traceEnabledCosmosResponsePublisher(responseMono, context, + spanName, + getId(), + getClient().getServiceEndpoint()); + } + + private Mono deleteInternal(CosmosDatabaseRequestOptions options) { + return getDocClientWrapper().deleteDatabase(getLink(), + ModelBridgeInternal.toRequestOptions(options)) + .map(response -> ModelBridgeInternal.createCosmosDatabaseResponse(response)).single(); + } + + private Mono createUserInternal(CosmosUserProperties userProperties, Context context) { + String spanName = "createUser." + this.getId(); + Mono responseMono = createUserInternal(userProperties); + return this.client.getTracerProvider().traceEnabledCosmosResponsePublisher(responseMono, context, + spanName, + getId(), + getClient().getServiceEndpoint()); + } + + private Mono createUserInternal(CosmosUserProperties userProperties) { + return getDocClientWrapper().createUser(this.getLink(), ModelBridgeInternal.getV2User(userProperties), null) + .map(response -> ModelBridgeInternal.createCosmosUserResponse(response)).single(); + } + + private Mono upsertUserInternal(CosmosUserProperties userProperties, Context context) { + String spanName = "upsertUser." + this.getId(); + Mono responseMono = upsertUserInternal(userProperties); + return this.client.getTracerProvider().traceEnabledCosmosResponsePublisher(responseMono, context, + spanName, getId(), getClient().getServiceEndpoint()); + } + + private Mono upsertUserInternal(CosmosUserProperties userProperties) { + return getDocClientWrapper().upsertUser(this.getLink(), ModelBridgeInternal.getV2User(userProperties), null) + .map(response -> ModelBridgeInternal.createCosmosUserResponse(response)).single(); + } + + private Mono replaceThroughputInternal(ThroughputProperties throughputProperties, Context context){ + String spanName = "replaceThroughput." + this.getId(); + Context nestedContext = context.addData(TracerProvider.COSMOS_CALL_DEPTH, TracerProvider.COSMOS_CALL_DEPTH_VAL); + Mono responseMono = replaceThroughputInternal(this.readInternal(new CosmosDatabaseRequestOptions(), nestedContext), throughputProperties); + return this.client.getTracerProvider().traceEnabledCosmosResponsePublisher(responseMono, + context, + spanName, + getId(), + getClient().getServiceEndpoint()); + } + + private Mono replaceThroughputInternal(Mono responseMono, ThroughputProperties throughputProperties) { + return responseMono + .flatMap(response -> this.getDocClientWrapper() + .queryOffers(getOfferQuerySpecFromResourceId(response.getProperties().getResourceId()), + new CosmosQueryRequestOptions()) + .single() + .flatMap(offerFeedResponse -> { + if (offerFeedResponse.getResults().isEmpty()) { + return Mono.error(BridgeInternal + .createCosmosException( + HttpConstants.StatusCodes.BADREQUEST, + "No offers found for the " + + "resource " + this.getId())); + } + + Offer existingOffer = offerFeedResponse.getResults().get(0); + Offer updatedOffer = + ModelBridgeInternal.updateOfferFromProperties(existingOffer, + throughputProperties); + + return this.getDocClientWrapper() + .replaceOffer(updatedOffer) + .single(); + }) + .map(ModelBridgeInternal::createThroughputRespose)); + } + + private Mono readThroughputInternal(Context context){ + String spanName = "readThroughput." + this.getId(); + Context nestedContext = context.addData(TracerProvider.COSMOS_CALL_DEPTH, TracerProvider.COSMOS_CALL_DEPTH_VAL); + Mono responseMono = readThroughputInternal(this.readInternal(new CosmosDatabaseRequestOptions(), nestedContext)); + return this.client.getTracerProvider().traceEnabledCosmosResponsePublisher(responseMono, + context, + spanName, + getId(), + getClient().getServiceEndpoint()); + } + + private Mono readThroughputInternal(Mono responseMono) { + return responseMono + .flatMap(response -> getDocClientWrapper() + .queryOffers(getOfferQuerySpecFromResourceId(response.getProperties().getResourceId()), + new CosmosQueryRequestOptions()) + .single() + .flatMap(offerFeedResponse -> { + if (offerFeedResponse.getResults().isEmpty()) { + return Mono.error(BridgeInternal + .createCosmosException( + HttpConstants.StatusCodes.BADREQUEST, + "No offers found for the " + + "resource " + this.getId())); + } + return getDocClientWrapper() + .readOffer(offerFeedResponse.getResults() + .get(0) + .getSelfLink()) + .single(); + }) + .map(ModelBridgeInternal::createThroughputRespose)); + } } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/CosmosAsyncPermission.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/CosmosAsyncPermission.java index f01a52766820..128937f4f762 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/CosmosAsyncPermission.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/CosmosAsyncPermission.java @@ -2,6 +2,7 @@ // Licensed under the MIT License. package com.azure.cosmos; +import com.azure.core.util.Context; import com.azure.cosmos.implementation.Paths; import com.azure.cosmos.models.CosmosPermissionResponse; import com.azure.cosmos.models.CosmosPermissionProperties; @@ -9,6 +10,8 @@ import com.azure.cosmos.models.ModelBridgeInternal; import reactor.core.publisher.Mono; +import static com.azure.core.util.FluxUtil.withContext; + /** * Has methods to operate on a per-User Permission to access a specific resource */ @@ -56,11 +59,13 @@ public Mono read(CosmosPermissionRequestOptions option if (options == null) { options = new CosmosPermissionRequestOptions(); } - return cosmosUser.getDatabase() - .getDocClientWrapper() - .readPermission(getLink(), ModelBridgeInternal.toRequestOptions(options)) - .map(response -> ModelBridgeInternal.createCosmosPermissionResponse(response)) - .single(); + + if (!cosmosUser.getDatabase().getClient().getTracerProvider().isEnabled()) { + return readInternal(options); + } + + final CosmosPermissionRequestOptions requestOptions = options; + return withContext(context -> readInternal(requestOptions, context)); } /** @@ -79,13 +84,13 @@ public Mono replace(CosmosPermissionProperties permiss if (options == null) { options = new CosmosPermissionRequestOptions(); } - CosmosAsyncDatabase databaseContext = cosmosUser.getDatabase(); - return databaseContext - .getDocClientWrapper() - .replacePermission(ModelBridgeInternal.getPermission(permissionProperties, databaseContext.getId()), - ModelBridgeInternal.toRequestOptions(options)) - .map(response -> ModelBridgeInternal.createCosmosPermissionResponse(response)) - .single(); + + if (!cosmosUser.getDatabase().getClient().getTracerProvider().isEnabled()) { + return replaceInternal(permissionProperties, options); + } + + final CosmosPermissionRequestOptions requestOptions = options; + return withContext(context -> replaceInternal(permissionProperties, requestOptions, context)); } /** @@ -102,11 +107,13 @@ public Mono delete(CosmosPermissionRequestOptions opti if (options == null) { options = new CosmosPermissionRequestOptions(); } - return cosmosUser.getDatabase() - .getDocClientWrapper() - .deletePermission(getLink(), ModelBridgeInternal.toRequestOptions(options)) - .map(response -> ModelBridgeInternal.createCosmosPermissionResponse(response)) - .single(); + + if (!cosmosUser.getDatabase().getClient().getTracerProvider().isEnabled()) { + return deleteInternal(options); + } + + final CosmosPermissionRequestOptions requestOptions = options; + return withContext(context -> deleteInternal(requestOptions, context)); } String getURIPathSegment() { @@ -126,4 +133,67 @@ String getLink() { builder.append(getId()); return builder.toString(); } + + private Mono readInternal(CosmosPermissionRequestOptions options, Context context) { + + String spanName = "readPermission." + cosmosUser.getId(); + Mono responseMono = readInternal(options); + return cosmosUser.getDatabase().getClient().getTracerProvider().traceEnabledCosmosResponsePublisher(responseMono, context, + spanName, + cosmosUser.getDatabase().getId(), + cosmosUser.getDatabase().getClient().getServiceEndpoint()); + } + + private Mono readInternal(CosmosPermissionRequestOptions options) { + + return cosmosUser.getDatabase() + .getDocClientWrapper() + .readPermission(getLink(), ModelBridgeInternal.toRequestOptions(options)) + .map(response -> ModelBridgeInternal.createCosmosPermissionResponse(response)) + .single(); + } + + private Mono replaceInternal(CosmosPermissionProperties permissionProperties, + CosmosPermissionRequestOptions options, + Context context) { + + String spanName = "replacePermission." + cosmosUser.getId(); + Mono responseMono = replaceInternal(permissionProperties, options); + return cosmosUser.getDatabase().getClient().getTracerProvider().traceEnabledCosmosResponsePublisher(responseMono, context, + spanName, + cosmosUser.getDatabase().getId(), + cosmosUser.getDatabase().getClient().getServiceEndpoint()); + } + + private Mono replaceInternal(CosmosPermissionProperties permissionProperties, + CosmosPermissionRequestOptions options) { + CosmosAsyncDatabase databaseContext = cosmosUser.getDatabase(); + return cosmosUser.getDatabase() + .getDocClientWrapper() + .replacePermission(ModelBridgeInternal.getPermission(permissionProperties, databaseContext.getId()), + ModelBridgeInternal.toRequestOptions(options)) + .map(response -> ModelBridgeInternal.createCosmosPermissionResponse(response)) + .single(); + } + + private Mono deleteInternal(CosmosPermissionRequestOptions options, + Context context) { + + String spanName = "deletePermission." + cosmosUser.getId(); + Mono responseMono = deleteInternal(options); + return cosmosUser.getDatabase().getClient().getTracerProvider().traceEnabledCosmosResponsePublisher(responseMono, context, + spanName, + cosmosUser.getDatabase().getId(), + cosmosUser.getDatabase().getClient().getServiceEndpoint()); + } + + private Mono deleteInternal(CosmosPermissionRequestOptions options) { + + return cosmosUser.getDatabase() + .getDocClientWrapper() + .deletePermission(getLink(), ModelBridgeInternal.toRequestOptions(options)) + .map(response -> ModelBridgeInternal.createCosmosPermissionResponse(response)) + .single(); + } + } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/CosmosAsyncScripts.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/CosmosAsyncScripts.java index 7ea5249b8b70..7d4e22244948 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/CosmosAsyncScripts.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/CosmosAsyncScripts.java @@ -2,23 +2,25 @@ // Licensed under the MIT License. package com.azure.cosmos; +import com.azure.core.util.Context; import com.azure.cosmos.implementation.StoredProcedure; import com.azure.cosmos.implementation.Trigger; import com.azure.cosmos.implementation.UserDefinedFunction; -import com.azure.cosmos.models.CosmosStoredProcedureResponse; -import com.azure.cosmos.models.CosmosTriggerResponse; -import com.azure.cosmos.models.CosmosUserDefinedFunctionResponse; +import com.azure.cosmos.models.CosmosQueryRequestOptions; import com.azure.cosmos.models.CosmosStoredProcedureProperties; import com.azure.cosmos.models.CosmosStoredProcedureRequestOptions; +import com.azure.cosmos.models.CosmosStoredProcedureResponse; import com.azure.cosmos.models.CosmosTriggerProperties; +import com.azure.cosmos.models.CosmosTriggerResponse; import com.azure.cosmos.models.CosmosUserDefinedFunctionProperties; -import com.azure.cosmos.models.CosmosQueryRequestOptions; +import com.azure.cosmos.models.CosmosUserDefinedFunctionResponse; import com.azure.cosmos.models.ModelBridgeInternal; import com.azure.cosmos.models.SqlQuerySpec; import com.azure.cosmos.util.CosmosPagedFlux; import com.azure.cosmos.util.UtilBridgeInternal; import reactor.core.publisher.Mono; +import static com.azure.core.util.FluxUtil.withContext; import static com.azure.cosmos.implementation.Utils.setContinuationTokenAndMaxItemCount; /** @@ -71,10 +73,12 @@ public Mono createStoredProcedure( StoredProcedure sProc = new StoredProcedure(); sProc.setId(properties.getId()); sProc.setBody(properties.getBody()); - return database.getDocClientWrapper() - .createStoredProcedure(container.getLink(), sProc, ModelBridgeInternal.toRequestOptions(options)) - .map(response -> ModelBridgeInternal.createCosmosStoredProcedureResponse(response)) - .single(); + if (!container.getDatabase().getClient().getTracerProvider().isEnabled()) { + return createStoredProcedureInternal(sProc, options); + } + + final CosmosStoredProcedureRequestOptions requestOptions = options; + return withContext(context -> createStoredProcedureInternal(sProc, requestOptions, context)); } /** @@ -108,13 +112,18 @@ public CosmosPagedFlux readAllStoredProcedures( */ CosmosPagedFlux readAllStoredProcedures(CosmosQueryRequestOptions options) { return UtilBridgeInternal.createCosmosPagedFlux(pagedFluxOptions -> { + String spanName = "readAllStoredProcedures." + this.container.getId(); + pagedFluxOptions.setTracerInformation(this.container.getDatabase().getClient().getTracerProvider(), + spanName, + this.container.getDatabase().getClient().getServiceEndpoint(), + this.container.getDatabase().getId()); setContinuationTokenAndMaxItemCount(pagedFluxOptions, options); return database.getDocClientWrapper() .readStoredProcedures(container.getLink(), options) .map(response -> BridgeInternal.createFeedResponse( ModelBridgeInternal.getCosmosStoredProcedurePropertiesFromV2Results(response.getResults()), response.getResponseHeaders())); - }); + }, this.container.getDatabase().getClient().getTracerProvider().isEnabled()); } /** @@ -132,8 +141,8 @@ CosmosPagedFlux readAllStoredProcedures(CosmosQ */ public CosmosPagedFlux queryStoredProcedures( String query, - CosmosQueryRequestOptions options) { - return queryStoredProcedures(new SqlQuerySpec(query), options); + CosmosQueryRequestOptions options) { + return queryStoredProceduresInternal(new SqlQuerySpec(query), options); } /** @@ -152,14 +161,7 @@ public CosmosPagedFlux queryStoredProcedures( public CosmosPagedFlux queryStoredProcedures( SqlQuerySpec querySpec, CosmosQueryRequestOptions options) { - return UtilBridgeInternal.createCosmosPagedFlux(pagedFluxOptions -> { - setContinuationTokenAndMaxItemCount(pagedFluxOptions, options); - return database.getDocClientWrapper() - .queryStoredProcedures(container.getLink(), querySpec, options) - .map(response -> BridgeInternal.createFeedResponse( - ModelBridgeInternal.getCosmosStoredProcedurePropertiesFromV2Results(response.getResults()), - response.getResponseHeaders())); - }); + return queryStoredProceduresInternal(querySpec, options); } /** @@ -172,7 +174,6 @@ public CosmosAsyncStoredProcedure getStoredProcedure(String id) { return new CosmosAsyncStoredProcedure(id, this.container); } - /* UDF Operations */ /** @@ -191,10 +192,11 @@ public Mono createUserDefinedFunction( UserDefinedFunction udf = new UserDefinedFunction(); udf.setId(properties.getId()); udf.setBody(properties.getBody()); + if (!container.getDatabase().getClient().getTracerProvider().isEnabled()) { + return createUserDefinedFunctionInternal(udf); + } - return database.getDocClientWrapper() - .createUserDefinedFunction(container.getLink(), udf, null) - .map(response -> ModelBridgeInternal.createCosmosUserDefinedFunctionResponse(response)).single(); + return withContext(context -> createUserDefinedFunctionInternal(udf, context)); } /** @@ -226,13 +228,18 @@ public CosmosPagedFlux readAllUserDefinedFu */ CosmosPagedFlux readAllUserDefinedFunctions(CosmosQueryRequestOptions options) { return UtilBridgeInternal.createCosmosPagedFlux(pagedFluxOptions -> { + String spanName = "readAllUserDefinedFunctions." + this.container.getId(); + pagedFluxOptions.setTracerInformation(this.container.getDatabase().getClient().getTracerProvider(), + spanName, + this.container.getDatabase().getClient().getServiceEndpoint(), + this.container.getDatabase().getId()); setContinuationTokenAndMaxItemCount(pagedFluxOptions, options); return database.getDocClientWrapper() - .readUserDefinedFunctions(container.getLink(), options) - .map(response -> BridgeInternal.createFeedResponse( - ModelBridgeInternal.getCosmosUserDefinedFunctionPropertiesFromV2Results(response.getResults()), - response.getResponseHeaders())); - }); + .readUserDefinedFunctions(container.getLink(), options) + .map(response -> BridgeInternal.createFeedResponse( + ModelBridgeInternal.getCosmosUserDefinedFunctionPropertiesFromV2Results(response.getResults()), + response.getResponseHeaders())); + }, this.container.getDatabase().getClient().getTracerProvider().isEnabled()); } /** @@ -272,14 +279,7 @@ public CosmosPagedFlux queryUserDefinedFunc public CosmosPagedFlux queryUserDefinedFunctions( SqlQuerySpec querySpec, CosmosQueryRequestOptions options) { - return UtilBridgeInternal.createCosmosPagedFlux(pagedFluxOptions -> { - setContinuationTokenAndMaxItemCount(pagedFluxOptions, options); - return database.getDocClientWrapper() - .queryUserDefinedFunctions(container.getLink(), querySpec, options) - .map(response -> BridgeInternal.createFeedResponse( - ModelBridgeInternal.getCosmosUserDefinedFunctionPropertiesFromV2Results(response.getResults()), - response.getResponseHeaders())); - }); + return queryUserDefinedFunctionsInternal(querySpec, options); } /** @@ -305,12 +305,11 @@ public CosmosAsyncUserDefinedFunction getUserDefinedFunction(String id) { * @return an {@link Mono} containing the single resource response with the created trigger or an error. */ public Mono createTrigger(CosmosTriggerProperties properties) { - Trigger trigger = new Trigger(ModelBridgeInternal.toJsonFromJsonSerializable(ModelBridgeInternal.getResource(properties))); + if (!container.getDatabase().getClient().getTracerProvider().isEnabled()) { + return createTriggerInternal(properties); + } - return database.getDocClientWrapper() - .createTrigger(container.getLink(), trigger, null) - .map(response -> ModelBridgeInternal.createCosmosTriggerResponse(response)) - .single(); + return withContext(context -> createTriggerInternal(properties, context)); } /** @@ -344,13 +343,18 @@ public CosmosPagedFlux readAllTriggers() { */ CosmosPagedFlux readAllTriggers(CosmosQueryRequestOptions options) { return UtilBridgeInternal.createCosmosPagedFlux(pagedFluxOptions -> { + String spanName = "readAllTriggers." + this.container.getId(); + pagedFluxOptions.setTracerInformation(this.container.getDatabase().getClient().getTracerProvider(), + spanName, + this.container.getDatabase().getClient().getServiceEndpoint(), + this.container.getDatabase().getId()); setContinuationTokenAndMaxItemCount(pagedFluxOptions, options); return database.getDocClientWrapper() .readTriggers(container.getLink(), options) .map(response -> BridgeInternal.createFeedResponse( ModelBridgeInternal.getCosmosTriggerPropertiesFromV2Results(response.getResults()), response.getResponseHeaders())); - }); + }, this.container.getDatabase().getClient().getTracerProvider().isEnabled()); } /** @@ -366,7 +370,7 @@ CosmosPagedFlux readAllTriggers(CosmosQueryRequestOptio * error. */ public CosmosPagedFlux queryTriggers(String query, CosmosQueryRequestOptions options) { - return queryTriggers(new SqlQuerySpec(query), options); + return queryTriggersInternal(false, new SqlQuerySpec(query), options); } /** @@ -384,14 +388,7 @@ public CosmosPagedFlux queryTriggers(String query, Cosm public CosmosPagedFlux queryTriggers( SqlQuerySpec querySpec, CosmosQueryRequestOptions options) { - return UtilBridgeInternal.createCosmosPagedFlux(pagedFluxOptions -> { - setContinuationTokenAndMaxItemCount(pagedFluxOptions, options); - return database.getDocClientWrapper() - .queryTriggers(container.getLink(), querySpec, options) - .map(response -> BridgeInternal.createFeedResponse( - ModelBridgeInternal.getCosmosTriggerPropertiesFromV2Results(response.getResults()), - response.getResponseHeaders())); - }); + return queryTriggersInternal(true, querySpec, options); } /** @@ -404,4 +401,120 @@ public CosmosAsyncTrigger getTrigger(String id) { return new CosmosAsyncTrigger(id, this.container); } + private CosmosPagedFlux queryStoredProceduresInternal( + SqlQuerySpec querySpec, + CosmosQueryRequestOptions options) { + return UtilBridgeInternal.createCosmosPagedFlux(pagedFluxOptions -> { + String spanName = "queryStoredProcedures." + this.container.getId(); + pagedFluxOptions.setTracerInformation(this.container.getDatabase().getClient().getTracerProvider(), + spanName, + this.container.getDatabase().getClient().getServiceEndpoint(), + this.container.getDatabase().getId()); + setContinuationTokenAndMaxItemCount(pagedFluxOptions, options); + return database.getDocClientWrapper() + .queryStoredProcedures(container.getLink(), querySpec, options) + .map(response -> BridgeInternal.createFeedResponse( + ModelBridgeInternal.getCosmosStoredProcedurePropertiesFromV2Results(response.getResults()), + response.getResponseHeaders())); + }, this.container.getDatabase().getClient().getTracerProvider().isEnabled()); + } + + private CosmosPagedFlux queryUserDefinedFunctionsInternal( + SqlQuerySpec querySpec, + CosmosQueryRequestOptions options) { + return UtilBridgeInternal.createCosmosPagedFlux(pagedFluxOptions -> { + String spanName = "queryUserDefinedFunctions." + this.container.getId(); + pagedFluxOptions.setTracerInformation(this.container.getDatabase().getClient().getTracerProvider(), + spanName, + this.container.getDatabase().getClient().getServiceEndpoint(), + this.container.getDatabase().getId()); + setContinuationTokenAndMaxItemCount(pagedFluxOptions, options); + return database.getDocClientWrapper() + .queryUserDefinedFunctions(container.getLink(), querySpec, options) + .map(response -> BridgeInternal.createFeedResponse( + ModelBridgeInternal.getCosmosUserDefinedFunctionPropertiesFromV2Results(response.getResults()), + response.getResponseHeaders())); + }, this.container.getDatabase().getClient().getTracerProvider().isEnabled()); + } + + private CosmosPagedFlux queryTriggersInternal( + boolean isParameterised, + SqlQuerySpec querySpec, + CosmosQueryRequestOptions options) { + return UtilBridgeInternal.createCosmosPagedFlux(pagedFluxOptions -> { + String spanName; + if (isParameterised) { + spanName = "queryTriggers." + this.container.getId() + "." + querySpec.getQueryText(); + } else { + spanName = "queryTriggers." + this.container.getId(); + } + + pagedFluxOptions.setTracerInformation(this.container.getDatabase().getClient().getTracerProvider(), + spanName, + this.container.getDatabase().getClient().getServiceEndpoint(), + this.container.getDatabase().getId()); + setContinuationTokenAndMaxItemCount(pagedFluxOptions, options); + return database.getDocClientWrapper() + .queryTriggers(container.getLink(), querySpec, options) + .map(response -> BridgeInternal.createFeedResponse( + ModelBridgeInternal.getCosmosTriggerPropertiesFromV2Results(response.getResults()), + response.getResponseHeaders())); + }, this.container.getDatabase().getClient().getTracerProvider().isEnabled()); + } + + private Mono createStoredProcedureInternal(StoredProcedure sProc, + CosmosStoredProcedureRequestOptions options, + Context context) { + String spanName = "createStoredProcedure." + container.getId(); + Mono responseMono = createStoredProcedureInternal(sProc, options); + return this.container.getDatabase().getClient().getTracerProvider().traceEnabledCosmosResponsePublisher(responseMono, + context, + spanName, + database.getId(), + database.getClient().getServiceEndpoint()); + } + + private Mono createStoredProcedureInternal(StoredProcedure sProc, + CosmosStoredProcedureRequestOptions options) { + return database.getDocClientWrapper() + .createStoredProcedure(container.getLink(), sProc, ModelBridgeInternal.toRequestOptions(options)).map(response -> ModelBridgeInternal.createCosmosStoredProcedureResponse(response)) + .single(); + } + + private Mono createUserDefinedFunctionInternal( + UserDefinedFunction udf, + Context context) { + String spanName = "createUserDefinedFunction." + container.getId(); + Mono responseMono = createUserDefinedFunctionInternal(udf); + return this.container.getDatabase().getClient().getTracerProvider().traceEnabledCosmosResponsePublisher(responseMono, + context, + spanName, + database.getId(), + database.getClient().getServiceEndpoint()); + } + + private Mono createUserDefinedFunctionInternal( + UserDefinedFunction udf) { + return database.getDocClientWrapper() + .createUserDefinedFunction(container.getLink(), udf, null).map(response -> ModelBridgeInternal.createCosmosUserDefinedFunctionResponse(response)).single(); + } + + private Mono createTriggerInternal(CosmosTriggerProperties properties, Context context) { + String spanName = "createTrigger." + container.getId(); + Mono responseMono = createTriggerInternal(properties); + return this.container.getDatabase().getClient().getTracerProvider().traceEnabledCosmosResponsePublisher(responseMono, + context, + spanName, + database.getId(), + database.getClient().getServiceEndpoint()); + } + + private Mono createTriggerInternal(CosmosTriggerProperties properties) { + Trigger trigger = new Trigger(ModelBridgeInternal.toJsonFromJsonSerializable(ModelBridgeInternal.getResource(properties))); + return database.getDocClientWrapper() + .createTrigger(container.getLink(), trigger, null) + .map(response -> ModelBridgeInternal.createCosmosTriggerResponse(response)) + .single(); + } + } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/CosmosAsyncStoredProcedure.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/CosmosAsyncStoredProcedure.java index 8e0a51768818..e6d4433de989 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/CosmosAsyncStoredProcedure.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/CosmosAsyncStoredProcedure.java @@ -2,6 +2,7 @@ // Licensed under the MIT License. package com.azure.cosmos; +import com.azure.core.util.Context; import com.azure.cosmos.implementation.Paths; import com.azure.cosmos.implementation.StoredProcedure; import com.azure.cosmos.models.CosmosStoredProcedureResponse; @@ -10,6 +11,7 @@ import com.azure.cosmos.models.ModelBridgeInternal; import reactor.core.publisher.Mono; +import static com.azure.core.util.FluxUtil.withContext; import java.util.List; /** @@ -72,12 +74,11 @@ public Mono read() { * @return an {@link Mono} containing the single resource response with the read stored procedure or an error. */ public Mono read(CosmosStoredProcedureRequestOptions options) { - if (options == null) { - options = new CosmosStoredProcedureRequestOptions(); + if (!cosmosContainer.getDatabase().getClient().getTracerProvider().isEnabled()) { + return readInternal(options); } - return cosmosContainer.getDatabase().getDocClientWrapper().readStoredProcedure(getLink(), - ModelBridgeInternal.toRequestOptions(options)) - .map(response -> ModelBridgeInternal.createCosmosStoredProcedureResponse(response)).single(); + + return withContext(context -> readInternal(options, context)); } /** @@ -106,14 +107,11 @@ public Mono delete() { * @return an {@link Mono} containing the single resource response for the deleted stored procedure or an error. */ public Mono delete(CosmosStoredProcedureRequestOptions options) { - if (options == null) { - options = new CosmosStoredProcedureRequestOptions(); + if (!cosmosContainer.getDatabase().getClient().getTracerProvider().isEnabled()) { + return deleteInternal(options); } - return cosmosContainer.getDatabase() - .getDocClientWrapper() - .deleteStoredProcedure(getLink(), ModelBridgeInternal.toRequestOptions(options)) - .map(response -> ModelBridgeInternal.createCosmosStoredProcedureResponse(response)) - .single(); + + return withContext(context -> deleteInternal(options, context)); } /** @@ -129,15 +127,12 @@ public Mono delete(CosmosStoredProcedureRequestOp * @return an {@link Mono} containing the single resource response with the stored procedure response or an error. */ public Mono execute(List procedureParams, - CosmosStoredProcedureRequestOptions options) { - if (options == null) { - options = new CosmosStoredProcedureRequestOptions(); + CosmosStoredProcedureRequestOptions options) { + if (!cosmosContainer.getDatabase().getClient().getTracerProvider().isEnabled()) { + return executeInternal(procedureParams, options); } - return cosmosContainer.getDatabase() - .getDocClientWrapper() - .executeStoredProcedure(getLink(), ModelBridgeInternal.toRequestOptions(options), procedureParams) - .map(response -> ModelBridgeInternal.createCosmosStoredProcedureResponse(response)) - .single(); + + return withContext(context -> executeInternal(procedureParams, options, context)); } /** @@ -168,17 +163,13 @@ public Mono replace(CosmosStoredProcedureProperti * @return an {@link Mono} containing the single resource response with the replaced stored procedure or an error. */ public Mono replace(CosmosStoredProcedureProperties storedProcedureProperties, - CosmosStoredProcedureRequestOptions options) { - if (options == null) { - options = new CosmosStoredProcedureRequestOptions(); + CosmosStoredProcedureRequestOptions options) { + if (!cosmosContainer.getDatabase().getClient().getTracerProvider().isEnabled()) { + return replaceInternal(storedProcedureProperties, options); } - return cosmosContainer.getDatabase() - .getDocClientWrapper() - .replaceStoredProcedure(new StoredProcedure(ModelBridgeInternal.toJsonFromJsonSerializable( - ModelBridgeInternal.getResource(storedProcedureProperties))), - ModelBridgeInternal.toRequestOptions(options)) - .map(response -> ModelBridgeInternal.createCosmosStoredProcedureResponse(response)) - .single(); + + return withContext(context -> replaceInternal(storedProcedureProperties, options, + context)); } String getURIPathSegment() { @@ -198,4 +189,91 @@ String getLink() { builder.append(getId()); return builder.toString(); } + + private Mono readInternal(CosmosStoredProcedureRequestOptions options, + Context context) { + String spanName = "readStoredProcedure." + cosmosContainer.getId(); + Mono responseMono = readInternal(options); + return this.cosmosContainer.getDatabase().getClient().getTracerProvider().traceEnabledCosmosResponsePublisher(responseMono, + context, + spanName, + cosmosContainer.getDatabase().getId(), + cosmosContainer.getDatabase().getClient().getServiceEndpoint()); + } + + private Mono readInternal(CosmosStoredProcedureRequestOptions options) { + if (options == null) { + options = new CosmosStoredProcedureRequestOptions(); + } + return cosmosContainer.getDatabase().getDocClientWrapper().readStoredProcedure(getLink(), + ModelBridgeInternal.toRequestOptions(options)) + .map(response -> ModelBridgeInternal.createCosmosStoredProcedureResponse(response)).single(); + } + + private Mono deleteInternal(CosmosStoredProcedureRequestOptions options, + Context context) { + String spanName = "deleteStoredProcedure." + cosmosContainer.getId(); + Mono responseMono = deleteInternal(options); + return this.cosmosContainer.getDatabase().getClient().getTracerProvider().traceEnabledCosmosResponsePublisher(responseMono, + context, + spanName, + cosmosContainer.getDatabase().getId(), + cosmosContainer.getDatabase().getClient().getServiceEndpoint()); + } + + private Mono deleteInternal(CosmosStoredProcedureRequestOptions options) { + if (options == null) { + options = new CosmosStoredProcedureRequestOptions(); + } + + return cosmosContainer.getDatabase() + .getDocClientWrapper() + .deleteStoredProcedure(getLink(), ModelBridgeInternal.toRequestOptions(options)) + .map(response -> ModelBridgeInternal.createCosmosStoredProcedureResponse(response)) + .single(); + } + + private Mono executeInternal(List procedureParams, + CosmosStoredProcedureRequestOptions options, + Context context) { + String spanName = "executeStoredProcedure." + cosmosContainer.getId(); + Mono responseMono = executeInternal(procedureParams, options); + return this.cosmosContainer.getDatabase().getClient().getTracerProvider().traceEnabledCosmosResponsePublisher(responseMono, context, spanName, cosmosContainer.getDatabase().getId(), cosmosContainer.getDatabase().getClient().getServiceEndpoint()); + } + + private Mono executeInternal(List procedureParams, + CosmosStoredProcedureRequestOptions options) { + if (options == null) { + options = new CosmosStoredProcedureRequestOptions(); + } + + return cosmosContainer.getDatabase() + .getDocClientWrapper() + .executeStoredProcedure(getLink(), ModelBridgeInternal.toRequestOptions(options), procedureParams) + .map(response -> ModelBridgeInternal.createCosmosStoredProcedureResponse(response)) + .single(); + } + + private Mono replaceInternal(CosmosStoredProcedureProperties storedProcedureSettings, + CosmosStoredProcedureRequestOptions options, + Context context) { + String spanName = "replaceStoredProcedure." + cosmosContainer.getId(); + Mono responseMono = replaceInternal(storedProcedureSettings, options); + return this.cosmosContainer.getDatabase().getClient().getTracerProvider().traceEnabledCosmosResponsePublisher(responseMono, context, spanName, cosmosContainer.getDatabase().getId(), cosmosContainer.getDatabase().getClient().getServiceEndpoint()); + } + + private Mono replaceInternal(CosmosStoredProcedureProperties storedProcedureSettings, + CosmosStoredProcedureRequestOptions options) { + if (options == null) { + options = new CosmosStoredProcedureRequestOptions(); + } + + return cosmosContainer.getDatabase() + .getDocClientWrapper() + .replaceStoredProcedure(new StoredProcedure(ModelBridgeInternal.toJsonFromJsonSerializable( + ModelBridgeInternal.getResource(storedProcedureSettings))), + ModelBridgeInternal.toRequestOptions(options)) + .map(response -> ModelBridgeInternal.createCosmosStoredProcedureResponse(response)) + .single(); + } } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/CosmosAsyncTrigger.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/CosmosAsyncTrigger.java index ae06eb230079..666ddad0a646 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/CosmosAsyncTrigger.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/CosmosAsyncTrigger.java @@ -2,6 +2,7 @@ // Licensed under the MIT License. package com.azure.cosmos; +import com.azure.core.util.Context; import com.azure.cosmos.implementation.Paths; import com.azure.cosmos.implementation.Trigger; import com.azure.cosmos.models.CosmosTriggerResponse; @@ -9,6 +10,8 @@ import com.azure.cosmos.models.ModelBridgeInternal; import reactor.core.publisher.Mono; +import static com.azure.core.util.FluxUtil.withContext; + /** * The type Cosmos async trigger. This contains methods to operate on a cosmos trigger asynchronously */ @@ -52,13 +55,12 @@ CosmosAsyncTrigger setId(String id) { * @return an {@link Mono} containing the single resource response for the read cosmos trigger or an error. */ public Mono read() { - return container.getDatabase() - .getDocClientWrapper() - .readTrigger(getLink(), null) - .map(response -> ModelBridgeInternal.createCosmosTriggerResponse(response)) - .single(); - } + if (!container.getDatabase().getClient().getTracerProvider().isEnabled()) { + return readInternal(); + } + return withContext(context -> readInternal(context)); + } /** * Replaces a cosmos trigger. @@ -71,12 +73,11 @@ public Mono read() { * @return an {@link Mono} containing the single resource response with the replaced cosmos trigger or an error. */ public Mono replace(CosmosTriggerProperties triggerProperties) { - return container.getDatabase() - .getDocClientWrapper() - .replaceTrigger(new Trigger(ModelBridgeInternal.toJsonFromJsonSerializable( - ModelBridgeInternal.getResource(triggerProperties))), null) - .map(response -> ModelBridgeInternal.createCosmosTriggerResponse(response)) - .single(); + if (!container.getDatabase().getClient().getTracerProvider().isEnabled()) { + return replaceInternal(triggerProperties); + } + + return withContext(context -> replaceInternal(triggerProperties, context)); } /** @@ -89,11 +90,11 @@ public Mono replace(CosmosTriggerProperties triggerProper * @return an {@link Mono} containing the single resource response for the deleted cosmos trigger or an error. */ public Mono delete() { - return container.getDatabase() - .getDocClientWrapper() - .deleteTrigger(getLink(), null) - .map(response -> ModelBridgeInternal.createCosmosTriggerResponse(response)) - .single(); + if (!container.getDatabase().getClient().getTracerProvider().isEnabled()) { + return deleteInternal(); + } + + return withContext(context -> deleteInternal(context)); } String getURIPathSegment() { @@ -114,4 +115,58 @@ String getLink() { return builder.toString(); } + private Mono readInternal(Context context) { + String spanName = "readTrigger." + container.getId(); + Mono responseMono = readInternal(); + return this.container.getDatabase().getClient().getTracerProvider().traceEnabledCosmosResponsePublisher(responseMono, + context, + spanName, + container.getDatabase().getId(), + container.getDatabase().getClient().getServiceEndpoint()); + } + + private Mono readInternal() { + return container.getDatabase() + .getDocClientWrapper() + .readTrigger(getLink(), null) + .map(response -> ModelBridgeInternal.createCosmosTriggerResponse(response)) + .single(); + } + + private Mono replaceInternal(CosmosTriggerProperties triggerSettings, Context context) { + String spanName = "replaceTrigger." + container.getId(); + Mono responseMono = replaceInternal(triggerSettings); + return this.container.getDatabase().getClient().getTracerProvider().traceEnabledCosmosResponsePublisher(responseMono, + context, + spanName, + container.getDatabase().getId(), + container.getDatabase().getClient().getServiceEndpoint()); + } + + private Mono replaceInternal(CosmosTriggerProperties triggerSettings) { + return container.getDatabase() + .getDocClientWrapper() + .replaceTrigger(new Trigger(ModelBridgeInternal.toJsonFromJsonSerializable( + ModelBridgeInternal.getResource(triggerSettings))), null) + .map(response -> ModelBridgeInternal.createCosmosTriggerResponse(response)) + .single(); + } + + private Mono deleteInternal(Context context) { + String spanName = "deleteTrigger." + container.getId(); + Mono responseMono = deleteInternal(); + return this.container.getDatabase().getClient().getTracerProvider().traceEnabledCosmosResponsePublisher(responseMono, + context, + spanName, + container.getDatabase().getId(), + container.getDatabase().getClient().getServiceEndpoint()); + } + + private Mono deleteInternal() { + return container.getDatabase() + .getDocClientWrapper() + .deleteTrigger(getLink(), null) + .map(response -> ModelBridgeInternal.createCosmosTriggerResponse(response)) + .single(); + } } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/CosmosAsyncUser.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/CosmosAsyncUser.java index d23e239e2969..f2ea9c442a9a 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/CosmosAsyncUser.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/CosmosAsyncUser.java @@ -3,7 +3,9 @@ package com.azure.cosmos; +import com.azure.core.util.Context; import com.azure.cosmos.implementation.Paths; +import com.azure.cosmos.implementation.Permission; import com.azure.cosmos.models.CosmosPermissionResponse; import com.azure.cosmos.models.CosmosUserResponse; import com.azure.cosmos.models.CosmosPermissionProperties; @@ -11,11 +13,11 @@ import com.azure.cosmos.models.CosmosUserProperties; import com.azure.cosmos.models.CosmosQueryRequestOptions; import com.azure.cosmos.models.ModelBridgeInternal; -import com.azure.cosmos.implementation.Permission; import com.azure.cosmos.util.CosmosPagedFlux; import com.azure.cosmos.util.UtilBridgeInternal; import reactor.core.publisher.Mono; +import static com.azure.core.util.FluxUtil.withContext; import static com.azure.cosmos.implementation.Utils.setContinuationTokenAndMaxItemCount; /** @@ -56,9 +58,11 @@ CosmosAsyncUser setId(String id) { * @return a {@link Mono} containing the single resource response with the read user or an error. */ public Mono read() { - return this.database.getDocClientWrapper() - .readUser(getLink(), null) - .map(response -> ModelBridgeInternal.createCosmosUserResponse(response)).single(); + if (!database.getClient().getTracerProvider().isEnabled()) { + return readInternal(); + } + + return withContext(context -> readInternal(context)); } /** @@ -68,9 +72,11 @@ public Mono read() { * @return a {@link Mono} containing the single resource response with the replaced user or an error. */ public Mono replace(CosmosUserProperties userProperties) { - return this.database.getDocClientWrapper() - .replaceUser(ModelBridgeInternal.getV2User(userProperties), null) - .map(response -> ModelBridgeInternal.createCosmosUserResponse(response)).single(); + if (!database.getClient().getTracerProvider().isEnabled()) { + return replaceInternal(userProperties); + } + + return withContext(context -> replaceInternal(userProperties, context)); } /** @@ -79,9 +85,11 @@ public Mono replace(CosmosUserProperties userProperties) { * @return a {@link Mono} containing the single resource response with the deleted user or an error. */ public Mono delete() { - return this.database.getDocClientWrapper() - .deleteUser(getLink(), null) - .map(response -> ModelBridgeInternal.createCosmosUserResponse(response)).single(); + if (!database.getClient().getTracerProvider().isEnabled()) { + return deleteInternal(); + } + + return withContext(context -> deleteInternal(context)); } /** @@ -101,11 +109,14 @@ public Mono createPermission( if (options == null) { options = new CosmosPermissionRequestOptions(); } + Permission permission = ModelBridgeInternal.getPermission(permissionProperties, database.getId()); - return database.getDocClientWrapper() - .createPermission(getLink(), permission, ModelBridgeInternal.toRequestOptions(options)) - .map(response -> ModelBridgeInternal.createCosmosPermissionResponse(response)) - .single(); + if (!database.getClient().getTracerProvider().isEnabled()) { + return createPermissionInternal(permission, options); + } + + final CosmosPermissionRequestOptions requesOptions = options; + return withContext(context -> createPermissionInternal(permission, requesOptions, context)); } /** @@ -126,10 +137,13 @@ public Mono upsertPermission( if (options == null) { options = new CosmosPermissionRequestOptions(); } - return database.getDocClientWrapper() - .upsertPermission(getLink(), permission, ModelBridgeInternal.toRequestOptions(options)) - .map(response -> ModelBridgeInternal.createCosmosPermissionResponse(response)) - .single(); + + if (!database.getClient().getTracerProvider().isEnabled()) { + return upsertPermissionInternal(permission, options); + } + + final CosmosPermissionRequestOptions requestOptions = options; + return withContext(context -> upsertPermissionInternal(permission, requestOptions, context)); } @@ -160,13 +174,18 @@ public CosmosPagedFlux readAllPermissions() { */ CosmosPagedFlux readAllPermissions(CosmosQueryRequestOptions options) { return UtilBridgeInternal.createCosmosPagedFlux(pagedFluxOptions -> { + String spanName = "readAllPermissions." + this.getId(); + pagedFluxOptions.setTracerInformation(this.getDatabase().getClient().getTracerProvider(), + spanName, + this.getDatabase().getClient().getServiceEndpoint(), + this.getDatabase().getId()); setContinuationTokenAndMaxItemCount(pagedFluxOptions, options); return getDatabase().getDocClientWrapper() .readPermissions(getLink(), options) .map(response -> BridgeInternal.createFeedResponse( ModelBridgeInternal.getCosmosPermissionPropertiesFromResults(response.getResults()), response.getResponseHeaders())); - }); + }, this.getDatabase().getClient().getTracerProvider().isEnabled()); } /** @@ -198,13 +217,18 @@ public CosmosPagedFlux queryPermissions(String query */ public CosmosPagedFlux queryPermissions(String query, CosmosQueryRequestOptions options) { return UtilBridgeInternal.createCosmosPagedFlux(pagedFluxOptions -> { + String spanName = "queryPermissions." + this.getId(); + pagedFluxOptions.setTracerInformation(this.getDatabase().getClient().getTracerProvider(), + spanName, + this.getDatabase().getClient().getServiceEndpoint(), + this.getDatabase().getId()); setContinuationTokenAndMaxItemCount(pagedFluxOptions, options); return getDatabase().getDocClientWrapper() .queryPermissions(getLink(), query, options) .map(response -> BridgeInternal.createFeedResponse( ModelBridgeInternal.getCosmosPermissionPropertiesFromResults(response.getResults()), response.getResponseHeaders())); - }); + }, this.getDatabase().getClient().getTracerProvider().isEnabled()); } /** @@ -243,4 +267,91 @@ String getLink() { CosmosAsyncDatabase getDatabase() { return database; } + + private Mono readInternal(Context context) { + String spanName = "readUser." + getId(); + Mono responseMono = readInternal(); + return database.getClient().getTracerProvider().traceEnabledCosmosResponsePublisher(responseMono, context, + spanName, + database.getId(), + database.getClient().getServiceEndpoint()); + } + + private Mono readInternal() { + return this.database.getDocClientWrapper() + .readUser(getLink(), null) + .map(response -> ModelBridgeInternal.createCosmosUserResponse(response)).single(); + } + + private Mono replaceInternal(CosmosUserProperties userSettings, Context context) { + String spanName = "replaceUser." + getId(); + Mono responseMono = replaceInternal(userSettings); + return database.getClient().getTracerProvider().traceEnabledCosmosResponsePublisher(responseMono, context, + spanName, + database.getId(), + database.getClient().getServiceEndpoint()); + } + + private Mono replaceInternal(CosmosUserProperties userSettings) { + return this.database.getDocClientWrapper() + .replaceUser(ModelBridgeInternal.getV2User(userSettings), null) + .map(response -> ModelBridgeInternal.createCosmosUserResponse(response)).single(); + } + + private Mono deleteInternal(Context context) { + String spanName = "deleteUser." + getId(); + Mono responseMono = deleteInternal(); + return database.getClient().getTracerProvider().traceEnabledCosmosResponsePublisher(responseMono, context, + spanName, + database.getId(), + database.getClient().getServiceEndpoint()); + } + + private Mono deleteInternal() { + return this.database.getDocClientWrapper() + .deleteUser(getLink(), null) + .map(response -> ModelBridgeInternal.createCosmosUserResponse(response)).single(); + } + + private Mono createPermissionInternal( + Permission permission, + CosmosPermissionRequestOptions options, + Context context) { + String spanName = "createPermission." + getId(); + Mono responseMono = createPermissionInternal(permission, options); + return database.getClient().getTracerProvider().traceEnabledCosmosResponsePublisher(responseMono, context, + spanName, + database.getId(), + database.getClient().getServiceEndpoint()); + } + + private Mono createPermissionInternal( + Permission permission, + CosmosPermissionRequestOptions options) { + return database.getDocClientWrapper() + .createPermission(getLink(), permission, ModelBridgeInternal.toRequestOptions(options)) + .map(response -> ModelBridgeInternal.createCosmosPermissionResponse(response)) + .single(); + } + + private Mono upsertPermissionInternal( + Permission permission, + CosmosPermissionRequestOptions options, + Context context) { + String spanName = "upsertPermission." + getId(); + Mono responseMono = upsertPermissionInternal(permission, options); + return database.getClient().getTracerProvider().traceEnabledCosmosResponsePublisher(responseMono, context, + spanName, + database.getId(), + database.getClient().getServiceEndpoint()); + } + + private Mono upsertPermissionInternal( + Permission permission, + CosmosPermissionRequestOptions options) { + return database.getDocClientWrapper() + .upsertPermission(getLink(), permission, ModelBridgeInternal.toRequestOptions(options)) + .map(response -> ModelBridgeInternal.createCosmosPermissionResponse(response)) + .single(); + } } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/CosmosAsyncUserDefinedFunction.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/CosmosAsyncUserDefinedFunction.java index 7710818dd0fb..45b6803fdaac 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/CosmosAsyncUserDefinedFunction.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/CosmosAsyncUserDefinedFunction.java @@ -2,6 +2,7 @@ // Licensed under the MIT License. package com.azure.cosmos; +import com.azure.core.util.Context; import com.azure.cosmos.implementation.Paths; import com.azure.cosmos.implementation.UserDefinedFunction; import com.azure.cosmos.models.CosmosUserDefinedFunctionResponse; @@ -9,6 +10,8 @@ import com.azure.cosmos.models.ModelBridgeInternal; import reactor.core.publisher.Mono; +import static com.azure.core.util.FluxUtil.withContext; + /** * The type Cosmos async user defined function. */ @@ -54,8 +57,11 @@ CosmosAsyncUserDefinedFunction setId(String id) { * @return an {@link Mono} containing the single resource response for the read user defined function or an error. */ public Mono read() { - return container.getDatabase().getDocClientWrapper().readUserDefinedFunction(getLink(), null) - .map(response -> ModelBridgeInternal.createCosmosUserDefinedFunctionResponse(response)).single(); + if(!container.getDatabase().getClient().getTracerProvider().isEnabled()) { + return readInternal(); + } + + return withContext(context -> readInternal(context)); } /** @@ -71,12 +77,11 @@ public Mono read() { * or an error. */ public Mono replace(CosmosUserDefinedFunctionProperties udfSettings) { - return container.getDatabase() - .getDocClientWrapper() - .replaceUserDefinedFunction(new UserDefinedFunction(ModelBridgeInternal.toJsonFromJsonSerializable( - ModelBridgeInternal.getResource(udfSettings))), null) - .map(response -> ModelBridgeInternal.createCosmosUserDefinedFunctionResponse(response)) - .single(); + if(!container.getDatabase().getClient().getTracerProvider().isEnabled()) { + return replaceInternal(udfSettings); + } + + return withContext(context -> replaceInternal(udfSettings, context)); } /** @@ -91,11 +96,11 @@ public Mono replace(CosmosUserDefinedFunction * an error. */ public Mono delete() { - return container.getDatabase() - .getDocClientWrapper() - .deleteUserDefinedFunction(this.getLink(), null) - .map(response -> ModelBridgeInternal.createCosmosUserDefinedFunctionResponse(response)) - .single(); + if(!container.getDatabase().getClient().getTracerProvider().isEnabled()) { + return deleteInternal(); + } + + return withContext(context -> deleteInternal(context)); } String getURIPathSegment() { @@ -115,4 +120,57 @@ String getLink() { builder.append(getId()); return builder.toString(); } + + private Mono readInternal(Context context) { + String spanName = "readUDF." + container.getId(); + Mono responseMono = readInternal(); + return this.container.getDatabase().getClient().getTracerProvider().traceEnabledCosmosResponsePublisher(responseMono, + context, + spanName, + container.getDatabase().getId(), + container.getDatabase().getClient().getServiceEndpoint()); + } + + private Mono readInternal() { + return container.getDatabase().getDocClientWrapper().readUserDefinedFunction(getLink(), null) + .map(response -> ModelBridgeInternal.createCosmosUserDefinedFunctionResponse(response)).single(); + } + + private Mono replaceInternal(CosmosUserDefinedFunctionProperties udfSettings, + Context context) { + String spanName = "replaceUDF." + container.getId(); + Mono responseMono = replaceInternal(udfSettings); + return this.container.getDatabase().getClient().getTracerProvider().traceEnabledCosmosResponsePublisher(responseMono, + context, + spanName, + container.getDatabase().getId(), + container.getDatabase().getClient().getServiceEndpoint()); + } + + private Mono replaceInternal(CosmosUserDefinedFunctionProperties udfSettings) { + return container.getDatabase() + .getDocClientWrapper() + .replaceUserDefinedFunction(new UserDefinedFunction(ModelBridgeInternal.toJsonFromJsonSerializable( + ModelBridgeInternal.getResource(udfSettings))), null) + .map(response -> ModelBridgeInternal.createCosmosUserDefinedFunctionResponse(response)) + .single(); + } + + private Mono deleteInternal(Context context) { + String spanName = "deleteUDF." + container.getId(); + Mono responseMono = deleteInternal(); + return this.container.getDatabase().getClient().getTracerProvider().traceEnabledCosmosResponsePublisher(responseMono, + context, + spanName, + container.getDatabase().getId(), + container.getDatabase().getClient().getServiceEndpoint()); + } + + private Mono deleteInternal() { + return container.getDatabase() + .getDocClientWrapper() + .deleteUserDefinedFunction(this.getLink(), null) + .map(response -> ModelBridgeInternal.createCosmosUserDefinedFunctionResponse(response)) + .single(); + } } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/CosmosPagedFluxOptions.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/CosmosPagedFluxOptions.java index c4407b68bfee..28b4054180dc 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/CosmosPagedFluxOptions.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/CosmosPagedFluxOptions.java @@ -5,6 +5,8 @@ import com.azure.cosmos.util.CosmosPagedFlux; +import java.util.Map; + /** * Specifies paging options for Cosmos Paged Flux implementation. * @see CosmosPagedFlux @@ -13,6 +15,11 @@ public class CosmosPagedFluxOptions { private String requestContinuation; private Integer maxItemCount; + private TracerProvider tracerProvider; + private String tracerSpanName; + private String databaseId; + private String serviceEndpoint; + public CosmosPagedFluxOptions() {} @@ -57,4 +64,43 @@ public CosmosPagedFluxOptions setMaxItemCount(Integer maxItemCount) { this.maxItemCount = maxItemCount; return this; } + + /** + * Gets the tracer provider + * @return tracerProvider + */ + public TracerProvider getTracerProvider() { + return this.tracerProvider; + } + + /** + * Gets the tracer span name + * @return tracerSpanName + */ + public String getTracerSpanName() { + return tracerSpanName; + } + + /** + * Gets the databaseId + * @return databaseId + */ + public String getDatabaseId() { + return databaseId; + } + + /** + * Gets the service end point + * @return serviceEndpoint + */ + public String getServiceEndpoint() { + return serviceEndpoint; + } + + public void setTracerInformation(TracerProvider tracerProvider, String tracerSpanName, String serviceEndpoint, String databaseId) { + this.databaseId = databaseId; + this.serviceEndpoint = serviceEndpoint; + this.tracerSpanName = tracerSpanName; + this.tracerProvider = tracerProvider; + } } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/HttpConstants.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/HttpConstants.java index b14fce7394f8..70df4dfc7a0f 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/HttpConstants.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/HttpConstants.java @@ -265,6 +265,7 @@ public static class Versions { } public static class StatusCodes { + public static final int OK = 200; public static final int NOT_MODIFIED = 304; // Client error public static final int MINIMUM_STATUSCODE_AS_ERROR_GATEWAY = 400; diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/TracerProvider.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/TracerProvider.java new file mode 100644 index 000000000000..677e87db4e2f --- /dev/null +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/TracerProvider.java @@ -0,0 +1,157 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. +package com.azure.cosmos.implementation; + +import com.azure.core.util.Context; +import com.azure.core.util.tracing.Tracer; +import com.azure.cosmos.CosmosException; +import com.azure.cosmos.models.CosmosItemResponse; +import com.azure.cosmos.models.CosmosResponse; +import reactor.core.publisher.Mono; +import reactor.core.publisher.Signal; + +import java.io.PrintWriter; +import java.io.StringWriter; +import java.util.Objects; +import java.util.Optional; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Function; + +import static com.azure.core.util.tracing.Tracer.AZ_TRACING_NAMESPACE_KEY; + +public class TracerProvider { + private Tracer tracer; + public final static String DB_TYPE_VALUE = "Cosmos"; + public final static String DB_TYPE = "db.type"; + public final static String DB_INSTANCE = "db.instance"; + public final static String DB_URL = "db.url"; + public static final String DB_STATEMENT = "db.statement"; + public static final String ERROR_MSG = "error.msg"; + public static final String ERROR_TYPE = "error.type"; + public static final String ERROR_STACK = "error.stack"; + public static final String COSMOS_CALL_DEPTH = "cosmosCallDepth"; + public static final String COSMOS_CALL_DEPTH_VAL = "nested"; + public static final int ERROR_CODE = 0; + public static final String RESOURCE_PROVIDER_NAME = "Microsoft.DocumentDB"; + + public TracerProvider(Iterable tracers) { + Objects.requireNonNull(tracers, "'tracers' cannot be null."); + if (tracers.iterator().hasNext()) { + tracer = tracers.iterator().next(); + } + } + + public boolean isEnabled() { + return tracer != null; + } + + /** + * For each tracer plugged into the SDK a new tracing span is created. + *

+ * The {@code context} will be checked for containing information about a parent span. If a parent span is found the + * new span will be added as a child, otherwise the span will be created and added to the context and any downstream + * start calls will use the created span as the parent. + * + * @param context Additional metadata that is passed through the call stack. + * @return An updated context object. + */ + public Context startSpan(String methodName, String databaseId, String endpoint, Context context) { + Context local = Objects.requireNonNull(context, "'context' cannot be null."); + local = local.addData(AZ_TRACING_NAMESPACE_KEY, RESOURCE_PROVIDER_NAME); + local = tracer.start(methodName, local); // start the span and return the started span + if (databaseId != null) { + tracer.setAttribute(TracerProvider.DB_INSTANCE, databaseId, local); + } + + tracer.setAttribute(TracerProvider.DB_TYPE, DB_TYPE_VALUE, local); + tracer.setAttribute(TracerProvider.DB_URL, endpoint, local); + tracer.setAttribute(TracerProvider.DB_STATEMENT, methodName, local); + return local; + } + + /** + * Given a context containing the current tracing span the span is marked completed with status info from + * {@link Signal}. For each tracer plugged into the SDK the current tracing span is marked as completed. + * + * @param context Additional metadata that is passed through the call stack. + * @param signal The signal indicates the status and contains the metadata we need to end the tracing span. + */ + public > void endSpan(Context context, Signal signal, int statusCode) { + Objects.requireNonNull(context, "'context' cannot be null."); + Objects.requireNonNull(signal, "'signal' cannot be null."); + + switch (signal.getType()) { + case ON_COMPLETE: + end(statusCode, null, context); + break; + case ON_ERROR: + Throwable throwable = null; + if (signal.hasError()) { + // The last status available is on error, this contains the thrown error. + throwable = signal.getThrowable(); + + if (throwable instanceof CosmosException) { + CosmosException exception = (CosmosException) throwable; + statusCode = exception.getStatusCode(); + } + } + end(statusCode, throwable, context); + break; + default: + // ON_SUBSCRIBE and ON_NEXT don't have the information to end the span so just return. + break; + } + } + + public > Mono traceEnabledCosmosResponsePublisher(Mono resultPublisher, + Context context, + String spanName, + String databaseId, + String endpoint) { + return traceEnabledPublisher(resultPublisher, context, spanName,databaseId, endpoint, + (T response) -> response.getStatusCode()); + } + + public Mono> traceEnabledCosmosItemResponsePublisher(Mono> resultPublisher, + Context context, + String spanName, + String databaseId, + String endpoint) { + return traceEnabledPublisher(resultPublisher, context, spanName,databaseId, endpoint, + CosmosItemResponse::getStatusCode); + } + + public Mono traceEnabledPublisher(Mono resultPublisher, + Context context, + String spanName, + String databaseId, + String endpoint, + Function statusCodeFunc) { + final AtomicReference parentContext = new AtomicReference<>(Context.NONE); + Optional callDepth = context.getData(COSMOS_CALL_DEPTH); + final boolean isNestedCall = callDepth.isPresent(); + return resultPublisher + .doOnSubscribe(ignoredValue -> { + if (!isNestedCall) { + parentContext.set(this.startSpan(spanName, databaseId, endpoint, + context)); + } + }).doOnSuccess(response -> { + if (!isNestedCall) { + this.endSpan(parentContext.get(), Signal.complete(), statusCodeFunc.apply(response)); + } + }).doOnError(throwable -> { + if (!isNestedCall) { + this.endSpan(parentContext.get(), Signal.error(throwable), ERROR_CODE); + } + }); + } + + private void end(int statusCode, Throwable throwable, Context context) { + if (throwable != null) { + tracer.setAttribute(TracerProvider.ERROR_MSG, throwable.getMessage(), context); + tracer.setAttribute(TracerProvider.ERROR_TYPE, throwable.getClass().getName(), context); + } + tracer.end(statusCode, throwable, context); + } +} diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/util/CosmosPagedFlux.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/util/CosmosPagedFlux.java index ca9f20352f3a..77167bc5970d 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/util/CosmosPagedFlux.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/util/CosmosPagedFlux.java @@ -3,13 +3,19 @@ package com.azure.cosmos.util; +import com.azure.core.util.Context; +import com.azure.core.util.FluxUtil; import com.azure.core.util.IterableStream; import com.azure.core.util.paging.ContinuablePagedFlux; import com.azure.cosmos.implementation.CosmosPagedFluxOptions; +import com.azure.cosmos.implementation.HttpConstants; +import com.azure.cosmos.implementation.TracerProvider; import com.azure.cosmos.models.FeedResponse; import reactor.core.CoreSubscriber; import reactor.core.publisher.Flux; +import reactor.core.publisher.Signal; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; /** @@ -29,32 +35,44 @@ public final class CosmosPagedFlux extends ContinuablePagedFlux> { private final Function>> optionsFluxFunction; + private final boolean isTracerEnabled; - CosmosPagedFlux(Function>> optionsFluxFunction) { + CosmosPagedFlux(Function>> optionsFluxFunction, + boolean isTracerEnable) { this.optionsFluxFunction = optionsFluxFunction; + this.isTracerEnabled = isTracerEnable; } @Override public Flux> byPage() { CosmosPagedFluxOptions cosmosPagedFluxOptions = new CosmosPagedFluxOptions(); + if (!this.isTracerEnabled) { + return this.optionsFluxFunction.apply(cosmosPagedFluxOptions); + } - return this.optionsFluxFunction.apply(cosmosPagedFluxOptions); + return FluxUtil.fluxContext(context -> byPage(cosmosPagedFluxOptions, context)); } @Override public Flux> byPage(String continuationToken) { CosmosPagedFluxOptions cosmosPagedFluxOptions = new CosmosPagedFluxOptions(); cosmosPagedFluxOptions.setRequestContinuation(continuationToken); + if (!this.isTracerEnabled) { + return this.optionsFluxFunction.apply(cosmosPagedFluxOptions); + } - return this.optionsFluxFunction.apply(cosmosPagedFluxOptions); + return FluxUtil.fluxContext(context -> byPage(cosmosPagedFluxOptions, context)); } @Override public Flux> byPage(int preferredPageSize) { CosmosPagedFluxOptions cosmosPagedFluxOptions = new CosmosPagedFluxOptions(); cosmosPagedFluxOptions.setMaxItemCount(preferredPageSize); + if (!this.isTracerEnabled) { + return this.optionsFluxFunction.apply(cosmosPagedFluxOptions); + } - return this.optionsFluxFunction.apply(cosmosPagedFluxOptions); + return FluxUtil.fluxContext(context -> byPage(cosmosPagedFluxOptions, context)); } @Override @@ -62,8 +80,11 @@ public Flux> byPage(String continuationToken, int preferredPageS CosmosPagedFluxOptions cosmosPagedFluxOptions = new CosmosPagedFluxOptions(); cosmosPagedFluxOptions.setRequestContinuation(continuationToken); cosmosPagedFluxOptions.setMaxItemCount(preferredPageSize); + if (!this.isTracerEnabled) { + return this.optionsFluxFunction.apply(cosmosPagedFluxOptions); + } - return this.optionsFluxFunction.apply(cosmosPagedFluxOptions); + return FluxUtil.fluxContext(context -> byPage(cosmosPagedFluxOptions, context)); } /** @@ -84,4 +105,19 @@ public void subscribe(CoreSubscriber coreSubscriber) { return Flux.fromIterable(elements); }).subscribe(coreSubscriber); } + + private Flux> byPage(CosmosPagedFluxOptions pagedFluxOptions, Context context) { + final AtomicReference parentContext = new AtomicReference<>(Context.NONE); + return this.optionsFluxFunction.apply(pagedFluxOptions).doOnSubscribe(ignoredValue -> { + parentContext.set(pagedFluxOptions.getTracerProvider().startSpan(pagedFluxOptions.getTracerSpanName(), + pagedFluxOptions.getDatabaseId(), pagedFluxOptions.getServiceEndpoint(), + context)); + }).doOnComplete(() -> { + pagedFluxOptions.getTracerProvider().endSpan(parentContext.get(), Signal.complete(), + HttpConstants.StatusCodes.OK); + }).doOnError(throwable -> { + pagedFluxOptions.getTracerProvider().endSpan(parentContext.get(), Signal.error(throwable), + TracerProvider.ERROR_CODE); + }); + } } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/util/UtilBridgeInternal.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/util/UtilBridgeInternal.java index 1282cd7a56de..141797b27ed3 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/util/UtilBridgeInternal.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/util/UtilBridgeInternal.java @@ -24,10 +24,11 @@ public final class UtilBridgeInternal { private UtilBridgeInternal() {} @Warning(value = INTERNAL_USE_ONLY_WARNING) - public static CosmosPagedFlux createCosmosPagedFlux(Function>> pagedFluxOptionsFluxFunction) { - return new CosmosPagedFlux<>(pagedFluxOptionsFluxFunction); + public static CosmosPagedFlux createCosmosPagedFlux(Function>> pagedFluxOptionsFluxFunction, boolean isTracerEnabled) { + return new CosmosPagedFlux<>(pagedFluxOptionsFluxFunction, isTracerEnabled); } + @Warning(value = INTERNAL_USE_ONLY_WARNING) public static CosmosPagedIterable createCosmosPagedIterable(ContinuablePagedFlux> pagedFlux) { return new CosmosPagedIterable<>(pagedFlux); diff --git a/sdk/cosmos/azure-cosmos/src/main/java/module-info.java b/sdk/cosmos/azure-cosmos/src/main/java/module-info.java index 553cca6bf71f..71c5f58fb735 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/module-info.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/module-info.java @@ -49,4 +49,5 @@ opens com.azure.cosmos.util to com.fasterxml.jackson.databind; uses com.azure.cosmos.implementation.guava25.base.PatternCompiler; + uses com.azure.core.util.tracing.Tracer; } diff --git a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/CosmosTracerTest.java b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/CosmosTracerTest.java new file mode 100644 index 000000000000..d7ec6300e224 --- /dev/null +++ b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/CosmosTracerTest.java @@ -0,0 +1,245 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. +package com.azure.cosmos; + +import com.azure.core.util.Context; +import com.azure.core.util.tracing.Tracer; +import com.azure.cosmos.implementation.CosmosItemProperties; +import com.azure.cosmos.implementation.LifeCycleUtils; +import com.azure.cosmos.implementation.TestConfigurations; +import com.azure.cosmos.implementation.TracerProvider; +import com.azure.cosmos.implementation.directconnectivity.ReflectionUtils; +import com.azure.cosmos.models.CosmosItemRequestOptions; +import com.azure.cosmos.models.CosmosQueryRequestOptions; +import com.azure.cosmos.models.CosmosStoredProcedureProperties; +import com.azure.cosmos.models.CosmosTriggerProperties; +import com.azure.cosmos.models.CosmosUserDefinedFunctionProperties; +import com.azure.cosmos.models.PartitionKey; +import com.azure.cosmos.models.TriggerOperation; +import com.azure.cosmos.models.TriggerType; +import com.azure.cosmos.rx.TestSuiteBase; +import org.mockito.Matchers; +import org.mockito.Mockito; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +import java.util.ServiceLoader; +import java.util.UUID; + +public class CosmosTracerTest extends TestSuiteBase { + private static final String ITEM_ID = "tracerDoc"; + CosmosAsyncClient client; + CosmosAsyncDatabase cosmosAsyncDatabase; + CosmosAsyncContainer cosmosAsyncContainer; + + @BeforeClass(groups = {"emulator"}, timeOut = SETUP_TIMEOUT) + public void beforeClass() { + client = new CosmosClientBuilder() + .endpoint(TestConfigurations.HOST) + .key(TestConfigurations.MASTER_KEY) + .directMode(DirectConnectionConfig.getDefaultConfig()) + .buildAsyncClient(); + cosmosAsyncDatabase = getSharedCosmosDatabase(client); + cosmosAsyncContainer = getSharedMultiPartitionCosmosContainer(client); + + } + + @Test(groups = {"emulator"}, timeOut = TIMEOUT) + public void cosmosAsyncClient() { + TracerProvider tracer = Mockito.spy(new TracerProvider(ServiceLoader.load(Tracer.class))); + ReflectionUtils.setTracerProvider(client, tracer); + + client.createDatabaseIfNotExists(cosmosAsyncDatabase.getId()).block(); + Mockito.verify(tracer, Mockito.times(1)).startSpan(Matchers.anyString(), Matchers.anyString(), + Matchers.anyString(), Matchers.any(Context.class)); + + client.readAllDatabases(new CosmosQueryRequestOptions()).byPage().single().block(); + Mockito.verify(tracer, Mockito.times(2)).startSpan(Matchers.anyString(), Matchers.anyString(), + Matchers.anyString(), Matchers.any(Context.class)); + + String query = "select * from c where c.id = '" + cosmosAsyncDatabase.getId() + "'"; + client.queryDatabases(query, new CosmosQueryRequestOptions()).byPage().single().block(); + Mockito.verify(tracer, Mockito.times(3)).startSpan(Matchers.anyString(), Matchers.anyString(), + Matchers.anyString(), Matchers.any(Context.class)); + } + + @Test(groups = {"emulator"}, timeOut = TIMEOUT) + public void cosmosAsyncDatabase() { + TracerProvider tracer = Mockito.spy(new TracerProvider(ServiceLoader.load(Tracer.class))); + ReflectionUtils.setTracerProvider(client, tracer); + + cosmosAsyncDatabase.createContainerIfNotExists(cosmosAsyncContainer.getId(), + "/pk", 5000).block(); + Mockito.verify(tracer, Mockito.times(1)).startSpan(Matchers.anyString(), Matchers.anyString(), + Matchers.anyString(), Matchers.any(Context.class)); + + try { + cosmosAsyncDatabase.readThroughput().block(); + } catch (CosmosException ex) { + //do nothing + } + + Mockito.verify(tracer, Mockito.times(2)).startSpan(Matchers.anyString(), Matchers.anyString(), + Matchers.anyString(), Matchers.any(Context.class)); + + cosmosAsyncDatabase.readAllUsers().byPage().single().block(); + Mockito.verify(tracer, Mockito.times(3)).startSpan(Matchers.anyString(), Matchers.anyString(), + Matchers.anyString(), Matchers.any(Context.class)); + + cosmosAsyncDatabase.readAllContainers().byPage().single().block(); + Mockito.verify(tracer, Mockito.times(4)).startSpan(Matchers.anyString(), Matchers.anyString(), + Matchers.anyString(), Matchers.any(Context.class)); + } + + @Test(groups = {"emulator"}, timeOut = TIMEOUT) + public void cosmosAsyncContainer() { + TracerProvider tracer = Mockito.spy(new TracerProvider(ServiceLoader.load(Tracer.class))); + ReflectionUtils.setTracerProvider(client, tracer); + + cosmosAsyncContainer.read().block(); + Mockito.verify(tracer, Mockito.times(1)).startSpan(Matchers.anyString(), Matchers.anyString(), + Matchers.anyString(), Matchers.any(Context.class)); + + try { + cosmosAsyncContainer.readThroughput().block(); + } catch (CosmosException ex) { + //do nothing + } + Mockito.verify(tracer, Mockito.times(2)).startSpan(Matchers.anyString(), Matchers.anyString(), + Matchers.anyString(), Matchers.any(Context.class)); + + CosmosItemProperties properties = new CosmosItemProperties(); + properties.setId(ITEM_ID); + cosmosAsyncContainer.createItem(properties).block(); + Mockito.verify(tracer, Mockito.times(3)).startSpan(Matchers.anyString(), Matchers.anyString(), + Matchers.anyString(), Matchers.any(Context.class)); + + cosmosAsyncContainer.upsertItem(properties, + new CosmosItemRequestOptions()).block(); + Mockito.verify(tracer, Mockito.times(4)).startSpan(Matchers.anyString(), Matchers.anyString(), + Matchers.anyString(), Matchers.any(Context.class)); + + cosmosAsyncContainer.readItem(ITEM_ID, PartitionKey.NONE, + CosmosItemProperties.class).block(); + Mockito.verify(tracer, Mockito.times(5)).startSpan(Matchers.anyString(), Matchers.anyString(), + Matchers.anyString(), Matchers.any(Context.class)); + + cosmosAsyncContainer.deleteItem(ITEM_ID, PartitionKey.NONE).block(); + Mockito.verify(tracer, Mockito.times(6)).startSpan(Matchers.anyString(), Matchers.anyString(), + Matchers.anyString(), Matchers.any(Context.class)); + cosmosAsyncContainer.readAllItems(new CosmosQueryRequestOptions(), CosmosItemRequestOptions.class).byPage().single().block(); + Mockito.verify(tracer, Mockito.times(7)).startSpan(Matchers.anyString(), Matchers.anyString(), + Matchers.anyString(), Matchers.any(Context.class)); + + String query = "select * from c where c.id = '" + ITEM_ID + "'"; + cosmosAsyncContainer.queryItems(query, new CosmosQueryRequestOptions(), CosmosItemRequestOptions.class).byPage().single().block(); + Mockito.verify(tracer, Mockito.times(8)).startSpan(Matchers.anyString(), Matchers.anyString(), + Matchers.anyString(), Matchers.any(Context.class)); + } + + @Test(groups = {"emulator"}, timeOut = TIMEOUT) + public void cosmosAsyncScripts() { + TracerProvider tracer = Mockito.spy(new TracerProvider(ServiceLoader.load(Tracer.class))); + ReflectionUtils.setTracerProvider(client, tracer); + + cosmosAsyncContainer.getScripts().readAllStoredProcedures(new CosmosQueryRequestOptions()).byPage().single().block(); + Mockito.verify(tracer, Mockito.times(1)).startSpan(Matchers.anyString(), Matchers.anyString(), + Matchers.anyString(), Matchers.any(Context.class)); + + cosmosAsyncContainer.getScripts().readAllTriggers(new CosmosQueryRequestOptions()).byPage().single().block(); + Mockito.verify(tracer, Mockito.times(2)).startSpan(Matchers.anyString(), Matchers.anyString(), + Matchers.anyString(), Matchers.any(Context.class)); + + cosmosAsyncContainer.getScripts().readAllUserDefinedFunctions(new CosmosQueryRequestOptions()).byPage().single().block(); + Mockito.verify(tracer, Mockito.times(3)).startSpan(Matchers.anyString(), Matchers.anyString(), + Matchers.anyString(), Matchers.any(Context.class)); + + CosmosUserDefinedFunctionProperties cosmosUserDefinedFunctionProperties = + getCosmosUserDefinedFunctionProperties(); + CosmosUserDefinedFunctionProperties resultUdf = + cosmosAsyncContainer.getScripts().createUserDefinedFunction(cosmosUserDefinedFunctionProperties).block().getProperties(); + Mockito.verify(tracer, Mockito.times(4)).startSpan(Matchers.anyString(), Matchers.anyString(), + Matchers.anyString(), Matchers.any(Context.class)); + + cosmosAsyncContainer.getScripts().getUserDefinedFunction(cosmosUserDefinedFunctionProperties.getId()).read().block(); + Mockito.verify(tracer, Mockito.times(5)).startSpan(Matchers.anyString(), Matchers.anyString(), + Matchers.anyString(), Matchers.any(Context.class)); + + cosmosUserDefinedFunctionProperties.setBody("function() {var x = 15;}"); + cosmosAsyncContainer.getScripts().getUserDefinedFunction(resultUdf.getId()).replace(resultUdf).block(); + Mockito.verify(tracer, Mockito.times(6)).startSpan(Matchers.anyString(), Matchers.anyString(), + Matchers.anyString(), Matchers.any(Context.class)); + + cosmosAsyncContainer.getScripts().readAllUserDefinedFunctions(new CosmosQueryRequestOptions()).byPage().single().block(); + Mockito.verify(tracer, Mockito.times(7)).startSpan(Matchers.anyString(), Matchers.anyString(), + Matchers.anyString(), Matchers.any(Context.class)); + + cosmosAsyncContainer.getScripts().getUserDefinedFunction(cosmosUserDefinedFunctionProperties.getId()).delete().block(); + Mockito.verify(tracer, Mockito.times(8)).startSpan(Matchers.anyString(), Matchers.anyString(), + Matchers.anyString(), Matchers.any(Context.class)); + + CosmosTriggerProperties cosmosTriggerProperties = getCosmosTriggerProperties(); + CosmosTriggerProperties resultTrigger = + cosmosAsyncContainer.getScripts().createTrigger(cosmosTriggerProperties).block().getProperties(); + Mockito.verify(tracer, Mockito.times(9)).startSpan(Matchers.anyString(), Matchers.anyString(), + Matchers.anyString(), Matchers.any(Context.class)); + + cosmosAsyncContainer.getScripts().getTrigger(cosmosTriggerProperties.getId()).read().block(); + Mockito.verify(tracer, Mockito.times(10)).startSpan(Matchers.anyString(), Matchers.anyString(), + Matchers.anyString(), Matchers.any(Context.class)); + + cosmosAsyncContainer.getScripts().getTrigger(cosmosTriggerProperties.getId()).replace(resultTrigger).block(); + Mockito.verify(tracer, Mockito.times(11)).startSpan(Matchers.anyString(), Matchers.anyString(), + Matchers.anyString(), Matchers.any(Context.class)); + cosmosAsyncContainer.getScripts().readAllTriggers(new CosmosQueryRequestOptions()).byPage().single().block(); + Mockito.verify(tracer, Mockito.times(12)).startSpan(Matchers.anyString(), Matchers.anyString(), + Matchers.anyString(), Matchers.any(Context.class)); + + cosmosAsyncContainer.getScripts().getTrigger(cosmosTriggerProperties.getId()).delete().block(); + Mockito.verify(tracer, Mockito.times(13)).startSpan(Matchers.anyString(), Matchers.anyString(), + Matchers.anyString(), Matchers.any(Context.class)); + + CosmosStoredProcedureProperties procedureProperties = getCosmosStoredProcedureProperties(); + CosmosStoredProcedureProperties resultSproc = + cosmosAsyncContainer.getScripts().createStoredProcedure(procedureProperties).block().getProperties(); + Mockito.verify(tracer, Mockito.times(14)).startSpan(Matchers.anyString(), Matchers.anyString(), + Matchers.anyString(), Matchers.any(Context.class)); + + cosmosAsyncContainer.getScripts().getStoredProcedure(procedureProperties.getId()).read().block(); + Mockito.verify(tracer, Mockito.times(15)).startSpan(Matchers.anyString(), Matchers.anyString(), + Matchers.anyString(), Matchers.any(Context.class)); + + cosmosAsyncContainer.getScripts().getStoredProcedure(procedureProperties.getId()).replace(resultSproc).block(); + Mockito.verify(tracer, Mockito.times(16)).startSpan(Matchers.anyString(), Matchers.anyString(), + Matchers.anyString(), Matchers.any(Context.class)); + + cosmosAsyncContainer.getScripts().readAllStoredProcedures(new CosmosQueryRequestOptions()).byPage().single().block(); + + cosmosAsyncContainer.getScripts().getStoredProcedure(procedureProperties.getId()).delete().block(); + Mockito.verify(tracer, Mockito.times(18)).startSpan(Matchers.anyString(), Matchers.anyString(), + Matchers.anyString(), Matchers.any(Context.class)); + } + + @AfterClass(groups = {"emulator"}, timeOut = SETUP_TIMEOUT) + public void afterClass() { + LifeCycleUtils.closeQuietly(client); + } + + private static CosmosUserDefinedFunctionProperties getCosmosUserDefinedFunctionProperties() { + CosmosUserDefinedFunctionProperties udf = new CosmosUserDefinedFunctionProperties(UUID.randomUUID().toString(), "function() {var x = 10;}"); + return udf; + } + + private static CosmosTriggerProperties getCosmosTriggerProperties() { + CosmosTriggerProperties trigger = new CosmosTriggerProperties(UUID.randomUUID().toString(), "function() {var x = 10;}"); + trigger.setTriggerOperation(TriggerOperation.CREATE); + trigger.setTriggerType(TriggerType.PRE); + return trigger; + } + + private static CosmosStoredProcedureProperties getCosmosStoredProcedureProperties() { + CosmosStoredProcedureProperties storedProcedureDef = new CosmosStoredProcedureProperties(UUID.randomUUID().toString(), "function() {var x = 10;}"); + return storedProcedureDef; + } +} diff --git a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/directconnectivity/ReflectionUtils.java b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/directconnectivity/ReflectionUtils.java index 9dbdc312ffcd..5a9063584e37 100644 --- a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/directconnectivity/ReflectionUtils.java +++ b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/directconnectivity/ReflectionUtils.java @@ -9,6 +9,7 @@ import com.azure.cosmos.implementation.AsyncDocumentClient; import com.azure.cosmos.implementation.GlobalEndpointManager; import com.azure.cosmos.implementation.RxDocumentClientImpl; +import com.azure.cosmos.implementation.TracerProvider; import com.azure.cosmos.implementation.http.HttpClient; import org.apache.commons.lang3.reflect.FieldUtils; @@ -104,4 +105,8 @@ public static GatewayServiceConfigurationReader getServiceConfigurationReader(Rx public static void setBackgroundRefreshLocationTimeIntervalInMS(GlobalEndpointManager globalEndPointManager, int millSec){ set(globalEndPointManager, millSec, "backgroundRefreshLocationTimeIntervalInMS"); } + + public static void setTracerProvider(CosmosAsyncClient cosmosAsyncClient, TracerProvider tracerProvider){ + set(cosmosAsyncClient, tracerProvider, "tracerProvider"); + } } diff --git a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/rx/ReadFeedExceptionHandlingTest.java b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/rx/ReadFeedExceptionHandlingTest.java index b9424abd3d32..a812c50cecd7 100644 --- a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/rx/ReadFeedExceptionHandlingTest.java +++ b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/rx/ReadFeedExceptionHandlingTest.java @@ -2,12 +2,14 @@ // Licensed under the MIT License. package com.azure.cosmos.rx; +import com.azure.core.util.tracing.Tracer; import com.azure.cosmos.BridgeInternal; import com.azure.cosmos.CosmosAsyncClient; import com.azure.cosmos.CosmosClientBuilder; -import com.azure.cosmos.util.CosmosPagedFlux; +import com.azure.cosmos.implementation.TracerProvider; import com.azure.cosmos.models.CosmosDatabaseProperties; import com.azure.cosmos.models.FeedResponse; +import com.azure.cosmos.util.CosmosPagedFlux; import com.azure.cosmos.util.UtilBridgeInternal; import io.reactivex.subscribers.TestSubscriber; import org.mockito.Mockito; @@ -18,6 +20,7 @@ import reactor.core.publisher.Flux; import java.util.ArrayList; +import java.util.ServiceLoader; import static org.assertj.core.api.Assertions.assertThat; @@ -46,7 +49,10 @@ public void readFeedException() throws Exception { .mergeWith(Flux.fromIterable(frps)); final CosmosAsyncClientWrapper mockedClientWrapper = Mockito.spy(new CosmosAsyncClientWrapper(client)); - Mockito.when(mockedClientWrapper.readAllDatabases()).thenReturn(UtilBridgeInternal.createCosmosPagedFlux(pagedFluxOptions -> response)); + Mockito.when(mockedClientWrapper.readAllDatabases()).thenReturn(UtilBridgeInternal.createCosmosPagedFlux(pagedFluxOptions -> { + pagedFluxOptions.setTracerInformation(new TracerProvider(ServiceLoader.load(Tracer.class)), "testSpan", "testEndpoint,", "testDb"); + return response; + }, false)); TestSubscriber> subscriber = new TestSubscriber<>(); mockedClientWrapper.readAllDatabases().byPage().subscribe(subscriber); assertThat(subscriber.valueCount()).isEqualTo(2); diff --git a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/rx/TestSuiteBase.java b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/rx/TestSuiteBase.java index f6a1a5a1a2bd..3224cc519b6d 100644 --- a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/rx/TestSuiteBase.java +++ b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/rx/TestSuiteBase.java @@ -484,7 +484,7 @@ public Flux> bulkInsert(CosmosAsyncContainer cosmosCon return Flux.merge(Flux.fromIterable(result), concurrencyLevel); } public List bulkInsertBlocking(CosmosAsyncContainer cosmosContainer, - List documentDefinitionList) { + List documentDefinitionList) { return bulkInsert(cosmosContainer, documentDefinitionList, DEFAULT_BULK_INSERT_CONCURRENCY_LEVEL) .publishOn(Schedulers.parallel()) .map(itemResponse -> itemResponse.getItem()) @@ -813,12 +813,12 @@ public void validateItemFailure( } public void validateQuerySuccess(Flux> flowable, - FeedResponseListValidator validator) { + FeedResponseListValidator validator) { validateQuerySuccess(flowable, validator, subscriberValidationTimeout); } public static void validateQuerySuccess(Flux> flowable, - FeedResponseListValidator validator, long timeout) { + FeedResponseListValidator validator, long timeout) { TestSubscriber> testSubscriber = new TestSubscriber<>(); @@ -834,7 +834,7 @@ public void validateQueryFailure(Flux> flowable, FailureVali } public static void validateQueryFailure(Flux> flowable, - FailureValidator validator, long timeout) { + FailureValidator validator, long timeout) { TestSubscriber> testSubscriber = new TestSubscriber<>();