Skip to content

Commit f1972e0

Browse files
delphisharpliuzhoumaoxingdareata
authored
fix: Set param from config (#553)
* fix: Set param from config #545 redo rebase and clean submit * fix: misidentify column name as lateral alias (#540) * fix: misidentify-column-name-as-alias (#539) * add LATERAL_COLUMN_ALIAS_REFERENCE in SQLLineageConfig * adjust import order * add test_column_top_level_enable_lateral_ref_with_metadata_from_nested_subquery * unknown * refactor: rebase master and convert LATERAL_COLUMN_ALIAS_REFERENCE to bool type * refactor: use as few condition as possible: SQLLineageConfig.LATERAL_COLUMN_ALIAS_REFERENCE * refactor: rebase master and resolve conflict * refactor: move logic from to_source_columns to end_of_query_cleanup * refactor: rebase master and fix black format * docs: LATERAL_COLUMN_ALIAS_REFERENCE how-to guide * docs: starting version for each config --------- Co-authored-by: reata <[email protected]> * feat:SQLLineageConfig supports set value and thread safety * fix: Fix mypy error * fix: Fix pytest cov * fix: Fix the scenario of direct assignment without using with. Add the test of multi-process scenario. * fix: add SQLLineageConfigLoader set function * feat: disable setattr for SQLLineageConfig * feat: make SQLLineageConfig context manager non-reentrant * feat: disable set unknown config * feat: access config in parallel * chore: disable A005 for module name builtin conflict * refactor: classmethod to staticmethod --------- Co-authored-by: liuzhou <[email protected]> Co-authored-by: maoxd <[email protected]> Co-authored-by: reata <[email protected]>
1 parent 3574f2d commit f1972e0

File tree

4 files changed

+117
-11
lines changed

4 files changed

+117
-11
lines changed

sqllineage/config.py

+56-10
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,8 @@
11
import os
2+
import threading
3+
from typing import Any, Dict, Set
4+
5+
from sqllineage.exceptions import ConfigException
26

37

48
class _SQLLineageConfigLoader:
@@ -17,10 +21,18 @@ class _SQLLineageConfigLoader:
1721
# lateral column alias reference supported by some dialect (redshift, spark 3.4+, etc)
1822
"LATERAL_COLUMN_ALIAS_REFERENCE": (bool, False),
1923
}
20-
BOOLEAN_TRUE_STRINGS = ("true", "on", "ok", "y", "yes", "1")
2124

22-
def __getattr__(self, item):
23-
if item in self.config:
25+
def __init__(self) -> None:
26+
self._thread_config: Dict[int, Dict[str, Any]] = {}
27+
self._thread_in_context_manager: Set[int] = set()
28+
29+
def __getattr__(self, item: str):
30+
if item in self.config.keys():
31+
if (
32+
value := self._thread_config.get(self.get_ident(), {}).get(item)
33+
) is not None:
34+
return value
35+
2436
type_, default = self.config[item]
2537
# require SQLLINEAGE_ prefix from environment variable
2638
return self.parse_value(
@@ -29,23 +41,57 @@ def __getattr__(self, item):
2941
else:
3042
return super().__getattribute__(item)
3143

32-
@classmethod
33-
def parse_value(cls, value, cast):
34-
"""Parse and cast provided value
44+
def __setattr__(self, key, value) -> None:
45+
if key in self.config:
46+
raise ConfigException(
47+
"SQLLineageConfig is read-only. Use context manager to update thread level config."
48+
)
49+
else:
50+
super().__setattr__(key, value)
51+
52+
def __call__(self, *args, **kwargs):
53+
if self.get_ident() not in self._thread_config.keys():
54+
self._thread_config[self.get_ident()] = {}
55+
for key, value in kwargs.items():
56+
if key in self.config.keys():
57+
self._thread_config[self.get_ident()][key] = self.parse_value(
58+
value, self.config[key][0]
59+
)
60+
else:
61+
raise ConfigException(f"Invalid config key: {key}")
62+
return self
3563

64+
def __enter__(self):
65+
if (thread_id := self.get_ident()) not in self._thread_in_context_manager:
66+
self._thread_in_context_manager.add(thread_id)
67+
else:
68+
raise ConfigException("SQLLineageConfig context manager is not reentrant")
69+
70+
def __exit__(self, exc_type, exc_val, exc_tb):
71+
thread_id = self.get_ident()
72+
if thread_id in self._thread_config:
73+
self._thread_config.pop(self.get_ident())
74+
if thread_id in self._thread_in_context_manager:
75+
self._thread_in_context_manager.remove(thread_id)
76+
77+
@staticmethod
78+
def get_ident() -> int:
79+
return threading.get_ident()
80+
81+
@staticmethod
82+
def parse_value(value, cast) -> Any:
83+
"""Parse and cast provided value
3684
:param value: Stringed value.
3785
:param cast: Type to cast return value as.
38-
39-
:returns: Casted value
86+
:returns: cast value
4087
"""
4188
if cast is bool:
4289
try:
4390
value = int(value) != 0
4491
except ValueError:
45-
value = value.lower().strip() in cls.BOOLEAN_TRUE_STRINGS
92+
value = value.lower().strip() in ("true", "on", "ok", "y", "yes", "1")
4693
else:
4794
value = cast(value)
48-
4995
return value
5096

5197

sqllineage/exceptions.py

+4
Original file line numberDiff line numberDiff line change
@@ -12,3 +12,7 @@ class InvalidSyntaxException(SQLLineageException):
1212

1313
class MetaDataProviderException(SQLLineageException):
1414
"""Raised for MetaDataProvider errors"""
15+
16+
17+
class ConfigException(SQLLineageException):
18+
"""Raised for configuration errors"""

tests/core/test_config.py

+56
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,13 @@
1+
import concurrent.futures
12
import os
3+
import random
4+
import time
25
from unittest.mock import patch
36

7+
import pytest
8+
49
from sqllineage.config import SQLLineageConfig
10+
from sqllineage.exceptions import ConfigException
511

612

713
@patch(
@@ -21,3 +27,53 @@ def test_config():
2127

2228
assert type(SQLLineageConfig.TSQL_NO_SEMICOLON) is bool
2329
assert SQLLineageConfig.TSQL_NO_SEMICOLON is True
30+
31+
32+
def test_disable_direct_update_config():
33+
with pytest.raises(ConfigException):
34+
SQLLineageConfig.DEFAULT_SCHEMA = "ods"
35+
36+
37+
def test_update_config_using_context_manager():
38+
with SQLLineageConfig(LATERAL_COLUMN_ALIAS_REFERENCE=True):
39+
assert SQLLineageConfig.LATERAL_COLUMN_ALIAS_REFERENCE is True
40+
assert SQLLineageConfig.LATERAL_COLUMN_ALIAS_REFERENCE is False
41+
42+
with SQLLineageConfig(DEFAULT_SCHEMA="ods"):
43+
assert SQLLineageConfig.DEFAULT_SCHEMA == "ods"
44+
assert SQLLineageConfig.DEFAULT_SCHEMA == ""
45+
46+
with SQLLineageConfig(DIRECTORY=""):
47+
assert SQLLineageConfig.DIRECTORY == ""
48+
assert SQLLineageConfig.DIRECTORY != ""
49+
50+
51+
def test_update_config_context_manager_non_reentrant():
52+
with pytest.raises(ConfigException):
53+
with SQLLineageConfig(DEFAULT_SCHEMA="ods"):
54+
with SQLLineageConfig(DEFAULT_SCHEMA="dwd"):
55+
pass
56+
57+
58+
def test_disable_update_unknown_config():
59+
with pytest.raises(ConfigException):
60+
with SQLLineageConfig(UNKNOWN_KEY="value"):
61+
pass
62+
63+
64+
def _check_schema(schema: str):
65+
# used by test_config_parallel, must be a global function so that it can be pickled between processes
66+
with SQLLineageConfig(DEFAULT_SCHEMA=schema):
67+
# randomly sleep [0, 0.1) second to simulate real parsing scenario
68+
time.sleep(random.random() * 0.1)
69+
return SQLLineageConfig.DEFAULT_SCHEMA
70+
71+
72+
@pytest.mark.parametrize("pool", ["ThreadPoolExecutor", "ProcessPoolExecutor"])
73+
def test_config_parallel(pool: str):
74+
executor_class = getattr(concurrent.futures, pool)
75+
schemas = [f"db{i}" for i in range(100)]
76+
with executor_class() as executor:
77+
futures = [executor.submit(_check_schema, schema) for schema in schemas]
78+
for i, future in enumerate(futures):
79+
assert future.result() == schemas[i]

tox.ini

+1-1
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ commands =
1414
[flake8]
1515
exclude = .tox,.git,__pycache__,build,sqllineagejs,venv,env
1616
max-line-length = 120
17-
# ignore = D100,D101
17+
ignore = A005,W503
1818
show-source = true
1919
enable-extensions=G
2020
application-import-names = sqllineage

0 commit comments

Comments
 (0)