From a247a45df623d84b44725a03a49660afaa633090 Mon Sep 17 00:00:00 2001 From: Baha Aiman Date: Fri, 31 Oct 2025 21:34:44 +0000 Subject: [PATCH 1/3] feat(firestore): [PQ] add raw stage --- firestore/integration_test.go | 30 ++++++++++++++++++ firestore/pipeline.go | 9 ++++++ firestore/pipeline_stage.go | 60 +++++++++++++++++++++++++++++++++++ 3 files changed, 99 insertions(+) diff --git a/firestore/integration_test.go b/firestore/integration_test.go index ab01ad6b2a62..fbc8d8ed6eab 100644 --- a/firestore/integration_test.go +++ b/firestore/integration_test.go @@ -3825,6 +3825,36 @@ func TestIntegration_PipelineStages(t *testing.T) { t.Errorf("got title %q, want 'The Great Gatsby'", data["title"]) } }) + t.Run("RawStage", func(t *testing.T) { + // Using RawStage to perform a Limit operation + iter := client.Pipeline().Collection(coll.ID).RawStage(NewRawStage("limit").WithArguments(3)).Execute(ctx) + defer iter.Stop() + results, err := iter.GetAll() + if err != nil { + t.Fatalf("Failed to iterate: %v", err) + } + if len(results) != 3 { + t.Errorf("got %d documents, want 3", len(results)) + } + + // Using RawStage to perform a Select operation with options + iter = client.Pipeline().Collection(coll.ID).RawStage(NewRawStage("select").WithArguments(map[string]interface{}{"title": FieldOf("title")})).Limit(1).Execute(ctx) + defer iter.Stop() + doc, err := iter.Next() + if err != nil { + t.Fatalf("Failed to iterate: %v", err) + } + if !doc.Exists() { + t.Fatalf("Exists: got: false, want: true") + } + data := doc.Data() + if _, ok := data["title"]; !ok { + t.Error("missing 'title' field") + } + if _, ok := data["genre"]; ok { + t.Error("unexpected 'genre' field") + } + }) t.Run("RemoveFields", func(t *testing.T) { iter := client.Pipeline().Collection(coll.ID). Limit(1). diff --git a/firestore/pipeline.go b/firestore/pipeline.go index 830eff99a611..ebf8a1fcaa66 100644 --- a/firestore/pipeline.go +++ b/firestore/pipeline.go @@ -484,3 +484,12 @@ func (p *Pipeline) FindNearest(vectorField any, queryVector any, measure Pipelin } return p.append(stage) } + +// RawStage adds a raw stage to the pipeline. +// This is useful for using stages that are not yet implemented in the SDK. +func (p *Pipeline) RawStage(stage *RawStage) *Pipeline { + if p.err != nil { + return p + } + return p.append(stage) +} diff --git a/firestore/pipeline_stage.go b/firestore/pipeline_stage.go index e05aac5cfd56..538538bea71f 100644 --- a/firestore/pipeline_stage.go +++ b/firestore/pipeline_stage.go @@ -16,6 +16,7 @@ package firestore import ( "fmt" + "reflect" "strings" pb "cloud.google.com/go/firestore/apiv1/firestorepb" @@ -524,3 +525,62 @@ func newWhereStage(condition BooleanExpr) (*whereStage, error) { stagePb: newUnaryStage(stageNameWhere, argsPb), }}, nil } + +// RawStageOptions holds the options for a RawStage. +type RawStageOptions map[string]any + +// RawStage represents a pipeline stage that is not yet implemented in the SDK. +// This allows users to call stages that are supported by the Firestore backend +// but not yet available in the current SDK version. +type RawStage struct { + stageName string + args []any + options RawStageOptions +} + +// NewRawStage creates a new RawStage with the given name. +func NewRawStage(name string) *RawStage { + return &RawStage{stageName: name} +} + +// WithArguments sets the arguments for the RawStage. +func (s *RawStage) WithArguments(args ...any) *RawStage { + s.args = args + return s +} + +// WithOptions sets the options for the RawStage. +func (s *RawStage) WithOptions(options RawStageOptions) *RawStage { + s.options = options + return s +} + +func (s *RawStage) name() string { return s.stageName } + +func (s *RawStage) toProto() (*pb.Pipeline_Stage, error) { + argsPb := make([]*pb.Value, len(s.args)) + for i, arg := range s.args { + val, _, err := toProtoValue(reflect.ValueOf(arg)) + if err != nil { + return nil, fmt.Errorf("firestore: error converting raw stage argument %d: %w", i, err) + } + argsPb[i] = val + } + + optionsPb := make(map[string]*pb.Value) + if s.options != nil { + for key, val := range s.options { + valPb, _, err := toProtoValue(reflect.ValueOf(val)) + if err != nil { + return nil, fmt.Errorf("firestore: error converting raw stage option %q: %w", key, err) + } + optionsPb[key] = valPb + } + } + + return &pb.Pipeline_Stage{ + Name: s.name(), + Args: argsPb, + Options: optionsPb, + }, nil +} From 37664b585c19a8fce28ae77c56dec4d2f30f6600 Mon Sep 17 00:00:00 2001 From: Baha Aiman Date: Fri, 31 Oct 2025 22:04:47 +0000 Subject: [PATCH 2/3] precalculate map size --- firestore/pipeline_stage.go | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/firestore/pipeline_stage.go b/firestore/pipeline_stage.go index 538538bea71f..c34c1fb2e855 100644 --- a/firestore/pipeline_stage.go +++ b/firestore/pipeline_stage.go @@ -567,15 +567,13 @@ func (s *RawStage) toProto() (*pb.Pipeline_Stage, error) { argsPb[i] = val } - optionsPb := make(map[string]*pb.Value) - if s.options != nil { - for key, val := range s.options { - valPb, _, err := toProtoValue(reflect.ValueOf(val)) - if err != nil { - return nil, fmt.Errorf("firestore: error converting raw stage option %q: %w", key, err) - } - optionsPb[key] = valPb + optionsPb := make(map[string]*pb.Value, len(s.options)) + for key, val := range s.options { + valPb, _, err := toProtoValue(reflect.ValueOf(val)) + if err != nil { + return nil, fmt.Errorf("firestore: error converting raw stage option %q: %w", key, err) } + optionsPb[key] = valPb } return &pb.Pipeline_Stage{ From 07290572984328deee8ba12f136ebfdb520a30ee Mon Sep 17 00:00:00 2001 From: Baha Aiman Date: Fri, 31 Oct 2025 23:36:51 +0000 Subject: [PATCH 3/3] update comment --- firestore/pipeline.go | 16 ++++++++++++++-- firestore/pipeline_stage.go | 5 +++-- 2 files changed, 17 insertions(+), 4 deletions(-) diff --git a/firestore/pipeline.go b/firestore/pipeline.go index ebf8a1fcaa66..3ee9d764ec82 100644 --- a/firestore/pipeline.go +++ b/firestore/pipeline.go @@ -485,8 +485,20 @@ func (p *Pipeline) FindNearest(vectorField any, queryVector any, measure Pipelin return p.append(stage) } -// RawStage adds a raw stage to the pipeline. -// This is useful for using stages that are not yet implemented in the SDK. +// RawStage adds a generic stage to the pipeline. +// This method provides a flexible way to extend the pipeline's functionality by adding custom stages. +// +// Example: +// +// // Assume we don't have a built-in "where" stage +// client.Pipeline().Collection("books"). +// RawStage( +// NewRawStage("where"). +// WithArguments( +// LessThan(FieldOf("published"), 1900), +// ), +// ). +// Select("title", "author") func (p *Pipeline) RawStage(stage *RawStage) *Pipeline { if p.err != nil { return p diff --git a/firestore/pipeline_stage.go b/firestore/pipeline_stage.go index c34c1fb2e855..f6d41256297d 100644 --- a/firestore/pipeline_stage.go +++ b/firestore/pipeline_stage.go @@ -529,8 +529,9 @@ func newWhereStage(condition BooleanExpr) (*whereStage, error) { // RawStageOptions holds the options for a RawStage. type RawStageOptions map[string]any -// RawStage represents a pipeline stage that is not yet implemented in the SDK. -// This allows users to call stages that are supported by the Firestore backend +// RawStage is a generic stage in the pipeline. +// It provides a flexible way to extend the pipeline's functionality by adding custom +// stages. It also allows the users to call the stages that are supported by the Firestore backend // but not yet available in the current SDK version. type RawStage struct { stageName string