11
11
from .markdown_code_extractor import MarkdownCodeExtractor
12
12
13
13
__all__ = (
14
- "LocalCommandlineCodeExecutor " ,
15
- "CommandlineCodeResult " ,
14
+ "LocalCommandLineCodeExecutor " ,
15
+ "CommandLineCodeResult " ,
16
16
)
17
17
18
18
19
- class CommandlineCodeResult (CodeResult ):
19
+ class CommandLineCodeResult (CodeResult ):
20
20
"""(Experimental) A code result class for command line code executor."""
21
21
22
22
code_file : Optional [str ] = Field (
@@ -25,7 +25,7 @@ class CommandlineCodeResult(CodeResult):
25
25
)
26
26
27
27
28
- class LocalCommandlineCodeExecutor (BaseModel ):
28
+ class LocalCommandLineCodeExecutor (BaseModel ):
29
29
"""(Experimental) A code executor class that executes code through a local command line
30
30
environment.
31
31
@@ -49,7 +49,7 @@ class LocalCommandlineCodeExecutor(BaseModel):
49
49
directory is the current directory ".".
50
50
system_message_update (str): The system message update for agent that
51
51
produces code to run on this executor.
52
- Default is `LocalCommandlineCodeExecutor .DEFAULT_SYSTEM_MESSAGE_UPDATE`.
52
+ Default is `LocalCommandLineCodeExecutor .DEFAULT_SYSTEM_MESSAGE_UPDATE`.
53
53
"""
54
54
55
55
DEFAULT_SYSTEM_MESSAGE_UPDATE : ClassVar [
@@ -92,10 +92,10 @@ def _check_work_dir(cls, v: str) -> str:
92
92
raise ValueError (f"Working directory { v } does not exist." )
93
93
94
94
@property
95
- def user_capability (self ) -> "LocalCommandlineCodeExecutor .UserCapability" :
95
+ def user_capability (self ) -> "LocalCommandLineCodeExecutor .UserCapability" :
96
96
"""Export a user capability for this executor that can be added to
97
97
an agent that produces code to be executed by this executor."""
98
- return LocalCommandlineCodeExecutor .UserCapability (self .system_message_update )
98
+ return LocalCommandLineCodeExecutor .UserCapability (self .system_message_update )
99
99
100
100
@property
101
101
def code_extractor (self ) -> CodeExtractor :
@@ -124,19 +124,19 @@ def sanitize_command(lang: str, code: str) -> None:
124
124
if re .search (pattern , code ):
125
125
raise ValueError (f"Potentially dangerous command detected: { message } " )
126
126
127
- def execute_code_blocks (self , code_blocks : List [CodeBlock ]) -> CommandlineCodeResult :
127
+ def execute_code_blocks (self , code_blocks : List [CodeBlock ]) -> CommandLineCodeResult :
128
128
"""(Experimental) Execute the code blocks and return the result.
129
129
130
130
Args:
131
131
code_blocks (List[CodeBlock]): The code blocks to execute.
132
132
133
133
Returns:
134
- CommandlineCodeResult : The result of the code execution."""
134
+ CommandLineCodeResult : The result of the code execution."""
135
135
logs_all = ""
136
136
for i , code_block in enumerate (code_blocks ):
137
137
lang , code = code_block .language , code_block .code
138
138
139
- LocalCommandlineCodeExecutor .sanitize_command (lang , code )
139
+ LocalCommandLineCodeExecutor .sanitize_command (lang , code )
140
140
filename_uuid = uuid .uuid4 ().hex
141
141
filename = None
142
142
if lang in ["bash" , "shell" , "sh" , "pwsh" , "powershell" , "ps1" ]:
@@ -166,8 +166,75 @@ def execute_code_blocks(self, code_blocks: List[CodeBlock]) -> CommandlineCodeRe
166
166
if exitcode != 0 :
167
167
break
168
168
code_filename = os .path .join (self .work_dir , filename ) if filename is not None else None
169
- return CommandlineCodeResult (exit_code = exitcode , output = logs_all , code_file = code_filename )
169
+ return CommandLineCodeResult (exit_code = exitcode , output = logs_all , code_file = code_filename )
170
170
171
171
def restart (self ) -> None :
172
172
"""(Experimental) Restart the code executor."""
173
173
warnings .warn ("Restarting local command line code executor is not supported. No action is taken." )
174
+
175
+
176
+ # From stack overflow: https://stackoverflow.com/a/52087847/2214524
177
+ class _DeprecatedClassMeta (type ):
178
+ def __new__ (cls , name , bases , classdict , * args , ** kwargs ):
179
+ alias = classdict .get ("_DeprecatedClassMeta__alias" )
180
+
181
+ if alias is not None :
182
+
183
+ def new (cls , * args , ** kwargs ):
184
+ alias = getattr (cls , "_DeprecatedClassMeta__alias" )
185
+
186
+ if alias is not None :
187
+ warnings .warn (
188
+ "{} has been renamed to {}, the alias will be "
189
+ "removed in the future" .format (cls .__name__ , alias .__name__ ),
190
+ DeprecationWarning ,
191
+ stacklevel = 2 ,
192
+ )
193
+
194
+ return alias (* args , ** kwargs )
195
+
196
+ classdict ["__new__" ] = new
197
+ classdict ["_DeprecatedClassMeta__alias" ] = alias
198
+
199
+ fixed_bases = []
200
+
201
+ for b in bases :
202
+ alias = getattr (b , "_DeprecatedClassMeta__alias" , None )
203
+
204
+ if alias is not None :
205
+ warnings .warn (
206
+ "{} has been renamed to {}, the alias will be "
207
+ "removed in the future" .format (b .__name__ , alias .__name__ ),
208
+ DeprecationWarning ,
209
+ stacklevel = 2 ,
210
+ )
211
+
212
+ # Avoid duplicate base classes.
213
+ b = alias or b
214
+ if b not in fixed_bases :
215
+ fixed_bases .append (b )
216
+
217
+ fixed_bases = tuple (fixed_bases )
218
+
219
+ return super ().__new__ (cls , name , fixed_bases , classdict , * args , ** kwargs )
220
+
221
+ def __instancecheck__ (cls , instance ):
222
+ return any (cls .__subclasscheck__ (c ) for c in {type (instance ), instance .__class__ })
223
+
224
+ def __subclasscheck__ (cls , subclass ):
225
+ if subclass is cls :
226
+ return True
227
+ else :
228
+ return issubclass (subclass , getattr (cls , "_DeprecatedClassMeta__alias" ))
229
+
230
+
231
+ class LocalCommandlineCodeExecutor (metaclass = _DeprecatedClassMeta ):
232
+ """LocalCommandlineCodeExecutor renamed to LocalCommandLineCodeExecutor"""
233
+
234
+ _DeprecatedClassMeta__alias = LocalCommandLineCodeExecutor
235
+
236
+
237
+ class CommandlineCodeResult (metaclass = _DeprecatedClassMeta ):
238
+ """CommandlineCodeResult renamed to CommandLineCodeResult"""
239
+
240
+ _DeprecatedClassMeta__alias = CommandLineCodeResult
0 commit comments