Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions sdk/data/azcosmos/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

### Features Added

* Adds `PriorityLevel` and `ThroughputBucket` options at the client and per-request level for item, query, change-feed, batch, and read-many operations. See [PR 26750](https://github.com/Azure/azure-sdk-for-go/pull/26750)
* Added client-level partition key range cache and container properties cache, reducing redundant metadata round-trips for ReadMany and query operations. See [PR 26723](https://github.com/Azure/azure-sdk-for-go/pull/26723)
* Added operation diagnostics on responses and `DiagnosticsFromError` for retrieving diagnostics from failed operations. See [PR 26548](https://github.com/Azure/azure-sdk-for-go/pull/26548)

Expand Down
8 changes: 8 additions & 0 deletions sdk/data/azcosmos/cosmos_change_feed_request_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,14 @@ type ChangeFeedOptions struct {

// CompositeContinuation is used to continue reading the change feed from a specific point.
Continuation *string

// PriorityLevel overrides the client-level default priority for this operation.
// Valid values are PriorityLevelHigh and PriorityLevelLow.
PriorityLevel *PriorityLevel
// ThroughputBucket overrides the client-level default throughput bucket for this operation.
// For more information, see https://aka.ms/CosmosDB/ThroughputBuckets
// The valid range is 1 to 5 (inclusive).
ThroughputBucket *int32
}

func (options *ChangeFeedOptions) toHeaders(partitionKeyRanges []partitionKeyRange) *map[string]string {
Expand Down
4 changes: 4 additions & 0 deletions sdk/data/azcosmos/cosmos_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,8 @@ func newClient(authPolicy policy.Policy, gem *globalEndpointManager, options *Cl
PerCall: []policy.Policy{
&headerPolicies{
enableContentResponseOnWrite: options.EnableContentResponseOnWrite,
priorityLevel: options.PriorityLevel,
throughputBucket: options.ThroughputBucket,
},
&globalEndpointManagerPolicy{gem: gem},
},
Expand Down Expand Up @@ -692,5 +694,7 @@ func getAllowedHeaders() []string {
cosmosHeaderIsPartitionKeyDeletePending,
cosmosHeaderQueryExecutionInfo,
headerXmsItemCount,
cosmosHeaderPriorityLevel,
cosmosHeaderThroughputBucket,
}
}
10 changes: 10 additions & 0 deletions sdk/data/azcosmos/cosmos_client_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,14 @@ type ClientOptions struct {
EnableContentResponseOnWrite bool
// PreferredRegions is a list of regions to be used when initializing the client in case the default region fails.
PreferredRegions []string
// PriorityLevel defines the default priority level for all requests made by this client.
// This feature is currently in preview. For more information, see https://aka.ms/CosmosDB/PriorityBasedExecution
// Valid values are PriorityLevelHigh and PriorityLevelLow.
// Can be overridden per-request via the operation options.
PriorityLevel *PriorityLevel
Comment thread
simorenoh marked this conversation as resolved.
// ThroughputBucket defines the default throughput bucket for all requests made by this client.
// This feature is currently in preview. For more information, see https://aka.ms/CosmosDB/ThroughputBuckets
// The valid range is 1 to 5 (inclusive).
// Can be overridden per-request via the operation options.
ThroughputBucket *int32
}
36 changes: 32 additions & 4 deletions sdk/data/azcosmos/cosmos_container.go
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,8 @@ func (c *ContainerClient) CreateItem(
o = &ItemOptions{}
} else {
h.enableContentResponseOnWrite = &o.EnableContentResponseOnWrite
h.priorityLevel = o.PriorityLevel
h.throughputBucket = o.ThroughputBucket
}

operationContext := pipelineRequestOptions{
Expand Down Expand Up @@ -335,6 +337,8 @@ func (c *ContainerClient) UpsertItem(
o = &ItemOptions{}
} else {
h.enableContentResponseOnWrite = &o.EnableContentResponseOnWrite
h.priorityLevel = o.PriorityLevel
h.throughputBucket = o.ThroughputBucket
}

operationContext := pipelineRequestOptions{
Expand Down Expand Up @@ -390,6 +394,8 @@ func (c *ContainerClient) ReplaceItem(
o = &ItemOptions{}
} else {
h.enableContentResponseOnWrite = &o.EnableContentResponseOnWrite
h.priorityLevel = o.PriorityLevel
h.throughputBucket = o.ThroughputBucket
}

operationContext := pipelineRequestOptions{
Expand Down Expand Up @@ -442,6 +448,8 @@ func (c *ContainerClient) ReadItem(
if o == nil {
o = &ItemOptions{}
}
h.priorityLevel = o.PriorityLevel
h.throughputBucket = o.ThroughputBucket

operationContext := pipelineRequestOptions{
resourceType: resourceTypeDocument,
Expand Down Expand Up @@ -493,9 +501,15 @@ func (c *ContainerClient) ReadManyItems(
readManyOptions = &originalOptions
}

h := headerOptionsOverride{
priorityLevel: readManyOptions.PriorityLevel,
throughputBucket: readManyOptions.ThroughputBucket,
}

operationContext := pipelineRequestOptions{
resourceType: resourceTypeDocument,
resourceAddress: c.link,
resourceType: resourceTypeDocument,
resourceAddress: c.link,
headerOptionsOverride: &h,
}

ctx, endTrace := ensureOperationTrace(ctx, fmt.Sprintf("read_many_items %s", c.id))
Expand Down Expand Up @@ -556,6 +570,8 @@ func (c *ContainerClient) DeleteItem(
o = &ItemOptions{}
} else {
h.enableContentResponseOnWrite = &o.EnableContentResponseOnWrite
h.priorityLevel = o.PriorityLevel
h.throughputBucket = o.ThroughputBucket
}

operationContext := pipelineRequestOptions{
Expand Down Expand Up @@ -614,6 +630,8 @@ func (c *ContainerClient) NewQueryItemsPager(query string, partitionKey Partitio
if o != nil {
originalOptions := *o
queryOptions = &originalOptions
h.priorityLevel = o.PriorityLevel
h.throughputBucket = o.ThroughputBucket
}

operationContext := pipelineRequestOptions{
Expand Down Expand Up @@ -695,6 +713,8 @@ func (c *ContainerClient) PatchItem(
o = &ItemOptions{}
} else {
h.enableContentResponseOnWrite = &o.EnableContentResponseOnWrite
h.priorityLevel = o.PriorityLevel
h.throughputBucket = o.ThroughputBucket
}

operationContext := pipelineRequestOptions{
Expand Down Expand Up @@ -751,6 +771,8 @@ func (c *ContainerClient) ExecuteTransactionalBatch(ctx context.Context, b Trans
o = &TransactionalBatchOptions{}
} else {
h.enableContentResponseOnWrite = &o.EnableContentResponseOnWrite
h.priorityLevel = o.PriorityLevel
h.throughputBucket = o.ThroughputBucket
}

// If contentResponseOnWrite is not enabled at the client level the
Expand Down Expand Up @@ -857,9 +879,15 @@ func (c *ContainerClient) getChangeFeedForEPKRange(
}
}

h := headerOptionsOverride{
priorityLevel: options.PriorityLevel,
throughputBucket: options.ThroughputBucket,
}

operationContext := pipelineRequestOptions{
resourceType: resourceTypeDocument,
resourceAddress: c.link,
resourceType: resourceTypeDocument,
resourceAddress: c.link,
headerOptionsOverride: &h,
}

path, err := generatePathForNameBased(resourceTypeDocument, operationContext.resourceAddress, true)
Expand Down
Loading