Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion sdk/data/azcosmos/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,12 @@
### Features Added

* Added support for the float16 data type in the vector embedding policy. See [PR 25707](https://github.com/Azure/azure-sdk-for-go/pull/25707)
* Improved the performance of the built-in ReadMany implementation (when no QueryEngine is present). See [PR 26007](https://github.com/Azure/azure-sdk-for-go/pull/26007)
* Improved the performance of the built-in ReadMany implementation. See [PR 26007](https://github.com/Azure/azure-sdk-for-go/pull/26007)

### Breaking Changes

* Removed `QueryEngine` field from `ReadManyOptions`. ReadMany now always uses the built-in Go-native implementation.

### Bugs Fixed

### Other Changes
Expand Down
10 changes: 0 additions & 10 deletions sdk/data/azcosmos/cosmos_container.go
Original file line number Diff line number Diff line change
Expand Up @@ -468,10 +468,6 @@ func (c *ContainerClient) ReadManyItems(
return ReadManyItemsResponse{}, errors.New("item identity at index " + fmt.Sprint(i) + " has an empty ID")
}
}
correlatedActivityId, _ := uuid.New()
h := headerOptionsOverride{
correlatedActivityId: &correlatedActivityId,
}

readManyOptions := &ReadManyOptions{}
if o != nil {
Expand All @@ -484,12 +480,6 @@ func (c *ContainerClient) ReadManyItems(
resourceAddress: c.link,
}

if readManyOptions.QueryEngine != nil {
// use correlated activity id header for read many queries
operationContext.headerOptionsOverride = &h
return c.executeReadManyWithEngine(readManyOptions.QueryEngine, itemIdentities, readManyOptions, operationContext, ctx)
}

return c.executeReadManyWithQueries(ctx, itemIdentities, readManyOptions, operationContext)
}

Expand Down
89 changes: 0 additions & 89 deletions sdk/data/azcosmos/cosmos_container_read_many.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,97 +10,8 @@ import (
"sync"

"github.com/Azure/azure-sdk-for-go/sdk/azcore/policy"
"github.com/Azure/azure-sdk-for-go/sdk/data/azcosmos/queryengine"
"github.com/Azure/azure-sdk-for-go/sdk/internal/log"
)

// executeReadManyWithEngine resolves the given item identities through a
// read-many pipeline created by the provided query engine.
//
// It reads the container's raw partition key ranges and its properties,
// converts every item identity into the engine's representation (the partition
// key serialized as a JSON string), and asks the engine to create a read-many
// pipeline. The pipeline is run once to obtain the requests to execute, those
// requests are executed through runEngineRequests, and the pipeline is run a
// second time to collect the resulting items.
//
// On success the response carries the items from the final pipeline run and the
// request charge returned by runEngineRequests. An error is returned if any of
// the steps above fails, or if the pipeline does not report completion after
// the final run. The pipeline is always closed before this method returns.
func (c *ContainerClient) executeReadManyWithEngine(queryEngine queryengine.QueryEngine, items []ItemIdentity, readManyOptions *ReadManyOptions, operationContext pipelineRequestOptions, ctx context.Context) (ReadManyItemsResponse, error) {
	path, err := generatePathForNameBased(resourceTypeDocument, operationContext.resourceAddress, true)
	if err != nil {
		// We are specifying the resource type here, so this shouldn't fail. Still, we can't crash the process on a panic, so we return an error instead.
		return ReadManyItemsResponse{}, errors.New("invalid resource address in operation context: " + operationContext.resourceAddress)
	}

	// Get the partition key ranges for the container.
	rawPartitionKeyRanges, err := c.getPartitionKeyRangesRaw(ctx, operationContext)
	if err != nil {
		return ReadManyItemsResponse{}, err
	}

	// Get the container properties; the partition key definition is taken from them below.
	containerRsp, err := c.Read(ctx, nil)
	if err != nil {
		return ReadManyItemsResponse{}, err
	}

	// Build the query engine's item identities, with each partition key as a JSON string.
	newItemIdentities := make([]queryengine.ItemIdentity, len(items))
	for i := range items {
		pkStr, err := items[i].PartitionKey.toJsonString()
		if err != nil {
			return ReadManyItemsResponse{}, err
		}
		newItemIdentities[i] = queryengine.ItemIdentity{
			PartitionKeyValue: pkStr,
			ID:                items[i].ID,
		}
	}

	// A partition key definition version of 0 is passed to the engine as version 1.
	pkDefinition := containerRsp.ContainerProperties.PartitionKeyDefinition
	pkVersion := uint8(1)
	if pkDefinition.Version != 0 {
		pkVersion = uint8(pkDefinition.Version)
	}

	readManyPipeline, err := queryEngine.CreateReadManyPipeline(newItemIdentities, string(rawPartitionKeyRanges), string(pkDefinition.Kind), pkVersion, pkDefinition.Paths)
	if err != nil {
		return ReadManyItemsResponse{}, err
	}
	// Close the pipeline on every exit path from here on, including panics in
	// the calls below, instead of repeating Close before each return.
	defer readManyPipeline.Close()
	log.Writef(EventQueryEngine, "Created readMany pipeline")

	// Initial run to get any requests.
	log.Writef(EventQueryEngine, "Fetching more data from readMany pipeline")
	result, err := readManyPipeline.Run()
	if err != nil {
		return ReadManyItemsResponse{}, err
	}

	concurrency := determineConcurrency(nil)
	if readManyOptions != nil {
		concurrency = determineConcurrency(readManyOptions.MaxConcurrency)
	}
	totalRequestCharge, err := runEngineRequests(ctx, c, path, readManyPipeline, operationContext, result.Requests, concurrency, func(req queryengine.QueryRequest) (string, []QueryParameter, bool) {
		// ReadMany pipeline requests carry a Query (optional override). No parameters and we always page until continuation exhausted.
		return req.Query, nil, true /* treat like drain for full pagination */
	})
	if err != nil {
		return ReadManyItemsResponse{}, err
	}

	// Final run to gather merged items.
	result, err = readManyPipeline.Run()
	if err != nil {
		return ReadManyItemsResponse{}, err
	}

	if !readManyPipeline.IsComplete() {
		return ReadManyItemsResponse{}, errors.New("illegal state readMany pipeline did not complete")
	}

	log.Writef(EventQueryEngine, "ReadMany pipeline is complete")
	return ReadManyItemsResponse{
		Items:         result.Items,
		RequestCharge: totalRequestCharge,
	}, nil
}

const maxItemsPerQuery = 1000

// queryChunk is a single parameterized query targeting one physical partition key range.
Expand Down
5 changes: 0 additions & 5 deletions sdk/data/azcosmos/cosmos_read_many_request_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@ package azcosmos

import (
"strconv"

"github.com/Azure/azure-sdk-for-go/sdk/data/azcosmos/queryengine"
)

// ReadManyOptions includes options for read many operations on items.
Expand All @@ -24,9 +22,6 @@ type ReadManyOptions struct {
ConsistencyLevel *ConsistencyLevel
// Options for operations in the dedicated gateway.
DedicatedGatewayRequestOptions *DedicatedGatewayRequestOptions
// QueryEngine can be set to enable the use of an external query engine for processing cross-partition queries.
// This is a preview feature, which is NOT SUPPORTED in production, and is subject to breaking changes.
QueryEngine queryengine.QueryEngine
// MaxConcurrency indicates the maximum number of concurrent operations to use when reading many items.
// If not set, the SDK will determine an optimal number of concurrent operations to use.
MaxConcurrency *int32
Expand Down
51 changes: 0 additions & 51 deletions sdk/data/azcosmos/emulator_cosmos_read_many_items_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"fmt"
"testing"

"github.com/Azure/azure-sdk-for-go/sdk/data/azcosmos/internal/mock"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -142,56 +141,6 @@ func TestReadMany_PartialFailure(t *testing.T) {

}

// TestReadMany_WithQueryEngine_EmptyItems verifies that ReadManyItems returns
// no error and no items when it is given an empty identity list together with
// options that carry a mock query engine.
func TestReadMany_WithQueryEngine_EmptyItems(t *testing.T) {
	ctx := context.Background()

	emulator := newEmulatorTests(t)
	client := emulator.getClient(t, newSpanValidator(t, &spanMatcher{ExpectedSpans: []string{}}))
	db := emulator.createDatabase(t, ctx, client, "rm_qeng_empty_db")
	defer emulator.deleteDatabase(t, ctx, db)

	container, err := db.NewContainer("c")
	require.NoError(t, err)

	// Request nothing, but still route the call through options that hold a mock engine.
	engineOptions := &ReadManyOptions{QueryEngine: mock.NewMockQueryEngine()}
	noIdentities := []ItemIdentity{}

	resp, err := container.ReadManyItems(ctx, noIdentities, engineOptions)
	require.NoError(t, err)
	require.Empty(t, resp.Items)
}

// TestReadMany_WithQueryEngine_ReturnsItems creates a container partitioned on
// /pk, inserts two items, and verifies that ReadManyItems returns two items
// when asked for both identities with a mock query engine set in the options.
func TestReadMany_WithQueryEngine_ReturnsItems(t *testing.T) {
	const itemCount = 2
	ctx := context.Background()

	emulator := newEmulatorTests(t)
	client := emulator.getClient(t, newSpanValidator(t, &spanMatcher{ExpectedSpans: []string{}}))
	db := emulator.createDatabase(t, ctx, client, "rm_qeng_db")
	defer emulator.deleteDatabase(t, ctx, db)

	// Create the container the items will live in.
	containerProps := ContainerProperties{
		ID: "c",
		PartitionKeyDefinition: PartitionKeyDefinition{
			Paths: []string{"/pk"},
		},
	}
	_, err := db.CreateContainer(ctx, containerProps, nil)
	require.NoError(t, err)
	container, err := db.NewContainer("c")
	require.NoError(t, err)

	// Insert the items and record the identity of each one as it is created.
	idents := make([]ItemIdentity, 0, itemCount)
	for i := 0; i < itemCount; i++ {
		id := fmt.Sprintf("%d", i)
		pk := fmt.Sprintf("pk_%d", i)

		payload, err := json.Marshal(map[string]string{"id": id, "pk": pk})
		require.NoError(t, err)
		_, err = container.CreateItem(ctx, NewPartitionKeyString(pk), payload, nil)
		require.NoError(t, err)

		idents = append(idents, ItemIdentity{ID: id, PartitionKey: NewPartitionKeyString(pk)})
	}

	// Read both items back through the mock query engine.
	engineOptions := &ReadManyOptions{QueryEngine: mock.NewMockQueryEngine()}
	resp, err := container.ReadManyItems(ctx, idents, engineOptions)
	require.NoError(t, err)

	// One returned item is expected per requested identity.
	require.Equal(t, itemCount, len(resp.Items))
}

// TestReadManyWithQueries_MultipleLogicalPKs exercises the query-based read-many
// path with items that have distinct logical partition key values. On the emulator
// (single physical range) this validates that per-logical-PK query routing works.
Expand Down
55 changes: 5 additions & 50 deletions sdk/data/azcosmos/internal/mock/mock_query_engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,56 +79,11 @@ func (m *MockQueryEngine) CreateQueryPipeline(query string, plan string, pkrange
return newMockQueryPipeline(query, ranges.PartitionKeyRanges, cfg), nil
}

// CreateReadManyPipeline returns a MockReadManyPipeline over the given item
// identities. The partition key range and partition key definition arguments
// are not used by the mock. The returned pipeline starts out not completed,
// with an empty result buffer whose capacity equals the number of items.
func (m *MockQueryEngine) CreateReadManyPipeline(items []queryengine.ItemIdentity, pkranges string, pkKind string, pkVersion uint8, pkPaths []string) (queryengine.QueryPipeline, error) {
	pipeline := &MockReadManyPipeline{
		items:          items,
		completed:      false,
		resultingItems: make([][]byte, 0, len(items)),
	}
	return pipeline, nil
}

// MockReadManyPipeline is a minimal read-many pipeline used by tests.
type MockReadManyPipeline struct {
	// items are the identities the pipeline was created for.
	items []queryengine.ItemIdentity
	// completed is set by the first Run and by Close.
	completed bool
	// resultingItems accumulates the payloads handed to ProvideData.
	resultingItems [][]byte
}

// Close marks the pipeline as completed.
func (p *MockReadManyPipeline) Close() {
	p.completed = true
}

// IsComplete reports whether the pipeline has been marked as completed.
func (p *MockReadManyPipeline) IsComplete() bool {
	return p.completed
}

// Run advances the pipeline.
//
// While the pipeline is not yet completed, Run returns one query request per
// item identity and no items, and marks the pipeline as completed. Once the
// pipeline is completed, Run returns the payloads collected through
// ProvideData and no requests. It never returns an error.
func (p *MockReadManyPipeline) Run() (*queryengine.PipelineResult, error) {
	if p.completed {
		// Already completed: hand back whatever ProvideData has collected.
		return &queryengine.PipelineResult{IsCompleted: true, Items: p.resultingItems, Requests: nil}, nil
	}

	// Not completed yet: emit one query per requested identity.
	requests := make([]queryengine.QueryRequest, 0, len(p.items))
	for _, item := range p.items {
		query := fmt.Sprintf("Select * from c where c.id = '%s' and c.pk = '%s'", item.ID, item.PartitionKeyValue)
		requests = append(requests, queryengine.QueryRequest{Query: query})
	}

	// Mark completion so that the next Run returns the collected items.
	p.completed = true
	return &queryengine.PipelineResult{IsCompleted: true, Items: nil, Requests: requests}, nil
}

// ProvideData appends the Data payload of every query result to the
// pipeline's collected items. It always returns nil.
func (p *MockReadManyPipeline) ProvideData(data []queryengine.QueryResult) error {
	for i := range data {
		p.resultingItems = append(p.resultingItems, data[i].Data)
	}
	return nil
}

func (m *MockReadManyPipeline) Query() string {
return ""
// CreateReadManyPipeline exists only so that MockQueryEngine satisfies the
// QueryEngine interface, which still declares this method for backward
// compatibility with external implementations. The SDK no longer calls it for
// ReadMany operations. All arguments are ignored; the method always returns a
// nil pipeline and an error.
func (m *MockQueryEngine) CreateReadManyPipeline(_ []queryengine.ItemIdentity, _ string, _ string, _ uint8, _ []string) (queryengine.QueryPipeline, error) {
	err := fmt.Errorf("CreateReadManyPipeline is not supported by MockQueryEngine")
	return nil, err
}

func (m *MockQueryEngine) SupportedFeatures() string {
Expand Down