Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 51 additions & 14 deletions firestore/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,23 +36,26 @@ import (
// Instead, Firestore only guarantees that the result is the same as if the chained stages were
// executed in order.
type Pipeline struct {
c *Client
stages []pipelineStage
err error
c *Client
stages []pipelineStage
readSettings *readSettings
tx *Transaction
err error
}

func newPipeline(client *Client, initialStage pipelineStage) *Pipeline {
return &Pipeline{
c: client,
stages: []pipelineStage{initialStage},
c: client,
stages: []pipelineStage{initialStage},
readSettings: &readSettings{},
}
}

// Execute executes the pipeline and returns an iterator for streaming the results.
// TODO: Accept PipelineOptions
func (p *Pipeline) Execute(ctx context.Context) *PipelineResultIterator {
ctx = withResourceHeader(ctx, p.c.path())
ctx = withRequestParamsHeader(ctx, reqParamsHeaderVal(p.c.path()))

return &PipelineResultIterator{
iter: newStreamPipelineResultIterator(ctx, p),
}
Expand All @@ -67,9 +70,20 @@ func (p *Pipeline) toExecutePipelineRequest() (*pb.ExecutePipelineRequest, error
req := &pb.ExecutePipelineRequest{
Database: p.c.path(),
PipelineType: &pb.ExecutePipelineRequest_StructuredPipeline{
StructuredPipeline: &pb.StructuredPipeline{Pipeline: pipelinePb},
StructuredPipeline: &pb.StructuredPipeline{
Pipeline: pipelinePb,
},
},
// TODO: Add consistencyselector
}

// Note that transaction ID and other consistency selectors are mutually exclusive.
// We respect the transaction first, any read options passed by the caller second,
// and any read options stored in the client third.
if rt, hasOpts := parseReadTime(p.c, p.readSettings); hasOpts {
req.ConsistencySelector = &pb.ExecutePipelineRequest_ReadTime{ReadTime: rt}
}
if p.tx != nil {
req.ConsistencySelector = &pb.ExecutePipelineRequest_Transaction{Transaction: p.tx.id}
}
Comment thread
bhshkh marked this conversation as resolved.
Outdated
return req, nil
}
Expand All @@ -89,17 +103,40 @@ func (p *Pipeline) toProto() (*pb.Pipeline, error) {
return &pb.Pipeline{Stages: protoStages}, nil
}

func (p *Pipeline) copy() *Pipeline {
newP := &Pipeline{
c: p.c,
stages: make([]pipelineStage, len(p.stages)),
readSettings: &readSettings{},
tx: p.tx,
err: p.err,
}
copy(newP.stages, p.stages)
if p.readSettings != nil {
*newP.readSettings = *p.readSettings
}
Comment thread
bhshkh marked this conversation as resolved.
Outdated
return newP
}

// WithReadOptions specifies constraints for accessing documents from the database,
// such as ReadTime.
func (p *Pipeline) WithReadOptions(opts ...ReadOption) *Pipeline {
newP := p.copy()
for _, opt := range opts {
if opt != nil {
opt.apply(newP.readSettings)
}
}
return newP
}

// append creates a new Pipeline by adding a stage to the current one.
func (p *Pipeline) append(s pipelineStage) *Pipeline {
if p.err != nil {
return p
}
newP := &Pipeline{
c: p.c,
stages: make([]pipelineStage, len(p.stages)+1),
}
copy(newP.stages, p.stages)
newP.stages[len(p.stages)] = s
newP := p.copy()
newP.stages = append(newP.stages, s)
return newP
}

Expand Down
97 changes: 97 additions & 0 deletions firestore/pipeline_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package firestore

import (
"context"
"fmt"
"math"
"sort"
"strings"
Expand All @@ -29,6 +30,102 @@ import (
"google.golang.org/grpc/status"
)

func TestIntegration_PipelineExecute(t *testing.T) {
if testParams[firestoreEditionKey].(firestoreEdition) != editionEnterprise {
t.Skip("Skipping pipeline queries tests since the firestore edition of", testParams[databaseIDKey].(string), "database is not enterprise")
}
ctx := context.Background()
client := integrationClient(t)
coll := integrationColl(t)

t.Run("WithReadOptions", func(t *testing.T) {
timeBeforeCreate := time.Now()
doc1 := coll.NewDoc()
_, err := doc1.Create(ctx, map[string]interface{}{"a": 1})
if err != nil {
t.Fatal(err)
}

// Let a little time pass to ensure the next write has a later timestamp.
time.Sleep(1 * time.Millisecond)

doc2 := coll.NewDoc()
_, err = doc2.Create(ctx, map[string]interface{}{"a": 2})
if err != nil {
t.Fatal(err)
}
t.Cleanup(func() {
deleteDocuments([]*DocumentRef{doc1, doc2})
})

iter := client.Pipeline().Collection(coll.ID).WithReadOptions(ReadTime(timeBeforeCreate)).Execute(ctx)
res, err := iter.GetAll()
if err != nil {
t.Fatal(err)
}
if len(res) != 0 {
t.Errorf("got %d documents, want 0", len(res))
}
})
t.Run("WithTransaction", func(t *testing.T) {
h := testHelper{t}
type Author struct {
Name string `firestore:"name"`
Country string `firestore:"country"`
}
type Book struct {
Title string `firestore:"title"`
Author `firestore:"author"`
Genre string `firestore:"genre"`
Published int `firestore:"published"`
Rating float64 `firestore:"rating"`
Tags []string `firestore:"tags"`
}
books := []Book{
{
Title: "The Hitchhiker's Guide to the Galaxy",
Author: Author{Name: "Douglas Adams", Country: "UK"},
Genre: "Science Fiction",
Published: 1979,
Rating: 4.2,
Tags: []string{"comedy", "space", "adventure"},
},
{
Title: "Pride and Prejudice",
Author: Author{Name: "Jane Austen", Country: "UK"},
Genre: "Romance",
Published: 1813,
Rating: 4.5,
Tags: []string{"classic", "social commentary", "love"},
},
}
var docRefs []*DocumentRef
for _, b := range books {
docRef := coll.NewDoc()
h.mustCreate(docRef, b)
docRefs = append(docRefs, docRef)
}
t.Cleanup(func() {
deleteDocuments(docRefs)
})
p := client.Pipeline().Collection(coll.ID)
err := client.RunTransaction(ctx, func(ctx context.Context, txn *Transaction) error {
iter := txn.Execute(p)
res, err := iter.GetAll()
if err != nil {
return err
}
if len(res) != len(books) {
return fmt.Errorf("got %d documents, want %d", len(res), len(books))
}
return nil
})
if err != nil {
t.Fatal(err)
}
})
}

func TestIntegration_PipelineStages(t *testing.T) {
if testParams[firestoreEditionKey].(firestoreEdition) != editionEnterprise {
t.Skip("Skipping pipeline queries tests since the firestore edition of", testParams[databaseIDKey].(string), "database is not enterprise")
Expand Down
11 changes: 11 additions & 0 deletions firestore/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -353,3 +353,14 @@ func (t *Transaction) WithReadOptions(opts ...ReadOption) *Transaction {
}
return t
}

// Execute runs the given pipeline in the context of the transaction.
func (t *Transaction) Execute(p *Pipeline) *PipelineResultIterator {
if len(t.writes) > 0 {
t.readAfterWrite = true
return &PipelineResultIterator{err: errReadAfterWrite}
}
p2 := p.copy()
p2.tx = t
return p2.Execute(t.ctx)
}
Loading