agent_operator.py · 501 lines (390 loc) · 19.2 KB
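"""Agent selection and prompt construction for an email-driven multi-agent pipeline.

AgentSelector resolves which agents should respond to an incoming email (from the
recipient addresses and @@ shortcodes in the body), builds a role-play prompt for
each one, and maintains the quoted conversation history in HTML and plain text.
"""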
import re
import time
import json
import os
import threading
import logging
from datetime import datetime
from shortcode import handle_document_short_code
from gpt import GPTModel
# logging.basicConfig(level=logging.DEBUG)
domain_name = os.environ.get('DOMAIN_NAME', 'semantic-life.com')

def format_datetime_for_email():
    return datetime.now().strftime('%a, %b %d, %Y at %I:%M %p')


def format_note(agent_name, email=f"agent@{domain_name}", timestamp=None):
    if not timestamp:
        timestamp = format_datetime_for_email()
    return f'On {timestamp} {agent_name} <{email}> wrote:'


def load_instructions(filename='instructions.json'):
    with open(filename, 'r') as file:
        return json.load(file)


class AgentSelector:

    def __init__(self, max_agents=12):
        self.lock = threading.Lock()
        self.openai_api_key = os.environ['OPENAI_API_KEY']
        self.max_agents = max_agents
        self.conversation_structure = {}
        self.conversation_history = ""
        self.invoked_agents = {}
        self.last_agent_response = ""
        self.instructions = load_instructions()
        self.gpt = GPTModel()

    # UTILITIES

    def reset_for_new_thread(self):
        self.invoked_agents.clear()
        self.conversation_structure = {}
        self.conversation_history = ""

    @staticmethod
    def safe_ascii_string(s):
        return ''.join(c if ord(c) < 128 else '?' for c in s)

    def save_temp_agents(self):
        formatted_agents_list = []
        for agent_id, agent_profile in self.invoked_agents.items():
            formatted_agent = {
                "id": agent_id,
                "email": f"agent@{domain_name}",
                "persona": agent_profile.get("description", "")
            }
            formatted_agents_list.append(formatted_agent)

        print(f"Debug: Saving the following agents to file: {formatted_agents_list}")

        try:
            # Read existing data from the file first
            existing_data = []
            if os.path.exists("agents/temp_agents.json"):
                with open("agents/temp_agents.json", "r") as f:
                    existing_data = json.load(f)

            # Make sure existing_data is a list
            if not isinstance(existing_data, list):
                existing_data = []

            # Update existing data with new agents
            existing_data.extend(formatted_agents_list)

            # Write the updated data back to the file
            with open("agents/temp_agents.json", "w") as f:
                json.dump(existing_data, f, indent=4)
            print("Debug: Successfully saved to temp_agents.json")
        except Exception as e:
            print(f"Debug: Failed to save to temp_agents.json, Error: {e}")

    # GET AGENTS FROM EMAIL ADDRESSES AND CONTENT

    def get_agent_names_from_content_and_emails(self, content, recipient_emails,
                                                agent_loader, gpt):
        agent_queue = []
        atat_agent_queue = []
        overall_order = 1
        agents_to_remove = set()

        # Get agents from recipient emails
        for email in recipient_emails:
            agent = agent_loader.get_agent_by_email(email)
            if agent:
                agent_queue.append((agent["id"], overall_order))
                overall_order += 1

        # Search for @@.creator and @@ tags in the content
        regex_pattern = re.compile(r"@@\.creator\((.*?)\)|@@\(([\w\d_]+)\)",
                                   re.DOTALL)
        atat_tags = regex_pattern.findall(content)

        for atat_creator_match, atat_match in atat_tags:
            if atat_creator_match:
                agent_description = atat_creator_match
                generated_profile = gpt.generate_agent_profile(agent_description)
                unique_id = f"GeneratedAgent_{hash(agent_description)}"
                if unique_id not in self.invoked_agents:
                    self.invoked_agents[unique_id] = generated_profile
                    agent_loader.agents[unique_id] = generated_profile
                atat_agent_queue.append((unique_id, overall_order))
            elif atat_match:
                agent = agent_loader.get_agent(atat_match, case_sensitive=False)
                if agent and (atat_match, overall_order) not in agent_queue:
                    atat_agent_queue.append((atat_match, overall_order))
            overall_order += 1

        # Merge and filter the agent queue
        agent_queue.extend(atat_agent_queue)

        # Process explicit tags in the content
        explicit_tags = regex_pattern.findall(content)
        explicit_tags = [tag for sublist in explicit_tags for tag in sublist if tag]

        # Identify agents to remove from the queue
        for tag in explicit_tags:
            if tag.startswith("no."):
                agents_to_remove.add(tag[3:])
            elif tag not in agents_to_remove and tag in self.invoked_agents:
                # Add existing agents to the queue from explicit tags
                agent_queue.append((tag, overall_order))
                overall_order += 1

        # Final filtering and sorting of the agent queue
        agent_queue = [(agent_name, order) for agent_name, order in agent_queue
                       if agent_name not in agents_to_remove]
        agent_queue = sorted(agent_queue, key=lambda x: x[1])[:self.max_agents]

        print(f"Debug: Full agent queue: {agent_queue}")
        return agent_queue
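
    # Example (illustrative, not from the original source): for content such as
    #   "Please loop in @@(sales_bot) and @@.creator(a skeptical CFO persona)"
    # the regex above yields one plain tag ("sales_bot", queued only if the
    # agent_loader can resolve it) and one creator description ("a skeptical
    # CFO persona"), for which gpt.generate_agent_profile builds a new
    # GeneratedAgent_<hash> entry. Tags beginning with "no." are collected in
    # agents_to_remove and filtered out of the final queue.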

    def extract_relationships(self, agent_loader, agent_name):

        def dump_relationships(data):
            # Base case: if data is not a dictionary or list, return it as a string
            if not isinstance(data, (dict, list)):
                return str(data)
            if isinstance(data, list):
                # Process each item in the list recursively and join with ", "
                return '[ ' + ', '.join(
                    dump_relationships(item) for item in data) + ' ]'
            if isinstance(data, dict):
                # Process each key-value pair in the dictionary
                items = []
                for key, value in data.items():
                    # Recursively process the value
                    dumped_value = dump_relationships(value)
                    items.append(f'"{key}": {dumped_value}')
                return '{ ' + ', '.join(items) + ' }'

        # Assuming agent_loader can access the agents.json data
        agent_data = agent_loader.get_agent(agent_name)
        relationships_data = agent_data.get("relationships", [])

        # Use the recursive function to dump the relationships
        dumped_relationships = dump_relationships(relationships_data)
        return dumped_relationships
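
    # Example (illustrative, not from the original source):
    #   dump_relationships([{"name": "Ana", "role": "mentor"}])
    # returns the string
    #   [ { "name": Ana, "role": mentor } ]
    # i.e. keys stay quoted while leaf values are rendered with str().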

    # CREATE PROMPT FOR AGENTS

    def create_dynamic_prompt(self,
                              agent_loader,
                              agent_name,
                              order,
                              total_order,
                              structured_response=None,
                              modality=None,
                              content=None):
        order_explanation = ", ".join([
            f"('{resp[0]}', {resp[1]})"
            for resp in self.conversation_structure.get("responses", [])
        ])

        # Order and Persona Context
        order_context = (
            f"You are role-playing as the {agent_name}. This is response {order} "
            f"in a conversation with {total_order} interactions. "
            f"The agent sequence is: [{order_explanation}].")

        print(f"Debug: create_dynamic_prompt called with agent_name: {agent_name}")

        # Main Instructions
        instructions = self.instructions['default']['main_instructions']

        # Structured Response
        if structured_response:
            instructions += self.instructions['default'][
                'structured_response_guidelines']
            instructions += (
                "\n\n=== STRUCTURED RESPONSE GUIDELINES ===\n"
                f"{structured_response}\n=== END OF GUIDELINES ===")

        # Check for modality-specific instructions
        if modality and modality in self.instructions['summarize']:
            instructions = self.instructions['summarize'][modality]
            if 'additional_context_chunk' in self.instructions['summarize']:
                additional_context_chunk = self.instructions['summarize'][
                    'additional_context_chunk'].format(part_number=1, total_parts=1)
                instructions += " " + additional_context_chunk

        # Creating the dynamic prompt in the specified order
        dynamic_prompt = f"{order_context}"

        # Inserting the email content
        if content:
            dynamic_prompt += (
                " YOU ARE ON AN EMAIL THREAD. YOU ONLY RESPOND AS THE APPROPRIATE AGENT. "
                f"THE EMAIL YOU NEED TO RESPOND TO IS AS FOLLOWS: '''{content}'''.")

        other_agent_names = [
            name for name, _ in self.conversation_structure.get("responses", [])
            if name != agent_name
        ]
        other_agent_roles = ", ".join(
            [agent_loader.get_agent_persona(name) for name in other_agent_names])

        persona = agent_loader.get_agent_persona(agent_name)
        relationships = self.extract_relationships(agent_loader, agent_name)

        explicit_role_context = (
            f"You are NOT {other_agent_roles}. You ARE ROLE PLAYING AS {agent_name}. "
            "YOU ARE HIGHLY INFLUENCED BY YOUR SOCIAL AND PROFESSIONAL MOTIVATIONS, "
            f"WHICH CORRELATE TO THE PEOPLE IN YOUR LIFE. YOUR PERSONA DETAILS ARE: '{persona}'. "
            f"YOUR RELATIONSHIPS THAT MOST INFLUENCE YOU ARE: '{relationships}'. "
            "NOW ROLE PLAY AS THIS AGENT.")

        dynamic_prompt += f" {explicit_role_context}"

        print(f"{dynamic_prompt}")
        return dynamic_prompt

    # FORMATTING

    def replace_agent_shortcodes(self, content):
        """
        Replaces @@(agent_name) shortcodes with the agent's name.
        """
        return re.sub(r"@@\((\w+)\)", r"\1", content)

    def format_conversation_history_html(self,
                                         agent_responses,
                                         exclude_recent=1,
                                         existing_history=None):
        formatted_history = existing_history or ""

        # Guard against exclude_recent == 0: a [:-0] slice would be empty,
        # so in that case include every response in the nested history.
        older_responses = (agent_responses[:-exclude_recent]
                           if exclude_recent > 0 else agent_responses)
        for agent_name, agent_email, email_content in reversed(older_responses):
            timestamp = format_datetime_for_email()
            gmail_note = format_note(agent_name,
                                     email=agent_email,
                                     timestamp=timestamp)
            formatted_history += f"{gmail_note}<blockquote>{email_content}</blockquote>"

        # Processing the most recent response
        if agent_responses and exclude_recent > 0:
            recent_agent_name, recent_agent_email, recent_email_content = agent_responses[-exclude_recent]
            recent_timestamp = format_datetime_for_email()
            recent_gmail_note = format_note(recent_agent_name,
                                            email=recent_agent_email,
                                            timestamp=recent_timestamp)
            formatted_history += f"{recent_gmail_note}<blockquote>{recent_email_content}</blockquote>"

        # Wrap the entire history in a single 'gmail_quote' div
        formatted_history = f'<div>{formatted_history}</div>'
        return formatted_history
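
    # Example (illustrative, not from the original source):
    #   format_conversation_history_html([("Ana", "ana@example.com", "Hi")])
    # returns roughly
    #   <div>On <timestamp> Ana <ana@example.com> wrote:<blockquote>Hi</blockquote></div>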

    def format_conversation_history_plain(self,
                                          agent_responses,
                                          exclude_recent=1,
                                          existing_history=None):
        formatted_plain_history = existing_history or ""
        quote_level = 1

        # Process all but the most recent response for nested history
        for agent_name, agent_email, email_content in reversed(
                agent_responses[:-exclude_recent]):
            timestamp = format_datetime_for_email()
            gmail_note = format_note(agent_name,
                                     email=agent_email,
                                     timestamp=timestamp)
            quoted_content = "\n".join([
                ">" * quote_level + line if line.strip() else ""
                for line in email_content.split('\n')
            ]).strip()  # Remove leading and trailing whitespace and blank lines
            formatted_plain_history = f"{gmail_note}\n{quoted_content}\n\n{formatted_plain_history}"
            quote_level += 1  # Increment the quote level for the next message

        # Process the most recent response
        if agent_responses and exclude_recent > 0:
            recent_agent_name, recent_agent_email, recent_email_content = agent_responses[-exclude_recent]
            recent_timestamp = format_datetime_for_email()
            recent_gmail_note = format_note(recent_agent_name,
                                            email=recent_agent_email,
                                            timestamp=recent_timestamp)
            quoted_recent_content = "\n".join([
                ">" + line if line.strip() else ""
                for line in recent_email_content.split('\n')
            ]).strip()  # Remove leading and trailing whitespace and blank lines
            formatted_plain_history = f"{recent_gmail_note}\n{quoted_recent_content}\n\n{formatted_plain_history}"

        return formatted_plain_history.strip()  # Remove extra newlines at the beginning and end

    def get_response_for_agent(self,
                               agent_loader,
                               gpt,
                               agent_name,
                               order,
                               total_order,
                               content,
                               additional_context=None):
        # Count tokens before the API call
        tokens_for_this_request = gpt.count_tokens(content)

        # Check rate limits
        gpt.check_rate_limit(tokens_for_this_request)

        custom_instruction_for_detail = self.instructions['default'][
            'custom_instruction_for_detail']

        content = self.replace_agent_shortcodes(content)
        timestamp = format_datetime_for_email()
        modality = 'default'

        with self.lock:
            if "!previousResponse" in content:
                content = content.replace('!previousResponse',
                                          self.last_agent_response)
                content = content.replace('!useLastResponse', '').strip()

            responses = []
            dynamic_prompt = ""

            agent = agent_loader.get_agent(agent_name, case_sensitive=False)
            if not agent:
                logging.warning(f"No agent found for name {agent_name}. Skipping...")
                return ""

            result = handle_document_short_code(content, self.openai_api_key,
                                                self.conversation_history)
            if result is None:
                print("Error: agent_operator - handle_document_short_code returned None.")
                return False

            structured_response = result.get('structured_response')
            new_content = result.get('new_content')

            if result['type'] == 'pro':
                descriptions = result.get('content', [])
                for desc in descriptions:
                    generated_profile = self.gpt.generate_agent_profile(desc)
                    # Generate a unique key for each generated agent, based on the description
                    unique_key = f"GeneratedAgent_{hash(desc)}"
                    self.invoked_agents[unique_key] = generated_profile
                print("Debug: About to save invoked agents:", self.invoked_agents)
                self.save_temp_agents()

            # Handle Summarize Type
            if result['type'] == 'summarize':
                modality = result.get('modality', 'default')
                additional_context = self.instructions['summarize'].get(
                    modality, self.instructions['summarize']['default'])
                chunks = result.get('content', [])
                self.conversation_history = self.conversation_history[-16000:]

                for idx, chunk in enumerate(chunks):
                    dynamic_prompt = self.create_dynamic_prompt(agent_loader,
                                                                agent_name,
                                                                order,
                                                                total_order,
                                                                structured_response,
                                                                modality=modality,
                                                                content=chunk)
                    response = gpt.generate_response(dynamic_prompt,
                                                     chunk,
                                                     self.conversation_history,
                                                     is_summarize=False)
                    responses.append(response)

                formatted_response = self.format_conversation_history_html(
                    [(agent_name, agent["email"], response)], exclude_recent=0)
                # Update conversation history after each agent's response
                self.conversation_history += f"\n{agent_name} said: {formatted_response}"

            # Handle Detail Type
            elif result['type'] == 'detail':
                chunks = result.get('content', [])
                # Truncates conversation history to 100,000 characters
                # (this should really be a token count, not a character count)
                self.conversation_history = self.conversation_history[-100000:]
                responses = []
                agent_responses = []

                for idx, chunk in enumerate(chunks):
                    dynamic_prompt = self.create_dynamic_prompt(agent_loader,
                                                                agent_name,
                                                                order,
                                                                total_order,
                                                                additional_context,
                                                                modality=modality)
                    # Add custom instruction to the dynamic prompt
                    dynamic_prompt += f" {custom_instruction_for_detail}"
                    response = gpt.generate_response(dynamic_prompt,
                                                     chunk,
                                                     self.conversation_history,
                                                     is_summarize=False)
                    # logging.debug(f"Appending response {idx}")
                    responses.append(response)
                    agent_responses.append((agent_name, agent["email"], response))

                final_response = ' '.join(responses)  # Join responses to avoid repetition
                formatted_response = self.format_conversation_history_html(agent_responses)

                # Update conversation history after each agent's response
                self.conversation_history += f"\n{agent_name} said: {formatted_response}"
                # logging.debug(f"Final Responses: {responses}")

            # Handle Default Type
            else:
                # Handling the default type
                if structured_response:
                    additional_context = structured_response
                    content = new_content
                dynamic_prompt = self.create_dynamic_prompt(agent_loader, agent_name,
                                                            order, total_order,
                                                            additional_context,
                                                            modality)
                response = gpt.generate_response(dynamic_prompt,
                                                 content,
                                                 self.conversation_history,
                                                 is_summarize=False)
                if response is not None:
                    responses.append(response)
                else:
                    print(f"Warning: Received None response for agent {agent_name}")

            # Combine the responses, handling the case where no valid responses were generated
            if responses:
                final_response = " ".join(responses)
            else:
                final_response = "No response generated."

            signature = "\n\n- GENERATIVE AI AGENT: " + agent_name
            final_response_with_signature = final_response + signature

            """
            # Formatting the nested history
            agent_email = agent["email"]
            timestamp = format_datetime_for_email()
            gmail_note = format_note(agent_name, agent_email, timestamp)
            agent_responses = [(agent_name, agent["email"],
                                final_response_with_signature)]
            nested_history = self.format_conversation_history_html(
                agent_responses, existing_history=gmail_note)
            self.conversation_structure.setdefault("responses", []).append(
                (agent_name,
                 nested_history))  # Store the formatted response with signature
            self.last_agent_response = nested_history
            """

            return final_response_with_signature
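

# ---------------------------------------------------------------------------
# Minimal usage sketch (added for illustration; not part of the original file).
# It only exercises the self-contained helpers above. Running the full
# AgentSelector flow additionally requires OPENAI_API_KEY, instructions.json,
# an agent_loader, and a GPTModel instance, none of which are provided here.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    # Quoted-reply header in the Gmail style used for the email history.
    print(format_note("ExampleAgent"))

    # The same substitution that AgentSelector.replace_agent_shortcodes applies:
    # @@(agent_name) shortcodes are reduced to the bare agent name.
    sample = "Hello @@(marketing_lead), please reply to this thread."
    print(re.sub(r"@@\((\w+)\)", r"\1", sample))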