From 0d5d1f301db0f2557909c75e0b2e53af741f78d6 Mon Sep 17 00:00:00 2001 From: Baha Aiman Date: Wed, 7 May 2025 21:30:43 +0000 Subject: [PATCH 1/3] feat(firestore): add pipeline queries --- firestore/client.go | 5 + firestore/pipeline.go | 100 ++++++++ firestore/pipeline_result.go | 275 ++++++++++++++++++++++ firestore/pipeline_result_test.go | 371 ++++++++++++++++++++++++++++++ firestore/pipeline_source.go | 26 +++ firestore/pipeline_source_test.go | 52 +++++ firestore/pipeline_stage.go | 63 +++++ firestore/pipeline_test.go | 152 ++++++++++++ 8 files changed, 1044 insertions(+) create mode 100644 firestore/pipeline.go create mode 100644 firestore/pipeline_result.go create mode 100644 firestore/pipeline_result_test.go create mode 100644 firestore/pipeline_source.go create mode 100644 firestore/pipeline_source_test.go create mode 100644 firestore/pipeline_stage.go create mode 100644 firestore/pipeline_test.go diff --git a/firestore/client.go b/firestore/client.go index 1cad80581c7c..49c76d1a5d37 100644 --- a/firestore/client.go +++ b/firestore/client.go @@ -166,6 +166,11 @@ func withRequestParamsHeader(ctx context.Context, requestParams string) context. return metadata.NewOutgoingContext(ctx, md) } +// Pipeline creates a PipelineSource to start building a Firestore pipeline. +func (c *Client) Pipeline() *PipelineSource { + return &PipelineSource{client: c} +} + // Collection creates a reference to a collection with the given path. // A path is a sequence of IDs separated by slashes. // diff --git a/firestore/pipeline.go b/firestore/pipeline.go new file mode 100644 index 000000000000..422a4e8e9d32 --- /dev/null +++ b/firestore/pipeline.go @@ -0,0 +1,100 @@ +// Copyright 2025 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package firestore + +import ( + "context" + "fmt" + + pb "cloud.google.com/go/firestore/apiv1/firestorepb" +) + +// Pipeline class provides a flexible and expressive framework for building complex data +// transformation and query pipelines for Firestore. + +// A pipeline takes data sources, such as Firestore collections or collection groups, and applies +// a series of stages that are chained together. Each stage takes the output from the previous stage +// (or the data source) and produces an output for the next stage (or as the final output of the +// pipeline). + +// Expressions can be used within +// each stages to filter and transform data through the stage. + +// NOTE: The chained stages do not prescribe exactly how Firestore will execute the pipeline. +// Instead, Firestore only guarantees that the result is the same as if the chained stages were +// executed in order. +type Pipeline struct { + c *Client + stages []pipelineStage + err error +} + +func newPipeline(client *Client, initialStage pipelineStage) *Pipeline { + return &Pipeline{ + c: client, + stages: []pipelineStage{initialStage}, + } +} + +// Execute executes the pipeline and returns an iterator for streaming the results. 
+// TODO: Accept PipelineOptions +func (p *Pipeline) Execute(ctx context.Context) *PipelineResultIterator { + return &PipelineResultIterator{ + iter: newStreamPipelineResultIterator(ctx, p), + } +} + +func (p *Pipeline) toExecutePipelineRequest() (*pb.ExecutePipelineRequest, error) { + protoStages := make([]*pb.Pipeline_Stage, len(p.stages)) + for i, s := range p.stages { + ps, err := s.toProto() + if err != nil { + return nil, fmt.Errorf("firestore: error converting stage %q to proto: %w", s.name(), err) + } + protoStages[i] = ps + } + + req := &pb.ExecutePipelineRequest{ + Database: p.c.path(), + PipelineType: &pb.ExecutePipelineRequest_StructuredPipeline{ + StructuredPipeline: &pb.StructuredPipeline{ + Pipeline: &pb.Pipeline{ + Stages: protoStages, + }, + }, + }, + // TODO: Add consistencyselector + } + return req, nil +} + +// append creates a new Pipeline by adding a stage to the current one. +func (p *Pipeline) append(s pipelineStage) *Pipeline { + if p.err != nil { + return p + } + newP := &Pipeline{ + c: p.c, + stages: make([]pipelineStage, len(p.stages)+1), + } + copy(newP.stages, p.stages) + newP.stages[len(p.stages)] = s + return newP +} + +// Limit limits the maximum number of documents returned by previous stages. +func (p *Pipeline) Limit(limit int) *Pipeline { + return p.append(newLimitStage(limit)) +} diff --git a/firestore/pipeline_result.go b/firestore/pipeline_result.go new file mode 100644 index 000000000000..3f960b6b47f1 --- /dev/null +++ b/firestore/pipeline_result.go @@ -0,0 +1,275 @@ +// Copyright 2025 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package firestore
+
+import (
+	"context"
+	"errors"
+	"fmt"
+	"io"
+	"time"
+
+	pb "cloud.google.com/go/firestore/apiv1/firestorepb"
+	"cloud.google.com/go/internal/trace"
+	"google.golang.org/api/iterator"
+	"google.golang.org/grpc/codes"
+	"google.golang.org/grpc/status"
+	"google.golang.org/protobuf/types/known/timestamppb"
+)
+
+// PipelineResult is a result returned from executing a pipeline.
+type PipelineResult struct {
+	// Ref is the DocumentRef for this result. It may be nil if the result
+	// does not correspond to a specific Firestore document (e.g., an aggregation result
+	// without grouping, or a synthetic document from a stage).
+	Ref *DocumentRef
+
+	// CreateTime is the time at which the document was created.
+	// It may be nil if the result does not correspond to a specific Firestore document
+	CreateTime *time.Time
+
+	// UpdateTime is the time at which the document was last changed.
+	// It may be nil if the result does not correspond to a specific Firestore document
+	UpdateTime *time.Time
+
+	// ExecutionTime is the time at which the document(s) were read.
+	ExecutionTime *time.Time
+
+	// c is the client used to decode field values in Data/DataTo.
+	c *Client
+	// proto is the raw document returned by the service; nil when the
+	// result does not represent a document (see Exists).
+	proto *pb.Document
+}
+
+// newPipelineResult builds a PipelineResult from a document proto and the
+// response's execution time. Each timestamp is validated with CheckValid
+// before conversion; an invalid timestamp aborts construction with an error.
+// proto may be nil, in which case only ExecutionTime is populated.
+func newPipelineResult(ref *DocumentRef, proto *pb.Document, c *Client, executionTime *timestamppb.Timestamp) (*PipelineResult, error) {
+	pr := &PipelineResult{
+		Ref:   ref,
+		c:     c,
+		proto: proto,
+	}
+	if proto != nil {
+		if proto.GetCreateTime() != nil {
+			if err := proto.GetCreateTime().CheckValid(); err != nil {
+				return nil, err
+			}
+			createTime := proto.GetCreateTime().AsTime()
+			pr.CreateTime = &createTime
+		}
+		if proto.GetUpdateTime() != nil {
+			if err := proto.GetUpdateTime().CheckValid(); err != nil {
+				return nil, err
+			}
+			updateTime := proto.GetUpdateTime().AsTime()
+			pr.UpdateTime = &updateTime
+		}
+	}
+	if executionTime != nil {
+		if err := executionTime.CheckValid(); err != nil {
+			return nil, err
+		}
+		execTime := executionTime.AsTime()
+		pr.ExecutionTime = &execTime
+	}
+	return pr, nil
+}
+
+// Exists reports whether the PipelineResult represents a document.
+// Even if Exists returns false, the ExecutionTime field of the PipelineResult
+// is valid.
+func (p *PipelineResult) Exists() bool {
+	return p.proto != nil
+}
+
+// Data returns the PipelineResult's fields as a map.
+// It is equivalent to
+//
+//	var m map[string]interface{}
+//	p.DataTo(&m)
+//
+// except that it returns nil if the document does not exist.
+func (p *PipelineResult) Data() map[string]interface{} {
+	if !p.Exists() {
+		return nil
+	}
+	m, err := createMapFromValueMap(p.proto.Fields, p.c)
+	// Any error here is a bug in the client.
+	if err != nil {
+		panic(fmt.Sprintf("firestore: %v", err))
+	}
+	return m
+}
+
+// DataTo uses the PipelineResult's fields to populate v, which can be a pointer to a
+// map[string]interface{} or a pointer to a struct.
+// This is similar to [DocumentSnapshot.DataTo]
+func (p *PipelineResult) DataTo(v interface{}) error {
+	if !p.Exists() {
+		return status.Errorf(codes.NotFound, "document does not exist")
+	}
+	// Wrap the document's fields in a MapValue so the generic proto-value
+	// decoder can populate v.
+	return setFromProtoValue(v, &pb.Value{ValueType: &pb.Value_MapValue{MapValue: &pb.MapValue{Fields: p.proto.Fields}}}, p.c)
+}
+
+// PipelineResultIterator is an iterator over PipelineResults from a pipeline execution.
+type PipelineResultIterator struct {
+	iter pipelineResultIteratorInternal
+	err  error // Stores sticky error from Next() or construction
+}
+
+// Next returns the next result. Its second return value is iterator.Done if there
+// are no more results. Once Next returns Done, all subsequent calls will return
+// Done.
+func (it *PipelineResultIterator) Next() (*PipelineResult, error) {
+	if it.err != nil {
+		return nil, it.err
+	}
+	if it.iter == nil { // Iterator was stopped or not initialized
+		return nil, iterator.Done
+	}
+
+	pr, err := it.iter.next()
+	if err != nil {
+		it.err = err // Store sticky error
+	}
+	return pr, err
+}
+
+// Stop stops the iterator, freeing its resources.
+// Always call Stop when you are done with a PipelineResultIterator.
+// It is not safe to call Stop concurrently with Next.
+func (it *PipelineResultIterator) Stop() {
+	if it.iter != nil {
+		it.iter.stop()
+	}
+	// Set a sticky error indicating the iterator is now done if not already errored.
+	if it.err == nil {
+		it.err = iterator.Done
+	}
+}
+
+// GetAll returns all the documents remaining from the iterator.
+// It is not necessary to call Stop on the iterator after calling GetAll.
+func (it *PipelineResultIterator) GetAll() ([]*PipelineResult, error) {
+	if it.err != nil {
+		return nil, it.err
+	}
+	defer it.Stop()
+
+	var results []*PipelineResult
+	for {
+		pr, err := it.Next()
+		if err == iterator.Done {
+			break
+		}
+		if err != nil {
+			return results, err
+		}
+		results = append(results, pr)
+	}
+	return results, nil
+}
+
+// pipelineResultIteratorInternal is an unexported interface defining the core iteration logic.
+type pipelineResultIteratorInternal interface {
+	next() (*PipelineResult, error)
+	stop()
+}
+
+// streamPipelineResultIterator is the concrete implementation for gRPC streaming of pipeline results.
+type streamPipelineResultIterator struct {
+	ctx                context.Context
+	cancel             func()
+	p                  *Pipeline
+	streamClient       pb.Firestore_ExecutePipelineClient
+	currResp           *pb.ExecutePipelineResponse
+	currRespResultsIdx int
+}
+
+// newStreamPipelineResultIterator wraps ctx in a cancelable context so stop()
+// can terminate the underlying stream. The gRPC stream itself is opened
+// lazily on the first next() call.
+func newStreamPipelineResultIterator(ctx context.Context, p *Pipeline) *streamPipelineResultIterator {
+	ctx, cancel := context.WithCancel(ctx)
+	return &streamPipelineResultIterator{
+		ctx:    ctx,
+		cancel: cancel,
+		p:      p,
+	}
+}
+
+// Each ExecutePipelineResponse received from Firestore service contains a list of Documents
+// On each next() call, return a single document.
+func (it *streamPipelineResultIterator) next() (_ *PipelineResult, err error) {
+	client := it.p.c
+
+	// streamClient is initialized on first next call
+	if it.streamClient == nil {
+		it.ctx = trace.StartSpan(it.ctx, "cloud.google.com/go/firestore.ExecutePipeline")
+		defer func() {
+			if errors.Is(err, iterator.Done) {
+				trace.EndSpan(it.ctx, nil)
+			} else {
+				trace.EndSpan(it.ctx, err)
+			}
+		}()
+		req, err := it.p.toExecutePipelineRequest()
+		if err != nil {
+			return nil, err
+		}
+		it.streamClient, err = client.c.ExecutePipeline(it.ctx, req)
+		if err != nil {
+			return nil, err
+		}
+	}
+
+	// If the current response is nil or all its results have been processed,
+	// receive the next response from the stream.
+	if it.currResp == nil || it.currRespResultsIdx >= len(it.currResp.GetResults()) {
+		var res *pb.ExecutePipelineResponse
+		for {
+			res, err = it.streamClient.Recv()
+			if err == io.EOF {
+				return nil, iterator.Done
+			}
+			if err != nil {
+				return nil, err
+			}
+			// Guard with len rather than nil: a response can carry a
+			// non-nil but empty Results slice, which would otherwise lead
+			// to an out-of-range panic when indexing below.
+			if len(res.GetResults()) > 0 {
+				it.currResp = res
+				it.currRespResultsIdx = 0
+				break
+			}
+			// No results => partial progress; keep receiving
+			// TODO: Set ExplainStats
+		}
+	}
+
+	// Get the next document proto from the current response.
+	docProto := it.currResp.GetResults()[it.currRespResultsIdx]
+	it.currRespResultsIdx++
+
+	// Only resolve a DocumentRef when the server returned a document name;
+	// results without a name (e.g. aggregations) keep a nil Ref. The
+	// previous condition was inverted (== 0): it left Ref nil for named
+	// documents and attempted to parse an empty path.
+	var docRef *DocumentRef
+	if len(docProto.GetName()) != 0 {
+		var pathErr error
+		docRef, pathErr = pathToDoc(docProto.GetName(), client)
+		if pathErr != nil {
+			return nil, pathErr
+		}
+	}
+
+	pr, err := newPipelineResult(docRef, docProto, client, it.currResp.GetExecutionTime())
+	if err != nil {
+		return nil, err
+	}
+	return pr, nil
+}
+
+// stop cancels the iterator's context, terminating the underlying stream.
+func (it *streamPipelineResultIterator) stop() {
+	it.cancel()
+}
diff --git a/firestore/pipeline_result_test.go b/firestore/pipeline_result_test.go
new file mode 100644
index 000000000000..143059e2c085
--- /dev/null
+++ b/firestore/pipeline_result_test.go
@@ -0,0 +1,371 @@
+// Copyright 2025 Google LLC
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+ +package firestore + +import ( + "context" + "errors" + "io" + "testing" + "time" + + pb "cloud.google.com/go/firestore/apiv1/firestorepb" + "cloud.google.com/go/internal/testutil" + "github.com/google/go-cmp/cmp" + "google.golang.org/api/iterator" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + "google.golang.org/protobuf/types/known/timestamppb" +) + +func TestStreamPipelineResultIterator_Next(t *testing.T) { + ctx := context.Background() + client := newTestClient() // For PipelineResult construction + p := &Pipeline{c: client} // Dummy pipeline for iterator context + + now := time.Now() + tsNow := timestamppb.New(now) + ts2MinLater := timestamppb.New(now.Add(-2 * time.Minute)) + + mockResponses := []*pb.ExecutePipelineResponse{ + { // First response with two results + Results: []*pb.Document{ + {Name: "projects/test-project/databases/test-db/documents/col/doc1", Fields: map[string]*pb.Value{"foo": {ValueType: &pb.Value_StringValue{StringValue: "bar1"}}}, CreateTime: tsNow, UpdateTime: tsNow}, + {Name: "projects/test-project/databases/test-db/documents/col/doc2", Fields: map[string]*pb.Value{"foo": {ValueType: &pb.Value_StringValue{StringValue: "bar2"}}}, CreateTime: tsNow, UpdateTime: tsNow}, + }, + ExecutionTime: tsNow, + }, + { // Second response with one result + Results: []*pb.Document{ + {Name: "projects/test-project/databases/test-db/documents/col/doc3", Fields: map[string]*pb.Value{"foo": {ValueType: &pb.Value_StringValue{StringValue: "bar3"}}}, CreateTime: tsNow, UpdateTime: tsNow}, + }, + ExecutionTime: ts2MinLater, + }, + } + + tests := []struct { + name string + responses []*pb.ExecutePipelineResponse + errors []error + gotCount int + wantErr error + wantData []map[string]interface{} + }{ + { + name: "successful iteration", + responses: mockResponses, + errors: []error{nil, nil, io.EOF}, // EOF after 2 responses (containing 3 docs) + gotCount: 3, + wantErr: iterator.Done, + wantData: []map[string]interface{}{ + {"foo": "bar1"}, 
+ {"foo": "bar2"}, + {"foo": "bar3"}, + }, + }, + { + name: "iteration with gRPC error", + responses: []*pb.ExecutePipelineResponse{mockResponses[0]}, // Only first response + errors: []error{nil, status.Error(codes.Unavailable, "service unavailable")}, + gotCount: 2, // Expect results from the first response before error + wantErr: status.Error(codes.Unavailable, "service unavailable"), + wantData: []map[string]interface{}{ + {"foo": "bar1"}, + {"foo": "bar2"}, + }, + }, + { + name: "no results", + responses: []*pb.ExecutePipelineResponse{{Results: []*pb.Document{}}}, + errors: []error{io.EOF}, + gotCount: 0, + wantErr: iterator.Done, + }, + { + name: "partial progress then results", + responses: []*pb.ExecutePipelineResponse{{ExecutionTime: tsNow /* No results */}, mockResponses[0]}, + errors: []error{nil, nil, io.EOF}, + gotCount: 2, + wantErr: iterator.Done, + wantData: []map[string]interface{}{ + {"foo": "bar1"}, + {"foo": "bar2"}, + }, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + mockStreamClient := &mockExecutePipelineClient{ + RecvResponses: tc.responses, + RecvErrors: tc.errors, + ContextVal: ctx, + } + iter := &streamPipelineResultIterator{ + ctx: ctx, + cancel: func() {}, + p: p, + streamClient: mockStreamClient, + } + defer iter.stop() + var results []*PipelineResult + var gotErr error + var pr *PipelineResult + for { + pr, gotErr = iter.next() + if gotErr != nil { + break + } + results = append(results, pr) + } + + if len(results) != tc.gotCount { + t.Errorf("results got %d, want %d", len(results), tc.gotCount) + } + + if tc.wantErr != nil { + if gotErr == nil { + t.Fatalf("error %v, got nil", tc.wantErr) + } + if !errors.Is(gotErr, tc.wantErr) && gotErr.Error() != tc.wantErr.Error() { + t.Errorf("error got %v, want %v", gotErr, tc.wantErr) + } + } else if gotErr != nil { + t.Errorf("error got %v, want %v", gotErr, nil) + } + + if tc.wantData != nil { + if len(results) != len(tc.wantData) { + t.Fatalf("Result count 
mismatch for data check: expected %d, got %d", len(tc.wantData), len(results)) + } + for i, pr := range results { + if diff := cmp.Diff(tc.wantData[i], pr.Data()); diff != "" { + t.Errorf("Data mismatch for result %d (-want +got):\n%s", i, diff) + } + } + } + }) + } +} + +func TestPipelineResultIterator_Stop(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + client := newTestClient() + p := &Pipeline{c: client} + + mockStreamClient := &mockExecutePipelineClient{ + ContextVal: ctx, // Iterator will use this context + } + + // Create the public iterator which wraps the stream iterator + publicIter := &PipelineResultIterator{ + iter: &streamPipelineResultIterator{ + ctx: ctx, // This context is passed to the stream client + cancel: cancel, // This cancel func should be called by Stop + p: p, + streamClient: mockStreamClient, + }, + } + + publicIter.Stop() + + // Check if the context was cancelled + select { + case <-ctx.Done(): + // Expected: context is cancelled + default: + t.Errorf("Expected context to be cancelled after Stop(), but it was not") + } + + // Calling Stop again should be a no-op + publicIter.Stop() // Should not panic or error + + // Check that Next after Stop returns iterator.Done + _, err := publicIter.Next() + if !errors.Is(err, iterator.Done) { + t.Errorf("Next after Stop(): got %v, want %v", err, iterator.Done) + } +} + +func TestPipelineResultIterator_GetAll(t *testing.T) { + ctx := context.Background() + client := newTestClient() + p := &Pipeline{c: client} + + mockStreamClient := &mockExecutePipelineClient{ + RecvResponses: []*pb.ExecutePipelineResponse{ + {Results: []*pb.Document{ + {Name: "projects/p/databases/d/documents/c/doc1", Fields: map[string]*pb.Value{"id": {ValueType: &pb.Value_IntegerValue{IntegerValue: 1}}}}, + }}, + {Results: []*pb.Document{ + {Name: "projects/p/databases/d/documents/c/doc2", Fields: map[string]*pb.Value{"id": {ValueType: &pb.Value_IntegerValue{IntegerValue: 2}}}}, + }}, + }, + 
RecvErrors: []error{nil, nil, io.EOF}, // EOF after two responses + ContextVal: ctx, + } + + publicIter := &PipelineResultIterator{ + iter: &streamPipelineResultIterator{ + ctx: ctx, + cancel: func() {}, + p: p, + streamClient: mockStreamClient, + }, + } + + allResults, err := publicIter.GetAll() + if err != nil { + t.Fatalf("GetAll: %v", err) + } + if len(allResults) != 2 { + t.Errorf("results from GetAll(): got %d, want: 2", len(allResults)) + } + if allResults[0].Data()["id"].(int64) != 1 { + t.Errorf("first result id: got %v, want: 1", allResults[0].Data()["id"]) + } + if allResults[1].Data()["id"].(int64) != 2 { + t.Errorf("second result id: got %v, want: 2", allResults[1].Data()["id"]) + } + + // After GetAll, Next should return iterator.Done + _, nextErr := publicIter.Next() + if !errors.Is(nextErr, iterator.Done) { + t.Errorf("Next after GetAll(): got %v, want: %v", nextErr, iterator.Done) + } +} + +func TestPipelineResult_DataExtraction(t *testing.T) { + client := newTestClient() + now := time.Now() + tsNowProto := timestamppb.New(now) + + docProto := &pb.Document{ + Name: "projects/test/databases/d/documents/mycoll/mydoc", + CreateTime: tsNowProto, + UpdateTime: tsNowProto, + Fields: map[string]*pb.Value{ + "stringProp": {ValueType: &pb.Value_StringValue{StringValue: "hello"}}, + "intProp": {ValueType: &pb.Value_IntegerValue{IntegerValue: 123}}, + "boolProp": {ValueType: &pb.Value_BooleanValue{BooleanValue: true}}, + "mapProp": {ValueType: &pb.Value_MapValue{MapValue: &pb.MapValue{ + Fields: map[string]*pb.Value{ + "nestedString": {ValueType: &pb.Value_StringValue{StringValue: "world"}}, + }, + }}}, + }, + } + execTimeProto := timestamppb.New(now.Add(time.Second)) + + docRef, _ := pathToDoc(docProto.Name, client) + pr, err := newPipelineResult(docRef, docProto, client, execTimeProto) + if err != nil { + t.Fatalf("newPipelineResult: %v", err) + } + + if !pr.Exists() { + t.Error("pr.Exists: got false, want true") + } + + // Test Data() + dataMap := 
pr.Data() + if dataMap["stringProp"].(string) != "hello" { + t.Errorf("stringProp: got %v, want 'hello'", dataMap["stringProp"]) + } + + if dataMap["intProp"].(int64) != 123 { + t.Errorf("intProp: got %v, want 123", dataMap["intProp"]) + } + nestedMap, ok := dataMap["mapProp"].(map[string]interface{}) + if !ok { + t.Fatalf("mapProp is not a map[string]interface{}") + } + if nestedMap["nestedString"].(string) != "world" { + t.Errorf("nestedString: got %v, want 'world'", nestedMap["nestedString"]) + } + + // Test DataTo() with a struct + type MyStruct struct { + StringProp string `firestore:"stringProp"` + IntProp int `firestore:"intProp"` + BoolProp bool `firestore:"boolProp"` + MapProp map[string]interface{} `firestore:"mapProp"` + NonExistent float64 `firestore:"nonExistent"` + } + gotDst := MyStruct{ + StringProp: "world", + IntProp: 456, + BoolProp: false, + MapProp: map[string]interface{}{"nestedString": "hello"}, + NonExistent: 456.789, + } + + wantDst := MyStruct{ + StringProp: "hello", + IntProp: 123, + BoolProp: true, + MapProp: map[string]interface{}{"nestedString": "world"}, + NonExistent: 456.789, + } + + if err := pr.DataTo(&gotDst); err != nil { + t.Fatalf("pr.DataTo(&gotDst): %v", err) + } + + if diff := testutil.Diff(wantDst, gotDst); diff != "" { + t.Errorf("dst mismatch (-want +got):\n%s", diff) + } + + // Test Timestamps + if pr.CreateTime == nil || !pr.CreateTime.Equal(now) { + t.Errorf("CreateTime: got %v, want %v", pr.CreateTime, now) + } + if pr.ExecutionTime == nil || !pr.ExecutionTime.Equal(now.Add(time.Second)) { + t.Errorf("ExecutionTime: got %v, want %v", pr.ExecutionTime, now.Add(time.Second)) + } +} + +func TestPipelineResult_NoResults(t *testing.T) { + client := newTestClient() + execTime := time.Now() + execTimeProto := timestamppb.New(execTime) + + pr, err := newPipelineResult(nil, nil, client, execTimeProto) // No proto document + if err != nil { + t.Fatalf("newPipelineResult: %v", err) + } + + if pr.Exists() { + 
t.Error("pr.Exists() for non-existent result: got true, want false") + } + if data := pr.Data(); data != nil { + t.Errorf("pr.Data() for non-existent result: got %v, want nil", data) + } + + type MyStruct struct{ Foo string } + var s MyStruct + err = pr.DataTo(&s) // Should behave like populating from an empty map + if err != nil { + // DataTo on a non-existent PipelineResult should not error out but result in a zero-valued struct. + t.Fatalf("pr.DataTo(&s) on non-existent result failed: %v", err) + } + if s.Foo != "" { + t.Errorf("Struct Foo for non-existent result: got %q, want \"\"", s.Foo) + } + + if pr.ExecutionTime == nil || !pr.ExecutionTime.Equal(execTime) { + t.Errorf("ExecutionTime for non-existent result: got %v, want %v", pr.ExecutionTime, execTime) + } +} diff --git a/firestore/pipeline_source.go b/firestore/pipeline_source.go new file mode 100644 index 000000000000..b6c277f0e0fa --- /dev/null +++ b/firestore/pipeline_source.go @@ -0,0 +1,26 @@ +// Copyright 2025 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package firestore + +// PipelineSource is a factory for creating Pipeline instances. +// It is obtained by calling [Client.Pipeline()]. +type PipelineSource struct { + client *Client +} + +// Collection returns all documents from the entire collection. 
+func (ps *PipelineSource) Collection(path string) *Pipeline { + return newPipeline(ps.client, newInputStageCollection(path)) +} diff --git a/firestore/pipeline_source_test.go b/firestore/pipeline_source_test.go new file mode 100644 index 000000000000..e6c41f70d8a5 --- /dev/null +++ b/firestore/pipeline_source_test.go @@ -0,0 +1,52 @@ +// Copyright 2025 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package firestore + +import ( + "testing" + + pb "cloud.google.com/go/firestore/apiv1/firestorepb" + "cloud.google.com/go/internal/testutil" +) + +func TestPipelineSource_Collection(t *testing.T) { + client := newTestClient() + ps := &PipelineSource{client: client} + p := ps.Collection("users") + + if p.err != nil { + t.Fatalf("Collection: %v", p.err) + } + if len(p.stages) != 1 { + t.Fatalf("initial stages: got %d, want 1", len(p.stages)) + } + + req, err := p.toExecutePipelineRequest() + if err != nil { + t.Fatalf("toExecutePipelineRequest: %v", err) + } + + wantStage := &pb.Pipeline_Stage{ + Name: "collection", + Args: []*pb.Value{{ValueType: &pb.Value_ReferenceValue{ReferenceValue: "/users"}}}, + } + + if len(req.GetStructuredPipeline().GetPipeline().GetStages()) != 1 { + t.Fatalf("stage in proto: got %d, want 1", len(req.GetStructuredPipeline().GetPipeline().GetStages())) + } + if diff := testutil.Diff(wantStage, req.GetStructuredPipeline().GetPipeline().GetStages()[0]); diff != "" { + t.Errorf("toExecutePipelineRequest mismatch for 
collection stage (-want +got):\n%s", diff) + } +} diff --git a/firestore/pipeline_stage.go b/firestore/pipeline_stage.go new file mode 100644 index 000000000000..1a5a95d5eb42 --- /dev/null +++ b/firestore/pipeline_stage.go @@ -0,0 +1,63 @@ +// Copyright 2025 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package firestore + +import ( + "strings" + + pb "cloud.google.com/go/firestore/apiv1/firestorepb" +) + +// internal interface for pipeline stages. +type pipelineStage interface { + toProto() (*pb.Pipeline_Stage, error) + name() string // For identification, logging, and potential validation +} + +// inputStageCollection returns all documents from the entire collection. 
+type inputStageCollection struct {
+	path string
+}
+
+// newInputStageCollection builds a collection input stage, normalizing the
+// path so the stored reference value always begins with a leading slash.
+func newInputStageCollection(path string) *inputStageCollection {
+	p := path
+	if !strings.HasPrefix(p, "/") {
+		p = "/" + p
+	}
+	return &inputStageCollection{path: p}
+}
+
+// name identifies this stage as a "collection" source.
+func (s *inputStageCollection) name() string { return "collection" }
+
+// toProto encodes the stage as a Pipeline_Stage proto whose single argument
+// is a reference value holding the collection path.
+func (s *inputStageCollection) toProto() (*pb.Pipeline_Stage, error) {
+	return &pb.Pipeline_Stage{
+		Name: s.name(),
+		Args: []*pb.Value{
+			{ValueType: &pb.Value_ReferenceValue{ReferenceValue: s.path}},
+		},
+	}, nil
+}
+
+// limitStage caps the number of documents produced by the preceding stages.
+type limitStage struct {
+	limit int
+}
+
+// newLimitStage builds a limit stage with the given maximum result count.
+func newLimitStage(limit int) *limitStage {
+	return &limitStage{limit: limit}
+}
+
+// name identifies this stage as a "limit" stage.
+func (s *limitStage) name() string { return "limit" }
+
+// toProto encodes the stage as a Pipeline_Stage proto whose single argument
+// is the integer limit.
+func (s *limitStage) toProto() (*pb.Pipeline_Stage, error) {
+	return &pb.Pipeline_Stage{
+		Name: s.name(),
+		Args: []*pb.Value{
+			{ValueType: &pb.Value_IntegerValue{IntegerValue: int64(s.limit)}},
+		},
+	}, nil
+}
diff --git a/firestore/pipeline_test.go b/firestore/pipeline_test.go
new file mode 100644
index 000000000000..f60193b3503e
--- /dev/null
+++ b/firestore/pipeline_test.go
@@ -0,0 +1,152 @@
+// Copyright 2025 Google LLC
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+ +package firestore + +import ( + "context" + "io" + "testing" + + pb "cloud.google.com/go/firestore/apiv1/firestorepb" + "github.com/google/go-cmp/cmp" + "google.golang.org/grpc/metadata" + "google.golang.org/protobuf/testing/protocmp" +) + +// mockExecutePipelineClient is a mock implementation of pb.Firestore_ExecutePipelineClient. +type mockExecutePipelineClient struct { + pb.Firestore_ExecutePipelineClient // Embed for forward compatibility + RecvResponses []*pb.ExecutePipelineResponse + RecvErrors []error + RecvIdx int + CloseSendErr error + HeaderVal metadata.MD + TrailerVal metadata.MD + ContextVal context.Context + SendHeaderVal metadata.MD +} + +func (m *mockExecutePipelineClient) Recv() (*pb.ExecutePipelineResponse, error) { + if m.ContextVal != nil && m.ContextVal.Err() != nil { + return nil, m.ContextVal.Err() + } + if m.RecvIdx < len(m.RecvResponses) || m.RecvIdx < len(m.RecvErrors) { + var resp *pb.ExecutePipelineResponse + var err error + if m.RecvIdx < len(m.RecvResponses) { + resp = m.RecvResponses[m.RecvIdx] + } + if m.RecvIdx < len(m.RecvErrors) { + err = m.RecvErrors[m.RecvIdx] + } + m.RecvIdx++ + return resp, err + } + return nil, io.EOF +} +func (m *mockExecutePipelineClient) CloseSend() error { return m.CloseSendErr } +func (m *mockExecutePipelineClient) Header() (metadata.MD, error) { return m.HeaderVal, nil } +func (m *mockExecutePipelineClient) Trailer() metadata.MD { return m.TrailerVal } +func (m *mockExecutePipelineClient) Context() context.Context { return m.ContextVal } +func (m *mockExecutePipelineClient) SendHeader(md metadata.MD) error { + m.SendHeaderVal = md + return nil +} +func (m *mockExecutePipelineClient) SetHeader(md metadata.MD) error { return nil } +func (m *mockExecutePipelineClient) SetTrailer(md metadata.MD) {} +func (m *mockExecutePipelineClient) SendMsg(i interface{}) error { return nil } +func (m *mockExecutePipelineClient) RecvMsg(i interface{}) error { return nil } + +// Test helper to create a minimal Client for 
non-RPC tests +func newTestClient() *Client { + return &Client{ + projectID: "test-project", + databaseID: "test-db", + } +} + +func TestPipeline_Limit(t *testing.T) { + client := newTestClient() + ps := &PipelineSource{client: client} + p := ps.Collection("users").Limit(10) + + if p.err != nil { + t.Fatalf("Pipeline.Limit() returned error: %v", p.err) + } + if len(p.stages) != 2 { + t.Fatalf("Expected 2 stages, got %d", len(p.stages)) + } + + req, err := p.toExecutePipelineRequest() + if err != nil { + t.Fatalf("p.toExecutePipelineRequest() failed: %v", err) + } + + stages := req.GetStructuredPipeline().GetPipeline().GetStages() + if len(stages) != 2 { + t.Fatalf("Expected 2 stages in proto, got %d", len(stages)) + } + + wantLimitStage := &pb.Pipeline_Stage{ + Name: "limit", + Args: []*pb.Value{{ValueType: &pb.Value_IntegerValue{IntegerValue: 10}}}, + } + if diff := cmp.Diff(wantLimitStage, stages[1], protocmp.Transform()); diff != "" { + t.Errorf("toExecutePipelineRequest() mismatch for limit stage (-want +got):\n%s", diff) + } +} + +func TestPipeline_ToExecutePipelineRequest(t *testing.T) { + client := newTestClient() + ps := &PipelineSource{client: client} + p := ps.Collection("items").Limit(5) + + req, err := p.toExecutePipelineRequest() + if err != nil { + t.Fatalf("toExecutePipelineRequest: %v", err) + } + + if req.GetDatabase() != "projects/test-project/databases/test-db" { + t.Errorf("req.GetDatabase: got %s, want %s", req.GetDatabase(), "projects/test-project/databases/test-db") + } + + pipelineProto := req.GetStructuredPipeline().GetPipeline() + if pipelineProto == nil { + t.Fatal("StructuredPipeline.Pipeline is nil") + } + + stagesProto := pipelineProto.GetStages() + if len(stagesProto) != 2 { + t.Fatalf("stages: got %d want 2", len(stagesProto)) + } + + // Check collection stage + wantCollStage := &pb.Pipeline_Stage{ + Name: "collection", + Args: []*pb.Value{{ValueType: &pb.Value_ReferenceValue{ReferenceValue: "/items"}}}, + } + if diff := 
cmp.Diff(wantCollStage, stagesProto[0], protocmp.Transform()); diff != "" { + t.Errorf("Collection stage mismatch (-want +got):\n%s", diff) + } + + // Check limit stage + wantLimitStage := &pb.Pipeline_Stage{ + Name: "limit", + Args: []*pb.Value{{ValueType: &pb.Value_IntegerValue{IntegerValue: 5}}}, + } + if diff := cmp.Diff(wantLimitStage, stagesProto[1], protocmp.Transform()); diff != "" { + t.Errorf("Limit stage mismatch (-want +got):\n%s", diff) + } +} From d1da0c689f5b0095f8d6bb76b08bd6c7cee28906 Mon Sep 17 00:00:00 2001 From: Baha Aiman Date: Wed, 7 May 2025 21:54:57 +0000 Subject: [PATCH 2/3] add comments --- firestore/pipeline.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/firestore/pipeline.go b/firestore/pipeline.go index 422a4e8e9d32..2572f150cbc6 100644 --- a/firestore/pipeline.go +++ b/firestore/pipeline.go @@ -23,15 +23,15 @@ import ( // Pipeline class provides a flexible and expressive framework for building complex data // transformation and query pipelines for Firestore. - +// // A pipeline takes data sources, such as Firestore collections or collection groups, and applies // a series of stages that are chained together. Each stage takes the output from the previous stage // (or the data source) and produces an output for the next stage (or as the final output of the // pipeline). - +// // Expressions can be used within // each stages to filter and transform data through the stage. - +// // NOTE: The chained stages do not prescribe exactly how Firestore will execute the pipeline. // Instead, Firestore only guarantees that the result is the same as if the chained stages were // executed in order. 
From 51f207ea8c889617ca36e8cdf7c38b9f8fd068d6 Mon Sep 17 00:00:00 2001 From: Baha Aiman Date: Tue, 13 May 2025 15:51:05 +0000 Subject: [PATCH 3/3] remove exists --- firestore/pipeline_result.go | 7 ------- 1 file changed, 7 deletions(-) diff --git a/firestore/pipeline_result.go b/firestore/pipeline_result.go index 3f960b6b47f1..fc66342d1c83 100644 --- a/firestore/pipeline_result.go +++ b/firestore/pipeline_result.go @@ -83,13 +83,6 @@ func newPipelineResult(ref *DocumentRef, proto *pb.Document, c *Client, executio return pr, nil } -// Exists reports whether the PipelineResult represents a document. -// Even if Exists returns false, the ExecutionTime field of the PipelineResult -// is valid. -func (p *PipelineResult) Exists() bool { - return p.proto != nil -} - // Data returns the PipelineResult's fields as a map. // It is equivalent to //