- 
                Notifications
    
You must be signed in to change notification settings  - Fork 31
 
feat: support aborted transactions internal retry #544
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 5 commits
50580b4
              481db2a
              313e16d
              3a7537c
              4edf6c1
              f87129a
              49e17be
              a8158b3
              ccf5385
              a537628
              0b1a641
              0e2ca3e
              a4ffab5
              fc890c2
              d204836
              9ea2a01
              7e70d86
              870e170
              450b91b
              578eaa2
              59d597a
              File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | 
|---|---|---|
| @@ -0,0 +1,73 @@ | ||
| # Copyright 2020 Google LLC | ||
| # | ||
| # Use of this source code is governed by a BSD-style | ||
| # license that can be found in the LICENSE file or at | ||
| # https://developers.google.com/open-source/licenses/bsd | ||
| 
     | 
||
| """API to calculate checksums of SQL statements results.""" | ||
| 
     | 
||
| import hashlib | ||
| import pickle | ||
| 
     | 
||
| from google.api_core.exceptions import Aborted | ||
| 
     | 
||
| 
     | 
||
| class ResultsChecksum: | ||
| """Cumulative checksum. | ||
| 
     | 
||
| Used to calculate a total checksum of all the results | ||
| returned by operations executed within transaction. | ||
| Includes methods for checksums comparison. | ||
| These checksums are used while retrying an aborted | ||
| transaction to check if the results of a retried transaction | ||
| are equal to the results of the original transaction. | ||
| """ | ||
| 
     | 
||
| def __init__(self): | ||
| self.checksum = hashlib.sha256() | ||
| self.count = 0 # counter of consumed results | ||
| 
     | 
||
| def __len__(self): | ||
| """Return the number of consumed results. | ||
| 
     | 
||
| :rtype: :class:`int` | ||
| :returns: The number of results. | ||
| """ | ||
| return self.count | ||
| 
     | 
||
| def __eq__(self, other): | ||
| """Check if checksums are equal. | ||
| 
     | 
||
| :type other: :class:`google.cloud.spanner_dbapi.checksum.ResultsChecksum` | ||
| :param other: Another checksum to compare with this one. | ||
| """ | ||
| return self.checksum.digest() == other.checksum.digest() | ||
| 
     | 
||
| def consume_result(self, result): | ||
| """Add the given result into the checksum. | ||
| 
     | 
||
| :type result: Union[int, list] | ||
| :param result: Streamed row or row count from an UPDATE operation. | ||
| """ | ||
| self.checksum.update(pickle.dumps(result)) | ||
| self.count += 1 | ||
| 
     | 
||
| 
     | 
||
| def _compare_checksums(original, retried): | ||
| """Compare the given checksums. | ||
| 
     | 
||
| Raise an error if the given checksums has | ||
| different length, or are not equal. | ||
| 
     | 
||
| :type original: :class:`~google.cloud.spanner_v1.transaction.ResultsChecksum` | ||
| :param original: results checksum of the original transaction. | ||
| 
     | 
||
| :type retried: :class:`~google.cloud.spanner_v1.transaction.ResultsChecksum` | ||
| :param retried: results checksum of the retried transaction. | ||
| 
     | 
||
| :raises: :exc:`google.api_core.exceptions.Aborted` in case if checksums are not equal. | ||
| """ | ||
| if len(retried) != len(original) or retried != original: | ||
| raise Aborted( | ||
| "The transaction was aborted and could not be retried due to a concurrent modification." | ||
| ) | 
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 
          
            
          
           | 
    @@ -6,6 +6,7 @@ | |||||||||||||||||||||||||||||||||||
| 
     | 
||||||||||||||||||||||||||||||||||||
| """Database cursor for Google Cloud Spanner DB-API.""" | ||||||||||||||||||||||||||||||||||||
| 
     | 
||||||||||||||||||||||||||||||||||||
| from google.api_core.exceptions import Aborted | ||||||||||||||||||||||||||||||||||||
| from google.api_core.exceptions import AlreadyExists | ||||||||||||||||||||||||||||||||||||
| from google.api_core.exceptions import FailedPrecondition | ||||||||||||||||||||||||||||||||||||
| from google.api_core.exceptions import InternalServerError | ||||||||||||||||||||||||||||||||||||
| 
        
          
        
         | 
    @@ -14,7 +15,7 @@ | |||||||||||||||||||||||||||||||||||
| from collections import namedtuple | ||||||||||||||||||||||||||||||||||||
| 
     | 
||||||||||||||||||||||||||||||||||||
| from google.cloud import spanner_v1 as spanner | ||||||||||||||||||||||||||||||||||||
| 
     | 
||||||||||||||||||||||||||||||||||||
| from google.cloud.spanner_dbapi.checksum import ResultsChecksum | ||||||||||||||||||||||||||||||||||||
| from google.cloud.spanner_dbapi.exceptions import IntegrityError | ||||||||||||||||||||||||||||||||||||
| from google.cloud.spanner_dbapi.exceptions import InterfaceError | ||||||||||||||||||||||||||||||||||||
| from google.cloud.spanner_dbapi.exceptions import OperationalError | ||||||||||||||||||||||||||||||||||||
| 
        
          
        
         | 
    @@ -26,6 +27,7 @@ | |||||||||||||||||||||||||||||||||||
| 
     | 
||||||||||||||||||||||||||||||||||||
| from google.cloud.spanner_dbapi import parse_utils | ||||||||||||||||||||||||||||||||||||
| from google.cloud.spanner_dbapi.parse_utils import get_param_types | ||||||||||||||||||||||||||||||||||||
| from google.cloud.spanner_dbapi.parse_utils import sql_pyformat_args_to_spanner | ||||||||||||||||||||||||||||||||||||
| from google.cloud.spanner_dbapi.utils import PeekIterator | ||||||||||||||||||||||||||||||||||||
| 
     | 
||||||||||||||||||||||||||||||||||||
| _UNSET_COUNT = -1 | ||||||||||||||||||||||||||||||||||||
| 
        
          
        
         | 
    @@ -46,6 +48,8 @@ def __init__(self, connection): | |||||||||||||||||||||||||||||||||||
| self._row_count = _UNSET_COUNT | ||||||||||||||||||||||||||||||||||||
| self.connection = connection | ||||||||||||||||||||||||||||||||||||
| self._is_closed = False | ||||||||||||||||||||||||||||||||||||
| # the currently running SQL statement results checksum | ||||||||||||||||||||||||||||||||||||
| self._checksum = None | ||||||||||||||||||||||||||||||||||||
                
      
                  IlyaFaer marked this conversation as resolved.
               
          
            Show resolved
            Hide resolved
         | 
||||||||||||||||||||||||||||||||||||
| 
     | 
||||||||||||||||||||||||||||||||||||
| # the number of rows to fetch at a time with fetchmany() | ||||||||||||||||||||||||||||||||||||
| self.arraysize = 1 | ||||||||||||||||||||||||||||||||||||
| 
          
            
          
           | 
    @@ -158,16 +162,18 @@ def execute(self, sql, args=None): | |||||||||||||||||||||||||||||||||||
| self.connection.run_prior_DDL_statements() | ||||||||||||||||||||||||||||||||||||
| 
     | 
||||||||||||||||||||||||||||||||||||
| if not self.connection.autocommit: | ||||||||||||||||||||||||||||||||||||
| transaction = self.connection.transaction_checkout() | ||||||||||||||||||||||||||||||||||||
| 
     | 
||||||||||||||||||||||||||||||||||||
| sql, params = parse_utils.sql_pyformat_args_to_spanner( | ||||||||||||||||||||||||||||||||||||
| sql, args | ||||||||||||||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||||||||||||||
| 
     | 
||||||||||||||||||||||||||||||||||||
| self._result_set = transaction.execute_sql( | ||||||||||||||||||||||||||||||||||||
| sql, params, param_types=get_param_types(params) | ||||||||||||||||||||||||||||||||||||
| sql, params = sql_pyformat_args_to_spanner(sql, args) | ||||||||||||||||||||||||||||||||||||
| 
     | 
||||||||||||||||||||||||||||||||||||
| statement = { | ||||||||||||||||||||||||||||||||||||
| "sql": sql, | ||||||||||||||||||||||||||||||||||||
| "params": params, | ||||||||||||||||||||||||||||||||||||
| "param_types": get_param_types(params), | ||||||||||||||||||||||||||||||||||||
| "checksum": ResultsChecksum(), | ||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||
| self._res, self._checksum = self.connection.run_statement( | ||||||||||||||||||||||||||||||||||||
| statement | ||||||||||||||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||||||||||||||
| self._itr = PeekIterator(self._result_set) | ||||||||||||||||||||||||||||||||||||
| self._itr = PeekIterator(self._res) | ||||||||||||||||||||||||||||||||||||
| return | ||||||||||||||||||||||||||||||||||||
| 
     | 
||||||||||||||||||||||||||||||||||||
| if classification == parse_utils.STMT_NON_UPDATING: | ||||||||||||||||||||||||||||||||||||
| 
          
            
          
           | 
    @@ -207,9 +213,31 @@ def fetchone(self): | |||||||||||||||||||||||||||||||||||
| self._raise_if_closed() | ||||||||||||||||||||||||||||||||||||
| 
     | 
||||||||||||||||||||||||||||||||||||
| try: | ||||||||||||||||||||||||||||||||||||
| return next(self) | ||||||||||||||||||||||||||||||||||||
| res = next(self) | ||||||||||||||||||||||||||||||||||||
                
      
                  IlyaFaer marked this conversation as resolved.
               
          
            Show resolved
            Hide resolved
         | 
||||||||||||||||||||||||||||||||||||
| self._checksum.consume_result(res) | ||||||||||||||||||||||||||||||||||||
                
      
                  IlyaFaer marked this conversation as resolved.
               
          
            Show resolved
            Hide resolved
         | 
||||||||||||||||||||||||||||||||||||
| return res | ||||||||||||||||||||||||||||||||||||
| except StopIteration: | ||||||||||||||||||||||||||||||||||||
| return None | ||||||||||||||||||||||||||||||||||||
| return | ||||||||||||||||||||||||||||||||||||
| except Aborted: | ||||||||||||||||||||||||||||||||||||
| self.connection.retry_transaction() | ||||||||||||||||||||||||||||||||||||
| return self.fetchone() | ||||||||||||||||||||||||||||||||||||
                
      
                  IlyaFaer marked this conversation as resolved.
               
          
            Show resolved
            Hide resolved
                
      
                  IlyaFaer marked this conversation as resolved.
               
          
            Show resolved
            Hide resolved
        There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think this will be a problem. Assuming that this is using the  Assume the following situation: 
 There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Seems to me we can just drop the  There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Sorry for reopening this, and this comment should not be considered blocking for merging this PR, but I think we need to look into this once more. Only dropping the  
 The JDBC driver client solves the above problem by wrapping all streaming iterators before returning these to the client application. That makes it possible for the JDBC driver to replace the underlying streaming iterator with a new one when a transaction has been aborted and successfully retried. We should add that to the Python DBApi as well, but we could do that in a separate PR to prevent this PR from becoming even bigger than it already is. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @c24t, @olavloite, hm-m. I think we're protected from errors here, because our connection API doesn't actually give streaming result objects to a user. Here is where we're getting a streaming iterator: python-spanner-django/google/cloud/spanner_dbapi/cursor.py Lines 167 to 170 in 196c449 
 So, iterator is held in the protected property  python-spanner-django/google/cloud/spanner_dbapi/cursor.py Lines 204 to 212 in 196c449 
 Where  python-spanner-django/google/cloud/spanner_dbapi/cursor.py Lines 293 to 296 in 196c449 
 Thus, if a transaction failed, the connection will drop the transaction, checkout a new one, re-run all the statements, each of which will replace   | 
||||||||||||||||||||||||||||||||||||
| 
     | 
||||||||||||||||||||||||||||||||||||
| def fetchall(self): | ||||||||||||||||||||||||||||||||||||
| """Fetch all (remaining) rows of a query result, returning them as | ||||||||||||||||||||||||||||||||||||
| a sequence of sequences. | ||||||||||||||||||||||||||||||||||||
| """ | ||||||||||||||||||||||||||||||||||||
| self._raise_if_closed() | ||||||||||||||||||||||||||||||||||||
| 
     | 
||||||||||||||||||||||||||||||||||||
| res = [] | ||||||||||||||||||||||||||||||||||||
| try: | ||||||||||||||||||||||||||||||||||||
| for row in self.__iter__(): | ||||||||||||||||||||||||||||||||||||
                
      
                  IlyaFaer marked this conversation as resolved.
               
              
                Outdated
          
            Show resolved
            Hide resolved
         | 
||||||||||||||||||||||||||||||||||||
| self._checksum.consume_result(row) | ||||||||||||||||||||||||||||||||||||
| res.append(row) | ||||||||||||||||||||||||||||||||||||
| except Aborted: | ||||||||||||||||||||||||||||||||||||
| self._connection.retry_transaction() | ||||||||||||||||||||||||||||||||||||
| return self.fetchall() | ||||||||||||||||||||||||||||||||||||
| 
     | 
||||||||||||||||||||||||||||||||||||
| return res | ||||||||||||||||||||||||||||||||||||
| 
     | 
||||||||||||||||||||||||||||||||||||
| def fetchmany(self, size=None): | ||||||||||||||||||||||||||||||||||||
| """Fetch the next set of rows of a query result, returning a sequence | ||||||||||||||||||||||||||||||||||||
| 
        
          
        
         | 
    @@ -230,20 +258,17 @@ def fetchmany(self, size=None): | |||||||||||||||||||||||||||||||||||
| items = [] | ||||||||||||||||||||||||||||||||||||
| for i in range(size): | ||||||||||||||||||||||||||||||||||||
| try: | ||||||||||||||||||||||||||||||||||||
| items.append(tuple(self.__next__())) | ||||||||||||||||||||||||||||||||||||
                
      
                  IlyaFaer marked this conversation as resolved.
               
          
            Show resolved
            Hide resolved
         | 
||||||||||||||||||||||||||||||||||||
| res = next(self) | ||||||||||||||||||||||||||||||||||||
                
      
                  IlyaFaer marked this conversation as resolved.
               
          
            Show resolved
            Hide resolved
         | 
||||||||||||||||||||||||||||||||||||
| self._checksum.consume_result(res) | ||||||||||||||||||||||||||||||||||||
| items.append(res) | ||||||||||||||||||||||||||||||||||||
| except StopIteration: | ||||||||||||||||||||||||||||||||||||
| break | ||||||||||||||||||||||||||||||||||||
| except Aborted: | ||||||||||||||||||||||||||||||||||||
| self._connection.retry_transaction() | ||||||||||||||||||||||||||||||||||||
| return self.fetchmany(size) | ||||||||||||||||||||||||||||||||||||
| 
     | 
||||||||||||||||||||||||||||||||||||
| return items | ||||||||||||||||||||||||||||||||||||
| 
     | 
||||||||||||||||||||||||||||||||||||
| def fetchall(self): | ||||||||||||||||||||||||||||||||||||
| """Fetch all (remaining) rows of a query result, returning them as | ||||||||||||||||||||||||||||||||||||
| a sequence of sequences. | ||||||||||||||||||||||||||||||||||||
| """ | ||||||||||||||||||||||||||||||||||||
| self._raise_if_closed() | ||||||||||||||||||||||||||||||||||||
| 
     | 
||||||||||||||||||||||||||||||||||||
| return list(self.__iter__()) | ||||||||||||||||||||||||||||||||||||
| 
     | 
||||||||||||||||||||||||||||||||||||
| def nextset(self): | ||||||||||||||||||||||||||||||||||||
| """A no-op, raising an error if the cursor or connection is closed.""" | ||||||||||||||||||||||||||||||||||||
| self._raise_if_closed() | ||||||||||||||||||||||||||||||||||||
| 
          
            
          
           | 
    ||||||||||||||||||||||||||||||||||||
| Original file line number | Diff line number | Diff line change | 
|---|---|---|
| @@ -0,0 +1,54 @@ | ||
| # Copyright 2020 Google LLC | ||
| # | ||
| # Use of this source code is governed by a BSD-style | ||
| # license that can be found in the LICENSE file or at | ||
| # https://developers.google.com/open-source/licenses/bsd | ||
| 
     | 
||
| import unittest | ||
| 
     | 
||
| from google.api_core.exceptions import Aborted | ||
| from google.cloud.spanner_dbapi.checksum import ( | ||
| _compare_checksums, | ||
| ResultsChecksum, | ||
| ) | ||
| 
     | 
||
| 
     | 
||
| class Test_compare_checksums(unittest.TestCase): | ||
| def test_equal(self): | ||
| original = ResultsChecksum() | ||
| original.consume_result(5) | ||
| 
     | 
||
| retried = ResultsChecksum() | ||
| retried.consume_result(5) | ||
| 
     | 
||
| self.assertIsNone(_compare_checksums(original, retried)) | ||
| 
     | 
||
| def test_less_results(self): | ||
| original = ResultsChecksum() | ||
| original.consume_result(5) | ||
| 
     | 
||
| retried = ResultsChecksum() | ||
| 
     | 
||
| with self.assertRaises(Aborted): | ||
| _compare_checksums(original, retried) | ||
| 
     | 
||
| def test_more_results(self): | ||
| original = ResultsChecksum() | ||
| original.consume_result(5) | ||
| 
     | 
||
| retried = ResultsChecksum() | ||
| retried.consume_result(5) | ||
| retried.consume_result(2) | ||
| 
     | 
||
| with self.assertRaises(Aborted): | ||
| _compare_checksums(original, retried) | ||
| 
     | 
||
| def test_mismatch(self): | ||
| original = ResultsChecksum() | ||
| original.consume_result(5) | ||
| 
     | 
||
| retried = ResultsChecksum() | ||
| retried.consume_result(2) | ||
| 
     | 
||
| with self.assertRaises(Aborted): | ||
| _compare_checksums(original, retried) | 
Uh oh!
There was an error while loading. Please reload this page.