- 
                Notifications
    
You must be signed in to change notification settings  - Fork 31
 
feat: support aborted transactions internal retry #544
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 1 commit
50580b4
              481db2a
              313e16d
              3a7537c
              4edf6c1
              f87129a
              49e17be
              a8158b3
              ccf5385
              a537628
              0b1a641
              0e2ca3e
              a4ffab5
              fc890c2
              d204836
              9ea2a01
              7e70d86
              870e170
              450b91b
              578eaa2
              59d597a
              File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | 
|---|---|---|
| @@ -0,0 +1,72 @@ | ||
| # Copyright 2020 Google LLC | ||
| # | ||
| # Use of this source code is governed by a BSD-style | ||
| # license that can be found in the LICENSE file or at | ||
| # https://developers.google.com/open-source/licenses/bsd | ||
| 
     | 
||
| """API to calculate checksums of SQL statements results.""" | ||
| 
     | 
||
| import hashlib | ||
| import pickle | ||
| 
     | 
||
| 
     | 
||
| class ResultsChecksum: | ||
| """Cumulative checksum. | ||
| Used to calculate a total checksum of all the results | ||
| returned by operations executed within transaction. | ||
| Includes methods for checksums comparison. | ||
| These checksums are used while retrying an aborted | ||
| transaction to check if the results of a retried transaction | ||
| are equal to the results of the original transaction. | ||
| """ | ||
| 
     | 
||
| def __init__(self): | ||
| self.checksum = hashlib.sha256() | ||
| self.count = 0 # counter of consumed results | ||
| 
     | 
||
| def __len__(self): | ||
| """Return the number of consumed results. | ||
| :rtype: :class:`int` | ||
| :returns: The number of results. | ||
| """ | ||
| return self.count | ||
| 
     | 
||
| def __eq__(self, other): | ||
| """Check if checksums are equal. | ||
| :type other: :class:`google.cloud.spanner_dbapi.checksum.ResultsChecksum` | ||
| :param other: Another checksum to compare with this one. | ||
| """ | ||
| return self.checksum.digest() == other.checksum.digest() | ||
| 
     | 
||
| def consume_result(self, result): | ||
| """Add the given result into the checksum. | ||
| :type result: Union[int, list] | ||
| :param result: Streamed row or row count from an UPDATE operation. | ||
| """ | ||
| self.checksum.update(pickle.dumps(result)) | ||
| self.count += 1 | ||
| 
     | 
||
| 
     | 
||
| def _compare_checksums(original, retried): | ||
| """Compare the given checksums. | ||
| Raise an error if the given checksums have consumed | ||
| the same number of results, but are not equal. | ||
| :type original: :class:`~google.cloud.spanner_v1.transaction.ResultsChecksum` | ||
| :param original: results checksum of the original transaction. | ||
| :type retried: :class:`~google.cloud.spanner_v1.transaction.ResultsChecksum` | ||
| :param retried: results checksum of the retried transaction. | ||
| :raises: :exc:`RuntimeError` in case if checksums are not equal. | ||
| """ | ||
| if original is not None: | ||
| if len(retried) == len(original) and retried != original: | ||
| raise RuntimeError( | ||
                
      
                  IlyaFaer marked this conversation as resolved.
               
              
                Outdated
          
            Show resolved
            Hide resolved
         | 
||
| "The underlying data being changed while retrying an aborted transaction." | ||
                
      
                  IlyaFaer marked this conversation as resolved.
               
              
                Outdated
          
            Show resolved
            Hide resolved
         | 
||
| ) | ||
                
      
                  IlyaFaer marked this conversation as resolved.
               
              
                Outdated
          
            Show resolved
            Hide resolved
         | 
||
| Original file line number | Diff line number | Diff line change | 
|---|---|---|
| @@ -0,0 +1,44 @@ | ||
| # Copyright 2020 Google LLC | ||
| # | ||
| # Use of this source code is governed by a BSD-style | ||
| # license that can be found in the LICENSE file or at | ||
| # https://developers.google.com/open-source/licenses/bsd | ||
| 
     | 
||
| import unittest | ||
| 
     | 
||
| from google.cloud.spanner_dbapi.checksum import ( | ||
| _compare_checksums, | ||
| ResultsChecksum, | ||
| ) | ||
| 
     | 
||
| 
     | 
||
| class Test_compare_checksums(unittest.TestCase): | ||
| def test_no_original_checksum(self): | ||
| self.assertIsNone(_compare_checksums(None, ResultsChecksum())) | ||
                
      
                  IlyaFaer marked this conversation as resolved.
               
              
                Outdated
          
            Show resolved
            Hide resolved
         | 
||
| 
     | 
||
| def test_equal(self): | ||
| original = ResultsChecksum() | ||
| original.consume_result(5) | ||
| 
     | 
||
| retried = ResultsChecksum() | ||
| retried.consume_result(5) | ||
| 
     | 
||
| self.assertIsNone(_compare_checksums(original, retried)) | ||
| 
     | 
||
| def test_less_results(self): | ||
| original = ResultsChecksum() | ||
| original.consume_result(5) | ||
| 
     | 
||
| retried = ResultsChecksum() | ||
| 
     | 
||
| self.assertIsNone(_compare_checksums(original, retried)) | ||
                
      
                  IlyaFaer marked this conversation as resolved.
               
              
                Outdated
          
            Show resolved
            Hide resolved
         | 
||
| 
     | 
||
| def test_mismatch(self): | ||
| original = ResultsChecksum() | ||
| original.consume_result(5) | ||
| 
     | 
||
| retried = ResultsChecksum() | ||
| retried.consume_result(2) | ||
| 
     | 
||
| with self.assertRaises(RuntimeError): | ||
| _compare_checksums(original, retried) | ||
| Original file line number | Diff line number | Diff line change | 
|---|---|---|
| 
          
            
          
           | 
    @@ -12,6 +12,7 @@ | |
| # import google.cloud.spanner_dbapi.exceptions as dbapi_exceptions | ||
| 
     | 
||
| from google.cloud.spanner_dbapi import Connection, InterfaceError | ||
| from google.cloud.spanner_dbapi.checksum import ResultsChecksum | ||
| from google.cloud.spanner_dbapi.connection import AUTOCOMMIT_MODE_WARNING | ||
| from google.cloud.spanner_v1.database import Database | ||
| from google.cloud.spanner_v1.instance import Instance | ||
| 
         There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why these tests are still here? They were copied into  There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Good question. Looks like this (and  The test files weren't exactly copied, #532 changed them and added some new tests. E.g. the version on master now doesn't include  @mf2199 can you confirm that you meant to change/remove these tests before removing   | 
||
| 
          
            
          
           | 
    @@ -77,3 +78,59 @@ def test_instance_property(self): | |
| 
     | 
||
| with self.assertRaises(AttributeError): | ||
| connection.instance = None | ||
| 
     | 
||
| def test_run_statement(self): | ||
| """Check that Connection remembers executed statements.""" | ||
| statement = """SELECT 23 FROM table WHERE id = @a1""" | ||
| params = {"a1": "value"} | ||
| param_types = {"a1": str} | ||
| 
     | 
||
| connection = self._make_connection() | ||
| 
     | 
||
| with mock.patch( | ||
| "google.cloud.spanner_dbapi.connection.Connection.transaction_checkout" | ||
| ): | ||
| connection.run_statement(statement, params, param_types) | ||
| 
     | 
||
| self.assertEqual(connection._statements[0]["sql"], statement) | ||
| self.assertEqual(connection._statements[0]["params"], params) | ||
| self.assertEqual(connection._statements[0]["param_types"], param_types) | ||
| self.assertIsInstance( | ||
| connection._statements[0]["checksum"], ResultsChecksum | ||
| ) | ||
| 
     | 
||
| def test_clear_statements_on_commit(self): | ||
| """ | ||
| Check that all the saved statements are | ||
| cleared, when the transaction is commited. | ||
| """ | ||
| connection = self._make_connection() | ||
| connection._transaction = mock.Mock() | ||
| connection._statements = [{}, {}] | ||
| 
     | 
||
| self.assertEqual(len(connection._statements), 2) | ||
| 
     | 
||
| with mock.patch( | ||
| "google.cloud.spanner_v1.transaction.Transaction.commit" | ||
| ): | ||
| connection.commit() | ||
| 
     | 
||
| self.assertEqual(len(connection._statements), 0) | ||
| 
     | 
||
| def test_clear_statements_on_rollback(self): | ||
| """ | ||
| Check that all the saved statements are | ||
| cleared, when the transaction is roll backed. | ||
| """ | ||
| connection = self._make_connection() | ||
| connection._transaction = mock.Mock() | ||
| connection._statements = [{}, {}] | ||
| 
     | 
||
| self.assertEqual(len(connection._statements), 2) | ||
| 
     | 
||
| with mock.patch( | ||
| "google.cloud.spanner_v1.transaction.Transaction.commit" | ||
| ): | ||
| connection.rollback() | ||
| 
     | 
||
| self.assertEqual(len(connection._statements), 0) | ||
| Original file line number | Diff line number | Diff line change | 
|---|---|---|
| 
          
            
          
           | 
    @@ -4,8 +4,10 @@ | |
| # license that can be found in the LICENSE file or at | ||
| # https://developers.google.com/open-source/licenses/bsd | ||
| 
     | 
||
| import unittest | ||
| import hashlib | ||
| import pickle | ||
| import os | ||
| import unittest | ||
| 
     | 
||
| from google.api_core import exceptions | ||
| 
     | 
||
| 
          
            
          
           | 
    @@ -289,6 +291,34 @@ def test_rollback_on_connection_closing(self): | |
| cursor.close() | ||
| conn.close() | ||
| 
     | 
||
| def test_results_checksum(self): | ||
| """Test that results checksum is calculated properly.""" | ||
| conn = Connection(Config.INSTANCE, self._db) | ||
| cursor = conn.cursor() | ||
| 
     | 
||
| cursor.execute( | ||
| """ | ||
| INSERT INTO contacts (contact_id, first_name, last_name, email) | ||
| VALUES | ||
| (1, 'first-name', 'last-name', '[email protected]'), | ||
| (2, 'first-name2', 'last-name2', '[email protected]') | ||
| """ | ||
| ) | ||
| self.assertEqual(len(conn._statements), 1) | ||
| conn.commit() | ||
| 
     | 
||
| cursor.execute("SELECT * FROM contacts") | ||
| got_rows = cursor.fetchall() | ||
| 
     | 
||
| self.assertEqual(len(conn._statements), 1) | ||
| conn.commit() | ||
| 
     | 
||
| checksum = hashlib.sha256() | ||
| checksum.update(pickle.dumps(got_rows[0])) | ||
| checksum.update(pickle.dumps(got_rows[1])) | ||
| 
     | 
||
| self.assertEqual(cursor._checksum.checksum.digest(), checksum.digest()) | ||
| 
     | 
||
| 
     | 
||
| def clear_table(transaction): | ||
| """Clear the test table.""" | ||
| 
          
            
          
           | 
    ||
Uh oh!
There was an error while loading. Please reload this page.