-
Notifications
You must be signed in to change notification settings - Fork 5
/
Copy pathagent.py
115 lines (88 loc) · 3.52 KB
/
agent.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
from orchestrate import execute_pipeline
import helpers as h
import time
import wandb
from datetime import datetime
from wandb.sdk.data_types.trace_tree import Trace
import globals
import yaml
import atexit
import json
import os
# Make sure the directory that receives archived run logs exists.
# exist_ok=True replaces the separate os.path.exists() check, which left a
# window in which another process could create the directory first.
os.makedirs('logs', exist_ok=True)

# Load the configuration
with open('config.yml', 'r') as f:
    config = yaml.safe_load(f)

h.authenticate()  # Authenticate with OpenAI

# Obtain the config variables (all three keys are required; a missing key
# raises KeyError here, at import time)
wandb_enabled = config['wandb_enabled']
tools_enabled = config['tools_enabled']
pipeline_name = config['pipeline']
pipeline_path = "pipelines/" + pipeline_name + ".yml"

if wandb_enabled:  # Initialize wandb if it's enabled
    wandb.init(project="OrchestrAI")
    wandb.config.wandb_enabled = wandb_enabled

if tools_enabled:
    print("\033[93mTools are enabled.\033[00m")
def main():
    """Run the configured pipeline once, with local and optional W&B logging.

    The function:

    1. Prints a banner and loads the pipeline from ``pipeline_path``.
    2. Writes a fresh ``memory_log.json`` holding the start time, the
       pipeline name and an empty ``actions`` list.
    3. Registers an ``atexit`` hook that moves ``memory_log.json`` to
       ``logs/log_<timestamp>.json`` and, if a W&B run is still active,
       logs the root trace span.
    4. When ``wandb_enabled`` is true, creates an "Agent" root span with a
       chain span (named after the pipeline) as its child.
    5. Executes the pipeline. Any ``Exception`` is reported on stdout and
       not re-raised; the root span is logged and the W&B run finished in
       the ``finally`` clause.

    Reads the module-level names ``pipeline_path``, ``pipeline_name`` and
    ``wandb_enabled``. Returns ``None``.
    """
    print("\n" + "-" * 80 + "\n")
    print("\033[95m ------- Welcome to OrchestrAI ------\033[00m")
    time.sleep(1)  # dramatic effect

    pipeline = h.load_pipeline(pipeline_path)  # Load the pipeline

    # Start this run's memory log from scratch (mode 'w' truncates any
    # leftover file from a previous run).
    with open('memory_log.json', 'w') as file:
        json.dump({
            'agent_start_time': time.time(),
            'pipeline_name': pipeline_name,
            'actions': []
        }, file)

    # Bound before the exit hook is registered so the hook can never hit an
    # unbound closure variable if span creation below fails or is skipped.
    root_span = None

    def log_root_span(end_time_ms):
        """Stamp the root span with its end time and log it to W&B."""
        root_span._span.end_time_ms = end_time_ms
        root_span.log(name="pipeline_trace")

    ## Just in case it crashes, we want to log the root span to wandb anyway so we use atexit
    def closing_log():
        """Archive the memory log and, if still needed, log the root span."""
        agent_end_time_ms = round(datetime.now().timestamp() * 1000)

        # Convert the millisecond timestamp back to a datetime and format it
        # for use in the archived log's file name.
        agent_end_time = datetime.fromtimestamp(agent_end_time_ms / 1000)
        current_time = agent_end_time.strftime("%Y-%m-%d_%H-%M-%S")

        os.replace('memory_log.json', f'logs/log_{current_time}.json')

        # Nothing to log when there is no active W&B run or no root span was
        # ever created.
        # NOTE(review): presumably wandb.run is None once wandb.finish() has
        # run in the finally clause below, which keeps the trace from being
        # logged twice - confirm against the wandb documentation.
        if wandb.run is None or root_span is None:
            return

        # Log the root span to Weights & Biases
        log_root_span(agent_end_time_ms)

    # Register the function to be called on exit
    atexit.register(closing_log)

    if wandb_enabled:
        globals.agent_start_time_ms = round(datetime.now().timestamp() * 1000)

        # create a root span for the agent. The end time starts out equal to
        # the start time and is overwritten when the span is logged.
        root_span = Trace(
            name="Agent",
            kind="agent",
            start_time_ms=globals.agent_start_time_ms,
            end_time_ms=globals.agent_start_time_ms,
            metadata={"pipeline_name": pipeline_name},
        )

        # the agent calls into a pipeline, so we create a chain span.
        globals.chain_span = Trace(
            name=pipeline_name,
            kind="chain",
            start_time_ms=globals.agent_start_time_ms,
            end_time_ms=globals.agent_start_time_ms,
        )

        root_span.add_child(globals.chain_span)  # add the chain span as a child of the root span

    try:
        execute_pipeline(pipeline)  # Execute the pipeline using the orchestrate.py script
    except Exception as e:
        # Top-level boundary: report the failure and fall through to the
        # cleanup below instead of propagating.
        print(f"An error occurred during pipeline execution: {e}")
    finally:
        if wandb_enabled:
            # Log the root span to Weights & Biases
            log_root_span(round(datetime.now().timestamp() * 1000))
            wandb.finish()
            print ("\033[93m Wandb logging completed.\033[00m")
        print("\033[95m ------- OrchestrAI finished ------\033[00m")
# Script entry point: start the agent only when this file is executed
# directly, not when it is imported as a module.
if __name__ == "__main__":
    main()