From 9601756039921558affd8f0e30d19d1f0bb1d96d Mon Sep 17 00:00:00 2001
From: Kirill Smelkov
Date: Fri, 16 Apr 2021 18:36:20 +0300
Subject: [PATCH] tests: Add test for load vs external invalidation race

For ZEO this data corruption bug was reported at
https://github.com/zopefoundation/ZEO/issues/155 and fixed at
https://github.com/zopefoundation/ZEO/pull/169.

Without that fix the failure shows up, e.g., as follows when running the ZEO
test suite:

    Failure in test check_race_load_vs_external_invalidate (ZEO.tests.testZEO.BlobAdaptedFileStorageTests)
    Traceback (most recent call last):
      File "/usr/lib/python2.7/unittest/case.py", line 329, in run
        testMethod()
      File "/home/kirr/src/wendelin/z/ZODB/src/ZODB/tests/BasicStorage.py", line 621, in check_race_load_vs_external_invalidate
        self.fail([_ for _ in failure if _])
      File "/usr/lib/python2.7/unittest/case.py", line 410, in fail
        raise self.failureException(msg)
    AssertionError: ['T1: obj1.value (7) != obj2.value (8)']

Even though the added test is somewhat similar to
check_race_loadopen_vs_local_invalidate, it is added anew without trying to
unify the code. The reason is that the probability of catching the load vs
external invalidation race is significantly reduced when there is only
1 modify worker and 1 verify worker. Unifying the code while preserving the
semantics of both tests would make the test for "load vs local invalidate"
harder to follow. Sometimes a little copying is better than trying to unify
too much.

For the test to work, the test infrastructure is amended with a
._new_storage_client() method that complements the ._storage attribute:
client-server storages like ZEO, NEO and RelStorage allow several storage
clients to be connected to a single storage server. For client-server
storages, test subclasses should implement _new_storage_client to return a new
storage client that is connected to the same storage server self._storage is
connected to.

For ZEO ._new_storage_client() is added by https://github.com/zopefoundation/ZEO/pull/170

Other client-server storages can follow to implement ._new_storage_client()
and this way automatically activate this "load vs external invalidation" test
when their testsuite is run.

Contrary to test for "load vs local invalidate" N is set to lower value (100),
because with 8 workers the bug is usually reproduced at not-so-high iteration
number (5-10-20).

/cc @d-maurer, @jamadden, @jmuchemb
---
 src/ZODB/tests/BasicStorage.py | 147 +++++++++++++++++++++++++++++++++
 1 file changed, 147 insertions(+)

diff --git a/src/ZODB/tests/BasicStorage.py b/src/ZODB/tests/BasicStorage.py
index 226c7e0e6..6c57c0a31 100644
--- a/src/ZODB/tests/BasicStorage.py
+++ b/src/ZODB/tests/BasicStorage.py
@@ -29,6 +29,7 @@
 import time
 import zope.interface
 import zope.interface.verify
+from random import randint
 
 from .. import utils
 
@@ -491,3 +492,149 @@ def _modify():
 
         if failed.is_set():
             self.fail(failure[0])
+
+
+    # client-server storages like ZEO, NEO and RelStorage allow several storage
+    # clients to be connected to a single storage server.
+    #
+    # For client-server storages test subclasses should implement
+    # _new_storage_client to return a new storage client that is connected to
+    # the same storage server self._storage is connected to.
+    def _new_storage_client(self):
+        raise NotImplementedError
+
+    # verify storage for race in between load and external invalidations.
+    # https://github.com/zopefoundation/ZEO/issues/155
+    #
+    # This test is similar to check_race_loadopen_vs_local_invalidate but does
+    # not reuse its code because the probability to reproduce external
+    # invalidation bug with only 1 mutator + 1 verifier is low.
+    def check_race_load_vs_external_invalidate(self):
+        # local import: only needed to report unexpected worker exceptions.
+        import traceback
+
+        # dbopen creates new client storage connection and wraps it with DB.
+        def dbopen():
+            try:
+                zstor = self._new_storage_client()
+            except NotImplementedError:
+                # the test will be skipped from main thread because dbopen is
+                # first used in init on the main thread before any other thread
+                # is spawned.
+                self.skipTest("%s does not implement _new_storage_client" % type(self))
+            return DB(zstor)
+
+        # init initializes the database with two integer objects - obj1/obj2
+        # that are set to 0.
+        def init():
+            db = dbopen()
+
+            transaction.begin()
+            zconn = db.open()
+
+            root = zconn.root()
+            root['obj1'] = MinPO(0)
+            root['obj2'] = MinPO(0)
+
+            transaction.commit()
+            zconn.close()
+
+            db.close()
+
+        # nwork is the number of T workers run concurrently; N is the number of
+        # iterations each of them makes.
+        #
+        # As of 20210416, due to race conditions in ZEO, running 8 workers
+        # triggers the bug where T sees stale obj2 with obj1.value != obj2.value
+        #
+        # The probability to reproduce the bug is significantly reduced with
+        # decreasing n(workers): almost never with nwork=2 and sometimes with nwork=4.
+        nwork = 8
+        N = 100
+
+        # T is a worker that accesses obj1/obj2 in a loop and verifies
+        # `obj1.value == obj2.value` invariant.
+        #
+        # access to obj1 is organized to always trigger loading from zstor.
+        # access to obj2 goes through zconn cache and so verifies whether the
+        # cache is not stale.
+        #
+        # Once in a while T tries to modify obj{1,2}.value maintaining the
+        # invariant as test source of changes for other workers.
+        failed = threading.Event()
+        failure = [None]*nwork  # [tx] is failure from T(tx)
+        def T(tx, N):
+            db = dbopen()
+
+            def t_():
+                transaction.begin()
+                zconn = db.open()
+
+                root = zconn.root()
+                obj1 = root['obj1']
+                obj2 = root['obj2']
+
+                # obj1 - reload it from zstor
+                # obj2 - get it from zconn cache
+                obj1._p_invalidate()
+
+                # both objects must have the same values
+                i1 = obj1.value
+                i2 = obj2.value
+                if i1 != i2:
+                    failure[tx] = "T%s: obj1.value (%d) != obj2.value (%d)" % (tx, i1, i2)
+                    failed.set()
+
+                # change objects once in a while
+                if randint(0,4) == 0:
+                    obj1.value += 1
+                    obj2.value += 1
+
+                try:
+                    transaction.commit()
+                except POSException.ConflictError:
+                    # conflict with a concurrent worker -> ignore
+                    transaction.abort()
+
+                zconn.close()
+
+            try:
+                for i in range(N):
+                    t_()
+                    if failed.is_set():
+                        break
+            except BaseException:
+                # record what went wrong so that the main thread can report it
+                # instead of failing with an empty message. Do not overwrite an
+                # invariant failure already recorded by t_.
+                if failure[tx] is None:
+                    failure[tx] = "T%s: unexpected exception:\n%s" % (
+                        tx, traceback.format_exc())
+                failed.set()
+                raise
+            finally:
+                db.close()
+
+        # init is run on the main thread before workers are spawned: this is
+        # where the test is skipped if _new_storage_client is not implemented.
+        init()
+
+        tg = []
+        for x in range(nwork):
+            t = threading.Thread(name='T%d' % x, target=T, args=(x, N))
+            # daemon: a stuck worker must not block interpreter exit after the
+            # timeout below has been reported.
+            t.daemon = True
+            t.start()
+            tg.append(t)
+
+        # a worker that does not finish in time is a failure too: t.join
+        # returns silently on timeout, so liveness must be checked explicitly.
+        for t in tg:
+            t.join(60)
+            if t.is_alive():
+                failure.append("%s: did not finish within 60s" % t.name)
+                failed.set()  # ask the remaining workers to stop
+
+        if failed.is_set():
+            self.fail([_ for _ in failure if _])