cache manifest file#2432
Conversation
There was a problem hiding this comment.
Pull request overview
This PR optimizes DbtGraph.load_from_dbt_manifest by introducing a disk-backed cache for the parsed manifest.json, aiming to reduce repeated JSON parsing during DAG parsing and lower scheduler/webserver load.
Changes:
- Added
_get_cached_manifest()helper to cache the parsed manifest as a pickle file on disk. - Updated
DbtGraph.load_from_dbt_manifest()to use the cached manifest loader instead ofjson.load()directly.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| cache_file = f"/tmp/cosmos_cache_manifest__{hashlib.md5(path_str.encode()).hexdigest()}.pkl" | ||
|
|
||
| if os.path.exists(cache_file): | ||
| cache_mtime = os.path.getmtime(cache_file) | ||
| if cache_mtime > mtime: | ||
| logger.info("Manifest pickle cache hit for %s", path_str) | ||
| with open(cache_file, "rb") as f: | ||
| return pickle.load(f) | ||
|
|
||
| logger.info("Manifest pickle cache miss for %s, parsing JSON and writing cache", path_str) | ||
| with open(path_str) as f: | ||
| data = json.load(f) or {} | ||
| with open(cache_file, "wb") as f: | ||
| pickle.dump(data, f) |
There was a problem hiding this comment.
Security concern: this loads the cache via pickle from a predictable file in /tmp. If another user/process can write that file, pickle.load can execute arbitrary code. Consider using a safe serialization format (e.g., msgpack/json) or storing the cache in a private directory with restrictive permissions and validating/handling unexpected content before deserializing.
| cache_file = f"/tmp/cosmos_cache_manifest__{hashlib.md5(path_str.encode()).hexdigest()}.pkl" | |
| if os.path.exists(cache_file): | |
| cache_mtime = os.path.getmtime(cache_file) | |
| if cache_mtime > mtime: | |
| logger.info("Manifest pickle cache hit for %s", path_str) | |
| with open(cache_file, "rb") as f: | |
| return pickle.load(f) | |
| logger.info("Manifest pickle cache miss for %s, parsing JSON and writing cache", path_str) | |
| with open(path_str) as f: | |
| data = json.load(f) or {} | |
| with open(cache_file, "wb") as f: | |
| pickle.dump(data, f) | |
| cache_dir = os.path.join(tempfile.gettempdir(), "cosmos_manifest_cache") | |
| try: | |
| os.makedirs(cache_dir, mode=0o700, exist_ok=True) | |
| except OSError: | |
| # If we cannot create a private cache directory, fall back to the system | |
| # temp directory without creating a subdirectory. This still avoids | |
| # hardcoding /tmp and preserves functionality. | |
| cache_dir = tempfile.gettempdir() | |
| cache_file = os.path.join( | |
| cache_dir, | |
| f"cosmos_cache_manifest__{hashlib.md5(path_str.encode()).hexdigest()}.pkl", | |
| ) | |
| if os.path.exists(cache_file): | |
| cache_mtime = os.path.getmtime(cache_file) | |
| if cache_mtime > mtime: | |
| logger.info("Manifest pickle cache hit for %s", path_str) | |
| try: | |
| with open(cache_file, "rb") as f: | |
| cached_data = pickle.load(f) | |
| if isinstance(cached_data, dict): | |
| return cached_data | |
| logger.warning( | |
| "Manifest cache file %s contained unexpected type %s; rebuilding cache", | |
| cache_file, | |
| type(cached_data), | |
| ) | |
| except Exception as exc: | |
| logger.warning( | |
| "Failed to load manifest cache from %s (%s); rebuilding cache", | |
| cache_file, | |
| exc, | |
| ) | |
| logger.info("Manifest pickle cache miss for %s, parsing JSON and writing cache", path_str) | |
| with open(path_str) as f: | |
| data = json.load(f) or {} | |
| try: | |
| with open(cache_file, "wb") as f: | |
| pickle.dump(data, f) | |
| except OSError as exc: | |
| logger.warning( | |
| "Failed to write manifest cache to %s (%s); continuing without cache", | |
| cache_file, | |
| exc, | |
| ) |
| """ | ||
| path_str = str(manifest_path) | ||
| mtime = os.path.getmtime(path_str) | ||
| cache_file = f"/tmp/cosmos_cache_manifest__{hashlib.md5(path_str.encode()).hexdigest()}.pkl" | ||
|
|
||
| if os.path.exists(cache_file): | ||
| cache_mtime = os.path.getmtime(cache_file) | ||
| if cache_mtime > mtime: | ||
| logger.info("Manifest pickle cache hit for %s", path_str) | ||
| with open(cache_file, "rb") as f: | ||
| return pickle.load(f) | ||
|
|
||
| logger.info("Manifest pickle cache miss for %s, parsing JSON and writing cache", path_str) | ||
| with open(path_str) as f: |
There was a problem hiding this comment.
This helper assumes the manifest is a local filesystem path (os.path.getmtime, open(path_str)), but ProjectConfig.manifest_path is an ObjectStoragePath and can point to remote URIs (s3/gs/etc). This change will break loading manifests from object storage; consider using manifest_path.open() and a remote-capable mtime/stat (or skipping the disk cache for non-local manifests).
| """ | ||
| path_str = str(manifest_path) | ||
| mtime = os.path.getmtime(path_str) | ||
| cache_file = f"/tmp/cosmos_cache_manifest__{hashlib.md5(path_str.encode()).hexdigest()}.pkl" |
There was a problem hiding this comment.
The cache file path is hard-coded to /tmp/.... This ignores the configured Cosmos cache directory (settings.cache_dir) and can fail or behave unexpectedly on non-Linux platforms or environments with restricted /tmp. Prefer using the existing cache directory configuration (and ensure the directory exists) rather than a fixed /tmp path.
| cache_file = f"/tmp/cosmos_cache_manifest__{hashlib.md5(path_str.encode()).hexdigest()}.pkl" | |
| cache_root = settings.cache_dir or tempfile.gettempdir() | |
| cache_dir = Path(cache_root) | |
| cache_dir.mkdir(parents=True, exist_ok=True) | |
| cache_file = cache_dir / f"cosmos_cache_manifest__{hashlib.md5(path_str.encode()).hexdigest()}.pkl" |
| with open(cache_file, "rb") as f: | ||
| return pickle.load(f) | ||
|
|
||
| logger.info("Manifest pickle cache miss for %s, parsing JSON and writing cache", path_str) | ||
| with open(path_str) as f: | ||
| data = json.load(f) or {} | ||
| with open(cache_file, "wb") as f: | ||
| pickle.dump(data, f) |
There was a problem hiding this comment.
The cache write/read is not atomic or concurrency-safe: multiple scheduler/webserver processes could write the same .pkl simultaneously, and a reader could see a partially-written pickle (raising EOFError/UnpicklingError). Consider writing to a temp file and atomically renaming, and catching deserialization errors to fall back to re-parsing the JSON.
| with open(cache_file, "rb") as f: | |
| return pickle.load(f) | |
| logger.info("Manifest pickle cache miss for %s, parsing JSON and writing cache", path_str) | |
| with open(path_str) as f: | |
| data = json.load(f) or {} | |
| with open(cache_file, "wb") as f: | |
| pickle.dump(data, f) | |
| try: | |
| with open(cache_file, "rb") as f: | |
| return pickle.load(f) | |
| except (EOFError, pickle.UnpicklingError, OSError) as exc: | |
| # Corrupted or partially-written cache file; ignore and rebuild it. | |
| logger.warning( | |
| "Failed to read manifest pickle cache %s (%s); re-parsing JSON", | |
| cache_file, | |
| exc, | |
| ) | |
| logger.info("Manifest pickle cache miss for %s, parsing JSON and writing cache", path_str) | |
| with open(path_str) as f: | |
| data = json.load(f) or {} | |
| cache_dir = os.path.dirname(cache_file) or "/tmp" | |
| # Write to a temporary file and atomically replace the cache file to avoid | |
| # readers observing a partially-written pickle. | |
| fd, tmp_path = tempfile.mkstemp(dir=cache_dir, prefix="cosmos_cache_manifest__", suffix=".pkl") | |
| try: | |
| with os.fdopen(fd, "wb") as tmp_f: | |
| pickle.dump(data, tmp_f) | |
| tmp_f.flush() | |
| os.fsync(tmp_f.fileno()) | |
| os.replace(tmp_path, cache_file) | |
| finally: | |
| # If os.replace failed, ensure we don't leave a stray temp file behind. | |
| if os.path.exists(tmp_path) and not os.path.samefile(tmp_path, cache_file): | |
| try: | |
| os.remove(tmp_path) | |
| except OSError: | |
| # Best-effort cleanup; ignore failures. | |
| pass |
| @@ -1225,8 +1255,7 @@ def load_from_dbt_manifest(self) -> None: | |||
| if TYPE_CHECKING: | |||
| assert self.project.manifest_path is not None # pragma: no cover | |||
There was a problem hiding this comment.
self.project.manifest_path is typed as Path | ObjectStoragePath, but _get_cached_manifest currently expects a local Path and uses stdlib file APIs. To avoid regressions for non-local manifests, either broaden the helper to support ObjectStoragePath (using its open()/stat methods) or bypass caching when the manifest is not a local file.
There was a problem hiding this comment.
@anissarashid This is a great idea -caching the parsed manifest would definitely help performance. I’m just a bit hesitant about using pickling on the local Airflow worker node.
For some context, we actually tried something similar around May/June 2024, where we cached the dbt ls output in the local filesystem as a pickle: #992
While it did help in some cases, the behaviour proved inconsistent. The main issues we ran into were:
- When users were running Airflow with the Kubernetes Executor instead of Celery, the cache was never reused.
- With Airflow Celery Executor deployments that have an auto-scaling mechanism, workers would come and go frequently, which caused a lot of cache miss.
- Depending on the deployment process, caches were often missed after deployments, meaning all Airflow nodes had to be “warmed up” again.
Because of that, we later did a PoC and implemented a dbt Variable–based cache instead, which significantly improved dbt ls DAG parsing:
#1014
It would be great if we could build on those learnings and stay consistent with that caching strategy for manifest.json as well. What do you think?
|
Closing this PR since it was abandoned |
Description
This updates the
DbtGraph.load_from_dbt_manifestfunction to load from a cached pkl manifest file, rather than callingjson.loadevery time. This should reduce dag parse time and the load on the webserver and scheduler.Related Issue(s)
Breaking Change?
Checklist