diff --git a/youqu3/cmd.py b/youqu3/cmd.py index 1552434..f52fb8d 100644 --- a/youqu3/cmd.py +++ b/youqu3/cmd.py @@ -88,8 +88,8 @@ def expect_run( ): """ expect_run( - "ssh username@machine.example.com 'ls -l'", - events={'(?i)password':'secret\\n'} + "ssh username@machine_ip 'ls -l'", + events={'password':'secret\n'} ) 如果 return_code=True,返回 (stdout, return_code) """ @@ -126,17 +126,23 @@ def sudo_run( class RemoteCmd: - def __init__(self, user: str, ip: str, password: str): + def __init__(self, user: str, ip: str, password: str, connect_timeout: int = None): self.user = user self.ip = ip self.password = password + self.connect_timeout = connect_timeout def remote_run(self, cmd: str, return_code: bool = False): try: from fabric import Connection except ImportError: raise exception.YouQuPluginInstalledError("fabric") - c = Connection(host=self.ip, user=self.user, connect_kwargs={'password': self.password}) + c = Connection( + host=self.ip, + user=self.user, + connect_timeout=self.connect_timeout, + connect_kwargs={'password': self.password}, + ) res = c.run(cmd) if return_code: return res.stdout, res.return_code @@ -148,8 +154,11 @@ def remote_sudo_run(self, cmd: str, return_code: bool = False): except ImportError: raise exception.YouQuPluginInstalledError("fabric") c = Connection( - host=self.ip, user=self.user, connect_kwargs={'password': self.password}, - config=Config(overrides={'sudo': {'password': self.password}}) + host=self.ip, + user=self.user, + config=Config(overrides={'sudo': {'password': self.password}}), + connect_timeout=self.connect_timeout, + connect_kwargs={'password': self.password}, ) res = c.sudo(cmd) if return_code: @@ -159,12 +168,3 @@ def remote_sudo_run(self, cmd: str, return_code: bool = False): def remote_expect_run(self): # TODO ... - - -if __name__ == '__main__': - stdout = Cmd.run( - "ls", - interrupt=True, - print_log=False, - ) - print(stdout) diff --git a/youqu3/driver/remote.py b/youqu3/driver/remote.py index f54812b..9af8067 100644 --- a/youqu3/driver/remote.py +++ b/youqu3/driver/remote.py @@ -4,9 +4,11 @@ # SPDX-License-Identifier: GPL-2.0-only import pathlib import re +import socket from concurrent.futures import ALL_COMPLETED from concurrent.futures import ThreadPoolExecutor from concurrent.futures import wait +from copy import deepcopy from youqu3 import logger from youqu3 import setting @@ -47,7 +49,8 @@ def __init__( _c = list(_cli_client_info[0]) if _c[2] == "": _c[2] = setting.PASSWORD - self.cli_clients[f"client{index + 1}"] = _c + connected = self.check_remote_connected(*_c) + self.cli_clients[f"client{index + 1}{f'' if connected else '-X'}"] = _c else: self.cli_groups = {} groups = re.findall(r'\{(.*?)\}', self.clients) @@ -59,7 +62,8 @@ def __init__( _c = list(_cli_client_info[0]) if _c[2] == "": _c[2] = setting.PASSWORD - cli_clients[f"client{client_index + 1}"] = _c + connected = self.check_remote_connected(*_c) + cli_clients[f"client{client_index + 1}{f'' if connected else '-X'}"] = _c self.cli_groups[f"group{group_index + 1}"] = cli_clients self.server_rootdir = pathlib.Path(".").absolute() @@ -69,16 +73,26 @@ def __init__( self.client_html_report_path = lambda x: f"{self.client_report_path(x)}/html" self.client_json_report_path = lambda x: f"{self.client_report_path(x)}/json" - self.rsync = "rsync -av -e ssh -o StrictHostKeyChecking=no" + self.rsync = 'rsync -av -e "ssh -o StrictHostKeyChecking=no"' self.empty = "> /dev/null 2>&1" self.collection_json = False self.server_json_dir_id = None + self.inside_filepath = None from funnylog.conf import setting as log_setting - log_setting.LOG_FILE_PATH = self.server_rootdir + def check_remote_connected(self, user, _ip, password): + logger.info(f"Checking remote: {user, _ip, password}") + try: + _, return_code = RemoteCmd(user, _ip, password, connect_timeout=1).remote_run("who", return_code=True) + if return_code == 0: + return True + return False + except socket.timeout: + return False + def send_code(self, user, _ip, password): logger.info(f"开始发送代码到测试机 - < {user}@{_ip} >") RemoteCmd(user, _ip, password).remote_sudo_run(f"rm -rf {self.client_rootdir(user)}") @@ -101,12 +115,14 @@ def send_code(self, user, _ip, password): ]: exclude += f"--exclude='{i}' " _, return_code = Cmd.expect_run( - f"{self.rsync} {exclude} {self.server_rootdir}/* {user}@{_ip}:{self.client_rootdir(user)}/", - events={'(?i)password': f'{password}\\n'} + f"/bin/bash -c '{self.rsync} --include='.env' {exclude} {self.server_rootdir}/* {user}@{_ip}:{self.client_rootdir(user)}/'", + events={'password': f'{password}\n'}, + return_code=True ) - _, return_code = Cmd.expect_run( - f"{self.rsync} {exclude} {self.server_rootdir}/.env {user}@{_ip}:{self.client_rootdir(user)}/", - events={'(?i)password': f'{password}\\n'} + a, return_code = Cmd.expect_run( + f"/bin/bash -c '{self.rsync} {self.server_rootdir}/.env {user}@{_ip}:{self.client_rootdir(user)}/'", + events={'password': f'{password}\n'}, + return_code=True ) logger.info(f"代码发送{'成功' if return_code == 0 else '失败'} - < {user}@{_ip} >") @@ -132,37 +148,47 @@ def send_code_and_env(self, user, _ip, password): def makedirs(dirs): pathlib.Path(dirs).mkdir(parents=True, exist_ok=True) + def get_back_report(self, user, _ip, password): + server_html_path = f"{self.server_rootdir}/report/remote/{setting.TIME_STRING}_{_ip}_{self.rootdir_name}" + self.makedirs(server_html_path) + Cmd.expect_run( + f"/bin/bash -c '{self.rsync} {user}@{_ip}:{self.client_report_path(user)}/* {server_html_path}/'", + events={'password': f'{password}\n'}, + return_code=True + ) + def get_back_all_report(self, client_list, clients): - def get_back(user, _ip, password): - server_html_path = f"{self.server_rootdir}/report/remote/{setting.TIME_STRING}_{_ip}_{self.rootdir_name}" - self.makedirs(server_html_path) - Cmd.run( - f"{self.rsync % password} {user}@{_ip}:{self.client_report_path(user)}/* {server_html_path}/ {self.empty}") if len(clients) >= 2: _ps = [] executor = ThreadPoolExecutor() for client in client_list[:-1]: user, _ip, password = clients.get(client) - _p4 = executor.submit(get_back, user, _ip, password) + _p4 = executor.submit(self.get_back_report, user, _ip, password) _ps.append(_p4) sleep(2) user, _ip, password = clients.get(client_list[-1]) - get_back(user, _ip, password) + self.get_back_report(user, _ip, password) wait(_ps, return_when=ALL_COMPLETED) else: user, _ip, password = clients.get(client_list[0]) - get_back(user, _ip, password) + self.get_back_report(user, _ip, password) def changdir_remote_cmd(self, user): - return ["cd", f"{self.client_rootdir(user)}/", "&&"] + return [ + "export PATH=$PATH:$HOME/.local/bin;", + "cd", + f"{self.client_rootdir(user)}/", "&&" + ] @property def generate_cmd(self): cmd = ["youqu3-cargo", "run"] if self.filepath: - cmd.append(self.filepath) + cmd.append(f"'{self.filepath}'") + if self.inside_filepath: + cmd.append(f"'{self.inside_filepath}'") if self.keywords: cmd.extend(["-k", f"'{self.keywords}'"]) if self.tags: @@ -171,6 +197,9 @@ def generate_cmd(self): return cmd def run_test(self, user, _ip, password): + a = " ".join( + self.changdir_remote_cmd(user) + self.generate_cmd + ) RemoteCmd(user, _ip, password).remote_run( " ".join( self.changdir_remote_cmd(user) + self.generate_cmd @@ -193,9 +222,13 @@ def get_collection_only_cases(self): collection_cases = set(_collection_cases) return collection_cases - def parallel_run(self, clients): + def parallel_run(self, client_name_list, clients): _ps = [] executor = ThreadPoolExecutor() + all_clients_name_list = deepcopy(list(clients.keys())) + for i in all_clients_name_list: + if i not in client_name_list: + clients.pop(i) for client in list(clients.keys())[:-1]: user, _ip, password = clients.get(client) _p3 = executor.submit(self.run_test, user, _ip, password) @@ -225,46 +258,70 @@ def run(self): Cmd.run(f"rm -rf ~/.ssh/known_hosts {self.empty}") if self.group_type: - print("远程测试机列表".center(54)) - print("-" * 58) + print("┏" + "━" * 56 + "┓") + print("┃" + "YOUQU3 REMOTE DRIVER".center(56) + "┃") + print("┣" + "━" * 56 + "┫") print( - f"|{'GROUPS'.center(8)}|{'CLIENTS'.center(9)}|{'USER'.center(10)}|{'IP'.center(15)}|{'PASSWORD'.center(10)}|") - print("-" * 58) + f"┃{'GROUPS'.center(8)}┃{'CLIENTS'.center(9)}┃{'USER'.center(10)}┃{'IP'.center(15)}┃{'PASSWORD'.center(10)}┃") for group, clients in self.cli_groups.items(): + print("┣" + "━" * 56 + "┫") for c, (user, _ip, password) in clients.items(): - print(f"|{group.center(8)}|{c.center(9)}|{user.center(10)}|{_ip.center(15)}|{password.center(10)}|") - print("-" * 58) - - for group, clients in self.cli_groups.items(): - client_list = list(clients.keys()) - if len(client_list) > 1: - # TODO - cases = self.get_collection_only_cases - def split_list(lst, n): - k, m = divmod(len(lst), n) - return [lst[i * k + min(i, m):(i + 1) * k + min(i + 1, m)] for i in range(n)] + print(f"┃{group.center(8)}┃{c.center(9)}┃{user.center(10)}┃{_ip.center(15)}┃{password.center(10)}┃") + print("┗" + "━" * 56 + "┛") - client_case_list = split_list(list(cases), len(client_list)) - client_case_map = dict(zip(client_list, client_case_list)) - print(1) + def split_case(lst, n): + k, m = divmod(len(lst), n) + return [lst[i * k + min(i, m):(i + 1) * k + min(i + 1, m)] for i in range(n)] + task = [] + collection_cases = self.get_collection_only_cases + executor = ThreadPoolExecutor() + for group, clients in self.cli_groups.items(): + _client_name_list = list(clients.keys()) + client_name_list = [i for i in _client_name_list if not i.endswith("-X")] + if len(client_name_list) > 1: + client_cases_map = dict( + zip(client_name_list, split_case(list(collection_cases), len(client_name_list))) + ) + for client_name in client_name_list: + # self.group_client_worker(client_cases_map, client_name, clients) + t = executor.submit(self.group_client_worker, client_cases_map, client_name, clients) + task.append(t) else: - self.mul_do(self.send_code_and_env, client_list, clients) - self.parallel_run(clients) - self.get_back_all_report(client_list, clients) + if client_name_list: + # self.client_worker(client_name_list, clients) + t = executor.submit(self.client_worker, client_name_list, clients) + task.append(t) + wait(task, return_when=ALL_COMPLETED) else: - print("远程测试机列表".center(47)) - print("-" * 49) - print(f"|{'CLIENTS'.center(9)}|{'USER'.center(10)}|{'IP'.center(15)}|{'PASSWORD'.center(10)}|") - print("-" * 49) + print("┏" + "━" * 47 + "┓") + print("┃" + "YOUQU3 REMOTE DRIVER".center(47) + "┃") + print("┣" + "━" * 47 + "┫") + print(f"┃{'CLIENTS'.center(9)}┃{'USER'.center(10)}┃{'IP'.center(15)}┃{'PASSWORD'.center(10)}┃") + print("┣" + "━" * 47 + "┫") for c, (user, _ip, password) in self.cli_clients.items(): - print(f"|{c.center(9)}|{user.center(10)}|{_ip.center(15)}|{password.center(10)}|") - print("-" * 49) - - client_list = list(self.cli_clients.keys()) - self.mul_do(self.send_code_and_env, client_list, self.cli_clients) - self.parallel_run(self.cli_clients) - self.get_back_all_report(client_list, self.cli_clients) + print(f"┃{c.center(9)}┃{user.center(10)}┃{_ip.center(15)}┃{password.center(10)}┃") + print("┗" + "━" * 47 + "┛") + + _client_name_list = list(self.cli_clients.keys()) + client_name_list = [i for i in _client_name_list if not i.endswith("-X")] + self.mul_do(self.send_code_and_env, client_name_list, self.cli_clients) + self.parallel_run(client_name_list, self.cli_clients) + self.get_back_all_report(client_name_list, self.cli_clients) + + def client_worker(self, client_name_list, clients): + client_info = clients.get(client_name_list[0]) + self.send_code_and_env(*client_info) + self.run_test(*client_info) + self.get_back_report(*client_info) + + def group_client_worker(self, client_cases_map, client_name, clients): + client_info = clients.get(client_name) + client_cases = client_cases_map.get(client_name) + self.inside_filepath = " ".join(client_cases) + self.send_code_and_env(*client_info) + self.run_test(*client_info) + self.get_back_report(*client_info)