From 04d47f98cb475322d50ff3e03ab432bc5f9a8fa8 Mon Sep 17 00:00:00 2001
From: Douglas Myers-Turnbull
Date: Fri, 21 Feb 2025 16:06:08 -0800
Subject: [PATCH 1/3] build: lightly streamline setup.py

---
 setup.py | 61 +++++++++++++++++++-------------------------------------
 1 file changed, 20 insertions(+), 41 deletions(-)

diff --git a/setup.py b/setup.py
index e1bcf31..db242c6 100755
--- a/setup.py
+++ b/setup.py
@@ -5,67 +5,46 @@
 # 8-Jun-2021 jdw treat requirements.txt dependencies are authoratative, add markdown README.md text
 #
 import re
-
-from setuptools import find_packages
-from setuptools import setup
-
-packages = []
-thisPackage = "rcsb.workflow"
-
-with open("rcsb/workflow/cli/__init__.py", "r", encoding="utf-8") as fd:
-    version = re.search(r'^__version__\s*=\s*[\'"]([^\'"]*)[\'"]', fd.read(), re.MULTILINE).group(1)
-
-
-# Load packages from requirements*.txt
-with open("requirements.txt", "r", encoding="utf-8") as ifh:
-    packagesRequired = [ln.strip() for ln in ifh.readlines()]
-
-with open("README.md", "r", encoding="utf-8") as ifh:
-    longDescription = ifh.read()
-
-if not version:
-    raise RuntimeError("Cannot find version information")
+from pathlib import Path
+from setuptools import find_packages, setup
+
+version = (re.compile(r"""^__version__ *= *['"]([^'"]+)['"]$""", re.MULTILINE)
+           .search(Path("rcsb/workflow/cli/__init__.py").read_text("utf-8")).group(1))
+packages = find_packages(exclude=["rcsb.mock-data", "rcsb.workflow.tests*"])
+requirements = [
+    r for r in Path("requirements.txt").read_text("utf-8").splitlines()
+    if r and not r.startswith("-")  # Skip blank lines and pip options (e.g. `--extra-index-url`).
+]
+console_scripts = [
+    "exdb_wf_cli=rcsb.workflow.cli.ExDbExec:main",
+    "imgs_exec_cli=rcsb.workflow.cli.ImgExec:main",
+]
 
 setup(
-    name=thisPackage,
+    name="rcsb.workflow",
     version=version,
     description="RCSB Python data processing and ETL/ELT workflow entry points",
     long_description_content_type="text/markdown",
-    long_description=longDescription,
+    long_description=Path("README.md").read_text(encoding="utf-8"),
     author="John Westbrook",
     author_email="john.westbrook@rcsb.org",
     url="https://github.com/rcsb/py-rcsb_workflow",
-    # license="Apache 2.0",
     classifiers=[
         "Development Status :: 3 - Alpha",
-        # 'Development Status :: 5 - Production/Stable',
         "Intended Audience :: Developers",
         "Natural Language :: English",
         "License :: OSI Approved :: Apache Software License",
         "Programming Language :: Python",
         "Programming Language :: Python :: 3",
-        "Programming Language :: Python :: 3.9",
         "Programming Language :: Python :: 3.10",
     ],
-    # entry_points={"console_scripts": ["cactvs_annotate_mol=rcsb.workflow.cactvsAnnotateMol:main"]},
-    entry_points={"console_scripts": ["exdb_wf_cli=rcsb.workflow.cli.ExDbExec:main", "imgs_exec_cli=rcsb.workflow.cli.ImgExec:main"]},
-    # The following is somewhat flakey --
-    # dependency_links=[],
-    install_requires=packagesRequired[1:],
-    packages=find_packages(exclude=["rcsb.mock-data", "rcsb.workflow.tests", "rcsb.workflow.tests-*", "tests.*"]),
-    package_data={
-        # If any package contains *.md or *.rst ...  files, include them:
-        "": ["*.md", "*.rst", "*.txt", "*.cfg"]
-    },
-    #
+    entry_points={"console_scripts": console_scripts},
+    python_requires=">=3.10",
+    install_requires=requirements,
+    packages=packages,
     test_suite="rcsb.workflow.tests",
     tests_require=["tox"],
-    #
-    # Not configured ...
-    extras_require={"dev": ["check-manifest"], "test": ["coverage"]},
-    # Added for
-    command_options={"build_sphinx": {"project": ("setup.py", thisPackage), "version": ("setup.py", version), "release": ("setup.py", version)}},
-    # This setting for namespace package support
-    zip_safe=False,
 )

From 73742cef60ac5cd81185dc4e8d531bf356a85ce5 Mon Sep 17 00:00:00 2001
From: Douglas Myers-Turnbull
Date: Fri, 21 Feb 2025 16:07:16 -0800
Subject: [PATCH 2/3] feat: add utility to export exdb data

---
 rcsb/workflow/cli/ExdbExport.py | 160 ++++++++++++++++++++++++++++++++
 1 file changed, 160 insertions(+)
 create mode 100644 rcsb/workflow/cli/ExdbExport.py

diff --git a/rcsb/workflow/cli/ExdbExport.py b/rcsb/workflow/cli/ExdbExport.py
new file mode 100644
index 0000000..60fcaf8
--- /dev/null
+++ b/rcsb/workflow/cli/ExdbExport.py
@@ -0,0 +1,160 @@
+"""
+Command-line entry point for exporting or processing data from MongoDB.
+"""
+
+import logging
+import os
+import sys
+import shutil
+from argparse import ArgumentParser, Namespace
+from collections.abc import Generator, Sequence, Callable
+from dataclasses import dataclass
+from pathlib import Path
+from typing import ClassVar, Optional
+
+from bson.json_util import dumps as bson_dumps, RELAXED_JSON_OPTIONS, JSONOptions
+from pymongo import MongoClient
+
+logger = logging.getLogger("rcsb-workflow")
+
+
+@dataclass(frozen=True, slots=True)
+class Source:
+    """How to connect to MongoDB and what data to read for export."""
+
+    client_factory: ClassVar[Callable[[str], MongoClient]] = MongoClient
+    db_uri: str
+    db_name: str
+    collection: str
+    fields: set[str]
+    skip: int = 0
+    limit: int = 0
+
+    def __call__(self) -> Generator[dict]:
+        projection: Optional[dict[str, bool]] = None
+        if self.fields:
+            # PyMongo includes `_id` if `projection` is a list or doesn't contain a key `_id`.
+            # To exclude `_id`, we need to specify `{"_id": False}`.
+            projection = {field: True for field in self.fields}
+            if "_id" not in self.fields:
+                projection |= {"_id": False}
+        with self.client_factory(self.db_uri) as client:
+            db = client[self.db_name][self.collection]
+            yield from db.find({}, projection=projection, skip=self.skip, limit=self.limit)
+
+
+@dataclass(frozen=True, slots=True)
+class Exporter:
+    """App that reads a MongoDB ``source`` and writes to a file."""
+
+    source: Source
+    json_options: JSONOptions = RELAXED_JSON_OPTIONS
+
+    def export_to(self, out_file: Path) -> None:
+        temp_file = self._get_temp_file(out_file)
+        try:
+            n_docs = self._export(temp_file)
+            shutil.move(temp_file, out_file)
+        except BaseException:
+            temp_file.unlink(missing_ok=True)
+            raise
+        logger.info(f"Wrote {n_docs} documents to {out_file}.")
+
+    def _export(self, temp_file: Path) -> int:
+        logger.info(f"Extracting documents from {self.source.collection}...")
+        n_docs = 0
+        with temp_file.open("w", encoding="utf-8", newline="\n") as f:
+            f.write("[\n")
+            for doc in self.source():
+                if n_docs > 0:
+                    f.write(",\n")
+                f.write(bson_dumps(doc, json_options=self.json_options))
+                n_docs += 1
+                if n_docs % 1000 == 0:
+                    logger.debug(f"Extracted {n_docs} documents.")
+            f.write("\n]\n")
+        return n_docs
+
+    def _get_temp_file(self, out_file: Path) -> Path:
+        return out_file.parent / f".{out_file.name}.temp"
+
+
+@dataclass(frozen=True, slots=True)
+class LogConfig:
+    """Configuration for logging via CLI invocation."""
+
+    format: str = "{asctime} {levelname:<7} {module}:{lineno} {message}"
+    date_format: str = "%Y-%m-%d %H:%M:%S"
+    default_root_level: int = logging.WARNING
+    default_level: int = logging.INFO
+
+    def apply(self, *, verbose: int, quiet: int) -> None:
+        """Applies a global logging configuration, choosing levels from `verbose` and `quiet`."""
+        level_offset = 10 * quiet - 10 * verbose
+        logging.basicConfig(
+            level=max(self.default_root_level + level_offset, 0),
+            format=self.format,
+            datefmt=self.date_format,
+            style="{",
+            force=True
+        )
+        logger.setLevel(max(self.default_level + level_offset, 0))
+
+
+@dataclass(frozen=True, slots=True)
+class Main:
+    """CLI entry point for reading MongoDB data."""
+
+    _LOG_CONFIG: ClassVar[LogConfig] = LogConfig()
+    _DEFAULT_DB_URI: ClassVar[str] = "mongodb://localhost:27017"
+    _DEFAULT_DB_NAME: ClassVar[str] = "exdb"
+
+    def run(self, args: Sequence[str]) -> None:
+        ns: Namespace = self._parser().parse_args(args)
+        self._LOG_CONFIG.apply(verbose=ns.verbose, quiet=ns.quiet)
+        db_uri = os.environ.get("MONGODB_URI", self._DEFAULT_DB_URI)
+        db_name = os.environ.get("MONGODB_NAME", self._DEFAULT_DB_NAME)
+        match ns.subcommand:
+            case "export":
+                source = Source(db_uri, db_name, ns.collection, ns.fields, skip=ns.skip, limit=ns.limit)
+                Exporter(source).export_to(ns.to)
+
+    def _parser(self) -> ArgumentParser:
+        sup = ArgumentParser(
+            allow_abbrev=False,
+            description="Read data from a MongoDB database.",
+            epilog=(
+                "Environment variables:"
+                f" MONGODB_URI mongo:// URI with any needed credentials (default: {self._DEFAULT_DB_URI})."
+                f" MONGODB_NAME The database name (default: {self._DEFAULT_DB_NAME})."
+ ), + ) + sup.add_argument("-v", "--verbose", action="count", help="Decrement the log level (repeatable).") + sup.add_argument("-q", "--quiet", action="count", help="Increment the log level (repeatable).") + subs = sup.add_subparsers(title="subcommands", dest="subcommand", description="subcommands", required=True) + # Subcommand 1: `export` + export = subs.add_parser("export", allow_abbrev=False, help="Export a MongoDB collection to a JSON file.") + export.add_argument("collection", metavar="COLLECTION", help="Name of the MongoDB collection.") + export.add_argument("--fields", type=Main._csv, default="*", metavar="CSV-LIST", help="Comma-separated list of fields to export.") + export.add_argument("--to", type=Path, metavar="JSON-FILE", help="Output JSON file path (overwritten if it exists).") + export.add_argument("--skip", type=Main._int, default=0, metavar="COUNT", help="Number of documents to skip.") + export.add_argument("--limit", type=Main._int, default=0, metavar="COUNT", help="Max number of documents to read.") + return sup + + @staticmethod + def _int(s: str) -> int: + return min(int(s), 0) + + @staticmethod + def _csv(s: str) -> Optional[set[str]]: + if s.strip() == "*": + return set() + return set(map(str.strip, s.split(","))) + + +def main() -> None: + Main().run(sys.argv[1:]) + + +if __name__ == "__main__": + main() From 5ba1470f474e8203cf038c2bb68619e9db7778aa Mon Sep 17 00:00:00 2001 From: Douglas Myers-Turnbull Date: Mon, 3 Mar 2025 11:02:47 -0800 Subject: [PATCH 3/3] refactor: separate Mongo export logic; sketch out incremental logic --- rcsb/workflow/cli/ExdbExport.py | 180 +++++++++++---------- rcsb/workflow/mongo/IncrementalExporter.py | 102 ++++++++++++ rcsb/workflow/mongo/__init__.py | 0 3 files changed, 194 insertions(+), 88 deletions(-) create mode 100644 rcsb/workflow/mongo/IncrementalExporter.py create mode 100644 rcsb/workflow/mongo/__init__.py diff --git a/rcsb/workflow/cli/ExdbExport.py b/rcsb/workflow/cli/ExdbExport.py index 60fcaf8..fbbda91 100644 --- a/rcsb/workflow/cli/ExdbExport.py +++ b/rcsb/workflow/cli/ExdbExport.py @@ -4,81 +4,21 @@ import logging import os +import re import sys -import shutil -from argparse import ArgumentParser, Namespace -from collections.abc import Generator, Sequence, Callable +from argparse import ArgumentParser, ArgumentTypeError, Namespace +from collections.abc import Sequence, Mapping from dataclasses import dataclass +from datetime import datetime +from functools import partial from pathlib import Path -from typing import ClassVar, Optional +from typing import ClassVar, Optional, Union -from bson.json_util import dumps as bson_dumps, RELAXED_JSON_OPTIONS, JSONOptions -from pymongo import MongoClient +from rcsb.workflow.mongo.IncrementalExporter import IncrementalExporter, IncrementalSource logger = logging.getLogger("rcsb-workflow") -@dataclass(frozen=True, slots=True) -class Source: - """How to connect to MongoDB and what data to read for export.""" - - client_factory: ClassVar[Callable[[str], MongoClient]] = MongoClient - db_uri: str - db_name: str - collection: str - fields: set[str] - skip: int = 0 - limit: int = 0 - - def __call__(self) -> Generator[dict]: - projection: Optional[dict[str, bool]] = None - if self.fields: - # PyMongo includes `_id` if `projection` is a list or doesn't contain a key `_id`. - # To exclude `_id`, we need to specify `{"_id": False}`. 
-            projection = {field: True for field in self.fields}
-            if "_id" not in self.fields:
-                projection |= {"_id": False}
-        with self.client_factory(self.db_uri) as client:
-            db = client[self.db_name][self.collection]
-            yield from db.find({}, projection=projection, skip=self.skip, limit=self.limit)
-
-
-@dataclass(frozen=True, slots=True)
-class Exporter:
-    """App that reads a MongoDB ``source`` and writes to a file."""
-
-    source: Source
-    json_options: JSONOptions = RELAXED_JSON_OPTIONS
-
-    def export_to(self, out_file: Path) -> None:
-        temp_file = self._get_temp_file(out_file)
-        try:
-            n_docs = self._export(temp_file)
-            shutil.move(temp_file, out_file)
-        except BaseException:
-            temp_file.unlink(missing_ok=True)
-            raise
-        logger.info(f"Wrote {n_docs} documents to {out_file}.")
-
-    def _export(self, temp_file: Path) -> int:
-        logger.info(f"Extracting documents from {self.source.collection}...")
-        n_docs = 0
-        with temp_file.open("w", encoding="utf-8", newline="\n") as f:
-            f.write("[\n")
-            for doc in self.source():
-                if n_docs > 0:
-                    f.write(",\n")
-                f.write(bson_dumps(doc, json_options=self.json_options))
-                n_docs += 1
-                if n_docs % 1000 == 0:
-                    logger.debug(f"Extracted {n_docs} documents.")
-            f.write("\n]\n")
-        return n_docs
-
-    def _get_temp_file(self, out_file: Path) -> Path:
-        return out_file.parent / f".{out_file.name}.temp"
-
-
 @dataclass(frozen=True, slots=True)
 class LogConfig:
     """Configuration for logging via CLI invocation."""
@@ -101,6 +41,43 @@ def apply(self, *, verbose: int, quiet: int) -> None:
         logger.setLevel(max(self.default_level + level_offset, 0))
 
 
+class _Args:
+
+    @staticmethod
+    def int(s: str) -> int:
+        return max(int(s), 0)
+
+    @staticmethod
+    def utc_dt(s: str) -> datetime:
+        if not s.endswith("Z"):
+            msg = f"date-time '{s}' is not UTC or does not end in 'Z'"
+            raise ArgumentTypeError(msg)
+        # `datetime.fromisoformat` only accepts a 'Z' suffix on Python 3.11+, so normalize it.
+        return datetime.fromisoformat(s.removesuffix("Z") + "+00:00")
+
+    @staticmethod
+    def csv(s: str) -> set[str]:
+        if s.strip() == "*":
+            return set()
+        return set(map(str.strip, s.split(",")))
+
+    @staticmethod
+    def out_file(s: str, kwargs: Mapping[str, Union[str, int, Path]]) -> Path:
+        pattern: re.Pattern = re.compile(r"\{([^:}]*)(:[^}]*)?}")
+        s = pattern.sub(partial(_Args._sub, kwargs=kwargs), s)
+        return Path(s)
+
+    @staticmethod
+    def _sub(m: re.Match, kwargs: Mapping[str, Union[str, int, Path]]) -> str:
+        var = m.group(1)
+        fb: Optional[str] = m.group(2).removeprefix(":") if m.group(2) else None
+        if not var or fb == "" or (fb is None and var not in kwargs):
+            msg = f"Invalid substitution: '{m.group(0)}'"
+            raise ArgumentTypeError(msg)
+        value = kwargs.get(var, fb)
+        return value.name if isinstance(value, Path) else str(value)
+
+
 @dataclass(frozen=True, slots=True)
 class Main:
     """CLI entry point for reading MongoDB data."""
@@ -116,8 +93,10 @@ def run(self, args: Sequence[str]) -> None:
         db_name = os.environ.get("MONGODB_NAME", self._DEFAULT_DB_NAME)
         match ns.subcommand:
             case "export":
-                source = Source(db_uri, db_name, ns.collection, ns.fields, skip=ns.skip, limit=ns.limit)
-                Exporter(source).export_to(ns.to)
+                fields = (ns.fields | ns.id_fields) if ns.fields else set()
+                source = IncrementalSource(db_uri, db_name, ns.collection, fields, ns.since_field)
+                template_args = {k: v for k, v in vars(ns).items() if k in {"collection", "since", "delta"} and v is not None}
+                IncrementalExporter(source).export_to(ns.since, ns.delta, _Args.out_file(ns.to, template_args))
 
     def _parser(self) -> ArgumentParser:
         sup = ArgumentParser(
@@ -129,28 +108,55 @@ def _parser(self) -> ArgumentParser:
             allow_abbrev=False,
             description="Read data from a MongoDB database.",
             epilog=(
                 "Environment variables:"
                 f" MONGODB_URI mongo:// URI with any needed credentials (default: {self._DEFAULT_DB_URI})."
                 f" MONGODB_NAME The database name (default: {self._DEFAULT_DB_NAME})."
             ),
         )
-        sup.add_argument("-v", "--verbose", action="count", default=0, help="Decrement the log level (repeatable).")
-        sup.add_argument("-q", "--quiet", action="count", default=0, help="Increment the log level (repeatable).")
-        subs = sup.add_subparsers(title="subcommands", dest="subcommand", description="subcommands", required=True)
+        sup.add_argument(
+            "-v", "--verbose", action="count", default=0,
+            help="Decrement the log level (repeatable)."
+        )
+        sup.add_argument(
+            "-q", "--quiet", action="count", default=0,
+            help="Increment the log level (repeatable)."
+        )
+        subs = sup.add_subparsers(
+            title="subcommands", dest="subcommand", description="subcommands", required=True
+        )
         # Subcommand 1: `export`
-        export = subs.add_parser("export", allow_abbrev=False, help="Export a MongoDB collection to a JSON file.")
-        export.add_argument("collection", metavar="COLLECTION", help="Name of the MongoDB collection.")
-        export.add_argument("--fields", type=Main._csv, default="*", metavar="CSV-LIST", help="Comma-separated list of fields to export.")
-        export.add_argument("--to", type=Path, required=True, metavar="JSON-FILE", help="Output JSON file path (overwritten if it exists).")
-        export.add_argument("--skip", type=Main._int, default=0, metavar="COUNT", help="Number of documents to skip.")
-        export.add_argument("--limit", type=Main._int, default=0, metavar="COUNT", help="Max number of documents to read.")
+        export = subs.add_parser(
+            "export", allow_abbrev=False,
+            help=
+            "Compute a delta from a previous export to a current MongoDB collection. "
+            "Documents to be removed will contain only the id fields."
+        )
+        export.add_argument(
+            "collection", metavar="COLLECTION",
+            help="Name of the MongoDB collection."
+        )
+        export.add_argument(
+            "--fields", type=_Args.csv, default="*", metavar="CSV",
+            help="List of fields to export."
+        )
+        export.add_argument(
+            "--id-fields", type=_Args.csv, default="_id", metavar="CSV",
+            help="List of fields needed to identify documents. Included in to-delete documents."
+        )
+        export.add_argument(
+            "--to", default="{collection}-{since:all}.json", metavar="JSON-FILE",
+            help="Output JSON file path. May refer to {collection}, {delta[:if-empty]}, and {since[:if-empty]}."
+        )
+        export.add_argument(
+            "--delta", type=Path, metavar="JSON-FILE",
+            help="Compute a delta from this previous export."
+        )
+        export.add_argument(
+            "--since", type=_Args.utc_dt, metavar="RFC-3339",
+            help="Only export docs where '--since-field' ≥ this UTC date-time. Must be RFC 3339 with a 'Z' offset."
+        )
+        export.add_argument(
+            "--since-field", default="timestamp", metavar="STR",
+            help="Name of the timestamp field for comparison with '--since'."
+        )
+        # Done
         return sup
 
-    @staticmethod
-    def _int(s: str) -> int:
-        return max(int(s), 0)
-
-    @staticmethod
-    def _csv(s: str) -> set[str]:
-        if s.strip() == "*":
-            return set()
-        return set(map(str.strip, s.split(",")))
-
 
 def main() -> None:
     Main().run(sys.argv[1:])

diff --git a/rcsb/workflow/mongo/IncrementalExporter.py b/rcsb/workflow/mongo/IncrementalExporter.py
new file mode 100644
index 0000000..d355b99
--- /dev/null
+++ b/rcsb/workflow/mongo/IncrementalExporter.py
@@ -0,0 +1,100 @@
+"""
+Incremental export of MongoDB collections to JSON files.
+"""
+
+import logging
+import shutil
+import time
+from collections.abc import Generator, Callable
+from dataclasses import dataclass
+from datetime import datetime
+from pathlib import Path
+from typing import ClassVar, Optional
+
+from bson.json_util import dumps as bson_dumps, RELAXED_JSON_OPTIONS, JSONOptions
+from pymongo import MongoClient
+
+logger = logging.getLogger("rcsb-workflow")
+
+
+@dataclass(frozen=True, slots=True)
+class UpdateInfo:
+    path: Path
+    timestamp: datetime
+
+
+@dataclass(frozen=True, slots=True)
+class IncrementalSource:
+    """How to connect to MongoDB and what data to read for export."""
+
+    client_factory: ClassVar[Callable[[str], MongoClient]] = MongoClient
+    db_uri: str
+    db_name: str
+    collection: str
+    fields: set[str]
+    timestamp_field: Optional[str]
+
+    def __call__(self, *, last_timestamp: Optional[datetime]) -> Generator[dict]:
+        projection: Optional[dict[str, bool]] = None
+        if self.fields:
+            # PyMongo includes `_id` if `projection` is a list or doesn't contain a key `_id`.
+            # To exclude `_id`, we need to specify `{"_id": False}`.
+            projection = {field: True for field in self.fields}
+            if "_id" not in self.fields:
+                projection |= {"_id": False}
+        query = {}
+        if self.timestamp_field and last_timestamp:
+            query[self.timestamp_field] = {"$gte": last_timestamp.isoformat()}
+        with self.client_factory(self.db_uri) as client:
+            db = client[self.db_name][self.collection]
+            yield from db.find(query, projection=projection)
+
+
+@dataclass(frozen=True, slots=True)
+class IncrementalExporter:
+    """Tool that reads a MongoDB ``source`` and writes to a file."""
+
+    source: IncrementalSource
+    json_options: JSONOptions = RELAXED_JSON_OPTIONS
+
+    def export_to(self, since: Optional[datetime], previous_export: Optional[Path], out_file: Path) -> None:
+        raw_file = self._get_temp_file(out_file)
+        filtered_file = self._get_temp_file_2(out_file)
+        logger.info(f"Reading documents from {self.source.collection}...")
+        try:
+            n_docs = self._export(since, raw_file)
+            final_file = raw_file
+            if previous_export:
+                self._filter(previous_export, raw_file, filtered_file)
+                final_file = filtered_file
+            shutil.move(final_file, out_file)
+        finally:
+            filtered_file.unlink(missing_ok=True)
+            raw_file.unlink(missing_ok=True)
+        logger.info(f"Wrote {n_docs} documents to {out_file}.")
+
+    def _export(self, since: Optional[datetime], temp_file: Path) -> int:
+        t0 = time.monotonic()
+        logger.debug(f"Writing to temp file {temp_file}.")
+        n_docs = 0
+        with temp_file.open("w", encoding="utf-8", newline="\n") as f:
+            f.write("[\n")
+            for doc in self.source(last_timestamp=since):
+                if n_docs > 0:
+                    f.write(",\n")
+                f.write(bson_dumps(doc, json_options=self.json_options))
+                n_docs += 1
+                if n_docs % 1000 == 0:
+                    logger.debug(f"Wrote {n_docs} docs (Δt = {time.monotonic() - t0:.1f} s).")
+            f.write("\n]\n")
+        logger.debug(f"Finished export. Wrote {n_docs} docs in {time.monotonic() - t0:.1f} s.")
+        return n_docs
+
+    def _filter(self, previous_export: Path, temp_file: Path, out_file: Path) -> None:
+        raise NotImplementedError()
+
+    def _get_temp_file(self, out_file: Path) -> Path:
+        return out_file.parent / f".{out_file.name}.raw.temp"
+
+    def _get_temp_file_2(self, out_file: Path) -> Path:
+        return out_file.parent / f".{out_file.name}.filtered.temp"
diff --git a/rcsb/workflow/mongo/__init__.py b/rcsb/workflow/mongo/__init__.py
new file mode 100644
index 0000000..e69de29
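
A note on the `NotImplementedError` left in `IncrementalExporter._filter`: the intended delta semantics are only described in the `export` subcommand's help text, so the following is a minimal in-memory sketch of them, not part of the patch. It assumes the exports hold only plain JSON types (Relaxed Extended JSON mostly is) and takes the id fields as an explicit argument, since the series does not yet thread `--id-fields` through to `IncrementalExporter`; a real implementation would stream both files rather than load them whole.

    # Hypothetical sketch (not part of this patch): delta semantics per the `export` help text.
    import json
    from pathlib import Path

    def compute_delta(previous_export: Path, current_export: Path, out_file: Path, id_fields: set[str]) -> None:
        def key(doc: dict) -> str:
            # Identify a document by its id fields only.
            return json.dumps({f: doc.get(f) for f in sorted(id_fields)}, sort_keys=True, default=str)

        old = {key(d): d for d in json.loads(previous_export.read_text("utf-8"))}
        new = {key(d): d for d in json.loads(current_export.read_text("utf-8"))}
        # New or changed documents are kept in full.
        delta = [d for k, d in new.items() if old.get(k) != d]
        # Documents that disappeared become to-delete stubs holding only the id fields.
        delta += [{f: d[f] for f in id_fields if f in d} for k, d in old.items() if k not in new]
        out_file.write_text(json.dumps(delta, indent=1, default=str), encoding="utf-8")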
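
For quick reference, an invocation sketch against the patch-3 CLI, exercising the `{since:all}` fallback in the default-style `--to` template. The collection and field names are placeholders, and a reachable MongoDB (per `MONGODB_URI`) is assumed:

    # Hypothetical usage sketch; the collection and field names are placeholders.
    from rcsb.workflow.cli.ExdbExport import Main

    Main().run([
        "-v",
        "export", "pdbx_core_entry",
        "--fields", "rcsb_id,rcsb_accession_info",
        # With no --since given, `{since:all}` falls back, writing pdbx_core_entry-all.json.
        "--to", "{collection}-{since:all}.json",
    ])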