From 6c25c1e1562fa25e2c46f3dea0fb980a640a99d7 Mon Sep 17 00:00:00 2001 From: Baha Aiman Date: Fri, 31 Oct 2025 23:02:01 +0000 Subject: [PATCH 1/4] feat(firestore): [PQ] add consistency selector --- firestore/integration_test.go | 4 +- firestore/pipeline.go | 65 +++++++--- firestore/pipeline_integration_test.go | 160 +++++++++++++++++++++++++ firestore/transaction.go | 11 ++ 4 files changed, 224 insertions(+), 16 deletions(-) diff --git a/firestore/integration_test.go b/firestore/integration_test.go index ab01ad6b2a62..a5c5b7dcb2f5 100644 --- a/firestore/integration_test.go +++ b/firestore/integration_test.go @@ -156,7 +156,7 @@ func initIntegrationTest() { }, }, } - copts := append(ti.CallOptions(), option.WithTokenSource(ts)) + copts := append(ti.CallOptions()) //, option.WithTokenSource(ts)) c, err := NewClientWithDatabase(ctx, testProjectID, databaseID, copts...) if err != nil { log.Fatalf("NewClient: %v", err) @@ -166,7 +166,7 @@ func initIntegrationTest() { adminCtx, cancel := context.WithTimeout(context.Background(), 15*time.Minute) defer cancel() - adminC, err := apiv1.NewFirestoreAdminClient(adminCtx, option.WithTokenSource(ts)) + adminC, err := apiv1.NewFirestoreAdminClient(adminCtx) //, option.WithTokenSource(ts)) if err != nil { log.Fatalf("NewFirestoreAdminClient: %v", err) } diff --git a/firestore/pipeline.go b/firestore/pipeline.go index 830eff99a611..994c71943fce 100644 --- a/firestore/pipeline.go +++ b/firestore/pipeline.go @@ -36,23 +36,26 @@ import ( // Instead, Firestore only guarantees that the result is the same as if the chained stages were // executed in order. type Pipeline struct { - c *Client - stages []pipelineStage - err error + c *Client + stages []pipelineStage + readSettings *readSettings + tx *Transaction + err error } func newPipeline(client *Client, initialStage pipelineStage) *Pipeline { return &Pipeline{ - c: client, - stages: []pipelineStage{initialStage}, + c: client, + stages: []pipelineStage{initialStage}, + readSettings: &readSettings{}, } } // Execute executes the pipeline and returns an iterator for streaming the results. -// TODO: Accept PipelineOptions func (p *Pipeline) Execute(ctx context.Context) *PipelineResultIterator { ctx = withResourceHeader(ctx, p.c.path()) ctx = withRequestParamsHeader(ctx, reqParamsHeaderVal(p.c.path())) + return &PipelineResultIterator{ iter: newStreamPipelineResultIterator(ctx, p), } @@ -67,9 +70,20 @@ func (p *Pipeline) toExecutePipelineRequest() (*pb.ExecutePipelineRequest, error req := &pb.ExecutePipelineRequest{ Database: p.c.path(), PipelineType: &pb.ExecutePipelineRequest_StructuredPipeline{ - StructuredPipeline: &pb.StructuredPipeline{Pipeline: pipelinePb}, + StructuredPipeline: &pb.StructuredPipeline{ + Pipeline: pipelinePb, + }, }, - // TODO: Add consistencyselector + } + + // Note that transaction ID and other consistency selectors are mutually exclusive. + // We respect the transaction first, any read options passed by the caller second, + // and any read options stored in the client third. + if rt, hasOpts := parseReadTime(p.c, p.readSettings); hasOpts { + req.ConsistencySelector = &pb.ExecutePipelineRequest_ReadTime{ReadTime: rt} + } + if p.tx != nil { + req.ConsistencySelector = &pb.ExecutePipelineRequest_Transaction{Transaction: p.tx.id} } return req, nil } @@ -89,17 +103,40 @@ func (p *Pipeline) toProto() (*pb.Pipeline, error) { return &pb.Pipeline{Stages: protoStages}, nil } +func (p *Pipeline) copy() *Pipeline { + newP := &Pipeline{ + c: p.c, + stages: make([]pipelineStage, len(p.stages)), + readSettings: &readSettings{}, + tx: p.tx, + err: p.err, + } + copy(newP.stages, p.stages) + if p.readSettings != nil { + *newP.readSettings = *p.readSettings + } + return newP +} + +// WithReadOptions specifies constraints for accessing documents from the database, +// such as ReadTime. +func (p *Pipeline) WithReadOptions(opts ...ReadOption) *Pipeline { + newP := p.copy() + for _, opt := range opts { + if opt != nil { + opt.apply(newP.readSettings) + } + } + return newP +} + // append creates a new Pipeline by adding a stage to the current one. func (p *Pipeline) append(s pipelineStage) *Pipeline { if p.err != nil { return p } - newP := &Pipeline{ - c: p.c, - stages: make([]pipelineStage, len(p.stages)+1), - } - copy(newP.stages, p.stages) - newP.stages[len(p.stages)] = s + newP := p.copy() + newP.stages = append(newP.stages, s) return newP } diff --git a/firestore/pipeline_integration_test.go b/firestore/pipeline_integration_test.go index f3936349ef0c..40d4a0b977e4 100644 --- a/firestore/pipeline_integration_test.go +++ b/firestore/pipeline_integration_test.go @@ -16,6 +16,7 @@ package firestore import ( "context" + "fmt" "math" "strings" "testing" @@ -27,6 +28,165 @@ import ( "google.golang.org/grpc/status" ) +func TestIntegration_PipelineExecute(t *testing.T) { + if testParams[firestoreEditionKey].(firestoreEdition) != editionEnterprise { + t.Skip("Skipping pipeline queries tests since the firestore edition of", testParams[databaseIDKey].(string), "database is not enterprise") + } + ctx := context.Background() + client := integrationClient(t) + coll := integrationColl(t) + h := testHelper{t} + type Author struct { + Name string `firestore:"name"` + Country string `firestore:"country"` + } + type Book struct { + Title string `firestore:"title"` + Author `firestore:"author"` + Genre string `firestore:"genre"` + Published int `firestore:"published"` + Rating float64 `firestore:"rating"` + Tags []string `firestore:"tags"` + } + books := []Book{ + { + Title: "The Hitchhiker's Guide to the Galaxy", + Author: Author{Name: "Douglas Adams", Country: "UK"}, + Genre: "Science Fiction", + Published: 1979, + Rating: 4.2, + Tags: []string{"comedy", "space", "adventure"}, + }, + { + Title: "Pride and Prejudice", + Author: Author{Name: "Jane Austen", Country: "UK"}, + Genre: "Romance", + Published: 1813, + Rating: 4.5, + Tags: []string{"classic", "social commentary", "love"}, + }, + { + Title: "One Hundred Years of Solitude", + Author: Author{Name: "Gabriel García Márquez", Country: "Colombia"}, + Genre: "Magical Realism", + Published: 1967, + Rating: 4.3, + Tags: []string{"family", "history", "fantasy"}, + }, + { + Title: "The Lord of the Rings", + Author: Author{Name: "J.R.R. Tolkien", Country: "UK"}, + Genre: "Fantasy", + Published: 1954, + Rating: 4.7, + Tags: []string{"adventure", "magic", "epic"}, + }, + { + Title: "The Handmaid's Tale", + Author: Author{Name: "Margaret Atwood", Country: "Canada"}, + Genre: "Dystopian", + Published: 1985, + Rating: 4.1, + Tags: []string{"feminism", "totalitarianism", "resistance"}, + }, + { + Title: "Crime and Punishment", + Author: Author{Name: "Fyodor Dostoevsky", Country: "Russia"}, + Genre: "Psychological Thriller", + Published: 1866, + Rating: 4.3, + Tags: []string{"philosophy", "crime", "redemption"}, + }, + { + Title: "To Kill a Mockingbird", + Author: Author{Name: "Harper Lee", Country: "USA"}, + Genre: "Southern Gothic", + Published: 1960, + Rating: 4.2, + Tags: []string{"racism", "injustice", "coming-of-age"}, + }, + { + Title: "1984", + Author: Author{Name: "George Orwell", Country: "UK"}, + Genre: "Dystopian", + Published: 1949, + Rating: 4.2, + Tags: []string{"surveillance", "totalitarianism", "propaganda"}, + }, + { + Title: "The Great Gatsby", + Author: Author{Name: "F. Scott Fitzgerald", Country: "USA"}, + Genre: "Modernist", + Published: 1925, + Rating: 4.0, + Tags: []string{"wealth", "american dream", "love"}, + }, + { + Title: "Dune", + Author: Author{Name: "Frank Herbert", Country: "USA"}, + Genre: "Science Fiction", + Published: 1965, + Rating: 4.6, + Tags: []string{"politics", "desert", "ecology"}, + }, + } + timeBeforeCreate := time.Now().Add(-time.Minute) + var docRefs []*DocumentRef + for _, b := range books { + docRef := coll.NewDoc() + h.mustCreate(docRef, b) + docRefs = append(docRefs, docRef) + } + t.Cleanup(func() { + deleteDocuments(docRefs) + }) + t.Run("WithReadOptions", func(t *testing.T) { + doc1 := coll.NewDoc() + _, err := doc1.Create(ctx, map[string]interface{}{"a": 1}) + if err != nil { + t.Fatal(err) + } + + // Let a little time pass to ensure the next write has a later timestamp. + time.Sleep(1 * time.Millisecond) + + doc2 := coll.NewDoc() + _, err = doc2.Create(ctx, map[string]interface{}{"a": 2}) + if err != nil { + t.Fatal(err) + } + t.Cleanup(func() { + deleteDocuments([]*DocumentRef{doc1, doc2}) + }) + + iter := client.Pipeline().Collection(coll.ID).WithReadOptions(ReadTime(timeBeforeCreate)).Execute(ctx) + res, err := iter.GetAll() + if err != nil { + t.Fatal(err) + } + if len(res) != 0 { + t.Errorf("got %d documents, want 0", len(res)) + } + }) + t.Run("WithTransaction", func(t *testing.T) { + p := client.Pipeline().Collection(coll.ID) + err := client.RunTransaction(ctx, func(ctx context.Context, txn *Transaction) error { + iter := txn.Execute(p) + res, err := iter.GetAll() + if err != nil { + return err + } + if len(res) != len(books) { + return fmt.Errorf("got %d documents, want %d", len(res), len(books)) + } + return nil + }) + if err != nil { + t.Fatal(err) + } + }) +} + func TestIntegration_PipelineFunctions(t *testing.T) { if testParams[firestoreEditionKey].(firestoreEdition) != editionEnterprise { t.Skip("Skipping pipeline queries tests since the firestore edition of", testParams[databaseIDKey].(string), "database is not enterprise") diff --git a/firestore/transaction.go b/firestore/transaction.go index 00a59c9fa327..c8270141afeb 100644 --- a/firestore/transaction.go +++ b/firestore/transaction.go @@ -353,3 +353,14 @@ func (t *Transaction) WithReadOptions(opts ...ReadOption) *Transaction { } return t } + +// Execute runs the given pipeline in the context of the transaction. +func (t *Transaction) Execute(p *Pipeline) *PipelineResultIterator { + if len(t.writes) > 0 { + t.readAfterWrite = true + return &PipelineResultIterator{err: errReadAfterWrite} + } + p2 := p.copy() + p2.tx = t + return p2.Execute(t.ctx) +} From b68bbb2f717f7b5a7f61bb4d71a6c87d0491dadc Mon Sep 17 00:00:00 2001 From: Baha Aiman Date: Fri, 31 Oct 2025 23:10:11 +0000 Subject: [PATCH 2/4] remove commented code --- firestore/integration_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/firestore/integration_test.go b/firestore/integration_test.go index 1118a6b611da..974e06c732a5 100644 --- a/firestore/integration_test.go +++ b/firestore/integration_test.go @@ -156,7 +156,7 @@ func initIntegrationTest() { }, }, } - copts := append(ti.CallOptions()) //, option.WithTokenSource(ts)) + copts := append(ti.CallOptions(), option.WithTokenSource(ts)) c, err := NewClientWithDatabase(ctx, testProjectID, databaseID, copts...) if err != nil { log.Fatalf("NewClient: %v", err) @@ -166,7 +166,7 @@ func initIntegrationTest() { adminCtx, cancel := context.WithTimeout(context.Background(), 15*time.Minute) defer cancel() - adminC, err := apiv1.NewFirestoreAdminClient(adminCtx) //, option.WithTokenSource(ts)) + adminC, err := apiv1.NewFirestoreAdminClient(adminCtx, option.WithTokenSource(ts)) if err != nil { log.Fatalf("NewFirestoreAdminClient: %v", err) } From 04f29cc2c93a73fef263c105e48a3f3fbb4c6893 Mon Sep 17 00:00:00 2001 From: Baha Aiman Date: Fri, 31 Oct 2025 23:15:42 +0000 Subject: [PATCH 3/4] simplify tests --- firestore/pipeline_integration_test.go | 147 +++++++------------------ 1 file changed, 42 insertions(+), 105 deletions(-) diff --git a/firestore/pipeline_integration_test.go b/firestore/pipeline_integration_test.go index b47fcaaffa6e..decf53f859d4 100644 --- a/firestore/pipeline_integration_test.go +++ b/firestore/pipeline_integration_test.go @@ -37,112 +37,9 @@ func TestIntegration_PipelineExecute(t *testing.T) { ctx := context.Background() client := integrationClient(t) coll := integrationColl(t) - h := testHelper{t} - type Author struct { - Name string `firestore:"name"` - Country string `firestore:"country"` - } - type Book struct { - Title string `firestore:"title"` - Author `firestore:"author"` - Genre string `firestore:"genre"` - Published int `firestore:"published"` - Rating float64 `firestore:"rating"` - Tags []string `firestore:"tags"` - } - books := []Book{ - { - Title: "The Hitchhiker's Guide to the Galaxy", - Author: Author{Name: "Douglas Adams", Country: "UK"}, - Genre: "Science Fiction", - Published: 1979, - Rating: 4.2, - Tags: []string{"comedy", "space", "adventure"}, - }, - { - Title: "Pride and Prejudice", - Author: Author{Name: "Jane Austen", Country: "UK"}, - Genre: "Romance", - Published: 1813, - Rating: 4.5, - Tags: []string{"classic", "social commentary", "love"}, - }, - { - Title: "One Hundred Years of Solitude", - Author: Author{Name: "Gabriel García Márquez", Country: "Colombia"}, - Genre: "Magical Realism", - Published: 1967, - Rating: 4.3, - Tags: []string{"family", "history", "fantasy"}, - }, - { - Title: "The Lord of the Rings", - Author: Author{Name: "J.R.R. Tolkien", Country: "UK"}, - Genre: "Fantasy", - Published: 1954, - Rating: 4.7, - Tags: []string{"adventure", "magic", "epic"}, - }, - { - Title: "The Handmaid's Tale", - Author: Author{Name: "Margaret Atwood", Country: "Canada"}, - Genre: "Dystopian", - Published: 1985, - Rating: 4.1, - Tags: []string{"feminism", "totalitarianism", "resistance"}, - }, - { - Title: "Crime and Punishment", - Author: Author{Name: "Fyodor Dostoevsky", Country: "Russia"}, - Genre: "Psychological Thriller", - Published: 1866, - Rating: 4.3, - Tags: []string{"philosophy", "crime", "redemption"}, - }, - { - Title: "To Kill a Mockingbird", - Author: Author{Name: "Harper Lee", Country: "USA"}, - Genre: "Southern Gothic", - Published: 1960, - Rating: 4.2, - Tags: []string{"racism", "injustice", "coming-of-age"}, - }, - { - Title: "1984", - Author: Author{Name: "George Orwell", Country: "UK"}, - Genre: "Dystopian", - Published: 1949, - Rating: 4.2, - Tags: []string{"surveillance", "totalitarianism", "propaganda"}, - }, - { - Title: "The Great Gatsby", - Author: Author{Name: "F. Scott Fitzgerald", Country: "USA"}, - Genre: "Modernist", - Published: 1925, - Rating: 4.0, - Tags: []string{"wealth", "american dream", "love"}, - }, - { - Title: "Dune", - Author: Author{Name: "Frank Herbert", Country: "USA"}, - Genre: "Science Fiction", - Published: 1965, - Rating: 4.6, - Tags: []string{"politics", "desert", "ecology"}, - }, - } - timeBeforeCreate := time.Now().Add(-time.Minute) - var docRefs []*DocumentRef - for _, b := range books { - docRef := coll.NewDoc() - h.mustCreate(docRef, b) - docRefs = append(docRefs, docRef) - } - t.Cleanup(func() { - deleteDocuments(docRefs) - }) + t.Run("WithReadOptions", func(t *testing.T) { + timeBeforeCreate := time.Now() doc1 := coll.NewDoc() _, err := doc1.Create(ctx, map[string]interface{}{"a": 1}) if err != nil { @@ -171,6 +68,46 @@ func TestIntegration_PipelineExecute(t *testing.T) { } }) t.Run("WithTransaction", func(t *testing.T) { + h := testHelper{t} + type Author struct { + Name string `firestore:"name"` + Country string `firestore:"country"` + } + type Book struct { + Title string `firestore:"title"` + Author `firestore:"author"` + Genre string `firestore:"genre"` + Published int `firestore:"published"` + Rating float64 `firestore:"rating"` + Tags []string `firestore:"tags"` + } + books := []Book{ + { + Title: "The Hitchhiker's Guide to the Galaxy", + Author: Author{Name: "Douglas Adams", Country: "UK"}, + Genre: "Science Fiction", + Published: 1979, + Rating: 4.2, + Tags: []string{"comedy", "space", "adventure"}, + }, + { + Title: "Pride and Prejudice", + Author: Author{Name: "Jane Austen", Country: "UK"}, + Genre: "Romance", + Published: 1813, + Rating: 4.5, + Tags: []string{"classic", "social commentary", "love"}, + }, + } + var docRefs []*DocumentRef + for _, b := range books { + docRef := coll.NewDoc() + h.mustCreate(docRef, b) + docRefs = append(docRefs, docRef) + } + t.Cleanup(func() { + deleteDocuments(docRefs) + }) p := client.Pipeline().Collection(coll.ID) err := client.RunTransaction(ctx, func(ctx context.Context, txn *Transaction) error { iter := txn.Execute(p) From 7e74dbf71f90c9767c92e3b4d460224799bb873f Mon Sep 17 00:00:00 2001 From: Baha Aiman Date: Fri, 31 Oct 2025 23:43:05 +0000 Subject: [PATCH 4/4] refactor --- firestore/pipeline.go | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/firestore/pipeline.go b/firestore/pipeline.go index 994c71943fce..b9e724ea9fd7 100644 --- a/firestore/pipeline.go +++ b/firestore/pipeline.go @@ -79,11 +79,10 @@ func (p *Pipeline) toExecutePipelineRequest() (*pb.ExecutePipelineRequest, error // Note that transaction ID and other consistency selectors are mutually exclusive. // We respect the transaction first, any read options passed by the caller second, // and any read options stored in the client third. - if rt, hasOpts := parseReadTime(p.c, p.readSettings); hasOpts { - req.ConsistencySelector = &pb.ExecutePipelineRequest_ReadTime{ReadTime: rt} - } if p.tx != nil { req.ConsistencySelector = &pb.ExecutePipelineRequest_Transaction{Transaction: p.tx.id} + } else if rt, hasOpts := parseReadTime(p.c, p.readSettings); hasOpts { + req.ConsistencySelector = &pb.ExecutePipelineRequest_ReadTime{ReadTime: rt} } return req, nil } @@ -112,9 +111,7 @@ func (p *Pipeline) copy() *Pipeline { err: p.err, } copy(newP.stages, p.stages) - if p.readSettings != nil { - *newP.readSettings = *p.readSettings - } + *newP.readSettings = *p.readSettings return newP }