5
5
from typing_extensions import ParamSpec
6
6
from textwrap import indent , dedent
7
7
from dataclasses import dataclass , field
8
+ from importlib .abc import SourceLoader
9
+ import importlib
8
10
9
11
T = TypeVar ("T" )
10
12
P = ParamSpec ("P" )
11
13
12
14
13
- def _to_code (func : Union [FunctionWithRequirements [T , P ], Callable [P , T ]]) -> str :
15
+ def _to_code (func : Union [FunctionWithRequirements [T , P ], Callable [P , T ], FunctionWithRequirementsStr ]) -> str :
16
+ if isinstance (func , FunctionWithRequirementsStr ):
17
+ return func .func
18
+
14
19
code = inspect .getsource (func )
15
20
# Strip the decorator
16
21
if code .startswith ("@" ):
@@ -50,6 +55,57 @@ def to_str(i: Union[str, Alias]) -> str:
50
55
return f"from { im .module } import { imports } "
51
56
52
57
58
+ class _StringLoader (SourceLoader ):
59
+ def __init__ (self , data : str ):
60
+ self .data = data
61
+
62
+ def get_source (self , fullname : str ) -> str :
63
+ return self .data
64
+
65
+ def get_data (self , path : str ) -> bytes :
66
+ return self .data .encode ("utf-8" )
67
+
68
+ def get_filename (self , fullname : str ) -> str :
69
+ return "<not a real path>/" + fullname + ".py"
70
+
71
+
72
+ @dataclass
73
+ class FunctionWithRequirementsStr :
74
+ func : str
75
+ _compiled_func : Callable [..., Any ]
76
+ _func_name : str
77
+ python_packages : List [str ] = field (default_factory = list )
78
+ global_imports : List [Import ] = field (default_factory = list )
79
+
80
+ def __init__ (self , func : str , python_packages : List [str ] = [], global_imports : List [Import ] = []):
81
+ self .func = func
82
+ self .python_packages = python_packages
83
+ self .global_imports = global_imports
84
+
85
+ module_name = "func_module"
86
+ loader = _StringLoader (func )
87
+ spec = importlib .util .spec_from_loader (module_name , loader )
88
+ if spec is None :
89
+ raise ValueError ("Could not create spec" )
90
+ module = importlib .util .module_from_spec (spec )
91
+ if spec .loader is None :
92
+ raise ValueError ("Could not create loader" )
93
+
94
+ try :
95
+ spec .loader .exec_module (module )
96
+ except Exception as e :
97
+ raise ValueError (f"Could not compile function: { e } " ) from e
98
+
99
+ functions = inspect .getmembers (module , inspect .isfunction )
100
+ if len (functions ) != 1 :
101
+ raise ValueError ("The string must contain exactly one function" )
102
+
103
+ self ._func_name , self ._compiled_func = functions [0 ]
104
+
105
+ def __call__ (self , * args : Any , ** kwargs : Any ) -> None :
106
+ raise NotImplementedError ("String based function with requirement objects are not directly callable" )
107
+
108
+
53
109
@dataclass
54
110
class FunctionWithRequirements (Generic [T , P ]):
55
111
func : Callable [P , T ]
@@ -62,6 +118,12 @@ def from_callable(
62
118
) -> FunctionWithRequirements [T , P ]:
63
119
return cls (python_packages = python_packages , global_imports = global_imports , func = func )
64
120
121
+ @staticmethod
122
+ def from_str (
123
+ func : str , python_packages : List [str ] = [], global_imports : List [Import ] = []
124
+ ) -> FunctionWithRequirementsStr :
125
+ return FunctionWithRequirementsStr (func = func , python_packages = python_packages , global_imports = global_imports )
126
+
65
127
# Type this based on F
66
128
def __call__ (self , * args : P .args , ** kwargs : P .kwargs ) -> T :
67
129
return self .func (* args , ** kwargs )
@@ -91,11 +153,13 @@ def wrapper(func: Callable[P, T]) -> FunctionWithRequirements[T, P]:
91
153
return wrapper
92
154
93
155
94
- def _build_python_functions_file (funcs : List [Union [FunctionWithRequirements [Any , P ], Callable [..., Any ]]]) -> str :
156
+ def _build_python_functions_file (
157
+ funcs : List [Union [FunctionWithRequirements [Any , P ], Callable [..., Any ], FunctionWithRequirementsStr ]]
158
+ ) -> str :
95
159
# First collect all global imports
96
160
global_imports = set ()
97
161
for func in funcs :
98
- if isinstance (func , FunctionWithRequirements ):
162
+ if isinstance (func , ( FunctionWithRequirements , FunctionWithRequirementsStr ) ):
99
163
global_imports .update (func .global_imports )
100
164
101
165
content = "\n " .join (map (_import_to_str , global_imports )) + "\n \n "
@@ -106,7 +170,7 @@ def _build_python_functions_file(funcs: List[Union[FunctionWithRequirements[Any,
106
170
return content
107
171
108
172
109
- def to_stub (func : Callable [..., Any ]) -> str :
173
+ def to_stub (func : Union [ Callable [..., Any ], FunctionWithRequirementsStr ]) -> str :
110
174
"""Generate a stub for a function as a string
111
175
112
176
Args:
@@ -115,6 +179,9 @@ def to_stub(func: Callable[..., Any]) -> str:
115
179
Returns:
116
180
str: The stub for the function
117
181
"""
182
+ if isinstance (func , FunctionWithRequirementsStr ):
183
+ return to_stub (func ._compiled_func )
184
+
118
185
content = f"def { func .__name__ } { inspect .signature (func )} :\n "
119
186
docstring = func .__doc__
120
187
0 commit comments