Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/CODEOWNERS
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
/sdk/appconfig/ @antkmsft @seankane-msft @jhendrixMSFT

# PRLabel: %Cosmos
/sdk/data/azcosmos/ @ealsur @ausfeldt
/sdk/data/azcosmos/ @ealsur @kirankumarkolli

# PRLabel: %Tables
/sdk/data/aztables/ @seankane-msft @jhendrixMSFT
Expand Down
2 changes: 2 additions & 0 deletions sdk/data/azcosmos/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@
## 0.2.1 (Unreleased)

### Features Added
* Added single partition query support.

### Breaking Changes
* This module now requires Go 1.18

### Bugs Fixed

Expand Down
8 changes: 2 additions & 6 deletions sdk/data/azcosmos/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,10 @@ stages:

strategy:
matrix:
Windows_Go116:
Windows_Go118:
pool.name: azsdk-pool-mms-win-2019-general
image.name: MMS2019
go.version: '1.16.7'
Windows_Go117:
pool.name: azsdk-pool-mms-win-2019-general
image.name: MMS2019
go.version: '1.17'
go.version: '1.18'
pool:
name: $(pool.name)
vmImage: $(image.name)
Expand Down
59 changes: 35 additions & 24 deletions sdk/data/azcosmos/cosmos_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"encoding/json"
"io/ioutil"
"net/http"
"net/url"
"testing"

"github.com/Azure/azure-sdk-for-go/sdk/azcore"
Expand Down Expand Up @@ -224,8 +225,8 @@ func TestSendDelete(t *testing.T) {
t.Fatal(err)
}

if verifier.method != http.MethodDelete {
t.Errorf("Expected %v, but got %v", http.MethodDelete, verifier.method)
if verifier.requests[0].method != http.MethodDelete {
t.Errorf("Expected %v, but got %v", http.MethodDelete, verifier.requests[0].method)
}
}

Expand All @@ -247,8 +248,8 @@ func TestSendGet(t *testing.T) {
t.Fatal(err)
}

if verifier.method != http.MethodGet {
t.Errorf("Expected %v, but got %v", http.MethodGet, verifier.method)
if verifier.requests[0].method != http.MethodGet {
t.Errorf("Expected %v, but got %v", http.MethodGet, verifier.requests[0].method)
}
}

Expand Down Expand Up @@ -276,12 +277,12 @@ func TestSendPut(t *testing.T) {
t.Fatal(err)
}

if verifier.method != http.MethodPut {
t.Errorf("Expected %v, but got %v", http.MethodPut, verifier.method)
if verifier.requests[0].method != http.MethodPut {
t.Errorf("Expected %v, but got %v", http.MethodPut, verifier.requests[0].method)
}

if verifier.body != string(marshalled) {
t.Errorf("Expected %v, but got %v", string(marshalled), verifier.body)
if verifier.requests[0].body != string(marshalled) {
t.Errorf("Expected %v, but got %v", string(marshalled), verifier.requests[0].body)
}
}

Expand Down Expand Up @@ -309,12 +310,12 @@ func TestSendPost(t *testing.T) {
t.Fatal(err)
}

if verifier.method != http.MethodPost {
t.Errorf("Expected %v, but got %v", http.MethodPost, verifier.method)
if verifier.requests[0].method != http.MethodPost {
t.Errorf("Expected %v, but got %v", http.MethodPost, verifier.requests[0].method)
}

if verifier.body != string(marshalled) {
t.Errorf("Expected %v, but got %v", string(marshalled), verifier.body)
if verifier.requests[0].body != string(marshalled) {
t.Errorf("Expected %v, but got %v", string(marshalled), verifier.requests[0].body)
}
}

Expand All @@ -336,37 +337,47 @@ func TestSendQuery(t *testing.T) {
t.Fatal(err)
}

if verifier.method != http.MethodPost {
t.Errorf("Expected %v, but got %v", http.MethodPost, verifier.method)
if verifier.requests[0].method != http.MethodPost {
t.Errorf("Expected %v, but got %v", http.MethodPost, verifier.requests[0].method)
}

if verifier.isQuery != true {
t.Errorf("Expected %v, but got %v", true, verifier.isQuery)
if verifier.requests[0].isQuery != true {
t.Errorf("Expected %v, but got %v", true, verifier.requests[0].isQuery)
}

if verifier.contentType != cosmosHeaderValuesQuery {
t.Errorf("Expected %v, but got %v", cosmosHeaderValuesQuery, verifier.contentType)
if verifier.requests[0].contentType != cosmosHeaderValuesQuery {
t.Errorf("Expected %v, but got %v", cosmosHeaderValuesQuery, verifier.requests[0].contentType)
}

if verifier.body != "{\"query\":\"SELECT * FROM c\"}" {
t.Errorf("Expected %v, but got %v", "{\"query\":\"SELECT * FROM c\"}", verifier.body)
if verifier.requests[0].body != "{\"query\":\"SELECT * FROM c\"}" {
t.Errorf("Expected %v, but got %v", "{\"query\":\"SELECT * FROM c\"}", verifier.requests[0].body)
}
}

type pipelineVerifier struct {
requests []pipelineVerifierRequest
}

type pipelineVerifierRequest struct {
method string
body string
contentType string
isQuery bool
url *url.URL
headers http.Header
}

func (p *pipelineVerifier) Do(req *policy.Request) (*http.Response, error) {
p.method = req.Raw().Method
pr := pipelineVerifierRequest{}
pr.method = req.Raw().Method
pr.url = req.Raw().URL
if req.Body() != nil {
readBody, _ := ioutil.ReadAll(req.Body())
p.body = string(readBody)
pr.body = string(readBody)
}
p.contentType = req.Raw().Header.Get(headerContentType)
p.isQuery = req.Raw().Method == http.MethodPost && req.Raw().Header.Get(cosmosHeaderQuery) == "True"
pr.contentType = req.Raw().Header.Get(headerContentType)
pr.headers = req.Raw().Header
pr.isQuery = req.Raw().Method == http.MethodPost && req.Raw().Header.Get(cosmosHeaderQuery) == "True"
p.requests = append(p.requests, pr)
return req.Next()
}
61 changes: 59 additions & 2 deletions sdk/data/azcosmos/cosmos_container.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (
"context"

"github.com/Azure/azure-sdk-for-go/sdk/azcore/policy"
"github.com/Azure/azure-sdk-for-go/sdk/azcore/runtime"
"github.com/Azure/azure-sdk-for-go/sdk/internal/uuid"
)

// ContainerClient lets you perform read, update, change throughput, and delete container operations.
Expand Down Expand Up @@ -199,7 +201,7 @@ func (c *ContainerClient) CreateItem(
isWriteOperation: true,
headerOptionsOverride: &h}

path, err := generatePathForNameBased(resourceTypeDocument, c.link, true)
path, err := generatePathForNameBased(resourceTypeDocument, operationContext.resourceAddress, true)
if err != nil {
return ItemResponse{}, err
}
Expand Down Expand Up @@ -248,7 +250,7 @@ func (c *ContainerClient) UpsertItem(
isWriteOperation: true,
headerOptionsOverride: &h}

path, err := generatePathForNameBased(resourceTypeDocument, c.link, true)
path, err := generatePathForNameBased(resourceTypeDocument, operationContext.resourceAddress, true)
if err != nil {
return ItemResponse{}, err
}
Expand Down Expand Up @@ -399,6 +401,61 @@ func (c *ContainerClient) DeleteItem(
return newItemResponse(azResponse)
}

// NewQueryItemsPager executes a single partition query in a Cosmos container.
// ctx - The context for the request.
// query - The SQL query to execute.
// partitionKey - The partition key to scope the query on.
// o - Options for the operation.
func (c *ContainerClient) NewQueryItemsPager(query string, partitionKey PartitionKey, o *QueryOptions) *runtime.Pager[QueryItemsResponse] {
correlatedActivityId, _ := uuid.New()
h := headerOptionsOverride{
partitionKey: &partitionKey,
correlatedActivityId: &correlatedActivityId,
}

queryOptions := &QueryOptions{}
if o != nil {
originalOptions := *o
queryOptions = &originalOptions
}

operationContext := pipelineRequestOptions{
resourceType: resourceTypeDocument,
resourceAddress: c.link,
headerOptionsOverride: &h,
}

path, _ := generatePathForNameBased(resourceTypeDocument, operationContext.resourceAddress, true)

return runtime.NewPager(runtime.PageProcessor[QueryItemsResponse]{
More: func(page QueryItemsResponse) bool {
return page.ContinuationToken != ""
},
Fetcher: func(ctx context.Context, page *QueryItemsResponse) (QueryItemsResponse, error) {
if page != nil {
if page.ContinuationToken != "" {
// Use the previous page continuation if available
queryOptions.ContinuationToken = page.ContinuationToken
}
}

azResponse, err := c.database.client.sendQueryRequest(
path,
ctx,
query,
operationContext,
queryOptions,
nil)

if err != nil {
return QueryItemsResponse{}, err
}

return newQueryResponse(azResponse)
},
})
}

func (c *ContainerClient) getRID(ctx context.Context) (string, error) {
containerResponse, err := c.Read(ctx, nil)
if err != nil {
Expand Down
Loading