Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/mock_server_tests.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ on:
pull_request:
name: Run Spanner tests against an in-mem mock server
jobs:
system-tests:
mock-server-tests:
runs-on: ubuntu-latest

steps:
Expand Down
42 changes: 42 additions & 0 deletions .github/workflows/presubmit.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
on:
push:
branches:
- main
pull_request:
name: Presubmit checks
permissions:
contents: read
pull-requests: write
jobs:
lint:
runs-on: ubuntu-latest

steps:
- name: Checkout code
uses: actions/checkout@v4
- name: Setup Python
uses: actions/setup-python@v5
with:
python-version: 3.8
- name: Install nox
run: python -m pip install nox
- name: Check formatting
run: nox -s lint
units:
runs-on: ubuntu-latest
strategy:
fail-fast: false
matrix:
python: ["3.8", "3.9", "3.10", "3.11", "3.12", "3.13"]

steps:
- name: Checkout code
uses: actions/checkout@v4
- name: Setup Python
uses: actions/setup-python@v5
with:
python-version: ${{matrix.python}}
- name: Install nox
run: python -m pip install nox
- name: Run unit tests
run: nox -s unit-${{matrix.python}}
6 changes: 3 additions & 3 deletions .kokoro/presubmit/presubmit.cfg
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# Format: //devtools/kokoro/config/proto/build.proto

# Disable system tests.
# Only run a subset of all nox sessions
env_vars: {
key: "RUN_SYSTEM_TESTS"
value: "false"
key: "NOX_SESSION"
value: "unit-3.8 unit-3.12 cover docs docfx"
}
6 changes: 6 additions & 0 deletions google/cloud/spanner_dbapi/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

from google.api_core.exceptions import Aborted
from google.api_core.gapic_v1.client_info import ClientInfo
from google.auth.credentials import AnonymousCredentials

from google.cloud import spanner_v1 as spanner
from google.cloud.spanner_dbapi import partition_helper
from google.cloud.spanner_dbapi.batch_dml_executor import BatchMode, BatchDmlExecutor
Expand Down Expand Up @@ -784,11 +786,15 @@ def connect(
route_to_leader_enabled=route_to_leader_enabled,
)
else:
client_options = None
if isinstance(credentials, AnonymousCredentials):
client_options = kwargs.get("client_options")
client = spanner.Client(
project=project,
credentials=credentials,
client_info=client_info,
route_to_leader_enabled=route_to_leader_enabled,
client_options=client_options,
)
else:
if project is not None and client.project != project:
Expand Down
6 changes: 4 additions & 2 deletions google/cloud/spanner_dbapi/transaction_helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ def add_execute_statement_for_retry(
self._last_statement_details_per_cursor[cursor] = last_statement_result_details
self._statement_result_details_list.append(last_statement_result_details)

def retry_transaction(self):
def retry_transaction(self, default_retry_delay=None):
"""Retry the aborted transaction.

All the statements executed in the original transaction
Expand Down Expand Up @@ -202,7 +202,9 @@ def retry_transaction(self):
raise RetryAborted(RETRY_ABORTED_ERROR, ex)
return
except Aborted as ex:
delay = _get_retry_delay(ex.errors[0], attempt)
delay = _get_retry_delay(
ex.errors[0], attempt, default_retry_delay=default_retry_delay
)
if delay:
time.sleep(delay)

Expand Down
16 changes: 12 additions & 4 deletions google/cloud/spanner_v1/_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -510,6 +510,7 @@ def _metadata_with_prefix(prefix, **kw):
def _retry_on_aborted_exception(
func,
deadline,
default_retry_delay=None,
):
"""
Handles retry logic for Aborted exceptions, considering the deadline.
Expand All @@ -520,7 +521,12 @@ def _retry_on_aborted_exception(
attempts += 1
return func()
except Aborted as exc:
_delay_until_retry(exc, deadline=deadline, attempts=attempts)
_delay_until_retry(
exc,
deadline=deadline,
attempts=attempts,
default_retry_delay=default_retry_delay,
)
continue


Expand Down Expand Up @@ -608,7 +614,7 @@ def _metadata_with_span_context(metadata: List[Tuple[str, str]], **kw) -> None:
inject(setter=OpenTelemetryContextSetter(), carrier=metadata)


def _delay_until_retry(exc, deadline, attempts):
def _delay_until_retry(exc, deadline, attempts, default_retry_delay=None):
"""Helper for :meth:`Session.run_in_transaction`.

Detect retryable abort, and impose server-supplied delay.
Expand All @@ -628,15 +634,15 @@ def _delay_until_retry(exc, deadline, attempts):
if now >= deadline:
raise

delay = _get_retry_delay(cause, attempts)
delay = _get_retry_delay(cause, attempts, default_retry_delay=default_retry_delay)
if delay is not None:
if now + delay > deadline:
raise

time.sleep(delay)


def _get_retry_delay(cause, attempts):
def _get_retry_delay(cause, attempts, default_retry_delay=None):
"""Helper for :func:`_delay_until_retry`.

:type exc: :class:`grpc.Call`
Expand All @@ -658,6 +664,8 @@ def _get_retry_delay(cause, attempts):
retry_info.ParseFromString(retry_info_pb)
nanos = retry_info.retry_delay.nanos
return retry_info.retry_delay.seconds + nanos / 1.0e9
if default_retry_delay is not None:
return default_retry_delay

return 2**attempts + random.random()

Expand Down
2 changes: 2 additions & 0 deletions google/cloud/spanner_v1/batch.py
Original file line number Diff line number Diff line change
Expand Up @@ -257,9 +257,11 @@ def commit(
deadline = time.time() + kwargs.get(
"timeout_secs", DEFAULT_RETRY_TIMEOUT_SECS
)
default_retry_delay = kwargs.get("default_retry_delay", None)
response = _retry_on_aborted_exception(
method,
deadline=deadline,
default_retry_delay=default_retry_delay,
)
self.committed = response.commit_timestamp
self.commit_stats = response.commit_stats
Expand Down
4 changes: 3 additions & 1 deletion google/cloud/spanner_v1/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,9 @@ def __init__(
meter_provider = MeterProvider(
metric_readers=[
PeriodicExportingMetricReader(
CloudMonitoringMetricsExporter(),
CloudMonitoringMetricsExporter(
project_id=project, credentials=credentials
),
export_interval_millis=METRIC_EXPORT_INTERVAL_MS,
)
]
Expand Down
3 changes: 3 additions & 0 deletions google/cloud/spanner_v1/metrics/metrics_exporter.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
from typing import Optional, List, Union, NoReturn, Tuple, Dict

import google.auth
from google.auth import credentials as ga_credentials
from google.api.distribution_pb2 import ( # pylint: disable=no-name-in-module
Distribution,
)
Expand Down Expand Up @@ -111,6 +112,7 @@ def __init__(
self,
project_id: Optional[str] = None,
client: Optional["MetricServiceClient"] = None,
credentials: Optional[ga_credentials.Credentials] = None,
):
"""Initialize a custom exporter to send metrics for the Spanner Service Metrics."""
# Default preferred_temporality is all CUMULATIVE so need to customize
Expand All @@ -121,6 +123,7 @@ def __init__(
transport=MetricServiceGrpcTransport(
channel=MetricServiceGrpcTransport.create_channel(
options=_OPTIONS,
credentials=credentials,
)
)
)
Expand Down
21 changes: 17 additions & 4 deletions google/cloud/spanner_v1/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -461,6 +461,7 @@ def run_in_transaction(self, func, *args, **kw):
reraises any non-ABORT exceptions raised by ``func``.
"""
deadline = time.time() + kw.pop("timeout_secs", DEFAULT_RETRY_TIMEOUT_SECS)
default_retry_delay = kw.pop("default_retry_delay", None)
commit_request_options = kw.pop("commit_request_options", None)
max_commit_delay = kw.pop("max_commit_delay", None)
transaction_tag = kw.pop("transaction_tag", None)
Expand Down Expand Up @@ -502,7 +503,11 @@ def run_in_transaction(self, func, *args, **kw):
except Aborted as exc:
del self._transaction
if span:
delay_seconds = _get_retry_delay(exc.errors[0], attempts)
delay_seconds = _get_retry_delay(
exc.errors[0],
attempts,
default_retry_delay=default_retry_delay,
)
attributes = dict(delay_seconds=delay_seconds, cause=str(exc))
attributes.update(span_attributes)
add_span_event(
Expand All @@ -511,7 +516,9 @@ def run_in_transaction(self, func, *args, **kw):
attributes,
)

_delay_until_retry(exc, deadline, attempts)
_delay_until_retry(
exc, deadline, attempts, default_retry_delay=default_retry_delay
)
continue
except GoogleAPICallError:
del self._transaction
Expand Down Expand Up @@ -539,7 +546,11 @@ def run_in_transaction(self, func, *args, **kw):
except Aborted as exc:
del self._transaction
if span:
delay_seconds = _get_retry_delay(exc.errors[0], attempts)
delay_seconds = _get_retry_delay(
exc.errors[0],
attempts,
default_retry_delay=default_retry_delay,
)
attributes = dict(delay_seconds=delay_seconds)
attributes.update(span_attributes)
add_span_event(
Expand All @@ -548,7 +559,9 @@ def run_in_transaction(self, func, *args, **kw):
attributes,
)

_delay_until_retry(exc, deadline, attempts)
_delay_until_retry(
exc, deadline, attempts, default_retry_delay=default_retry_delay
)
except GoogleAPICallError:
del self._transaction
add_span_event(
Expand Down
82 changes: 36 additions & 46 deletions noxfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -181,21 +181,6 @@ def install_unittest_dependencies(session, *constraints):
# XXX: Dump installed versions to debug OT issue
session.run("pip", "list")

# Run py.test against the unit tests with OpenTelemetry.
session.run(
"py.test",
"--quiet",
"--cov=google.cloud.spanner",
"--cov=google.cloud",
"--cov=tests.unit",
"--cov-append",
"--cov-config=.coveragerc",
"--cov-report=",
"--cov-fail-under=0",
os.path.join("tests", "unit"),
*session.posargs,
)


@nox.session(python=UNIT_TEST_PYTHON_VERSIONS)
@nox.parametrize(
Expand Down Expand Up @@ -329,9 +314,12 @@ def system(session, protobuf_implementation, database_dialect):
session.skip(
"Credentials or emulator host must be set via environment variable"
)
# If POSTGRESQL tests and Emulator, skip the tests
if os.environ.get("SPANNER_EMULATOR_HOST") and database_dialect == "POSTGRESQL":
session.skip("Postgresql is not supported by Emulator yet.")
if not (
os.environ.get("SPANNER_EMULATOR_HOST") or protobuf_implementation == "python"
):
session.skip(
"Only run system tests on real Spanner with one protobuf implementation to speed up the build"
)

# Install pyopenssl for mTLS testing.
if os.environ.get("GOOGLE_API_USE_CLIENT_CERTIFICATE", "false") == "true":
Expand Down Expand Up @@ -365,7 +353,7 @@ def system(session, protobuf_implementation, database_dialect):
"SKIP_BACKUP_TESTS": "true",
},
)
if system_test_folder_exists:
elif system_test_folder_exists:
session.run(
"py.test",
"--quiet",
Expand Down Expand Up @@ -567,30 +555,32 @@ def prerelease_deps(session, protobuf_implementation, database_dialect):
system_test_path = os.path.join("tests", "system.py")
system_test_folder_path = os.path.join("tests", "system")

# Only run system tests if found.
if os.path.exists(system_test_path):
session.run(
"py.test",
"--verbose",
f"--junitxml=system_{session.python}_sponge_log.xml",
system_test_path,
*session.posargs,
env={
"PROTOCOL_BUFFERS_PYTHON_IMPLEMENTATION": protobuf_implementation,
"SPANNER_DATABASE_DIALECT": database_dialect,
"SKIP_BACKUP_TESTS": "true",
},
)
if os.path.exists(system_test_folder_path):
session.run(
"py.test",
"--verbose",
f"--junitxml=system_{session.python}_sponge_log.xml",
system_test_folder_path,
*session.posargs,
env={
"PROTOCOL_BUFFERS_PYTHON_IMPLEMENTATION": protobuf_implementation,
"SPANNER_DATABASE_DIALECT": database_dialect,
"SKIP_BACKUP_TESTS": "true",
},
)
# Only run system tests for one protobuf implementation on real Spanner to speed up the build.
if os.environ.get("SPANNER_EMULATOR_HOST") or protobuf_implementation == "python":
# Only run system tests if found.
if os.path.exists(system_test_path):
session.run(
"py.test",
"--verbose",
f"--junitxml=system_{session.python}_sponge_log.xml",
system_test_path,
*session.posargs,
env={
"PROTOCOL_BUFFERS_PYTHON_IMPLEMENTATION": protobuf_implementation,
"SPANNER_DATABASE_DIALECT": database_dialect,
"SKIP_BACKUP_TESTS": "true",
},
)
elif os.path.exists(system_test_folder_path):
session.run(
"py.test",
"--verbose",
f"--junitxml=system_{session.python}_sponge_log.xml",
system_test_folder_path,
*session.posargs,
env={
"PROTOCOL_BUFFERS_PYTHON_IMPLEMENTATION": protobuf_implementation,
"SPANNER_DATABASE_DIALECT": database_dialect,
"SKIP_BACKUP_TESTS": "true",
},
)
Loading
Loading