From 51bdc1c06ff7584b5a8cf2e8a166b480f426ef62 Mon Sep 17 00:00:00 2001 From: Yanks Yoon <37652070+yanksyoon@users.noreply.github.com> Date: Mon, 15 Jan 2024 17:51:46 +0800 Subject: [PATCH] feat: initial charm (#5) * feat: initial charm skeleton * feat: initial charm * chore: use self-hosted * fix: workflow * chore: ignore all libs * chore: revert debug for workflows * chore: update lib * debug * test: command separator * test: command separator * test: debug * test: debug * test: debug * test: debug * test: integratino test module * test: assert & log * test: debug * test: create .ssh dir * test: ssh dir * test: ssh dir fix * test: debug * test: debug(check id_rsa exist) * test: generate id_rsa * test: rebootstrap localhost * test: rebootstrap localhost * debug * debug * test: wait for id_rsa * test: copy id_rsa files * test: try empty password * test: remove pre-run-script * test: make id_rsa * test: enable pre-run-script * test: fix tests * test: fix lint * test: use factoryboy & fix lints * test: fix dates * chore: merge charmcraft & metadata * fix: charmcraft yaml * chore: revert metadata yaml * Revert "chore: revert metadata yaml" This reverts commit 242be68cf689464277c472fb5ebb9bc6cf3b44f3. * Revert "fix: charmcraft yaml" This reverts commit 4f0870fbf8e7b0546b89829a60907632c25742f7. * Revert "chore: merge charmcraft & metadata" This reverts commit 924367c320a8b9bc39c8935f322826b86336d91e. 
* feat: handle invalid bind * fix: tests * fix: use id_rsa * fix: feedback * fix: tests * chore: remove unused import * fix: install tmate * feat: update relation data when ready --- .github/workflows/integration_test.yaml | 6 + .github/workflows/test.yaml | 5 +- .woke.yaml | 5 +- README.md | 12 +- actions.yaml | 6 + charmcraft.yaml | 11 + lib/charms/operator_libs_linux/v0/apt.py | 1361 ++++++++++++++++++ lib/charms/operator_libs_linux/v0/passwd.py | 259 ++++ lib/charms/operator_libs_linux/v1/systemd.py | 288 ++++ metadata.yaml | 36 + src-docs/actions.py.md | 59 + src-docs/charm.py.md | 48 +- src-docs/ssh_debug.py.md | 63 + src-docs/state.py.md | 90 ++ src-docs/tmate.py.md | 199 +++ src/actions.py | 46 + src/charm.py | 78 +- src/ssh_debug.py | 74 + src/state.py | 62 + src/tmate.py | 205 +++ templates/create_keys.sh.j2 | 22 + templates/tmate-ssh-server.service.j2 | 17 + tests/integration/__init__.py | 4 + tests/integration/conftest.py | 108 ++ tests/integration/helpers.py | 47 + tests/integration/pre_run_script.sh | 12 + tests/integration/requirements.txt | 1 + tests/integration/test_charm.py | 88 +- tests/unit/__init__.py | 4 + tests/unit/conftest.py | 47 + tests/unit/factories.py | 40 + tests/unit/requirements.txt | 1 + tests/unit/test_actions.py | 74 + tests/unit/test_charm.py | 130 +- tests/unit/test_sshdebug.py | 95 ++ tests/unit/test_state.py | 39 + tests/unit/test_tmate.py | 251 ++++ tox.ini | 3 + 38 files changed, 3874 insertions(+), 22 deletions(-) create mode 100644 actions.yaml create mode 100644 charmcraft.yaml create mode 100644 lib/charms/operator_libs_linux/v0/apt.py create mode 100644 lib/charms/operator_libs_linux/v0/passwd.py create mode 100644 lib/charms/operator_libs_linux/v1/systemd.py create mode 100644 metadata.yaml create mode 100644 src-docs/actions.py.md create mode 100644 src-docs/ssh_debug.py.md create mode 100644 src-docs/state.py.md create mode 100644 src-docs/tmate.py.md create mode 100644 src/actions.py mode change 100644 => 100755 
src/charm.py create mode 100644 src/ssh_debug.py create mode 100644 src/state.py create mode 100644 src/tmate.py create mode 100644 templates/create_keys.sh.j2 create mode 100644 templates/tmate-ssh-server.service.j2 create mode 100644 tests/integration/__init__.py create mode 100644 tests/integration/conftest.py create mode 100644 tests/integration/helpers.py create mode 100755 tests/integration/pre_run_script.sh create mode 100644 tests/integration/requirements.txt create mode 100644 tests/unit/__init__.py create mode 100644 tests/unit/conftest.py create mode 100644 tests/unit/factories.py create mode 100644 tests/unit/requirements.txt create mode 100644 tests/unit/test_actions.py create mode 100644 tests/unit/test_sshdebug.py create mode 100644 tests/unit/test_state.py create mode 100644 tests/unit/test_tmate.py diff --git a/.github/workflows/integration_test.yaml b/.github/workflows/integration_test.yaml index 46f068e..89ed74e 100644 --- a/.github/workflows/integration_test.yaml +++ b/.github/workflows/integration_test.yaml @@ -10,3 +10,9 @@ jobs: with: juju-channel: 3.1/stable provider: lxd + tmate-debug: true + pre-run-script: | + -c "chmod +x tests/integration/pre_run_script.sh + ./tests/integration/pre_run_script.sh" + self-hosted-runner: true + self-hosted-runner-label: "edge" diff --git a/.github/workflows/test.yaml b/.github/workflows/test.yaml index d256b5b..bd1426c 100644 --- a/.github/workflows/test.yaml +++ b/.github/workflows/test.yaml @@ -5,5 +5,8 @@ on: jobs: unit-tests: - uses: canonical/operator-workflows/.github/workflows/test.yaml@debug/yanks + uses: canonical/operator-workflows/.github/workflows/test.yaml@main secrets: inherit + with: + self-hosted-runner: true + self-hosted-runner-label: "edge" diff --git a/.woke.yaml b/.woke.yaml index 83f2fde..0ff43cd 100644 --- a/.woke.yaml +++ b/.woke.yaml @@ -1,5 +1,6 @@ rules: - # Ignore man-in-the-middle - the operator_libs_linux/v0/apt lib uses this term. 
- - name: man-in-the-middle # Ignore whitelist - we are using it to ignore pydantic in pyproject.toml - name: whitelist +ignore_files: + # Ignore all files under lib dir since they are not managed by the charm. + - lib/ diff --git a/README.md b/README.md index ee3c572..20bd4c8 100644 --- a/README.md +++ b/README.md @@ -4,11 +4,11 @@ A [Juju](https://juju.is/) [charm](https://juju.is/docs/olm/charmed-operators) deploying and managing [Tmate self-hosted server](https://tmate.io/). Tmate is an open source terminal multiplexer, providing instant terminal sharing capabilities. -Tmate is a terminal multiplexer that allows remote terminal sharing. It enables users to share -their terminal session with other users over the internet, allowing them to collaborate, provide -technical support, or demonstrate commands and procedures in real-time. +Tmate enables users to share their terminal session with other users over the internet, allowing +them to collaborate, provide technical support, or demonstrate commands and procedures in +real-time. -This charm provides the tmate-ssh-server service, and when paired with the tmate client provides +This charm provides a tmate-ssh-server service, and when paired with the tmate client provides a self-hosted ssh relay server. For DevOps and SRE teams, this charm will make operating self hosted tmate-ssh-server simple and @@ -24,7 +24,7 @@ project that warmly welcomes community projects, contributions, suggestions, fixes and constructive feedback. * [Code of conduct](https://ubuntu.com/community/code-of-conduct) * [Get support](https://discourse.charmhub.io/) -* [Join our online chat](https://chat.charmhub.io/charmhub/channels/charm-dev) +* [Join our online chat](https://matrix.to/#/#charmhub-charmdev:ubuntu.com) * [Contribute](https://charmhub.io/tmate-ssh-server/docs/contributing) * [Getting Started](https://charmhub.io/tmate-ssh-server/docs/getting-started) Thinking about using the tmate-ssh-server Operator for your next project? 
@@ -40,6 +40,6 @@ For further details, Run the following command: ```bash -echo -e "tox -e src-docs\ngit add src-docs\n" > .git/hooks/pre-commit +echo -e "tox -e src-docs\ngit add src-docs\n" >> .git/hooks/pre-commit chmod +x .git/hooks/pre-commit ``` diff --git a/actions.yaml b/actions.yaml new file mode 100644 index 0000000..de05254 --- /dev/null +++ b/actions.yaml @@ -0,0 +1,6 @@ +# Copyright 2024 Canonical Ltd. +# See LICENSE file for licensing details. + +get-server-config: + description: | + Retrieve the server configuration values and secrets for SSH debug access. diff --git a/charmcraft.yaml b/charmcraft.yaml new file mode 100644 index 0000000..274e7b2 --- /dev/null +++ b/charmcraft.yaml @@ -0,0 +1,11 @@ +# Copyright 2024 Canonical Ltd. +# See LICENSE file for licensing details. + +type: charm +bases: + - build-on: + - name: ubuntu + channel: "22.04" + run-on: + - name: ubuntu + channel: "22.04" diff --git a/lib/charms/operator_libs_linux/v0/apt.py b/lib/charms/operator_libs_linux/v0/apt.py new file mode 100644 index 0000000..1400df7 --- /dev/null +++ b/lib/charms/operator_libs_linux/v0/apt.py @@ -0,0 +1,1361 @@ +# Copyright 2021 Canonical Ltd. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Abstractions for the system's Debian/Ubuntu package information and repositories. 
+ +This module contains abstractions and wrappers around Debian/Ubuntu-style repositories and +packages, in order to easily provide an idiomatic and Pythonic mechanism for adding packages and/or +repositories to systems for use in machine charms. + +A sane default configuration is attainable through nothing more than instantiation of the +appropriate classes. `DebianPackage` objects provide information about the architecture, version, +name, and status of a package. + +`DebianPackage` will try to look up a package either from `dpkg -L` or from `apt-cache` when +provided with a string indicating the package name. If it cannot be located, `PackageNotFoundError` +will be returned, as `apt` and `dpkg` otherwise return `100` for all errors, and a meaningful error +message if the package is not known is desirable. + +To install packages with convenience methods: + +```python +try: + # Run `apt-get update` + apt.update() + apt.add_package("zsh") + apt.add_package(["vim", "htop", "wget"]) +except PackageNotFoundError: + logger.error("a specified package not found in package cache or on system") +except PackageError as e: + logger.error("could not install package. Reason: %s", e.message) +```` + +To find details of a specific package: + +```python +try: + vim = apt.DebianPackage.from_system("vim") + + # To find from the apt cache only + # apt.DebianPackage.from_apt_cache("vim") + + # To find from installed packages only + # apt.DebianPackage.from_installed_package("vim") + + vim.ensure(PackageState.Latest) + logger.info("updated vim to version: %s", vim.fullversion) +except PackageNotFoundError: + logger.error("a specified package not found in package cache or on system") +except PackageError as e: + logger.error("could not install package. Reason: %s", e.message) +``` + + +`RepositoryMapping` will return a dict-like object containing enabled system repositories +and their properties (available groups, baseuri. gpg key). 
This class can add, disable, or +manipulate repositories. Items can be retrieved as `DebianRepository` objects. + +In order add a new repository with explicit details for fields, a new `DebianRepository` can +be added to `RepositoryMapping` + +`RepositoryMapping` provides an abstraction around the existing repositories on the system, +and can be accessed and iterated over like any `Mapping` object, to retrieve values by key, +iterate, or perform other operations. + +Keys are constructed as `{repo_type}-{}-{release}` in order to uniquely identify a repository. + +Repositories can be added with explicit values through a Python constructor. + +Example: +```python +repositories = apt.RepositoryMapping() + +if "deb-example.com-focal" not in repositories: + repositories.add(DebianRepository(enabled=True, repotype="deb", + uri="https://example.com", release="focal", groups=["universe"])) +``` + +Alternatively, any valid `sources.list` line may be used to construct a new +`DebianRepository`. + +Example: +```python +repositories = apt.RepositoryMapping() + +if "deb-us.archive.ubuntu.com-xenial" not in repositories: + line = "deb http://us.archive.ubuntu.com/ubuntu xenial main restricted" + repo = DebianRepository.from_repo_line(line) + repositories.add(repo) +``` +""" + +import fileinput +import glob +import logging +import os +import re +import subprocess +from collections.abc import Mapping +from enum import Enum +from subprocess import PIPE, CalledProcessError, check_output +from typing import Iterable, List, Optional, Tuple, Union +from urllib.parse import urlparse + +logger = logging.getLogger(__name__) + +# The unique Charmhub library identifier, never change it +LIBID = "7c3dbc9c2ad44a47bd6fcb25caa270e5" + +# Increment this major API version when introducing breaking changes +LIBAPI = 0 + +# Increment this PATCH version before using `charmcraft publish-lib` or reset +# to 0 if you are raising the major API version +LIBPATCH = 13 + + +VALID_SOURCE_TYPES = ("deb", 
"deb-src") +OPTIONS_MATCHER = re.compile(r"\[.*?\]") + + +class Error(Exception): + """Base class of most errors raised by this library.""" + + def __repr__(self): + """Represent the Error.""" + return "<{}.{} {}>".format(type(self).__module__, type(self).__name__, self.args) + + @property + def name(self): + """Return a string representation of the model plus class.""" + return "<{}.{}>".format(type(self).__module__, type(self).__name__) + + @property + def message(self): + """Return the message passed as an argument.""" + return self.args[0] + + +class PackageError(Error): + """Raised when there's an error installing or removing a package.""" + + +class PackageNotFoundError(Error): + """Raised when a requested package is not known to the system.""" + + +class PackageState(Enum): + """A class to represent possible package states.""" + + Present = "present" + Absent = "absent" + Latest = "latest" + Available = "available" + + +class DebianPackage: + """Represents a traditional Debian package and its utility functions. + + `DebianPackage` wraps information and functionality around a known package, whether installed + or available. The version, epoch, name, and architecture can be easily queried and compared + against other `DebianPackage` objects to determine the latest version or to install a specific + version. + + The representation of this object as a string mimics the output from `dpkg` for familiarity. + + Installation and removal of packages is handled through the `state` property or `ensure` + method, with the following options: + + apt.PackageState.Absent + apt.PackageState.Available + apt.PackageState.Present + apt.PackageState.Latest + + When `DebianPackage` is initialized, the state of a given `DebianPackage` object will be set to + `Available`, `Present`, or `Latest`, with `Absent` implemented as a convenience for removal + (though it operates essentially the same as `Available`). 
+ """ + + def __init__( + self, name: str, version: str, epoch: str, arch: str, state: PackageState + ) -> None: + self._name = name + self._arch = arch + self._state = state + self._version = Version(version, epoch) + + def __eq__(self, other) -> bool: + """Equality for comparison. + + Args: + other: a `DebianPackage` object for comparison + + Returns: + A boolean reflecting equality + """ + return isinstance(other, self.__class__) and ( + self._name, + self._version.number, + ) == (other._name, other._version.number) + + def __hash__(self): + """Return a hash of this package.""" + return hash((self._name, self._version.number)) + + def __repr__(self): + """Represent the package.""" + return "<{}.{}: {}>".format(self.__module__, self.__class__.__name__, self.__dict__) + + def __str__(self): + """Return a human-readable representation of the package.""" + return "<{}: {}-{}.{} -- {}>".format( + self.__class__.__name__, + self._name, + self._version, + self._arch, + str(self._state), + ) + + @staticmethod + def _apt( + command: str, + package_names: Union[str, List], + optargs: Optional[List[str]] = None, + ) -> None: + """Wrap package management commands for Debian/Ubuntu systems. 
+ + Args: + command: the command given to `apt-get` + package_names: a package name or list of package names to operate on + optargs: an (Optional) list of additioanl arguments + + Raises: + PackageError if an error is encountered + """ + optargs = optargs if optargs is not None else [] + if isinstance(package_names, str): + package_names = [package_names] + _cmd = ["apt-get", "-y", *optargs, command, *package_names] + try: + env = os.environ.copy() + env["DEBIAN_FRONTEND"] = "noninteractive" + subprocess.run(_cmd, capture_output=True, check=True, text=True, env=env) + except CalledProcessError as e: + raise PackageError( + "Could not {} package(s) [{}]: {}".format(command, [*package_names], e.stderr) + ) from None + + def _add(self) -> None: + """Add a package to the system.""" + self._apt( + "install", + "{}={}".format(self.name, self.version), + optargs=["--option=Dpkg::Options::=--force-confold"], + ) + + def _remove(self) -> None: + """Remove a package from the system. Implementation-specific.""" + return self._apt("remove", "{}={}".format(self.name, self.version)) + + @property + def name(self) -> str: + """Returns the name of the package.""" + return self._name + + def ensure(self, state: PackageState): + """Ensure that a package is in a given state. 
+ + Args: + state: a `PackageState` to reconcile the package to + + Raises: + PackageError from the underlying call to apt + """ + if self._state is not state: + if state not in (PackageState.Present, PackageState.Latest): + self._remove() + else: + self._add() + self._state = state + + @property + def present(self) -> bool: + """Returns whether or not a package is present.""" + return self._state in (PackageState.Present, PackageState.Latest) + + @property + def latest(self) -> bool: + """Returns whether the package is the most recent version.""" + return self._state is PackageState.Latest + + @property + def state(self) -> PackageState: + """Returns the current package state.""" + return self._state + + @state.setter + def state(self, state: PackageState) -> None: + """Set the package state to a given value. + + Args: + state: a `PackageState` to reconcile the package to + + Raises: + PackageError from the underlying call to apt + """ + if state in (PackageState.Latest, PackageState.Present): + self._add() + else: + self._remove() + self._state = state + + @property + def version(self) -> "Version": + """Returns the version for a package.""" + return self._version + + @property + def epoch(self) -> str: + """Returns the epoch for a package. 
May be unset.""" + return self._version.epoch + + @property + def arch(self) -> str: + """Returns the architecture for a package.""" + return self._arch + + @property + def fullversion(self) -> str: + """Returns the name+epoch for a package.""" + return "{}.{}".format(self._version, self._arch) + + @staticmethod + def _get_epoch_from_version(version: str) -> Tuple[str, str]: + """Pull the epoch, if any, out of a version string.""" + epoch_matcher = re.compile(r"^((?P\d+):)?(?P.*)") + matches = epoch_matcher.search(version).groupdict() + return matches.get("epoch", ""), matches.get("version") + + @classmethod + def from_system( + cls, package: str, version: Optional[str] = "", arch: Optional[str] = "" + ) -> "DebianPackage": + """Locates a package, either on the system or known to apt, and serializes the information. + + Args: + package: a string representing the package + version: an optional string if a specific version is requested + arch: an optional architecture, defaulting to `dpkg --print-architecture`. If an + architecture is not specified, this will be used for selection. + + """ + try: + return DebianPackage.from_installed_package(package, version, arch) + except PackageNotFoundError: + logger.debug( + "package '%s' is not currently installed or has the wrong architecture.", package + ) + + # Ok, try `apt-cache ...` + try: + return DebianPackage.from_apt_cache(package, version, arch) + except (PackageNotFoundError, PackageError): + # If we get here, it's not known to the systems. + # This seems unnecessary, but virtually all `apt` commands have a return code of `100`, + # and providing meaningful error messages without this is ugly. 
+ raise PackageNotFoundError( + "Package '{}{}' could not be found on the system or in the apt cache!".format( + package, ".{}".format(arch) if arch else "" + ) + ) from None + + @classmethod + def from_installed_package( + cls, package: str, version: Optional[str] = "", arch: Optional[str] = "" + ) -> "DebianPackage": + """Check whether the package is already installed and return an instance. + + Args: + package: a string representing the package + version: an optional string if a specific version is requested + arch: an optional architecture, defaulting to `dpkg --print-architecture`. + If an architecture is not specified, this will be used for selection. + """ + system_arch = check_output( + ["dpkg", "--print-architecture"], universal_newlines=True + ).strip() + arch = arch if arch else system_arch + + # Regexps are a really terrible way to do this. Thanks dpkg + output = "" + try: + output = check_output(["dpkg", "-l", package], stderr=PIPE, universal_newlines=True) + except CalledProcessError: + raise PackageNotFoundError("Package is not installed: {}".format(package)) from None + + # Pop off the output from `dpkg -l' because there's no flag to + # omit it` + lines = str(output).splitlines()[5:] + + dpkg_matcher = re.compile( + r""" + ^(?P\w+?)\s+ + (?P.*?)(?P:\w+?)?\s+ + (?P.*?)\s+ + (?P\w+?)\s+ + (?P.*) + """, + re.VERBOSE, + ) + + for line in lines: + try: + matches = dpkg_matcher.search(line).groupdict() + package_status = matches["package_status"] + + if not package_status.endswith("i"): + logger.debug( + "package '%s' in dpkg output but not installed, status: '%s'", + package, + package_status, + ) + break + + epoch, split_version = DebianPackage._get_epoch_from_version(matches["version"]) + pkg = DebianPackage( + matches["package_name"], + split_version, + epoch, + matches["arch"], + PackageState.Present, + ) + if (pkg.arch == "all" or pkg.arch == arch) and ( + version == "" or str(pkg.version) == version + ): + return pkg + except AttributeError: + 
logger.warning("dpkg matcher could not parse line: %s", line) + + # If we didn't find it, fail through + raise PackageNotFoundError("Package {}.{} is not installed!".format(package, arch)) + + @classmethod + def from_apt_cache( + cls, package: str, version: Optional[str] = "", arch: Optional[str] = "" + ) -> "DebianPackage": + """Check whether the package is already installed and return an instance. + + Args: + package: a string representing the package + version: an optional string if a specific version is requested + arch: an optional architecture, defaulting to `dpkg --print-architecture`. + If an architecture is not specified, this will be used for selection. + """ + system_arch = check_output( + ["dpkg", "--print-architecture"], universal_newlines=True + ).strip() + arch = arch if arch else system_arch + + # Regexps are a really terrible way to do this. Thanks dpkg + keys = ("Package", "Architecture", "Version") + + try: + output = check_output( + ["apt-cache", "show", package], stderr=PIPE, universal_newlines=True + ) + except CalledProcessError as e: + raise PackageError( + "Could not list packages in apt-cache: {}".format(e.stderr) + ) from None + + pkg_groups = output.strip().split("\n\n") + keys = ("Package", "Architecture", "Version") + + for pkg_raw in pkg_groups: + lines = str(pkg_raw).splitlines() + vals = {} + for line in lines: + if line.startswith(keys): + items = line.split(":", 1) + vals[items[0]] = items[1].strip() + else: + continue + + epoch, split_version = DebianPackage._get_epoch_from_version(vals["Version"]) + pkg = DebianPackage( + vals["Package"], + split_version, + epoch, + vals["Architecture"], + PackageState.Available, + ) + + if (pkg.arch == "all" or pkg.arch == arch) and ( + version == "" or str(pkg.version) == version + ): + return pkg + + # If we didn't find it, fail through + raise PackageNotFoundError("Package {}.{} is not in the apt cache!".format(package, arch)) + + +class Version: + """An abstraction around package versions. 
+ + This seems like it should be strictly unnecessary, except that `apt_pkg` is not usable inside a + venv, and wedging version comparisons into `DebianPackage` would overcomplicate it. + + This class implements the algorithm found here: + https://www.debian.org/doc/debian-policy/ch-controlfields.html#version + """ + + def __init__(self, version: str, epoch: str): + self._version = version + self._epoch = epoch or "" + + def __repr__(self): + """Represent the package.""" + return "<{}.{}: {}>".format(self.__module__, self.__class__.__name__, self.__dict__) + + def __str__(self): + """Return human-readable representation of the package.""" + return "{}{}".format("{}:".format(self._epoch) if self._epoch else "", self._version) + + @property + def epoch(self): + """Returns the epoch for a package. May be empty.""" + return self._epoch + + @property + def number(self) -> str: + """Returns the version number for a package.""" + return self._version + + def _get_parts(self, version: str) -> Tuple[str, str]: + """Separate the version into component upstream and Debian pieces.""" + try: + version.rindex("-") + except ValueError: + # No hyphens means no Debian version + return version, "0" + + upstream, debian = version.rsplit("-", 1) + return upstream, debian + + def _listify(self, revision: str) -> List[str]: + """Split a revision string into a listself. + + This list is comprised of alternating between strings and numbers, + padded on either end to always be "str, int, str, int..." and + always be of even length. This allows us to trivially implement the + comparison algorithm described. 
+ """ + result = [] + while revision: + rev_1, remains = self._get_alphas(revision) + rev_2, remains = self._get_digits(remains) + result.extend([rev_1, rev_2]) + revision = remains + return result + + def _get_alphas(self, revision: str) -> Tuple[str, str]: + """Return a tuple of the first non-digit characters of a revision.""" + # get the index of the first digit + for i, char in enumerate(revision): + if char.isdigit(): + if i == 0: + return "", revision + return revision[0:i], revision[i:] + # string is entirely alphas + return revision, "" + + def _get_digits(self, revision: str) -> Tuple[int, str]: + """Return a tuple of the first integer characters of a revision.""" + # If the string is empty, return (0,'') + if not revision: + return 0, "" + # get the index of the first non-digit + for i, char in enumerate(revision): + if not char.isdigit(): + if i == 0: + return 0, revision + return int(revision[0:i]), revision[i:] + # string is entirely digits + return int(revision), "" + + def _dstringcmp(self, a, b): # noqa: C901 + """Debian package version string section lexical sort algorithm. + + The lexical comparison is a comparison of ASCII values modified so + that all the letters sort earlier than all the non-letters and so that + a tilde sorts before anything, even the end of a part. 
+ """ + if a == b: + return 0 + try: + for i, char in enumerate(a): + if char == b[i]: + continue + # "a tilde sorts before anything, even the end of a part" + # (emptyness) + if char == "~": + return -1 + if b[i] == "~": + return 1 + # "all the letters sort earlier than all the non-letters" + if char.isalpha() and not b[i].isalpha(): + return -1 + if not char.isalpha() and b[i].isalpha(): + return 1 + # otherwise lexical sort + if ord(char) > ord(b[i]): + return 1 + if ord(char) < ord(b[i]): + return -1 + except IndexError: + # a is longer than b but otherwise equal, greater unless there are tildes + if char == "~": + return -1 + return 1 + # if we get here, a is shorter than b but otherwise equal, so check for tildes... + if b[len(a)] == "~": + return 1 + return -1 + + def _compare_revision_strings(self, first: str, second: str): # noqa: C901 + """Compare two debian revision strings.""" + if first == second: + return 0 + + # listify pads results so that we will always be comparing ints to ints + # and strings to strings (at least until we fall off the end of a list) + first_list = self._listify(first) + second_list = self._listify(second) + if first_list == second_list: + return 0 + try: + for i, item in enumerate(first_list): + # explicitly raise IndexError if we've fallen off the edge of list2 + if i >= len(second_list): + raise IndexError + # if the items are equal, next + if item == second_list[i]: + continue + # numeric comparison + if isinstance(item, int): + if item > second_list[i]: + return 1 + if item < second_list[i]: + return -1 + else: + # string comparison + return self._dstringcmp(item, second_list[i]) + except IndexError: + # rev1 is longer than rev2 but otherwise equal, hence greater + # ...except for goddamn tildes + if first_list[len(second_list)][0][0] == "~": + return 1 + return 1 + # rev1 is shorter than rev2 but otherwise equal, hence lesser + # ...except for goddamn tildes + if second_list[len(first_list)][0][0] == "~": + return -1 + 
return -1 + + def _compare_version(self, other) -> int: + if (self.number, self.epoch) == (other.number, other.epoch): + return 0 + + if self.epoch < other.epoch: + return -1 + if self.epoch > other.epoch: + return 1 + + # If none of these are true, follow the algorithm + upstream_version, debian_version = self._get_parts(self.number) + other_upstream_version, other_debian_version = self._get_parts(other.number) + + upstream_cmp = self._compare_revision_strings(upstream_version, other_upstream_version) + if upstream_cmp != 0: + return upstream_cmp + + debian_cmp = self._compare_revision_strings(debian_version, other_debian_version) + if debian_cmp != 0: + return debian_cmp + + return 0 + + def __lt__(self, other) -> bool: + """Less than magic method impl.""" + return self._compare_version(other) < 0 + + def __eq__(self, other) -> bool: + """Equality magic method impl.""" + return self._compare_version(other) == 0 + + def __gt__(self, other) -> bool: + """Greater than magic method impl.""" + return self._compare_version(other) > 0 + + def __le__(self, other) -> bool: + """Less than or equal to magic method impl.""" + return self.__eq__(other) or self.__lt__(other) + + def __ge__(self, other) -> bool: + """Greater than or equal to magic method impl.""" + return self.__gt__(other) or self.__eq__(other) + + def __ne__(self, other) -> bool: + """Not equal to magic method impl.""" + return not self.__eq__(other) + + +def add_package( + package_names: Union[str, List[str]], + version: Optional[str] = "", + arch: Optional[str] = "", + update_cache: Optional[bool] = False, +) -> Union[DebianPackage, List[DebianPackage]]: + """Add a package or list of packages to the system. + + Args: + package_names: single package name, or list of package names + name: the name(s) of the package(s) + version: an (Optional) version as a string. 
Defaults to the latest known + arch: an optional architecture for the package + update_cache: whether or not to run `apt-get update` prior to operating + + Raises: + TypeError if no package name is given, or explicit version is set for multiple packages + PackageNotFoundError if the package is not in the cache. + PackageError if packages fail to install + """ + cache_refreshed = False + if update_cache: + update() + cache_refreshed = True + + packages = {"success": [], "retry": [], "failed": []} + + package_names = [package_names] if isinstance(package_names, str) else package_names + if not package_names: + raise TypeError("Expected at least one package name to add, received zero!") + + if len(package_names) != 1 and version: + raise TypeError( + "Explicit version should not be set if more than one package is being added!" + ) + + for p in package_names: + pkg, success = _add(p, version, arch) + if success: + packages["success"].append(pkg) + else: + logger.warning("failed to locate and install/update '%s'", pkg) + packages["retry"].append(p) + + if packages["retry"] and not cache_refreshed: + logger.info("updating the apt-cache and retrying installation of failed packages.") + update() + + for p in packages["retry"]: + pkg, success = _add(p, version, arch) + if success: + packages["success"].append(pkg) + else: + packages["failed"].append(p) + + if packages["failed"]: + raise PackageError("Failed to install packages: {}".format(", ".join(packages["failed"]))) + + return packages["success"] if len(packages["success"]) > 1 else packages["success"][0] + + +def _add( + name: str, + version: Optional[str] = "", + arch: Optional[str] = "", +) -> Tuple[Union[DebianPackage, str], bool]: + """Add a package to the system. + + Args: + name: the name(s) of the package(s) + version: an (Optional) version as a string. 
Defaults to the latest known + arch: an optional architecture for the package + + Returns: a tuple of `DebianPackage` if found, or a :str: if it is not, and + a boolean indicating success + """ + try: + pkg = DebianPackage.from_system(name, version, arch) + pkg.ensure(state=PackageState.Present) + return pkg, True + except PackageNotFoundError: + return name, False + + +def remove_package( + package_names: Union[str, List[str]] +) -> Union[DebianPackage, List[DebianPackage]]: + """Remove package(s) from the system. + + Args: + package_names: the name of a package + + Raises: + PackageNotFoundError if the package is not found. + """ + packages = [] + + package_names = [package_names] if isinstance(package_names, str) else package_names + if not package_names: + raise TypeError("Expected at least one package name to add, received zero!") + + for p in package_names: + try: + pkg = DebianPackage.from_installed_package(p) + pkg.ensure(state=PackageState.Absent) + packages.append(pkg) + except PackageNotFoundError: + logger.info("package '%s' was requested for removal, but it was not installed.", p) + + # the list of packages will be empty when no package is removed + logger.debug("packages: '%s'", packages) + return packages[0] if len(packages) == 1 else packages + + +def update() -> None: + """Update the apt cache via `apt-get update`.""" + subprocess.run(["apt-get", "update"], capture_output=True, check=True) + + +def import_key(key: str) -> str: + """Import an ASCII Armor key. + + A Radix64 format keyid is also supported for backwards + compatibility. In this case Ubuntu keyserver will be + queried for a key via HTTPS by its keyid. This method + is less preferable because https proxy servers may + require traffic decryption which is equivalent to a + man-in-the-middle attack (a proxy server impersonates + keyserver TLS certificates and has to be explicitly + trusted by the system). 
+ + Args: + key: A GPG key in ASCII armor format, including BEGIN + and END markers or a keyid. + + Returns: + The GPG key filename written. + + Raises: + GPGKeyError if the key could not be imported + """ + key = key.strip() + if "-" in key or "\n" in key: + # Send everything not obviously a keyid to GPG to import, as + # we trust its validation better than our own. eg. handling + # comments before the key. + logger.debug("PGP key found (looks like ASCII Armor format)") + if ( + "-----BEGIN PGP PUBLIC KEY BLOCK-----" in key + and "-----END PGP PUBLIC KEY BLOCK-----" in key + ): + logger.debug("Writing provided PGP key in the binary format") + key_bytes = key.encode("utf-8") + key_name = DebianRepository._get_keyid_by_gpg_key(key_bytes) + key_gpg = DebianRepository._dearmor_gpg_key(key_bytes) + gpg_key_filename = "/etc/apt/trusted.gpg.d/{}.gpg".format(key_name) + DebianRepository._write_apt_gpg_keyfile( + key_name=gpg_key_filename, key_material=key_gpg + ) + return gpg_key_filename + else: + raise GPGKeyError("ASCII armor markers missing from GPG key") + else: + logger.warning( + "PGP key found (looks like Radix64 format). " + "SECURELY importing PGP key from keyserver; " + "full key not provided." + ) + # as of bionic add-apt-repository uses curl with an HTTPS keyserver URL + # to retrieve GPG keys. `apt-key adv` command is deprecated as is + # apt-key in general as noted in its manpage. See lp:1433761 for more + # history. 
Instead, /etc/apt/trusted.gpg.d is used directly to drop + # gpg + key_asc = DebianRepository._get_key_by_keyid(key) + # write the key in GPG format so that apt-key list shows it + key_gpg = DebianRepository._dearmor_gpg_key(key_asc.encode("utf-8")) + gpg_key_filename = "/etc/apt/trusted.gpg.d/{}.gpg".format(key) + DebianRepository._write_apt_gpg_keyfile(key_name=gpg_key_filename, key_material=key_gpg) + return gpg_key_filename + + +class InvalidSourceError(Error): + """Exceptions for invalid source entries.""" + + +class GPGKeyError(Error): + """Exceptions for GPG keys.""" + + +class DebianRepository: + """An abstraction to represent a repository.""" + + def __init__( + self, + enabled: bool, + repotype: str, + uri: str, + release: str, + groups: List[str], + filename: Optional[str] = "", + gpg_key_filename: Optional[str] = "", + options: Optional[dict] = None, + ): + self._enabled = enabled + self._repotype = repotype + self._uri = uri + self._release = release + self._groups = groups + self._filename = filename + self._gpg_key_filename = gpg_key_filename + self._options = options + + @property + def enabled(self): + """Return whether or not the repository is enabled.""" + return self._enabled + + @property + def repotype(self): + """Return whether it is binary or source.""" + return self._repotype + + @property + def uri(self): + """Return the URI.""" + return self._uri + + @property + def release(self): + """Return which Debian/Ubuntu releases it is valid for.""" + return self._release + + @property + def groups(self): + """Return the enabled package groups.""" + return self._groups + + @property + def filename(self): + """Returns the filename for a repository.""" + return self._filename + + @filename.setter + def filename(self, fname: str) -> None: + """Set the filename used when a repo is written back to disk. + + Args: + fname: a filename to write the repository information to. 
+ """ + if not fname.endswith(".list"): + raise InvalidSourceError("apt source filenames should end in .list!") + + self._filename = fname + + @property + def gpg_key(self): + """Returns the path to the GPG key for this repository.""" + return self._gpg_key_filename + + @property + def options(self): + """Returns any additional repo options which are set.""" + return self._options + + def make_options_string(self) -> str: + """Generate the complete options string for a a repository. + + Combining `gpg_key`, if set, and the rest of the options to find + a complex repo string. + """ + options = self._options if self._options else {} + if self._gpg_key_filename: + options["signed-by"] = self._gpg_key_filename + + return ( + "[{}] ".format(" ".join(["{}={}".format(k, v) for k, v in options.items()])) + if options + else "" + ) + + @staticmethod + def prefix_from_uri(uri: str) -> str: + """Get a repo list prefix from the uri, depending on whether a path is set.""" + uridetails = urlparse(uri) + path = ( + uridetails.path.lstrip("/").replace("/", "-") if uridetails.path else uridetails.netloc + ) + return "/etc/apt/sources.list.d/{}".format(path) + + @staticmethod + def from_repo_line(repo_line: str, write_file: Optional[bool] = True) -> "DebianRepository": + """Instantiate a new `DebianRepository` a `sources.list` entry line. + + Args: + repo_line: a string representing a repository entry + write_file: boolean to enable writing the new repo to disk + """ + repo = RepositoryMapping._parse(repo_line, "UserInput") + fname = "{}-{}.list".format( + DebianRepository.prefix_from_uri(repo.uri), repo.release.replace("/", "-") + ) + repo.filename = fname + + options = repo.options if repo.options else {} + if repo.gpg_key: + options["signed-by"] = repo.gpg_key + + # For Python 3.5 it's required to use sorted in the options dict in order to not have + # different results in the order of the options between executions. 
+ options_str = ( + "[{}] ".format(" ".join(["{}={}".format(k, v) for k, v in sorted(options.items())])) + if options + else "" + ) + + if write_file: + with open(fname, "wb") as f: + f.write( + ( + "{}".format("#" if not repo.enabled else "") + + "{} {}{} ".format(repo.repotype, options_str, repo.uri) + + "{} {}\n".format(repo.release, " ".join(repo.groups)) + ).encode("utf-8") + ) + + return repo + + def disable(self) -> None: + """Remove this repository from consideration. + + Disable it instead of removing from the repository file. + """ + searcher = "{} {}{} {}".format( + self.repotype, self.make_options_string(), self.uri, self.release + ) + for line in fileinput.input(self._filename, inplace=True): + if re.match(r"^{}\s".format(re.escape(searcher)), line): + print("# {}".format(line), end="") + else: + print(line, end="") + + def import_key(self, key: str) -> None: + """Import an ASCII Armor key. + + A Radix64 format keyid is also supported for backwards + compatibility. In this case Ubuntu keyserver will be + queried for a key via HTTPS by its keyid. This method + is less preferable because https proxy servers may + require traffic decryption which is equivalent to a + man-in-the-middle attack (a proxy server impersonates + keyserver TLS certificates and has to be explicitly + trusted by the system). + + Args: + key: A GPG key in ASCII armor format, + including BEGIN and END markers or a keyid. + + Raises: + GPGKeyError if the key could not be imported + """ + self._gpg_key_filename = import_key(key) + + @staticmethod + def _get_keyid_by_gpg_key(key_material: bytes) -> str: + """Get a GPG key fingerprint by GPG key material. + + Gets a GPG key fingerprint (40-digit, 160-bit) by the ASCII armor-encoded + or binary GPG key material. Can be used, for example, to generate file + names for keys passed via charm options. 
+ """ + # Use the same gpg command for both Xenial and Bionic + cmd = ["gpg", "--with-colons", "--with-fingerprint"] + ps = subprocess.run( + cmd, + stdout=PIPE, + stderr=PIPE, + input=key_material, + ) + out, err = ps.stdout.decode(), ps.stderr.decode() + if "gpg: no valid OpenPGP data found." in err: + raise GPGKeyError("Invalid GPG key material provided") + # from gnupg2 docs: fpr :: Fingerprint (fingerprint is in field 10) + return re.search(r"^fpr:{9}([0-9A-F]{40}):$", out, re.MULTILINE).group(1) + + @staticmethod + def _get_key_by_keyid(keyid: str) -> str: + """Get a key via HTTPS from the Ubuntu keyserver. + + Different key ID formats are supported by SKS keyservers (the longer ones + are more secure, see "dead beef attack" and https://evil32.com/). Since + HTTPS is used, if SSLBump-like HTTPS proxies are in place, they will + impersonate keyserver.ubuntu.com and generate a certificate with + keyserver.ubuntu.com in the CN field or in SubjAltName fields of a + certificate. If such proxy behavior is expected it is necessary to add the + CA certificate chain containing the intermediate CA of the SSLBump proxy to + every machine that this code runs on via ca-certs cloud-init directive (via + cloudinit-userdata model-config) or via other means (such as through a + custom charm option). Also note that DNS resolution for the hostname in a + URL is done at a proxy server - not at the client side. 
+ 8-digit (32 bit) key ID + https://keyserver.ubuntu.com/pks/lookup?search=0x4652B4E6 + 16-digit (64 bit) key ID + https://keyserver.ubuntu.com/pks/lookup?search=0x6E85A86E4652B4E6 + 40-digit key ID: + https://keyserver.ubuntu.com/pks/lookup?search=0x35F77D63B5CEC106C577ED856E85A86E4652B4E6 + + Args: + keyid: An 8, 16 or 40 hex digit keyid to find a key for + + Returns: + A string contining key material for the specified GPG key id + + + Raises: + subprocess.CalledProcessError + """ + # options=mr - machine-readable output (disables html wrappers) + keyserver_url = ( + "https://keyserver.ubuntu.com" "/pks/lookup?op=get&options=mr&exact=on&search=0x{}" + ) + curl_cmd = ["curl", keyserver_url.format(keyid)] + # use proxy server settings in order to retrieve the key + return check_output(curl_cmd).decode() + + @staticmethod + def _dearmor_gpg_key(key_asc: bytes) -> bytes: + """Convert a GPG key in the ASCII armor format to the binary format. + + Args: + key_asc: A GPG key in ASCII armor format. + + Returns: + A GPG key in binary format as a string + + Raises: + GPGKeyError + """ + ps = subprocess.run(["gpg", "--dearmor"], stdout=PIPE, stderr=PIPE, input=key_asc) + out, err = ps.stdout, ps.stderr.decode() + if "gpg: no valid OpenPGP data found." in err: + raise GPGKeyError( + "Invalid GPG key material. Check your network setup" + " (MTU, routing, DNS) and/or proxy server settings" + " as well as destination keyserver status." + ) + else: + return out + + @staticmethod + def _write_apt_gpg_keyfile(key_name: str, key_material: bytes) -> None: + """Write GPG key material into a file at a provided path. + + Args: + key_name: A key name to use for a key file (could be a fingerprint) + key_material: A GPG key material (binary) + """ + with open(key_name, "wb") as keyf: + keyf.write(key_material) + + +class RepositoryMapping(Mapping): + """An representation of known repositories. 
+ + Instantiation of `RepositoryMapping` will iterate through the + filesystem, parse out repository files in `/etc/apt/...`, and create + `DebianRepository` objects in this list. + + Typical usage: + + repositories = apt.RepositoryMapping() + repositories.add(DebianRepository( + enabled=True, repotype="deb", uri="https://example.com", release="focal", + groups=["universe"] + )) + """ + + def __init__(self): + self._repository_map = {} + # Repositories that we're adding -- used to implement mode param + self.default_file = "/etc/apt/sources.list" + + # read sources.list if it exists + if os.path.isfile(self.default_file): + self.load(self.default_file) + + # read sources.list.d + for file in glob.iglob("/etc/apt/sources.list.d/*.list"): + self.load(file) + + def __contains__(self, key: str) -> bool: + """Magic method for checking presence of repo in mapping.""" + return key in self._repository_map + + def __len__(self) -> int: + """Return number of repositories in map.""" + return len(self._repository_map) + + def __iter__(self) -> Iterable[DebianRepository]: + """Return iterator for RepositoryMapping.""" + return iter(self._repository_map.values()) + + def __getitem__(self, repository_uri: str) -> DebianRepository: + """Return a given `DebianRepository`.""" + return self._repository_map[repository_uri] + + def __setitem__(self, repository_uri: str, repository: DebianRepository) -> None: + """Add a `DebianRepository` to the cache.""" + self._repository_map[repository_uri] = repository + + def load(self, filename: str): + """Load a repository source file into the cache. 
+ + Args: + filename: the path to the repository file + """ + parsed = [] + skipped = [] + with open(filename, "r") as f: + for n, line in enumerate(f): + try: + repo = self._parse(line, filename) + except InvalidSourceError: + skipped.append(n) + else: + repo_identifier = "{}-{}-{}".format(repo.repotype, repo.uri, repo.release) + self._repository_map[repo_identifier] = repo + parsed.append(n) + logger.debug("parsed repo: '%s'", repo_identifier) + + if skipped: + skip_list = ", ".join(str(s) for s in skipped) + logger.debug("skipped the following lines in file '%s': %s", filename, skip_list) + + if parsed: + logger.info("parsed %d apt package repositories", len(parsed)) + else: + raise InvalidSourceError("all repository lines in '{}' were invalid!".format(filename)) + + @staticmethod + def _parse(line: str, filename: str) -> DebianRepository: + """Parse a line in a sources.list file. + + Args: + line: a single line from `load` to parse + filename: the filename being read + + Raises: + InvalidSourceError if the source type is unknown + """ + enabled = True + repotype = uri = release = gpg_key = "" + options = {} + groups = [] + + line = line.strip() + if line.startswith("#"): + enabled = False + line = line[1:] + + # Check for "#" in the line and treat a part after it as a comment then strip it off. + i = line.find("#") + if i > 0: + line = line[:i] + + # Split a source into substrings to initialize a new repo. + source = line.strip() + if source: + # Match any repo options, and get a dict representation. 
+ for v in re.findall(OPTIONS_MATCHER, source): + opts = dict(o.split("=") for o in v.strip("[]").split()) + # Extract the 'signed-by' option for the gpg_key + gpg_key = opts.pop("signed-by", "") + options = opts + + # Remove any options from the source string and split the string into chunks + source = re.sub(OPTIONS_MATCHER, "", source) + chunks = source.split() + + # Check we've got a valid list of chunks + if len(chunks) < 3 or chunks[0] not in VALID_SOURCE_TYPES: + raise InvalidSourceError("An invalid sources line was found in %s!", filename) + + repotype = chunks[0] + uri = chunks[1] + release = chunks[2] + groups = chunks[3:] + + return DebianRepository( + enabled, repotype, uri, release, groups, filename, gpg_key, options + ) + else: + raise InvalidSourceError("An invalid sources line was found in %s!", filename) + + def add(self, repo: DebianRepository, default_filename: Optional[bool] = False) -> None: + """Add a new repository to the system. + + Args: + repo: a `DebianRepository` object + default_filename: an (Optional) filename if the default is not desirable + """ + new_filename = "{}-{}.list".format( + DebianRepository.prefix_from_uri(repo.uri), repo.release.replace("/", "-") + ) + + fname = repo.filename or new_filename + + options = repo.options if repo.options else {} + if repo.gpg_key: + options["signed-by"] = repo.gpg_key + + with open(fname, "wb") as f: + f.write( + ( + "{}".format("#" if not repo.enabled else "") + + "{} {}{} ".format(repo.repotype, repo.make_options_string(), repo.uri) + + "{} {}\n".format(repo.release, " ".join(repo.groups)) + ).encode("utf-8") + ) + + self._repository_map["{}-{}-{}".format(repo.repotype, repo.uri, repo.release)] = repo + + def disable(self, repo: DebianRepository) -> None: + """Remove a repository. Disable by default. 
+ + Args: + repo: a `DebianRepository` to disable + """ + searcher = "{} {}{} {}".format( + repo.repotype, repo.make_options_string(), repo.uri, repo.release + ) + + for line in fileinput.input(repo.filename, inplace=True): + if re.match(r"^{}\s".format(re.escape(searcher)), line): + print("# {}".format(line), end="") + else: + print(line, end="") + + self._repository_map["{}-{}-{}".format(repo.repotype, repo.uri, repo.release)] = repo diff --git a/lib/charms/operator_libs_linux/v0/passwd.py b/lib/charms/operator_libs_linux/v0/passwd.py new file mode 100644 index 0000000..ed5a058 --- /dev/null +++ b/lib/charms/operator_libs_linux/v0/passwd.py @@ -0,0 +1,259 @@ +# Copyright 2021 Canonical Ltd. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Simple library for managing Linux users and groups. + +The `passwd` module provides convenience methods and abstractions around users and groups on a +Linux system, in order to make adding and managing users and groups easy. 
+ +Example of adding a user named 'test': + +```python +import passwd +passwd.add_group(name='special_group') +passwd.add_user(username='test', secondary_groups=['sudo']) + +if passwd.user_exists('some_user'): + do_stuff() +``` +""" + +import grp +import logging +import pwd +from subprocess import STDOUT, check_output +from typing import List, Optional, Union + +logger = logging.getLogger(__name__) + +# The unique Charmhub library identifier, never change it +LIBID = "cf7655b2bf914d67ac963f72b930f6bb" + +# Increment this major API version when introducing breaking changes +LIBAPI = 0 + +# Increment this PATCH version before using `charmcraft publish-lib` or reset +# to 0 if you are raising the major API version +LIBPATCH = 4 + + +def user_exists(user: Union[str, int]) -> Optional[pwd.struct_passwd]: + """Check if a user exists. + + Args: + user: username or gid of user whose existence to check + + Raises: + TypeError: where neither a string or int is passed as the first argument + """ + try: + if type(user) is int: + return pwd.getpwuid(user) + elif type(user) is str: + return pwd.getpwnam(user) + else: + raise TypeError("specified argument '%r' should be a string or int", user) + except KeyError: + logger.info("specified user '%s' doesn't exist", str(user)) + return None + + +def group_exists(group: Union[str, int]) -> Optional[grp.struct_group]: + """Check if a group exists. 
+ + Args: + group: username or gid of user whose existence to check + + Raises: + TypeError: where neither a string or int is passed as the first argument + """ + try: + if type(group) is int: + return grp.getgrgid(group) + elif type(group) is str: + return grp.getgrnam(group) + else: + raise TypeError("specified argument '%r' should be a string or int", group) + except KeyError: + logger.info("specified group '%s' doesn't exist", str(group)) + return None + + +def add_user( + username: str, + password: Optional[str] = None, + shell: str = "/bin/bash", + system_user: bool = False, + primary_group: str = None, + secondary_groups: List[str] = None, + uid: int = None, + home_dir: str = None, + create_home: bool = True, +) -> str: + """Add a user to the system. + + Will log but otherwise succeed if the user already exists. + + Arguments: + username: Username to create + password: Password for user; if ``None``, create a system user + shell: The default shell for the user + system_user: Whether to create a login or system user + primary_group: Primary group for user; defaults to username + secondary_groups: Optional list of additional groups + uid: UID for user being created + home_dir: Home directory for user + create_home: Force home directory creation + + Returns: + The password database entry struct, as returned by `pwd.getpwnam` + """ + try: + if uid: + user_info = pwd.getpwuid(int(uid)) + logger.info("user '%d' already exists", uid) + return user_info + user_info = pwd.getpwnam(username) + logger.info("user with uid '%s' already exists", username) + return user_info + except KeyError: + logger.info("creating user '%s'", username) + + cmd = ["useradd", "--shell", shell] + + if uid: + cmd.extend(["--uid", str(uid)]) + if home_dir: + cmd.extend(["--home", str(home_dir)]) + if password: + cmd.extend(["--password", password]) + if create_home: + cmd.append("--create-home") + if system_user or password is None: + cmd.append("--system") + + if not primary_group: + try: + 
grp.getgrnam(username) + primary_group = username # avoid "group exists" error + except KeyError: + pass + + if primary_group: + cmd.extend(["-g", primary_group]) + if secondary_groups: + cmd.extend(["-G", ",".join(secondary_groups)]) + + cmd.append(username) + check_output(cmd, stderr=STDOUT) + user_info = pwd.getpwnam(username) + return user_info + + +def add_group(group_name: str, system_group: bool = False, gid: int = None): + """Add a group to the system. + + Will log but otherwise succeed if the group already exists. + + Args: + group_name: group to create + system_group: Create system group + gid: GID for user being created + + Returns: + The group's password database entry struct, as returned by `grp.getgrnam` + """ + try: + group_info = grp.getgrnam(group_name) + logger.info("group '%s' already exists", group_name) + if gid: + group_info = grp.getgrgid(gid) + logger.info("group with gid '%d' already exists", gid) + except KeyError: + logger.info("creating group '%s'", group_name) + cmd = ["addgroup"] + if gid: + cmd.extend(["--gid", str(gid)]) + if system_group: + cmd.append("--system") + else: + cmd.extend(["--group"]) + cmd.append(group_name) + check_output(cmd, stderr=STDOUT) + group_info = grp.getgrnam(group_name) + return group_info + + +def add_user_to_group(username: str, group: str): + """Add a user to a group. 
+ + Args: + username: user to add to specified group + group: name of group to add user to + + Returns: + The group's password database entry struct, as returned by `grp.getgrnam` + """ + if not user_exists(username): + raise ValueError("user '{}' does not exist".format(username)) + if not group_exists(group): + raise ValueError("group '{}' does not exist".format(group)) + + logger.info("adding user '%s' to group '%s'", username, group) + check_output(["gpasswd", "-a", username, group], stderr=STDOUT) + return grp.getgrnam(group) + + +def remove_user(user: Union[str, int], remove_home: bool = False) -> bool: + """Remove a user from the system. + + Args: + user: the username or uid of the user to remove + remove_home: indicates whether the user's home directory should be removed + """ + u = user_exists(user) + if not u: + logger.info("user '%s' does not exist", str(u)) + return True + + cmd = ["userdel"] + if remove_home: + cmd.append("-f") + cmd.append(u.pw_name) + + logger.info("removing user '%s'", u.pw_name) + check_output(cmd, stderr=STDOUT) + return True + + +def remove_group(group: Union[str, int], force: bool = False) -> bool: + """Remove a user from the system. + + Args: + group: the name or gid of the group to remove + force: force group removal even if it's the primary group for a user + """ + g = group_exists(group) + if not g: + logger.info("group '%s' does not exist", str(g)) + return True + + cmd = ["groupdel"] + if force: + cmd.append("-f") + cmd.append(g.gr_name) + + logger.info("removing group '%s'", g.gr_name) + check_output(cmd, stderr=STDOUT) + return True diff --git a/lib/charms/operator_libs_linux/v1/systemd.py b/lib/charms/operator_libs_linux/v1/systemd.py new file mode 100644 index 0000000..cdcbad6 --- /dev/null +++ b/lib/charms/operator_libs_linux/v1/systemd.py @@ -0,0 +1,288 @@ +# Copyright 2021 Canonical Ltd. 
+# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +"""Abstractions for stopping, starting and managing system services via systemd. + +This library assumes that your charm is running on a platform that uses systemd. E.g., +Centos 7 or later, Ubuntu Xenial (16.04) or later. + +For the most part, we transparently provide an interface to a commonly used selection of +systemd commands, with a few shortcuts baked in. For example, service_pause and +service_resume with run the mask/unmask and enable/disable invocations. + +Example usage: + +```python +from charms.operator_libs_linux.v0.systemd import service_running, service_reload + +# Start a service +if not service_running("mysql"): + success = service_start("mysql") + +# Attempt to reload a service, restarting if necessary +success = service_reload("nginx", restart_on_failure=True) +``` +""" + +__all__ = [ # Don't export `_systemctl`. (It's not the intended way of using this lib.) 
+ "SystemdError", + "daemon_reload", + "service_disable", + "service_enable", + "service_failed", + "service_pause", + "service_reload", + "service_restart", + "service_resume", + "service_running", + "service_start", + "service_stop", +] + +import logging +import subprocess + +logger = logging.getLogger(__name__) + +# The unique Charmhub library identifier, never change it +LIBID = "045b0d179f6b4514a8bb9b48aee9ebaf" + +# Increment this major API version when introducing breaking changes +LIBAPI = 1 + +# Increment this PATCH version before using `charmcraft publish-lib` or reset +# to 0 if you are raising the major API version +LIBPATCH = 4 + + +class SystemdError(Exception): + """Custom exception for SystemD related errors.""" + + +def _systemctl(*args: str, check: bool = False) -> int: + """Control a system service using systemctl. + + Args: + *args: Arguments to pass to systemctl. + check: Check the output of the systemctl command. Default: False. + + Returns: + Returncode of systemctl command execution. + + Raises: + SystemdError: Raised if calling systemctl returns a non-zero returncode and check is True. + """ + cmd = ["systemctl", *args] + logger.debug(f"Executing command: {cmd}") + try: + proc = subprocess.run( + cmd, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, + text=True, + bufsize=1, + encoding="utf-8", + check=check, + ) + logger.debug( + f"Command {cmd} exit code: {proc.returncode}. systemctl output:\n{proc.stdout}" + ) + return proc.returncode + except subprocess.CalledProcessError as e: + raise SystemdError( + f"Command {cmd} failed with returncode {e.returncode}. systemctl output:\n{e.stdout}" + ) + + +def service_running(service_name: str) -> bool: + """Report whether a system service is running. + + Args: + service_name: The name of the service to check. + + Return: + True if service is running/active; False if not. + """ + # If returncode is 0, this means that is service is active. 
+ return _systemctl("--quiet", "is-active", service_name) == 0 + + +def service_failed(service_name: str) -> bool: + """Report whether a system service has failed. + + Args: + service_name: The name of the service to check. + + Returns: + True if service is marked as failed; False if not. + """ + # If returncode is 0, this means that the service has failed. + return _systemctl("--quiet", "is-failed", service_name) == 0 + + +def service_start(*args: str) -> bool: + """Start a system service. + + Args: + *args: Arguments to pass to `systemctl start` (normally the service name). + + Returns: + On success, this function returns True for historical reasons. + + Raises: + SystemdError: Raised if `systemctl start ...` returns a non-zero returncode. + """ + return _systemctl("start", *args, check=True) == 0 + + +def service_stop(*args: str) -> bool: + """Stop a system service. + + Args: + *args: Arguments to pass to `systemctl stop` (normally the service name). + + Returns: + On success, this function returns True for historical reasons. + + Raises: + SystemdError: Raised if `systemctl stop ...` returns a non-zero returncode. + """ + return _systemctl("stop", *args, check=True) == 0 + + +def service_restart(*args: str) -> bool: + """Restart a system service. + + Args: + *args: Arguments to pass to `systemctl restart` (normally the service name). + + Returns: + On success, this function returns True for historical reasons. + + Raises: + SystemdError: Raised if `systemctl restart ...` returns a non-zero returncode. + """ + return _systemctl("restart", *args, check=True) == 0 + + +def service_enable(*args: str) -> bool: + """Enable a system service. + + Args: + *args: Arguments to pass to `systemctl enable` (normally the service name). + + Returns: + On success, this function returns True for historical reasons. + + Raises: + SystemdError: Raised if `systemctl enable ...` returns a non-zero returncode. 
+ """ + return _systemctl("enable", *args, check=True) == 0 + + +def service_disable(*args: str) -> bool: + """Disable a system service. + + Args: + *args: Arguments to pass to `systemctl disable` (normally the service name). + + Returns: + On success, this function returns True for historical reasons. + + Raises: + SystemdError: Raised if `systemctl disable ...` returns a non-zero returncode. + """ + return _systemctl("disable", *args, check=True) == 0 + + +def service_reload(service_name: str, restart_on_failure: bool = False) -> bool: + """Reload a system service, optionally falling back to restart if reload fails. + + Args: + service_name: The name of the service to reload. + restart_on_failure: + Boolean indicating whether to fall back to a restart if the reload fails. + + Returns: + On success, this function returns True for historical reasons. + + Raises: + SystemdError: Raised if `systemctl reload|restart ...` returns a non-zero returncode. + """ + try: + return _systemctl("reload", service_name, check=True) == 0 + except SystemdError: + if restart_on_failure: + return service_restart(service_name) + else: + raise + + +def service_pause(service_name: str) -> bool: + """Pause a system service. + + Stops the service and prevents the service from starting again at boot. + + Args: + service_name: The name of the service to pause. + + Returns: + On success, this function returns True for historical reasons. + + Raises: + SystemdError: Raised if service is still running after being paused by systemctl. + """ + _systemctl("disable", "--now", service_name) + _systemctl("mask", service_name) + + if service_running(service_name): + raise SystemdError(f"Attempted to pause {service_name!r}, but it is still running.") + + return True + + +def service_resume(service_name: str) -> bool: + """Resume a system service. + + Re-enable starting the service again at boot. Start the service. + + Args: + service_name: The name of the service to resume. 
+ + Returns: + On success, this function returns True for historical reasons. + + Raises: + SystemdError: Raised if service is not running after being resumed by systemctl. + """ + _systemctl("unmask", service_name) + _systemctl("enable", "--now", service_name) + + if not service_running(service_name): + raise SystemdError(f"Attempted to resume {service_name!r}, but it is not running.") + + return True + + +def daemon_reload() -> bool: + """Reload systemd manager configuration. + + Returns: + On success, this function returns True for historical reasons. + + Raises: + SystemdError: Raised if `systemctl daemon-reload` returns a non-zero returncode. + """ + return _systemctl("daemon-reload", check=True) == 0 diff --git a/metadata.yaml b/metadata.yaml new file mode 100644 index 0000000..9bfc80f --- /dev/null +++ b/metadata.yaml @@ -0,0 +1,36 @@ +# Copyright 2024 Canonical Ltd. +# See LICENSE file for licensing details. + +name: tmate-ssh-server +display-name: Tmate-ssh-server +maintainers: + - https://launchpad.net/~canonical-is-devops +description: | + A [Juju](https://juju.is/) [charm](https://juju.is/docs/olm/charmed-operators) + deploying and managing [Tmate self-hosted server](https://tmate.io/). Tmate is an + open source terminal multiplexer, providing instant terminal sharing capabilities. + + Tmate is a terminal multiplexer that allows remote terminal sharing. It enables users to share + their terminal session with other users over the internet, allowing them to collaborate, provide + technical support, or demonstrate commands and procedures in real-time. + + This charm provides the tmate-ssh-server service, and when paired with the tmate client provides + self-hosted ssh relay server. + + For DevOps and SRE teams, this charm will make operating self hosted tmate-ssh-server simple and + straightforward through Juju's clean interface. 
Allowing machine relations to the + [Github runner](https://charmhub.io/github-runner), it supports SSH debug access to the running + machines. +docs: https://discourse.charmhub.io/t/tmate-ssh-server-documentation-overview/12886 +issues: https://github.com/canonical/tmate-ssh-server-operator/issues +source: https://github.com/canonical/tmate-ssh-server-operator +summary: Tmate SSH Relay Server +series: + - jammy +tags: + - application_development + - ops + +provides: + debug-ssh: + interface: debug-ssh diff --git a/src-docs/actions.py.md b/src-docs/actions.py.md new file mode 100644 index 0000000..89e617d --- /dev/null +++ b/src-docs/actions.py.md @@ -0,0 +1,59 @@ + + + + +# module `actions.py` +tmate-ssh-server charm actions. + + + +--- + +## class `Observer` +Tmate-ssh-server charm actions observer. + + + +### function `__init__` + +```python +__init__(charm: CharmBase, state: State) +``` + +Initialize the observer and register actions handlers. + + + +**Args:** + + - `charm`: The parent charm to attach the observer to. + - `state`: The charm state. + + +--- + +#### property model + +Shortcut for more simple access the model. + + + +--- + + + +### function `on_get_server_config` + +```python +on_get_server_config(event: ActionEvent) → None +``` + +Get server configuration values for .tmate.conf. + + + +**Args:** + + - `event`: The get-server-config action event. + + diff --git a/src-docs/charm.py.md b/src-docs/charm.py.md index aa47ed5..ba71fd2 100644 --- a/src-docs/charm.py.md +++ b/src-docs/charm.py.md @@ -12,17 +12,59 @@ Charm tmate-ssh-server. ## class `TmateSSHServerOperatorCharm` Charm tmate-ssh-server. - + ### function `__init__` ```python -__init__() → None +__init__(*args: Any) ``` -Initialize the charm. +Initialize the charm and register event handlers. +**Args:** + + - `args`: Arguments to initialize the charm base. + + +--- + +#### property app + +Application that this unit is part of. 
+ +--- + +#### property charm_dir + +Root directory of the charm as it is running. + +--- + +#### property config + +A mapping containing the charm's config and current values. + +--- + +#### property meta + +Metadata of this charm. + +--- + +#### property model + +Shortcut for more simple access the model. + +--- + +#### property unit + +Unit that this execution is responsible for. + + diff --git a/src-docs/ssh_debug.py.md b/src-docs/ssh_debug.py.md new file mode 100644 index 0000000..df58d4c --- /dev/null +++ b/src-docs/ssh_debug.py.md @@ -0,0 +1,63 @@ + + + + +# module `ssh_debug.py` +Observer module for ssh-debug integration. + +**Global Variables** +--------------- +- **DEBUG_SSH_INTEGRATION_NAME** + + +--- + +## class `Observer` +The ssh-debug integration observer. + + + +### function `__init__` + +```python +__init__(charm: CharmBase, state: State) +``` + +Initialize the observer and register event handlers. + + + +**Args:** + + - `charm`: The parent charm to attach the observer to. + - `state`: The charm state. + + +--- + +#### property model + +Shortcut for more simple access the model. + + + +--- + + + +### function `update_relation_data` + +```python +update_relation_data(host: str, fingerprints: Fingerprints) → None +``` + +Update ssh_debug relation data if relation is available. + + + +**Args:** + + - `host`: The unit's bound IP address. + - `fingerprints`: The tmate-ssh-server generated fingerprint for RSA and ED25519 keys. + + diff --git a/src-docs/state.py.md b/src-docs/state.py.md new file mode 100644 index 0000000..018595c --- /dev/null +++ b/src-docs/state.py.md @@ -0,0 +1,90 @@ + + + + +# module `state.py` +tmate-ssh-server states. + +**Global Variables** +--------------- +- **DEBUG_SSH_INTEGRATION_NAME** + + +--- + +## class `CharmStateBaseError` +Represents an error with charm state. + + + + + +--- + +## class `InvalidCharmStateError` +Represents an invalid charm state. 
+ + + +### function `__init__` + +```python +__init__(reason: str) +``` + +Initialize the error. + + + +**Args:** + + - `reason`: The reason why the state is invalid. + + + + + +--- + +## class `State` +The tmate-ssh-server operator charm state. + + + +**Attributes:** + + - `ip_addr`: The host IP address of the given tmate-ssh-server unit. + + + + +--- + + + +### classmethod `from_charm` + +```python +from_charm(charm: CharmBase) → State +``` + +Initialize the state from charm. + + + +**Args:** + + - `charm`: The charm root TmateSSHServer charm. + + + +**Returns:** + The current state of tmate-ssh-server charm. + + + +**Raises:** + + - `InvalidCharmStateError`: if the network bind address was not of IPv4/IPv6. + + diff --git a/src-docs/tmate.py.md b/src-docs/tmate.py.md new file mode 100644 index 0000000..ad7e997 --- /dev/null +++ b/src-docs/tmate.py.md @@ -0,0 +1,199 @@ + + + + +# module `tmate.py` +Configurations and functions to operate tmate-ssh-server. + +**Global Variables** +--------------- +- **APT_DEPENDENCIES** +- **GIT_REPOSITORY_URL** +- **USER** +- **GROUP** +- **PORT** + +--- + + + +## function `install_dependencies` + +```python +install_dependencies() → None +``` + +Install dependenciese required to start tmate-ssh-server container. + + + +**Raises:** + + - `DependencySetupError`: if there was something wrong installing the apt package dependencies. + + +--- + + + +## function `install_keys` + +```python +install_keys(host_ip: Union[IPv4Address, IPv6Address, str]) → None +``` + +Install key creation script and generate keys. + + + +**Args:** + + - `host_ip`: The charm host's public IP address. + + + +**Raises:** + + - `KeyInstallError`: if there was an error creating ssh keys. + + +--- + + + +## function `start_daemon` + +```python +start_daemon(address: str) → None +``` + +Install unit files and start daemon. + + + +**Args:** + + - `address`: The IP address to bind to. 
+ + + +**Raises:** + + - `DaemonStartError`: if there was an error starting the tmate-ssh-server docker process. + + +--- + + + +## function `get_fingerprints` + +```python +get_fingerprints() → Fingerprints +``` + +Get fingerprint from generated keys. + + + +**Raises:** + + - `IncompleteInitError`: if the keys have not been generated by the create_keys.sh script. + + + +**Returns:** + The generated public key fingerprints. + + +--- + + + +## function `generate_tmate_conf` + +```python +generate_tmate_conf(host: str) → str +``` + +Generate the .tmate.conf values from generated keys. + + + +**Args:** + + - `host`: The host IP address. + + + +**Raises:** + + - `FingerprintError`: if there was an error generating fingerprints from public keys. + + + +**Returns:** + The tmate config file contents. + + +--- + +## class `DaemonStartError` +Represents an error while starting tmate-ssh-server daemon. + + + + + +--- + +## class `DependencySetupError` +Represents an error while installing and setting up dependencies. + + + + + +--- + +## class `FingerprintError` +Represents an error with generating fingerprints from public keys. + + + + + +--- + +## class `Fingerprints` +The public key fingerprints. + + + +**Attributes:** + + - `rsa`: The RSA public key fingerprint. + - `ed25519`: The ed25519 public key fingerprint. + + + + + +--- + +## class `IncompleteInitError` +The tmate-ssh-server has not been fully initialized. + + + + + +--- + +## class `KeyInstallError` +Represents an error while installing/generating key files. + + + + + diff --git a/src/actions.py b/src/actions.py new file mode 100644 index 0000000..0c68235 --- /dev/null +++ b/src/actions.py @@ -0,0 +1,46 @@ +# Copyright 2024 Canonical Ltd. +# See LICENSE file for licensing details. 
+ +"""tmate-ssh-server charm actions.""" +import logging + +import ops + +import tmate +from state import State + +logger = logging.getLogger(__name__) + + +class Observer(ops.Object): + """Tmate-ssh-server charm actions observer.""" + + def __init__(self, charm: ops.CharmBase, state: State): + """Initialize the observer and register actions handlers. + + Args: + charm: The parent charm to attach the observer to. + state: The charm state. + """ + super().__init__(charm, "actions-observer") + self.charm = charm + self.state = state + + charm.framework.observe(charm.on.get_server_config_action, self.on_get_server_config) + + def on_get_server_config(self, event: ops.ActionEvent) -> None: + """Get server configuration values for .tmate.conf. + + Args: + event: The get-server-config action event. + """ + if not self.state.ip_addr: + event.fail("Host address not ready yet.") + return + try: + conf = tmate.generate_tmate_conf(str(self.state.ip_addr)) + except tmate.FingerprintError as exc: + logger.error("Failed to generate .tmate.conf, %s.", exc) + event.fail("Failed to generate .tmate.conf. See juju debug-log output.") + return + event.set_results({"tmate-config": conf}) diff --git a/src/charm.py b/src/charm.py old mode 100644 new mode 100755 index 7d1b0f2..ba36174 --- a/src/charm.py +++ b/src/charm.py @@ -5,14 +5,82 @@ """Charm tmate-ssh-server.""" +import logging +import typing -# This is a placeholder skeleton charm, it will be implemented in the future. -class TmateSSHServerOperatorCharm: # pylint: disable=R0903 +import ops + +import actions +import ssh_debug +import tmate +from state import State + +logger = logging.getLogger(__name__) + + +class TmateSSHServerOperatorCharm(ops.CharmBase): """Charm tmate-ssh-server.""" - def __init__(self) -> None: - """Initialize the charm.""" + def __init__(self, *args: typing.Any): + """Initialize the charm and register event handlers. + + Args: + args: Arguments to initialize the charm base. 
+ """ + super().__init__(*args) + self.state = State.from_charm(self) + self.actions = actions.Observer(self, self.state) + self.sshdebug = ssh_debug.Observer(self, self.state) + + self.framework.observe(self.on.install, self._on_install) + + def _on_install(self, event: ops.InstallEvent) -> None: + """Install and start tmate-ssh-server. + + Args: + event: The event emitted on install hook. + + Raises: + DependencyInstallError: if the dependencies required to start charm has failed. + KeyInstallError: if the ssh-key installation and fingerprint generation failed. + DaemonStartError: if the workload daemon was unable to start. + """ + if not self.state.ip_addr: + logger.warning("Unit address not assigned.") + # Try again until unit is assigned an IP address. + event.defer() + return + + try: + self.unit.status = ops.MaintenanceStatus("Installing packages.") + tmate.install_dependencies() + except tmate.DependencySetupError as exc: + logger.error("Failed to install docker package, %s.", exc) + raise + + try: + self.unit.status = ops.MaintenanceStatus("Generating keys.") + tmate.install_keys(host_ip=self.state.ip_addr) + except tmate.KeyInstallError as exc: + logger.error("Failed to install/generate keys, %s.", exc) + raise + + try: + self.unit.status = ops.MaintenanceStatus("Starting tmate-ssh-server daemon.") + tmate.start_daemon(address=str(self.state.ip_addr)) + except tmate.DaemonStartError as exc: + logger.error("Failed to start tmate-ssh-server daemon, %s.", exc) + raise + + try: + fingerprints = tmate.get_fingerprints() + except tmate.IncompleteInitError as exc: + logger.error("Something went wrong initializing keys, %s.", exc) + raise + + self.sshdebug.update_relation_data(host=str(self.state.ip_addr), fingerprints=fingerprints) + self.unit.status = ops.ActiveStatus() if __name__ == "__main__": # pragma: nocover - pass + ops.main.main(TmateSSHServerOperatorCharm) diff --git a/src/ssh_debug.py b/src/ssh_debug.py new file mode 100644 index 0000000..94e5f25 --- 
"""Observer module for ssh-debug integration."""
import logging
import typing

import ops

import tmate
from state import DEBUG_SSH_INTEGRATION_NAME, State

logger = logging.getLogger(__name__)


class Observer(ops.Object):
    """The ssh-debug integration observer."""

    def __init__(self, charm: ops.CharmBase, state: State):
        """Initialize the observer and register event handlers.

        Args:
            charm: The parent charm to attach the observer to.
            state: The charm state.
        """
        super().__init__(charm, "ssh-debug-observer")
        self.charm = charm
        self.state = state

        charm.framework.observe(
            charm.on[DEBUG_SSH_INTEGRATION_NAME].relation_joined,
            self._on_ssh_debug_relation_joined,
        )

    def update_relation_data(self, host: str, fingerprints: tmate.Fingerprints) -> None:
        """Update ssh_debug relation data if relation is available.

        Args:
            host: The unit's bound IP address.
            fingerprints: The tmate-ssh-server generated fingerprint for RSA and ED25519 keys.
        """
        relations: typing.List[ops.Relation] | None = self.charm.model.relations.get(
            DEBUG_SSH_INTEGRATION_NAME
        )
        if not relations:
            logger.warning(
                "%s relation not yet ready. Relation data will be setup when it becomes available.",
                DEBUG_SSH_INTEGRATION_NAME,
            )
            return
        for relation in relations:
            relation_data: ops.RelationDataContent = relation.data[self.charm.unit]
            relation_data.update(
                {
                    "host": host,
                    "port": str(tmate.PORT),
                    "rsa_fingerprint": fingerprints.rsa,
                    "ed25519_fingerprint": fingerprints.ed25519,
                }
            )

    def _on_ssh_debug_relation_joined(self, _: ops.RelationJoinedEvent) -> None:
        """Handle ssh-debug relation joined event.

        Raises:
            IncompleteInitError: if the server keys have not been generated yet and the
                fingerprints could not be derived.
        """
        try:
            fingerprints = tmate.get_fingerprints()
        except tmate.IncompleteInitError as exc:
            logger.error("Error getting fingerprint data, %s.", exc)
            raise

        self.update_relation_data(host=str(self.state.ip_addr), fingerprints=fingerprints)
"""Configurations and functions to operate tmate-ssh-server."""

import base64
import dataclasses
import hashlib
import ipaddress

# subprocess module is required to install and start docker daemon processes, the security
# implications have been considered.
import subprocess  # nosec
import textwrap
import typing
from pathlib import Path

import jinja2
from charms.operator_libs_linux.v0 import apt, passwd
from charms.operator_libs_linux.v1 import systemd

APT_DEPENDENCIES = ["docker.io", "openssh-client"]

GIT_REPOSITORY_URL = "https://github.com/tmate-io/tmate-ssh-server.git"

WORK_DIR = Path("/home/ubuntu/")
CREATE_KEYS_SCRIPT_PATH = WORK_DIR / "create_keys.sh"
KEYS_DIR = WORK_DIR / "keys"
RSA_PUB_KEY_PATH = KEYS_DIR / "ssh_host_rsa_key.pub"
ED25519_PUB_KEY_PATH = KEYS_DIR / "ssh_host_ed25519_key.pub"
TMATE_SSH_SERVER_SERVICE_PATH = Path("/etc/systemd/system/tmate-ssh-server.service")

USER = "ubuntu"
GROUP = "ubuntu"

PORT = 10022


class DependencySetupError(Exception):
    """Represents an error while installing and setting up dependencies."""


class KeyInstallError(Exception):
    """Represents an error while installing/generating key files."""


class DaemonStartError(Exception):
    """Represents an error while starting tmate-ssh-server daemon."""


class IncompleteInitError(Exception):
    """The tmate-ssh-server has not been fully initialized."""


class FingerprintError(Exception):
    """Represents an error with generating fingerprints from public keys."""


def install_dependencies() -> None:
    """Install dependencies required to start tmate-ssh-server container.

    Raises:
        DependencySetupError: if there was something wrong installing the apt package
            dependencies or adding the user to the docker group.
    """
    try:
        apt.update()
        apt.add_package(APT_DEPENDENCIES)
    except (apt.PackageNotFoundError, apt.PackageError) as exc:
        raise DependencySetupError("Failed to install apt packages.") from exc
    passwd.add_group("docker")
    try:
        passwd.add_user_to_group(USER, "docker")
    except ValueError as exc:
        raise DependencySetupError(f"Failed to add user {USER} to docker group.") from exc


def install_keys(host_ip: typing.Union[ipaddress.IPv4Address, ipaddress.IPv6Address, str]) -> None:
    """Install key creation script and generate keys.

    Args:
        host_ip: The charm host's public IP address.

    Raises:
        KeyInstallError: if there was an error creating ssh keys.
    """
    environment = jinja2.Environment(loader=jinja2.FileSystemLoader("templates"), autoescape=True)
    template = environment.get_template("create_keys.sh.j2")
    script = template.render(keys_dir=KEYS_DIR, host=str(host_ip), port=PORT)
    WORK_DIR.mkdir(parents=True, exist_ok=True)
    KEYS_DIR.mkdir(parents=True, exist_ok=True)
    CREATE_KEYS_SCRIPT_PATH.write_text(script, encoding="utf-8")
    try:
        # B603:subprocess_without_shell_equals_true false positive
        # see https://github.com/PyCQA/bandit/issues/333
        subprocess.check_call(["/usr/bin/chown", "-R", f"{USER}:{GROUP}", str(WORK_DIR)])  # nosec
        # chmod expects an octal mode: 0o755 is rwxr-xr-x. Decimal 755 (== 0o1363) would
        # set the sticky bit and produce broken owner permissions instead.
        CREATE_KEYS_SCRIPT_PATH.chmod(0o755)
        subprocess.check_call([str(CREATE_KEYS_SCRIPT_PATH)])  # nosec
    except subprocess.CalledProcessError as exc:
        raise KeyInstallError from exc


def start_daemon(address: str) -> None:
    """Install unit files and start daemon.

    Args:
        address: The IP address to bind to.

    Raises:
        DaemonStartError: if there was an error starting the tmate-ssh-server docker process.
    """
    environment = jinja2.Environment(loader=jinja2.FileSystemLoader("templates"), autoescape=True)
    service_content = environment.get_template("tmate-ssh-server.service.j2").render(
        WORKDIR=WORK_DIR,
        KEYS_DIR=KEYS_DIR,
        PORT=PORT,
        ADDRESS=address,
    )
    TMATE_SSH_SERVER_SERVICE_PATH.write_text(service_content, encoding="utf-8")
    try:
        systemd.daemon_reload()
    except systemd.SystemdError as exc:
        raise DaemonStartError("Failed to reload tmate-ssh-server daemon") from exc
    try:
        systemd.service_start("tmate-ssh-server")
    except systemd.SystemdError as exc:
        raise DaemonStartError("Failed to start tmate-ssh-server daemon") from exc
+ """ + environment = jinja2.Environment(loader=jinja2.FileSystemLoader("templates"), autoescape=True) + service_content = environment.get_template("tmate-ssh-server.service.j2").render( + WORKDIR=WORK_DIR, + KEYS_DIR=KEYS_DIR, + PORT=PORT, + ADDRESS=address, + ) + TMATE_SSH_SERVER_SERVICE_PATH.write_text(service_content, encoding="utf-8") + try: + systemd.daemon_reload() + except systemd.SystemdError as exc: + raise DaemonStartError("Failed to reload tmate-ssh-server daemon") from exc + try: + systemd.service_start("tmate-ssh-server") + except systemd.SystemdError as exc: + raise DaemonStartError("Failed to start tmate-ssh-server daemon") from exc + + +@dataclasses.dataclass +class Fingerprints: + """The public key fingerprints. + + Attributes: + rsa: The RSA public key fingerprint. + ed25519: The ed25519 public key fingerprint. + """ + + rsa: str + ed25519: str + + +def _calculate_fingerprint(key: str) -> str: + """Calculate the SHA256 fingerprint of a key. + + Args: + key: Base64 encoded key value. + + Returns: + Fingerprint of a key. + """ + decoded_bytes = base64.b64decode(key) + key_hash = hashlib.sha256(decoded_bytes).digest() + return base64.b64encode(key_hash).decode("utf-8").removesuffix("=") + + +def get_fingerprints() -> Fingerprints: + """Get fingerprint from generated keys. + + Raises: + IncompleteInitError: if the keys have not been generated by the create_keys.sh script. + + Returns: + The generated public key fingerprints. 
+ """ + if not KEYS_DIR.exists() or not RSA_PUB_KEY_PATH.exists() or not ED25519_PUB_KEY_PATH.exists(): + raise IncompleteInitError("Missing keys path(s).") + + # format of a public key is: ssh-rsa + rsa_pub_key = RSA_PUB_KEY_PATH.read_text(encoding="utf-8") + rsa_key_b64 = rsa_pub_key.split()[1] + rsa_fingerprint = _calculate_fingerprint(rsa_key_b64) + + ed25519_pub_key = ED25519_PUB_KEY_PATH.read_text(encoding="utf-8") + ed25519_key_b64 = ed25519_pub_key.split()[1] + ed25519_fingerprint = _calculate_fingerprint(ed25519_key_b64) + + return Fingerprints(rsa=f"SHA256:{rsa_fingerprint}", ed25519=f"SHA256:{ed25519_fingerprint}") + + +def generate_tmate_conf(host: str) -> str: + """Generate the .tmate.conf values from generated keys. + + Args: + host: The host IP address. + + Raises: + FingerprintError: if there was an error generating fingerprints from public keys. + + Returns: + The tmate config file contents. + """ + try: + fingerprints = get_fingerprints() + except (IncompleteInitError, KeyInstallError) as exc: + raise FingerprintError("Error generating fingerprints.") from exc + + return textwrap.dedent( + f""" + set -g tmate-server-host {host} + set -g tmate-server-port {PORT} + set -g tmate-server-rsa-fingerprint {fingerprints.rsa} + set -g tmate-server-ed25519-fingerprint {fingerprints.ed25519} + """ + ) diff --git a/templates/create_keys.sh.j2 b/templates/create_keys.sh.j2 new file mode 100644 index 0000000..7ce903c --- /dev/null +++ b/templates/create_keys.sh.j2 @@ -0,0 +1,22 @@ +#!/usr/bin/env bash + +# This script is taken from the official tmate-ssh-server repository. + +set -eu + +gen_key() { + keytype=$1 + ks="${keytype}_" + key="{{ keys_dir }}/ssh_host_${ks}key" + if [ ! 
-e "${key}" ] ; then + ssh-keygen -t "${keytype}" -f "${key}" -N '' + echo "" + fi + SIG=$(ssh-keygen -l -E SHA256 -f "$key.pub" | cut -d ' ' -f 2) +} + +mkdir -p keys +gen_key rsa +RSA_SIG=$SIG +gen_key ed25519 +ED25519_SIG=$SIG diff --git a/templates/tmate-ssh-server.service.j2 b/templates/tmate-ssh-server.service.j2 new file mode 100644 index 0000000..0e38427 --- /dev/null +++ b/templates/tmate-ssh-server.service.j2 @@ -0,0 +1,17 @@ +[Unit] +Description=Docker instance to serve tmate-ssh-server. +After=network.target + +[Service] +User=ubuntu +Group=docker +WorkingDirectory={{ WORKDIR }} +# run as root to allow reading from /keys dir +ExecStart=docker run --user root \ + --net=host --cap-add SYS_ADMIN -v {{ KEYS_DIR }}:/keys \ + --entrypoint=/srv/tmate-ssh-server/tmate-ssh-server \ + --env SSH_KEYS_PATH=/keys ghcr.io/canonical/tmate-ssh-server:0.1.1 \ + -A -h {{ ADDRESS }} -p {{ PORT }} -k /keys + +[Install] +WantedBy=multi-user.target diff --git a/tests/integration/__init__.py b/tests/integration/__init__.py new file mode 100644 index 0000000..887614c --- /dev/null +++ b/tests/integration/__init__.py @@ -0,0 +1,4 @@ +# Copyright 2024 Canonical Ltd. +# See LICENSE file for licensing details. + +"""Integration tests module.""" diff --git a/tests/integration/conftest.py b/tests/integration/conftest.py new file mode 100644 index 0000000..3f022ef --- /dev/null +++ b/tests/integration/conftest.py @@ -0,0 +1,108 @@ +# Copyright 2024 Canonical Ltd. +# See LICENSE file for licensing details. 
+ +"""Fixtures for tmate-ssh-server charm integration tests.""" +import logging +import typing +from pathlib import Path + +import pytest +import pytest_asyncio +from juju.action import Action +from juju.application import Application +from juju.client._definitions import DetailedStatus, FullStatus, MachineStatus +from juju.machine import Machine +from juju.model import Model +from juju.unit import Unit +from pytest_operator.plugin import OpsTest + +from .helpers import wait_for + +logger = logging.getLogger(__name__) + + +@pytest.fixture(scope="module", name="model") +def model_fixture(ops_test: OpsTest) -> Model: + """Juju model used in the test.""" + assert ops_test.model is not None + return ops_test.model + + +@pytest_asyncio.fixture(scope="module", name="charm") +async def charm_fixture(request: pytest.FixtureRequest, ops_test: OpsTest) -> str: + """The path to charm.""" + charm = request.config.getoption("--charm-file") + if not charm: + charm = await ops_test.build_charm(".") + else: + charm = f"./{charm}" + + return charm + + +@pytest_asyncio.fixture(scope="module", name="tmate_ssh_server") +async def tmate_ssh_server_fixture(model: Model, charm: str): + """The tmate-ssh-server application fixture.""" + app = await model.deploy(charm) + await model.wait_for_idle(apps=[app.name], wait_for_active=True) + return app + + +@pytest.fixture(scope="module", name="unit") +def unit_fixture(tmate_ssh_server: Application): + """The tmate-ssh-server unit.""" + unit: Unit = next(iter(tmate_ssh_server.units)) + return unit + + +@pytest_asyncio.fixture(scope="module", name="tmate_config") +async def tmate_config_fixture(unit: Unit): + """The .tmate.conf contents.""" + action: Action = await unit.run_action("get-server-config") + await action.wait() + assert ( + action.status == "completed" + ), f"Get server-config action failed, status: {action.status}" + config = action.results["tmate-config"] + return config + + +@pytest.fixture(scope="module", name="pub_key") +def 
pub_key_fixture(): + """The id_rsa public key fixture to use for ssh authorization.""" + pub_key_path = Path(Path.home() / ".ssh/id_rsa.pub") + return pub_key_path.read_text(encoding="utf-8") + + +@pytest_asyncio.fixture(scope="module", name="tmate_machine") +async def ssh_machine_fixture(model: Model, ops_test: OpsTest): + """A machine to test tmate ssh connection.""" + machine: Machine = await model.add_machine() + + async def wait_machine(): + """Wait for machine to be in running status. + + Returns: + True if the machine is running, False otherwise. + """ + status: FullStatus = await model.get_status() + machine_status: MachineStatus = status.machines[machine.entity_id] + assert machine_status, f"Failed to get machine {machine.entity_id}" + # mypy incorrectly assumes dict[Any, Any] | DetailedStatus. + instance_status = typing.cast(DetailedStatus, machine_status.instance_status) + logger.info("Waiting for machine to be running... %s", instance_status.status) + return instance_status.status == "running" + + await wait_for(wait_machine, timeout=60 * 5) + + logger.info("Running update.") + (retcode, _, stderr) = await ops_test.juju("ssh", str(machine.entity_id), "sudo apt update -y") + assert retcode == 0, f"Failed to run apt update, {stderr}" + logger.info("Installing tmate.") + (retcode, stdout, stderr) = await ops_test.juju( + "ssh", + str(machine.entity_id), + "DEBIAN_FRONTEND=noninteractive sudo apt-get install -y tmate", + ) + assert retcode == 0, f"Failed to run apt install, {stdout} {stderr}" + return machine diff --git a/tests/integration/helpers.py b/tests/integration/helpers.py new file mode 100644 index 0000000..3749392 --- /dev/null +++ b/tests/integration/helpers.py @@ -0,0 +1,47 @@ +# Copyright 2024 Canonical Ltd. +# See LICENSE file for licensing details. 
+ +"""Helpers for tmate-ssh-server-operator charm integration tests.""" + +import inspect +import time +import typing + + +async def wait_for( + func: typing.Callable[[], typing.Union[typing.Awaitable, typing.Any]], + timeout: int = 300, + check_interval: int = 10, +) -> typing.Any: + """Wait for function execution to become truthy. + + Args: + func: A callback function to wait to return a truthy value. + timeout: Time in seconds to wait for function result to become truthy. + check_interval: Time in seconds to wait between ready checks. + + Raises: + TimeoutError: if the callback function did not return a truthy value within timeout. + + Returns: + The result of the function if any. + """ + deadline = time.time() + timeout + is_awaitable = inspect.iscoroutinefunction(func) + while time.time() < deadline: + if is_awaitable: + if result := await func(): + return result + else: + if result := func(): + return result + time.sleep(check_interval) + + # final check before raising TimeoutError. + if is_awaitable: + if result := await func(): + return result + else: + if result := func(): + return result + raise TimeoutError() diff --git a/tests/integration/pre_run_script.sh b/tests/integration/pre_run_script.sh new file mode 100755 index 0000000..c7a6625 --- /dev/null +++ b/tests/integration/pre_run_script.sh @@ -0,0 +1,12 @@ +#!/bin/bash + +# Copyright 2024 Canonical Ltd. +# See LICENSE file for licensing details. + +# Pre-run script for integration test operator-workflows action. +# https://github.com/canonical/operator-workflows/blob/main/.github/workflows/integration_test.yaml + +# The runner has ~/.ssh dir owned by Docker, change the ownership for ssh access. 
+sudo mkdir -p ~/.ssh +sudo chown -R runner:runner ~/.ssh +mkdir -p ~/.ssh && ssh-keygen -t rsa -f ~/.ssh/id_rsa -N "" diff --git a/tests/integration/requirements.txt b/tests/integration/requirements.txt new file mode 100644 index 0000000..b95ca9c --- /dev/null +++ b/tests/integration/requirements.txt @@ -0,0 +1 @@ +paramiko>3,<4 diff --git a/tests/integration/test_charm.py b/tests/integration/test_charm.py index bb56536..07b97f7 100644 --- a/tests/integration/test_charm.py +++ b/tests/integration/test_charm.py @@ -2,7 +2,91 @@ # See LICENSE file for licensing details. """Integration tests for tmate-ssh-server charm.""" +import logging +import secrets +from pathlib import Path +import paramiko +from juju.machine import Machine +from juju.unit import Unit +from pytest_operator.plugin import OpsTest -def test_charm(): - """Test charm.""" +from tmate import PORT + +logger = logging.getLogger(__name__) + + +async def test_ssh_connection( + ops_test: OpsTest, tmate_config: str, unit: Unit, tmate_machine: Machine, pub_key: str +): + """ + arrange: given a related github-runner charm and a tmate-ssh-server charm. + act: when ssh connection is requested. + assert: the connection is made successfully. 
+ """ + temp_config_file_path = Path(f"./{secrets.token_hex(8)}") + temp_config_file_path.write_text(tmate_config, encoding="utf-8") + (retcode, stdout, stderr) = await ops_test.juju( + "scp", temp_config_file_path.name, f"{tmate_machine.entity_id}:~/.tmate.conf" + ) + assert retcode == 0, f"Failed to scp tmate conf file {stdout} {stderr}" + temp_config_file_path.unlink() + + await ops_test.juju( + "ssh", tmate_machine.entity_id, "--", f"echo '{pub_key}' >> ~/.ssh/authorized_keys" + ) + + logger.info("Starting tmate session") + (retcode, stdout, stderr) = await ops_test.juju( + "ssh", + tmate_machine.entity_id, + "--", + "tmate -a ~/.ssh/authorized_keys -S /tmp/tmate.sock new-session -d", + ) + assert retcode == 0, f"Error running ssh display command, {stdout}, {stderr}" + logger.info("New session created %s %s %s", retcode, stdout, stderr) + (retcode, stdout, stderr) = await ops_test.juju( + "ssh", tmate_machine.entity_id, "--", "tmate -S /tmp/tmate.sock wait tmate-ready" + ) + assert retcode == 0, f"Error running ssh display command, {stdout}, {stderr}" + logger.info("Tmate ready %s %s %s", retcode, stdout, stderr) + (retcode, stdout, stderr) = await ops_test.juju( + "ssh", tmate_machine.entity_id, "--", "tmate -S /tmp/tmate.sock display -p '#{tmate_ssh}'" + ) + assert retcode == 0, f"Error running ssh display command, {stdout}, {stderr}" + logger.info("Tmate connection output: %s %s %s", retcode, stdout, stderr) + + token = stdout.split(" ")[2].split("@")[0] + client = paramiko.SSHClient() + unit_ip = await unit.get_public_address() + # trust missing host key for testing purposes only. + client.set_missing_host_key_policy(paramiko.AutoAddPolicy()) # nosec + logger.info("Connecting to created ssh session, %s %s %s", unit_ip, PORT, token) + client.connect( + unit_ip, + PORT, + token, + compress=True, + allow_agent=False, + key_filename=f"{Path.home()}/.ssh/id_rsa.pub", + ) + transport = client.get_transport() + assert transport, "Transport wasn't initialized." 
+ session = transport.open_session() + session.get_pty() + session.invoke_shell() + stdout = session.recv(10000) + logger.info("Shell stdout: %s", str(stdout)) + # The send expects bytes type but the docstrings want str type (bytes type doesn't work). + session.send("q\n") # type: ignore + stdout = session.recv(10000) + logger.info("Shell stdout: %s", str(stdout)) + session.send("echo test > ~/test.txt && cat ~/test.txt\n") # type: ignore + stdout = session.recv(10000) + logger.info("Shell stdout: %s", str(stdout)) + (retcode, stdout, stderr) = await ops_test.juju( + "ssh", tmate_machine.entity_id, "cat ~/test.txt" + ) + + assert retcode == 0, f"Error running ssh command, {stdout}, {stderr}" + assert "test" in stdout, f"Failed to write with ssh command, {stdout}" diff --git a/tests/unit/__init__.py b/tests/unit/__init__.py new file mode 100644 index 0000000..3b60682 --- /dev/null +++ b/tests/unit/__init__.py @@ -0,0 +1,4 @@ +# Copyright 2024 Canonical Ltd. +# See LICENSE file for licensing details. + +"""Unit tests module.""" diff --git a/tests/unit/conftest.py b/tests/unit/conftest.py new file mode 100644 index 0000000..7ccfd87 --- /dev/null +++ b/tests/unit/conftest.py @@ -0,0 +1,47 @@ +# Copyright 2024 Canonical Ltd. +# See LICENSE file for licensing details. + +"""Fixtures for tmate-ssh-server-operator charm unit tests.""" + +from unittest.mock import MagicMock + +import pytest +from ops.testing import Harness + +import tmate +from charm import TmateSSHServerOperatorCharm +from tmate import Fingerprints + + +@pytest.fixture(scope="function", name="harness") +def harness_fixture(): + """Enable ops test framework harness.""" + harness = Harness(TmateSSHServerOperatorCharm) + # The charm code `binding.network.bind_address` for getting unit ip address will fail without + # the add_network call. 
+ harness.add_network("10.0.0.10") + yield harness + harness.cleanup() + + +@pytest.fixture(scope="function", name="charm") +def charm_fixture(harness: Harness): + """Harnessed TmateSSHServerOperator charm.""" + harness.begin() + return harness.charm + + +@pytest.fixture(scope="function", name="fingerprints") +def fingerprints_fixture(): + """Test fingerprint fixture.""" + return Fingerprints(rsa="rsa_fingerprint", ed25519="ed25519_fingerprint") + + +@pytest.fixture(scope="function", name="patch_get_fingerprints") +def patch_get_fingerprints_fixture(monkeypatch: pytest.MonkeyPatch, fingerprints: Fingerprints): + """Monkeypatch get_fingerprints function.""" + monkeypatch.setattr( + tmate, + "get_fingerprints", + MagicMock(spec=tmate.get_fingerprints, return_value=fingerprints), + ) diff --git a/tests/unit/factories.py b/tests/unit/factories.py new file mode 100644 index 0000000..7f4e7e8 --- /dev/null +++ b/tests/unit/factories.py @@ -0,0 +1,40 @@ +# Copyright 2024 Canonical Ltd. +# See LICENSE file for licensing details. + +"""Factories for generating test data.""" + +# The factory definitions don't need public methods +# pylint: disable=too-few-public-methods + +from typing import Generic, TypeVar + +import factory + +from state import State + +T = TypeVar("T") + + +class BaseMetaFactory(Generic[T], factory.base.FactoryMetaClass): + """Used for type hints of factories.""" + + # No need for docstring because it is used for type hints + def __call__(cls, *args, **kwargs) -> T: # noqa: N805 + """Used for type hints of factories.""" # noqa: DCO020 + return super().__call__(*args, **kwargs) # noqa: DCO030 + + +# The attributes of these classes are generators for the attributes of the meta class +# mypy incorrectly believes the factories don't support metaclass +class StateFactory(factory.Factory, metaclass=BaseMetaFactory[State]): # type: ignore[misc] + # Docstrings have been abbreviated for factories, checking for docstrings on model attributes + # can be skipped. 
+ """Generate PathInfos.""" # noqa: DCO060 + + class Meta: + """Configuration for factory.""" # noqa: DCO060 + + model = State + abstract = False + + ip_addr = factory.Faker("ipv4") diff --git a/tests/unit/requirements.txt b/tests/unit/requirements.txt new file mode 100644 index 0000000..db61a71 --- /dev/null +++ b/tests/unit/requirements.txt @@ -0,0 +1 @@ +factory_boy>=3,<4 diff --git a/tests/unit/test_actions.py b/tests/unit/test_actions.py new file mode 100644 index 0000000..9583c4a --- /dev/null +++ b/tests/unit/test_actions.py @@ -0,0 +1,74 @@ +# Copyright 2024 Canonical Ltd. +# See LICENSE file for licensing details. + +"""tmate-ssh-server charm actions unit tests.""" + +from unittest.mock import MagicMock + +import ops +import pytest + +import tmate +from charm import TmateSSHServerOperatorCharm + +from .factories import StateFactory + + +def test_on_get_server_config_fail( + monkeypatch: pytest.MonkeyPatch, + charm: TmateSSHServerOperatorCharm, +): + """ + arrange: given a monkeypatched state.ip_addr that does not yet have a value. + act: when on_get_server_config is called. + assert: the event is failed. + """ + mock_state = StateFactory(ip_addr=None) + monkeypatch.setattr(charm.actions, "state", mock_state) + + mock_event = MagicMock(spec=ops.ActionEvent) + charm.actions.on_get_server_config(mock_event) + + mock_event.fail.assert_called_once() + + +def test_on_get_server_config_error( + monkeypatch: pytest.MonkeyPatch, + charm: TmateSSHServerOperatorCharm, +): + """ + arrange: given a monkeypatched tmate.generate_tmate_conf that raises an exception. + act: when on_get_server_config is called. + assert: the event is failed. 
+ """ + monkeypatch.setattr( + tmate, + "generate_tmate_conf", + MagicMock(spec=tmate.generate_tmate_conf, side_effect=[tmate.FingerprintError]), + ) + + mock_event = MagicMock(spec=ops.ActionEvent) + charm.actions.on_get_server_config(mock_event) + + mock_event.fail.assert_called_once() + + +def test_on_get_server_config( + monkeypatch: pytest.MonkeyPatch, + charm: TmateSSHServerOperatorCharm, +): + """ + arrange: given a monkeypatched tmate.generate_tmate_conf that returns a tmate config. + act: when on_get_server_config is called. + assert: the event returns the tmate configuration values. + """ + monkeypatch.setattr( + tmate, + "generate_tmate_conf", + MagicMock(spec=tmate.generate_tmate_conf, return_value=(value := "test_config_value")), + ) + + mock_event = MagicMock(spec=ops.ActionEvent) + charm.actions.on_get_server_config(mock_event) + + mock_event.set_results.assert_called_once_with({"tmate-config": value}) diff --git a/tests/unit/test_charm.py b/tests/unit/test_charm.py index 469c6dc..346ad9f 100644 --- a/tests/unit/test_charm.py +++ b/tests/unit/test_charm.py @@ -3,9 +3,133 @@ """tmate-ssh-server charm unit tests.""" +from unittest.mock import MagicMock + +import ops +import pytest + +import tmate from charm import TmateSSHServerOperatorCharm +from state import State + +# Need access to protected functions for testing +# pylint: disable=protected-access + + +def test__on_install_dependencies_error( + monkeypatch: pytest.MonkeyPatch, + charm: TmateSSHServerOperatorCharm, +): + """ + arrange: given mocked tmate install_dependencies function that raises an exception. + act: when _on_install is called. + assert: exceptions are re-raised. 
+    """
+    mock_install_deps = MagicMock(
+        spec=tmate.install_dependencies, side_effect=[tmate.DependencySetupError]
+    )
+    monkeypatch.setattr(tmate, "install_dependencies", mock_install_deps)
+
+    with pytest.raises(tmate.DependencySetupError):
+        charm._on_install(MagicMock(spec=ops.InstallEvent))
+
+
+def test__on_install_keys_error(
+    monkeypatch: pytest.MonkeyPatch,
+    charm: TmateSSHServerOperatorCharm,
+):
+    """
+    arrange: given mocked tmate install_keys function that raises an exception.
+    act: when _on_install is called.
+    assert: exceptions are re-raised.
+    """
+    mock_install_deps = MagicMock(spec=tmate.install_dependencies)
+    monkeypatch.setattr(tmate, "install_dependencies", mock_install_deps)
+    mock_install_deps = MagicMock(spec=tmate.install_keys, side_effect=[tmate.KeyInstallError])
+    monkeypatch.setattr(tmate, "install_keys", mock_install_deps)
+
+    with pytest.raises(tmate.KeyInstallError):
+        charm._on_install(MagicMock(spec=ops.InstallEvent))
+
+
+def test__on_install_daemon_error(
+    monkeypatch: pytest.MonkeyPatch,
+    charm: TmateSSHServerOperatorCharm,
+):
+    """
+    arrange: given mocked tmate start_daemon function that raises an exception.
+    act: when _on_install is called.
+    assert: exceptions are re-raised.
+    """
+    mock_install_deps = MagicMock(spec=tmate.install_dependencies)
+    monkeypatch.setattr(tmate, "install_dependencies", mock_install_deps)
+    mock_install_keys = MagicMock(spec=tmate.install_keys)
+    monkeypatch.setattr(tmate, "install_keys", mock_install_keys)
+    mock_install_deps = MagicMock(spec=tmate.start_daemon, side_effect=[tmate.DaemonStartError])
+    monkeypatch.setattr(tmate, "start_daemon", mock_install_deps)
+
+    with pytest.raises(tmate.DaemonStartError):
+        charm._on_install(MagicMock(spec=ops.InstallEvent))
+
+
+def test__on_install_defer(
+    monkeypatch: pytest.MonkeyPatch,
+    charm: TmateSSHServerOperatorCharm,
+):
+    """
+    arrange: given a monkeypatched state.ip_addr that does not yet have a value.
+    act: when _on_install is called.
+ assert: the event is deferred. + """ + mock_install_deps = MagicMock(spec=tmate.install_dependencies) + monkeypatch.setattr(tmate, "install_dependencies", mock_install_deps) + mock_state = MagicMock(spec=State) + mock_state.ip_addr = None + monkeypatch.setattr(charm, "state", mock_state) + + mock_event = MagicMock(spec=ops.InstallEvent) + charm._on_install(mock_event) + + mock_event.defer.assert_called_once() + + +def test__on_install_error( + monkeypatch: pytest.MonkeyPatch, + charm: TmateSSHServerOperatorCharm, +): + """ + arrange: given a monkeypatched tmate.get_fingerprints that raises an error. + act: when _on_install is called. + assert: the charm raises an error. + """ + mock_install_deps = MagicMock(spec=tmate.install_dependencies) + monkeypatch.setattr(tmate, "install_dependencies", mock_install_deps) + monkeypatch.setattr(tmate, "install_keys", MagicMock()) + monkeypatch.setattr(tmate, "start_daemon", MagicMock()) + monkeypatch.setattr( + tmate, "get_fingerprints", MagicMock(side_effect=[tmate.IncompleteInitError]) + ) + + mock_event = MagicMock(spec=ops.InstallEvent) + with pytest.raises(tmate.IncompleteInitError): + charm._on_install(mock_event) + + +def test__on_install( + monkeypatch: pytest.MonkeyPatch, + charm: TmateSSHServerOperatorCharm, +): + """ + arrange: given a monkeypatched tmate installation function calls. + act: when _on_install is called. + assert: the unit is in active status. 
+ """ + monkeypatch.setattr(tmate, "install_dependencies", MagicMock(spec=tmate.install_dependencies)) + monkeypatch.setattr(tmate, "install_keys", MagicMock(spec=tmate.install_keys)) + monkeypatch.setattr(tmate, "start_daemon", MagicMock(spec=tmate.start_daemon)) + monkeypatch.setattr(tmate, "get_fingerprints", MagicMock(spec=tmate.get_fingerprints)) + mock_event = MagicMock(spec=ops.InstallEvent) + charm._on_install(mock_event) -def test_charm(): - """Initial test charm function.""" - TmateSSHServerOperatorCharm() + assert charm.unit.status.name == "active" diff --git a/tests/unit/test_sshdebug.py b/tests/unit/test_sshdebug.py new file mode 100644 index 0000000..95c288f --- /dev/null +++ b/tests/unit/test_sshdebug.py @@ -0,0 +1,95 @@ +# Copyright 2024 Canonical Ltd. +# See LICENSE file for licensing details. + +"""tmate-ssh-server charm sshdebug module unit tests.""" + +from unittest.mock import MagicMock + +import ops +import pytest +from ops.testing import Harness + +import tmate +from charm import TmateSSHServerOperatorCharm +from ssh_debug import DEBUG_SSH_INTEGRATION_NAME + +from .factories import StateFactory + +# Need access to protected functions for testing +# pylint: disable=protected-access + + +def test_update_relation_data_no_relations( + monkeypatch: pytest.MonkeyPatch, + harness: Harness, + fingerprints: tmate.Fingerprints, +): + """ + arrange: given debug_ssh integration. + act: when update_relation_data is called. + assert: relation data is correctly updated. 
+    """
+    monkeypatch.setattr(
+        tmate,
+        "get_fingerprints",
+        MagicMock(spec=tmate.get_fingerprints, return_value=fingerprints),
+    )
+    relation_id = harness.add_relation(DEBUG_SSH_INTEGRATION_NAME, "github_runner")
+    harness.add_relation_unit(relation_id, "github_runner/0")
+    harness.begin()
+
+    charm: TmateSSHServerOperatorCharm = harness.charm
+    charm.sshdebug.update_relation_data("host", fingerprints)
+
+    relation_data = harness.get_relation_data(relation_id, charm.unit)
+    assert relation_data == {
+        "host": "host",
+        "port": str(tmate.PORT),
+        "rsa_fingerprint": fingerprints.rsa,
+        "ed25519_fingerprint": fingerprints.ed25519,
+    }
+
+
+def test__on_ssh_debug_relation_joined_error(
+    monkeypatch: pytest.MonkeyPatch,
+    charm: TmateSSHServerOperatorCharm,
+):
+    """
+    arrange: given a monkeypatched tmate.get_fingerprints that raises an IncompleteInitError.
+    act: when _on_ssh_debug_relation_joined is called.
+    assert: the charm raises an error.
+    """
+    monkeypatch.setattr(
+        tmate,
+        "get_fingerprints",
+        MagicMock(spec=tmate.get_fingerprints, side_effect=[tmate.IncompleteInitError]),
+    )
+
+    mock_event = MagicMock(spec=ops.RelationJoinedEvent)
+
+    with pytest.raises(tmate.IncompleteInitError):
+        charm.sshdebug._on_ssh_debug_relation_joined(mock_event)
+
+
+@pytest.mark.usefixtures("patch_get_fingerprints")
+def test__on_ssh_debug_relation_joined(
+    monkeypatch: pytest.MonkeyPatch,
+    charm: TmateSSHServerOperatorCharm,
+    fingerprints: tmate.Fingerprints,
+):
+    """
+    arrange: given a monkeypatched get_fingerprints returning fingerprint data and state.
+    act: when _on_ssh_debug_relation_joined is called.
+    assert: the relation data is updated.
+ """ + mock_state = StateFactory() + monkeypatch.setattr(charm.sshdebug, "state", mock_state) + mock_update_relation_data = MagicMock() + monkeypatch.setattr(charm.sshdebug, "update_relation_data", mock_update_relation_data) + + mock_event = MagicMock(spec=ops.RelationJoinedEvent) + charm.sshdebug._on_ssh_debug_relation_joined(mock_event) + + mock_update_relation_data.assert_called_once_with( + host=mock_state.ip_addr, fingerprints=fingerprints + ) diff --git a/tests/unit/test_state.py b/tests/unit/test_state.py new file mode 100644 index 0000000..fe826d5 --- /dev/null +++ b/tests/unit/test_state.py @@ -0,0 +1,39 @@ +# Copyright 2024 Canonical Ltd. +# See LICENSE file for licensing details. + +"""tmate-ssh-server charm state unit tests.""" + +from unittest.mock import MagicMock + +import ops +import pytest + +import state +from state import State + + +def test_invalid_bind_address(): + """ + arrange: given mocked juju network.bind_address. + act: when the state is initialized. + assert: InvalidCharmStateError is raised. + """ + mock_binding = MagicMock(spec=ops.Binding) + mock_binding.network.bind_address = "invalid_address" + mock_charm = MagicMock(spec=ops.CharmBase) + mock_charm.model.get_binding.return_value = mock_binding + + with pytest.raises(state.InvalidCharmStateError): + State.from_charm(mock_charm) + + +def test_bind_address_not_ready(): + """ + arrange: given mocked juju model get_binding that isn't ready. + act: when the state is initialized. + assert: ip_addr is None. + """ + mock_charm = MagicMock(spec=ops.CharmBase) + mock_charm.model.get_binding.return_value = None + + assert not State.from_charm(mock_charm).ip_addr diff --git a/tests/unit/test_tmate.py b/tests/unit/test_tmate.py new file mode 100644 index 0000000..3d3b215 --- /dev/null +++ b/tests/unit/test_tmate.py @@ -0,0 +1,251 @@ +# Copyright 2024 Canonical Ltd. +# See LICENSE file for licensing details. 
+
+"""tmate-ssh-server charm tmate module unit tests."""
+
+import textwrap
+from pathlib import Path
+from unittest.mock import MagicMock
+
+import pytest
+from charms.operator_libs_linux.v0 import apt
+
+import tmate
+
+# Need access to protected functions for testing
+# pylint: disable=protected-access
+
+
+@pytest.mark.parametrize(
+    "exception",
+    [
+        pytest.param(apt.PackageNotFoundError, id="package not found"),
+        pytest.param(apt.PackageError, id="package error"),
+    ],
+)
+def test_install_dependencies_apt_error(
+    exception: type[Exception], monkeypatch: pytest.MonkeyPatch
+):
+    """
+    arrange: given a monkeypatched apt module that raises an exception.
+    act: when install_dependencies is called.
+    assert: DependencySetupError is raised.
+    """
+    monkeypatch.setattr(tmate.apt, "update", MagicMock(spec=apt.update))
+    monkeypatch.setattr(
+        tmate.apt, "add_package", MagicMock(spec=apt.add_package, side_effect=[exception])
+    )
+
+    with pytest.raises(tmate.DependencySetupError):
+        tmate.install_dependencies()
+
+
+def test_install_dependencies_add_user_to_group_error(monkeypatch: pytest.MonkeyPatch):
+    """
+    arrange: given a monkeypatched passwd module that raises an exception.
+    act: when install_dependencies is called.
+    assert: DependencySetupError is raised.
+    """
+    monkeypatch.setattr(tmate.apt, "update", MagicMock(spec=apt.update))
+    monkeypatch.setattr(tmate.apt, "add_package", MagicMock(spec=apt.add_package))
+    monkeypatch.setattr(tmate.passwd, "add_group", MagicMock(spec=tmate.passwd.add_group))
+    monkeypatch.setattr(tmate.passwd, "add_user_to_group", MagicMock(side_effect=[ValueError]))
+
+    with pytest.raises(tmate.DependencySetupError):
+        tmate.install_dependencies()
+
+
+def test_install_keys_error(monkeypatch: pytest.MonkeyPatch):
+    """
+    arrange: given a monkeypatched subprocess call that raises CalledProcessError.
+    act: when install_keys is called.
+    assert: KeyInstallError is raised.
+    """
+    monkeypatch.setattr(tmate, "WORK_DIR", MagicMock(spec=Path))
+    monkeypatch.setattr(tmate, "KEYS_DIR", MagicMock(spec=Path))
+    monkeypatch.setattr(tmate, "CREATE_KEYS_SCRIPT_PATH", MagicMock(spec=Path))
+    monkeypatch.setattr(
+        tmate.subprocess,
+        "check_call",
+        MagicMock(
+            spec=tmate.subprocess.check_call,
+            side_effect=[tmate.subprocess.CalledProcessError(returncode=1, cmd="test")],
+        ),
+    )
+
+    with pytest.raises(tmate.KeyInstallError):
+        tmate.install_keys(MagicMock())
+
+
+def test_install_keys(monkeypatch: pytest.MonkeyPatch):
+    """
+    arrange: given a monkeypatched subprocess call.
+    act: when install_keys is called.
+    assert: no error is raised.
+    """
+    monkeypatch.setattr(tmate, "WORK_DIR", MagicMock(spec=Path))
+    monkeypatch.setattr(tmate, "KEYS_DIR", MagicMock(spec=Path))
+    monkeypatch.setattr(tmate, "CREATE_KEYS_SCRIPT_PATH", MagicMock(spec=Path))
+    monkeypatch.setattr(
+        tmate.subprocess,
+        "check_call",
+        MagicMock(spec=tmate.subprocess.check_call),
+    )
+
+    tmate.install_keys(MagicMock())
+
+
+def test_start_daemon_daemon_reload_error(monkeypatch: pytest.MonkeyPatch):
+    """
+    arrange: given a monkeypatched systemd daemon_reload that raises SystemdError.
+    act: when start_daemon is called.
+    assert: DaemonStartError is raised.
+    """
+    monkeypatch.setattr(tmate, "WORK_DIR", MagicMock(spec=Path))
+    monkeypatch.setattr(tmate, "KEYS_DIR", MagicMock(spec=Path))
+    monkeypatch.setattr(tmate, "CREATE_KEYS_SCRIPT_PATH", MagicMock(spec=Path))
+    monkeypatch.setattr(tmate, "TMATE_SSH_SERVER_SERVICE_PATH", MagicMock(spec=Path))
+    monkeypatch.setattr(
+        tmate.systemd,
+        "daemon_reload",
+        MagicMock(
+            spec=tmate.systemd.daemon_reload,
+            side_effect=[
+                tmate.systemd.SystemdError,
+            ],
+        ),
+    )
+
+    with pytest.raises(tmate.DaemonStartError):
+        tmate.start_daemon(address="test")
+
+
+def test_start_daemon_service_start_error(monkeypatch: pytest.MonkeyPatch):
+    """
+    arrange: given a monkeypatched systemd service_start that raises SystemdError.
+ act: when start_daemon is called. + assert: DaemonStartError is raised. + """ + monkeypatch.setattr(tmate, "WORK_DIR", MagicMock(spec=Path)) + monkeypatch.setattr(tmate, "KEYS_DIR", MagicMock(spec=Path)) + monkeypatch.setattr(tmate, "CREATE_KEYS_SCRIPT_PATH", MagicMock(spec=Path)) + monkeypatch.setattr(tmate, "TMATE_SSH_SERVER_SERVICE_PATH", MagicMock(spec=Path)) + monkeypatch.setattr( + tmate.systemd, "daemon_reload", MagicMock(spec=tmate.systemd.daemon_reload) + ) + monkeypatch.setattr( + tmate.systemd, + "service_start", + MagicMock( + spec=tmate.systemd.service_start, + side_effect=[ + tmate.systemd.SystemdError, + ], + ), + ) + + with pytest.raises(tmate.DaemonStartError): + tmate.start_daemon(address="test") + + +def test__calculat_fingerprint(): + """ + arrange: given a test fingerprint data. + act: when _calculate_fingerprint is called. + assert: correct fingerprint data is returned. + """ + test_b64_pub_key_data = "AAAAB3NzaC1yc2EAAAADAQABAAABgQDt5qyv585y8lKFoirTyexOR9YwMSzihoDG/N6mi\ + FzHv/22Fd/6NN96Xymf8HGoUdR6KhUZ3SQRwUmmPRb2eASaOBvDzDdSSzWT6N2DuW31WXw/Kw1DUXZ6AWyAH5O3Y5kvmD\ + 7prT3QGVgOKtm9Cy/EeXzNdbiK6sTbfER2k6KZpjdz/onA0iovd7N2SrxZwSfvhZ6sTpD//WDTmN/bV+W+6/d3zNYwak4\ + mNPRNTC1hcjBryOMYJ2Q0MnjAtWf7MKU1IvNYiWUZlPKVBlPuDxML/4kSf5xbC/qG2EIyYsywHErfThX2sOZuU2gc+4+1\ + mb1YZpEpPDGLN/l4Er2gtQaW8qes6JozuGmjU6+ZZt7sLqYrBSChJbHlDPDNee9mjMRVPXtppqzpmpZsYR7N7PoRC+KLe\ + K/4OQKLtHSYxKVCf4dGaDvgxsoG4AyECE7is3bMlkc87GxhV0IEb1A1iZ3ycAxIrmP9G5g2Nao/OL9G4zVW9AY4Lg4M4k\ + H26zctvb0=" + + assert ( + tmate._calculate_fingerprint(test_b64_pub_key_data) + == "uW23WW14JnjeVLUg4kWvbhWptvjAbODK2d4jJmnQyqI" + ) + + +def test_get_fingerprints_incomplete_init_error(monkeypatch: pytest.MonkeyPatch): + """ + arrange: given a monkeypatched paths that don't exist. + act: when get_fingerprints is called. + assert: IncompleteInitError is raised. 
+ """ + mock_path = MagicMock(spec=Path) + mock_path.exists.return_value = False + monkeypatch.setattr(tmate, "KEYS_DIR", mock_path) + + with pytest.raises(tmate.IncompleteInitError): + tmate.get_fingerprints() + + +def test_get_fingerprints(monkeypatch: pytest.MonkeyPatch): + """ + arrange: given a monkeypatched subprocess.check_output calls. + act: when get_fingerprints is called. + assert: Correct fingerprint data is returned. + """ + monkeypatch.setattr(tmate, "KEYS_DIR", MagicMock(spec=Path)) + monkeypatch.setattr(tmate, "RSA_PUB_KEY_PATH", MagicMock(spec=Path)) + monkeypatch.setattr(tmate, "ED25519_PUB_KEY_PATH", MagicMock(spec=Path)) + monkeypatch.setattr( + tmate, + "_calculate_fingerprint", + MagicMock( + spec=tmate._calculate_fingerprint, + side_effect=[(rsa_fingerprint := "rsa"), (ed25519_fingerprint := "ed25519")], + ), + ) + + assert ( + tmate.Fingerprints( + rsa=f"SHA256:{rsa_fingerprint}", ed25519=f"SHA256:{ed25519_fingerprint}" + ) + == tmate.get_fingerprints() + ) + + +@pytest.mark.parametrize( + "exception", + [ + pytest.param(tmate.IncompleteInitError, id="incomplete init"), + pytest.param(tmate.KeyInstallError, id="key install error"), + ], +) +def test_generate_tmate_conf_error(monkeypatch: pytest.MonkeyPatch, exception: type[Exception]): + """ + arrange: given a monkeypatched get_fingerprints that raises exceptions. + act: when generate_tmate_conf is called. + assert: FingerPrintError is raised. + """ + monkeypatch.setattr( + tmate, "get_fingerprints", MagicMock(spec=tmate.get_fingerprints, side_effect=[exception]) + ) + + with pytest.raises(tmate.FingerprintError): + tmate.generate_tmate_conf(MagicMock()) + + +@pytest.mark.usefixtures("patch_get_fingerprints") +def test_generate_tmate_conf(fingerprints: tmate.Fingerprints): + """ + arrange: given a monkeypatched get_fingerprints that returns mock fingerprint data. + act: when generate_tmate_conf is called. + assert: a tmate.conf file contents are generated. 
+ """ + host = "test_host_value" + + assert ( + textwrap.dedent( + f""" + set -g tmate-server-host {host} + set -g tmate-server-port {tmate.PORT} + set -g tmate-server-rsa-fingerprint {fingerprints.rsa} + set -g tmate-server-ed25519-fingerprint {fingerprints.ed25519} + """ + ) + == tmate.generate_tmate_conf(host) + ) diff --git a/tox.ini b/tox.ini index b9bd642..2291452 100644 --- a/tox.ini +++ b/tox.ini @@ -43,6 +43,7 @@ commands = description = Check code against coding style standards deps = -r{toxinidir}/requirements.txt + -r{[vars]tst_path}unit/requirements.txt black flake8<6.0.0 flake8-docstrings>=1.6 @@ -63,6 +64,7 @@ deps = pytest_operator types-requests types-PyYAML + types-paramiko # used for integration test pytest_asyncio pydocstyle>=2.10 commands = @@ -84,6 +86,7 @@ deps = pytest coverage[toml] -r{toxinidir}/requirements.txt + -r{[vars]tst_path}unit/requirements.txt commands = coverage run --source={[vars]src_path} \ -m pytest --ignore={[vars]tst_path}integration -v --tb native -s {posargs}