diff --git a/detection_rules/misc.py b/detection_rules/misc.py index 989d78f0f10..dcf1fc51ad0 100644 --- a/detection_rules/misc.py +++ b/detection_rules/misc.py @@ -16,19 +16,7 @@ import click import requests - -# this is primarily for type hinting - all use of the github client should come from GithubClient class -try: - from github import Github - from github.Repository import Repository - from github.GitRelease import GitRelease - from github.GitReleaseAsset import GitReleaseAsset -except ImportError: - # for type hinting - Github = None # noqa: N806 - Repository = None # noqa: N806 - GitRelease = None # noqa: N806 - GitReleaseAsset = None # noqa: N806 +from kibana import Kibana from .utils import add_params, cached, get_path, load_etc_dump @@ -348,57 +336,28 @@ def get_elasticsearch_client(cloud_id: str = None, elasticsearch_url: str = None client_error(error_msg, e, ctx=ctx, err=True) -def get_kibana_client(cloud_id: str, kibana_url: str, kibana_user: str, kibana_password: str, kibana_cookie: str, - space: str, ignore_ssl_errors: bool, provider_type: str, provider_name: str, api_key: str, - **kwargs): +def get_kibana_client( + *, + api_key: str, + cloud_id: str | None = None, + kibana_url: str | None = None, + space: str | None = None, + ignore_ssl_errors: bool = False, + **kwargs +): """Get an authenticated Kibana client.""" - from requests import HTTPError - from kibana import Kibana - if not (cloud_id or kibana_url): client_error("Missing required --cloud-id or --kibana-url") - if not (kibana_cookie or api_key): - # don't prompt for these until there's a cloud id or Kibana URL - kibana_user = kibana_user or click.prompt("kibana_user") - kibana_password = kibana_password or click.prompt("kibana_password", hide_input=True) - verify = not ignore_ssl_errors - - with Kibana(cloud_id=cloud_id, kibana_url=kibana_url, space=space, verify=verify, **kwargs) as kibana: - if kibana_cookie: - kibana.add_cookie(kibana_cookie) - return kibana - elif api_key: - kibana.add_api_key(api_key) - return kibana - - try: - kibana.login(kibana_user, kibana_password, provider_type=provider_type, provider_name=provider_name) - except HTTPError as exc: - if exc.response.status_code == 401: - err_msg = f'Authentication failed for {kibana_url}. If credentials are valid, check --provider-name' - client_error(err_msg, exc, err=True) - else: - raise - - return kibana + return Kibana(cloud_id=cloud_id, kibana_url=kibana_url, space=space, verify=verify, api_key=api_key, **kwargs) client_options = { 'kibana': { - 'cloud_id': click.Option(['--cloud-id'], default=getdefault('cloud_id'), - help="ID of the cloud instance."), - 'api_key': click.Option(['--api-key'], default=getdefault('api_key')), - 'kibana_cookie': click.Option(['--kibana-cookie', '-kc'], default=getdefault('kibana_cookie'), - help='Cookie from an authed session'), - 'kibana_password': click.Option(['--kibana-password', '-kp'], default=getdefault('kibana_password')), 'kibana_url': click.Option(['--kibana-url'], default=getdefault('kibana_url')), - 'kibana_user': click.Option(['--kibana-user', '-ku'], default=getdefault('kibana_user')), - 'provider_type': click.Option(['--provider-type'], default=getdefault('provider_type'), - help="Elastic Cloud providers: basic and saml (for SSO)"), - 'provider_name': click.Option(['--provider-name'], default=getdefault('provider_name'), - help="Elastic Cloud providers: cloud-basic and cloud-saml (for SSO)"), + 'cloud_id': click.Option(['--cloud-id'], default=getdefault('cloud_id'), help="ID of the cloud instance."), + 'api_key': click.Option(['--api-key'], default=getdefault('api_key')), 'space': click.Option(['--space'], default=None, help='Kibana space'), 'ignore_ssl_errors': click.Option(['--ignore-ssl-errors'], default=getdefault('ignore_ssl_errors')) }, diff --git a/detection_rules/remote_validation.py b/detection_rules/remote_validation.py index c00d3bc37cc..db30c5e953c 100644 --- a/detection_rules/remote_validation.py +++ b/detection_rules/remote_validation.py @@ -41,10 +41,7 @@ class RemoteConnector: def __init__(self, parse_config: bool = False, **kwargs): es_args = ['cloud_id', 'ignore_ssl_errors', 'elasticsearch_url', 'es_user', 'es_password', 'timeout'] - kibana_args = [ - 'cloud_id', 'ignore_ssl_errors', 'kibana_url', 'kibana_user', 'kibana_password', 'space', 'kibana_cookie', - 'provider_type', 'provider_name' - ] + kibana_args = ['cloud_id', 'ignore_ssl_errors', 'kibana_url', 'api_key', 'space'] if parse_config: es_kwargs = {arg: getdefault(arg)() for arg in es_args} @@ -73,17 +70,25 @@ def auth_es(self, *, cloud_id: Optional[str] = None, ignore_ssl_errors: Optional es_password=es_password, timeout=timeout, **kwargs) return self.es_client - def auth_kibana(self, *, cloud_id: Optional[str] = None, ignore_ssl_errors: Optional[bool] = None, - kibana_url: Optional[str] = None, kibana_user: Optional[str] = None, - kibana_password: Optional[str] = None, space: Optional[str] = None, - kibana_cookie: Optional[str] = None, provider_type: Optional[str] = None, - provider_name: Optional[str] = None, **kwargs) -> Kibana: + def auth_kibana( + self, + *, + api_key: str, + cloud_id: str | None = None, + kibana_url: str | None = None, + space: str | None = None, + ignore_ssl_errors: bool = False, + **kwargs + ) -> Kibana: """Return an authenticated Kibana client.""" - self.kibana_client = get_kibana_client(cloud_id=cloud_id, ignore_ssl_errors=ignore_ssl_errors, - kibana_url=kibana_url, kibana_user=kibana_user, - kibana_password=kibana_password, space=space, - kibana_cookie=kibana_cookie, provider_type=provider_type, - provider_name=provider_name, **kwargs) + self.kibana_client = get_kibana_client( + cloud_id=cloud_id, + ignore_ssl_errors=ignore_ssl_errors, + kibana_url=kibana_url, + api_key=api_key, + space=space, + **kwargs + ) return self.kibana_client diff --git a/lib/kibana/kibana/connector.py b/lib/kibana/kibana/connector.py index 7de201193d3..5c720b698e6 100644 --- a/lib/kibana/kibana/connector.py +++ b/lib/kibana/kibana/connector.py @@ -12,23 +12,30 @@ import uuid from typing import List, Optional, Union -from urllib.parse import urljoin import requests from elasticsearch import Elasticsearch _context = threading.local() -class Kibana(object): +class Kibana: """Wrapper around the Kibana SIEM APIs.""" - CACHED = False - - def __init__(self, cloud_id=None, kibana_url=None, verify=True, elasticsearch=None, space=None): + def __init__(self, cloud_id=None, kibana_url=None, api_key=None, verify=True, elasticsearch=None, space=None): """"Open a session to the platform.""" self.authenticated = False + self.session = requests.Session() self.session.verify = verify + + if api_key: + self.session.headers.update( + { + "kbn-xsrf": "true", + "Authorization": f"ApiKey {api_key}", + } + ) + self.verify = verify self.cloud_id = cloud_id @@ -37,9 +44,6 @@ def __init__(self, cloud_id=None, kibana_url=None, verify=True, elasticsearch=No self.space = space if space and space.lower() != 'default' else None self.status = None - self.provider_name = None - self.provider_type = None - if self.cloud_id: self.cluster_name, cloud_info = self.cloud_id.split(":") self.domain, self.es_uuid, self.kibana_uuid = \ @@ -50,18 +54,24 @@ def __init__(self, cloud_id=None, kibana_url=None, verify=True, elasticsearch=No kibana_url_from_cloud = f"https://{self.kibana_uuid}.{self.domain}:9243" if self.kibana_url and self.kibana_url != kibana_url_from_cloud: - raise ValueError(f'kibana_url provided ({self.kibana_url}) does not match url derived from cloud_id ' - f'{kibana_url_from_cloud}') + raise ValueError( + f'kibana_url provided ({self.kibana_url}) does not match url derived from cloud_id ' + f'{kibana_url_from_cloud}' + ) self.kibana_url = kibana_url_from_cloud - self.elastic_url = f"https://{self.es_uuid}.{self.domain}:9243" - self.provider_name = 'cloud-basic' - self.provider_type = 'basic' - self.session.headers.update({'Content-Type': "application/json", "kbn-xsrf": str(uuid.uuid4())}) self.elasticsearch = elasticsearch + if not self.elasticsearch and self.elastic_url: + self.elasticsearch = Elasticsearch( + hosts=[self.elastic_url], + api_key=api_key, + verify_certs=self.verify, + ) + self.elasticsearch.info() + if not verify: from requests.packages.urllib3.exceptions import \ InsecureRequestWarning @@ -75,7 +85,7 @@ def version(self): return self.status.get("version", {}).get("number") @staticmethod - def ndjson_file_data_prep(lines: List[dict], filename: str) -> (dict, str): + def ndjson_file_data_prep(lines: List[dict], filename: str) -> tuple[dict, str]: """Prepare a request for an ndjson file upload to Kibana.""" data = ('\n'.join(json.dumps(r) for r in lines) + '\n') boundary = '----JustAnotherBoundary' @@ -144,63 +154,6 @@ def delete(self, uri, params=None, error=True, **kwargs): """Perform an HTTP DELETE.""" return self.request('DELETE', uri, params=params, error=error, **kwargs) - def login(self, kibana_username, kibana_password, provider_type=None, provider_name=None): - """Authenticate to Kibana using the API to update our cookies.""" - payload = {'username': kibana_username, 'password': kibana_password} - path = '/internal/security/login' - - try: - self.post(path, data=payload, error=True, verbose=False) - except requests.HTTPError as e: - # 7.10 changed the structure of the auth data - # providers dictated by Kibana configs in: - # https://www.elastic.co/guide/en/kibana/current/security-settings-kb.html#authentication-security-settings - # more details: https://discuss.elastic.co/t/kibana-7-10-login-issues/255201/2 - if e.response.status_code == 400 and '[undefined]' in e.response.text: - provider_type = provider_type or self.provider_type or 'basic' - provider_name = provider_name or self.provider_name or 'basic' - - payload = { - 'params': payload, - 'currentURL': '', - 'providerType': provider_type, - 'providerName': provider_name - } - self.post(path, data=payload, error=True) - else: - raise - - # Kibana will authenticate against URLs which contain invalid spaces - if self.space: - self.verify_space(self.space) - - self.authenticated = True - self.status = self.get("/api/status") - - # create ES and force authentication - if self.elasticsearch is None and self.elastic_url is not None: - self.elasticsearch = Elasticsearch(hosts=[self.elastic_url], http_auth=(kibana_username, kibana_password), - verify_certs=self.verify) - self.elasticsearch.info() - - # make chaining easier - return self - - def add_cookie(self, cookie): - """Add cookie to be used for auth (such as from an SSO session).""" - # https://www.elastic.co/guide/en/kibana/7.10/security-settings-kb.html#security-session-and-cookie-settings - self.session.headers['sid'] = cookie - self.session.cookies.set('sid', cookie) - self.status = self.get('/api/status') - self.authenticated = True - - def add_api_key(self, api_key: str) -> bool: - """Add an API key to be used for auth.""" - self.session.headers['Authorization'] = f'ApiKey {api_key}' - self.status = self.get('/api/status') - self.authenticated = True - return bool(self.status) - def logout(self): """Quit the current session.""" try: diff --git a/lib/kibana/pyproject.toml b/lib/kibana/pyproject.toml index 96b5ae0cbe4..a2a9edf34c6 100644 --- a/lib/kibana/pyproject.toml +++ b/lib/kibana/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "detection-rules-kibana" -version = "0.4.1" +version = "0.4.2" description = "Kibana API utilities for Elastic Detection Rules" license = {text = "Elastic License v2"} keywords = ["Elastic", "Kibana", "Detection Rules", "Security", "Elasticsearch"] diff --git a/pyproject.toml b/pyproject.toml index eb55b7fe47f..3a0a0bd5128 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "detection_rules" -version = "0.4.26" +version = "1.0.0" description = "Detection Rules is the home for rules used by Elastic Security. This repository is used for the development, maintenance, testing, validation, and release of rules for Elastic Security’s Detection Engine." readme = "README.md" requires-python = ">=3.12"