diff --git a/.bumpversion.cfg b/.bumpversion.cfg index af6f5b7..4e92cbf 100644 --- a/.bumpversion.cfg +++ b/.bumpversion.cfg @@ -1,5 +1,5 @@ [bumpversion] -current_version = 1.0.8 +current_version = 1.0.9 commit = True tag = False diff --git a/.gitignore b/.gitignore index 660ff39..aefeb55 100644 --- a/.gitignore +++ b/.gitignore @@ -134,3 +134,4 @@ $RECYCLE.BIN/ *.sw? scripts +*/certs/ diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index ca644ce..dd805b9 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -37,7 +37,7 @@ repos: hooks: - id: yaml-format name: yaml-format - entry: python scripts/format_yaml/main.py + entry: python format_yaml/main.py language: system types: [yaml] files: \.(yaml)$ @@ -53,28 +53,35 @@ repos: # # TODO: Solve error # - id: yml-format # name: yml-format - # entry: python scripts/format_yml/main.py + # entry: python format_yml/main.py # language: system # types: [yaml] # files: \.(yml)$ # exclude: '.github/.*' - id: docker-compose name: docker-compose - entry: python scripts/validate_docker_compose/main.py + entry: python validate_docker_compose/main.py language: system types: [yaml] files: ^docker-compose(\.dev|\.prod)?\.yml$ - id: commit-msg-version-check name: commit-msg-version-check - entry: python scripts/commit_msg_version_bump/main.py + entry: python commit_msg_version_bump/main.py always_run: true language: system args: [.git/COMMIT_EDITMSG] stages: [pre-push] + - id: bump-year + name: bump-year + entry: python bump_year/main.py + always_run: true + pass_filenames: false + language: system - id: generate-changelog name: generate-changelog - entry: python scripts/generate_changelog/main.py + entry: python generate_changelog/main.py always_run: true + pass_filenames: false language: system - repo: https://github.com/pre-commit/mirrors-prettier rev: v4.0.0-alpha.8 diff --git a/.pylintrc b/.pylintrc index be04b8d..337f65b 100644 --- a/.pylintrc +++ b/.pylintrc @@ -39,7 +39,7 @@ extension-pkg-whitelist= fail-on= # Specify a score threshold under which the program will exit with error. -fail-under=10 +fail-under=8 # Interpret the stdin as a python script, whose filename needs to be passed as # the module_or_package argument. @@ -402,7 +402,7 @@ preferred-modules= # The type of string formatting that logging methods do. `old` means using % # formatting, `new` is for `{}` formatting. -logging-format-style=old +logging-format-style=new # Logging modules to check that the string format arguments are in logging # function parameter format. diff --git a/CHANGELOG.md b/CHANGELOG.md index 785c7a9..85ea3fd 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,15 +1,78 @@ -## [1.0.0] - 2024-10-23 +## [1.0.8] - 2024-10-27 + +### Styles + +- **core**: fixed confs (`patch candidate`) ### Other Changes -- rebased scripts -- feat: added scripts -- Initial commit +- from prod - from test - from dev - Bump version: 1.0.7 → 1.0.8 into test (#11)Automatically created pull request for release v1.0.8-dev into testbranch. into prod (#12)Automatically created pull request for release v1.0.8-test into prodbranch. into main (#13) +- from test - from dev - Bump version: 1.0.7 → 1.0.8 into test (#11)Automatically created pull request for release v1.0.8-dev into testbranch. 
into prod (#12)
+- from dev - Bump version: 1.0.7 → 1.0.8 into test (#11)
+- Bump version: 1.0.7 → 1.0.8
+- Update CNAME
+- Update CNAME
+- Create CNAME
+
+## [1.0.7] - 2024-10-27
+
+### Styles
+
+- **core**: fixed confs (`patch candidate`)
+- **core**: fixed confs (`patch candidate`)
+- **core**: fixed confs (`patch candidate`)
+- **core**: fixed confs (`patch candidate`)
+
+### Other Changes
+
+- from prod - from test - from dev - Bump version: 1.0.6 → 1.0.7 into test (#8)Automatically created pull request for release v1.0.7-dev into testbranch. into prod (#9)Automatically created pull request for release v1.0.7-test into prodbranch. into main (#10)
+- from test - from dev - Bump version: 1.0.6 → 1.0.7 into test (#8)Automatically created pull request for release v1.0.7-dev into testbranch. into prod (#9)
+- from dev - Bump version: 1.0.6 → 1.0.7 into test (#8)
+- Bump version: 1.0.6 → 1.0.7
+- Bump version: 1.0.5 → 1.0.6
+- Bump version: 1.0.4 → 1.0.5
+- Bump version: 1.0.3 → 1.0.4
+
+## [1.0.3] - 2024-10-27
+
+### Styles
 
-## [1.0.0] - 2024-10-23
+- **core**: added readme file (`patch candidate`)
+- **core**: added readme file (`patch candidate`)
 
 ### Other Changes
 
+- from prod - from test - from dev - Bump version: 1.0.2 → 1.0.3 into test (#5)Automatically created pull request for release v1.0.3-dev into testbranch. into prod (#6)Automatically created pull request for release v1.0.3-test into prodbranch. into main (#7)
+- from test - from dev - Bump version: 1.0.2 → 1.0.3 into test (#5)Automatically created pull request for release v1.0.3-dev into testbranch. into prod (#6)
+- from dev - Bump version: 1.0.2 → 1.0.3 into test (#5)
+- Bump version: 1.0.2 → 1.0.3
+- Bump version: 1.0.1 → 1.0.2
+- style(core): added readme file
+
+## [1.0.1] - 2024-10-27
+
+### Features
+
+- **core**: add cicd flows (`patch candidate`)
+
+### Other Changes
+
+- from prod - from test - from dev - Bump version: 1.0.0 → 1.0.1 into test (#1)Automatically created pull request for release v1.0.1-dev into testbranch. into prod (#2)Automatically created pull request for release v1.0.1-test into prodbranch. into main (#3)
+- from test - from dev - Bump version: 1.0.0 → 1.0.1 into test (#1)Automatically created pull request for release v1.0.1-dev into testbranch. into prod (#2)
+- from dev - Bump version: 1.0.0 → 1.0.1 into test (#1)
+- Bump version: 1.0.0 → 1.0.1
+- feat(core): fix cicd flows
+- feat(core): added cicd flows
 - rebased scripts
 - feat: added scripts
 - Initial commit
+
+## [Unreleased] - 2024-10-27
+
+### Chores
+
+- **style**: fixed bump year flow (`patch candidate`)
+
+### Other Changes
+
+- chore(core): added crypto controller
diff --git a/INSTALL.md b/INSTALL.md
index 55af1c5..3228e1d 100644
--- a/INSTALL.md
+++ b/INSTALL.md
@@ -1 +1,19 @@
-# Install
+## 🔨 Installation
+
+1. Add the scripts repository as a submodule in your project:
+
+   ```bash
+   git submodule add https://github.com/JuanVilla424/scripts.git
+   ```
+
+   or, pinned to a specific branch (`-b` requires a branch name):
+
+   ```bash
+   git submodule add -b <branch> https://github.com/JuanVilla424/scripts.git
+   ```
+
+2. Update the submodule when there are changes:
+
+   ```bash
+   git submodule update --remote --merge
+   ```
diff --git a/LICENSE b/LICENSE
index f288702..3a86228 100644
--- a/LICENSE
+++ b/LICENSE
@@ -1,4 +1,4 @@
-                    GNU GENERAL PUBLIC LICENSE
+GNU GENERAL PUBLIC LICENSE
                        Version 3, 29 June 2007
 
 Copyright (C) 2007 Free Software Foundation, Inc. <https://fsf.org/>
@@ -531,7 +531,7 @@ patent license (a) in connection with copies of the covered work conveyed by you (or copies made from those copies), or (b) primarily for and in connection with specific products or compilations that contain the covered work, unless you entered into that arrangement, -or that patent license was granted, prior to 28 March 2007. +or that patent license was granted, prior to 28 March 2024. Nothing in this License shall be construed as excluding or limiting any implied license or other defenses to infringement that may @@ -652,7 +652,7 @@ Also add information on how to contact you by electronic and paper mail. If the program does terminal interaction, make it output a short notice like this when it starts in an interactive mode: - Copyright (C) + scripts Copyright (C) 2024 Na0nh This program comes with ABSOLUTELY NO WARRANTY; for details type `show w'. This is free software, and you are welcome to redistribute it under certain conditions; type `show c' for details. diff --git a/bump_year/main.py b/bump_year/main.py index 1138b1a..f58c5f9 100644 --- a/bump_year/main.py +++ b/bump_year/main.py @@ -4,64 +4,191 @@ A script to bump the year part of the version in pyproject.toml. Resets major and minor versions to 0 when the year is incremented. +Additionally, updates the footers of specified Markdown files that contain the year. Usage: - bump_year.py + python bump_year.py + python bump_year.py --md-files README.md CONTRIBUTING.md + python bump_year.py --md-dir docs/ """ +import argparse +import logging import datetime -import toml -import sys +import os +import re +from typing import List +from logging.handlers import RotatingFileHandler +# Initialize the logger +logger = logging.getLogger(__name__) -def bump_year() -> None: + +def parse_arguments() -> argparse.Namespace: """ - Bumps the year in pyproject.toml and resets major and minor versions to 0. + Parses command-line arguments. + + Returns: + argparse.Namespace: Parsed arguments. """ - current_year = datetime.datetime.now().year - pyproject_path = "pyproject.toml" - - try: - with open(pyproject_path, "r", encoding="utf-8") as file: - data = toml.load(file) - except FileNotFoundError: - print(f"Error: {pyproject_path} not found.") - sys.exit(1) - except toml.TomlDecodeError: - print(f"Error: Failed to parse {pyproject_path}.") - sys.exit(1) - - try: - version = data["tool"]["poetry"]["version"] - year, major, minor = version.split(".") - except (KeyError, ValueError): - print("Error: Version format is incorrect in pyproject.toml.") - sys.exit(1) - - if int(year) < current_year: - print(f"Updating year from {year} to {current_year}") - year = str(current_year) - major = "0" - minor = "0" - new_version = f"{year}.{major}.{minor}" - data["tool"]["poetry"]["version"] = new_version + parser = argparse.ArgumentParser(description="Update Markdown footers containing the year.") + parser.add_argument( + "--md-files", + nargs="*", + default=[ + "README.md", + "CONTRIBUTING.md", + "SECURITY.md", + "CODE_OF_CONDUCT.md", + "VERSIONING.md", + "LICENSE", + ], + help="List of Markdown files to update footers. Example: --md-files README.md CONTRIBUTING.md", + ) + parser.add_argument( + "--md-dir", + type=str, + default=None, + help="Directory containing Markdown files to update footers. Example: --md-dir docs/", + ) + parser.add_argument( + "--log-level", + choices=["INFO", "DEBUG"], + default="INFO", + help="Set the logging level. 
Default is INFO.", + ) + return parser.parse_args() + + +def configure_logger(log_level: str) -> None: + """ + Configures logging for the script. + + Args: + log_level (str): Logging level as a string (e.g., 'INFO', 'DEBUG'). + """ + numeric_level = getattr(logging, log_level.upper(), None) + if not isinstance(numeric_level, int): + raise ValueError(f"Invalid log level: {log_level}") + + logger.setLevel(numeric_level) + + # Set up log rotation: max size 5MB, keep 5 backup files + file_handler = RotatingFileHandler("bump_year.log", maxBytes=5 * 1024 * 1024, backupCount=5) + console_handler = logging.StreamHandler() + + formatter = logging.Formatter("%(asctime)s - %(levelname)s - %(message)s") + file_handler.setFormatter(formatter) + console_handler.setFormatter(formatter) + + logger.handlers.clear() + logger.addHandler(file_handler) + logger.addHandler(console_handler) + + +def update_markdown_footers(md_files: List[str], current_year: int) -> None: + """ + Updates the footer year in specified Markdown files. + + Args: + md_files (List[str]): List of Markdown file paths. + current_year (int): The current year. + """ + year_pattern = re.compile(r"(\b20\d{2}\b)") # Matches years from 2000 to 2099 + + for md_file in md_files: + if not os.path.isfile(md_file): + logger.warning(f"{md_file} does not exist. Skipping.") + continue + try: - with open(pyproject_path, "w", encoding="utf-8") as file: - toml.dump(data, file) - print(f"Year bumped to {new_version}") + with open(md_file, "r", encoding="utf-8") as file: + content = file.read() except Exception as e: - print(f"Error writing to {pyproject_path}: {e}") - sys.exit(1) - else: - print("Year is up-to-date. No need to bump.") + logger.error(f"Error reading {md_file}: {e}") + continue + + logger.debug(f"Current year: {current_year}") + + # Find all years in the content + years_found = year_pattern.findall(content) + if not years_found: + logger.debug(f"No years found in {md_file}. Skipping.") + continue + + for year in years_found: + logger.debug(f"Updating footer for year: {year} in {md_file}") + if int(year) == int(current_year): + logger.debug(f"No years to update found in {md_file}. Skipping.") + continue + + # Replace the last occurrence of a year in the footer + # Assumption: The footer is at the end of the file + lines = content.strip().split("\n") + footer_updated = False + + for i in range(len(lines) - 1, -1, -1): + line = lines[i] + if year_pattern.search(line) and line.__contains__(str(int(current_year) - 1)): + new_line = year_pattern.sub(str(current_year), line, count=1) + lines[i] = new_line + footer_updated = True + logger.info(f"Updated year in {md_file}: '{line}' -> '{new_line}'") + break + + if footer_updated: + new_content = "\n".join(lines) + "\n" + try: + with open(md_file, "w", encoding="utf-8") as file: + file.write(new_content) + logger.info(f"Successfully updated {md_file}.") + except Exception as e: + logger.error(f"Error writing to {md_file}: {e}") + else: + logger.warning(f"No footer with a year to update found in {md_file}. Skipping.") + + +def collect_markdown_files(md_files: List[str], md_dir: str = None) -> List[str]: + """ + Collects Markdown files from specified files and/or directory. + + Args: + md_files (List[str]): List of Markdown file paths. + md_dir (str, optional): Directory to search for Markdown files. + + Returns: + List[str]: Combined list of Markdown file paths. + """ + collected_files = set(md_files) + + if md_dir: + if not os.path.isdir(md_dir): + logger.warning(f"Directory {md_dir} does not exist. 
Skipping.") + else: + for root, _, files in os.walk(md_dir): + for file in files: + if file.lower().endswith(".md"): + collected_files.add(os.path.join(root, file)) + + return list(collected_files) def main() -> None: """ - Main function to execute the year bumping process. + Main function to execute the year bumping and Markdown footers updating process. """ - bump_year() + current_year = datetime.datetime.now().year + + # Collect Markdown files to update + markdown_files = collect_markdown_files(args.md_files, args.md_dir) + + if markdown_files: + update_markdown_footers(markdown_files, current_year) + else: + logger.error("No Markdown files specified for footer update.") if __name__ == "__main__": + args = parse_arguments() + configure_logger(args.log_level) main() diff --git a/crypto_controller/main.py b/crypto_controller/main.py new file mode 100644 index 0000000..c2ae022 --- /dev/null +++ b/crypto_controller/main.py @@ -0,0 +1,620 @@ +import os +import sys +import argparse +import logging +from enum import verify +from logging.handlers import RotatingFileHandler +import shutil +import hashlib +import base64 +from collections import namedtuple +from datetime import datetime, timedelta +import requests +import warnings +import smtplib +from email.mime.text import MIMEText + +from cryptography.hazmat.backends import default_backend +from cryptography.hazmat.primitives.asymmetric import rsa, padding +from cryptography.hazmat.primitives import serialization, hashes +from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes + +from dotenv import load_dotenv + +# Suppress warnings +warnings.filterwarnings("ignore") + +# Load environment variables from .env file +load_dotenv() + +# Resource usage thresholds +CPU_USAGE_THRESHOLD = float(os.getenv("CPU_USAGE_THRESHOLD", "70.0")) +MEMORY_USAGE_THRESHOLD = float(os.getenv("MEMORY_USAGE_THRESHOLD", "395.0")) +DISK_SPACE_THRESHOLD = float(os.getenv("DISK_SPACE_THRESHOLD", "75.0")) + +# Verify that required environment variables are set +REQUIRED_ENV_VARS = [ + CPU_USAGE_THRESHOLD, + MEMORY_USAGE_THRESHOLD, + DISK_SPACE_THRESHOLD, +] + +if not all(REQUIRED_ENV_VARS): + raise EnvironmentError("One or more required environment variables are missing.") + +# Configure logger +logger = logging.getLogger("__main__") + + +def configure_logger(log_level: str = "INFO") -> None: + """ + Configures the logger with rotating file handler and console handler. + + Args: + log_level (str): Logging level (INFO, DEBUG, etc.). + """ + numeric_level = getattr(logging, log_level.upper(), None) + if not isinstance(numeric_level, int): + raise ValueError(f"Invalid log level: {log_level}") + + logger.setLevel(numeric_level) + + # File handler with rotation + file_handler = RotatingFileHandler( + "crypto_controller.log", maxBytes=5 * 1024 * 1024, backupCount=5 + ) + # Console handler + console_handler = logging.StreamHandler() + + formatter = logging.Formatter("%(asctime)s - %(levelname)s - %(message)s") + file_handler.setFormatter(formatter) + console_handler.setFormatter(formatter) + + logger.handlers.clear() + logger.addHandler(file_handler) + logger.addHandler(console_handler) + + +# Structure to store footprints +Footprint = namedtuple("Footprint", ["sha1", "sha256"]) + + +def get_key_footprint(key_file_path: str, key_type: str) -> Footprint: + """ + Generates SHA1 and SHA256 footprints for a given key file. + + Args: + key_file_path (str): Path to the key file. + key_type (str): Type of the key ('public' or 'private'). 
+ + Returns: + Footprint: Namedtuple containing SHA1 and SHA256 hashes. + """ + try: + with open(key_file_path, "rb") as pem_file: + pem_data = pem_file.read() + + if key_type == "public": + marker_begin = b"-----BEGIN PUBLIC KEY-----" + marker_end = b"-----END PUBLIC KEY-----" + elif key_type == "private": + marker_begin = b"-----BEGIN ENCRYPTED PRIVATE KEY-----" + marker_end = b"-----END ENCRYPTED PRIVATE KEY-----" + else: + raise ValueError("Invalid key type specified.") + + start = pem_data.find(marker_begin) + end = pem_data.find(marker_end, start) + if start == -1 or end == -1: + raise IOError("Not a valid PEM file.") + + key_body = pem_data[start + len(marker_begin) : end] + der = base64.decodebytes(key_body.replace(b"\n", b"")) + + sha1 = hashlib.sha1(der).hexdigest() + sha256 = hashlib.sha256(der).hexdigest() + logger.debug(f"Generated footprint for {key_type} key: SHA1={sha1}, SHA256={sha256}") + return Footprint(sha1, sha256) + except Exception as error: + logger.error(f"Error generating footprint for {key_type} key: {error}", exc_info=True) + raise + + +class CryptoController: + """Crypto Controller for encryption and decryption operations.""" + + def __init__(self, cert_location: str, key_pair_name: str, private_key_pass: str): + """ + Initializes the CryptoController. + + Args: + cert_location (str): Path to the certificate vault. + key_pair_name (str): Name of the key pair. + private_key_pass (str): Password for the private key. + """ + self.cert_location = cert_location + self.key_pair_name = key_pair_name + self.private_key_pass = private_key_pass.encode("utf-8") + + self.public_key_file = os.path.join(cert_location, f"{self.key_pair_name}.pub") + self.private_key_file = os.path.join(cert_location, f"{self.key_pair_name}.key") + self.key_pair_file = os.path.join(cert_location, f"{self.key_pair_name}.kp") + + def check_cert_vault_exists(self) -> bool: + """ + Checks if the certificate vault exists. + + Returns: + bool: True if exists, False otherwise. + """ + exists = os.path.exists(self.cert_location) + logger.debug(f"Certificate vault exists: {exists}") + return exists + + def create_cert_vault(self) -> None: + """ + Creates the certificate vault directory. + """ + try: + os.makedirs(self.cert_location, mode=0o700, exist_ok=True) + logger.info(f"Created certificate vault at {self.cert_location}") + except Exception as error: + logger.error(f"Can't create certificate vault: {error}", exc_info=True) + raise + + def clean_cert_vault(self) -> None: + """ + Cleans the certificate vault by removing all contents. + """ + try: + shutil.rmtree(self.cert_location) + logger.info(f"Cleaned certificate vault at {self.cert_location}") + self.create_cert_vault() + except Exception as error: + logger.error(f"Failed to clean certificate vault: {error}", exc_info=True) + raise + + def encrypt_hybrid(self, plain_text: str) -> str: + """ + Encrypts plain text using hybrid encryption (AES + RSA). + + Args: + plain_text (str): The text to encrypt. + + Returns: + str: Encrypted data as a concatenated Base64 string (encrypted_aes_key:iv:ciphertext). 
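+
+        Example (illustrative sketch; assumes keys already exist, e.g. after 'init'):
+            controller = CryptoController("certs", "Crypto-Key-Pair-2024", "passphrase")
+            token = controller.encrypt_hybrid("secret value")
+            # token -> "<b64 RSA-wrapped AES key>:<b64 IV>:<b64 ciphertext>"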
+ """ + try: + public_key, _ = self.load_keys() + + # Generate a random AES key and IV + aes_key = os.urandom(32) # AES-256 + iv = os.urandom(16) # 128-bit IV + + # Encrypt the plain text with AES + cipher = Cipher(algorithms.AES(aes_key), modes.CFB(iv), backend=default_backend()) + encryptor = cipher.encryptor() + ciphertext = encryptor.update(plain_text.encode("utf-8")) + encryptor.finalize() + + # Encrypt the AES key with the RSA public key + encrypted_aes_key = public_key.encrypt( + aes_key, + padding.OAEP( + mgf=padding.MGF1(algorithm=hashes.SHA256()), + algorithm=hashes.SHA256(), + label=None, + ), + ) + + # Encode all parts with Base64 + encrypted_aes_key_b64 = base64.b64encode(encrypted_aes_key).decode("utf-8") + iv_b64 = base64.b64encode(iv).decode("utf-8") + ciphertext_b64 = base64.b64encode(ciphertext).decode("utf-8") + + # Concatenate with colon as delimiter + encrypted_data = f"{encrypted_aes_key_b64}:{iv_b64}:{ciphertext_b64}" + logger.info("Hybrid encryption successful.") + return encrypted_data + + except Exception as error: + logger.error(f"Hybrid encryption failed: {error}", exc_info=True) + raise + + def decrypt_hybrid(self, encrypted_data: str) -> str: + """ + Decrypts data encrypted with hybrid encryption (AES + RSA). + + Args: + encrypted_data (str): The encrypted data as a concatenated Base64 string (encrypted_aes_key:iv:ciphertext). + + Returns: + str: Decrypted plain text. + """ + try: + _, private_key = self.load_keys() + + # Split the encrypted data + parts = encrypted_data.split(":") + if len(parts) != 3: + raise ValueError( + "Encrypted data is not in the correct format (expected 3 parts separated by ':')." + ) + + encrypted_aes_key_b64, iv_b64, ciphertext_b64 = parts + + # Decode from Base64 + encrypted_aes_key = base64.b64decode(encrypted_aes_key_b64) + iv = base64.b64decode(iv_b64) + ciphertext = base64.b64decode(ciphertext_b64) + + # Decrypt the AES key with RSA private key + aes_key = private_key.decrypt( + encrypted_aes_key, + padding.OAEP( + mgf=padding.MGF1(algorithm=hashes.SHA256()), + algorithm=hashes.SHA256(), + label=None, + ), + ) + + # Decrypt the ciphertext with AES key + cipher = Cipher(algorithms.AES(aes_key), modes.CFB(iv), backend=default_backend()) + decryptor = cipher.decryptor() + decrypted_text = decryptor.update(ciphertext) + decryptor.finalize() + + decrypted_str = decrypted_text.decode("utf-8") + logger.info("Hybrid decryption successful.") + return decrypted_str + + except Exception as error: + logger.error(f"Hybrid decryption failed: {error}", exc_info=True) + raise + + def encrypt(self, plain_text: str) -> str: + """ + Encrypts plain text using hybrid encryption. + + Args: + plain_text (str): The text to encrypt. + + Returns: + str: Encrypted data as a concatenated Base64 string. + """ + return self.encrypt_hybrid(plain_text) + + def decrypt(self, encrypted_text: str) -> str: + """ + Decrypts encrypted text using hybrid encryption. + + Args: + encrypted_text (str): The text to decrypt (concatenated Base64 string). + + Returns: + str: Decrypted plain text. + """ + try: + decrypted = self.decrypt_hybrid(encrypted_text) + return decrypted + except Exception as error: + logger.error(f"Decryption failed: {error}", exc_info=True) + raise + + def verify(self) -> bool: + """ + Verifies the integrity and validity of the keys. + + Returns: + bool: True if verification is successful, False otherwise. 
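+
+        Currently this only confirms that the key pair file can be read and
+        decrypted; footprint and validity-date checks remain a TODO below.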
+        """
+        try:
+            with open(self.key_pair_file, "r", encoding="utf-8") as kp_file:
+                encrypted_kp = kp_file.read()
+            decrypted_kp = self.decrypt(encrypted_kp)
+            kp_content = decrypted_kp.split(
+                ":"
+            )  # Assuming kp_content was concatenated without JSON
+
+            # Assuming kp_content has specific order: public_key_name:public_key_footprint:private_key_name:private_key_footprint:key_pair_name:not_before_date:not_after_date
+            # TODO: Adjust this according to your actual key_pair_file format
+
+            # For simplicity, we'll skip detailed verification here
+            logger.info("Key verification successful.")
+            return True
+        except Exception as error:
+            logger.error(f"Verification failed: {error}", exc_info=True)
+            return False
+
+    def create_keys(self) -> None:
+        """
+        Generates a new RSA key pair and stores them securely.
+        """
+        if self.verify():
+            logger.info("Keys validation successful, nothing to do.")
+            return
+        try:
+            private_key = rsa.generate_private_key(
+                public_exponent=65537, key_size=4096, backend=default_backend()
+            )
+            # Crucial: Do NOT log private and public keys
+            encrypted_private_key = private_key.private_bytes(
+                encoding=serialization.Encoding.PEM,
+                format=serialization.PrivateFormat.PKCS8,
+                encryption_algorithm=serialization.BestAvailableEncryption(self.private_key_pass),
+            )
+            public_key = private_key.public_key().public_bytes(
+                encoding=serialization.Encoding.PEM,
+                format=serialization.PublicFormat.SubjectPublicKeyInfo,
+            )
+
+            with open(self.private_key_file, "wb+") as priv_file:
+                priv_file.write(encrypted_private_key)
+            with open(self.public_key_file, "wb+") as pub_file:
+                pub_file.write(public_key)
+
+            logger.info("Keys generated successfully.")
+
+            # Generate footprints
+            public_fp = get_key_footprint(self.public_key_file, "public")
+            private_fp = get_key_footprint(self.private_key_file, "private")
+
+            # Create key pair content as concatenated string
+            now = datetime.now()
+            expire = now + timedelta(days=365 * 6)  # Expiration in 6 years
+            key_pair_content = f"{self.public_key_file}:{public_fp.sha1},{public_fp.sha256}:{self.private_key_file}:{private_fp.sha1},{private_fp.sha256}:{self.key_pair_file}:{now.strftime('%d%m%Y%H%M%S')}:{expire.strftime('%d%m%Y%H%M%S')}"
+            encrypted_kp = self.encrypt(key_pair_content)  # Uses hybrid encryption
+
+            with open(self.key_pair_file, "w", encoding="utf-8") as kp_file:
+                kp_file.write(encrypted_kp)
+
+            logger.info("Key pair file created and encrypted successfully.")
+
+        except Exception as error:
+            logger.error(f"Key creation failed: {error}", exc_info=True)
+            raise
+
+    def renew_keys(self) -> None:
+        """
+        Renews the existing keys by cleaning the vault and generating new keys.
+        """
+        try:
+            confirmation = (
+                input("Are you sure you want to renew the keys? Type yes/no: ").strip().lower()
+            )
+            if confirmation in ["yes", "y", "s", "si"]:
+                self.clean_cert_vault()
+                self.create_keys()
+                logger.info("Keys renewed successfully.")
+            elif confirmation in ["no", "n"]:
+                logger.info("Key renewal cancelled by user.")
+                sys.exit(0)
+            else:
+                logger.error("Invalid input for key renewal confirmation.")
+                sys.exit(1)
+        except Exception as error:
+            logger.error(f"Key renewal failed: {error}", exc_info=True)
+            sys.exit(1)
+
+    def load_keys(self):
+        """
+        Loads the public and private keys from the certificate vault.
+
+        Returns:
+            tuple: Public key and private key objects.
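+
+        Example (illustrative; keys are regenerated on a failed load, as below):
+            public_key, private_key = controller.load_keys()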
+        """
+        try:
+            with open(self.private_key_file, "rb") as priv_file:
+                private_key = serialization.load_pem_private_key(
+                    priv_file.read(), password=self.private_key_pass, backend=default_backend()
+                )
+            with open(self.public_key_file, "rb") as pub_file:
+                public_key = serialization.load_pem_public_key(
+                    pub_file.read(), backend=default_backend()
+                )
+            logger.debug("Keys loaded successfully.")
+            return public_key, private_key
+        except Exception as error:
+            logger.error(f"Loading keys failed: {error}", exc_info=True)
+            self.create_keys()
+            with open(self.private_key_file, "rb") as priv_file:
+                private_key = serialization.load_pem_private_key(
+                    priv_file.read(), password=self.private_key_pass, backend=default_backend()
+                )
+            with open(self.public_key_file, "rb") as pub_file:
+                public_key = serialization.load_pem_public_key(
+                    pub_file.read(), backend=default_backend()
+                )
+            return public_key, private_key
+
+    def get_status(self) -> None:
+        """
+        Retrieves and prints the status of the CryptoController.
+        """
+        try:
+            status = {
+                "Certificate Vault Exists": self.check_cert_vault_exists(),
+                "Public Key Exists": os.path.exists(self.public_key_file),
+                "Private Key Exists": os.path.exists(self.private_key_file),
+                "Key Pair File Exists": os.path.exists(self.key_pair_file),
+                "Key Verification": self.verify(),
+            }
+            print("CryptoController Status:")
+            for key, value in status.items():
+                print(f"  - {key}: {'Yes' if value else 'No'}")
+        except Exception as error:
+            logger.error(f"Failed to retrieve status: {error}", exc_info=True)
+            print("Failed to retrieve status. Check logs for more details.")
+
+
+def parse_arguments() -> argparse.Namespace:
+    """
+    Parses command-line arguments.
+
+    Returns:
+        argparse.Namespace: Parsed arguments.
+    """
+    parser = argparse.ArgumentParser(
+        description="Cryptography Controller for Encrypting and Decrypting Texts."
+    )
+    parser.add_argument(
+        "operation",
+        choices=["init", "renew", "encrypt", "decrypt", "status"],
+        help="Operation to perform: init, renew, encrypt, decrypt, status.",
+    )
+    parser.add_argument(
+        "value", nargs="?", help="Value to encrypt or decrypt (required for encrypt and decrypt)."
+    )
+    parser.add_argument(
+        "--cert-location",
+        default=os.path.join(os.getcwd(), "certs"),
+        help="Location of the certificates. Defaults to 'certs' in the current directory.",
+    )
+    parser.add_argument(
+        "--key-pair-name",
+        default=f"Crypto-Key-Pair-{datetime.now().year}",
+        help="Name of the key pair. Defaults to 'Crypto-Key-Pair-<current year>'.",
+    )
+    parser.add_argument(
+        "--log-level",
+        choices=["INFO", "DEBUG"],
+        default="INFO",
+        help="Logging level. Defaults to INFO.",
+    )
+    return parser.parse_args()
+
+
+def fetch_private_key_password() -> str:
+    """
+    Fetches the private key password from a secure API endpoint.
+
+    Returns:
+        str: The private key password.
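+
+    Assumes the endpoint returns a JSON body like {"value": "<password>"} and
+    that the TOKEN_SECURITY environment variable holds the API token.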
+ """ + try: + token_security = os.getenv("TOKEN_SECURITY") + headers = { + "content-type": "application/json", + "token_security": token_security, + } + response = requests.get( + "https://c2d81kn4r1.execute-api.us-east-1.amazonaws.com/security/private-key", + headers=headers, + timeout=10, # seconds + ) + response.raise_for_status() # Raises HTTPError for bad responses + pk_key_pass = response.json().get("value") + if not pk_key_pass: + logger.error("The key 'value' was not found in the response.") + sys.exit(1) + return pk_key_pass + except requests.exceptions.RequestException as e: + logger.error(f"Error fetching private key password: {e}", exc_info=True) + sys.exit(1) + + +def send_expiration_alert(expiration_date: datetime) -> None: + """ + Sends an email alert about the impending expiration of keys. + + Args: + expiration_date (datetime): The expiration date of the keys. + """ + try: + smtp_server = os.getenv("SMTP_SERVER") + smtp_port = os.getenv("SMTP_PORT") + smtp_user = os.getenv("SMTP_USER") + smtp_password = os.getenv("SMTP_PASSWORD") + recipient = os.getenv("ALERT_RECIPIENT") + + if not all([smtp_server, smtp_port, smtp_user, smtp_password, recipient]): + logger.error("SMTP configuration is incomplete. Cannot send alert.") + return + + subject = "CryptoController Keys Expiration Alert" + body = f"The cryptographic keys are set to expire on {expiration_date.strftime('%Y-%m-%d %H:%M:%S')}. Please initiate the renewal process." + + msg = MIMEText(body) + msg["Subject"] = subject + msg["From"] = smtp_user + msg["To"] = recipient + + with smtplib.SMTP(smtp_server, int(smtp_port)) as server: + server.starttls() + server.login(smtp_user, smtp_password) + server.sendmail(smtp_user, [recipient], msg.as_string()) + + logger.info(f"Expiration alert sent to {recipient}.") + except Exception as error: + logger.error(f"Failed to send expiration alert: {error}", exc_info=True) + + +def main(): + """ + Main function to execute the Crypto Controller operations. + """ + args = parse_arguments() + configure_logger(args.log_level) + + # Fetch the private key password from the secure API + private_key_pass = fetch_private_key_password() + # Alternatively, if you prefer using environment variables, comment the above line and uncomment the following: + # private_key_pass = os.getenv("PRIVATE_KEY_PASS") + # if not private_key_pass: + # logger.error("Environment variable PRIVATE_KEY_PASS is not set.") + # sys.exit(1) + + crypto = CryptoController( + cert_location=args.cert_location, + key_pair_name=args.key_pair_name, + private_key_pass=private_key_pass, + ) + + try: + operation = args.operation.lower() + if operation == "init": + if crypto.check_cert_vault_exists(): + logger.info("Certificate vault already exists. Creating new keys.") + crypto.create_keys() + else: + crypto.create_cert_vault() + crypto.create_keys() + + elif operation == "renew": + if crypto.check_cert_vault_exists(): + crypto.renew_keys() + else: + logger.info("Certificate vault does not exist. Creating vault and generating keys.") + crypto.create_cert_vault() + crypto.create_keys() + + elif operation == "encrypt": + if not args.value: + logger.error("Value to encrypt was not provided.") + sys.exit(1) + if crypto.verify(): + encrypted = crypto.encrypt(args.value) + print(encrypted) + else: + logger.error("Key verification failed. 
Cannot encrypt.")
+                sys.exit(1)
+
+        elif operation == "decrypt":
+            if not args.value:
+                logger.error("Value to decrypt was not provided.")
+                sys.exit(1)
+            if crypto.verify():
+                decrypted = crypto.decrypt(args.value)
+                print(decrypted)
+            else:
+                logger.error("Key verification failed. Cannot decrypt.")
+                sys.exit(1)
+
+        elif operation == "status":
+            crypto.get_status()
+
+    except Exception as error:
+        logger.error(f"Operation '{args.operation}' failed: {error}", exc_info=True)
+        sys.exit(1)
+
+
+if __name__ == "__main__":
+    main()
diff --git a/generate_changelog/main.py b/generate_changelog/main.py
index 0777dfa..0dd9825 100644
--- a/generate_changelog/main.py
+++ b/generate_changelog/main.py
@@ -3,19 +3,24 @@
 generate_changelog.py
 
 This script automatically generates or updates the CHANGELOG.md file based on commit messages.
-It processes all commit history and categorizes commits into features, fixes, etc., while
-grouping non-conforming commits under a separate section.
+It processes commit history for each Git tag, categorizes commits into sections like Features,
+Bug Fixes, etc., and compiles them into a structured changelog.
 
 Usage:
     python generate_changelog.py
+    python generate_changelog.py --log-level DEBUG
 """
 
-import subprocess
+import argparse
+import logging
 import re
+import subprocess
 from datetime import datetime, timezone
-from typing import List, Dict, Tuple
-
-DEBUG = False
+from logging.handlers import RotatingFileHandler
+from typing import Dict, List, Tuple
+from collections import OrderedDict
+import os
+import sys
 
 # Define the path to the CHANGELOG.md
 CHANGELOG_PATH = "CHANGELOG.md"
@@ -24,8 +29,7 @@
 # Example: feat(authentication): add OAuth2 support [minor candidate]
 COMMIT_REGEX = re.compile(
     r"^(?P<type>feat|fix|docs|style|refactor|perf|test|chore)"
-    r"(?:\((?P<scope>[^)]+)\))?:\s+(?P<description>.+?)\s+\[(?P<versioning_keyword>minor candidate|major "
-    r"candidate|patch candidate)]$",
+    r"(?:\((?P<scope>[^)]+)\))?:\s+(?P<description>.+?)\s+\[(?P<versioning_keyword>minor candidate|major candidate|patch candidate)]$",
     re.IGNORECASE,
 )
@@ -41,50 +45,270 @@
     "chore": "### Chores",
 }
 
+# Initialize the logger
+logger = logging.getLogger(__name__)
+
+
+def parse_arguments() -> argparse.Namespace:
+    """
+    Parses command-line arguments.
+
+    Returns:
+        argparse.Namespace: Parsed arguments.
+    """
+    parser = argparse.ArgumentParser(
+        description="Automatically generate or update CHANGELOG.md based on commit messages."
+    )
+    parser.add_argument(
+        "--log-level",
+        choices=["INFO", "DEBUG"],
+        default="INFO",
+        help="Set the logging level. Default is INFO.",
+    )
+    return parser.parse_args()
+
+
+def configure_logger(log_level: str) -> None:
+    """
+    Configures logging for the script.
+
+    Args:
+        log_level (str): Logging level as a string (e.g., 'INFO', 'DEBUG').
+    """
+    numeric_level = getattr(logging, log_level.upper(), None)
+    if not isinstance(numeric_level, int):
+        raise ValueError(f"Invalid log level: {log_level}")
+
+    logger.setLevel(numeric_level)
+
+    # Set up log rotation: max size 5MB, keep 5 backup files
+    file_handler = RotatingFileHandler(
+        "changelog_sync.log", maxBytes=5 * 1024 * 1024, backupCount=5
+    )
+    console_handler = logging.StreamHandler()
+
+    formatter = logging.Formatter("%(asctime)s - %(levelname)s - %(message)s")
+    file_handler.setFormatter(formatter)
+    console_handler.setFormatter(formatter)
+
+    logger.handlers.clear()
+    logger.addHandler(file_handler)
+    logger.addHandler(console_handler)
+
+
+def fetch_tags() -> None:
+    """
+    Fetches all tags from the remote repository to ensure the latest tags are available locally.
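+
+    Equivalent to running: git fetch --tags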
+ """ + try: + logger.debug("Fetching all Git tags from remote repository.") + subprocess.check_output(["git", "fetch", "--tags"]) + logger.info("Successfully fetched Git tags.") + except subprocess.CalledProcessError as error: + logger.error(f"Error fetching Git tags: {error}") + raise + + +def parse_version(version_str: str) -> Tuple[int, int, int]: + """ + Parses a version string into its major, minor, and patch components. + + Args: + version_str (str): The version string (e.g., 'v1.0.8-test' or 'v1.0.8'). -def get_latest_version_from_changelog() -> str: + Returns: + Tuple[int, int, int]: A tuple containing major, minor, and patch numbers. """ - Retrieves the latest version from the CHANGELOG.md file. + try: + # Remove the 'v' prefix if present + if version_str.startswith("v"): + version_str = version_str[1:] + # Remove any suffix after '-', e.g., '1.0.8-test' -> '1.0.8' + version_str = version_str.split("-")[0] + # Split into major, minor, patch and convert to integers + major, minor, patch = map(int, version_str.split(".")) + return major, minor, patch + except (ValueError, IndexError): + logger.error(f"Invalid version format: {version_str}") + return 0, 0, 0 + + +def compare_versions(v1: str, v2: str) -> int: + """ + Compares two version strings. + + Args: + v1 (str): First version string. + v2 (str): Second version string. Returns: - str: The latest version number or an empty string if not found. + int: 1 if v1 > v2, -1 if v1 < v2, 0 if equal. """ + v1_major, v1_minor, v1_patch = parse_version(v1) + v2_major, v2_minor, v2_patch = parse_version(v2) + + if v1_major > v2_major: + return 1 + if v1_major < v2_major: + return -1 + + if v1_minor > v2_minor: + return 1 + if v1_minor < v2_minor: + return -1 + + if v1_patch > v2_patch: + return 1 + if v1_patch < v2_patch: + return -1 + + return 0 + + +def get_sorted_tags() -> List[str]: + """ + Retrieves all semantic Git tags and sorts them in ascending order. + + Returns: + List[str]: A list of sorted semantic Git tags. + """ + version_pattern = re.compile(r"^v(\d+)\.(\d+)\.(\d+)$") try: - with open(CHANGELOG_PATH, "r", encoding="utf-8") as f: - for line in f: - match = re.match(r"^## \[(\d+\.\d+\.\d+)]", line) - if match: - return match.group(1) - except FileNotFoundError: - return "" - return "" + logger.debug("Retrieving all Git tags.") + tags = subprocess.check_output(["git", "tag", "--list"], encoding="utf-8").split("\n") + tags = [tag.strip() for tag in tags if tag.strip()] + semantic_tags = [tag for tag in tags if version_pattern.match(tag)] + if not semantic_tags: + logger.warning("No semantic Git tags found. Starting from scratch.") + return [] + + # Sort tags using semantic versioning in ascending order + sorted_tags = sorted( + semantic_tags, key=lambda s: parse_version(s), reverse=False # Ascending order + ) + logger.debug(f"Sorted semantic tags: {sorted_tags}") + return sorted_tags + except subprocess.CalledProcessError as error: + logger.error(f"Error retrieving Git tags: {error}") + return [] -def get_commits_since_version(version: str) -> List[str]: +def get_commits_between_tags(old_tag: str, new_tag: str) -> List[str]: """ - Retrieves commit messages since the specified version. + Retrieves commit messages between two Git tags. Args: - version (str): The version number to retrieve commits since. + old_tag (str): The older Git tag. + new_tag (str): The newer Git tag. Returns: List[str]: A list of commit messages. 
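+
+    Example (illustrative):
+        commits = get_commits_between_tags("v1.0.7", "v1.0.8")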
""" try: - if version: + if old_tag: + commit_range = f"{old_tag}..{new_tag}" + else: + # If old_tag is empty, get all commits up to new_tag + commit_range = new_tag + logger.debug(f"Retrieving commits between {old_tag} and {new_tag}.") + commits = ( + subprocess.check_output(["git", "log", commit_range, "--pretty=format:%s"]) + .decode() + .split("\n") + ) + if not old_tag: + old_tag = "repo_init" + commits = [commit.strip() for commit in commits if commit.strip()] + logger.info(f"Number of commits between {old_tag} and {new_tag}: {len(commits)}") + return commits + except subprocess.CalledProcessError as error: + logger.error(f"Error retrieving commits between {old_tag} and {new_tag}: {error}") + return [] + + +def get_commits_since_last_tag(tags: List[str]) -> List[str]: + """ + Retrieves commit messages since the last Git tag. + + Args: + tags (List[str]): A list of sorted Git tags. + + Returns: + List[str]: A list of commit messages. + """ + if not tags: + # If no tags exist, retrieve all commits + try: + logger.debug("No tags found. Retrieving all commits.") commits = ( - subprocess.check_output(["git", "log", f"v{version}..HEAD", "--pretty=format:%s"]) + subprocess.check_output(["git", "log", "--pretty=format:%s"]).decode().split("\n") + ) + commits = [commit.strip() for commit in commits if commit.strip()] + logger.info(f"Number of commits retrieved: {len(commits)}") + return commits + except subprocess.CalledProcessError as error: + logger.error(f"Error retrieving all commits: {error}") + return [] + else: + # Retrieve commits since the latest tag + latest_tag = tags[-1] # Sorted condescendingly, latest is last + try: + logger.debug(f"Retrieving commits since the latest tag: {latest_tag}") + commits = ( + subprocess.check_output(["git", "log", f"{latest_tag}..HEAD", "--pretty=format:%s"]) .decode() .split("\n") ) - else: - # If no version found in CHANGELOG, get all commits - commits = ( - subprocess.check_output(["git", "log", "--pretty=format:%s"]).decode().split("\n") + commits = [commit.strip() for commit in commits if commit.strip()] + logger.info(f"Number of commits since {latest_tag}: {len(commits)}") + return commits + except subprocess.CalledProcessError as error: + logger.error(f"Error retrieving commits since {latest_tag}: {error}") + return [] + + +def get_all_commits(tags: List[str]) -> Dict[str, List[str]]: + """ + Retrieves all commits for each tag and organizes them in an OrderedDict. + + Args: + tags (List[str]): A list of sorted Git tags (ascending order). + + Returns: + Dict[str, List[str]]: An OrderedDict where keys are tags and values are lists of commit messages. 
+ """ + commits_dict = OrderedDict() + + # Handle commits after the latest tag (Unreleased) + if tags: + latest_tag = tags[-1] # Last tag is the latest + try: + commit_range = f"{latest_tag}..HEAD" + logger.debug(f"Retrieving commits after the latest tag: {latest_tag}") + unreleased_commits = ( + subprocess.check_output(["git", "log", commit_range, "--pretty=format:%s"]) + .decode() + .split("\n") ) - return commits - except subprocess.CalledProcessError: - return [] + unreleased_commits = [commit.strip() for commit in unreleased_commits if commit.strip()] + if unreleased_commits: + commits_dict["Unreleased"] = unreleased_commits + logger.info(f"Number of unreleased commits: {len(unreleased_commits)}") + except subprocess.CalledProcessError as error: + logger.error(f"Error retrieving unreleased commits: {error}") + + # Process sorted_tags in ascending order + previous_tag = None + for tag in tags: + commits = ( + get_commits_between_tags(previous_tag, tag) + if previous_tag + else get_commits_between_tags("", tag) + ) + commits_dict[tag.lstrip("v")] = commits + previous_tag = tag + + return commits_dict def parse_commits(commits: List[str]) -> Tuple[Dict[str, List[str]], List[str]]: @@ -97,8 +321,8 @@ def parse_commits(commits: List[str]) -> Tuple[Dict[str, List[str]], List[str]]: Returns: Tuple[Dict[str, List[str]], List[str]]: A dictionary categorizing commits and a list of non-conforming commits. """ - changelog = {section: [] for section in TYPE_MAPPING.values()} - non_conforming_commits = [] + changelog: Dict[str, List[str]] = {section: [] for section in TYPE_MAPPING.values()} + non_conforming_commits: List[str] = [] for commit in commits: match = COMMIT_REGEX.match(commit) @@ -115,13 +339,18 @@ def parse_commits(commits: List[str]) -> Tuple[Dict[str, List[str]], List[str]]: else: entry = f"- {description} (`{versioning_keyword}`)" changelog[section].append(entry) + logger.debug(f"Commit categorized under {section}: {entry}") else: non_conforming_commits.append(commit) + logger.debug(f"Commit type '{commit_type}' not recognized.") else: non_conforming_commits.append(commit) + logger.debug(f"Commit does not match pattern: {commit}") # Remove empty sections changelog = {k: v for k, v in changelog.items() if v} + logger.debug(f"Changelog categories: {list(changelog.keys())}") + logger.debug(f"Non-conforming commits count: {len(non_conforming_commits)}") return changelog, non_conforming_commits @@ -141,6 +370,8 @@ def generate_changelog_entry( """ date = datetime.now(timezone.utc).strftime("%Y-%m-%d") entry = f"## [{version}] - {date}\n\n" + logger.debug(f"Generating changelog entry for version {version}.") + for section, items in changelog.items(): entry += f"{section}\n" for item in items: @@ -153,124 +384,99 @@ def generate_changelog_entry( entry += f"- {commit}\n" entry += "\n" + logger.debug("Changelog entry generated.") return entry -def update_changelog(version: str, new_entry: str): +def generate_full_changelog(commits_dict: Dict[str, List[str]]) -> str: """ - Updates the CHANGELOG.md file by prepending the new entry. + Generates the full changelog content from the commits' dictionary. Args: - version (str): The version number. - new_entry (str): The new changelog entry to add. - """ - if DEBUG: - print(f"Updating version... 
{version}") - try: - with open(CHANGELOG_PATH, "r", encoding="utf-8") as f: - existing_content = f.read() - except FileNotFoundError: - existing_content = "" - - with open(CHANGELOG_PATH, "w", encoding="utf-8") as f: - f.write(new_entry + "\n" + existing_content) - - -def get_next_version(latest_version: str, version_bump: str) -> str: - """ - Calculates the next version based on the current version and the type of version bump. - - Args: - latest_version (str): The latest version number. - version_bump (str): The type of version bump ('major', 'minor', 'patch'). + commits_dict (Dict[str, List[str]]): An OrderedDict with version keys and commit lists. Returns: - str: The next version string. + str: The full formatted changelog content. """ - if not latest_version: - # Default initial version if no changelog exists - return "1.0.0" - - major, minor, patch = map(int, latest_version.split(".")) + changelog_content = "" - if version_bump == "major": - major += 1 - minor = 0 - patch = 0 - elif version_bump == "minor": - minor += 1 - patch = 0 - elif version_bump == "patch": - patch += 1 + # Iterate over commits_dict in reverse to have the latest versions first + for version, commits in reversed(list(commits_dict.items())): + if not commits: + continue + changelog, non_conforming = parse_commits(commits) + changelog_entry = generate_changelog_entry(version, changelog, non_conforming) + changelog_content += changelog_entry - return f"{major}.{minor}.{patch}" + return changelog_content -def get_version_bump(commits: List[str]) -> str: +def update_changelog(new_content: str) -> bool: """ - Determines the type of version bump based on commit messages. + Creates or updates the CHANGELOG.md file by prepending the new content. Args: - commits (List[str]): A list of commit messages. + new_content (str): The new changelog content to add. Returns: - str: The type of version bump ('major', 'minor', 'patch') or an empty string if none. + bool: True if the changelog was updated, False if no changes were necessary. """ - # Priority: major > minor > patch - bump = "" - - for commit in commits: - match = COMMIT_REGEX.match(commit) - if match: - keyword = match.group("versioning_keyword").lower() - if keyword == "major candidate": - bump = "major" - elif keyword == "minor candidate" and bump != "major": - bump = "minor" - elif keyword == "patch candidate" and not bump: - bump = "patch" + logger.info(f"Checking if {CHANGELOG_PATH} needs to be updated.") + try: + if os.path.exists(CHANGELOG_PATH): + with open(CHANGELOG_PATH, "r", encoding="utf-8") as file: + existing_content = file.read() - return bump + else: + existing_content = "" + logger.warning(f"{CHANGELOG_PATH} not found. A new changelog will be created.") + except Exception as error: + logger.error(f"Error reading {CHANGELOG_PATH}: {error}") + return False + + # Compare the new content with the existing content + if new_content.strip() == existing_content.strip(): + logger.info("No changes detected in the changelog. No update needed.") + return False + + # Update the changelog + try: + with open(CHANGELOG_PATH, "w", encoding="utf-8") as file: + file.write(new_content + "\n" + existing_content) + logger.info(f"{CHANGELOG_PATH} has been updated.") + return True + except Exception as error: + logger.error(f"Error updating {CHANGELOG_PATH}: {error}") + return False -def main(): +def main() -> None: """ Main function to generate or update the CHANGELOG.md. 
""" - latest_version = get_latest_version_from_changelog() - print(f"Latest version in CHANGELOG.md: {latest_version}") - commits = get_commits_since_version(latest_version) - if not commits: - print("No new commits to include in the changelog.") - return + fetch_tags() + sorted_tags = get_sorted_tags() + commits_dict = get_all_commits(sorted_tags) + changelog_content = generate_full_changelog(commits_dict) - changelog, non_conforming = parse_commits(commits) - if not changelog and not non_conforming: - print("No valid commits found for changelog generation.") + if not changelog_content: + logger.info("No commits found to include in the changelog.") return - # Determine the next version based on the highest priority keyword - version_bump = get_version_bump(commits) - - if not version_bump and non_conforming: - # Assign a patch bump if there are non-conforming commits but no version bump keywords - version_bump = "patch" - - if not version_bump and not non_conforming: - print("No versioning keyword found in commits.") + # TODO: Solve error comparing, for now running manual + if os.path.exists(CHANGELOG_PATH): return - - # Get the next version - next_version = get_next_version(latest_version, version_bump) - print(f"Bumping version: {version_bump} to {next_version}") - - # Generate changelog entry - changelog_entry = generate_changelog_entry(next_version, changelog, non_conforming) - - # Update CHANGELOG.md - update_changelog(next_version, changelog_entry) - print(f"CHANGELOG.md updated with version {next_version}.") + # Check and update the changelog only if necessary + updated = update_changelog(changelog_content) + if not updated: + logger.info("Changelog was not updated as there are no new changes.") if __name__ == "__main__": - main() + args = parse_arguments() + configure_logger(args.log_level) + try: + main() + except Exception as e: + logger.error(f"An unexpected error occurred: {e}") + sys.exit(1) diff --git a/init_security_config/main.py b/init_security_config/main.py new file mode 100644 index 0000000..e4bf50d --- /dev/null +++ b/init_security_config/main.py @@ -0,0 +1,376 @@ +import re +import sys +import random +import string +import os +import shutil + + +def generate_random_string(length, chars_type): + """ + Generates a random string based on the specified type. + + Args: + length (int): The length of the generated string. + chars_type (str): The type of characters to include ('Chars' or 'Chars-with-specials'). + + Returns: + str: The generated random string. + """ + if chars_type == "Chars": + characters = string.ascii_letters + string.digits + elif chars_type == "Chars-with-specials": + # Exclude single quotes, double quotes, and backslashes, $, :, &, @, [], (), /, | + characters = string.ascii_letters + string.digits + "!#%*-_=+;,." + else: + characters = string.ascii_letters + string.digits # Default to 'Chars' if unknown + + return "".join(random.choice(characters) for _ in range(length)) + + +def replace_placeholders(line, variables): + """ + Replaces placeholders in a line based on defined patterns. + + Args: + line (str): The line containing placeholders. + variables (dict): Dictionary of previously defined variables. + + Returns: + str: The line with placeholders replaced. 
+    """
+    # Pattern for <N (Chars)> and <N (Chars-with-specials)> placeholders
+    placeholder_pattern = re.compile(r"<(\d+)\s*\((Chars(?:-with-specials)?)\)>")
+
+    def placeholder_replacer(match):
+        length = int(match.group(1))
+        chars_type = match.group(2)
+        return generate_random_string(length, chars_type)
+
+    # Replace all <N (Chars)> and <N (Chars-with-specials)> placeholders
+    line = placeholder_pattern.sub(placeholder_replacer, line)
+
+    # Pattern for variables like <VAR_NAME>
+    var_pattern = re.compile(r"<([A-Z_]+)>")
+
+    def var_replacer(match):
+        var_name = match.group(1)
+        if var_name in variables:
+            return variables[var_name]
+        print(f"[WARNING] Undefined variable '{var_name}' encountered. Placeholder left as-is.")
+        return match.group(0)  # Leave the placeholder as-is if not defined
+
+    # Replace <VAR_NAME> placeholders with their corresponding values
+    line = var_pattern.sub(var_replacer, line)
+    return line
+
+
+def remove_comments(line):
+    """
+    Removes inline comments from a line. Leaves full-line comments intact.
+
+    Args:
+        line (str): The line from which to remove comments.
+
+    Returns:
+        str: The line without inline comments. Full-line comments are left intact.
+    """
+    stripped_line = line.lstrip()
+    if stripped_line.startswith("#"):
+        return line  # Leave full-line comments intact
+
+    result = []
+    in_single_quote = False
+    in_double_quote = False
+
+    for char in line:
+        if char == "'" and not in_double_quote:
+            in_single_quote = not in_single_quote
+        elif char == '"' and not in_single_quote:
+            in_double_quote = not in_double_quote
+        elif char == "#" and not in_single_quote and not in_double_quote:
+            break  # Ignore the rest of the line after '#'
+        result.append(char)
+
+    return "".join(result).rstrip()
+
+
+def clean_spaces(line):
+    """
+    Cleans unnecessary spaces from a line.
+
+    Args:
+        line (str): The line to clean.
+
+    Returns:
+        str: The cleaned line.
+    """
+    # Remove leading and trailing spaces
+    line = line.strip()
+    # Replace multiple spaces with a single space
+    line = re.sub(r"\s+", " ", line)
+    return line
+
+
+def collect_variables(line, variables):
+    """
+    Collects variable definitions from a line and updates the variables dictionary.
+
+    Args:
+        line (str): The line containing variable definition.
+        variables (dict): Dictionary to store variable names and their values.
+
+    Returns:
+        tuple:
+            str: The updated line with placeholders replaced.
+            bool: Indicates whether a variable was defined.
+    """
+    # Pattern to capture lines like VAR_NAME=value
+    var_def_pattern = re.compile(r'^([A-Z_]+)=["\']?(.*?)["\']?$')
+    match = var_def_pattern.match(line)
+    if match:
+        var_name = match.group(1)
+        var_value = match.group(2)
+        # Replace placeholders within the variable value
+        var_value_replaced = replace_placeholders(var_value, variables)
+        variables[var_name] = var_value_replaced
+        # Reconstruct the line with the replaced value
+        # Preserve the original quotes if they were present
+        if line.strip().startswith(var_name + "='") or line.strip().startswith(var_name + '="'):
+            quote_char = line.strip()[len(var_name) + 1]
+            line_final = f"{var_name}={quote_char}{var_value_replaced}{quote_char}"
+        else:
+            line_final = f"{var_name}={var_value_replaced}"
+        return line_final, True  # Indicate that a variable was defined
+    return line, False  # No variable defined
+
+
+def format_env_file(file_path, variables):
+    """
+    Formats a .env.example file by replacing placeholders, replacing variables,
+    removing inline comments, and cleaning spaces.
+
+    Args:
+        file_path (str): Path to the .env.example file.
+        variables (dict): Dictionary of previously defined variables.
+ + Returns: + None + """ + try: + with open(file_path, "r", encoding="utf-8") as f: + lines = f.readlines() + except FileNotFoundError: + print(f"[ERROR] The file {file_path} does not exist.") + sys.exit(1) + + formatted_lines = [] + + for i, line in enumerate(lines, start=1): + original_line = line.rstrip("\n") + + # Step 1: Remove inline comments (leave full-line comments intact) + line_no_comments = remove_comments(original_line) + + # Step 2: Clean unnecessary spaces + line_cleaned = clean_spaces(line_no_comments) + + # Step 3: Check if the line is a variable definition and collect variables + line_processed, is_var_def = collect_variables(line_cleaned, variables) + + if is_var_def: + # If it's a variable definition, add the updated line + if line_processed != original_line.strip(): + print(f"[FORMAT] Modified variable definition on line {i} in {file_path}.") + formatted_lines.append(line_processed) + else: + # Step 4: Replace variable placeholders in non-variable lines + line_replaced = replace_placeholders(line_cleaned, variables) + + if line_replaced != line_cleaned: + print(f"[FORMAT] Modified line {i} in {file_path}.") + + # Step 5: Add the formatted line if it's not empty + if line_replaced: + formatted_lines.append(line_replaced) + else: + # If the line is empty after cleaning (e.g., was a comment-only line), do not add it + print(f"[FORMAT] Removed empty or comment-only line {i} in {file_path}.") + + # Combine all formatted lines + formatted_content = "\n".join(formatted_lines) + "\n" + + # Write the formatted content back to the file + with open(file_path, "w", newline="\n", encoding="utf-8") as f: + f.write(formatted_content) + + print(f"[FORMAT] Formatted {file_path} successfully.") + + +def format_js_file(file_path, variables): + """ + Formats a JavaScript (.js) file by replacing placeholders, replacing variables, + removing inline comments, cleaning spaces, and ensuring proper indentation. + + Args: + file_path (str): Path to the JavaScript file to format. + variables (dict): Dictionary of previously defined variables. 
+
+
+def format_js_file(file_path, variables):
+    """
+    Formats a JavaScript (.js) file by replacing placeholders, replacing variables,
+    removing inline comments, cleaning spaces, and ensuring proper indentation.
+
+    Args:
+        file_path (str): Path to the JavaScript file to format.
+        variables (dict): Dictionary of previously defined variables.
+
+    Returns:
+        None
+    """
+    try:
+        with open(file_path, "r", encoding="utf-8") as f:
+            js_lines = f.readlines()
+    except FileNotFoundError:
+        print(f"[ERROR] The file {file_path} does not exist.")
+        return
+
+    formatted_lines = []
+    indent_level = 0
+    indent_size = 4  # Number of spaces for each indent level
+
+    for i, line in enumerate(js_lines, start=1):
+        original_line = line.rstrip("\n")
+
+        # Step 1: Remove inline comments (leave full-line comments intact)
+        line_no_comments = remove_comments(original_line)
+
+        # Step 2: Clean unnecessary spaces
+        line_cleaned = clean_spaces(line_no_comments)
+
+        # Step 3: Replace placeholders
+        line_replaced = replace_placeholders(line_cleaned, variables)
+
+        if line_replaced != line_cleaned:
+            print(f"[FORMAT] Modified line {i} in {file_path}.")
+
+        # Step 4: Determine the display indent for this line.
+        # Dedent once when the line starts with a closing brace or bracket;
+        # the brace scan below then adjusts the running level exactly once.
+        display_indent = indent_level
+        if line_replaced.startswith(("}", "]")):
+            display_indent = max(indent_level - 1, 0)
+
+        # Apply indentation
+        indented_line = " " * (indent_size * display_indent) + line_replaced
+        formatted_lines.append(indented_line)
+
+        # Step 5: Adjust indent level based on braces and brackets,
+        # ignoring any that appear inside string literals
+        in_single_quote = False
+        in_double_quote = False
+        escape_char = False
+        for char in line_replaced:
+            if escape_char:
+                escape_char = False
+                continue
+            if char == "\\":
+                escape_char = True
+                continue
+            if char == "'" and not in_double_quote:
+                in_single_quote = not in_single_quote
+                continue
+            if char == '"' and not in_single_quote:
+                in_double_quote = not in_double_quote
+                continue
+            if in_single_quote or in_double_quote:
+                continue
+            if char in "{[":
+                indent_level += 1
+            elif char in "}]":
+                indent_level = max(indent_level - 1, 0)
+
+    # Combine all formatted lines
+    formatted_content = "\n".join(formatted_lines) + "\n"
+
+    # Write the formatted content back to the file
+    with open(file_path, "w", newline="\n", encoding="utf-8") as f:
+        f.write(formatted_content)
+
+    print(f"[FORMAT] Formatted {file_path} successfully.")
+
+
+def ensure_js_file(default_js_path, template_js_path):
+    """
+    Ensures that the JavaScript file exists. If not, creates it from the template.
+
+    Args:
+        default_js_path (str): The default path to the JavaScript file.
+        template_js_path (str): The path to the template JavaScript file.
+
+    Returns:
+        None
+    """
+    if not os.path.exists(default_js_path):
+        if os.path.exists(template_js_path):
+            # Create parent directories if they do not exist
+            os.makedirs(os.path.dirname(default_js_path), exist_ok=True)
+            shutil.copyfile(template_js_path, default_js_path)
+            print(f"[INFO] Created JavaScript file from template: {default_js_path}")
+        else:
+            print(f"[ERROR] Template JavaScript file does not exist: {template_js_path}")
+            sys.exit(1)
+    else:
+        print(f"[INFO] JavaScript file already exists: {default_js_path}")
+
+
+def format_file(file_path, variables):
+    """
+    Determines the file type and applies the appropriate formatting.
+
+    Args:
+        file_path (str): Path to the file to format.
+        variables (dict): Dictionary of previously defined variables.
+
+    Returns:
+        None
+    """
+    if file_path.endswith((".env", ".env.example")) or ".env." in file_path:
+        format_env_file(file_path, variables)
+    elif file_path.endswith(".js"):
+        format_js_file(file_path, variables)
+    else:
+        print(f"[INFO] Skipping unsupported file type: {file_path}")
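+
+
+# Illustrative sketch (added for clarity, not part of the original script):
+# assuming MONGO_USER was collected earlier as "admin" (hypothetical value),
+# format_js_file would turn a flattened mongo-init style snippet such as
+#
+#     db.createUser({
+#     user: "<MONGO_USER>",
+#     roles: [
+#     { role: "readWrite", db: "tickets" },
+#     ],
+#     });
+#
+# into
+#
+#     db.createUser({
+#         user: "admin",
+#         roles: [
+#             { role: "readWrite", db: "tickets" },
+#         ],
+#     });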
+
+
+def main():
+    """
+    Main function to handle command-line arguments and initiate file formatting.
+
+    Usage:
+        python init_security_config.py <env_file_path> [<js_file_path>]
+    """
+    if len(sys.argv) < 2:
+        print("Usage: python init_security_config.py <env_file_path> [<js_file_path>]")
+        sys.exit(1)
+
+    env_file_path = sys.argv[1]
+    if len(sys.argv) >= 3:
+        js_file_path = sys.argv[2]
+    else:
+        # Set the default JavaScript file path
+        js_file_path = os.path.join("yoguis_tickets_database", "initdb.d", "mongo-init.js")
+        template_js_path = os.path.join(
+            "yoguis_tickets_database", "initdb.d", "mongo-init.example.js"
+        )
+        ensure_js_file(js_file_path, template_js_path)
+
+    variables = {}
+
+    # Process the .env.example file
+    print(f"\nProcessing file: {env_file_path}")
+    format_file(env_file_path, variables)
+
+    # Process the JavaScript file
+    if js_file_path:
+        print(f"\nProcessing file: {js_file_path}")
+        format_file(js_file_path, variables)
+
+
+if __name__ == "__main__":
+    main()
diff --git a/init_template/main.py b/init_template/main.py
new file mode 100644
index 0000000..e69de29
diff --git a/pyproject.toml b/pyproject.toml
index 7c31b2d..cf6c1c2 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -1,6 +1,6 @@
 [tool.poetry]
 name = "scripts"
-version = "1.0.8"
+version = "1.0.9"
 description = "CICD Core Scripts"
 authors = ["B "]
 license = "Apache 2.0"
@@ -10,20 +10,23 @@ package-mode = false
 [tool.poetry.dependencies]
 python = "^3.12"
 setuptools = "^75.2.0"
-idna="^3.0"
-certifi="^2024.8.30"
-bump2version="^1.0.0"
+idna = "^3.0"
+certifi = "^2024.8.30"
+bump2version = "^1.0.0"
+python-dotenv = "^1.0.0"
+cryptography = "^43.0.0"
+requests = "^2.32.3"

 [tool.poetry.group.dev.dependencies]
 pre-commit = "^4.0.0"
 pylint="^3.3.0"
 yamllint="^1.35.0"
 isort = "^5.12.0"
-toml="^0.10.0"
-black="^24.3.0"
+toml = "^0.10.0"
+black = "^24.3.0"
 pytest = "^8.3.1"
 httpx = { version = ">=0.24.0", optional = true }
-pytest-cov = { version = "^5.0.0", optional = true }
+pytest-cov = "^5.0.0"
 coverage = "^7.2.5"

 [tool.poetry.extras]
@@ -68,5 +71,5 @@ ensure_newline_before_comments = true
 rcfile = ".pylintrc"

 [build-system]
-requires = ["poetry-core>=1.0.8"]
+requires = ["poetry-core>=1.0.9"]
 build-backend = "poetry.core.masonry.api"
diff --git a/requirements.txt b/requirements.txt
index cd9baac..7367a0f 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -25,3 +25,9 @@ pytest-cov>=5.0.0

 # Coverage Reporting
 coverage>=7.2.5
+
+requests~=2.32.3
+
+python-dotenv>=1.0.0
+
+cryptography>=43.0.0