diff --git a/.github/CODEOWNERS b/.github/CODEOWNERS index 63b788df603c..cc7d47a5f43d 100644 --- a/.github/CODEOWNERS +++ b/.github/CODEOWNERS @@ -32,7 +32,7 @@ /sdk/appconfig/ @antkmsft @seankane-msft @jhendrixMSFT # PRLabel: %Cosmos -/sdk/data/azcosmos/ @ealsur @ausfeldt +/sdk/data/azcosmos/ @ealsur @kirankumarkolli # PRLabel: %Tables /sdk/data/aztables/ @seankane-msft @jhendrixMSFT diff --git a/sdk/data/azcosmos/CHANGELOG.md b/sdk/data/azcosmos/CHANGELOG.md index 3655f63fffd9..6bd050a4d5e2 100644 --- a/sdk/data/azcosmos/CHANGELOG.md +++ b/sdk/data/azcosmos/CHANGELOG.md @@ -3,8 +3,10 @@ ## 0.2.1 (Unreleased) ### Features Added +* Added single partition query support. ### Breaking Changes +* This module now requires Go 1.18 ### Bugs Fixed diff --git a/sdk/data/azcosmos/ci.yml b/sdk/data/azcosmos/ci.yml index 1e40e4c7b9fe..97a3717db8ee 100644 --- a/sdk/data/azcosmos/ci.yml +++ b/sdk/data/azcosmos/ci.yml @@ -32,14 +32,10 @@ stages: strategy: matrix: - Windows_Go116: + Windows_Go118: pool.name: azsdk-pool-mms-win-2019-general image.name: MMS2019 - go.version: '1.16.7' - Windows_Go117: - pool.name: azsdk-pool-mms-win-2019-general - image.name: MMS2019 - go.version: '1.17' + go.version: '1.18' pool: name: $(pool.name) vmImage: $(image.name) diff --git a/sdk/data/azcosmos/cosmos_client_test.go b/sdk/data/azcosmos/cosmos_client_test.go index f39787b61e61..1eab3dcba3dd 100644 --- a/sdk/data/azcosmos/cosmos_client_test.go +++ b/sdk/data/azcosmos/cosmos_client_test.go @@ -8,6 +8,7 @@ import ( "encoding/json" "io/ioutil" "net/http" + "net/url" "testing" "github.com/Azure/azure-sdk-for-go/sdk/azcore" @@ -224,8 +225,8 @@ func TestSendDelete(t *testing.T) { t.Fatal(err) } - if verifier.method != http.MethodDelete { - t.Errorf("Expected %v, but got %v", http.MethodDelete, verifier.method) + if verifier.requests[0].method != http.MethodDelete { + t.Errorf("Expected %v, but got %v", http.MethodDelete, verifier.requests[0].method) } } @@ -247,8 +248,8 @@ func TestSendGet(t *testing.T) { t.Fatal(err) } - if verifier.method != http.MethodGet { - t.Errorf("Expected %v, but got %v", http.MethodGet, verifier.method) + if verifier.requests[0].method != http.MethodGet { + t.Errorf("Expected %v, but got %v", http.MethodGet, verifier.requests[0].method) } } @@ -276,12 +277,12 @@ func TestSendPut(t *testing.T) { t.Fatal(err) } - if verifier.method != http.MethodPut { - t.Errorf("Expected %v, but got %v", http.MethodPut, verifier.method) + if verifier.requests[0].method != http.MethodPut { + t.Errorf("Expected %v, but got %v", http.MethodPut, verifier.requests[0].method) } - if verifier.body != string(marshalled) { - t.Errorf("Expected %v, but got %v", string(marshalled), verifier.body) + if verifier.requests[0].body != string(marshalled) { + t.Errorf("Expected %v, but got %v", string(marshalled), verifier.requests[0].body) } } @@ -309,12 +310,12 @@ func TestSendPost(t *testing.T) { t.Fatal(err) } - if verifier.method != http.MethodPost { - t.Errorf("Expected %v, but got %v", http.MethodPost, verifier.method) + if verifier.requests[0].method != http.MethodPost { + t.Errorf("Expected %v, but got %v", http.MethodPost, verifier.requests[0].method) } - if verifier.body != string(marshalled) { - t.Errorf("Expected %v, but got %v", string(marshalled), verifier.body) + if verifier.requests[0].body != string(marshalled) { + t.Errorf("Expected %v, but got %v", string(marshalled), verifier.requests[0].body) } } @@ -336,37 +337,47 @@ func TestSendQuery(t *testing.T) { t.Fatal(err) } - if verifier.method != http.MethodPost { - t.Errorf("Expected %v, but got %v", http.MethodPost, verifier.method) + if verifier.requests[0].method != http.MethodPost { + t.Errorf("Expected %v, but got %v", http.MethodPost, verifier.requests[0].method) } - if verifier.isQuery != true { - t.Errorf("Expected %v, but got %v", true, verifier.isQuery) + if verifier.requests[0].isQuery != true { + t.Errorf("Expected %v, but got %v", true, verifier.requests[0].isQuery) } - if verifier.contentType != cosmosHeaderValuesQuery { - t.Errorf("Expected %v, but got %v", cosmosHeaderValuesQuery, verifier.contentType) + if verifier.requests[0].contentType != cosmosHeaderValuesQuery { + t.Errorf("Expected %v, but got %v", cosmosHeaderValuesQuery, verifier.requests[0].contentType) } - if verifier.body != "{\"query\":\"SELECT * FROM c\"}" { - t.Errorf("Expected %v, but got %v", "{\"query\":\"SELECT * FROM c\"}", verifier.body) + if verifier.requests[0].body != "{\"query\":\"SELECT * FROM c\"}" { + t.Errorf("Expected %v, but got %v", "{\"query\":\"SELECT * FROM c\"}", verifier.requests[0].body) } } type pipelineVerifier struct { + requests []pipelineVerifierRequest +} + +type pipelineVerifierRequest struct { method string body string contentType string isQuery bool + url *url.URL + headers http.Header } func (p *pipelineVerifier) Do(req *policy.Request) (*http.Response, error) { - p.method = req.Raw().Method + pr := pipelineVerifierRequest{} + pr.method = req.Raw().Method + pr.url = req.Raw().URL if req.Body() != nil { readBody, _ := ioutil.ReadAll(req.Body()) - p.body = string(readBody) + pr.body = string(readBody) } - p.contentType = req.Raw().Header.Get(headerContentType) - p.isQuery = req.Raw().Method == http.MethodPost && req.Raw().Header.Get(cosmosHeaderQuery) == "True" + pr.contentType = req.Raw().Header.Get(headerContentType) + pr.headers = req.Raw().Header + pr.isQuery = req.Raw().Method == http.MethodPost && req.Raw().Header.Get(cosmosHeaderQuery) == "True" + p.requests = append(p.requests, pr) return req.Next() } diff --git a/sdk/data/azcosmos/cosmos_container.go b/sdk/data/azcosmos/cosmos_container.go index 0fec736b0967..ad70384311ca 100644 --- a/sdk/data/azcosmos/cosmos_container.go +++ b/sdk/data/azcosmos/cosmos_container.go @@ -7,6 +7,8 @@ import ( "context" "github.com/Azure/azure-sdk-for-go/sdk/azcore/policy" + "github.com/Azure/azure-sdk-for-go/sdk/azcore/runtime" + "github.com/Azure/azure-sdk-for-go/sdk/internal/uuid" ) // ContainerClient lets you perform read, update, change throughput, and delete container operations. @@ -199,7 +201,7 @@ func (c *ContainerClient) CreateItem( isWriteOperation: true, headerOptionsOverride: &h} - path, err := generatePathForNameBased(resourceTypeDocument, c.link, true) + path, err := generatePathForNameBased(resourceTypeDocument, operationContext.resourceAddress, true) if err != nil { return ItemResponse{}, err } @@ -248,7 +250,7 @@ func (c *ContainerClient) UpsertItem( isWriteOperation: true, headerOptionsOverride: &h} - path, err := generatePathForNameBased(resourceTypeDocument, c.link, true) + path, err := generatePathForNameBased(resourceTypeDocument, operationContext.resourceAddress, true) if err != nil { return ItemResponse{}, err } @@ -399,6 +401,61 @@ func (c *ContainerClient) DeleteItem( return newItemResponse(azResponse) } +// NewQueryItemsPager executes a single partition query in a Cosmos container. +// ctx - The context for the request. +// query - The SQL query to execute. +// partitionKey - The partition key to scope the query on. +// o - Options for the operation. +func (c *ContainerClient) NewQueryItemsPager(query string, partitionKey PartitionKey, o *QueryOptions) *runtime.Pager[QueryItemsResponse] { + correlatedActivityId, _ := uuid.New() + h := headerOptionsOverride{ + partitionKey: &partitionKey, + correlatedActivityId: &correlatedActivityId, + } + + queryOptions := &QueryOptions{} + if o != nil { + originalOptions := *o + queryOptions = &originalOptions + } + + operationContext := pipelineRequestOptions{ + resourceType: resourceTypeDocument, + resourceAddress: c.link, + headerOptionsOverride: &h, + } + + path, _ := generatePathForNameBased(resourceTypeDocument, operationContext.resourceAddress, true) + + return runtime.NewPager(runtime.PageProcessor[QueryItemsResponse]{ + More: func(page QueryItemsResponse) bool { + return page.ContinuationToken != "" + }, + Fetcher: func(ctx context.Context, page *QueryItemsResponse) (QueryItemsResponse, error) { + if page != nil { + if page.ContinuationToken != "" { + // Use the previous page continuation if available + queryOptions.ContinuationToken = page.ContinuationToken + } + } + + azResponse, err := c.database.client.sendQueryRequest( + path, + ctx, + query, + operationContext, + queryOptions, + nil) + + if err != nil { + return QueryItemsResponse{}, err + } + + return newQueryResponse(azResponse) + }, + }) +} + func (c *ContainerClient) getRID(ctx context.Context) (string, error) { containerResponse, err := c.Read(ctx, nil) if err != nil { diff --git a/sdk/data/azcosmos/cosmos_container_test.go b/sdk/data/azcosmos/cosmos_container_test.go new file mode 100644 index 000000000000..7cb83c431c75 --- /dev/null +++ b/sdk/data/azcosmos/cosmos_container_test.go @@ -0,0 +1,522 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package azcosmos + +import ( + "context" + "encoding/json" + "net/http" + "strconv" + "testing" + "time" + + "github.com/Azure/azure-sdk-for-go/sdk/azcore" + "github.com/Azure/azure-sdk-for-go/sdk/azcore/policy" + azruntime "github.com/Azure/azure-sdk-for-go/sdk/azcore/runtime" + "github.com/Azure/azure-sdk-for-go/sdk/internal/mock" +) + +func TestContainerRead(t *testing.T) { + nowAsUnix := time.Unix(time.Now().Unix(), 0) + + etag := azcore.ETag("etag") + properties := ContainerProperties{ + ID: "containerId", + ETag: &etag, + SelfLink: "someSelfLink", + ResourceID: "someResourceId", + LastModified: nowAsUnix, + PartitionKeyDefinition: PartitionKeyDefinition{ + Paths: []string{"somePath"}, + Version: 2, + }, + } + + jsonString, err := json.Marshal(properties) + if err != nil { + t.Fatal(err) + } + + srv, close := mock.NewTLSServer() + defer close() + srv.SetResponse( + mock.WithBody(jsonString), + mock.WithHeader(cosmosHeaderEtag, "someEtag"), + mock.WithHeader(cosmosHeaderActivityId, "someActivityId"), + mock.WithHeader(cosmosHeaderRequestCharge, "13.42"), + mock.WithStatusCode(200)) + + pl := azruntime.NewPipeline("azcosmostest", "v1.0.0", azruntime.PipelineOptions{}, &policy.ClientOptions{Transport: srv}) + client := &Client{endpoint: srv.URL(), pipeline: pl} + + database, _ := newDatabase("databaseId", client) + container, _ := newContainer("containerId", database) + + if container.ID() != "containerId" { + t.Errorf("Expected container ID to be %s, but got %s", "containerId", container.ID()) + } + + resp, err := container.Read(context.TODO(), nil) + if err != nil { + t.Fatalf("Failed to read container: %v", err) + } + + if resp.RawResponse == nil { + t.Fatal("parsedResponse.RawResponse is nil") + } + + if resp.ContainerProperties == nil { + t.Fatal("parsedResponse.ContainerProperties is nil") + } + + if properties.ID != resp.ContainerProperties.ID { + t.Errorf("Expected Id to be %s, but got %s", properties.ID, resp.ContainerProperties.ID) + } + + if *properties.ETag != *resp.ContainerProperties.ETag { + t.Errorf("Expected ETag to be %s, but got %s", *properties.ETag, *resp.ContainerProperties.ETag) + } + + if properties.SelfLink != resp.ContainerProperties.SelfLink { + t.Errorf("Expected SelfLink to be %s, but got %s", properties.SelfLink, resp.ContainerProperties.SelfLink) + } + + if properties.ResourceID != resp.ContainerProperties.ResourceID { + t.Errorf("Expected ResourceId to be %s, but got %s", properties.ResourceID, resp.ContainerProperties.ResourceID) + } + + if properties.LastModified != resp.ContainerProperties.LastModified { + t.Errorf("Expected LastModified.Time to be %v, but got %v", properties.LastModified, resp.ContainerProperties.LastModified) + } + + if properties.PartitionKeyDefinition.Paths[0] != resp.ContainerProperties.PartitionKeyDefinition.Paths[0] { + t.Errorf("Expected PartitionKeyDefinition.Paths[0] to be %s, but got %s", properties.PartitionKeyDefinition.Paths[0], resp.ContainerProperties.PartitionKeyDefinition.Paths[0]) + } + + if properties.PartitionKeyDefinition.Version != resp.ContainerProperties.PartitionKeyDefinition.Version { + t.Errorf("Expected PartitionKeyDefinition.Version to be %d, but got %d", properties.PartitionKeyDefinition.Version, resp.ContainerProperties.PartitionKeyDefinition.Version) + } + + if resp.ActivityID != "someActivityId" { + t.Errorf("Expected ActivityId to be %s, but got %s", "someActivityId", resp.ActivityID) + } + + if resp.RequestCharge != 13.42 { + t.Errorf("Expected RequestCharge to be %f, but got %f", 13.42, resp.RequestCharge) + } + + if resp.ETag != "someEtag" { + t.Errorf("Expected ETag to be %s, but got %s", "someEtag", resp.ETag) + } +} + +func TestContainerDeleteItem(t *testing.T) { + srv, close := mock.NewTLSServer() + defer close() + srv.SetResponse( + mock.WithHeader(cosmosHeaderEtag, "someEtag"), + mock.WithHeader(cosmosHeaderActivityId, "someActivityId"), + mock.WithHeader(cosmosHeaderRequestCharge, "13.42"), + mock.WithStatusCode(204)) + + verifier := pipelineVerifier{} + + pl := azruntime.NewPipeline("azcosmostest", "v1.0.0", azruntime.PipelineOptions{PerCall: []policy.Policy{&verifier}}, &policy.ClientOptions{Transport: srv}) + client := &Client{endpoint: srv.URL(), pipeline: pl} + + database, _ := newDatabase("databaseId", client) + container, _ := newContainer("containerId", database) + + resp, err := container.DeleteItem(context.TODO(), NewPartitionKeyString("1"), "doc1", nil) + if err != nil { + t.Fatalf("Failed to delete item: %v", err) + } + + if resp.RawResponse == nil { + t.Fatal("RawResponse is nil") + } + + if resp.ActivityID == "" { + t.Fatal("Activity id was not returned") + } + + if resp.RequestCharge == 0 { + t.Fatal("Request charge was not returned") + } + + if resp.RequestCharge != 13.42 { + t.Errorf("Expected RequestCharge to be %f, but got %f", 13.42, resp.RequestCharge) + } + + if resp.ETag != "someEtag" { + t.Errorf("Expected ETag to be %s, but got %s", "someEtag", resp.ETag) + } + + if verifier.requests[0].method != http.MethodDelete { + t.Errorf("Expected method to be %s, but got %s", http.MethodDelete, verifier.requests[0].method) + } + + if verifier.requests[0].url.RequestURI() != "/dbs/databaseId/colls/containerId/docs/doc1" { + t.Errorf("Expected url to be %s, but got %s", "/dbs/databaseId/colls/containerId/docs/doc1", verifier.requests[0].url.RequestURI()) + } +} + +func TestContainerReadItem(t *testing.T) { + jsonString := []byte(`{"id":"doc1","foo":"bar"}`) + srv, close := mock.NewTLSServer() + defer close() + srv.SetResponse( + mock.WithBody(jsonString), + mock.WithHeader(cosmosHeaderEtag, "someEtag"), + mock.WithHeader(cosmosHeaderActivityId, "someActivityId"), + mock.WithHeader(cosmosHeaderRequestCharge, "13.42"), + mock.WithStatusCode(200)) + + verifier := pipelineVerifier{} + + pl := azruntime.NewPipeline("azcosmostest", "v1.0.0", azruntime.PipelineOptions{PerCall: []policy.Policy{&verifier}}, &policy.ClientOptions{Transport: srv}) + client := &Client{endpoint: srv.URL(), pipeline: pl} + + database, _ := newDatabase("databaseId", client) + container, _ := newContainer("containerId", database) + + resp, err := container.ReadItem(context.TODO(), NewPartitionKeyString("1"), "doc1", nil) + if err != nil { + t.Fatalf("Failed to read item: %v", err) + } + + if string(resp.Value) != string(jsonString) { + t.Errorf("Expected value to be %s, but got %s", string(jsonString), string(resp.Value)) + } + + if resp.RawResponse == nil { + t.Fatal("RawResponse is nil") + } + + if resp.ActivityID == "" { + t.Fatal("Activity id was not returned") + } + + if resp.RequestCharge == 0 { + t.Fatal("Request charge was not returned") + } + + if resp.RequestCharge != 13.42 { + t.Errorf("Expected RequestCharge to be %f, but got %f", 13.42, resp.RequestCharge) + } + + if resp.ETag != "someEtag" { + t.Errorf("Expected ETag to be %s, but got %s", "someEtag", resp.ETag) + } + + if verifier.requests[0].method != http.MethodGet { + t.Errorf("Expected method to be %s, but got %s", http.MethodGet, verifier.requests[0].method) + } + + if verifier.requests[0].url.RequestURI() != "/dbs/databaseId/colls/containerId/docs/doc1" { + t.Errorf("Expected url to be %s, but got %s", "/dbs/databaseId/colls/containerId/docs/doc1", verifier.requests[0].url.RequestURI()) + } +} + +func TestContainerReplaceItem(t *testing.T) { + jsonString := []byte(`{"id":"doc1","foo":"bar"}`) + srv, close := mock.NewTLSServer() + defer close() + srv.SetResponse( + mock.WithBody(jsonString), + mock.WithHeader(cosmosHeaderEtag, "someEtag"), + mock.WithHeader(cosmosHeaderActivityId, "someActivityId"), + mock.WithHeader(cosmosHeaderRequestCharge, "13.42"), + mock.WithStatusCode(200)) + + verifier := pipelineVerifier{} + + pl := azruntime.NewPipeline("azcosmostest", "v1.0.0", azruntime.PipelineOptions{PerCall: []policy.Policy{&verifier}}, &policy.ClientOptions{Transport: srv}) + client := &Client{endpoint: srv.URL(), pipeline: pl} + + database, _ := newDatabase("databaseId", client) + container, _ := newContainer("containerId", database) + + resp, err := container.ReplaceItem(context.TODO(), NewPartitionKeyString("1"), "doc1", jsonString, nil) + if err != nil { + t.Fatalf("Failed to read item: %v", err) + } + + if string(resp.Value) != string(jsonString) { + t.Errorf("Expected value to be %s, but got %s", string(jsonString), string(resp.Value)) + } + + if resp.RawResponse == nil { + t.Fatal("RawResponse is nil") + } + + if resp.ActivityID == "" { + t.Fatal("Activity id was not returned") + } + + if resp.RequestCharge == 0 { + t.Fatal("Request charge was not returned") + } + + if resp.RequestCharge != 13.42 { + t.Errorf("Expected RequestCharge to be %f, but got %f", 13.42, resp.RequestCharge) + } + + if resp.ETag != "someEtag" { + t.Errorf("Expected ETag to be %s, but got %s", "someEtag", resp.ETag) + } + + if verifier.requests[0].method != http.MethodPut { + t.Errorf("Expected method to be %s, but got %s", http.MethodPut, verifier.requests[0].method) + } + + if verifier.requests[0].body != string(jsonString) { + t.Errorf("Expected body to be %s, but got %s", string(jsonString), string(verifier.requests[0].body)) + } + + if verifier.requests[0].url.RequestURI() != "/dbs/databaseId/colls/containerId/docs/doc1" { + t.Errorf("Expected url to be %s, but got %s", "/dbs/databaseId/colls/containerId/docs/doc1", verifier.requests[0].url.RequestURI()) + } +} + +func TestContainerUpsertItem(t *testing.T) { + jsonString := []byte(`{"id":"doc1","foo":"bar"}`) + srv, close := mock.NewTLSServer() + defer close() + srv.SetResponse( + mock.WithBody(jsonString), + mock.WithHeader(cosmosHeaderEtag, "someEtag"), + mock.WithHeader(cosmosHeaderActivityId, "someActivityId"), + mock.WithHeader(cosmosHeaderRequestCharge, "13.42"), + mock.WithStatusCode(200)) + + verifier := pipelineVerifier{} + + pl := azruntime.NewPipeline("azcosmostest", "v1.0.0", azruntime.PipelineOptions{PerCall: []policy.Policy{&verifier}}, &policy.ClientOptions{Transport: srv}) + client := &Client{endpoint: srv.URL(), pipeline: pl} + + database, _ := newDatabase("databaseId", client) + container, _ := newContainer("containerId", database) + + resp, err := container.UpsertItem(context.TODO(), NewPartitionKeyString("1"), jsonString, nil) + if err != nil { + t.Fatalf("Failed to read item: %v", err) + } + + if string(resp.Value) != string(jsonString) { + t.Errorf("Expected value to be %s, but got %s", string(jsonString), string(resp.Value)) + } + + if resp.RawResponse == nil { + t.Fatal("RawResponse is nil") + } + + if resp.ActivityID == "" { + t.Fatal("Activity id was not returned") + } + + if resp.RequestCharge == 0 { + t.Fatal("Request charge was not returned") + } + + if resp.RequestCharge != 13.42 { + t.Errorf("Expected RequestCharge to be %f, but got %f", 13.42, resp.RequestCharge) + } + + if resp.ETag != "someEtag" { + t.Errorf("Expected ETag to be %s, but got %s", "someEtag", resp.ETag) + } + + if verifier.requests[0].method != http.MethodPost { + t.Errorf("Expected method to be %s, but got %s", http.MethodPost, verifier.requests[0].method) + } + + if verifier.requests[0].headers.Get(cosmosHeaderIsUpsert) != "true" { + t.Errorf("Expected header to be %s, but got %s", cosmosHeaderIsUpsert, verifier.requests[0].headers.Get(cosmosHeaderIsUpsert)) + } + + if verifier.requests[0].body != string(jsonString) { + t.Errorf("Expected body to be %s, but got %s", string(jsonString), string(verifier.requests[0].body)) + } + + if verifier.requests[0].url.RequestURI() != "/dbs/databaseId/colls/containerId/docs" { + t.Errorf("Expected url to be %s, but got %s", "/dbs/databaseId/colls/containerId/docs", verifier.requests[0].url.RequestURI()) + } +} + +func TestContainerCreateItem(t *testing.T) { + jsonString := []byte(`{"id":"doc1","foo":"bar"}`) + srv, close := mock.NewTLSServer() + defer close() + srv.SetResponse( + mock.WithBody(jsonString), + mock.WithHeader(cosmosHeaderEtag, "someEtag"), + mock.WithHeader(cosmosHeaderActivityId, "someActivityId"), + mock.WithHeader(cosmosHeaderRequestCharge, "13.42"), + mock.WithStatusCode(200)) + + verifier := pipelineVerifier{} + + pl := azruntime.NewPipeline("azcosmostest", "v1.0.0", azruntime.PipelineOptions{PerCall: []policy.Policy{&verifier}}, &policy.ClientOptions{Transport: srv}) + client := &Client{endpoint: srv.URL(), pipeline: pl} + + database, _ := newDatabase("databaseId", client) + container, _ := newContainer("containerId", database) + + resp, err := container.UpsertItem(context.TODO(), NewPartitionKeyString("1"), jsonString, nil) + if err != nil { + t.Fatalf("Failed to read item: %v", err) + } + + if string(resp.Value) != string(jsonString) { + t.Errorf("Expected value to be %s, but got %s", string(jsonString), string(resp.Value)) + } + + if resp.RawResponse == nil { + t.Fatal("RawResponse is nil") + } + + if resp.ActivityID == "" { + t.Fatal("Activity id was not returned") + } + + if resp.RequestCharge == 0 { + t.Fatal("Request charge was not returned") + } + + if resp.RequestCharge != 13.42 { + t.Errorf("Expected RequestCharge to be %f, but got %f", 13.42, resp.RequestCharge) + } + + if resp.ETag != "someEtag" { + t.Errorf("Expected ETag to be %s, but got %s", "someEtag", resp.ETag) + } + + if verifier.requests[0].method != http.MethodPost { + t.Errorf("Expected method to be %s, but got %s", http.MethodPost, verifier.requests[0].method) + } + + if verifier.requests[0].headers.Get(cosmosHeaderIsUpsert) == "" { + t.Errorf("Expected header to be empty, but got %s", verifier.requests[0].headers.Get(cosmosHeaderIsUpsert)) + } + + if verifier.requests[0].body != string(jsonString) { + t.Errorf("Expected body to be %s, but got %s", string(jsonString), string(verifier.requests[0].body)) + } + + if verifier.requests[0].url.RequestURI() != "/dbs/databaseId/colls/containerId/docs" { + t.Errorf("Expected url to be %s, but got %s", "/dbs/databaseId/colls/containerId/docs", verifier.requests[0].url.RequestURI()) + } +} + +func TestContainerQueryItems(t *testing.T) { + jsonStringpage1 := []byte(`{"Documents":[{"id":"doc1","foo":"bar"},{"id":"doc2","foo":"bar"}]}`) + jsonStringpage2 := []byte(`{"Documents":[{"id":"doc3","foo":"bar"},{"id":"doc4","foo":"bar"},{"id":"doc5","foo":"bar"}]}`) + + srv, close := mock.NewTLSServer() + defer close() + srv.AppendResponse( + mock.WithBody(jsonStringpage1), + mock.WithHeader(cosmosHeaderEtag, "someEtag"), + mock.WithHeader(cosmosHeaderQueryMetrics, "someQueryMetrics"), + mock.WithHeader(cosmosHeaderIndexUtilization, "someIndexUtilization"), + mock.WithHeader(cosmosHeaderActivityId, "someActivityId"), + mock.WithHeader(cosmosHeaderRequestCharge, "13.42"), + mock.WithHeader(cosmosHeaderContinuationToken, "someContinuationToken"), + mock.WithStatusCode(200)) + srv.AppendResponse( + mock.WithBody(jsonStringpage2), + mock.WithHeader(cosmosHeaderQueryMetrics, "someQueryMetrics"), + mock.WithHeader(cosmosHeaderIndexUtilization, "someIndexUtilization"), + mock.WithHeader(cosmosHeaderEtag, "someEtag"), + mock.WithHeader(cosmosHeaderActivityId, "someActivityId"), + mock.WithHeader(cosmosHeaderRequestCharge, "13.42"), + mock.WithStatusCode(200)) + + verifier := pipelineVerifier{} + + pl := azruntime.NewPipeline("azcosmostest", "v1.0.0", azruntime.PipelineOptions{PerCall: []policy.Policy{&verifier}}, &policy.ClientOptions{Transport: srv}) + client := &Client{endpoint: srv.URL(), pipeline: pl} + + database, _ := newDatabase("databaseId", client) + container, _ := newContainer("containerId", database) + + receivedIds := []string{} + queryPager := container.NewQueryItemsPager("select * from c", NewPartitionKeyString("1"), nil) + for queryPager.More() { + queryResponse, err := queryPager.NextPage(context.TODO()) + if err != nil { + t.Fatalf("Failed to query items: %v", err) + } + + for _, item := range queryResponse.Items { + var itemResponseBody map[string]interface{} + err = json.Unmarshal(item, &itemResponseBody) + if err != nil { + t.Fatalf("Failed to unmarshal: %v", err) + } + receivedIds = append(receivedIds, itemResponseBody["id"].(string)) + } + + if queryPager.More() && queryResponse.ContinuationToken != "someContinuationToken" { + t.Errorf("Expected ContinuationToken to be %s, but got %s", "someContinuationToken", queryResponse.ContinuationToken) + } + + if queryResponse.QueryMetrics == nil || *queryResponse.QueryMetrics != "someQueryMetrics" { + t.Errorf("Expected QueryMetrics to be %s, but got %s", "someQueryMetrics", *queryResponse.QueryMetrics) + } + + if queryResponse.IndexMetrics == nil || *queryResponse.IndexMetrics != "someIndexUtilization" { + t.Errorf("Expected IndexMetrics to be %s, but got %s", "someIndexUtilization", *queryResponse.IndexMetrics) + } + + if queryResponse.ActivityID == "" { + t.Fatal("Activity id was not returned") + } + + if queryResponse.RequestCharge == 0 { + t.Fatal("Request charge was not returned") + } + } + + for i := 0; i < 5; i++ { + if receivedIds[i] != "doc"+strconv.Itoa(i+1) { + t.Fatalf("Expected id %d, got %s", i, receivedIds[i]) + } + } + + if len(verifier.requests) != 2 { + t.Fatalf("Expected 2 requests, got %d", len(verifier.requests)) + } + + for index, request := range verifier.requests { + if request.method != http.MethodPost { + t.Errorf("Expected method to be %s, but got %s", http.MethodPost, request.method) + } + + if request.url.RequestURI() != "/dbs/databaseId/colls/containerId/docs" { + t.Errorf("Expected url to be %s, but got %s", "/dbs/databaseId/colls/containerId/docs", request.url.RequestURI()) + } + + if !request.isQuery { + t.Errorf("Expected request to be a query, but it was not") + } + + if request.body != "{\"query\":\"select * from c\"}" { + t.Errorf("Expected %v, but got %v", "{\"query\":\"select * from c\"}", request.body) + } + + if request.contentType != cosmosHeaderValuesQuery { + t.Errorf("Expected %v, but got %v", cosmosHeaderValuesQuery, request.contentType) + } + + if index == 0 && request.headers.Get(cosmosHeaderContinuationToken) != "" { + t.Errorf("Expected ContinuationToken to be %s, but got %s", "", request.headers.Get(cosmosHeaderContinuationToken)) + } + + if index == 1 && request.headers.Get(cosmosHeaderContinuationToken) != "someContinuationToken" { + t.Errorf("Expected ContinuationToken to be %s, but got %s", "someContinuationToken", request.headers.Get(cosmosHeaderContinuationToken)) + } + } +} diff --git a/sdk/data/azcosmos/cosmos_headers.go b/sdk/data/azcosmos/cosmos_headers.go index 87dc74440054..c0c7faebc48c 100644 --- a/sdk/data/azcosmos/cosmos_headers.go +++ b/sdk/data/azcosmos/cosmos_headers.go @@ -4,29 +4,38 @@ package azcosmos const ( - cosmosHeaderRequestCharge string = "x-ms-request-charge" - cosmosHeaderActivityId string = "x-ms-activity-id" - cosmosHeaderEtag string = "etag" - cosmosHeaderPopulateQuotaInfo string = "x-ms-documentdb-populatequotainfo" - cosmosHeaderPreTriggerInclude string = "x-ms-documentdb-pre-trigger-include" - cosmosHeaderPostTriggerInclude string = "x-ms-documentdb-post-trigger-include" - cosmosHeaderIndexingDirective string = "x-ms-indexing-directive" - cosmosHeaderSessionToken string = "x-ms-session-token" - cosmosHeaderConsistencyLevel string = "x-ms-consistency-level" - cosmosHeaderPartitionKey string = "x-ms-documentdb-partitionkey" - cosmosHeaderPrefer string = "Prefer" - cosmosHeaderIsUpsert string = "x-ms-documentdb-is-upsert" - cosmosHeaderOfferThroughput string = "x-ms-offer-throughput" - cosmosHeaderOfferAutoscale string = "x-ms-cosmos-offer-autopilot-settings" - cosmosHeaderQuery string = "x-ms-documentdb-query" - cosmosHeaderOfferReplacePending string = "x-ms-offer-replace-pending" - cosmosHeaderOfferMinimumThroughput string = "x-ms-cosmos-min-throughput" - headerXmsDate string = "x-ms-date" - headerAuthorization string = "Authorization" - headerContentType string = "Content-Type" - headerIfMatch string = "If-Match" - headerIfNoneMatch string = "If-None-Match" - headerXmsVersion string = "x-ms-version" + cosmosHeaderRequestCharge string = "x-ms-request-charge" + cosmosHeaderActivityId string = "x-ms-activity-id" + cosmosHeaderEtag string = "etag" + cosmosHeaderPopulateQuotaInfo string = "x-ms-documentdb-populatequotainfo" + cosmosHeaderPreTriggerInclude string = "x-ms-documentdb-pre-trigger-include" + cosmosHeaderPostTriggerInclude string = "x-ms-documentdb-post-trigger-include" + cosmosHeaderIndexingDirective string = "x-ms-indexing-directive" + cosmosHeaderSessionToken string = "x-ms-session-token" + cosmosHeaderConsistencyLevel string = "x-ms-consistency-level" + cosmosHeaderPartitionKey string = "x-ms-documentdb-partitionkey" + cosmosHeaderPrefer string = "Prefer" + cosmosHeaderIsUpsert string = "x-ms-documentdb-is-upsert" + cosmosHeaderOfferThroughput string = "x-ms-offer-throughput" + cosmosHeaderOfferAutoscale string = "x-ms-cosmos-offer-autopilot-settings" + cosmosHeaderQuery string = "x-ms-documentdb-query" + cosmosHeaderOfferReplacePending string = "x-ms-offer-replace-pending" + cosmosHeaderOfferMinimumThroughput string = "x-ms-cosmos-min-throughput" + cosmosHeaderResponseContinuationTokenLimitInKb string = "x-ms-documentdb-responsecontinuationtokenlimitinkb" + cosmosHeaderEnableScanInQuery string = "x-ms-documentdb-force-query-scan" + cosmosHeaderMaxItemCount string = "x-ms-max-item-count" + cosmosHeaderContinuationToken string = "x-ms-continuation" + cosmosHeaderPopulateIndexMetrics string = "x-ms-cosmos-populateindexmetrics" + cosmosHeaderPopulateQueryMetrics string = "x-ms-documentdb-populatequerymetrics" + cosmosHeaderQueryMetrics string = "x-ms-documentdb-query-metrics" + cosmosHeaderIndexUtilization string = "x-ms-cosmos-index-utilization" + cosmosHeaderCorrelatedActivityId string = "x-ms-cosmos-correlated-activityid" + headerXmsDate string = "x-ms-date" + headerAuthorization string = "Authorization" + headerContentType string = "Content-Type" + headerIfMatch string = "If-Match" + headerIfNoneMatch string = "If-None-Match" + headerXmsVersion string = "x-ms-version" ) const ( diff --git a/sdk/data/azcosmos/cosmos_headers_policy.go b/sdk/data/azcosmos/cosmos_headers_policy.go index 23ca2d4f5773..f3a0eebbf5ef 100644 --- a/sdk/data/azcosmos/cosmos_headers_policy.go +++ b/sdk/data/azcosmos/cosmos_headers_policy.go @@ -7,6 +7,7 @@ import ( "net/http" "github.com/Azure/azure-sdk-for-go/sdk/azcore/policy" + "github.com/Azure/azure-sdk-for-go/sdk/internal/uuid" ) type headerPolicies struct { @@ -16,6 +17,7 @@ type headerPolicies struct { type headerOptionsOverride struct { enableContentResponseOnWrite *bool partitionKey *PartitionKey + correlatedActivityId *uuid.UUID } func (p *headerPolicies) Do(req *policy.Request) (*http.Response, error) { @@ -35,6 +37,10 @@ func (p *headerPolicies) Do(req *policy.Request) (*http.Response, error) { } req.Raw().Header.Add(cosmosHeaderPartitionKey, string(pkAsString)) } + + if o.headerOptionsOverride.correlatedActivityId != nil { + req.Raw().Header.Add(cosmosHeaderCorrelatedActivityId, (*o.headerOptionsOverride.correlatedActivityId).String()) + } } if o.isWriteOperation && !enableContentResponseOnWrite { diff --git a/sdk/data/azcosmos/cosmos_headers_policy_test.go b/sdk/data/azcosmos/cosmos_headers_policy_test.go index 1fb90e69aa75..ab68973e616b 100644 --- a/sdk/data/azcosmos/cosmos_headers_policy_test.go +++ b/sdk/data/azcosmos/cosmos_headers_policy_test.go @@ -11,6 +11,7 @@ import ( "github.com/Azure/azure-sdk-for-go/sdk/azcore/policy" azruntime "github.com/Azure/azure-sdk-for-go/sdk/azcore/runtime" "github.com/Azure/azure-sdk-for-go/sdk/internal/mock" + "github.com/Azure/azure-sdk-for-go/sdk/internal/uuid" ) func TestAddContentHeaderDefaultOnWriteOperation(t *testing.T) { @@ -192,14 +193,47 @@ func TestAddPartitionKeyHeader(t *testing.T) { } } +func TestAddCorrelatedActivityIdHeader(t *testing.T) { + headerPolicy := &headerPolicies{} + srv, close := mock.NewTLSServer() + defer close() + srv.SetResponse(mock.WithStatusCode(http.StatusOK)) + + verifier := headerPoliciesVerify{} + pl := azruntime.NewPipeline("azcosmostest", "v1.0.0", azruntime.PipelineOptions{PerCall: []policy.Policy{headerPolicy, &verifier}}, &policy.ClientOptions{Transport: srv}) + req, err := azruntime.NewRequest(context.Background(), http.MethodGet, srv.URL()) + + correlatedActivityId, _ := uuid.New() + req.SetOperationValue(pipelineRequestOptions{ + headerOptionsOverride: &headerOptionsOverride{ + correlatedActivityId: &correlatedActivityId, + }, + }) + + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + _, err = pl.Do(req) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + if verifier.isCorrelatedActivityIdSet != correlatedActivityId.String() { + t.Fatalf("expected correlatedActivityId header to be set") + } +} + type headerPoliciesVerify struct { isEnableContentResponseOnWriteHeaderSet bool isPartitionKeyHeaderSet string + isCorrelatedActivityIdSet string } func (p *headerPoliciesVerify) Do(req *policy.Request) (*http.Response, error) { p.isEnableContentResponseOnWriteHeaderSet = req.Raw().Header.Get(cosmosHeaderPrefer) != "" p.isPartitionKeyHeaderSet = req.Raw().Header.Get(cosmosHeaderPartitionKey) + p.isCorrelatedActivityIdSet = req.Raw().Header.Get(cosmosHeaderCorrelatedActivityId) return req.Next() } diff --git a/sdk/data/azcosmos/cosmos_query_request_options.go b/sdk/data/azcosmos/cosmos_query_request_options.go new file mode 100644 index 000000000000..cba848f1438f --- /dev/null +++ b/sdk/data/azcosmos/cosmos_query_request_options.go @@ -0,0 +1,70 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package azcosmos + +import "fmt" + +// QueryOptions includes options for query operations on items. +type QueryOptions struct { + // SessionToken to be used when using Session consistency on the account. + // When working with Session consistency, each new write request to Azure Cosmos DB is assigned a new SessionToken. + // The client instance will use this token internally with each read/query request to ensure that the set consistency level is maintained. + // In some scenarios you need to manage this Session yourself: Consider a web application with multiple nodes, each node will have its own client instance. + // If you wanted these nodes to participate in the same session (to be able read your own writes consistently across web tiers), + // you would have to send the SessionToken from the response of the write action on one node to the client tier, using a cookie or some other mechanism, and have that token flow back to the web tier for subsequent reads. + // If you are using a round-robin load balancer which does not maintain session affinity between requests, such as the Azure Load Balancer,the read could potentially land on a different node to the write request, where the session was created. + SessionToken string + // ConsistencyLevel overrides the account defined consistency level for this operation. + // Consistency can only be relaxed. + ConsistencyLevel *ConsistencyLevel + // PopulateIndexMetrics is used to obtain the index metrics to understand how the query engine used existing indexes and how it could use potential new indexes. + // Please note that this options will incur overhead, so it should be enabled only when debugging slow queries and not in production. + PopulateIndexMetrics bool + // ResponseContinuationTokenLimitInKB is used to limit the length of continuation token in the query response. Valid values are >= 0. + ResponseContinuationTokenLimitInKB int32 + // PageSizeHint determines the maximum number of items to be retrieved in a query result page. + // '-1' Used for dynamic page size. This is a maximum. Query can return 0 items in the page. + PageSizeHint int32 + // EnableScanInQuery Allow scan on the queries which couldn't be served as indexing was opted out on the requested paths. + EnableScanInQuery bool + // ContinuationToken to be used to continue a previous query execution. + // Obtained from QueryItemsResponse.ContinuationToken. + ContinuationToken string +} + +func (options *QueryOptions) toHeaders() *map[string]string { + headers := make(map[string]string) + + if options.ConsistencyLevel != nil { + headers[cosmosHeaderConsistencyLevel] = string(*options.ConsistencyLevel) + } + + if options.SessionToken != "" { + headers[cosmosHeaderSessionToken] = options.SessionToken + } + + if options.ResponseContinuationTokenLimitInKB > 0 { + headers[cosmosHeaderResponseContinuationTokenLimitInKb] = fmt.Sprint(options.ResponseContinuationTokenLimitInKB) + } + + if options.PageSizeHint != 0 { + headers[cosmosHeaderMaxItemCount] = fmt.Sprint(options.PageSizeHint) + } + + if options.EnableScanInQuery { + headers[cosmosHeaderEnableScanInQuery] = "true" + } + + if options.PopulateIndexMetrics { + headers[cosmosHeaderPopulateIndexMetrics] = "true" + } + + if options.ContinuationToken != "" { + headers[cosmosHeaderContinuationToken] = options.ContinuationToken + } + + headers[cosmosHeaderPopulateQueryMetrics] = "true" + + return &headers +} diff --git a/sdk/data/azcosmos/cosmos_query_request_options_test.go b/sdk/data/azcosmos/cosmos_query_request_options_test.go new file mode 100644 index 000000000000..cd8789077a31 --- /dev/null +++ b/sdk/data/azcosmos/cosmos_query_request_options_test.go @@ -0,0 +1,49 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package azcosmos + +import ( + "testing" +) + +func TestQueryRequestOptionsToHeaders(t *testing.T) { + options := &QueryOptions{} + options.ConsistencyLevel = ConsistencyLevelSession.ToPtr() + options.SessionToken = "sessionToken" + options.PageSizeHint = 20 + options.EnableScanInQuery = true + options.ResponseContinuationTokenLimitInKB = 100 + options.PopulateIndexMetrics = true + options.ContinuationToken = "continuationToken" + header := options.toHeaders() + if header == nil { + t.Fatal("toHeaders should return non-nil") + } + + headers := *header + if headers[cosmosHeaderConsistencyLevel] != "Session" { + t.Errorf("ConsistencyLevel should be Session but got %v", headers[cosmosHeaderConsistencyLevel]) + } + if headers[cosmosHeaderSessionToken] != "sessionToken" { + t.Errorf("SessionToken should be sessionToken but got %v", headers[cosmosHeaderSessionToken]) + } + if headers[cosmosHeaderMaxItemCount] != "20" { + t.Errorf("PageSizeHint should be 20 but got %v", headers[cosmosHeaderMaxItemCount]) + } + if headers[cosmosHeaderEnableScanInQuery] != "true" { + t.Errorf("EnableScanInQuery should be true but got %v", headers[cosmosHeaderEnableScanInQuery]) + } + if headers[cosmosHeaderResponseContinuationTokenLimitInKb] != "100" { + t.Errorf("ResponseContinuationTokenLimitInKb should be 100 but got %v", headers[cosmosHeaderResponseContinuationTokenLimitInKb]) + } + if headers[cosmosHeaderPopulateIndexMetrics] != "true" { + t.Errorf("PopulateIndexMetrics should be true but got %v", headers[cosmosHeaderPopulateIndexMetrics]) + } + if headers[cosmosHeaderContinuationToken] != "continuationToken" { + t.Errorf("ContinuationToken should be continuationToken but got %v", headers[cosmosHeaderContinuationToken]) + } + if headers[cosmosHeaderPopulateQueryMetrics] != "true" { + t.Errorf("PopulateQueryMetrics should be true but got %v", headers[cosmosHeaderPopulateQueryMetrics]) + } +} diff --git a/sdk/data/azcosmos/cosmos_query_response.go b/sdk/data/azcosmos/cosmos_query_response.go new file mode 100644 index 000000000000..529fe8c6d130 --- /dev/null +++ b/sdk/data/azcosmos/cosmos_query_response.go @@ -0,0 +1,62 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package azcosmos + +import ( + "encoding/json" + "net/http" + + "github.com/Azure/azure-sdk-for-go/sdk/azcore/runtime" +) + +// QueryItemsResponse contains response from the query operation. +type QueryItemsResponse struct { + Response + // ContinuationToken contains the value of the x-ms-continuation header in the response. + // It can be used to stop a query and resume it later. + ContinuationToken string + // Contains the query metrics related to the query execution + QueryMetrics *string + // IndexMetrics contains the index utilization metrics if QueryOptions.PopulateIndexMetrics = true + IndexMetrics *string + // List of items. + Items [][]byte +} + +func newQueryResponse(resp *http.Response) (QueryItemsResponse, error) { + response := QueryItemsResponse{ + Response: newResponse(resp), + } + + response.ContinuationToken = resp.Header.Get(cosmosHeaderContinuationToken) + queryMetrics := resp.Header.Get(cosmosHeaderQueryMetrics) + if queryMetrics != "" { + response.QueryMetrics = &queryMetrics + } + queryIndexUtilization := resp.Header.Get(cosmosHeaderIndexUtilization) + if queryIndexUtilization != "" { + response.IndexMetrics = &queryIndexUtilization + } + + result := queryServiceResponse{} + if err := runtime.UnmarshalAsJSON(resp, &result); err != nil { + return QueryItemsResponse{}, err + } + + marshalledValue := make([][]byte, 0) + for _, e := range result.Documents { + m, err := json.Marshal(e) + if err != nil { + return QueryItemsResponse{}, err + } + marshalledValue = append(marshalledValue, m) + } + response.Items = marshalledValue + + return response, nil +} + +type queryServiceResponse struct { + Documents []map[string]interface{} `json:"Documents,omitempty"` +} diff --git a/sdk/data/azcosmos/cosmos_query_response_test.go b/sdk/data/azcosmos/cosmos_query_response_test.go new file mode 100644 index 000000000000..a2424a17ea1a --- /dev/null +++ b/sdk/data/azcosmos/cosmos_query_response_test.go @@ -0,0 +1,96 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package azcosmos + +import ( + "context" + "encoding/json" + "net/http" + "strconv" + "testing" + + "github.com/Azure/azure-sdk-for-go/sdk/azcore/policy" + azruntime "github.com/Azure/azure-sdk-for-go/sdk/azcore/runtime" + "github.com/Azure/azure-sdk-for-go/sdk/internal/mock" +) + +func TestQueryResponseParsing(t *testing.T) { + queryResponseRaw := map[string][]map[string]string{ + "Documents": { + {"id": "id1", "name": "name"}, + {"id": "id2", "name": "name"}, + }, + } + + jsonString, err := json.Marshal(queryResponseRaw) + if err != nil { + t.Fatal(err) + } + + srv, close := mock.NewTLSServer() + defer close() + srv.SetResponse( + mock.WithBody(jsonString), + mock.WithHeader(cosmosHeaderEtag, "someEtag"), + mock.WithHeader(cosmosHeaderQueryMetrics, "someQueryMetrics"), + mock.WithHeader(cosmosHeaderIndexUtilization, "indexUtilization"), + mock.WithHeader(cosmosHeaderActivityId, "someActivityId"), + mock.WithHeader(cosmosHeaderRequestCharge, "13.42")) + + req, err := azruntime.NewRequest(context.Background(), http.MethodGet, srv.URL()) + if err != nil { + t.Fatal(err) + } + + pl := azruntime.NewPipeline("azcosmostest", "v1.0.0", azruntime.PipelineOptions{}, &policy.ClientOptions{Transport: srv}) + resp, _ := pl.Do(req) + parsedResponse, err := newQueryResponse(resp) + if err != nil { + t.Fatal(err) + } + + if parsedResponse.RawResponse == nil { + t.Fatal("parsedResponse.RawResponse is nil") + } + + if parsedResponse.ActivityID != "someActivityId" { + t.Errorf("Expected ActivityId to be %s, but got %s", "someActivityId", parsedResponse.ActivityID) + } + + if parsedResponse.RequestCharge != 13.42 { + t.Errorf("Expected RequestCharge to be %f, but got %f", 13.42, parsedResponse.RequestCharge) + } + + if parsedResponse.ETag != "someEtag" { + t.Errorf("Expected ETag to be %s, but got %s", "someEtag", parsedResponse.ETag) + } + + if *parsedResponse.QueryMetrics != "someQueryMetrics" { + t.Errorf("Expected IndexMetrics to be %s, but got %s", "someQueryMetrics", *parsedResponse.IndexMetrics) + } + + if *parsedResponse.IndexMetrics != "indexUtilization" { + t.Errorf("Expected IndexUtilization to be %s, but got %s", "indexUtilization", *parsedResponse.IndexMetrics) + } + + if len(parsedResponse.Items) != 2 { + t.Errorf("Expected 2 documents, but got %d", len(parsedResponse.Items)) + } + + for index, item := range parsedResponse.Items { + var itemResponseBody map[string]interface{} + err = json.Unmarshal(item, &itemResponseBody) + if err != nil { + t.Fatalf("Failed to unmarshal item response: %v", err) + } + + if itemResponseBody["id"] != ("id" + strconv.Itoa(index+1)) { + t.Errorf("Expected id to be %s, but got %s", "id"+strconv.Itoa(index+1), itemResponseBody["id"]) + } + + if itemResponseBody["name"] != "name" { + t.Errorf("Expected name to be %s, but got %s", "name", itemResponseBody["name"]) + } + } +} diff --git a/sdk/data/azcosmos/doc.go b/sdk/data/azcosmos/doc.go index cf1eeab921d2..30ccb34271d2 100644 --- a/sdk/data/azcosmos/doc.go +++ b/sdk/data/azcosmos/doc.go @@ -35,6 +35,7 @@ The following sections provide several code snippets covering some of the most c - Creating a database - Creating a container - Creating, reading, and deleting items + - Querying items Creating a database @@ -105,5 +106,21 @@ Creating, reading, and deleting items itemResponse, err = container.DeleteItem(context, pk, id, nil) handle(err) + +Querying items + + pk := azcosmos.NewPartitionKeyString("myPartitionKeyValue") + queryPager := container.NewQueryItemsPager("select * from docs c", pk, nil) + for queryPager.More() { + queryResponse, err := queryPager.NextPage(context) + if err != nil { + handle(err) + } + + for _, item := range queryResponse.Items { + var itemResponseBody map[string]interface{} + json.Unmarshal(item, &itemResponseBody) + } + } */ package azcosmos diff --git a/sdk/data/azcosmos/emulator_cosmos_query_test.go b/sdk/data/azcosmos/emulator_cosmos_query_test.go new file mode 100644 index 000000000000..321cb5b19fd9 --- /dev/null +++ b/sdk/data/azcosmos/emulator_cosmos_query_test.go @@ -0,0 +1,206 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package azcosmos + +import ( + "context" + "encoding/json" + "strconv" + "testing" +) + +func TestSinglePartitionQueryWithIndexMetrics(t *testing.T) { + emulatorTests := newEmulatorTests(t) + client := emulatorTests.getClient(t) + + database := emulatorTests.createDatabase(t, context.TODO(), client, "queryTests") + defer emulatorTests.deleteDatabase(t, context.TODO(), database) + properties := ContainerProperties{ + ID: "aContainer", + PartitionKeyDefinition: PartitionKeyDefinition{ + Paths: []string{"/pk"}, + }, + } + + _, err := database.CreateContainer(context.TODO(), properties, nil) + if err != nil { + t.Fatalf("Failed to create container: %v", err) + } + + container, _ := database.NewContainer("aContainer") + documentsPerPk := 1 + createSampleItems(t, container, documentsPerPk) + + receivedIds := []string{} + queryPager := container.NewQueryItemsPager("select * from docs c where c.someProp = '2'", NewPartitionKeyString("1"), &QueryOptions{PopulateIndexMetrics: true}) + for queryPager.More() { + queryResponse, err := queryPager.NextPage(context.TODO()) + if err != nil { + t.Fatalf("Failed to query items: %v", err) + } + + for _, item := range queryResponse.Items { + var itemResponseBody map[string]interface{} + err = json.Unmarshal(item, &itemResponseBody) + if err != nil { + t.Fatalf("Failed to unmarshal: %v", err) + } + receivedIds = append(receivedIds, itemResponseBody["id"].(string)) + } + + if queryPager.More() && queryResponse.ContinuationToken == "" { + t.Fatal("Query has more pages but no continuation was provided") + } + + if queryResponse.QueryMetrics == nil { + t.Fatal("Query metrics were not returned") + } + + if queryResponse.IndexMetrics == nil { + t.Fatal("Index metrics were not returned") + } + + if queryResponse.ActivityID == "" { + t.Fatal("Activity id was not returned") + } + + if queryResponse.RequestCharge == 0 { + t.Fatal("Request charge was not returned") + } + + if len(queryResponse.Items) != 1 && len(queryResponse.Items) != 0 { + t.Fatalf("Expected 1 items, got %d", len(queryResponse.Items)) + } + } + + if len(receivedIds) != 1 { + t.Fatalf("Expected 1 documents, got %d", len(receivedIds)) + } + + if receivedIds[0] != "0" { + t.Fatalf("Expected id 0, got %s", receivedIds[0]) + } +} + +func TestSinglePartitionQuery(t *testing.T) { + emulatorTests := newEmulatorTests(t) + client := emulatorTests.getClient(t) + + database := emulatorTests.createDatabase(t, context.TODO(), client, "queryTests") + defer emulatorTests.deleteDatabase(t, context.TODO(), database) + properties := ContainerProperties{ + ID: "aContainer", + PartitionKeyDefinition: PartitionKeyDefinition{ + Paths: []string{"/pk"}, + }, + } + + _, err := database.CreateContainer(context.TODO(), properties, nil) + if err != nil { + t.Fatalf("Failed to create container: %v", err) + } + + container, _ := database.NewContainer("aContainer") + documentsPerPk := 10 + createSampleItems(t, container, documentsPerPk) + + numberOfPages := 0 + receivedIds := []string{} + opt := QueryOptions{PageSizeHint: 5} + queryPager := container.NewQueryItemsPager("select * from c", NewPartitionKeyString("1"), &opt) + for queryPager.More() { + queryResponse, err := queryPager.NextPage(context.TODO()) + if err != nil { + t.Fatalf("Failed to query items: %v", err) + } + + numberOfPages++ + for _, item := range queryResponse.Items { + var itemResponseBody map[string]interface{} + err = json.Unmarshal(item, &itemResponseBody) + if err != nil { + t.Fatalf("Failed to unmarshal: %v", err) + } + receivedIds = append(receivedIds, itemResponseBody["id"].(string)) + } + + if queryPager.More() && queryResponse.ContinuationToken == "" { + t.Fatal("Query has more pages but no continuation was provided") + } + + if queryResponse.QueryMetrics == nil { + t.Fatal("Query metrics were not returned") + } + + if queryResponse.IndexMetrics != nil { + t.Fatal("Index metrics were returned") + } + + if queryResponse.ActivityID == "" { + t.Fatal("Activity id was not returned") + } + + if queryResponse.RequestCharge == 0 { + t.Fatal("Request charge was not returned") + } + + if len(queryResponse.Items) != 5 && len(queryResponse.Items) != 0 { + t.Fatalf("Expected 5 items, got %d", len(queryResponse.Items)) + } + + if numberOfPages == 2 && opt.ContinuationToken != "" { + t.Fatalf("Original options should not be modified, initial continuation was empty, now it has %v", opt.ContinuationToken) + } + } + + if numberOfPages != 2 { + t.Fatalf("Expected 2 pages, got %d", numberOfPages) + } + + if len(receivedIds) != documentsPerPk { + t.Fatalf("Expected %d documents, got %d", documentsPerPk, len(receivedIds)) + } + + for i := 0; i < documentsPerPk; i++ { + if receivedIds[i] != strconv.Itoa(i) { + t.Fatalf("Expected id %d, got %s", i, receivedIds[i]) + } + } +} + +func createSampleItems(t *testing.T, container *ContainerClient, documentsPerPk int) { + for i := 0; i < documentsPerPk; i++ { + item := map[string]string{ + "id": strconv.Itoa(i), + "pk": "1", + "someProp": "2", + } + + marshalled, err := json.Marshal(item) + if err != nil { + t.Fatal(err) + } + + _, err = container.CreateItem(context.TODO(), NewPartitionKeyString("1"), marshalled, nil) + if err != nil { + t.Fatalf("Failed to create item: %v", err) + } + + item2 := map[string]string{ + "id": strconv.Itoa(i), + "pk": "2", + "someProp": "2", + } + + marshalled, err = json.Marshal(item2) + if err != nil { + t.Fatal(err) + } + + _, err = container.CreateItem(context.TODO(), NewPartitionKeyString("2"), marshalled, nil) + if err != nil { + t.Fatalf("Failed to create item: %v", err) + } + } +} diff --git a/sdk/data/azcosmos/example_test.go b/sdk/data/azcosmos/example_test.go index f453555be5db..9d927a8f9f3a 100644 --- a/sdk/data/azcosmos/example_test.go +++ b/sdk/data/azcosmos/example_test.go @@ -8,10 +8,10 @@ import ( "encoding/json" "errors" "fmt" - "net/http" "os" "time" + "github.com/Azure/azure-sdk-for-go/sdk/azcore" "github.com/Azure/azure-sdk-for-go/sdk/data/azcosmos" ) @@ -64,7 +64,9 @@ func ExampleClient_CreateDatabase() { databaseProperties := azcosmos.DatabaseProperties{ID: "databaseName"} databaseResponse, err := client.CreateDatabase(context.Background(), databaseProperties, nil) if err != nil { - panic(err) + var responseErr *azcore.ResponseError + errors.As(err, &responseErr) + panic(responseErr) } fmt.Printf("Database created. ActivityId %s", databaseResponse.ActivityID) @@ -107,7 +109,9 @@ func ExampleDatabaseClient_CreateContainer() { resp, err := database.CreateContainer(context.Background(), properties, &azcosmos.CreateContainerOptions{ThroughputProperties: &throughput}) if err != nil { - panic(err) + var responseErr *azcore.ResponseError + errors.As(err, &responseErr) + panic(responseErr) } fmt.Printf("Container created. ActivityId %s", resp.ActivityID) @@ -154,7 +158,9 @@ func ExampleContainerClient_ReplaceThroughput() { newScale := azcosmos.NewManualThroughputProperties(500) replaceThroughputResponse, err := container.ReplaceThroughput(context.Background(), newScale, nil) if err != nil { - panic(err) + var responseErr *azcore.ResponseError + errors.As(err, &responseErr) + panic(responseErr) } fmt.Printf("Throughput updated. ActivityId %s", replaceThroughputResponse.ActivityID) @@ -202,7 +208,9 @@ func ExampleContainerClient_Replace() { // Replace container properties replaceResponse, err := container.Replace(context.Background(), *containerResponse.ContainerProperties, nil) if err != nil { - panic(err) + var responseErr *azcore.ResponseError + errors.As(err, &responseErr) + panic(responseErr) } fmt.Printf("Container updated. ActivityId %s", replaceResponse.ActivityID) @@ -249,7 +257,9 @@ func ExampleContainerClient_CreateItem() { itemResponse, err := container.CreateItem(context.Background(), pk, marshalled, nil) if err != nil { - panic(err) + var responseErr *azcore.ResponseError + errors.As(err, &responseErr) + panic(responseErr) } fmt.Printf("Item created. ActivityId %s consuming %v RU", itemResponse.ActivityID, itemResponse.RequestCharge) @@ -286,7 +296,9 @@ func ExampleContainerClient_ReadItem() { id := "anId" itemResponse, err := container.ReadItem(context.Background(), pk, id, nil) if err != nil { - panic(err) + var responseErr *azcore.ResponseError + errors.As(err, &responseErr) + panic(responseErr) } var itemResponseBody map[string]string @@ -347,7 +359,9 @@ func ExampleContainerClient_ReplaceItem() { itemResponse, err = container.ReplaceItem(context.Background(), pk, id, marshalledReplace, nil) if err != nil { - panic(err) + var responseErr *azcore.ResponseError + errors.As(err, &responseErr) + panic(responseErr) } fmt.Printf("Item replaced. ActivityId %s consuming %v RU", itemResponse.ActivityID, itemResponse.RequestCharge) @@ -384,7 +398,9 @@ func ExampleContainerClient_DeleteItem() { id := "anId" itemResponse, err := container.DeleteItem(context.Background(), pk, id, nil) if err != nil { - panic(err) + var responseErr *azcore.ResponseError + errors.As(err, &responseErr) + panic(responseErr) } fmt.Printf("Item deleted. ActivityId %s consuming %v RU", itemResponse.ActivityID, itemResponse.RequestCharge) @@ -440,7 +456,9 @@ func ExampleContainerClient_ReadItem_sessionConsistency() { // In another client, maintain the session by passing the session token itemResponse, err = container.ReadItem(context.Background(), pk, id, &azcosmos.ItemOptions{SessionToken: itemSessionToken}) if err != nil { - panic(err) + var responseErr *azcore.ResponseError + errors.As(err, &responseErr) + panic(responseErr) } fmt.Printf("Item read. ActivityId %s consuming %v RU", itemResponse.ActivityID, itemResponse.RequestCharge) @@ -501,15 +519,64 @@ func ExampleContainerClient_ReplaceItem_optimisticConcurrency() { // Replace with Etag etag := itemResponse.ETag itemResponse, err = container.ReplaceItem(context.Background(), pk, id, marshalledReplace, &azcosmos.ItemOptions{IfMatchEtag: &etag}) - var httpErr interface{ RawResponse() *http.Response } + var responseErr *azcore.ResponseError - return (errors.As(err, &httpErr) && itemResponse.RawResponse.StatusCode == 412), err + return (errors.As(err, &responseErr) && responseErr.StatusCode == 412), err }) if err != nil { panic(err) } } +func ExampleContainerClient_NewQueryItemsPager() { + endpoint, ok := os.LookupEnv("AZURE_COSMOS_ENDPOINT") + if !ok { + panic("AZURE_COSMOS_ENDPOINT could not be found") + } + + key, ok := os.LookupEnv("AZURE_COSMOS_KEY") + if !ok { + panic("AZURE_COSMOS_KEY could not be found") + } + + cred, err := azcosmos.NewKeyCredential(key) + if err != nil { + panic(err) + } + + client, err := azcosmos.NewClientWithKey(endpoint, cred, nil) + if err != nil { + panic(err) + } + + container, err := client.NewContainer("databaseName", "aContainer") + if err != nil { + panic(err) + } + + pk := azcosmos.NewPartitionKeyString("newPartitionKey") + + queryPager := container.NewQueryItemsPager("select * from docs c", pk, nil) + for queryPager.More() { + queryResponse, err := queryPager.NextPage(context.Background()) + if err != nil { + var responseErr *azcore.ResponseError + errors.As(err, &responseErr) + panic(responseErr) + } + + for _, item := range queryResponse.Items { + var itemResponseBody map[string]interface{} + err = json.Unmarshal(item, &itemResponseBody) + if err != nil { + panic(err) + } + } + + fmt.Printf("Query page received with %v items. ActivityId %s consuming %v RU", len(queryResponse.Items), queryResponse.ActivityID, queryResponse.RequestCharge) + } +} + func retryOptimisticConcurrency(retryAttempts int, wait time.Duration, retry func() (bool, error)) (result error) { for i := 0; ; i++ { retryResult, err := retry() diff --git a/sdk/data/azcosmos/go.mod b/sdk/data/azcosmos/go.mod index bbf875a632f2..53a61a526027 100644 --- a/sdk/data/azcosmos/go.mod +++ b/sdk/data/azcosmos/go.mod @@ -1,12 +1,18 @@ module github.com/Azure/azure-sdk-for-go/sdk/data/azcosmos -go 1.16 +go 1.18 require ( - github.com/Azure/azure-sdk-for-go v57.3.0+incompatible - github.com/Azure/azure-sdk-for-go/sdk/azcore v0.21.0 - github.com/Azure/azure-sdk-for-go/sdk/internal v0.8.3 - github.com/davecgh/go-spew v1.1.1 // indirect + github.com/Azure/azure-sdk-for-go v63.2.0+incompatible + github.com/Azure/azure-sdk-for-go/sdk/azcore v0.23.0 + github.com/Azure/azure-sdk-for-go/sdk/internal v0.9.2 github.com/stretchr/testify v1.7.0 +) + +require ( + github.com/davecgh/go-spew v1.1.1 // indirect + github.com/pmezard/go-difflib v1.0.0 // indirect + golang.org/x/net v0.0.0-20211015210444-4f30a5c0130f // indirect + golang.org/x/text v0.3.7 // indirect gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b // indirect ) diff --git a/sdk/data/azcosmos/go.sum b/sdk/data/azcosmos/go.sum index deb56301b319..11b87f3d512f 100644 --- a/sdk/data/azcosmos/go.sum +++ b/sdk/data/azcosmos/go.sum @@ -1,9 +1,9 @@ -github.com/Azure/azure-sdk-for-go v57.3.0+incompatible h1:zxuxvsRYSXcowMuT/P5b7o6YJYuGYP74jCb9IvlgOLA= -github.com/Azure/azure-sdk-for-go v57.3.0+incompatible/go.mod h1:9XXNKU+eRnpl9moKnB4QOLf1HestfXbmab5FXxiDBjc= -github.com/Azure/azure-sdk-for-go/sdk/azcore v0.21.0 h1:8wVJL0HUP5yDFXvotdewORTw7Yu88JbreWN/mobSvsQ= -github.com/Azure/azure-sdk-for-go/sdk/azcore v0.21.0/go.mod h1:fBF9PQNqB8scdgpZ3ufzaLntG0AG7C1WjPMsiFOmfHM= -github.com/Azure/azure-sdk-for-go/sdk/internal v0.8.3 h1:E+m3SkZCN0Bf5q7YdTs5lSm2CYY3CK4spn5OmUIiQtk= -github.com/Azure/azure-sdk-for-go/sdk/internal v0.8.3/go.mod h1:KLF4gFr6DcKFZwSuH8w8yEK6DpFl3LP5rhdvAb7Yz5I= +github.com/Azure/azure-sdk-for-go v63.2.0+incompatible h1:OIqkK/zTGqVUuzpEvY0B1YSYDRAFC/j+y0w2GovCggI= +github.com/Azure/azure-sdk-for-go v63.2.0+incompatible/go.mod h1:9XXNKU+eRnpl9moKnB4QOLf1HestfXbmab5FXxiDBjc= +github.com/Azure/azure-sdk-for-go/sdk/azcore v0.23.0 h1:D7l5jspkc4kwBYRWoZE4DQnu6LVpLwDsMZjBKS4wZLQ= +github.com/Azure/azure-sdk-for-go/sdk/azcore v0.23.0/go.mod h1:w5pDIZuawUmY3Bj4tVx3Xb8KS96ToB0j315w9rqpAg0= +github.com/Azure/azure-sdk-for-go/sdk/internal v0.9.2 h1:Px2KVERcYEg2Lv25AqC2hVr0xUWaq94wuEObLIkYzmA= +github.com/Azure/azure-sdk-for-go/sdk/internal v0.9.2/go.mod h1:CdSJQNNzZhCkwDaV27XV1w48ZBPtxe7mlrZAsPNxD5g= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= @@ -18,18 +18,15 @@ golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACk golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20201010224723-4f7140c49acb/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= -golang.org/x/net v0.0.0-20210610132358-84b48f89b13b h1:k+E048sYJHyVnsr1GDrRZWQ32D2C7lWs9JRc0bel53A= -golang.org/x/net v0.0.0-20210610132358-84b48f89b13b/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= +golang.org/x/net v0.0.0-20211015210444-4f30a5c0130f h1:OfiFi4JbukWwe3lzw+xunroH1mnC1e2Gy5cxNJApiSY= +golang.org/x/net v0.0.0-20211015210444-4f30a5c0130f/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= -golang.org/x/text v0.3.6 h1:aRYxNxv6iGQlyVaZmk6ZgYEDa+Jg18DxebPSrd6bg1M= -golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/text v0.3.7 h1:olpwvP2KacW1ZWvsR7uQhoyTYvKAupfQrRGBFM352Gk= +golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=