diff --git a/flybirds/core/dsl/globalization/i18n.py b/flybirds/core/dsl/globalization/i18n.py
index f64961ef..c576e502 100644
--- a/flybirds/core/dsl/globalization/i18n.py
+++ b/flybirds/core/dsl/globalization/i18n.py
@@ -89,6 +89,8 @@
     "screenshot": ["全屏截图"],
     "ocr": ["全屏扫描"],
     "change ocr lang [{param}]": ["切换OCR语言[{param}]"],
+    "exist image [{param}]": ["存在图像[{param}]"],
+    "not exist image [{param}]": ["不存在图像[{param}]"],
     "information association of failed operation, run the {param1} time"
     " :[{param2}]": ["失败运行的信息关联,运行第{param1}次:[{param2}]"],
     "text[{selector}]property[{param2}]is {param3}": [
@@ -100,6 +102,7 @@
     "click[{selector}]": ["点击[{selector}]"],
     "click text[{selector}]": ["点击文案[{selector}]"],
     "click ocr text[{selector}]": ["点击扫描文案[{selector}]"],
+    "click image[{selector}]": ["点击图像[{selector}]"],
     "click position[{x},{y}]": ["点击屏幕位置[{x},{y}]"],
     "in[{selector}]input[{param2}]": ["在[{selector}]中输入[{param2}]"],
     "in ocr[{selector}]input[{param2}]": ["在扫描文字[{selector}]中输入[{param2}]"],
diff --git a/flybirds/core/dsl/step/element.py b/flybirds/core/dsl/step/element.py
index 7de2cdcd..90487292 100644
--- a/flybirds/core/dsl/step/element.py
+++ b/flybirds/core/dsl/step/element.py
@@ -71,6 +71,17 @@ def click_ocr_text(context, selector=None):
     g_Context.step.click_ocr_text(context, selector)


+@step("click image[{selector}]")
+@ele_wrap
+def click_image(context, selector=None):
+    """
+    Click the image matched on the current screen
+    :param context: step context
+    :param selector: path of the target image to match (or None).
+    """
+    g_Context.step.click_image(context, selector)
+
+
 @step("click position[{x},{y}]")
 @ele_wrap
 def click_coordinates(context, x=None, y=None):
@@ -383,3 +394,13 @@ def find_text_from_parent(context, p_selector=None, c_selector=None,
     """
     g_Context.step.find_text_from_parent(context, p_selector, c_selector,
                                          param3)
+
+@step("exist image [{param}]")
+def img_exist(context, param):
+    g_Context.step.img_exist(context, param)
+
+
+@step("not exist image [{param}]")
+def img_not_exist(context, param):
+    g_Context.step.img_not_exist(context, param)
+
diff --git a/flybirds/core/plugin/plugins/default/__init__.py b/flybirds/core/plugin/plugins/default/__init__.py
index e69de29b..af51f261 100644
--- a/flybirds/core/plugin/plugins/default/__init__.py
+++ b/flybirds/core/plugin/plugins/default/__init__.py
@@ -0,0 +1,3 @@
+#!
usr/bin/python +# -*- coding:utf-8 -*- +from .ui_driver import SIFT diff --git a/flybirds/core/plugin/plugins/default/app_base_step.py b/flybirds/core/plugin/plugins/default/app_base_step.py index 96039d6d..7aabba40 100644 --- a/flybirds/core/plugin/plugins/default/app_base_step.py +++ b/flybirds/core/plugin/plugins/default/app_base_step.py @@ -31,6 +31,12 @@ def init_device(self, context, param=None): def change_ocr_lang(self, context, param=None): step_common.change_ocr_lang(context, lang=param) + def img_exist(self, context, param): + step_verify.img_exist(context, param) + + def img_not_exist(self, context, param): + step_verify.img_not_exist(context, param) + def connect_device(self, context, param): step_app.connect_device(context, param) @@ -61,6 +67,9 @@ def click_coordinates(self, context, x, y): def click_ocr_text(self, context, selector): step_click.click_ocr_text(context, selector) + def click_image(self, context, selector): + step_click.click_image(context, selector) + def sleep(self, context, param): step_common.sleep(context, param) diff --git a/flybirds/core/plugin/plugins/default/screen.py b/flybirds/core/plugin/plugins/default/screen.py index 34bb48a7..6fd07813 100644 --- a/flybirds/core/plugin/plugins/default/screen.py +++ b/flybirds/core/plugin/plugins/default/screen.py @@ -5,8 +5,9 @@ import os import time import traceback +from baseImage import Image from base64 import b64decode -from PIL import Image +from .ui_driver import SIFT import flybirds.core.global_resource as gr import flybirds.utils.file_helper as file_helper @@ -46,7 +47,7 @@ def screen_shot(path): log.info("[screen_shot] screen shot end!") @staticmethod - def screen_link_to_behave(scenario, step_index, tag=None): + def screen_link_to_behave(scenario, step_index, tag=None, link=True): """ screenshot address and linked to the tag The label information is placed in the description of the scene, @@ -90,17 +91,21 @@ def screen_link_to_behave(scenario, step_index, tag=None): 'embeddingsTags, stepIndex={}, '.format(step_index, src_path) ) - scenario.description.append(data) + if link is True: + scenario.description.append(data) screen_path = os.path.join(current_screen_dir, file_name) g_Context.screen.screen_shot(screen_path) return screen_path @staticmethod def image_ocr(img_path): + """ + Take a screenshot and ocr + """ log.info(f"[image ocr path] image path is:{img_path}") ocr = g_Context.ocr_driver_instance g_Context.ocr_result = ocr.ocr(img_path, cls=True) - g_Context.image_size = Image.open(img_path).size + g_Context.image_size = Image(img_path).size log.info(f"[image ocr path] image size is:{g_Context.image_size}") for line in g_Context.ocr_result: log.info(f"[image ocr result] scan line info is:{line}") @@ -113,3 +118,19 @@ def image_ocr(img_path): # log.info(f"[image ocr result] scan txt info is:{txt}") # score = line[1][1] # log.info(f"[image ocr result] scan score info is:{score}") + + + @staticmethod + def image_verify(img_source_path, img_search_path): + """ + Take a screenshot and verify image + """ + match = SIFT() + img_source = Image(img_source_path) + img_search = Image(img_search_path) + + result = match.find_all_results(img_source, img_search) + return result + + + diff --git a/flybirds/core/plugin/plugins/default/screen_record.py b/flybirds/core/plugin/plugins/default/screen_record.py index 2440598d..4e82718f 100644 --- a/flybirds/core/plugin/plugins/default/screen_record.py +++ b/flybirds/core/plugin/plugins/default/screen_record.py @@ -168,7 +168,7 @@ def stop_record(self): return "No 
recording service" message = "" - proc.wait() + proc.wait(timeout=30) proc_code = proc.poll() if proc_code is None: proc.terminate() diff --git a/flybirds/core/plugin/plugins/default/step/click.py b/flybirds/core/plugin/plugins/default/step/click.py index cd8c61ca..cd71c337 100644 --- a/flybirds/core/plugin/plugins/default/step/click.py +++ b/flybirds/core/plugin/plugins/default/step/click.py @@ -8,6 +8,7 @@ import flybirds.utils.dsl_helper as dsl_helper from flybirds.core.global_context import GlobalContext as g_Context from flybirds.core.plugin.plugins.default.step.verify import ocr_txt_exist +from flybirds.core.plugin.plugins.default.step.common import img_verify def click_ele(context, param): @@ -115,7 +116,17 @@ def click_ocr_text(context, param): x = (box[0][0] + box[1][0]) / 2 y = (box[0][1] + box[2][1]) / 2 poco_instance = gr.get_value("pocoInstance") - x_coordinate = float(x) / g_Context.image_size[0] - y_coordinate = float(y) / g_Context.image_size[1] + x_coordinate = float(x) / g_Context.image_size[1] + y_coordinate = float(y) / g_Context.image_size[0] poco_instance.click([x_coordinate, y_coordinate]) + +def click_image(context, param): + result = img_verify(context, param) + x = result[0].get('rect').x + result[0].get('rect').width / 2 + y = result[0].get('rect').y + result[0].get('rect').height / 2 + poco_instance = gr.get_value("pocoInstance") + x_coordinate = float(x) / g_Context.image_size[1] + y_coordinate = float(y) / g_Context.image_size[0] + poco_instance.click([x_coordinate, y_coordinate]) + diff --git a/flybirds/core/plugin/plugins/default/step/common.py b/flybirds/core/plugin/plugins/default/step/common.py index 8ed2612b..2c0252c9 100644 --- a/flybirds/core/plugin/plugins/default/step/common.py +++ b/flybirds/core/plugin/plugins/default/step/common.py @@ -25,13 +25,13 @@ def screenshot(context): def ocr(context): step_index = context.cur_step_index - 1 - image_path = BaseScreen.screen_link_to_behave(context.scenario, step_index, "screen_") + image_path = BaseScreen.screen_link_to_behave(context.scenario, step_index, "screen_", False) BaseScreen.image_ocr(image_path) def change_ocr_lang(context,lang=None): """ - init ocr + change ocr language """ ocr_instance = ui_driver.init_ocr(lang) gr.set_value("ocrInstance", ocr_instance) @@ -84,3 +84,14 @@ def prev_fail_scenario_relevance(context, param1, param2): except Exception: log.warn("rerun failed senario error") log.warn(traceback.format_exc()) + + +def img_verify(context, search_image_path): + """ + verify image exist or not + """ + step_index = context.cur_step_index - 1 + source_image_path = BaseScreen.screen_link_to_behave(context.scenario, step_index, "screen_", False) + result = BaseScreen.image_verify(source_image_path, search_image_path) + return result + diff --git a/flybirds/core/plugin/plugins/default/step/verify.py b/flybirds/core/plugin/plugins/default/step/verify.py index e7122a8a..50d029b1 100644 --- a/flybirds/core/plugin/plugins/default/step/verify.py +++ b/flybirds/core/plugin/plugins/default/step/verify.py @@ -6,6 +6,7 @@ import time import flybirds.core.global_resource as gr +import flybirds.utils.flybirds_log as log from flybirds.core.global_context import GlobalContext as g_Context import flybirds.core.plugin.plugins.default.ui_driver.poco.poco_ele \ as poco_ele @@ -16,7 +17,7 @@ import flybirds.utils.dsl_helper as dsl_helper import flybirds.utils.verify_helper as verify from flybirds.core.exceptions import FlybirdVerifyException -from flybirds.core.plugin.plugins.default.step.common import ocr 
+from flybirds.core.plugin.plugins.default.step.common import ocr,img_verify def wait_text_exist(context, param): @@ -333,3 +334,39 @@ def paddle_fix_txt(txt): return txt +def img_exist(context, param): + start = time.time() + step_index = context.cur_step_index - 1 + result = img_verify(context, param) + if len(result) == 0: + src_path = "../../../{}".format(param) + data = ( + 'embeddingsTags, stepIndex={}, '.format(step_index, src_path) + ) + context.scenario.description.append(data) + # context.cur_step_index += 1 + raise Exception("[image exist verify] image not found !") + else: + log.info(f"[image exist verify] cost time:{time.time() - start}") + log.info(f"[image exist verify] result:{result}") + + +def img_not_exist(context, param): + start = time.time() + step_index = context.cur_step_index - 1 + result = img_verify(context, param) + if len(result) == 0: + log.info(f"[image not exist verify] cost time:{time.time() - start}") + log.info(f"[image not exist verify] result:{result}") + else: + src_path = "../../../{}".format(param) + data = ( + 'embeddingsTags, stepIndex={}, '.format(step_index, src_path) + ) + context.scenario.description.append(data) + # context.cur_step_index += 1 + raise Exception("[image not exist verify] image found !") + + diff --git a/flybirds/core/plugin/plugins/default/ui_driver/__init__.py b/flybirds/core/plugin/plugins/default/ui_driver/__init__.py index e69de29b..d9f0ce05 100644 --- a/flybirds/core/plugin/plugins/default/ui_driver/__init__.py +++ b/flybirds/core/plugin/plugins/default/ui_driver/__init__.py @@ -0,0 +1,3 @@ +#! usr/bin/python +# -*- coding:utf-8 -*- +from .opencv import SIFT diff --git a/flybirds/core/plugin/plugins/default/ui_driver/opencv/__init__.py b/flybirds/core/plugin/plugins/default/ui_driver/opencv/__init__.py new file mode 100644 index 00000000..3049b31f --- /dev/null +++ b/flybirds/core/plugin/plugins/default/ui_driver/opencv/__init__.py @@ -0,0 +1,7 @@ +#! usr/bin/python +# -*- coding:utf-8 -*- +from .matchTemplate import MatchTemplate +from .sift import SIFT +from .utils import generate_result, get_keypoint_from_matches, keypoint_distance, rectangle_transform +from .exceptions import NoEnoughPointsError, PerspectiveTransformError, HomographyError, MatchResultError, InputImageError +from .base import BaseKeypoint diff --git a/flybirds/core/plugin/plugins/default/ui_driver/opencv/base.py b/flybirds/core/plugin/plugins/default/ui_driver/opencv/base.py new file mode 100644 index 00000000..d854e976 --- /dev/null +++ b/flybirds/core/plugin/plugins/default/ui_driver/opencv/base.py @@ -0,0 +1,591 @@ +#! 
usr/bin/python +# -*- coding:utf-8 -*- +import cv2 +import numpy as np +from baseImage import Image, Rect + +from .matchTemplate import MatchTemplate +from .utils import (generate_result, get_keypoint_from_matches, keypoint_distance, rectangle_transform) +from .exceptions import (NoEnoughPointsError, PerspectiveTransformError, HomographyError, MatchResultError, + InputImageError) +from typing import List + + +class BaseKeypoint(object): + FILTER_RATIO = 1 + METHOD_NAME = None + Dtype = None + Place = None + template = MatchTemplate() + + def __init__(self, threshold=0.8, rgb=True, **kwargs): + """ + init + + Args: + threshold: recognition threshold(0~1) + rgb: Whether to use rgb channel for verification + """ + self.threshold = threshold + self.rgb = rgb + self.detector = self.create_detector(**kwargs) + self.matcher = self.create_matcher(**kwargs) + + def create_matcher(self, **kwargs): + raise NotImplementedError + + def create_detector(self, **kwargs): + raise NotImplementedError + + def find_best_result(self, im_source, im_search, threshold=None, rgb=None, **kwargs): + """ + Through feature point matching, find the range that best matches im_search in im_source + + Args: + im_source: image to be matched + im_search: image template + threshold: recognition threshold(0~1) + rgb: Whether to use rgb channel for verification + + Returns: + + """ + max_count = 1 + ret = self.find_all_results(im_source=im_source, im_search=im_search, threshold=threshold, rgb=rgb, + max_count=max_count, **kwargs) + if ret: + return ret[0] + return None + + def find_all_results(self, im_source, im_search, threshold=None, rgb=None, max_count=10, max_iter_counts=20, + distance_threshold=150): + """ + Through feature point matching, find all the ranges that match im_search in im_source + + Args: + im_source: the image to be matched + im_search: image template + threshold: recognition threshold (0~1) + rgb: whether to use the rgb channel for verification + max_count: the maximum number of matches that can be returned + max_iter_counts: The maximum number of searches, which needs to be greater than max_count + distance_threshold: distance threshold, after the feature point (first_point) is greater than + the threshold, no subsequent screening will be done + + Returns: + + """ + threshold = self.threshold if threshold is None else threshold + rgb = self.rgb if rgb is None else rgb + + im_source, im_search = self.input_image_check(im_source, im_search) + result = [] + if im_source.channels == 1: + rgb = False + + kp_src, des_src = self.get_keypoint_and_descriptor(image=im_source) + kp_sch, des_sch = self.get_keypoint_and_descriptor(image=im_search) + + kp_src, kp_sch = list(kp_src), list(kp_sch) + # In the feature point set, match the closest feature point + matches = np.array(self.match_keypoint(des_sch=des_sch, des_src=des_src)) + kp_sch_point = np.array([(kp.pt[0], kp.pt[1], kp.angle) for kp in kp_sch]) + kp_src_matches_point = np.array([[(*kp_src[dMatch.trainIdx].pt, kp_src[dMatch.trainIdx].angle) + if dMatch else np.nan for dMatch in match] for match in matches]) + _max_iter_counts = 0 + src_pop_list = [] + while True: + # not use of matches to judge nan, because the type is wrong + if (np.count_nonzero(~np.isnan(kp_src_matches_point)) == 0) or (len(result) == max_count) or ( + _max_iter_counts >= max_iter_counts): + break + _max_iter_counts += 1 + filtered_good_point, angle, first_point = self.filter_good_point(matches=matches, kp_src=kp_src, + kp_sch=kp_sch, + kp_sch_point=kp_sch_point, + 
kp_src_matches_point=kp_src_matches_point) + if first_point.distance > distance_threshold: + break + + rect, confidence = None, 0 + try: + rect, confidence = self.extract_good_points(im_source=im_source, im_search=im_search, kp_src=kp_src, + kp_sch=kp_sch, good=filtered_good_point, angle=angle, + rgb=rgb) + # print(f'good:{len(filtered_good_point)}, rect={rect}, confidence={confidence}') + except PerspectiveTransformError: + pass + finally: + + if rect and confidence >= threshold: + br, tl = rect.br, rect.tl + for index, match in enumerate(kp_src_matches_point): + x, y = match[:, 0], match[:, 1] + flag = np.argwhere((x < br.x) & (x > tl.x) & (y < br.y) & (y > tl.y)) + for _index in flag: + src_pop_list.append(matches[index, _index][0].trainIdx) + kp_src_matches_point[index, _index, :] = np.nan + matches[index, _index] = np.nan + result.append(generate_result(rect, confidence)) + else: + for match in filtered_good_point: + flags = np.argwhere(matches[match.queryIdx, :] == match) + for _index in flags: + kp_src_matches_point[match.queryIdx, _index, :] = np.nan + matches[match.queryIdx, _index] = np.nan + return result + + def get_keypoint_and_descriptor(self, image): + """ + Get image keypoint (keypoint) and descriptor (descriptor) + + Args: + image: Grayscale image to be detected + + Returns: + + """ + if image.channels == 3: + image = image.cvtColor(cv2.COLOR_BGR2GRAY).data + else: + image = image.data + keypoint, descriptor = self.detector.detectAndCompute(image, None) + + if len(keypoint) < 2: + raise NoEnoughPointsError('{} detect not enough feature points in input images'.format(self.METHOD_NAME)) + return keypoint, descriptor + + @staticmethod + def filter_good_point(matches, kp_src, kp_sch, kp_sch_point, kp_src_matches_point): + """ Filter the sweet spot """ + # Assume that the first point and the point with the smallest distance are the reference points + sort_list = [sorted(match, key=lambda x: x is np.nan and float('inf') or x.distance)[0] + for match in matches] + sort_list = [v for v in sort_list if v is not np.nan] + + first_good_point: cv2.DMatch = sorted(sort_list, key=lambda x: x.distance)[0] + first_good_point_train: cv2.KeyPoint = kp_src[first_good_point.trainIdx] + first_good_point_query: cv2.KeyPoint = kp_sch[first_good_point.queryIdx] + first_good_point_angle = first_good_point_train.angle - first_good_point_query.angle + + def get_points_origin_angle(point_x, point_y, offset): + points_origin_angle = np.arctan2( + (point_y - offset.pt[1]), + (point_x - offset.pt[0]) + ) * 180 / np.pi + + points_origin_angle = np.where( + points_origin_angle == 0, + points_origin_angle, points_origin_angle - offset.angle + ) + points_origin_angle = np.where( + points_origin_angle >= 0, + points_origin_angle, points_origin_angle + 360 + ) + return points_origin_angle + + # Calculate the rotation angle between this point and other feature points on the template image + first_good_point_sch_origin_angle = get_points_origin_angle(kp_sch_point[:, 0], kp_sch_point[:, 1], + first_good_point_query) + + # Calculate the angle between the point and other feature points in the target image + kp_sch_rotate_angle = kp_sch_point[:, 2] + first_good_point_angle + kp_sch_rotate_angle = np.where(kp_sch_rotate_angle >= 360, kp_sch_rotate_angle - 360, kp_sch_rotate_angle) + kp_sch_rotate_angle = kp_sch_rotate_angle.reshape(kp_sch_rotate_angle.shape + (1,)) + + kp_src_angle = kp_src_matches_point[:, :, 2] + good_point = np.array([matches[index][array[0]] for index, array in + 
enumerate(np.argsort(np.abs(kp_src_angle - kp_sch_rotate_angle)))]) + + # Calculate the rotation angle of each point with first_good_point as the origin + good_point_nan = (np.nan, np.nan) + good_point_pt = np.array([good_point_nan if dMatch is np.nan else (*kp_src[dMatch.trainIdx].pt,) + for dMatch in good_point]) + good_point_origin_angle = get_points_origin_angle(good_point_pt[:, 0], good_point_pt[:, 1], + first_good_point_train) + threshold = round(5 / 360, 2) * 100 + point_bool = (np.abs(good_point_origin_angle - first_good_point_sch_origin_angle) / 360) * 100 < threshold + _, index = np.unique(good_point_pt[point_bool], return_index=True, axis=0) + good = good_point[point_bool] + good = good[index] + return good, int(first_good_point_angle), first_good_point + + def match_keypoint(self, des_sch, des_src, k=10): + """ + Feature point matching + + Args: + des_src: descriptor set of the image to be matched + des_sch: descriptor set for image templates + k(int): how many matching points to get + + Returns: + List[List[cv2.DMatch]]: contains the best matching descriptor + """ + # k=2 means that each feature point takes out the 2 most matching corresponding points + matches = self.matcher.knnMatch(des_sch, des_src, k) + return matches + + def get_good_in_matches(self, matches): + """ + Feature point filtering + + Args: + matches: Feature point set + + Returns: + List[cv2.DMatch]: Filtered descriptor set + """ + if not matches: + return None + good = [] + for match_index in range(len(matches)): + match = matches[match_index] + for DMatch_index in range(len(match)): + if match[DMatch_index].distance <= self.FILTER_RATIO * match[-1].distance: + good.append(match[DMatch_index]) + return good + + def extract_good_points(self, im_source, im_search, kp_src, kp_sch, good, angle, rgb): + """ + According to the number of matching points (good), extract the recognition area + + Args: + im_source: the image to be matched + im_search: image template + kp_src: keypoint set + kp_sch: keypoint set + good: descriptor set + angle: rotation angle + rgb: whether to use the rgb channel for verification + + Returns: + range, and confidence + """ + len_good = len(good) + confidence, rect, target_img = None, None, None + + if len_good == 0: + pass + elif len_good == 1: + target_img, rect = self._handle_one_good_points(im_source=im_source, im_search=im_search, + kp_sch=kp_sch, kp_src=kp_src, good=good, angle=angle) + elif len_good == 2: + target_img, rect = self._handle_two_good_points(im_source=im_source, im_search=im_search, + kp_sch=kp_sch, kp_src=kp_src, good=good, angle=angle) + elif len_good == 3: + target_img, rect = self._handle_three_good_points(im_source=im_source, im_search=im_search, + kp_sch=kp_sch, kp_src=kp_src, good=good, angle=angle) + else: # len > 4 + target_img, rect = self._handle_many_good_points(im_source=im_source, im_search=im_search, + kp_sch=kp_sch, kp_src=kp_src, good=good) + + if target_img: + confidence = self._cal_confidence(im_source=im_search, im_search=target_img, rgb=rgb) + + return rect, confidence + + def _handle_one_good_points(self, im_source, im_search, kp_src, kp_sch, good, angle): + """ + When the number of feature point matching is equal to 1, the rectangle is scaled according + to the size of the feature point, and the recognized target image is obtained according to + the rotation angle. 
+ + Args: + im_source: the image to be matched + im_search: image template + kp_sch: keypoint set + kp_src: keypoint set + good: descriptor set + angle: rotation angle + + Returns: + Image to be verified + """ + sch_point = get_keypoint_from_matches(kp=kp_sch, matches=good, mode='query')[0] + src_point = get_keypoint_from_matches(kp=kp_src, matches=good, mode='train')[0] + + scale = src_point.size / sch_point.size + h, w = im_search.size + _h, _w = h * scale, w * scale + src = np.float32(rectangle_transform(point=sch_point.pt, size=(h, w), mapping_point=src_point.pt, + mapping_size=(_h, _w), angle=angle)) + dst = np.float32([[0, 0], [w, 0], [0, h], [w, h]]) + output = self._perspective_transform(im_source=im_source, im_search=im_search, src=src, dst=dst) + rect = self._get_perspective_area_rect(im_source=im_source, src=src) + return output, rect + + def _handle_two_good_points(self, im_source, im_search, kp_src, kp_sch, good, angle): + """ + When the number of feature points matching is equal to 2, the rectangle is scaled + according to the distance difference between the two points, and the recognized target + image is obtained according to the rotation angle. + + Args: + im_source: the image to be matched + im_search: image template + kp_sch: keypoint set + kp_src: keypoint set + good: descriptor set + angle: rotation angle + + Returns: + Image to be verified + """ + sch_point = get_keypoint_from_matches(kp=kp_sch, matches=good, mode='query') + src_point = get_keypoint_from_matches(kp=kp_src, matches=good, mode='train') + + sch_distance = keypoint_distance(sch_point[0], sch_point[1]) + src_distance = keypoint_distance(src_point[0], src_point[1]) + + try: + scale = src_distance / sch_distance # Calculate the zoom size + except ZeroDivisionError: + if src_distance == sch_distance: + scale = 1 + else: + return None, None + + h, w = im_search.size + _h, _w = h * scale, w * scale + src = np.float32(rectangle_transform(point=sch_point[0].pt, size=(h, w), mapping_point=src_point[0].pt, + mapping_size=(_h, _w), angle=angle)) + dst = np.float32([[0, 0], [w, 0], [0, h], [w, h]]) + output = self._perspective_transform(im_source=im_source, im_search=im_search, src=src, dst=dst) + rect = self._get_perspective_area_rect(im_source=im_source, src=src) + return output, rect + + def _handle_three_good_points(self, im_source, im_search, kp_src, kp_sch, good, angle): + """ + When the number of feature points matching is equal to 3, the rectangle is scaled + according to the difference in the area of the triangle formed by the three points, + and the recognized target image is obtained according to the rotation angle. 
+ + Args: + im_source: the image to be matched + im_search: image template + kp_sch: keypoint set + kp_src: keypoint set + good: descriptor set + angle: rotation angle + + Returns: + Image to be verified + """ + sch_point = get_keypoint_from_matches(kp=kp_sch, matches=good, mode='query') + src_point = get_keypoint_from_matches(kp=kp_src, matches=good, mode='train') + + def _area(point_list): + p1_2 = keypoint_distance(point_list[0], point_list[1]) + p1_3 = keypoint_distance(point_list[0], point_list[2]) + p2_3 = keypoint_distance(point_list[1], point_list[2]) + + s = (p1_2 + p1_3 + p2_3) / 2 + area = (s * (s - p1_2) * (s - p1_3) * (s - p2_3)) ** 0.5 + return area + + sch_area = _area(sch_point) + src_area = _area(src_point) + + try: + scale = src_area / sch_area # Calculate the zoom size + except ZeroDivisionError: + if sch_area == src_area: + scale = 1 + else: + return None, None + + h, w = im_search.size + _h, _w = h * scale, w * scale + src = np.float32(rectangle_transform(point=sch_point[0].pt, size=(h, w), mapping_point=src_point[0].pt, + mapping_size=(_h, _w), angle=angle)) + dst = np.float32([[0, 0], [w, 0], [0, h], [w, h]]) + output = self._perspective_transform(im_source=im_source, im_search=im_search, src=src, dst=dst) + rect = self._get_perspective_area_rect(im_source=im_source, src=src) + return output, rect + + def _handle_many_good_points(self, im_source, im_search, kp_src, kp_sch, good): + """ + When the number of feature point matching is >= 4, + use single matrix mapping to obtain the recognized target image + + Args: + im_source: the image to be matched + im_search: image template + kp_sch: keypoint set + kp_src: keypoint set + good: descriptor set + + Returns: + Perspective transformed image + """ + + sch_pts, img_pts = np.float32([kp_sch[m.queryIdx].pt for m in good]).reshape( + -1, 1, 2), np.float32([kp_src[m.trainIdx].pt for m in good]).reshape(-1, 1, 2) + # M is the transformation matrix + M, mask = self._find_homography(sch_pts, img_pts) + # Calculate the transformed coordinates of the four corner matrices, + # that is, the vertex coordinates of the target area in the large image: + h, w = im_search.size + h_s, w_s = im_source.size + pts = np.float32([[0, 0], [0, h - 1], [w - 1, h - 1], [w - 1, 0]]).reshape(-1, 1, 2) + try: + dst: np.ndarray = cv2.perspectiveTransform(pts, M) + # img = im_source.clone().data + # img2 = cv2.polylines(img, [np.int32(dst)], True, 255, 3, cv2.LINE_AA) + # Image(img).imshow('dst') + pypts = [tuple(npt[0]) for npt in dst.tolist()] + src = np.array([pypts[0], pypts[3], pypts[1], pypts[2]], dtype=np.float32) + dst = np.float32([[0, 0], [w, 0], [0, h], [w, h]]) + output = self._perspective_transform(im_source=im_source, im_search=im_search, src=src, dst=dst) + except cv2.error as err: + raise PerspectiveTransformError(err) + + # img = im_source.clone().data + # cv2.polylines(img, [np.int32(dst)], True, 255, 3, cv2.LINE_AA) + # Image(img).imshow() + # cv2.waitKey(0) + + rect = self._get_perspective_area_rect(im_source=im_source, src=src) + return output, rect + + @staticmethod + def _target_image_crop(img, rect): + """ + Capture target image + + Args: + img: image + rect: Image range + + Returns: + cropped image + """ + try: + target_img = img.crop(rect) + except OverflowError: + raise MatchResultError(f"Target area({rect}) out of screen{img.size}") + return target_img + + def _cal_confidence(self, im_source, im_search, rgb): + """ + Scale the screenshot and the recognition result to the same size, and calculate the reliability + + Args: + 
im_source: the image to be matched + im_search: image template + rgb: whether to use the rgb channel for verification + + Returns: + + """ + h, w = im_source.size + im_search = im_search.resize(w, h) + if rgb: + confidence = self.template.cal_rgb_confidence(im_source=im_source, im_search=im_search) + else: + confidence = self.template.cal_ccoeff_confidence(im_source=im_source, im_search=im_search) + + confidence = (1 + confidence) / 2 + return confidence + + def input_image_check(self, im_source, im_search): + im_source = self._image_check(im_source) + im_search = self._image_check(im_search) + + if im_source.place != im_search.place: + raise InputImageError( + 'image type must be same, source={}, search={}'.format(im_source.place, im_search.place)) + elif im_source.dtype != im_search.dtype: + raise InputImageError( + 'image data type must be same, source={}, search={}'.format(im_source.dtype, im_search.dtype)) + elif im_source.channels != im_search.channels: + raise InputImageError( + 'image channel must be same, source={}, search={}'.format(im_source.channels, im_search.channels)) + + return im_source, im_search + + def _image_check(self, data): + if not isinstance(data, Image): + data = Image(data, dtype=self.Dtype) + + if data.place not in self.Place: + raise TypeError(f'{self.METHOD_NAME}method,Image type must be(Place.UMat, Place.Ndarray)') + return data + + @staticmethod + def _find_homography(sch_pts, src_pts): + """ + When there are multiple sets of feature point pairs, obtain the unidirectional matrix + """ + try: + # M, mask = cv2.findHomography(sch_pts, src_pts, cv2.RANSAC) + M, mask = cv2.findHomography(sch_pts, src_pts, cv2.RANSAC, 4.0, None, 2000, 0.99) + except cv2.error: + import traceback + traceback.print_exc() + raise HomographyError("OpenCV error in _find_homography()...") + else: + if mask is None: + raise HomographyError("In _find_homography(), find no mask...") + else: + return M, mask + + @staticmethod + def _perspective_transform(im_source, im_search, src, dst): + """ + Calculate the perspective transformation according to the + four pairs of corresponding points, and crop the corresponding picture + + Args: + im_source: the image to be matched + im_search: template to be matched + src: The coordinates of the corresponding quad vertices in the target + image (upper left, upper right, lower left, lower right) + dst: coordinates of the quad vertices in the source image (upper left, + upper right, lower left, lower right) + + Returns: + + """ + h, w = im_search.size + matrix = cv2.getPerspectiveTransform(src=src, dst=dst) + # warpPerspective https://github.com/opencv/opencv/issues/11784 + output = im_source.warpPerspective(matrix, size=(w, h), flags=cv2.INTER_CUBIC) + + return output + + @staticmethod + def _get_perspective_area_rect(im_source, src): + """ + According to the coordinates of the four vertices + of the rectangle, obtain the largest circumscribed rectangle in the original image + + Args: + im_source: image to be matched + src: the coordinates of the corresponding quadrilateral vertices in the target image + + Returns: + Maximum circumscribed rectangle + """ + h, w = im_source.size + + x = [int(i[0]) for i in src] + y = [int(i[1]) for i in src] + x_min, x_max = min(x), max(x) + y_min, y_max = min(y), max(y) + # Selecting the target rectangular area may be out of bounds, + # and directly set it as the boundary when the boundary is exceeded: + # If it exceeds the left boundary, it takes 0, if it exceeds the right boundary, + # it takes w_s-1, if it exceeds 
the lower boundary, it takes 0, and if it exceeds + # the upper boundary, it takes h_s-1. + # When x_min is less than 0, take 0. When x_max is less than 0, take 0. + x_min, x_max = int(max(x_min, 0)), int(max(x_max, 0)) + # When x_min is greater than w_s, the value is w_s-1. When x_max is greater than w_s-1, take w_s-1。 + x_min, x_max = int(min(x_min, w - 1)), int(min(x_max, w - 1)) + # When y_min is less than 0, take 0. When y_max is less than 0, take 0. + y_min, y_max = int(max(y_min, 0)), int(max(y_max, 0)) + # When y_min is greater than h_s, the value is h_s-1. When y_max is greater than h_s-1, take h_s-1. + y_min, y_max = int(min(y_min, h - 1)), int(min(y_max, h - 1)) + rect = Rect(x=x_min, y=y_min, width=(x_max - x_min), height=(y_max - y_min)) + return rect diff --git a/flybirds/core/plugin/plugins/default/ui_driver/opencv/exceptions.py b/flybirds/core/plugin/plugins/default/ui_driver/opencv/exceptions.py new file mode 100644 index 00000000..eeaef088 --- /dev/null +++ b/flybirds/core/plugin/plugins/default/ui_driver/opencv/exceptions.py @@ -0,0 +1,28 @@ +#! usr/bin/python +# -*- coding:utf-8 -*- +class BaseError(Exception): + def __init__(self, message="", *args, **kwargs): + self.message = message + + def __repr__(self): + return repr(self.message) + + +class NoEnoughPointsError(BaseError): + """ detect not enough feature points in input images""" + + +class HomographyError(BaseError): + """ An error occurred while findHomography """ + + +class MatchResultError(BaseError): + """ An error occurred while result out of screen""" + + +class PerspectiveTransformError(BaseError): + """ An error occurred while perspectiveTransform """ + + +class InputImageError(BaseError): + """ An error occurred while input image place/dtype/channels error""" diff --git a/flybirds/core/plugin/plugins/default/ui_driver/opencv/matchTemplate.py b/flybirds/core/plugin/plugins/default/ui_driver/opencv/matchTemplate.py new file mode 100644 index 00000000..bf3ffe2a --- /dev/null +++ b/flybirds/core/plugin/plugins/default/ui_driver/opencv/matchTemplate.py @@ -0,0 +1,234 @@ +#! 
usr/bin/python +# -*- coding:utf-8 -*- +""" opencv matchTemplate""" +import warnings +import cv2 +import numpy as np +from baseImage import Image, Rect +from baseImage.constant import Place + +from .exceptions import MatchResultError, InputImageError +from .utils import generate_result + + +class MatchTemplate(object): + METHOD_NAME = 'tpl' + Dtype = np.uint8 + Place = (Place.Ndarray, ) + + def __init__(self, threshold=0.8, rgb=True): + """ + init + + Args: + threshold: recognition threshold (0~1) + rgb: whether to use the rgb channel for verification + """ + assert 0 <= threshold <= 1, 'threshold value between 0 and 1' + + self.threshold = threshold + self.rgb = rgb + self.matcher = cv2.matchTemplate + + def find_best_result(self, im_source, im_search, threshold=None, rgb=None): + """ + Template matching, return the range with the highest + matching degree and greater than the threshold + + Args: + im_source: the image to be matched + im_search: image template + threshold: recognition threshold (0~1) + rgb: whether to use the rgb channel for verification + + Returns: + generate_result + """ + threshold = threshold or self.threshold + rgb = rgb or self.rgb + + im_source, im_search = self.input_image_check(im_source, im_search) + if im_source.channels == 1: + rgb = False + + result = self._get_template_result_matrix(im_source=im_source, im_search=im_search) + # Find the best match + + min_val, max_val, min_loc, max_loc = self.minMaxLoc(result.data) + + h, w = im_search.size + # Seek credibility + crop_rect = Rect(max_loc[0], max_loc[1], w, h) + + confidence = self.cal_confidence(im_source, im_search, crop_rect, max_val, rgb) + # Returns None if the confidence is less than the threshold + if confidence < (threshold or self.threshold): + return None + x, y = max_loc + rect = Rect(x=x, y=y, width=w, height=h) + return generate_result(rect, confidence) + + def find_all_results(self, im_source, im_search, threshold=None, rgb=None, max_count=10): + """ + Template matching, return the range with matching degree greater + than the threshold, and the maximum number does not exceed max_count + + Args: + im_source: the image to be matched + im_search: image template + threshold:: recognition threshold (0~1) + rgb: whether to use the rgb channel for verification + max_count: maximum number of matches + + Returns: + + """ + threshold = threshold or self.threshold + rgb = rgb or self.rgb + + im_source, im_search = self.input_image_check(im_source, im_search) + if im_source.channels == 1: + rgb = False + + result = self._get_template_result_matrix(im_source=im_source, im_search=im_search) + results = [] + # Find the best match + h, w = im_search.size + while True: + min_val, max_val, min_loc, max_loc = self.minMaxLoc(result.data) + img_crop = im_source.crop(Rect(max_loc[0], max_loc[1], w, h)) + confidence = self._get_confidence_from_matrix(img_crop, im_search, max_val=max_val, rgb=rgb) + x, y = max_loc + rect = Rect(x, y, w, h) + + if (confidence < (threshold or self.threshold)) or len(results) >= max_count: + break + results.append(generate_result(rect, confidence)) + result.rectangle(rect=Rect(int(max_loc[0] - w / 2), int(max_loc[1] - h / 2), w, h), color=(0, 0, 0), thickness=-1) + + return results if results else None + + def _get_template_result_matrix(self, im_source, im_search): + """Get the result matrix of template matching.""" + if im_source.channels == 3: + i_gray = im_source.cvtColor(cv2.COLOR_BGR2GRAY).data + s_gray = im_search.cvtColor(cv2.COLOR_BGR2GRAY).data + else: + i_gray = 
im_source.data + s_gray = im_search.data + + result = self.match(i_gray, s_gray) + result = Image(data=result, dtype=np.float32, clone=False, place=im_source.place) + return result + + def input_image_check(self, im_source, im_search): + im_source = self._image_check(im_source) + im_search = self._image_check(im_search) + + if im_source.place != im_search.place: + raise InputImageError('image type must be same, source={}, search={}'.format(im_source.place, im_search.place)) + elif im_source.dtype != im_search.dtype: + raise InputImageError('image data type must be same, source={}, search={}'.format(im_source.dtype, im_search.dtype)) + elif im_source.channels != im_search.channels: + raise InputImageError('image channel must be same, source={}, search={}'.format(im_source.channels, im_search.channels)) + + if im_source.place == Place.UMat: + warnings.warn('Umat has error,will clone new image with np.ndarray ' + '(https://github.com/opencv/opencv/issues/21788)') + im_source = Image(im_source, place=Place.Ndarray, dtype=im_source.dtype) + im_search = Image(im_search, place=Place.Ndarray, dtype=im_search.dtype) + + return im_source, im_search + + def _image_check(self, data): + if not isinstance(data, Image): + data = Image(data, dtype=self.Dtype) + + if data.place not in self.Place: + raise TypeError('Image type must be(Place.UMat, Place.Ndarray)') + return data + + @staticmethod + def minMaxLoc(result): + return cv2.minMaxLoc(result) + + def match(self, img1, img2): + return self.matcher(img1, img2, cv2.TM_CCOEFF_NORMED) + + def cal_confidence(self, im_source, im_search, crop_rect, max_val, rgb): + """ + Scale the screenshot and the recognition result + to the same size, and calculate the reliability + + Args: + im_source: the image to be matched + im_search: image template + crop_rect: The area that needs to be intercepted in im_source + max_val: the maximum value obtained by matchTemplate + rgb: whether to use the rgb channel for verification + + Returns: + float: credibility(0~1) + """ + try: + target_img = im_source.crop(crop_rect) + except OverflowError: + raise MatchResultError(f"Target area({crop_rect}) out of screen{im_source.size}") + confidence = self._get_confidence_from_matrix(target_img, im_search, max_val, rgb) + return confidence + + def cal_rgb_confidence(self, im_source, im_search): + """ + Calculate the confidence of two picture rgb three-channel + + Args: + im_source: the image to be matched + im_search: image template + + Returns: + float: minimum confidence + """ + # im_search = im_search.copyMakeBorder(10, 10, 10, 10, cv2.BORDER_REPLICATE) + # + # img_src_hsv = im_source.cvtColor(cv2.COLOR_BGR2HSV) + # img_sch_hsv = im_search.cvtColor(cv2.COLOR_BGR2HSV) + + src_split = im_source.split() + sch_split = im_search.split() + + # Calculate the confidence of the BGR three channels and store it in bgr_confidence: + bgr_confidence = [0, 0, 0] + for i in range(3): + res_temp = self.match(sch_split[i], src_split[i]) + min_val, max_val, min_loc, max_loc = self.minMaxLoc(res_temp) + bgr_confidence[i] = max_val + + return min(bgr_confidence) + + def cal_ccoeff_confidence(self, im_source, im_search): + if im_source.channels == 3: + img_src_gray = im_source.cvtColor(cv2.COLOR_BGR2GRAY).data + else: + img_src_gray = im_source.data + + if im_search.channels == 3: + img_sch_gray = im_search.cvtColor(cv2.COLOR_BGR2GRAY).data + else: + img_sch_gray = im_search.data + + res_temp = self.match(img_sch_gray, img_src_gray) + min_val, max_val, min_loc, max_loc = self.minMaxLoc(res_temp) + return 
max_val
+
+    def _get_confidence_from_matrix(self, img_crop, im_search, max_val, rgb):
+        """Find the confidence from the result matrix."""
+        if rgb:
+            # If color verification is enabled, verify the target
+            # area on all three BGR channels:
+            confidence = self.cal_rgb_confidence(img_crop, im_search)
+        else:
+            confidence = max_val
+        return confidence
+
+
diff --git a/flybirds/core/plugin/plugins/default/ui_driver/opencv/sift.py b/flybirds/core/plugin/plugins/default/ui_driver/opencv/sift.py
new file mode 100644
index 00000000..1d5d797a
--- /dev/null
+++ b/flybirds/core/plugin/plugins/default/ui_driver/opencv/sift.py
@@ -0,0 +1,39 @@
+#! usr/bin/python
+# -*- coding:utf-8 -*-
+import cv2
+import numpy as np
+from baseImage.constant import Place
+
+from .base import BaseKeypoint
+
+
+class SIFT(BaseKeypoint):
+    FLANN_INDEX_KDTREE = 0
+    METHOD_NAME = 'SIFT'
+    Dtype = np.uint8
+    Place = (Place.UMat, Place.Ndarray)
+
+    def create_matcher(self, **kwargs) -> cv2.DescriptorMatcher:
+        """
+        Create a feature point matcher
+
+        Returns:
+            cv2.FlannBasedMatcher
+        """
+        index_params = {'algorithm': self.FLANN_INDEX_KDTREE, 'trees': 5}
+        # Number of times the trees are traversed recursively.
+        # Higher values give more accurate results but take more time
+        search_params = {'checks': 50}
+        matcher = cv2.FlannBasedMatcher(index_params, search_params)
+        return matcher
+
+    def create_detector(self, **kwargs) -> cv2.SIFT:
+        nfeatures = kwargs.get('nfeatures', 0)
+        nOctaveLayers = kwargs.get('nOctaveLayers', 3)
+        contrastThreshold = kwargs.get('contrastThreshold', 0.04)
+        edgeThreshold = kwargs.get('edgeThreshold', 10)
+        sigma = kwargs.get('sigma', 1.6)
+
+        detector = cv2.SIFT_create(nfeatures=nfeatures, nOctaveLayers=nOctaveLayers, contrastThreshold=contrastThreshold,
+                                   edgeThreshold=edgeThreshold, sigma=sigma)
+        return detector
diff --git a/flybirds/core/plugin/plugins/default/ui_driver/opencv/utils.py b/flybirds/core/plugin/plugins/default/ui_driver/opencv/utils.py
new file mode 100644
index 00000000..3dfee6ba
--- /dev/null
+++ b/flybirds/core/plugin/plugins/default/ui_driver/opencv/utils.py
@@ -0,0 +1,110 @@
+#!
usr/bin/python +# -*- coding:utf-8 -*- +import math +import cv2 + + +def generate_result(rect, confi): + """Format the result: Define the image recognition result format.""" + ret = { + 'rect': rect, + 'confidence': confi, + } + return ret + + +def keypoint_distance(kp1, kp2): + """Find the distance between two keypoints""" + if isinstance(kp1, cv2.KeyPoint): + kp1 = kp1.pt + elif isinstance(kp1, (list, tuple)): + kp1 = kp1 + else: + raise ValueError('When kp1 needs keypoint or direct coordinates, kp1={}'.format(kp1)) + + if isinstance(kp2, cv2.KeyPoint): + kp2 = kp2.pt + elif isinstance(kp2, (list, tuple)): + kp2 = kp2 + else: + raise ValueError('When kp2 needs keypoint or direct coordinates, kp1={}'.format(kp2)) + + x = kp1[0] - kp2[0] + y = kp1[1] - kp2[1] + return math.sqrt((x ** 2) + (y ** 2)) + + +def get_keypoint_from_matches(kp, matches, mode): + res = [] + if mode == 'query': + for match in matches: + res.append(kp[match.queryIdx]) + elif mode == 'train': + for match in matches: + res.append(kp[match.trainIdx]) + + return res + + +def _mapping_angle_distance(distance, origin_angle, angle): + """ + + Args: + distance: distance + origin_angle: The angle corresponding to the origin + angle: Rotation angle + + """ + _angle = origin_angle + angle + _y = distance * math.cos((math.pi * _angle) / 180) + _x = distance * math.sin((math.pi * _angle) / 180) + return round(_x, 3), round(_y, 3) + + +def rectangle_transform(point, size, mapping_point, mapping_size, angle): + """ + According to the point, find the rectangle vertex coordinates mapped by mapping_point + + Args: + point: the coordinates of the coordinates in the rectangle + size: the size of the rectangle (h, w) + mapping_point: the coordinates of the mapping rectangle + mapping_size: the size of the mapping rectangle (h, w) + angle: rotation angle + + Returns: + + """ + h, w = size[0], size[1] + _h, _w = mapping_size[0], mapping_size[1] + + h_scale = _h / h + w_scale = _w / w + + tl = keypoint_distance((0, 0), point) # upper left + tr = keypoint_distance((w, 0), point) # top right + bl = keypoint_distance((0, h), point) # lower left + br = keypoint_distance((w, h), point) # lower right + + A = math.degrees(math.atan2(point[0], point[1])) + B = math.degrees(math.atan2((w - point[0]), point[1])) + C = math.degrees(math.atan2(point[0], (h - point[1]))) + D = math.degrees(math.atan2((w - point[0]), (h - point[1]))) + + new_tl = _mapping_angle_distance(tl, A, angle=angle) + new_tl = (-new_tl[0] * w_scale, -new_tl[1] * h_scale) + new_tl = (mapping_point[0] + new_tl[0], mapping_point[1] + new_tl[1]) + + new_tr = _mapping_angle_distance(tr, B, angle=angle) + new_tr = (new_tr[0] * w_scale, -new_tr[1] * h_scale) + new_tr = (mapping_point[0] + new_tr[0], mapping_point[1] + new_tr[1]) + + new_bl = _mapping_angle_distance(bl, C, angle=angle) + new_bl = (-new_bl[0] * w_scale, new_bl[1] * h_scale) + new_bl = (mapping_point[0] + new_bl[0], mapping_point[1] + new_bl[1]) + + new_br = _mapping_angle_distance(br, D, angle=angle) + new_br = (new_br[0] * w_scale, new_br[1] * h_scale) + new_br = (mapping_point[0] + new_br[0], mapping_point[1] + new_br[1]) + + return [new_tl, new_tr, new_bl, new_br] diff --git a/flybirds/template/config/flybirds_config.json b/flybirds/template/config/flybirds_config.json index 1ad48ece..7b27d79d 100644 --- a/flybirds/template/config/flybirds_config.json +++ b/flybirds/template/config/flybirds_config.json @@ -48,7 +48,7 @@ "swipeReadyTime": 3, "verifyPosNotChangeCount": 5, "screenRecordTime": 90, - "useSnap": true + 
"useSnap": false }, "report": {}, "log": {} diff --git a/requirements.txt b/requirements.txt index f30d2e12..19a890b2 100644 --- a/requirements.txt +++ b/requirements.txt @@ -13,4 +13,5 @@ jsonpath_ng>=1.5.3 deepdiff>=5.8.1 paddleocr>=2.5.0 paddlepaddle>=2.3.0 -protobuf==3.20.1 \ No newline at end of file +protobuf==3.20.1 +baseImage>=2.1.1