Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 26 additions & 1 deletion google/cloud/firestore_v1/aggregation.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
"""
from __future__ import annotations

import datetime

from typing import TYPE_CHECKING, Any, Generator, List, Optional, Union

from google.api_core import exceptions, gapic_v1
Expand Down Expand Up @@ -56,6 +58,7 @@ def get(
timeout: float | None = None,
*,
explain_options: Optional[ExplainOptions] = None,
read_time: Optional[datetime.datetime] = None,
) -> QueryResultsList[AggregationResult]:
"""Runs the aggregation query.

Expand All @@ -78,6 +81,10 @@ def get(
(Optional[:class:`~google.cloud.firestore_v1.query_profile.ExplainOptions`]):
Options to enable query profiling for this query. When set,
explain_metrics will be available on the returned generator.
read_time (Optional[datetime.datetime]): If set, reads documents as they were at the given
time. This must be a timestamp within the past one hour, or if Point-in-Time Recovery
is enabled, can additionally be a whole minute timestamp within the past 7 days. If no
timezone is specified in the :class:`datetime.datetime` object, it is assumed to be UTC.

Returns:
QueryResultsList[AggregationResult]: The aggregation query results.
Expand All @@ -90,6 +97,7 @@ def get(
retry=retry,
timeout=timeout,
explain_options=explain_options,
read_time=read_time,
)
result_list = list(result)

Expand All @@ -100,13 +108,16 @@ def get(

return QueryResultsList(result_list, explain_options, explain_metrics)

def _get_stream_iterator(self, transaction, retry, timeout, explain_options=None):
def _get_stream_iterator(
self, transaction, retry, timeout, explain_options=None, read_time=None
):
"""Helper method for :meth:`stream`."""
request, kwargs = self._prep_stream(
transaction,
retry,
timeout,
explain_options,
read_time,
)

return self._client._firestore_api.run_aggregation_query(
Expand All @@ -132,6 +143,7 @@ def _make_stream(
retry: Union[retries.Retry, None, object] = gapic_v1.method.DEFAULT,
timeout: Optional[float] = None,
explain_options: Optional[ExplainOptions] = None,
read_time: Optional[datetime.datetime] = None,
) -> Generator[List[AggregationResult], Any, Optional[ExplainMetrics]]:
"""Internal method for stream(). Runs the aggregation query.

Expand All @@ -155,6 +167,10 @@ def _make_stream(
(Optional[:class:`~google.cloud.firestore_v1.query_profile.ExplainOptions`]):
Options to enable query profiling for this query. When set,
explain_metrics will be available on the returned generator.
read_time (Optional[datetime.datetime]): If set, reads documents as they were at the given
time. This must be a timestamp within the past one hour, or if Point-in-Time Recovery
is enabled, can additionally be a whole minute timestamp within the past 7 days. If no
timezone is specified in the :class:`datetime.datetime` object, it is assumed to be UTC.

Yields:
List[AggregationResult]:
Expand All @@ -172,6 +188,7 @@ def _make_stream(
retry,
timeout,
explain_options,
read_time,
)
while True:
try:
Expand All @@ -182,6 +199,8 @@ def _make_stream(
transaction,
retry,
timeout,
explain_options,
read_time,
)
continue
else:
Expand All @@ -206,6 +225,7 @@ def stream(
timeout: Optional[float] = None,
*,
explain_options: Optional[ExplainOptions] = None,
read_time: Optional[datetime.datetime] = None,
) -> StreamGenerator[List[AggregationResult]]:
"""Runs the aggregation query.

Expand All @@ -229,6 +249,10 @@ def stream(
(Optional[:class:`~google.cloud.firestore_v1.query_profile.ExplainOptions`]):
Options to enable query profiling for this query. When set,
explain_metrics will be available on the returned generator.
read_time (Optional[datetime.datetime]): If set, reads documents as they were at the given
time. This must be a timestamp within the past one hour, or if Point-in-Time Recovery
is enabled, can additionally be a whole minute timestamp within the past 7 days. If no
timezone is specified in the :class:`datetime.datetime` object, it is assumed to be UTC.

Returns:
`StreamGenerator[List[AggregationResult]]`:
Expand All @@ -239,5 +263,6 @@ def stream(
retry=retry,
timeout=timeout,
explain_options=explain_options,
read_time=read_time,
)
return StreamGenerator(inner_generator, explain_options)
15 changes: 15 additions & 0 deletions google/cloud/firestore_v1/base_aggregation.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
from __future__ import annotations

import abc
import datetime

from abc import ABC
from typing import TYPE_CHECKING, Any, Coroutine, List, Optional, Tuple, Union

Expand Down Expand Up @@ -205,6 +207,7 @@ def _prep_stream(
retry: Union[retries.Retry, retries.AsyncRetry, None, object] = None,
timeout: float | None = None,
explain_options: Optional[ExplainOptions] = None,
read_time: Optional[datetime.datetime] = None,
) -> Tuple[dict, dict]:
parent_path, expected_prefix = self._collection_ref._parent_info()
request = {
Expand All @@ -214,6 +217,8 @@ def _prep_stream(
}
if explain_options:
request["explain_options"] = explain_options._to_dict()
if read_time is not None:
request["read_time"] = read_time
kwargs = _helpers.make_retry_timeout_kwargs(retry, timeout)

return request, kwargs
Expand All @@ -228,6 +233,7 @@ def get(
timeout: float | None = None,
*,
explain_options: Optional[ExplainOptions] = None,
read_time: Optional[datetime.datetime] = None,
) -> (
QueryResultsList[AggregationResult]
| Coroutine[Any, Any, List[List[AggregationResult]]]
Expand All @@ -253,6 +259,10 @@ def get(
(Optional[:class:`~google.cloud.firestore_v1.query_profile.ExplainOptions`]):
Options to enable query profiling for this query. When set,
explain_metrics will be available on the returned generator.
read_time (Optional[datetime.datetime]): If set, reads documents as they were at the given
time. This must be a timestamp within the past one hour, or if Point-in-Time Recovery
is enabled, can additionally be a whole minute timestamp within the past 7 days. If no
timezone is specified in the :class:`datetime.datetime` object, it is assumed to be UTC.

Returns:
(QueryResultsList[List[AggregationResult]] | Coroutine[Any, Any, List[List[AggregationResult]]]):
Expand All @@ -270,6 +280,7 @@ def stream(
timeout: Optional[float] = None,
*,
explain_options: Optional[ExplainOptions] = None,
read_time: Optional[datetime.datetime] = None,
) -> (
StreamGenerator[List[AggregationResult]]
| AsyncStreamGenerator[List[AggregationResult]]
Expand All @@ -291,6 +302,10 @@ def stream(
(Optional[:class:`~google.cloud.firestore_v1.query_profile.ExplainOptions`]):
Options to enable query profiling for this query. When set,
explain_metrics will be available on the returned generator.
read_time (Optional[datetime.datetime]): If set, reads documents as they were at the given
time. This must be a timestamp within the past one hour, or if Point-in-Time Recovery
is enabled, can additionally be a whole minute timestamp within the past 7 days. If no
timezone is specified in the :class:`datetime.datetime` object, it is assumed to be UTC.

Returns:
StreamGenerator[List[AggregationResult]] | AsyncStreamGenerator[List[AggregationResult]]:
Expand Down
15 changes: 14 additions & 1 deletion google/cloud/firestore_v1/base_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
"""
from __future__ import annotations

import datetime
import os
from typing import (
Any,
Expand Down Expand Up @@ -437,6 +438,7 @@ def _prep_get_all(
transaction: BaseTransaction | None = None,
retry: retries.Retry | retries.AsyncRetry | object | None = None,
timeout: float | None = None,
read_time: Optional[datetime.datetime] = None,
) -> Tuple[dict, dict, dict]:
"""Shared setup for async/sync :meth:`get_all`."""
document_paths, reference_map = _reference_info(references)
Expand All @@ -447,6 +449,8 @@ def _prep_get_all(
"mask": mask,
"transaction": _helpers.get_transaction_id(transaction),
}
if read_time is not None:
request["read_time"] = read_time
kwargs = _helpers.make_retry_timeout_kwargs(retry, timeout)

return request, reference_map, kwargs
Expand All @@ -458,6 +462,8 @@ def get_all(
transaction=None,
retry: retries.Retry | retries.AsyncRetry | object | None = None,
timeout: float | None = None,
*,
read_time: Optional[datetime.datetime] = None,
) -> Union[
AsyncGenerator[DocumentSnapshot, Any], Generator[DocumentSnapshot, Any, Any]
]:
Expand All @@ -467,9 +473,14 @@ def _prep_collections(
self,
retry: retries.Retry | retries.AsyncRetry | object | None = None,
timeout: float | None = None,
read_time: Optional[datetime.datetime] = None,
) -> Tuple[dict, dict]:
"""Shared setup for async/sync :meth:`collections`."""
request = {"parent": "{}/documents".format(self._database_string)}
request = {
"parent": "{}/documents".format(self._database_string),
}
if read_time is not None:
request["read_time"] = read_time
kwargs = _helpers.make_retry_timeout_kwargs(retry, timeout)

return request, kwargs
Expand All @@ -478,6 +489,8 @@ def collections(
self,
retry: retries.Retry | retries.AsyncRetry | object | None = None,
timeout: float | None = None,
*,
read_time: Optional[datetime.datetime] = None,
):
raise NotImplementedError

Expand Down
9 changes: 9 additions & 0 deletions google/cloud/firestore_v1/base_collection.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@
"""Classes for representing collections for the Google Cloud Firestore API."""
from __future__ import annotations

import datetime
import random

from typing import (
TYPE_CHECKING,
Any,
Expand Down Expand Up @@ -202,6 +204,7 @@ def _prep_list_documents(
page_size: Optional[int] = None,
retry: retries.Retry | retries.AsyncRetry | object | None = None,
timeout: Optional[float] = None,
read_time: Optional[datetime.datetime] = None,
) -> Tuple[dict, dict]:
"""Shared setup for async / sync :method:`list_documents`"""
parent, _ = self._parent_info()
Expand All @@ -215,6 +218,8 @@ def _prep_list_documents(
# to include no fields
"mask": {"field_paths": None},
}
if read_time is not None:
request["read_time"] = read_time
kwargs = _helpers.make_retry_timeout_kwargs(retry, timeout)

return request, kwargs
Expand All @@ -224,6 +229,8 @@ def list_documents(
page_size: Optional[int] = None,
retry: retries.Retry | retries.AsyncRetry | object | None = None,
timeout: Optional[float] = None,
*,
read_time: Optional[datetime.datetime] = None,
) -> Union[
Generator[DocumentReference, Any, Any], AsyncGenerator[DocumentReference, Any]
]:
Expand Down Expand Up @@ -497,6 +504,7 @@ def get(
timeout: Optional[float] = None,
*,
explain_options: Optional[ExplainOptions] = None,
read_time: Optional[datetime.datetime] = None,
) -> (
QueryResultsList[DocumentSnapshot]
| Coroutine[Any, Any, QueryResultsList[DocumentSnapshot]]
Expand All @@ -510,6 +518,7 @@ def stream(
timeout: Optional[float] = None,
*,
explain_options: Optional[ExplainOptions] = None,
read_time: Optional[datetime.datetime] = None,
) -> StreamGenerator[DocumentSnapshot] | AsyncIterator[DocumentSnapshot]:
raise NotImplementedError

Expand Down
17 changes: 16 additions & 1 deletion google/cloud/firestore_v1/base_document.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
from __future__ import annotations

import copy
import datetime

from typing import (
TYPE_CHECKING,
Any,
Expand Down Expand Up @@ -290,6 +292,7 @@ def _prep_batch_get(
transaction=None,
retry: retries.Retry | retries.AsyncRetry | None | object = None,
timeout: float | None = None,
read_time: Optional[datetime.datetime] = None,
) -> Tuple[dict, dict]:
"""Shared setup for async/sync :meth:`get`."""
if isinstance(field_paths, str):
Expand All @@ -306,6 +309,8 @@ def _prep_batch_get(
"mask": mask,
"transaction": _helpers.get_transaction_id(transaction),
}
if read_time is not None:
request["read_time"] = read_time
kwargs = _helpers.make_retry_timeout_kwargs(retry, timeout)

return request, kwargs
Expand All @@ -316,6 +321,8 @@ def get(
transaction=None,
retry: retries.Retry | retries.AsyncRetry | None | object = None,
timeout: float | None = None,
*,
read_time: Optional[datetime.datetime] = None,
) -> "DocumentSnapshot" | Awaitable["DocumentSnapshot"]:
raise NotImplementedError

Expand All @@ -324,9 +331,15 @@ def _prep_collections(
page_size: int | None = None,
retry: retries.Retry | retries.AsyncRetry | None | object = None,
timeout: float | None = None,
read_time: Optional[datetime.datetime] = None,
) -> Tuple[dict, dict]:
"""Shared setup for async/sync :meth:`collections`."""
request = {"parent": self._document_path, "page_size": page_size}
request = {
"parent": self._document_path,
"page_size": page_size,
}
if read_time is not None:
request["read_time"] = read_time
kwargs = _helpers.make_retry_timeout_kwargs(retry, timeout)

return request, kwargs
Expand All @@ -336,6 +349,8 @@ def collections(
page_size: int | None = None,
retry: retries.Retry | retries.AsyncRetry | None | object = None,
timeout: float | None = None,
*,
read_time: Optional[datetime.datetime] = None,
):
raise NotImplementedError

Expand Down
Loading
Loading