Small CLI to export data from ExDB #26

Open · wants to merge 3 commits into base: master
164 changes: 164 additions & 0 deletions rcsb/workflow/cli/ExdbExport.py
@@ -0,0 +1,164 @@
"""
Command-line entry point for exporting or processing data from MongoDB.
"""

import logging
import os
import re
import sys
from argparse import ArgumentParser, ArgumentTypeError, Namespace
from collections.abc import Sequence, Mapping
from dataclasses import dataclass
from datetime import datetime
from functools import partial
from pathlib import Path
from typing import ClassVar, Optional, Union

from rcsb.workflow.mongo.IncrementalExporter import IncrementalExporter, IncrementalSource

logger = logging.getLogger("rcsb-workflow")


@dataclass(frozen=True, slots=True)
class LogConfig:
"""Configuration for logging via CLI invocation."""

format: str = "{asctime} {levelname:<7} {module}:{lineno} {message}"
    date_format: str = "%Y-%m-%d %H:%M:%S"  # logging's strftime-based formatter does not support %f.
default_root_level: int = logging.WARNING
default_level: int = logging.INFO

def apply(self, *, verbose: int, quiet: int) -> None:
"""Applies a global logging configuration, choosing levels from `verbose` and `quiet`."""
level_offset = 10 * quiet - 10 * verbose
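        # Each -v lowers the effective levels by 10 (more output) and each -q raises them by 10;
        # e.g. a single -v yields a root level of INFO and a package logger level of DEBUG.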
logging.basicConfig(
level=max(self.default_root_level + level_offset, 0),
format=self.format,
datefmt=self.date_format,
style="{",
force=True
)
logger.setLevel(max(self.default_level + level_offset, 0))


class _Args:

@staticmethod
    def int(s: str) -> int:
        # Clamp to a non-negative integer.
        return max(int(s), 0)

    @staticmethod
    def utc_dt(s: str) -> datetime:
        # `datetime.fromisoformat` only accepts a literal 'Z' offset on Python 3.11+.
        dt = datetime.fromisoformat(s.replace("Z", "+00:00"))
        if not s.endswith("Z") or dt.tzname() not in {"UTC", "Etc/UTC"}:
            msg = f"date-time '{s}' is not UTC or does not end in 'Z'"
            raise ArgumentTypeError(msg)
        return dt
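    # Example (illustrative): utc_dt("2024-01-01T00:00:00Z") parses successfully, while
    # utc_dt("2024-01-01T00:00:00+01:00") raises ArgumentTypeError because it is not UTC.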

    @staticmethod
    def csv(s: str) -> set[str]:
        # An empty set means "all fields"; no projection is applied downstream.
        if s.strip() == "*":
            return set()
        return set(map(str.strip, s.split(",")))

@staticmethod
def out_file(s: str, kwargs: Mapping[str, Union[str, int, Path]]) -> Path:
pattern: re.Pattern = re.compile(r"\{([^:}]*)(:[^}]*)?}")
s = pattern.sub(partial(_Args._sub, kwargs=kwargs), s)
return Path(s)

    @staticmethod
    def _sub(m: re.Match, kwargs: Mapping[str, Union[str, int, Path]]) -> str:
        var = m.group(1)
        fb: Optional[str] = m.group(2).removeprefix(":") if m.group(2) else None
        # Invalid if the variable name is missing, the fallback is empty, or the variable
        # is unknown and no fallback was given.
        if not var or (fb is None and var not in kwargs) or fb == "":
            msg = f"Invalid substitution: '{m.group(0)}'"
            raise ArgumentTypeError(msg)
        value = kwargs.get(var, fb)
        return value.name if isinstance(value, Path) else str(value)
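    # Example (illustrative): out_file("{collection}-{since:all}.json", {"collection": "entries"})
    # resolves to Path("entries-all.json"); a bare "{since}" with no fallback would be rejected here.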


@dataclass(frozen=True, slots=True)
class Main:
"""CLI entry point for reading MongoDB data."""

_LOG_CONFIG: ClassVar[LogConfig] = LogConfig()
_DEFAULT_DB_URI: ClassVar[str] = "mongodb://localhost:27017"
_DEFAULT_DB_NAME: ClassVar[str] = "exdb"

    def run(self, args: Sequence[str]) -> None:
        ns: Namespace = self._parser().parse_args(args)
        self._LOG_CONFIG.apply(verbose=ns.verbose, quiet=ns.quiet)
        db_uri = os.environ.get("MONGODB_URI", self._DEFAULT_DB_URI)
        db_name = os.environ.get("MONGODB_NAME", self._DEFAULT_DB_NAME)
        match ns.subcommand:
            case "export":
                # An empty `--fields` set means "all fields"; don't narrow it to just the id fields.
                fields = ns.fields | ns.id_fields if ns.fields else set()
                source = IncrementalSource(db_uri, db_name, ns.collection, fields, ns.since_field)
                # Resolve the `--to` template against the values that were actually provided.
                to_kwargs = {
                    k: v for k, v in {"collection": ns.collection, "since": ns.since, "delta": ns.delta}.items()
                    if v is not None
                }
                out_file = _Args.out_file(ns.to, to_kwargs)
                IncrementalExporter(source).export_to(ns.since, ns.delta, out_file)

def _parser(self) -> ArgumentParser:
sup = ArgumentParser(
allow_abbrev=False,
description="Read data from a MongoDB database.",
epilog=(
"Environment variables:"
f" MONGODB_URI mongo:// URI with any needed credentials (default: {self._DEFAULT_DB_URI})."
f" MONGODB_NAME The database name (default: {self._DEFAULT_DB_NAME})."
),
)
        sup.add_argument(
            "-v", "--verbose", action="count", default=0,
            help="Increase logging verbosity (repeatable)."
        )
        sup.add_argument(
            "-q", "--quiet", action="count", default=0,
            help="Decrease logging verbosity (repeatable)."
        )
subs = sup.add_subparsers(
title="subcommands", dest="subcommand", description="subcommands", required=True
)
# Subcommand 1: `export`
export = subs.add_parser(
"export", allow_abbrev=False,
            help=(
                "Compute a delta from a previous export to a current MongoDB collection. "
                "Documents to be removed will contain only the id fields."
            )
)
export.add_argument(
"collection", metavar="COLLECTION",
help="Name of the MongoDB collection."
)
export.add_argument(
"--fields", type=_Args.csv, default="*", metavar="CSV",
help="List of fields to export."
)
export.add_argument(
"--id-fields", type=_Args.csv, default="_id", required=True, metavar="CSV",
help="List of fields needed to identify documents. Included in to-delete documents."
)
export.add_argument(
"--to", default="{collection}-{since:all}.json", metavar="JSON-FILE",
help="Output JSON file path. May refer to {collection}, {delta[:if-empty]}, and {since[:if-empty]}."
)
export.add_argument(
"--delta", type=Path, metavar="JSON-FILE",
help="Compute a delta from this previous export."
)
export.add_argument(
"--since", type=_Args.utc_dt, metavar="RFC-3339",
help="Only export docs where '--since-field' ≥ this UTC date-time. Must be RFC 3339 with a 'Z' offset."
)
export.add_argument(
"--since-field", default="timestamp", metavar="STR",
help="Name of the timestamp field for comparison with '--since'."
)
# Done
return sup


def main() -> None:
Main().run(sys.argv[1:])


if __name__ == "__main__":
main()
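

# Example invocation (illustrative; the collection name is a placeholder):
#   python -m rcsb.workflow.cli.ExdbExport export my_collection --id-fields _id \
#       --to "{collection}-{since:all}.json"
# With --since omitted, the output path resolves to "my_collection-all.json".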
102 changes: 102 additions & 0 deletions rcsb/workflow/mongo/IncrementalExporter.py
@@ -0,0 +1,102 @@
"""
Command-line entry point for exporting or processing data from MongoDB.
"""

import logging
import shutil
import time
from collections.abc import Callable, Generator
from dataclasses import dataclass
from datetime import datetime
from pathlib import Path
from typing import ClassVar, Optional

from bson.json_util import dumps as bson_dumps, RELAXED_JSON_OPTIONS, JSONOptions
from pymongo import MongoClient

logger = logging.getLogger("rcsb-workflow")


@dataclass(frozen=True, slots=True)
class UpdateInfo:
path: Path
timestamp: datetime


@dataclass(frozen=True, slots=True)
class IncrementalSource:
"""How to connect to MongoDB and what data to read for export."""

client_factory: ClassVar[Callable[[str], MongoClient]] = MongoClient
db_uri: str
db_name: str
collection: str
fields: set[str]
timestamp_field: Optional[str]

    def __call__(self, *, last_timestamp: Optional[datetime]) -> Generator[dict, None, None]:
projection: Optional[dict[str, bool]] = None
if self.fields:
# PyMongo includes `_id` if `projection` is a list or doesn't contain a key `_id`.
# To exclude `_id`, we need to specify `{"_id": False}`.
projection = {field: True for field in self.fields}
if "_id" not in self.fields:
projection |= {"_id": False}
query = {}
if self.timestamp_field and last_timestamp:
query[self.timestamp_field] = {"$gte": last_timestamp.isoformat()}
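            # e.g. {"timestamp": {"$gte": "2024-01-01T00:00:00+00:00"}}; this assumes the collection
            # stores the timestamp field as ISO-8601 strings that compare lexicographically.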
with self.client_factory(self.db_uri) as client:
db = client[self.db_name][self.collection]
yield from db.find(query, projection=projection)


@dataclass(frozen=True, slots=True)
class IncrementalExporter:
"""Tool that reads a MongoDB ``source`` and writes to a file."""

source: IncrementalSource
json_options: JSONOptions = RELAXED_JSON_OPTIONS

    def export_to(self, since: Optional[datetime], previous_export: Optional[Path], out_file: Path) -> None:
        temp_file = self._get_temp_file(out_file)
        temp_file_2 = self._get_temp_file_2(out_file)
        logger.info(f"Reading documents from {self.source.collection}...")
        try:
            n_docs = self._export(since, temp_file)
            final_file = temp_file
            if previous_export:
                self._filter(previous_export, temp_file, temp_file_2)
                final_file = temp_file_2
            shutil.move(final_file, out_file)
        finally:
            # Remove whichever temp files are still present.
            temp_file_2.unlink(missing_ok=True)
            temp_file.unlink(missing_ok=True)
        logger.info(f"Wrote {n_docs} documents to {out_file}.")

    def _export(self, since: Optional[datetime], temp_file: Path) -> int:
        t0 = time.monotonic()
        logger.debug(f"Writing to temp file {temp_file}.")
        n_docs = 0
        with temp_file.open("w", encoding="utf-8", newline="\n") as f:
            f.write("[\n")
            for doc in self.source(last_timestamp=since):
                # The separator goes before every document after the first, so the array stays valid JSON.
                if n_docs > 0:
                    f.write(",\n")
                f.write(bson_dumps(doc, json_options=self.json_options))
                n_docs += 1
                if n_docs % 1000 == 0:
                    logger.debug(f"Wrote {n_docs} docs (Δt = {time.monotonic() - t0:.1f} s).")
            f.write("\n]\n")
        logger.debug(f"Finished export. Wrote {n_docs} docs in {time.monotonic() - t0:.1f} s.")
        return n_docs

def _filter(self, previous_export: Path, temp_file: Path, out_file: Path) -> None:
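        # Not implemented yet: intended to diff `previous_export` against the fresh export in `temp_file`
        # and write the result to `out_file`, keeping only the id fields for documents that were removed.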
raise NotImplementedError()

def _get_temp_file(self, out_file: Path) -> Path:
return out_file.parent / f".{out_file.name}.raw.temp"

def _get_temp_file_2(self, out_file: Path) -> Path:
return out_file.parent / f".{out_file.name}.filtered.temp"
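

# Minimal usage sketch (illustrative; the URI, database, and collection names are placeholders):
#   source = IncrementalSource("mongodb://localhost:27017", "exdb", "my_collection", {"_id"}, "timestamp")
#   IncrementalExporter(source).export_to(since=None, previous_export=None, out_file=Path("my_collection.json"))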
Empty file added rcsb/workflow/mongo/__init__.py
61 changes: 20 additions & 41 deletions setup.py
@@ -5,67 +5,46 @@
# 8-Jun-2021 jdw treat requirements.txt dependencies are authoratative, add markdown README.md text
#
import re

from setuptools import find_packages
from setuptools import setup

packages = []
thisPackage = "rcsb.workflow"

with open("rcsb/workflow/cli/__init__.py", "r", encoding="utf-8") as fd:
version = re.search(r'^__version__\s*=\s*[\'"]([^\'"]*)[\'"]', fd.read(), re.MULTILINE).group(1)


# Load packages from requirements*.txt
with open("requirements.txt", "r", encoding="utf-8") as ifh:
packagesRequired = [ln.strip() for ln in ifh.readlines()]

with open("README.md", "r", encoding="utf-8") as ifh:
longDescription = ifh.read()

if not version:
raise RuntimeError("Cannot find version information")
from pathlib import Path
from setuptools import find_packages, setup

version = (re.compile(r"""^__version__ *= *['"]([^'"]+)['"]$""", re.MULTILINE)
           .search(Path("rcsb/workflow/cli/__init__.py").read_text("utf-8")).group(1))
packages = find_packages(exclude=["rcsb.mock-data", "rcsb.workflow.tests*"])
requirements = [
r for r in Path("requirements.txt").read_text("utf-8").splitlines()
if not r.startswith("-") # Strip pip options (e.g. `--extra-index-url`).
]
console_scripts = [
"exdb_wf_cli=rcsb.workflow.cli.ExDbExec:main",
"imgs_exec_cli=rcsb.workflow.cli.ImgExec:main",
]

setup(
name=thisPackage,
name="rcsb.workflow",
version=version,
description="RCSB Python data processing and ETL/ELT workflow entry points",
long_description_content_type="text/markdown",
long_description=longDescription,
long_description=Path("README.md").read_text(encoding="utf-8"),
author="John Westbrook",
author_email="[email protected]",
url="https://github.com/rcsb/py-rcsb_workflow",
#
license="Apache 2.0",
classifiers=[
"Development Status :: 3 - Alpha",
# 'Development Status :: 5 - Production/Stable',
"Intended Audience :: Developers",
"Natural Language :: English",
"License :: OSI Approved :: Apache Software License",
"Programming Language :: Python",
"Programming Language :: Python :: 3",
"Programming Language :: Python :: 3.9",
"Programming Language :: Python :: 3.10",
],
# entry_points={"console_scripts": ["cactvs_annotate_mol=rcsb.workflow.cactvsAnnotateMol:main"]},
entry_points={"console_scripts": ["exdb_wf_cli=rcsb.workflow.cli.ExDbExec:main", "imgs_exec_cli=rcsb.workflow.cli.ImgExec:main"]},
# The following is somewhat flakey --
# dependency_links=[],
install_requires=packagesRequired[1:],
packages=find_packages(exclude=["rcsb.mock-data", "rcsb.workflow.tests", "rcsb.workflow.tests-*", "tests.*"]),
package_data={
# If any package contains *.md or *.rst ... files, include them:
"": ["*.md", "*.rst", "*.txt", "*.cfg"]
},
#
entry_points={"console_scripts": console_scripts},
    python_requires=">=3.10",
install_requires=requirements,
packages=packages,
test_suite="rcsb.workflow.tests",
tests_require=["tox"],
#
# Not configured ...
extras_require={"dev": ["check-manifest"], "test": ["coverage"]},
# Added for
command_options={"build_sphinx": {"project": ("setup.py", thisPackage), "version": ("setup.py", version), "release": ("setup.py", version)}},
# This setting for namespace package support -
zip_safe=False,
)