Skip to content

Commit

Permalink
Setting up PR/push workflows (#34)
Browse files Browse the repository at this point in the history
* Setting up pull request/push workflows to run Black, MyPy and Flake8
* Fixing formatting
  • Loading branch information
joaojacome authored Jan 14, 2023
1 parent af79b9e commit 6035f80
Show file tree
Hide file tree
Showing 8 changed files with 163 additions and 72 deletions.
25 changes: 25 additions & 0 deletions .github/workflows/black.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
name: black

on:
push:
branches:
- master
pull_request: {}

jobs:
lint:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- uses: actions/setup-python@v4
with:
python-version: '3.10'
- name: Install black
run: |
pip install --upgrade pip
python3.10 -m venv env
source env/bin/activate
pip install black
- name: Run black
run: |
env/bin/black --check --verbose bw_add_sshkeys.py
25 changes: 25 additions & 0 deletions .github/workflows/flake8.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
name: flake8

on:
push:
branches:
- master
pull_request: {}

jobs:
lint:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- uses: actions/setup-python@v4
with:
python-version: '3.10'
- name: Install flake8
run: |
pip install --upgrade pip
python3.10 -m venv env
source env/bin/activate
pip install flake8
- name: Run black
run: |
env/bin/flake8 bw_add_sshkeys.py
25 changes: 25 additions & 0 deletions .github/workflows/mypy.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
name: mypy

on:
push:
branches:
- master
pull_request: {}

jobs:
lint:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- uses: actions/setup-python@v4
with:
python-version: '3.10'
- name: Install mypy
run: |
pip install --upgrade pip
python3.10 -m venv env
source env/bin/activate
pip install mypy types-setuptools
- name: Run black
run: |
env/bin/mypy bw_add_sshkeys.py
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
.pyenv/
.mypy_cache/
env/
146 changes: 74 additions & 72 deletions bw_add_sshkeys.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ def bwcli_version() -> str:
Function to return the version of the Bitwarden CLI
"""
proc_version = subprocess.run(
['bw', '--version'],
["bw", "--version"],
stdout=subprocess.PIPE,
universal_newlines=True,
check=True,
Expand All @@ -51,7 +51,7 @@ def cli_supports(feature: str) -> bool:
"""
version = parse_version(bwcli_version())

if feature == 'nointeraction' and version >= parse_version('1.9.0'):
if feature == "nointeraction" and version >= parse_version("1.9.0"):
return True
return False

Expand All @@ -62,23 +62,23 @@ def get_session(session: str) -> str:
"""
# Check for an existing, user-supplied Bitwarden session
if not session:
session = os.environ.get('BW_SESSION', '')
session = os.environ.get("BW_SESSION", "")
if session:
logging.debug('Existing Bitwarden session found')
logging.debug("Existing Bitwarden session found")
return session

# Check if we're already logged in
proc_logged = subprocess.run(['bw', 'login', '--check', '--quiet'], check=True)
proc_logged = subprocess.run(["bw", "login", "--check", "--quiet"], check=True)

if proc_logged.returncode:
logging.debug('Not logged into Bitwarden')
operation = 'login'
logging.debug("Not logged into Bitwarden")
operation = "login"
else:
logging.debug('Bitwarden vault is locked')
operation = 'unlock'
logging.debug("Bitwarden vault is locked")
operation = "unlock"

proc_session = subprocess.run(
['bw', '--raw', operation],
["bw", "--raw", operation],
stdout=subprocess.PIPE,
universal_newlines=True,
check=True,
Expand All @@ -95,42 +95,42 @@ def get_folders(session: str, foldername: str) -> str:
"""
Function to return the ID of the folder that matches the provided name
"""
logging.debug('Folder name: %s', foldername)
logging.debug("Folder name: %s", foldername)

proc_folders = subprocess.run(
['bw', 'list', 'folders', '--search', foldername, '--session', session],
["bw", "list", "folders", "--search", foldername, "--session", session],
stdout=subprocess.PIPE,
universal_newlines=True,
check=True,
encoding = "utf-8",
encoding="utf-8",
)

folders = json.loads(proc_folders.stdout)

if not folders:
logging.error('"%s" folder not found', foldername)
return ''
return ""

# Do we have any folders
if len(folders) != 1:
logging.error('%d folders with the name "%s" found', len(folders), foldername)
return ''
return ""

return str(folders[0]['id'])
return str(folders[0]["id"])


def folder_items(session: str, folder_id: str) -> List[Dict[str, Any]]:
"""
Function to return items from a folder
"""
logging.debug('Folder ID: %s', folder_id)
logging.debug("Folder ID: %s", folder_id)

proc_items = subprocess.run(
['bw', 'list', 'items', '--folderid', folder_id, '--session', session],
["bw", "list", "items", "--folderid", folder_id, "--session", session],
stdout=subprocess.PIPE,
universal_newlines=True,
check=True,
encoding = "utf-8",
encoding="utf-8",
)

data: List[Dict[str, Any]] = json.loads(proc_items.stdout)
Expand All @@ -150,69 +150,69 @@ def add_ssh_keys(
for item in items:
try:
private_key_file = [
k['value'] for k in item['fields'] if k['name'] == keyname
k["value"] for k in item["fields"] if k["name"] == keyname
][0]
except IndexError:
logging.warning('No "%s" field found for item %s', keyname, item['name'])
logging.warning('No "%s" field found for item %s', keyname, item["name"])
continue
except KeyError as error:
logging.debug(
'No key "%s" found in item %s - skipping', error.args[0], item['name']
'No key "%s" found in item %s - skipping', error.args[0], item["name"]
)
continue
logging.debug('Private key file declared')
logging.debug("Private key file declared")

private_key_pw = None
try:
private_key_pw = [
k['value'] for k in item['fields'] if k['name'] == pwkeyname
k["value"] for k in item["fields"] if k["name"] == pwkeyname
][0]
logging.debug('Passphrase declared')
logging.debug("Passphrase declared")
except IndexError:
logging.warning('No "%s" field found for item %s', pwkeyname, item['name'])
logging.warning('No "%s" field found for item %s', pwkeyname, item["name"])
except KeyError as error:
logging.debug(
'No key "%s" found in item %s - skipping', error.args[0], item['name']
'No key "%s" found in item %s - skipping', error.args[0], item["name"]
)

try:
private_key_id = [
k['id']
for k in item['attachments']
if k['fileName'] == private_key_file
k["id"]
for k in item["attachments"]
if k["fileName"] == private_key_file
][0]
except IndexError:
logging.warning(
'No attachment called "%s" found for item %s',
private_key_file,
item['name'],
item["name"],
)
continue
logging.debug('Private key ID found')
logging.debug("Private key ID found")

try:
ssh_add(session, item['id'], private_key_id, private_key_pw)
ssh_add(session, item["id"], private_key_id, private_key_pw)
except subprocess.SubprocessError:
logging.warning('Could not add key to the SSH agent')
logging.warning("Could not add key to the SSH agent")


def ssh_add(session: str, item_id: str, key_id: str, key_pw: Optional[str]) -> None:
"""
Function to get the key contents from the Bitwarden vault
"""
logging.debug('Item ID: %s', item_id)
logging.debug('Key ID: %s', key_id)
logging.debug("Item ID: %s", item_id)
logging.debug("Key ID: %s", key_id)

proc_attachment = subprocess.run(
[
'bw',
'get',
'attachment',
"bw",
"get",
"attachment",
key_id,
'--itemid',
"--itemid",
item_id,
'--raw',
'--session',
"--raw",
"--session",
session,
],
stdout=subprocess.PIPE,
Expand All @@ -233,7 +233,7 @@ def ssh_add(session: str, item_id: str, key_id: str, key_pw: Optional[str]) -> N
logging.debug("Running ssh-add")
# CAVEAT: `ssh-add` provides no useful output, even with maximum verbosity
subprocess.run(
['ssh-add', '-'],
["ssh-add", "-"],
input=ssh_key,
# Works even if ssh-askpass is not installed
env=envdict,
Expand All @@ -242,42 +242,42 @@ def ssh_add(session: str, item_id: str, key_id: str, key_pw: Optional[str]) -> N
)


if __name__ == '__main__':
if __name__ == "__main__":

def parse_args() -> argparse.Namespace:
"""
Function to parse command line arguments
"""
parser = argparse.ArgumentParser()
parser.add_argument(
'-d',
'--debug',
action='store_true',
help='show debug output',
"-d",
"--debug",
action="store_true",
help="show debug output",
)
parser.add_argument(
'-f',
'--foldername',
default='ssh-agent',
help='folder name to use to search for SSH keys',
"-f",
"--foldername",
default="ssh-agent",
help="folder name to use to search for SSH keys",
)
parser.add_argument(
'-c',
'--customfield',
default='private',
help='custom field name where private key filename is stored',
"-c",
"--customfield",
default="private",
help="custom field name where private key filename is stored",
)
parser.add_argument(
'-p',
'--passphrasefield',
default='passphrase',
help='custom field name where key passphrase is stored',
"-p",
"--passphrasefield",
default="passphrase",
help="custom field name where key passphrase is stored",
)
parser.add_argument(
'-s',
'--session',
default='',
help='session key of bitwarden',
"-s",
"--session",
default="",
help="session key of bitwarden",
)

return parser.parse_args()
Expand All @@ -297,24 +297,26 @@ def main() -> None:
logging.basicConfig(level=loglevel)

try:
logging.info('Getting Bitwarden session')
logging.info("Getting Bitwarden session")
session = get_session(args.session)
logging.debug('Session = %s', session)
logging.debug("Session = %s", session)

logging.info('Getting folder list')
logging.info("Getting folder list")
folder_id = get_folders(session, args.foldername)

logging.info('Getting folder items')
logging.info("Getting folder items")
items = folder_items(session, folder_id)

logging.info('Attempting to add keys to ssh-agent')
logging.info("Attempting to add keys to ssh-agent")
add_ssh_keys(session, items, args.customfield, args.passphrasefield)
except subprocess.CalledProcessError as error:
if error.stderr:
logging.error('"%s" error: %s', error.cmd[0], error.stderr)
logging.debug('Error running %s', error.cmd)
logging.debug("Error running %s", error.cmd)

if os.environ.get('SSH_ASKPASS') and os.environ.get('SSH_ASKPASS') == os.path.realpath(__file__):
print(os.environ.get('SSH_KEY_PASSPHRASE'))
if os.environ.get("SSH_ASKPASS") and os.environ.get(
"SSH_ASKPASS"
) == os.path.realpath(__file__):
print(os.environ.get("SSH_KEY_PASSPHRASE"))
else:
main()
9 changes: 9 additions & 0 deletions mypy.ini
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
[mypy]
python_version = 3.10
warn_return_any = True
disallow_untyped_defs = True
disallow_any_unimported = True
no_implicit_optional = True
check_untyped_defs = True
show_error_codes = True
warn_unused_ignores = True
2 changes: 2 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
[tool.black]
target-version = ['py310']
Loading

0 comments on commit 6035f80

Please sign in to comment.