Skip to content

Commit

Permalink
always rollback (#52)
Browse files Browse the repository at this point in the history
* always rollback

* cleanup

* cleanup
  • Loading branch information
malmans2 authored Dec 22, 2022
1 parent a7c23b5 commit b4654f7
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 22 deletions.
43 changes: 22 additions & 21 deletions cacholote/cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
_LOCKER = "__locked__"


def _update_last_primary_keys_and_return(
def _update_last_primary_keys(
session: sqlalchemy.orm.Session, cache_entry: Any, tag: Optional[str]
) -> Any:
# Wait until unlocked
Expand All @@ -55,12 +55,15 @@ def _update_last_primary_keys_and_return(
cache_entry.counter += 1
if tag is not None:
cache_entry.tag = tag
session.commit()
try:
session.commit()
finally:
session.rollback()
LAST_PRIMARY_KEYS.set(cache_entry._primary_keys)
return result


def _clear_last_primary_keys_and_return(result: Any) -> Any:
def _clear_last_primary_keys(result: Any) -> Any:
LAST_PRIMARY_KEYS.set({})
return result

Expand All @@ -69,7 +72,10 @@ def _delete_cache_entry(
session: sqlalchemy.orm.Session, cache_entry: config.CacheEntry
) -> None:
session.delete(cache_entry)
session.commit()
try:
session.commit()
finally:
session.rollback()
# Delete cache file
json.loads(cache_entry._result_as_string, object_hook=clean._delete_cache_file)

Expand Down Expand Up @@ -122,14 +128,14 @@ def wrapper(

# Cache opt-out
if not settings.use_cache:
return _clear_last_primary_keys_and_return(func(*args, **kwargs))
return _clear_last_primary_keys(func(*args, **kwargs))

try:
# Get key
hexdigest = hexdigestify_python_call(func, *args, **kwargs)
except encode.EncodeError as ex:
warnings.warn(f"can NOT encode python call: {ex!r}", UserWarning)
return _clear_last_primary_keys_and_return(func(*args, **kwargs))
return _clear_last_primary_keys(func(*args, **kwargs))

# Filters for the database query
filters = [
Expand All @@ -147,53 +153,48 @@ def wrapper(
):
# Attempt all valid cache entries
try:
return _update_last_primary_keys_and_return(
session, cache_entry, tag
)
return _update_last_primary_keys(session, cache_entry, tag)
except decode.DecodeError as ex:
# Something wrong, e.g. cached files are corrupted
warnings.warn(str(ex), UserWarning)
_delete_cache_entry(session, cache_entry)
finally:
session.rollback()

# Not in the cache
cache_entry = None
try:
# Lock cache entry
# Acquire lock
cache_entry = config.CacheEntry(
key=hexdigest,
expiration=expiration,
result=_LOCKER,
tag=settings.tag,
)
session.add(cache_entry)
session.commit()
try:
session.commit()
finally:
session.rollback()
except sqlalchemy.exc.IntegrityError:
# Concurrent job: This cache entry already exists.
filters = [
config.CacheEntry.key == cache_entry.key,
config.CacheEntry.expiration == cache_entry.expiration,
]
session.rollback()
cache_entry = session.query(config.CacheEntry).filter(*filters).one()
return _update_last_primary_keys_and_return(session, cache_entry, tag)
return _update_last_primary_keys(session, cache_entry, tag)
else:
# Compute result from scratch
result = func(*args, **kwargs)
try:
# Update cache
cache_entry.result = json.loads(encode.dumps(result))
return _update_last_primary_keys_and_return(
session, cache_entry, tag
)
return _update_last_primary_keys(session, cache_entry, tag)
except encode.EncodeError as ex:
# Enconding error, return result without caching
warnings.warn(f"can NOT encode output: {ex!r}", UserWarning)
return _clear_last_primary_keys_and_return(result)
return _clear_last_primary_keys(result)
finally:
session.rollback()
# Unlock
# Release lock
if cache_entry and cache_entry.result == _LOCKER:
_delete_cache_entry(session, cache_entry)

Expand Down
5 changes: 4 additions & 1 deletion cacholote/clean.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,10 @@ def _delete_cache_file(
if session and cache_entry and not dry_run:
logging.info("Deleting cache entry: %r", cache_entry)
session.delete(cache_entry)
session.commit()
try:
session.commit()
finally:
session.rollback()
if not dry_run:
with utils._Locker(fs, urlpath) as file_exists:
if file_exists:
Expand Down

0 comments on commit b4654f7

Please sign in to comment.