-
Notifications
You must be signed in to change notification settings - Fork 418
Description
Hi folks !
This is 2/2 questions wrt to fsspec cache handling when we want to clear it.
Context:
I develop a python library to work with ocean data collected using a specific type of instruments (Argo floats): https://github.com/euroargodev/argopy
Data can be fetched from local files or from an erddap server online.
I recently tried to use fsspec to implement file/resource management and ease handling of cached data.
I ended up wrapping file, memory and http fsspec file systems into classes that managed custom methods like open_dataset or open_dataframe.
Question 2:
Since the fsspec method wasn't working the way I expected (see issue #324), I implemented my own method in the custom file system wrapper. It works like this:
- Every time a call is made to
open, I register the opened path in an internal list (self.cache_registry) - When users call on the
clear_cache()method, I load the cache and remove the corresponding entry from the picklecachefile and I also delete the corresponding hash file.
This goes basically like this:
import pickle
import tempfile
import shutil
import fsspec
import os
class argo_store():
protocol = 'file'
def __init__(self, cache: bool = False, cachedir: str = "", **kw):
""" Create a file storage system for argopy
Parameters
----------
cache : bool (False)
cachedir : str (from OPTIONS)
"""
self.cache = cache
self.cachedir = OPTIONS['cachedir'] if cachedir == '' else cachedir
self.cache_registry = [] # Will hold uri cached by this store instance
if not self.cache:
self.fs = fsspec.filesystem(self.protocol, **kw)
else:
self.fs = fsspec.filesystem("filecache",
target_protocol=self.protocol,
target_options={'simple_links': True},
cache_storage=self.cachedir,
expiry_time=86400, cache_check=10, **kw)
def store_path(self, uri):
if not uri.startswith(self.fs.target_protocol):
path = self.fs.target_protocol + "://" + uri
else:
path = uri
return path
def register(self, uri):
""" Keep track of files open with this instance """
if self.cache:
self.cache_registry.append(self.store_path(uri))
def _clear_cache_item(self, uri):
""" Open fsspec cache registry (pickle file) and remove entry for an uri """
# Inspired from the fsspec "save_cache()" method in:
# https://filesystem-spec.readthedocs.io/en/latest/_modules/fsspec/implementations/cached.html#WholeFileCacheFileSystem
fn = os.path.join(self.fs.storage[-1], "cache")
cache = self.fs.cached_files[-1]
if os.path.exists(fn):
with open(fn, "rb") as f:
cached_files = pickle.load(f)
else:
cached_files = cache
cache = {}
for k, v in cached_files.items():
if k != uri:
cache[k] = v.copy()
else:
os.remove(os.path.join(self.fs.storage[-1], v['fn']))
fn2 = tempfile.mktemp()
with open(fn2, "wb") as f:
pickle.dump(cache, f)
shutil.move(fn2, fn)
def list_cache(self):
if self.cache:
fn = os.path.join(self.fs.storage[-1], "cache")
cache = self.fs.cached_files[-1]
if os.path.exists(fn):
with open(fn, "rb") as f:
cached_files = pickle.load(f)
else:
cached_files = cache
cache = {}
print("\ncached_files items:")
for k, v in cached_files.items():
print("\n", k, "\n",v)
def clear_cache(self):
""" Remove cached files and cache registry entry from uri opened with this store instance """
if self.cache:
for uri in self.cache_registry:
self._clear_cache_item(uri)
def open(self, url, *args, **kwargs):
url = os.path.abspath(url)
self.register(url)
return self.fs.open(url, *args, **kwargs) and here it is in action, trying to show case the effective cleaning of the cache:
with open("dummy_fileA.txt", "w") as fp:
fp.write('Hello world!')
# Instantiate the file store and list the initial state of the cache:
fs = argo_store(cache=1, cachedir="./tmp")
fs.list_cache()
# Then we read some dummy data from a dummy file to trigger caching
with fs.open(os.path.abspath("dummy_fileA.txt"), "r") as fp:
txt = fp.read()
# Check on the file store internal cache registry:
print("\nURI in argo_store registry:\n", fs.cache_registry)
# and list the cache folder:
fs.list_cache()
# Now, we can clear the cache:
print("\nCLEAR CACHE !")
fs.clear_cache()
# and check again the cache folder content to see the effect:
print("\nFiles in cache folder:\n", os.listdir("./tmp"))
fs.list_cache()produces:
cached_files items:
URI in argo_store registry:
['file:///Users/gmaze/git/github/euroargodev/argopy/local_work/dummy_fileA.txt']
cached_files items:
file:///Users/gmaze/git/github/euroargodev/argopy/local_work/dummy_fileA.txt
{'fn': 'efedbec0e26404c4c80f7668501cb66bd14a44112161acd9b6b5ce0f7706ce0c', 'blocks': True, 'time': 1592411294.763516, 'uid': '282336a7a94cc0ce955a259e49539c0e'}
CLEAR CACHE !
Files in cache folder:
['cache']
cached_files items:
Using this mechanism, the cache folder can be used by other fs instances.
My question is: would this mechanism be robust to other protocols than "file" ? and does it solve #324