Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
41 commits
Select commit Hold shift + click to select a range
54c4f6e
fixed tests
daniel-sanche Oct 23, 2025
107e412
added vector expressions
daniel-sanche Oct 23, 2025
a0c36ca
added new math expressions
daniel-sanche Oct 24, 2025
dcd6af1
added string manipulation expressions
daniel-sanche Oct 24, 2025
71308dc
added not_nan, not_null, and is_absent
daniel-sanche Oct 24, 2025
0260014
added new Array type
daniel-sanche Oct 24, 2025
1b69435
added map and related expressions
daniel-sanche Oct 24, 2025
be749e0
remove dict and list from constant types
daniel-sanche Oct 24, 2025
789f29c
Fixed lint
daniel-sanche Oct 25, 2025
64be10d
added count_if and count_distinct
daniel-sanche Oct 27, 2025
5d4f878
added misc expressions
daniel-sanche Oct 27, 2025
6d6c57f
added error functions
daniel-sanche Oct 27, 2025
f1690d8
fixed lint
daniel-sanche Oct 27, 2025
559ad80
removed AliasedAggregate in favor of generics
daniel-sanche Oct 27, 2025
86ad143
improved e2e tests
daniel-sanche Oct 28, 2025
f2697ca
fixed broken stages
daniel-sanche Oct 28, 2025
d18e1f9
added options to generic stage
daniel-sanche Oct 28, 2025
68b9eff
fixed unit tests
daniel-sanche Oct 28, 2025
9b2cc6b
ran blacken
daniel-sanche Oct 28, 2025
3ebdb13
broke out aggregates
daniel-sanche Oct 28, 2025
a951789
broke up test file
daniel-sanche Oct 28, 2025
1d48d4d
removed duplicates
daniel-sanche Oct 28, 2025
390ea72
added math file
daniel-sanche Oct 28, 2025
7c2f41f
renamed tests
daniel-sanche Oct 28, 2025
41ff06d
include test file in test name
daniel-sanche Oct 28, 2025
d1dc233
fixed lint
daniel-sanche Oct 28, 2025
ca2caa9
broke out tests into multiple yaml files
daniel-sanche Oct 28, 2025
7425832
fixed static function access
daniel-sanche Oct 28, 2025
086c3af
ignore upgrade warning
daniel-sanche Oct 28, 2025
ce2fab2
renamed Expr to Expression
daniel-sanche Oct 28, 2025
d45b8b7
Merge branch 'pipeline_queries_improve_tests' into pipeline_queries_c…
daniel-sanche Oct 28, 2025
9eb32e1
renamed variables
daniel-sanche Oct 28, 2025
adbfd21
renamed GenericStage to RawStage
daniel-sanche Oct 28, 2025
5d4d6b0
ran blacken
daniel-sanche Oct 28, 2025
ea3aaa5
made pipeline_stages public
daniel-sanche Oct 28, 2025
b46dc66
Merge branch 'pipeline_queries_approved' into pipeline_queries_improv…
daniel-sanche Oct 29, 2025
7059435
skip pipeline verification on kokoro
daniel-sanche Oct 29, 2025
52e11bf
fixed lint
daniel-sanche Oct 29, 2025
c994ad6
fixed test coverage
daniel-sanche Oct 29, 2025
a52ba2e
Merge branch 'pipeline_queries_improve_tests' into pipeline_queries_c…
daniel-sanche Oct 29, 2025
2eeb688
Merge branch 'pipeline_queries_approved' into pipeline_queries_cleanup
daniel-sanche Oct 30, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion google/cloud/firestore_v1/async_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

from __future__ import annotations
from typing import AsyncIterable, TYPE_CHECKING
from google.cloud.firestore_v1 import _pipeline_stages as stages
from google.cloud.firestore_v1 import pipeline_stages as stages
from google.cloud.firestore_v1.base_pipeline import _BasePipeline

if TYPE_CHECKING: # pragma: NO COVER
Expand Down
4 changes: 2 additions & 2 deletions google/cloud/firestore_v1/base_aggregation.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
)
from google.cloud.firestore_v1.pipeline_expressions import AggregateFunction
from google.cloud.firestore_v1.pipeline_expressions import Count
from google.cloud.firestore_v1.pipeline_expressions import AliasedExpr
from google.cloud.firestore_v1.pipeline_expressions import AliasedExpression
from google.cloud.firestore_v1.pipeline_expressions import Field

# Types needed only for Type Hints
Expand Down Expand Up @@ -86,7 +86,7 @@ def _to_protobuf(self):
@abc.abstractmethod
def _to_pipeline_expr(
self, autoindexer: Iterable[int]
) -> AliasedExpr[AggregateFunction]:
) -> AliasedExpression[AggregateFunction]:
"""
Convert this instance to a pipeline expression for use with pipeline.aggregate()

Expand Down
51 changes: 26 additions & 25 deletions google/cloud/firestore_v1/base_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

from __future__ import annotations
from typing import Iterable, Sequence, TYPE_CHECKING
from google.cloud.firestore_v1 import _pipeline_stages as stages
from google.cloud.firestore_v1 import pipeline_stages as stages
from google.cloud.firestore_v1.types.pipeline import (
StructuredPipeline as StructuredPipeline_pb,
)
Expand All @@ -23,10 +23,11 @@
from google.cloud.firestore_v1.types.firestore import ExecutePipelineRequest
from google.cloud.firestore_v1.pipeline_result import PipelineResult
from google.cloud.firestore_v1.pipeline_expressions import (
AliasedAggregate,
Expr,
AggregateFunction,
AliasedExpression,
Expression,
Field,
BooleanExpr,
BooleanExpression,
Selectable,
)
from google.cloud.firestore_v1 import _helpers
Expand Down Expand Up @@ -146,7 +147,7 @@ def add_fields(self, *fields: Selectable) -> "_BasePipeline":
The added fields are defined using `Selectable` expressions, which can be:
- `Field`: References an existing document field.
- `Function`: Performs a calculation using functions like `add`,
`multiply` with assigned aliases using `Expr.as_()`.
`multiply` with assigned aliases using `Expression.as_()`.

Example:
>>> from google.cloud.firestore_v1.pipeline_expressions import Field, add
Expand Down Expand Up @@ -193,7 +194,7 @@ def select(self, *selections: str | Selectable) -> "_BasePipeline":
The selected fields are defined using `Selectable` expressions or field names:
- `Field`: References an existing document field.
- `Function`: Represents the result of a function with an assigned alias
name using `Expr.as_()`.
name using `Expression.as_()`.
- `str`: The name of an existing field.

If no selections are provided, the output of this stage is empty. Use
Expand All @@ -219,14 +220,14 @@ def select(self, *selections: str | Selectable) -> "_BasePipeline":
"""
return self._append(stages.Select(*selections))

def where(self, condition: BooleanExpr) -> "_BasePipeline":
def where(self, condition: BooleanExpression) -> "_BasePipeline":
"""
Filters the documents from previous stages to only include those matching
the specified `BooleanExpr`.
the specified `BooleanExpression`.

This stage allows you to apply conditions to the data, similar to a "WHERE"
clause in SQL. You can filter documents based on their field values, using
implementations of `BooleanExpr`, typically including but not limited to:
implementations of `BooleanExpression`, typically including but not limited to:
- field comparators: `eq`, `lt` (less than), `gt` (greater than), etc.
- logical operators: `And`, `Or`, `Not`, etc.
- advanced functions: `regex_matches`, `array_contains`, etc.
Expand All @@ -251,7 +252,7 @@ def where(self, condition: BooleanExpr) -> "_BasePipeline":


Args:
condition: The `BooleanExpr` to apply.
condition: The `BooleanExpression` to apply.

Returns:
A new Pipeline object with this stage appended to the stage list
Expand All @@ -260,7 +261,7 @@ def where(self, condition: BooleanExpr) -> "_BasePipeline":

def find_nearest(
self,
field: str | Expr,
field: str | Expression,
vector: Sequence[float] | "Vector",
distance_measure: "DistanceMeasure",
options: stages.FindNearestOptions | None = None,
Expand Down Expand Up @@ -297,7 +298,7 @@ def find_nearest(
... )

Args:
field: The name of the field (str) or an expression (`Expr`) that
field: The name of the field (str) or an expression (`Expression`) that
evaluates to the vector data. This field should store vector values.
vector: The target vector (sequence of floats or `Vector` object) to
compare against.
Expand Down Expand Up @@ -457,28 +458,29 @@ def unnest(
"""
return self._append(stages.Unnest(field, alias, options))

def generic_stage(self, name: str, *params: Expr) -> "_BasePipeline":
def raw_stage(self, name: str, *params: Expression) -> "_BasePipeline":
"""
Adds a generic, named stage to the pipeline with specified parameters.
Adds a stage to the pipeline by specifying the stage name as an argument. This does not offer any
type safety on the stage params and requires the caller to know the order (and optionally names)
of parameters accepted by the stage.

This method provides a flexible way to extend the pipeline's functionality
by adding custom stages. Each generic stage is defined by a unique `name`
and a set of `params` that control its behavior.
This class provides a way to call stages that are supported by the Firestore backend but that
are not implemented in the SDK version being used.

Example:
>>> # Assume we don't have a built-in "where" stage
>>> pipeline = client.pipeline().collection("books")
>>> pipeline = pipeline.generic_stage("where", [Field.of("published").lt(900)])
>>> pipeline = pipeline.raw_stage("where", Field.of("published").lt(900))
>>> pipeline = pipeline.select("title", "author")

Args:
name: The name of the generic stage.
*params: A sequence of `Expr` objects representing the parameters for the stage.
name: The name of the stage.
*params: A sequence of `Expression` objects representing the parameters for the stage.

Returns:
A new Pipeline object with this stage appended to the stage list
"""
return self._append(stages.GenericStage(name, *params))
return self._append(stages.RawStage(name, *params))

def offset(self, offset: int) -> "_BasePipeline":
"""
Expand Down Expand Up @@ -530,7 +532,7 @@ def limit(self, limit: int) -> "_BasePipeline":

def aggregate(
self,
*accumulators: AliasedAggregate,
*accumulators: AliasedExpression[AggregateFunction],
groups: Sequence[str | Selectable] = (),
) -> "_BasePipeline":
"""
Expand All @@ -546,7 +548,6 @@ def aggregate(
- **Groups:** Optionally specify fields (by name or `Selectable`) to group
the documents by. Aggregations are then performed within each distinct group.
If no groups are provided, the aggregation is performed over the entire input.

Example:
>>> from google.cloud.firestore_v1.pipeline_expressions import Field
>>> pipeline = client.pipeline().collection("books")
Expand All @@ -568,8 +569,8 @@ def aggregate(


Args:
*accumulators: One or more `AliasedAggregate` expressions defining
the aggregations to perform and their output names.
*accumulators: One or more expressions defining the aggregations to perform and their
corresponding output names.
groups: An optional sequence of field names (str) or `Selectable`
expressions to group by before aggregating.

Expand Down
2 changes: 1 addition & 1 deletion google/cloud/firestore_v1/base_query.py
Original file line number Diff line number Diff line change
Expand Up @@ -1151,7 +1151,7 @@ def pipeline(self):
# Filters
for filter_ in self._field_filters:
ppl = ppl.where(
pipeline_expressions.BooleanExpr._from_query_filter_pb(
pipeline_expressions.BooleanExpression._from_query_filter_pb(
filter_, self._client
)
)
Expand Down
2 changes: 1 addition & 1 deletion google/cloud/firestore_v1/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

from __future__ import annotations
from typing import Iterable, TYPE_CHECKING
from google.cloud.firestore_v1 import _pipeline_stages as stages
from google.cloud.firestore_v1 import pipeline_stages as stages
from google.cloud.firestore_v1.base_pipeline import _BasePipeline

if TYPE_CHECKING: # pragma: NO COVER
Expand Down
Loading