Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions firestore/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,11 @@ func withRequestParamsHeader(ctx context.Context, requestParams string) context.
return metadata.NewOutgoingContext(ctx, md)
}

// Pipeline returns a PipelineSource, the entry point for building a
// Firestore pipeline rooted at this client.
func (c *Client) Pipeline() *PipelineSource {
	source := &PipelineSource{client: c}
	return source
}

// Collection creates a reference to a collection with the given path.
// A path is a sequence of IDs separated by slashes.
//
Expand Down
100 changes: 100 additions & 0 deletions firestore/pipeline.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
// Copyright 2025 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package firestore

import (
"context"
"fmt"

pb "cloud.google.com/go/firestore/apiv1/firestorepb"
)

// Pipeline provides a flexible and expressive framework for building complex data
// transformation and query pipelines for Firestore.
//
// A pipeline takes data sources, such as Firestore collections or collection groups, and applies
// a series of stages that are chained together. Each stage takes the output from the previous stage
// (or the data source) and produces an output for the next stage (or as the final output of the
// pipeline).
//
// Expressions can be used within each stage to filter and transform data as it
// flows through the stage.
//
// NOTE: The chained stages do not prescribe exactly how Firestore will execute the pipeline.
// Instead, Firestore only guarantees that the result is the same as if the chained stages were
// executed in order.
type Pipeline struct {
	c      *Client         // client used to send the ExecutePipeline request
	stages []pipelineStage // ordered stages applied to the data source
	err    error           // error recorded during construction; when set, append is a no-op
}

// newPipeline constructs a Pipeline owned by client whose stage list begins
// with initialStage.
func newPipeline(client *Client, initialStage pipelineStage) *Pipeline {
	p := &Pipeline{c: client}
	p.stages = append(p.stages, initialStage)
	return p
}

// Execute executes the pipeline and returns an iterator for streaming the results.
// TODO: Accept PipelineOptions
func (p *Pipeline) Execute(ctx context.Context) *PipelineResultIterator {
	internal := newStreamPipelineResultIterator(ctx, p)
	return &PipelineResultIterator{iter: internal}
}

// toExecutePipelineRequest converts the pipeline's chained stages into the
// proto request consumed by the ExecutePipeline RPC.
func (p *Pipeline) toExecutePipelineRequest() (*pb.ExecutePipelineRequest, error) {
	protoStages := make([]*pb.Pipeline_Stage, 0, len(p.stages))
	for _, stage := range p.stages {
		ps, err := stage.toProto()
		if err != nil {
			return nil, fmt.Errorf("firestore: error converting stage %q to proto: %w", stage.name(), err)
		}
		protoStages = append(protoStages, ps)
	}

	structured := &pb.StructuredPipeline{
		Pipeline: &pb.Pipeline{Stages: protoStages},
	}
	// TODO: Add consistencyselector
	return &pb.ExecutePipelineRequest{
		Database: p.c.path(),
		PipelineType: &pb.ExecutePipelineRequest_StructuredPipeline{
			StructuredPipeline: structured,
		},
	}, nil
}

// append returns a new Pipeline extending the receiver with stage s. The
// receiver is left untouched (the builder is immutable), so pipelines can be
// branched from a shared base. If a construction error is already recorded,
// the receiver is returned unchanged.
func (p *Pipeline) append(s pipelineStage) *Pipeline {
	if p.err != nil {
		return p
	}
	stages := make([]pipelineStage, 0, len(p.stages)+1)
	stages = append(stages, p.stages...)
	stages = append(stages, s)
	return &Pipeline{c: p.c, stages: stages}
}
Comment on lines +84 to +95
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why does this function essentially create a deep copy of the existing pipeline then extend the new one? Is this a better approach than just returning the existing object after appending the new pipelineStage to the existing one?

Copy link
Copy Markdown
Contributor Author

@bhshkh bhshkh May 14, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a deliberate choice to make the Pipeline builder immutable. It's a common pattern for fluent, chainable APIs.

  1. This allows branching e.g.
base := client.Pipeline().Collection("events")
pA := base.Where(Field("type").Eq("A"))
pB := base.Where(Field("type").Eq("B"))
// 'base' is still just Collection("events")
// pA and pB are distinct and don't interfere.
  2. Thread Safety (for reading):
    Immutable objects are inherently safe to share across goroutines for reading purposes without requiring locks, as their state never changes once created.

The minor performance overhead of copying is usually negligible compared to the benefits in robustness.


// Limit limits the maximum number of documents returned by previous stages.
func (p *Pipeline) Limit(limit int) *Pipeline {
	stage := newLimitStage(limit)
	return p.append(stage)
}
268 changes: 268 additions & 0 deletions firestore/pipeline_result.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,268 @@
// Copyright 2025 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package firestore

import (
"context"
"errors"
"fmt"
"io"
"time"

pb "cloud.google.com/go/firestore/apiv1/firestorepb"
"cloud.google.com/go/internal/trace"
"google.golang.org/api/iterator"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/types/known/timestamppb"
)

// PipelineResult is a result returned from executing a pipeline.
type PipelineResult struct {
	// Ref is the DocumentRef for this result. It may be nil if the result
	// does not correspond to a specific Firestore document (e.g., an aggregation result
	// without grouping, or a synthetic document from a stage).
	Ref *DocumentRef

	// CreateTime is the time at which the document was created.
	// It may be nil if the result does not correspond to a specific Firestore document.
	CreateTime *time.Time

	// UpdateTime is the time at which the document was last changed.
	// It may be nil if the result does not correspond to a specific Firestore document.
	UpdateTime *time.Time

	// ExecutionTime is the time at which the document(s) were read.
	ExecutionTime *time.Time

	c     *Client      // client used to decode field values
	proto *pb.Document // raw document proto backing this result
}

// newPipelineResult builds a PipelineResult from a document proto, converting
// the proto timestamps into *time.Time fields. A timestamp that fails
// validation yields an error.
func newPipelineResult(ref *DocumentRef, proto *pb.Document, c *Client, executionTime *timestamppb.Timestamp) (*PipelineResult, error) {
	// asTimePtr validates ts and converts it; a nil timestamp maps to nil.
	asTimePtr := func(ts *timestamppb.Timestamp) (*time.Time, error) {
		if ts == nil {
			return nil, nil
		}
		if err := ts.CheckValid(); err != nil {
			return nil, err
		}
		t := ts.AsTime()
		return &t, nil
	}

	pr := &PipelineResult{Ref: ref, c: c, proto: proto}

	// Proto getters are nil-safe, so a nil proto simply leaves the times nil.
	var err error
	if pr.CreateTime, err = asTimePtr(proto.GetCreateTime()); err != nil {
		return nil, err
	}
	if pr.UpdateTime, err = asTimePtr(proto.GetUpdateTime()); err != nil {
		return nil, err
	}
	if pr.ExecutionTime, err = asTimePtr(executionTime); err != nil {
		return nil, err
	}
	return pr, nil
}

// Data returns the PipelineResult's fields as a map.
// It is equivalent to
//
//	var m map[string]interface{}
//	p.DataTo(&m)
//
// except that it returns nil if the document does not exist.
func (p *PipelineResult) Data() map[string]interface{} {
	if !p.Exists() {
		return nil
	}
	fields, err := createMapFromValueMap(p.proto.Fields, p.c)
	if err != nil {
		// Any error here is a bug in the client.
		panic(fmt.Sprintf("firestore: %v", err))
	}
	return fields
}

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is PipelineResult.Get() coming later?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

PipelineResult is similar to DocumentSnapshot. So, there won't be a Get.

DocumentRef field inside PipelineResult and DocumentSnapshot already have a Get

The other methods that I would add are DataAt and DataAtPath

// DataAt returns the data value denoted by path.
//
// The path argument can be a single field or a dot-separated sequence of
// fields, and must not contain any of the runes "˜*/[]". Use DataAtPath instead for
// such a path.
//
// See DocumentSnapshot.DataTo for how Firestore values are converted to Go values.
//
// If the document does not exist, DataAt returns a NotFound error.
func (d *DocumentSnapshot) DataAt(path string) (interface{}, error) {
if !d.Exists() {
return nil, status.Errorf(codes.NotFound, "document %s does not exist", d.Ref.Path)
}
fp, err := parseDotSeparatedString(path)
if err != nil {
return nil, err
}
return d.DataAtPath(fp)
}
// DataAtPath returns the data value denoted by the FieldPath fp.
// If the document does not exist, DataAtPath returns a NotFound error.
func (d *DocumentSnapshot) DataAtPath(fp FieldPath) (interface{}, error) {
if !d.Exists() {
return nil, status.Errorf(codes.NotFound, "document %s does not exist", d.Ref.Path)
}
v, err := valueAtPath(fp, d.proto.Fields)
if err != nil {
return nil, err
}
return createFromProtoValue(v, d.c)
}

// DataTo uses the PipelineResult's fields to populate v, which can be a pointer to a
// map[string]interface{} or a pointer to a struct.
// This is similar to [DocumentSnapshot.DataTo]
func (p *PipelineResult) DataTo(v interface{}) error {
if !p.Exists() {
return status.Errorf(codes.NotFound, "document does not exist")
}
return setFromProtoValue(v, &pb.Value{ValueType: &pb.Value_MapValue{MapValue: &pb.MapValue{Fields: p.proto.Fields}}}, p.c)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Where is setFromProtoValue defined? Is this just a generic serialization function?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. Its here:

func setFromProtoValue(dest interface{}, vprotoSrc *pb.Value, c *Client) error {
destV := reflect.ValueOf(dest)
if destV.Kind() != reflect.Ptr || destV.IsNil() {
return errors.New("firestore: nil or not a pointer")
}
return setReflectFromProtoValue(destV.Elem(), vprotoSrc, c)
}

It is also being used for query results

}

// PipelineResultIterator is an iterator over PipelineResults from a pipeline execution.
type PipelineResultIterator struct {
	iter pipelineResultIteratorInternal // underlying iteration implementation; nil once unusable
	err  error                          // Stores sticky error from Next() or construction
}

// Next returns the next result. Its second return value is iterator.Done if there
// are no more results. Once Next returns Done, all subsequent calls will return
// Done.
func (it *PipelineResultIterator) Next() (*PipelineResult, error) {
	switch {
	case it.err != nil:
		return nil, it.err
	case it.iter == nil:
		// Iterator was stopped or never initialized.
		return nil, iterator.Done
	}

	pr, err := it.iter.next()
	if err != nil {
		// Make the error sticky so later calls short-circuit.
		it.err = err
	}
	return pr, err
}

// Stop stops the iterator, freeing its resources.
// Always call Stop when you are done with a PipelineResultIterator.
// It is not safe to call Stop concurrently with Next.
func (it *PipelineResultIterator) Stop() {
	if it.iter != nil {
		it.iter.stop()
	}
	if it.err == nil {
		// Mark the iterator finished so subsequent Next calls return Done.
		it.err = iterator.Done
	}
}

// GetAll returns all the documents remaining from the iterator.
// It is not necessary to call Stop on the iterator after calling GetAll.
func (it *PipelineResultIterator) GetAll() ([]*PipelineResult, error) {
	if it.err != nil {
		return nil, it.err
	}
	defer it.Stop()

	var results []*PipelineResult
	for {
		pr, err := it.Next()
		if err != nil {
			if err == iterator.Done {
				// Normal end of stream.
				return results, nil
			}
			return results, err
		}
		results = append(results, pr)
	}
}

// pipelineResultIteratorInternal is an unexported interface defining the core
// iteration logic behind PipelineResultIterator.
type pipelineResultIteratorInternal interface {
	next() (*PipelineResult, error) // returns the next result, or an error (iterator.Done at end)
	stop()                          // releases the iterator's resources
}

// streamPipelineResultIterator is the concrete implementation for gRPC streaming of pipeline results.
type streamPipelineResultIterator struct {
	ctx                context.Context                    // governs the stream; wrapped with a trace span on first next()
	cancel             func()                             // cancels ctx; invoked by stop()
	p                  *Pipeline                          // pipeline being executed
	streamClient       pb.Firestore_ExecutePipelineClient // opened lazily on the first next() call
	currResp           *pb.ExecutePipelineResponse        // most recently received response, if any
	currRespResultsIdx int                                // index of the next unread result within currResp
}

// newStreamPipelineResultIterator returns a streaming iterator for p. A
// cancelable child context is derived from ctx so stop() can tear down the
// underlying stream.
func newStreamPipelineResultIterator(ctx context.Context, p *Pipeline) *streamPipelineResultIterator {
	streamCtx, cancel := context.WithCancel(ctx)
	it := &streamPipelineResultIterator{p: p}
	it.ctx = streamCtx
	it.cancel = cancel
	return it
}

// Each ExecutePipelineResponse received from the Firestore service contains a
// list of Documents. On each next() call, return a single document, pulling a
// new response from the stream once the current one is exhausted.
//
// The underlying gRPC stream is opened lazily on the first call. Returns
// iterator.Done when the stream ends.
func (it *streamPipelineResultIterator) next() (_ *PipelineResult, err error) {
	client := it.p.c

	// streamClient is initialized on first next call
	if it.streamClient == nil {
		it.ctx = trace.StartSpan(it.ctx, "cloud.google.com/go/firestore.ExecutePipeline")
		defer func() {
			// iterator.Done is normal termination, not a failure, for tracing.
			if errors.Is(err, iterator.Done) {
				trace.EndSpan(it.ctx, nil)
			} else {
				trace.EndSpan(it.ctx, err)
			}
		}()
		req, err := it.p.toExecutePipelineRequest()
		if err != nil {
			return nil, err
		}
		it.streamClient, err = client.c.ExecutePipeline(it.ctx, req)
		if err != nil {
			return nil, err
		}
	}

	// If the current response is nil or all its results have been processed,
	// receive the next response from the stream.
	if it.currResp == nil || it.currRespResultsIdx >= len(it.currResp.GetResults()) {
		var res *pb.ExecutePipelineResponse
		for {
			res, err = it.streamClient.Recv()
			if err == io.EOF {
				return nil, iterator.Done
			}
			if err != nil {
				return nil, err
			}
			if res.GetResults() != nil {
				it.currResp = res
				it.currRespResultsIdx = 0
				break
			}
			// No results => partial progress; keep receiving
			// TODO: Set ExplainStats
		}
	}

	// Get the next document proto from the current response.
	docProto := it.currResp.GetResults()[it.currRespResultsIdx]
	it.currRespResultsIdx++

	// Only results that carry a resource name correspond to real documents;
	// synthetic results (e.g., ungrouped aggregations) leave Ref nil.
	// BUG FIX: the condition was inverted (== 0), which attempted to build a
	// DocumentRef from an empty path and left Ref nil for real documents.
	var docRef *DocumentRef
	if len(docProto.GetName()) != 0 {
		var pathErr error
		docRef, pathErr = pathToDoc(docProto.GetName(), client)
		if pathErr != nil {
			return nil, pathErr
		}
	}

	pr, err := newPipelineResult(docRef, docProto, client, it.currResp.GetExecutionTime())
	if err != nil {
		return nil, err
	}
	return pr, nil
}

// stop cancels the iterator's context, which terminates the underlying gRPC
// stream (if one was opened) and releases its resources.
func (it *streamPipelineResultIterator) stop() {
	it.cancel()
}
Loading
Loading