Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
15573d3
support passing in explain_options
daniel-sanche Nov 6, 2025
406b9b2
added PipelineSnapshot and PipelineStream classes
daniel-sanche Nov 6, 2025
5aca1cf
added ExplainStats class
daniel-sanche Nov 6, 2025
065be33
got pipeline containers set up
daniel-sanche Nov 6, 2025
e0ed49f
added async pipeline
daniel-sanche Nov 6, 2025
129116a
moved request logic into PipelineStream class
daniel-sanche Nov 6, 2025
2a09653
added errors if iterated multiple times
daniel-sanche Nov 6, 2025
6943391
simplified creation of pipelinesnapshot
daniel-sanche Nov 6, 2025
c866d7c
added generics
daniel-sanche Nov 6, 2025
cb592cc
simplified how arguments are passed
daniel-sanche Nov 6, 2025
4ac6a2f
Merge branch 'pipeline-preview' into explain_stats
daniel-sanche Nov 6, 2025
05a40f7
added explain stats tests
daniel-sanche Nov 6, 2025
ee0dc0f
got tests passing
daniel-sanche Nov 7, 2025
06fe8da
added unit tests
daniel-sanche Nov 7, 2025
bb911f8
added more tests for pipeline streams
daniel-sanche Nov 7, 2025
e2137f2
added system test
daniel-sanche Nov 7, 2025
da5ef80
added unit tests
daniel-sanche Nov 7, 2025
36b2d3f
fixed lint
daniel-sanche Nov 7, 2025
e2fc5ce
updated docstrings
daniel-sanche Nov 7, 2025
18a78d3
use custom type for PipelineExplainOptions
daniel-sanche Nov 7, 2025
73b72e2
Merge branch 'pipeline-preview' into explain_stats
daniel-sanche Nov 7, 2025
e7895af
Merge branch 'pipeline-preview' into explain_stats
daniel-sanche Nov 8, 2025
6a40bf3
add check for explain_stats
daniel-sanche Nov 10, 2025
474e15d
Merge branch 'pipeline-preview' into explain_stats
daniel-sanche Nov 11, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 43 additions & 24 deletions google/cloud/firestore_v1/async_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,20 @@
# limitations under the License.

from __future__ import annotations
from typing import AsyncIterable, TYPE_CHECKING
from typing import TYPE_CHECKING
from google.cloud.firestore_v1 import pipeline_stages as stages
from google.cloud.firestore_v1.base_pipeline import _BasePipeline
from google.cloud.firestore_v1.pipeline_result import AsyncPipelineStream
from google.cloud.firestore_v1.pipeline_result import PipelineSnapshot
from google.cloud.firestore_v1.pipeline_result import PipelineResult

if TYPE_CHECKING: # pragma: NO COVER
import datetime
from google.cloud.firestore_v1.async_client import AsyncClient
from google.cloud.firestore_v1.pipeline_result import PipelineResult
from google.cloud.firestore_v1.async_transaction import AsyncTransaction
from google.cloud.firestore_v1.pipeline_expressions import Constant
from google.cloud.firestore_v1.types.document import Value
from google.cloud.firestore_v1.query_profile import PipelineExplainOptions


class AsyncPipeline(_BasePipeline):
Expand All @@ -41,7 +46,7 @@ class AsyncPipeline(_BasePipeline):
... .collection("books")
... .where(Field.of("published").gt(1980))
... .select("title", "author")
... async for result in pipeline.execute():
... async for result in pipeline.stream():
... print(result)

Use `client.pipeline()` to create instances of this class.
Expand All @@ -59,15 +64,18 @@ def __init__(self, client: AsyncClient, *stages: stages.Stage):

async def execute(
self,
*,
transaction: "AsyncTransaction" | None = None,
read_time: datetime.datetime | None = None,
) -> list[PipelineResult]:
explain_options: PipelineExplainOptions | None = None,
index_mode: str | None = None,
additional_options: dict[str, Value | Constant] = {},
) -> PipelineSnapshot[PipelineResult]:
"""
Executes this pipeline and returns results as a list

Args:
transaction
(Optional[:class:`~google.cloud.firestore_v1.transaction.Transaction`]):
transaction (Optional[:class:`~google.cloud.firestore_v1.transaction.Transaction`]):
An existing transaction that this query will run in.
If a ``transaction`` is used and it already has write operations
added, this method cannot be used (i.e. read-after-write is not
Expand All @@ -76,25 +84,33 @@ async def execute(
time. This must be a microsecond precision timestamp within the past one hour, or
if Point-in-Time Recovery is enabled, can additionally be a whole minute timestamp
within the past 7 days. For the most accurate results, use UTC timezone.
explain_options (Optional[:class:`~google.cloud.firestore_v1.query_profile.PipelineExplainOptions`]):
Options to enable query profiling for this query. When set,
explain_metrics will be available on the returned list.
index_mode (Optional[str]): Configures the pipeline to require a certain type of indexes to be present.
Firestore will reject the request if there are no appropriate indexes to serve the query.
additional_options (Optional[dict[str, Value | Constant]]): Additional options to pass to the query.
These options will take precedence over method arguments if there is a conflict (e.g. explain_options, index_mode)
"""
return [
result
async for result in self.stream(
transaction=transaction, read_time=read_time
)
]
kwargs = {k: v for k, v in locals().items() if k != "self"}
stream = AsyncPipelineStream(PipelineResult, self, **kwargs)
results = [result async for result in stream]
return PipelineSnapshot(results, stream)

async def stream(
def stream(
self,
transaction: "AsyncTransaction" | None = None,
*,
read_time: datetime.datetime | None = None,
) -> AsyncIterable[PipelineResult]:
transaction: "AsyncTransaction" | None = None,
explain_options: PipelineExplainOptions | None = None,
index_mode: str | None = None,
additional_options: dict[str, Value | Constant] = {},
) -> AsyncPipelineStream[PipelineResult]:
"""
Process this pipeline as a stream, providing results through an Iterable
Process this pipeline as a stream, providing results through an AsyncIterable

Args:
transaction
(Optional[:class:`~google.cloud.firestore_v1.transaction.Transaction`]):
transaction (Optional[:class:`~google.cloud.firestore_v1.transaction.Transaction`]):
An existing transaction that this query will run in.
If a ``transaction`` is used and it already has write operations
added, this method cannot be used (i.e. read-after-write is not
Expand All @@ -103,10 +119,13 @@ async def stream(
time. This must be a microsecond precision timestamp within the past one hour, or
if Point-in-Time Recovery is enabled, can additionally be a whole minute timestamp
within the past 7 days. For the most accurate results, use UTC timezone.
explain_options (Optional[:class:`~google.cloud.firestore_v1.query_profile.PipelineExplainOptions`]):
Options to enable query profiling for this query. When set,
explain_metrics will be available on the returned generator.
index_mode (Optional[str]): Configures the pipeline to require a certain type of indexes to be present.
Firestore will reject the request if there are no appropriate indexes to serve the query.
additional_options (Optional[dict[str, Value | Constant]]): Additional options to pass to the query.
These options will take precedence over method arguments if there is a conflict (e.g. explain_options, index_mode)
"""
request = self._prep_execute_request(transaction, read_time)
async for response in await self._client._firestore_api.execute_pipeline(
request
):
for result in self._execute_response_helper(response):
yield result
kwargs = {k: v for k, v in locals().items() if k != "self"}
return AsyncPipelineStream(PipelineResult, self, **kwargs)
54 changes: 4 additions & 50 deletions google/cloud/firestore_v1/base_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,13 @@
# limitations under the License.

from __future__ import annotations
from typing import Iterable, Sequence, TYPE_CHECKING
from typing import Sequence, TYPE_CHECKING
from google.cloud.firestore_v1 import pipeline_stages as stages
from google.cloud.firestore_v1.types.pipeline import (
StructuredPipeline as StructuredPipeline_pb,
)
from google.cloud.firestore_v1.vector import Vector
from google.cloud.firestore_v1.base_vector_query import DistanceMeasure
from google.cloud.firestore_v1.types.firestore import ExecutePipelineRequest
from google.cloud.firestore_v1.pipeline_result import PipelineResult
from google.cloud.firestore_v1.pipeline_expressions import (
AggregateFunction,
AliasedExpression,
Expand All @@ -30,14 +28,10 @@
BooleanExpression,
Selectable,
)
from google.cloud.firestore_v1 import _helpers

if TYPE_CHECKING: # pragma: NO COVER
import datetime
from google.cloud.firestore_v1.client import Client
from google.cloud.firestore_v1.async_client import AsyncClient
from google.cloud.firestore_v1.types.firestore import ExecutePipelineResponse
from google.cloud.firestore_v1.transaction import BaseTransaction


class _BasePipeline:
Expand Down Expand Up @@ -88,9 +82,10 @@ def __repr__(self):
stages_str = ",\n ".join([repr(s) for s in self.stages])
return f"{cls_str}(\n {stages_str}\n)"

def _to_pb(self) -> StructuredPipeline_pb:
def _to_pb(self, **options) -> StructuredPipeline_pb:
return StructuredPipeline_pb(
pipeline={"stages": [s._to_pb() for s in self.stages]}
pipeline={"stages": [s._to_pb() for s in self.stages]},
options=options,
)

def _append(self, new_stage):
Expand All @@ -99,47 +94,6 @@ def _append(self, new_stage):
"""
return self.__class__._create_with_stages(self._client, *self.stages, new_stage)

def _prep_execute_request(
self,
transaction: BaseTransaction | None,
read_time: datetime.datetime | None,
) -> ExecutePipelineRequest:
"""
shared logic for creating an ExecutePipelineRequest
"""
database_name = (
f"projects/{self._client.project}/databases/{self._client._database}"
)
transaction_id = (
_helpers.get_transaction_id(transaction)
if transaction is not None
else None
)
request = ExecutePipelineRequest(
database=database_name,
transaction=transaction_id,
structured_pipeline=self._to_pb(),
read_time=read_time,
)
return request

def _execute_response_helper(
self, response: ExecutePipelineResponse
) -> Iterable[PipelineResult]:
"""
shared logic for unpacking an ExecutePipelineResponse into PipelineResults
"""
for doc in response.results:
ref = self._client.document(doc.name) if doc.name else None
yield PipelineResult(
self._client,
doc.fields,
ref,
response._pb.execution_time,
doc._pb.create_time if doc.create_time else None,
doc._pb.update_time if doc.update_time else None,
)

def add_fields(self, *fields: Selectable) -> "_BasePipeline":
"""
Adds new fields to outputs from previous stages.
Expand Down
54 changes: 39 additions & 15 deletions google/cloud/firestore_v1/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,20 @@
# limitations under the License.

from __future__ import annotations
from typing import Iterable, TYPE_CHECKING
from typing import TYPE_CHECKING
from google.cloud.firestore_v1 import pipeline_stages as stages
from google.cloud.firestore_v1.base_pipeline import _BasePipeline
from google.cloud.firestore_v1.pipeline_result import PipelineStream
from google.cloud.firestore_v1.pipeline_result import PipelineSnapshot
from google.cloud.firestore_v1.pipeline_result import PipelineResult

if TYPE_CHECKING: # pragma: NO COVER
import datetime
from google.cloud.firestore_v1.client import Client
from google.cloud.firestore_v1.pipeline_result import PipelineResult
from google.cloud.firestore_v1.pipeline_expressions import Constant
from google.cloud.firestore_v1.transaction import Transaction
from google.cloud.firestore_v1.types.document import Value
from google.cloud.firestore_v1.query_profile import PipelineExplainOptions


class Pipeline(_BasePipeline):
Expand Down Expand Up @@ -56,15 +61,18 @@ def __init__(self, client: Client, *stages: stages.Stage):

def execute(
self,
*,
transaction: "Transaction" | None = None,
read_time: datetime.datetime | None = None,
) -> list[PipelineResult]:
explain_options: PipelineExplainOptions | None = None,
index_mode: str | None = None,
additional_options: dict[str, Value | Constant] = {},
) -> PipelineSnapshot[PipelineResult]:
"""
Executes this pipeline and returns results as a list

Args:
transaction
(Optional[:class:`~google.cloud.firestore_v1.transaction.Transaction`]):
transaction (Optional[:class:`~google.cloud.firestore_v1.transaction.Transaction`]):
An existing transaction that this query will run in.
If a ``transaction`` is used and it already has write operations
added, this method cannot be used (i.e. read-after-write is not
Expand All @@ -73,23 +81,33 @@ def execute(
time. This must be a microsecond precision timestamp within the past one hour, or
if Point-in-Time Recovery is enabled, can additionally be a whole minute timestamp
within the past 7 days. For the most accurate results, use UTC timezone.
explain_options (Optional[:class:`~google.cloud.firestore_v1.query_profile.PipelineExplainOptions`]):
Options to enable query profiling for this query. When set,
explain_metrics will be available on the returned list.
index_mode (Optional[str]): Configures the pipeline to require a certain type of indexes to be present.
Firestore will reject the request if there are no appropriate indexes to serve the query.
additional_options (Optional[dict[str, Value | Constant]]): Additional options to pass to the query.
These options will take precedence over method arguments if there is a conflict (e.g. explain_options, index_mode)
"""
return [
result
for result in self.stream(transaction=transaction, read_time=read_time)
]
kwargs = {k: v for k, v in locals().items() if k != "self"}
stream = PipelineStream(PipelineResult, self, **kwargs)
results = [result for result in stream]
return PipelineSnapshot(results, stream)

def stream(
self,
*,
transaction: "Transaction" | None = None,
read_time: datetime.datetime | None = None,
) -> Iterable[PipelineResult]:
explain_options: PipelineExplainOptions | None = None,
index_mode: str | None = None,
additional_options: dict[str, Value | Constant] = {},
) -> PipelineStream[PipelineResult]:
"""
Process this pipeline as a stream, providing results through an Iterable

Args:
transaction
(Optional[:class:`~google.cloud.firestore_v1.transaction.Transaction`]):
transaction (Optional[:class:`~google.cloud.firestore_v1.transaction.Transaction`]):
An existing transaction that this query will run in.
If a ``transaction`` is used and it already has write operations
added, this method cannot be used (i.e. read-after-write is not
Expand All @@ -98,7 +116,13 @@ def stream(
time. This must be a microsecond precision timestamp within the past one hour, or
if Point-in-Time Recovery is enabled, can additionally be a whole minute timestamp
within the past 7 days. For the most accurate results, use UTC timezone.
explain_options (Optional[:class:`~google.cloud.firestore_v1.query_profile.PipelineExplainOptions`]):
Options to enable query profiling for this query. When set,
explain_metrics will be available on the returned generator.
index_mode (Optional[str]): Configures the pipeline to require a certain type of indexes to be present.
Firestore will reject the request if there are no appropriate indexes to serve the query.
additional_options (Optional[dict[str, Value | Constant]]): Additional options to pass to the query.
These options will take precedence over method arguments if there is a conflict (e.g. explain_options, index_mode)
"""
request = self._prep_execute_request(transaction, read_time)
for response in self._client._firestore_api.execute_pipeline(request):
yield from self._execute_response_helper(response)
kwargs = {k: v for k, v in locals().items() if k != "self"}
return PipelineStream(PipelineResult, self, **kwargs)
Loading