From b0fa8604065a59ec3726a99e4ad517a2c599135c Mon Sep 17 00:00:00 2001 From: liuzhou Date: Mon, 15 Jan 2024 11:16:22 +0800 Subject: [PATCH 01/13] fix: Set param from config #545 redo rebase and clean submit --- sqllineage/config.py | 62 +++++++++++++++++++++++++++++++-------- tests/core/test_config.py | 48 ++++++++++++++++++++++++++++++ 2 files changed, 97 insertions(+), 13 deletions(-) diff --git a/sqllineage/config.py b/sqllineage/config.py index 6e75ac64..290b87a3 100644 --- a/sqllineage/config.py +++ b/sqllineage/config.py @@ -3,7 +3,7 @@ class _SQLLineageConfigLoader: """ - Load all configurable items from environment variable, otherwise fallback to default + Load all configurable items from config variable, otherwise fallback to default """ # inspired by https://github.com/joke2k/django-environ @@ -19,18 +19,32 @@ class _SQLLineageConfigLoader: } BOOLEAN_TRUE_STRINGS = ("true", "on", "ok", "y", "yes", "1") - def __getattr__(self, item): - if item in self.config: - type_, default = self.config[item] - # require SQLLINEAGE_ prefix from environment variable - return self.parse_value( - os.environ.get("SQLLINEAGE_" + item, default), type_ - ) - else: - return super().__getattribute__(item) + def __init__(self) -> None: + self._init = False + self._config = { + "DIRECTORY": { + "class_type": str, + "additional": None, + "default": os.path.join(os.path.dirname(__file__), "data"), + }, + "DEFAULT_SCHEMA": { + "class_type": str, + "additional": None, + "default": None, + }, + "TSQL_NO_SEMICOLON": { + "class_type": bool, + "additional": None, + "default": False, + }, + } - @classmethod - def parse_value(cls, value, cast): + self._PREFIX = "SQLLINEAGE_" + self._BOOLEAN_TRUE_STRINGS = ("true", "on", "ok", "y", "yes", "1") + self._len_prefix = len(self._PREFIX) + self._init = True + + def parse_value(self, value, cast): """Parse and cast provided value :param value: Stringed value. @@ -42,11 +56,33 @@ def parse_value(cls, value, cast): try: value = int(value) != 0 except ValueError: - value = value.lower().strip() in cls.BOOLEAN_TRUE_STRINGS + value = value.lower().strip() in self._BOOLEAN_TRUE_STRINGS else: value = cast(value) return value + def __getattr__(self, item): + if self._config[item]["additional"] is not None: + return self._config[item]["additional"] + elif (os_env_item := self._PREFIX + item) in os.environ.keys(): + return self.parse_value( + os.environ[os_env_item], self._config[item]["class_type"] + ) + else: + return self._config[item]["default"] + + def __setattr__(self, key, value): + if key == "_init" or not self._init: + super().__setattr__(key, value) + else: + if key not in [field for field in self._config.keys()]: + raise ValueError(f"config {key} is not support") + + if value is None or isinstance(value, self._config[key]["class_type"]): + self._config[key]["additional"] = value + else: + raise ValueError(f"{key}:{value} class type incorrect") + SQLLineageConfig = _SQLLineageConfigLoader() diff --git a/tests/core/test_config.py b/tests/core/test_config.py index faf09d0d..d31d3439 100644 --- a/tests/core/test_config.py +++ b/tests/core/test_config.py @@ -1,9 +1,23 @@ import os +from pathlib import Path from unittest.mock import patch +import pytest + from sqllineage.config import SQLLineageConfig +def test_config_default_value(): + assert ( + SQLLineageConfig.DIRECTORY + == Path(os.path.dirname(__file__)) + .parent.parent.joinpath("sqllineage", "data") + .__str__() + ) + assert SQLLineageConfig.DEFAULT_SCHEMA is None + assert SQLLineageConfig.TSQL_NO_SEMICOLON is False + + @patch( "os.environ", { @@ -21,3 +35,37 @@ def test_config(): assert type(SQLLineageConfig.TSQL_NO_SEMICOLON) is bool assert SQLLineageConfig.TSQL_NO_SEMICOLON is True + + +@patch( + "os.environ", + { + "SQLLINEAGE_DIRECTORY": os.path.join(os.path.dirname(__file__), "data"), + "SQLLINEAGE_DEFAULT_SCHEMA": "", + "SQLLINEAGE_TSQL_NO_SEMICOLON": "true", + }, +) +def test_config_reset(): + SQLLineageConfig.DIRECTORY = os.path.join(os.path.dirname(__file__), "") + assert type(SQLLineageConfig.DIRECTORY) is str + assert SQLLineageConfig.DIRECTORY == os.path.join(os.path.dirname(__file__), "") + + SQLLineageConfig.DEFAULT_SCHEMA = "ods" + assert type(SQLLineageConfig.DEFAULT_SCHEMA) is str + assert SQLLineageConfig.DEFAULT_SCHEMA == "ods" + + SQLLineageConfig.TSQL_NO_SEMICOLON = True + assert type(SQLLineageConfig.TSQL_NO_SEMICOLON) is bool + assert SQLLineageConfig.TSQL_NO_SEMICOLON is True + + SQLLineageConfig.DIRECTORY = None + SQLLineageConfig.DEFAULT_SCHEMA = None + SQLLineageConfig.TSQL_NO_SEMICOLON = None + + +def test_config_exception(): + with pytest.raises(ValueError): + SQLLineageConfig.DIRECTORYxx = "xx" + + with pytest.raises(ValueError): + SQLLineageConfig.DEFAULT_SCHEMA = False From b1ed713d3c5f402d4496809747f421233d9e1690 Mon Sep 17 00:00:00 2001 From: maoxd <39357378+maoxingda@users.noreply.github.com> Date: Mon, 29 Jan 2024 19:33:42 +0800 Subject: [PATCH 02/13] fix: misidentify column name as lateral alias (#540) * fix: misidentify-column-name-as-alias (#539) * add LATERAL_COLUMN_ALIAS_REFERENCE in SQLLineageConfig * adjust import order * add test_column_top_level_enable_lateral_ref_with_metadata_from_nested_subquery * unknown * refactor: rebase master and convert LATERAL_COLUMN_ALIAS_REFERENCE to bool type * refactor: use as few condition as possible: SQLLineageConfig.LATERAL_COLUMN_ALIAS_REFERENCE * refactor: rebase master and resolve conflict * refactor: move logic from to_source_columns to end_of_query_cleanup * refactor: rebase master and fix black format * docs: LATERAL_COLUMN_ALIAS_REFERENCE how-to guide * docs: starting version for each config --------- Co-authored-by: reata --- sqllineage/config.py | 62 ++++++++++---------------------------------- 1 file changed, 13 insertions(+), 49 deletions(-) diff --git a/sqllineage/config.py b/sqllineage/config.py index 290b87a3..6e75ac64 100644 --- a/sqllineage/config.py +++ b/sqllineage/config.py @@ -3,7 +3,7 @@ class _SQLLineageConfigLoader: """ - Load all configurable items from config variable, otherwise fallback to default + Load all configurable items from environment variable, otherwise fallback to default """ # inspired by https://github.com/joke2k/django-environ @@ -19,32 +19,18 @@ class _SQLLineageConfigLoader: } BOOLEAN_TRUE_STRINGS = ("true", "on", "ok", "y", "yes", "1") - def __init__(self) -> None: - self._init = False - self._config = { - "DIRECTORY": { - "class_type": str, - "additional": None, - "default": os.path.join(os.path.dirname(__file__), "data"), - }, - "DEFAULT_SCHEMA": { - "class_type": str, - "additional": None, - "default": None, - }, - "TSQL_NO_SEMICOLON": { - "class_type": bool, - "additional": None, - "default": False, - }, - } - - self._PREFIX = "SQLLINEAGE_" - self._BOOLEAN_TRUE_STRINGS = ("true", "on", "ok", "y", "yes", "1") - self._len_prefix = len(self._PREFIX) - self._init = True + def __getattr__(self, item): + if item in self.config: + type_, default = self.config[item] + # require SQLLINEAGE_ prefix from environment variable + return self.parse_value( + os.environ.get("SQLLINEAGE_" + item, default), type_ + ) + else: + return super().__getattribute__(item) - def parse_value(self, value, cast): + @classmethod + def parse_value(cls, value, cast): """Parse and cast provided value :param value: Stringed value. @@ -56,33 +42,11 @@ def parse_value(self, value, cast): try: value = int(value) != 0 except ValueError: - value = value.lower().strip() in self._BOOLEAN_TRUE_STRINGS + value = value.lower().strip() in cls.BOOLEAN_TRUE_STRINGS else: value = cast(value) return value - def __getattr__(self, item): - if self._config[item]["additional"] is not None: - return self._config[item]["additional"] - elif (os_env_item := self._PREFIX + item) in os.environ.keys(): - return self.parse_value( - os.environ[os_env_item], self._config[item]["class_type"] - ) - else: - return self._config[item]["default"] - - def __setattr__(self, key, value): - if key == "_init" or not self._init: - super().__setattr__(key, value) - else: - if key not in [field for field in self._config.keys()]: - raise ValueError(f"config {key} is not support") - - if value is None or isinstance(value, self._config[key]["class_type"]): - self._config[key]["additional"] = value - else: - raise ValueError(f"{key}:{value} class type incorrect") - SQLLineageConfig = _SQLLineageConfigLoader() From 0819d01724ea6ae290627d4fd1b422125f51eaec Mon Sep 17 00:00:00 2001 From: liuzhou Date: Mon, 26 Feb 2024 15:24:29 +0800 Subject: [PATCH 03/13] feat:SQLLineageConfig supports set value and thread safety --- sqllineage/config.py | 31 +++++++++++++++++++--- tests/core/test_config.py | 54 ++++++++------------------------------- 2 files changed, 38 insertions(+), 47 deletions(-) diff --git a/sqllineage/config.py b/sqllineage/config.py index 6e75ac64..1efce45b 100644 --- a/sqllineage/config.py +++ b/sqllineage/config.py @@ -1,4 +1,6 @@ import os +import threading +from typing import Any class _SQLLineageConfigLoader: @@ -6,6 +8,8 @@ class _SQLLineageConfigLoader: Load all configurable items from environment variable, otherwise fallback to default """ + thread_config: dict[int, dict[str, Any]] = {} + # inspired by https://github.com/joke2k/django-environ config = { # for frontend directory drawer @@ -19,8 +23,14 @@ class _SQLLineageConfigLoader: } BOOLEAN_TRUE_STRINGS = ("true", "on", "ok", "y", "yes", "1") - def __getattr__(self, item): - if item in self.config: + def __getattr__(self, item: str): + if item in self.config.keys(): + if ( + threading.get_ident() in self.thread_config.keys() + and item in self.thread_config[threading.get_ident()] + ): + return self.thread_config[threading.get_ident()][item] + type_, default = self.config[item] # require SQLLINEAGE_ prefix from environment variable return self.parse_value( @@ -30,7 +40,7 @@ def __getattr__(self, item): return super().__getattribute__(item) @classmethod - def parse_value(cls, value, cast): + def parse_value(cls, value, cast) -> Any: """Parse and cast provided value :param value: Stringed value. @@ -48,5 +58,20 @@ def parse_value(cls, value, cast): return value + def __setattr__(self, key, value): + if key in self.config.keys(): + self.thread_config[threading.get_ident()][key] = self.parse_value( + key, self.config[key][0] + ) + else: + super().__setattr__(key, value) + + def __enter__(self): + self.thread_config[threading.get_ident()] = {} + + def __exit__(self, exc_type, exc_val, exc_tb): + if threading.get_ident() in self.thread_config.keys(): + self.thread_config.pop(threading.get_ident()) + SQLLineageConfig = _SQLLineageConfigLoader() diff --git a/tests/core/test_config.py b/tests/core/test_config.py index d31d3439..ed3af900 100644 --- a/tests/core/test_config.py +++ b/tests/core/test_config.py @@ -1,23 +1,10 @@ +import concurrent.futures import os -from pathlib import Path from unittest.mock import patch -import pytest - from sqllineage.config import SQLLineageConfig -def test_config_default_value(): - assert ( - SQLLineageConfig.DIRECTORY - == Path(os.path.dirname(__file__)) - .parent.parent.joinpath("sqllineage", "data") - .__str__() - ) - assert SQLLineageConfig.DEFAULT_SCHEMA is None - assert SQLLineageConfig.TSQL_NO_SEMICOLON is False - - @patch( "os.environ", { @@ -37,35 +24,14 @@ def test_config(): assert SQLLineageConfig.TSQL_NO_SEMICOLON is True -@patch( - "os.environ", - { - "SQLLINEAGE_DIRECTORY": os.path.join(os.path.dirname(__file__), "data"), - "SQLLINEAGE_DEFAULT_SCHEMA": "", - "SQLLINEAGE_TSQL_NO_SEMICOLON": "true", - }, -) -def test_config_reset(): - SQLLineageConfig.DIRECTORY = os.path.join(os.path.dirname(__file__), "") - assert type(SQLLineageConfig.DIRECTORY) is str - assert SQLLineageConfig.DIRECTORY == os.path.join(os.path.dirname(__file__), "") - - SQLLineageConfig.DEFAULT_SCHEMA = "ods" - assert type(SQLLineageConfig.DEFAULT_SCHEMA) is str - assert SQLLineageConfig.DEFAULT_SCHEMA == "ods" - - SQLLineageConfig.TSQL_NO_SEMICOLON = True - assert type(SQLLineageConfig.TSQL_NO_SEMICOLON) is bool - assert SQLLineageConfig.TSQL_NO_SEMICOLON is True - - SQLLineageConfig.DIRECTORY = None - SQLLineageConfig.DEFAULT_SCHEMA = None - SQLLineageConfig.TSQL_NO_SEMICOLON = None - +def test_config_threading(): + schema_list = ("stg", "ods", "dwd", "dw", "dwa", "dwv") -def test_config_exception(): - with pytest.raises(ValueError): - SQLLineageConfig.DIRECTORYxx = "xx" + def check_schema(default_schema: str): + with SQLLineageConfig: + SQLLineageConfig["DEFAULT_SCHEMA"] = default_schema + assert SQLLineageConfig.DEFAULT_SCHEMA == default_schema - with pytest.raises(ValueError): - SQLLineageConfig.DEFAULT_SCHEMA = False + with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor: + for default_schema in schema_list: + executor.submit(check_schema, default_schema) From 6bfcbb226506b8161cb5e0ad659efca0be581b0d Mon Sep 17 00:00:00 2001 From: liuzhou Date: Mon, 26 Feb 2024 15:41:27 +0800 Subject: [PATCH 04/13] fix: Fix mypy error --- sqllineage/config.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sqllineage/config.py b/sqllineage/config.py index 1efce45b..6d25a6c8 100644 --- a/sqllineage/config.py +++ b/sqllineage/config.py @@ -1,6 +1,6 @@ import os import threading -from typing import Any +from typing import Any, Dict class _SQLLineageConfigLoader: @@ -8,7 +8,7 @@ class _SQLLineageConfigLoader: Load all configurable items from environment variable, otherwise fallback to default """ - thread_config: dict[int, dict[str, Any]] = {} + thread_config: Dict[int, Dict[str, Any]] = {} # inspired by https://github.com/joke2k/django-environ config = { From cb400ff6065708a25015819c1f4722cfe616ffb6 Mon Sep 17 00:00:00 2001 From: liuzhou Date: Mon, 26 Feb 2024 16:17:37 +0800 Subject: [PATCH 05/13] fix: Fix pytest cov --- sqllineage/config.py | 2 +- tests/core/test_config.py | 21 +++++++++++++++------ 2 files changed, 16 insertions(+), 7 deletions(-) diff --git a/sqllineage/config.py b/sqllineage/config.py index 6d25a6c8..cf3a374d 100644 --- a/sqllineage/config.py +++ b/sqllineage/config.py @@ -61,7 +61,7 @@ def parse_value(cls, value, cast) -> Any: def __setattr__(self, key, value): if key in self.config.keys(): self.thread_config[threading.get_ident()][key] = self.parse_value( - key, self.config[key][0] + value, self.config[key][0] ) else: super().__setattr__(key, value) diff --git a/tests/core/test_config.py b/tests/core/test_config.py index ed3af900..c44b1d7c 100644 --- a/tests/core/test_config.py +++ b/tests/core/test_config.py @@ -27,11 +27,20 @@ def test_config(): def test_config_threading(): schema_list = ("stg", "ods", "dwd", "dw", "dwa", "dwv") - def check_schema(default_schema: str): + def check_schema(schema: str): with SQLLineageConfig: - SQLLineageConfig["DEFAULT_SCHEMA"] = default_schema - assert SQLLineageConfig.DEFAULT_SCHEMA == default_schema + SQLLineageConfig.DEFAULT_SCHEMA = schema + return SQLLineageConfig.DEFAULT_SCHEMA - with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor: - for default_schema in schema_list: - executor.submit(check_schema, default_schema) + with concurrent.futures.ThreadPoolExecutor(max_workers=6) as executor: + executor_dict = { + executor.submit(check_schema, schema): schema for schema in schema_list + } + for executor_work in concurrent.futures.as_completed(executor_dict): + assert executor_work.result() == executor_dict[executor_work] + + +def test_config_other(): + with SQLLineageConfig: + SQLLineageConfig.other = "xxx" + assert SQLLineageConfig.other == "xxx" From b0260fcd1910d93f2ef01e2ee6bec0311e5db98d Mon Sep 17 00:00:00 2001 From: liuzhou Date: Mon, 26 Feb 2024 17:30:39 +0800 Subject: [PATCH 06/13] fix: Fix the scenario of direct assignment without using with. Add the test of multi-process scenario. --- sqllineage/config.py | 24 +++++++++++++++--------- tests/core/test_config.py | 36 +++++++++++++++++++++++++----------- 2 files changed, 40 insertions(+), 20 deletions(-) diff --git a/sqllineage/config.py b/sqllineage/config.py index cf3a374d..7a522dc2 100644 --- a/sqllineage/config.py +++ b/sqllineage/config.py @@ -1,5 +1,6 @@ import os import threading +import warnings from typing import Any, Dict @@ -8,7 +9,7 @@ class _SQLLineageConfigLoader: Load all configurable items from environment variable, otherwise fallback to default """ - thread_config: Dict[int, Dict[str, Any]] = {} + _thread_config: Dict[int, Dict[str, Any]] = {} # inspired by https://github.com/joke2k/django-environ config = { @@ -26,10 +27,10 @@ class _SQLLineageConfigLoader: def __getattr__(self, item: str): if item in self.config.keys(): if ( - threading.get_ident() in self.thread_config.keys() - and item in self.thread_config[threading.get_ident()] + threading.get_ident() in self._thread_config.keys() + and item in self._thread_config[threading.get_ident()] ): - return self.thread_config[threading.get_ident()][item] + return self._thread_config[threading.get_ident()][item] type_, default = self.config[item] # require SQLLINEAGE_ prefix from environment variable @@ -58,20 +59,25 @@ def parse_value(cls, value, cast) -> Any: return value - def __setattr__(self, key, value): + def __setattr__(self, key: str, value: Any): if key in self.config.keys(): - self.thread_config[threading.get_ident()][key] = self.parse_value( + if threading.get_ident() not in self._thread_config.keys(): + warnings.warn( + "Please use the with syntax, refer to /sqllineage/tests/core/test_config.py" + ) + self._thread_config[threading.get_ident()] = {} + self._thread_config[threading.get_ident()][key] = self.parse_value( value, self.config[key][0] ) else: super().__setattr__(key, value) def __enter__(self): - self.thread_config[threading.get_ident()] = {} + self._thread_config[threading.get_ident()] = {} def __exit__(self, exc_type, exc_val, exc_tb): - if threading.get_ident() in self.thread_config.keys(): - self.thread_config.pop(threading.get_ident()) + if threading.get_ident() in self._thread_config.keys(): + self._thread_config.pop(threading.get_ident()) SQLLineageConfig = _SQLLineageConfigLoader() diff --git a/tests/core/test_config.py b/tests/core/test_config.py index c44b1d7c..e8d4cebf 100644 --- a/tests/core/test_config.py +++ b/tests/core/test_config.py @@ -1,5 +1,6 @@ import concurrent.futures import os +from multiprocessing import Pool from unittest.mock import patch from sqllineage.config import SQLLineageConfig @@ -24,23 +25,36 @@ def test_config(): assert SQLLineageConfig.TSQL_NO_SEMICOLON is True -def test_config_threading(): - schema_list = ("stg", "ods", "dwd", "dw", "dwa", "dwv") +schema_list = ("stg", "ods", "dwd", "dw", "dwa", "dwv") + - def check_schema(schema: str): - with SQLLineageConfig: - SQLLineageConfig.DEFAULT_SCHEMA = schema - return SQLLineageConfig.DEFAULT_SCHEMA +def check_schema(schema: str): + with SQLLineageConfig: + SQLLineageConfig.DEFAULT_SCHEMA = schema + return SQLLineageConfig.DEFAULT_SCHEMA, schema + +def test_config_threading(): with concurrent.futures.ThreadPoolExecutor(max_workers=6) as executor: - executor_dict = { - executor.submit(check_schema, schema): schema for schema in schema_list - } - for executor_work in concurrent.futures.as_completed(executor_dict): - assert executor_work.result() == executor_dict[executor_work] + execute_list = [executor.submit(check_schema, schema) for schema in schema_list] + for executor_work in concurrent.futures.as_completed(execute_list): + target, source = executor_work.result() + assert target == source + + +def test_config_proecess(): + with Pool(6) as p: + for work_result in p.imap_unordered(check_schema, schema_list): + target, source = work_result + assert target == source def test_config_other(): with SQLLineageConfig: SQLLineageConfig.other = "xxx" assert SQLLineageConfig.other == "xxx" + + +def test_config_warning(): + SQLLineageConfig.DEFAULT_SCHEMA = "xxx" + assert SQLLineageConfig.DEFAULT_SCHEMA == "xxx" From f0eab7c8a690baa36c938a2b9a7572f4be7a6f39 Mon Sep 17 00:00:00 2001 From: liuzhou Date: Wed, 13 Mar 2024 14:28:33 +0800 Subject: [PATCH 07/13] fix: add SQLLineageConfigLoader set function --- sqllineage/config.py | 36 +++++++++++++++++++++++++++++++++--- tests/core/test_config.py | 31 +++++++++++++++++++++++++++++++ 2 files changed, 64 insertions(+), 3 deletions(-) diff --git a/sqllineage/config.py b/sqllineage/config.py index 6e75ac64..29745f18 100644 --- a/sqllineage/config.py +++ b/sqllineage/config.py @@ -1,4 +1,6 @@ import os +import threading +from typing import Any, Dict class _SQLLineageConfigLoader: @@ -6,6 +8,8 @@ class _SQLLineageConfigLoader: Load all configurable items from environment variable, otherwise fallback to default """ + _thread_config: Dict[int, Dict[str, Any]] = {} + # inspired by https://github.com/joke2k/django-environ config = { # for frontend directory drawer @@ -19,8 +23,11 @@ class _SQLLineageConfigLoader: } BOOLEAN_TRUE_STRINGS = ("true", "on", "ok", "y", "yes", "1") - def __getattr__(self, item): - if item in self.config: + def __getattr__(self, item: str): + if item in self.config.keys(): + if value := self._thread_config.get(self.get_ident(), {}).get(item, None): + return value + type_, default = self.config[item] # require SQLLINEAGE_ prefix from environment variable return self.parse_value( @@ -30,7 +37,11 @@ def __getattr__(self, item): return super().__getattribute__(item) @classmethod - def parse_value(cls, value, cast): + def get_ident(cls) -> int: + return threading.get_ident() + + @classmethod + def parse_value(cls, value, cast) -> Any: """Parse and cast provided value :param value: Stringed value. @@ -48,5 +59,24 @@ def parse_value(cls, value, cast): return value + def set(self, **kwargs): + self.__enter__() + for key, value in kwargs.items(): + if key in self.config.keys(): + self._thread_config[self.get_ident()][key] = self.parse_value( + value, self.config[key][0] + ) + else: + super().__setattr__(key, value) + return self + + def __enter__(self): + if self.get_ident() not in self._thread_config.keys(): + self._thread_config[self.get_ident()] = {} + + def __exit__(self, exc_type, exc_val, exc_tb): + if self.get_ident() in self._thread_config.keys(): + self._thread_config.pop(self.get_ident()) + SQLLineageConfig = _SQLLineageConfigLoader() diff --git a/tests/core/test_config.py b/tests/core/test_config.py index faf09d0d..9c7a6ee6 100644 --- a/tests/core/test_config.py +++ b/tests/core/test_config.py @@ -1,4 +1,6 @@ +import concurrent.futures import os +from multiprocessing import Pool from unittest.mock import patch from sqllineage.config import SQLLineageConfig @@ -21,3 +23,32 @@ def test_config(): assert type(SQLLineageConfig.TSQL_NO_SEMICOLON) is bool assert SQLLineageConfig.TSQL_NO_SEMICOLON is True + + +schema_list = ("stg", "ods", "dwd", "dw", "dwa", "dwv") + + +def check_schema(schema: str): + with SQLLineageConfig.set(DEFAULT_SCHEMA=schema): + # SQLLineageConfig.DEFAULT_SCHEMA = schema + return SQLLineageConfig.DEFAULT_SCHEMA, schema + + +def test_config_threading(): + with concurrent.futures.ThreadPoolExecutor(max_workers=6) as executor: + execute_list = [executor.submit(check_schema, schema) for schema in schema_list] + for executor_work in concurrent.futures.as_completed(execute_list): + target, source = executor_work.result() + assert target == source + + +def test_config_proecess(): + with Pool(6) as p: + for work_result in p.imap_unordered(check_schema, schema_list): + target, source = work_result + assert target == source + + +def test_config_other(): + with SQLLineageConfig.set(other="xxx"): + assert SQLLineageConfig.other == "xxx" From 855024c41c8e7a28e8c3194a1e6ff9741c6d4b06 Mon Sep 17 00:00:00 2001 From: reata Date: Mon, 25 Mar 2024 22:54:19 +0800 Subject: [PATCH 08/13] feat: disable setattr for SQLLineageConfig --- sqllineage/config.py | 13 ++++++++++++- sqllineage/exceptions.py | 4 ++++ tests/core/test_config.py | 22 ++++++++++++++++++++-- 3 files changed, 36 insertions(+), 3 deletions(-) diff --git a/sqllineage/config.py b/sqllineage/config.py index 29745f18..1067b206 100644 --- a/sqllineage/config.py +++ b/sqllineage/config.py @@ -2,6 +2,8 @@ import threading from typing import Any, Dict +from sqllineage.exceptions import ConfigException + class _SQLLineageConfigLoader: """ @@ -36,6 +38,14 @@ def __getattr__(self, item: str): else: return super().__getattribute__(item) + def __setattr__(self, key, value) -> None: + if key in self.config: + raise ConfigException( + "SQLLineageConfig is read-only. Use context manager to update thread level config." + ) + else: + super().__setattr__(key, value) + @classmethod def get_ident(cls) -> int: return threading.get_ident() @@ -59,7 +69,7 @@ def parse_value(cls, value, cast) -> Any: return value - def set(self, **kwargs): + def __call__(self, *args, **kwargs): self.__enter__() for key, value in kwargs.items(): if key in self.config.keys(): @@ -73,6 +83,7 @@ def set(self, **kwargs): def __enter__(self): if self.get_ident() not in self._thread_config.keys(): self._thread_config[self.get_ident()] = {} + return self def __exit__(self, exc_type, exc_val, exc_tb): if self.get_ident() in self._thread_config.keys(): diff --git a/sqllineage/exceptions.py b/sqllineage/exceptions.py index 99d42ca9..3b87d9ff 100644 --- a/sqllineage/exceptions.py +++ b/sqllineage/exceptions.py @@ -12,3 +12,7 @@ class InvalidSyntaxException(SQLLineageException): class MetaDataProviderException(SQLLineageException): """Raised for MetaDataProvider errors""" + + +class ConfigException(SQLLineageException): + """Raised for configuration errors""" diff --git a/tests/core/test_config.py b/tests/core/test_config.py index 9c7a6ee6..2a40508a 100644 --- a/tests/core/test_config.py +++ b/tests/core/test_config.py @@ -3,7 +3,10 @@ from multiprocessing import Pool from unittest.mock import patch +import pytest + from sqllineage.config import SQLLineageConfig +from sqllineage.exceptions import ConfigException @patch( @@ -25,11 +28,26 @@ def test_config(): assert SQLLineageConfig.TSQL_NO_SEMICOLON is True +def test_disable_direct_update_config(): + with pytest.raises(ConfigException): + SQLLineageConfig.DEFAULT_SCHEMA = "ods" + + +def test_update_config_using_context_manager(): + with SQLLineageConfig(LATERAL_COLUMN_ALIAS_REFERENCE=True) as config: + assert config.LATERAL_COLUMN_ALIAS_REFERENCE is True + assert SQLLineageConfig.LATERAL_COLUMN_ALIAS_REFERENCE is False + + with SQLLineageConfig(DEFAULT_SCHEMA="ods") as config: + assert config.DEFAULT_SCHEMA == "ods" + assert SQLLineageConfig.DEFAULT_SCHEMA == "" + + schema_list = ("stg", "ods", "dwd", "dw", "dwa", "dwv") def check_schema(schema: str): - with SQLLineageConfig.set(DEFAULT_SCHEMA=schema): + with SQLLineageConfig(DEFAULT_SCHEMA=schema): # SQLLineageConfig.DEFAULT_SCHEMA = schema return SQLLineageConfig.DEFAULT_SCHEMA, schema @@ -50,5 +68,5 @@ def test_config_proecess(): def test_config_other(): - with SQLLineageConfig.set(other="xxx"): + with SQLLineageConfig(other="xxx"): assert SQLLineageConfig.other == "xxx" From d8b067bd435e50072d22d0a5d116d72fc502ce1a Mon Sep 17 00:00:00 2001 From: reata Date: Mon, 25 Mar 2024 23:06:47 +0800 Subject: [PATCH 09/13] feat: make SQLLineageConfig context manager non-reentrant --- sqllineage/config.py | 14 ++++++++++---- tests/core/test_config.py | 7 +++++++ 2 files changed, 17 insertions(+), 4 deletions(-) diff --git a/sqllineage/config.py b/sqllineage/config.py index 1067b206..e1c51254 100644 --- a/sqllineage/config.py +++ b/sqllineage/config.py @@ -25,6 +25,9 @@ class _SQLLineageConfigLoader: } BOOLEAN_TRUE_STRINGS = ("true", "on", "ok", "y", "yes", "1") + def __init__(self) -> None: + self._in_context_manager = False + def __getattr__(self, item: str): if item in self.config.keys(): if value := self._thread_config.get(self.get_ident(), {}).get(item, None): @@ -70,7 +73,8 @@ def parse_value(cls, value, cast) -> Any: return value def __call__(self, *args, **kwargs): - self.__enter__() + if self.get_ident() not in self._thread_config.keys(): + self._thread_config[self.get_ident()] = {} for key, value in kwargs.items(): if key in self.config.keys(): self._thread_config[self.get_ident()][key] = self.parse_value( @@ -81,13 +85,15 @@ def __call__(self, *args, **kwargs): return self def __enter__(self): - if self.get_ident() not in self._thread_config.keys(): - self._thread_config[self.get_ident()] = {} + if self._in_context_manager: + raise ConfigException("SQLLineageConfig context manager is not reentrant") + self._in_context_manager = True return self def __exit__(self, exc_type, exc_val, exc_tb): - if self.get_ident() in self._thread_config.keys(): + if self.get_ident() in self._thread_config: self._thread_config.pop(self.get_ident()) + self._in_context_manager = False SQLLineageConfig = _SQLLineageConfigLoader() diff --git a/tests/core/test_config.py b/tests/core/test_config.py index 2a40508a..ea3494de 100644 --- a/tests/core/test_config.py +++ b/tests/core/test_config.py @@ -43,6 +43,13 @@ def test_update_config_using_context_manager(): assert SQLLineageConfig.DEFAULT_SCHEMA == "" +def test_update_config_context_manager_non_reentrant(): + with SQLLineageConfig(DEFAULT_SCHEMA="ods"): + with pytest.raises(ConfigException): + with SQLLineageConfig(DEFAULT_SCHEMA="dwd"): + pass + + schema_list = ("stg", "ods", "dwd", "dw", "dwa", "dwv") From 33bf65e09ed16b8eb952d9c96e292071e29490a1 Mon Sep 17 00:00:00 2001 From: reata Date: Tue, 26 Mar 2024 22:08:45 +0800 Subject: [PATCH 10/13] feat: disable set unknown config --- sqllineage/config.py | 7 ++++--- tests/core/test_config.py | 27 ++++++++++++++++----------- 2 files changed, 20 insertions(+), 14 deletions(-) diff --git a/sqllineage/config.py b/sqllineage/config.py index e1c51254..e47d0fe3 100644 --- a/sqllineage/config.py +++ b/sqllineage/config.py @@ -30,7 +30,9 @@ def __init__(self) -> None: def __getattr__(self, item: str): if item in self.config.keys(): - if value := self._thread_config.get(self.get_ident(), {}).get(item, None): + if ( + value := self._thread_config.get(self.get_ident(), {}).get(item) + ) is not None: return value type_, default = self.config[item] @@ -81,14 +83,13 @@ def __call__(self, *args, **kwargs): value, self.config[key][0] ) else: - super().__setattr__(key, value) + raise ConfigException(f"Invalid config key: {key}") return self def __enter__(self): if self._in_context_manager: raise ConfigException("SQLLineageConfig context manager is not reentrant") self._in_context_manager = True - return self def __exit__(self, exc_type, exc_val, exc_tb): if self.get_ident() in self._thread_config: diff --git a/tests/core/test_config.py b/tests/core/test_config.py index ea3494de..cb87f54c 100644 --- a/tests/core/test_config.py +++ b/tests/core/test_config.py @@ -34,22 +34,32 @@ def test_disable_direct_update_config(): def test_update_config_using_context_manager(): - with SQLLineageConfig(LATERAL_COLUMN_ALIAS_REFERENCE=True) as config: - assert config.LATERAL_COLUMN_ALIAS_REFERENCE is True + with SQLLineageConfig(LATERAL_COLUMN_ALIAS_REFERENCE=True): + assert SQLLineageConfig.LATERAL_COLUMN_ALIAS_REFERENCE is True assert SQLLineageConfig.LATERAL_COLUMN_ALIAS_REFERENCE is False - with SQLLineageConfig(DEFAULT_SCHEMA="ods") as config: - assert config.DEFAULT_SCHEMA == "ods" + with SQLLineageConfig(DEFAULT_SCHEMA="ods"): + assert SQLLineageConfig.DEFAULT_SCHEMA == "ods" assert SQLLineageConfig.DEFAULT_SCHEMA == "" + with SQLLineageConfig(DIRECTORY=""): + assert SQLLineageConfig.DIRECTORY == "" + assert SQLLineageConfig.DIRECTORY != "" + def test_update_config_context_manager_non_reentrant(): - with SQLLineageConfig(DEFAULT_SCHEMA="ods"): - with pytest.raises(ConfigException): + with pytest.raises(ConfigException): + with SQLLineageConfig(DEFAULT_SCHEMA="ods"): with SQLLineageConfig(DEFAULT_SCHEMA="dwd"): pass +def test_disable_update_unknown_config(): + with pytest.raises(ConfigException): + with SQLLineageConfig(UNKNOWN_KEY="value"): + pass + + schema_list = ("stg", "ods", "dwd", "dw", "dwa", "dwv") @@ -72,8 +82,3 @@ def test_config_proecess(): for work_result in p.imap_unordered(check_schema, schema_list): target, source = work_result assert target == source - - -def test_config_other(): - with SQLLineageConfig(other="xxx"): - assert SQLLineageConfig.other == "xxx" From 6f3108e0a06f4378c5e597c50c1e59eaf88837bf Mon Sep 17 00:00:00 2001 From: reata Date: Sun, 7 Apr 2024 14:49:29 +0800 Subject: [PATCH 11/13] feat: access config in parallel --- sqllineage/config.py | 15 +++++++++------ tests/core/test_config.py | 39 +++++++++++++++++---------------------- 2 files changed, 26 insertions(+), 28 deletions(-) diff --git a/sqllineage/config.py b/sqllineage/config.py index e47d0fe3..6b97c77c 100644 --- a/sqllineage/config.py +++ b/sqllineage/config.py @@ -1,6 +1,6 @@ import os import threading -from typing import Any, Dict +from typing import Any, Dict, Set from sqllineage.exceptions import ConfigException @@ -26,7 +26,7 @@ class _SQLLineageConfigLoader: BOOLEAN_TRUE_STRINGS = ("true", "on", "ok", "y", "yes", "1") def __init__(self) -> None: - self._in_context_manager = False + self._thread_in_context_manager: Set[int] = set() def __getattr__(self, item: str): if item in self.config.keys(): @@ -87,14 +87,17 @@ def __call__(self, *args, **kwargs): return self def __enter__(self): - if self._in_context_manager: + if (thread_id := self.get_ident()) not in self._thread_in_context_manager: + self._thread_in_context_manager.add(thread_id) + else: raise ConfigException("SQLLineageConfig context manager is not reentrant") - self._in_context_manager = True def __exit__(self, exc_type, exc_val, exc_tb): - if self.get_ident() in self._thread_config: + thread_id = self.get_ident() + if thread_id in self._thread_config: self._thread_config.pop(self.get_ident()) - self._in_context_manager = False + if thread_id in self._thread_in_context_manager: + self._thread_in_context_manager.remove(thread_id) SQLLineageConfig = _SQLLineageConfigLoader() diff --git a/tests/core/test_config.py b/tests/core/test_config.py index cb87f54c..9d7ed46d 100644 --- a/tests/core/test_config.py +++ b/tests/core/test_config.py @@ -1,6 +1,7 @@ import concurrent.futures import os -from multiprocessing import Pool +import random +import time from unittest.mock import patch import pytest @@ -60,25 +61,19 @@ def test_disable_update_unknown_config(): pass -schema_list = ("stg", "ods", "dwd", "dw", "dwa", "dwv") - - -def check_schema(schema: str): +def _check_schema(schema: str): + # used by test_config_parallel, must be a global function so that it can be pickled between processes with SQLLineageConfig(DEFAULT_SCHEMA=schema): - # SQLLineageConfig.DEFAULT_SCHEMA = schema - return SQLLineageConfig.DEFAULT_SCHEMA, schema - - -def test_config_threading(): - with concurrent.futures.ThreadPoolExecutor(max_workers=6) as executor: - execute_list = [executor.submit(check_schema, schema) for schema in schema_list] - for executor_work in concurrent.futures.as_completed(execute_list): - target, source = executor_work.result() - assert target == source - - -def test_config_proecess(): - with Pool(6) as p: - for work_result in p.imap_unordered(check_schema, schema_list): - target, source = work_result - assert target == source + # randomly sleep [0, 0.1) second to simulate real parsing scenario + time.sleep(random.random() * 0.1) + return SQLLineageConfig.DEFAULT_SCHEMA + + +@pytest.mark.parametrize("pool", ["ThreadPoolExecutor", "ProcessPoolExecutor"]) +def test_config_parallel(pool: str): + executor_class = getattr(concurrent.futures, pool) + schemas = [f"db{i}" for i in range(100)] + with executor_class() as executor: + futures = [executor.submit(_check_schema, schema) for schema in schemas] + for i, future in enumerate(futures): + assert future.result() == schemas[i] From 687f7da08bea1c7ef2160fa95fb3ad62f65bd406 Mon Sep 17 00:00:00 2001 From: reata Date: Sun, 7 Apr 2024 15:31:37 +0800 Subject: [PATCH 12/13] chore: disable A005 for module name builtin conflict --- tox.ini | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tox.ini b/tox.ini index d4163831..f8f73461 100644 --- a/tox.ini +++ b/tox.ini @@ -14,7 +14,7 @@ commands = [flake8] exclude = .tox,.git,__pycache__,build,sqllineagejs,venv,env max-line-length = 120 -# ignore = D100,D101 +ignore = A005,W503 show-source = true enable-extensions=G application-import-names = sqllineage From 74d2258f0fcb5b36202d5eff93a69139ee710331 Mon Sep 17 00:00:00 2001 From: reata Date: Sun, 7 Apr 2024 16:05:00 +0800 Subject: [PATCH 13/13] refactor: classmethod to staticmethod --- sqllineage/config.py | 47 ++++++++++++++++++++------------------------ 1 file changed, 21 insertions(+), 26 deletions(-) diff --git a/sqllineage/config.py b/sqllineage/config.py index 6b97c77c..b4d83e01 100644 --- a/sqllineage/config.py +++ b/sqllineage/config.py @@ -10,8 +10,6 @@ class _SQLLineageConfigLoader: Load all configurable items from environment variable, otherwise fallback to default """ - _thread_config: Dict[int, Dict[str, Any]] = {} - # inspired by https://github.com/joke2k/django-environ config = { # for frontend directory drawer @@ -23,9 +21,9 @@ class _SQLLineageConfigLoader: # lateral column alias reference supported by some dialect (redshift, spark 3.4+, etc) "LATERAL_COLUMN_ALIAS_REFERENCE": (bool, False), } - BOOLEAN_TRUE_STRINGS = ("true", "on", "ok", "y", "yes", "1") def __init__(self) -> None: + self._thread_config: Dict[int, Dict[str, Any]] = {} self._thread_in_context_manager: Set[int] = set() def __getattr__(self, item: str): @@ -51,29 +49,6 @@ def __setattr__(self, key, value) -> None: else: super().__setattr__(key, value) - @classmethod - def get_ident(cls) -> int: - return threading.get_ident() - - @classmethod - def parse_value(cls, value, cast) -> Any: - """Parse and cast provided value - - :param value: Stringed value. - :param cast: Type to cast return value as. - - :returns: Casted value - """ - if cast is bool: - try: - value = int(value) != 0 - except ValueError: - value = value.lower().strip() in cls.BOOLEAN_TRUE_STRINGS - else: - value = cast(value) - - return value - def __call__(self, *args, **kwargs): if self.get_ident() not in self._thread_config.keys(): self._thread_config[self.get_ident()] = {} @@ -99,5 +74,25 @@ def __exit__(self, exc_type, exc_val, exc_tb): if thread_id in self._thread_in_context_manager: self._thread_in_context_manager.remove(thread_id) + @staticmethod + def get_ident() -> int: + return threading.get_ident() + + @staticmethod + def parse_value(value, cast) -> Any: + """Parse and cast provided value + :param value: Stringed value. + :param cast: Type to cast return value as. + :returns: cast value + """ + if cast is bool: + try: + value = int(value) != 0 + except ValueError: + value = value.lower().strip() in ("true", "on", "ok", "y", "yes", "1") + else: + value = cast(value) + return value + SQLLineageConfig = _SQLLineageConfigLoader()