Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions firestore/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -484,3 +484,24 @@ func (p *Pipeline) FindNearest(vectorField any, queryVector any, measure Pipelin
}
return p.append(stage)
}

// RawStage adds a generic stage to the pipeline.
// This method provides a flexible way to extend the pipeline's functionality by adding custom stages.
//
// Example:
//
// // Assume we don't have a built-in "where" stage
// client.Pipeline().Collection("books").
// RawStage(
// NewRawStage("where").
// WithArguments(
// LessThan(FieldOf("published"), 1900),
// ),
// ).
// Select("title", "author")
func (p *Pipeline) RawStage(stage *RawStage) *Pipeline {
if p.err != nil {
return p
}
return p.append(stage)
}
30 changes: 30 additions & 0 deletions firestore/pipeline_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,36 @@ func TestIntegration_PipelineStages(t *testing.T) {
t.Errorf("got title %q, want 'The Great Gatsby'", data["title"])
}
})
t.Run("RawStage", func(t *testing.T) {
// Using RawStage to perform a Limit operation
iter := client.Pipeline().Collection(coll.ID).RawStage(NewRawStage("limit").WithArguments(3)).Execute(ctx)
defer iter.Stop()
results, err := iter.GetAll()
if err != nil {
t.Fatalf("Failed to iterate: %v", err)
}
if len(results) != 3 {
t.Errorf("got %d documents, want 3", len(results))
}

// Using RawStage to perform a Select operation with options
iter = client.Pipeline().Collection(coll.ID).RawStage(NewRawStage("select").WithArguments(map[string]interface{}{"title": FieldOf("title")})).Limit(1).Execute(ctx)
defer iter.Stop()
doc, err := iter.Next()
if err != nil {
t.Fatalf("Failed to iterate: %v", err)
}
if !doc.Exists() {
t.Fatalf("Exists: got: false, want: true")
}
data := doc.Data()
if _, ok := data["title"]; !ok {
t.Error("missing 'title' field")
}
if _, ok := data["genre"]; ok {
t.Error("unexpected 'genre' field")
}
})
t.Run("RemoveFields", func(t *testing.T) {
iter := client.Pipeline().Collection(coll.ID).
Limit(1).
Expand Down
59 changes: 59 additions & 0 deletions firestore/pipeline_stage.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package firestore

import (
"fmt"
"reflect"
"strings"

pb "cloud.google.com/go/firestore/apiv1/firestorepb"
Expand Down Expand Up @@ -524,3 +525,61 @@ func newWhereStage(condition BooleanExpr) (*whereStage, error) {
stagePb: newUnaryStage(stageNameWhere, argsPb),
}}, nil
}

// RawStageOptions holds the options for a RawStage.
type RawStageOptions map[string]any

// RawStage is a generic stage in the pipeline.
// It provides a flexible way to extend the pipeline's functionality by adding custom
// stages. It also allows the users to call the stages that are supported by the Firestore backend
// but not yet available in the current SDK version.
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: The first line of this description seems like it could be a bit confusing. It seems like it's saying RawStage itself is unimplemented, but it means to say RawStage lets you use unimplemented stages

I just used the description from the java client

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch. Updated. Thanks for the review !!

type RawStage struct {
stageName string
args []any
options RawStageOptions
}

// NewRawStage creates a new RawStage with the given name.
func NewRawStage(name string) *RawStage {
return &RawStage{stageName: name}
}

// WithArguments sets the arguments for the RawStage.
func (s *RawStage) WithArguments(args ...any) *RawStage {
s.args = args
return s
}

// WithOptions sets the options for the RawStage.
func (s *RawStage) WithOptions(options RawStageOptions) *RawStage {
s.options = options
return s
}

func (s *RawStage) name() string { return s.stageName }

func (s *RawStage) toProto() (*pb.Pipeline_Stage, error) {
argsPb := make([]*pb.Value, len(s.args))
for i, arg := range s.args {
val, _, err := toProtoValue(reflect.ValueOf(arg))
if err != nil {
return nil, fmt.Errorf("firestore: error converting raw stage argument %d: %w", i, err)
}
argsPb[i] = val
}

optionsPb := make(map[string]*pb.Value, len(s.options))
for key, val := range s.options {
valPb, _, err := toProtoValue(reflect.ValueOf(val))
if err != nil {
return nil, fmt.Errorf("firestore: error converting raw stage option %q: %w", key, err)
}
optionsPb[key] = valPb
}

return &pb.Pipeline_Stage{
Name: s.name(),
Args: argsPb,
Options: optionsPb,
}, nil
}
Loading