Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add logging to decrypt.py #18

Merged
merged 7 commits into from
Jul 3, 2024
Merged
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 9 additions & 7 deletions middleware/decrypt.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@

logger = logging.getLogger(__name__)

# pylint: disable=logging-fstring-interpolation

athith-g marked this conversation as resolved.
Show resolved Hide resolved

def get_private_keys(file_paths: list[Path]) -> list[bytes]:
"""Retrieve private keys from a list of files.
Expand All @@ -36,9 +38,9 @@ def get_private_keys(file_paths: list[Path]) -> list[bytes]:
# Callback returns password of sk
key = get_private_key(filepath=file_path, callback=lambda x: '')
private_keys.append(key)
logger.debug("%s identified as a private key", file_path)
logger.debug(f"{file_path} identified as a private key", )
except ValueError:
logger.debug("%s not identified as a private key", file_path)
logger.debug(f"{file_path} not identified as a private key")
continue
return private_keys

Expand All @@ -61,7 +63,7 @@ def decrypt_files(file_paths: list[Path], private_keys: list[bytes]):
try:
decrypt(keys=key_tuples, infile=f_in, outfile=f_out) # Checks for magic
shutil.move(f_out.name, file_path)
logger.info("Decrypted %s successfully", file_path)
logger.info(f"Decrypted {file_path} successfully")
except ValueError as e:
if str(e) != "Not a CRYPT4GH formatted file":
print(f"Private key for {file_path} not provided")
Expand All @@ -87,7 +89,7 @@ def move_files(file_paths: list[Path], output_dir: Path) -> list[Path]:
existing_names.add(file_path.name)
for src, dest in zip(file_paths, output_paths):
shutil.move(src, dest)
logger.debug("Moved %s to %s", src, dest)
logger.debug(f"Moved {src} to {dest}")
return output_paths


Expand All @@ -104,7 +106,7 @@ def remove_files(directory: Path):
raise ValueError(f"Could not remove files: {directory} is not a directory.")
for file in directory.iterdir():
subprocess.run(["rm", "-P", str(file)], check=True)
logger.debug("Removed %s", file.name)
logger.debug(f"Removed {file.name}")


def get_args():
Expand All @@ -131,8 +133,8 @@ def get_args():
def main():
"""Coordinate execution of script."""
args = get_args()
logger.debug("file_paths: %s", ", ".join([f.name for f in args.file_paths]))
logger.debug("output_dir: %s", args.output_dir)
logger.debug(f"file paths: {", ".join([f.name for f in args.file_paths])}")
logger.debug(f"output directory: {args.output_dir}")
athith-g marked this conversation as resolved.
Show resolved Hide resolved
new_paths = move_files(file_paths=args.file_paths, output_dir=args.output_dir)
keys = get_private_keys(file_paths=new_paths)
try:
Expand Down