-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathsimulator.py
62 lines (47 loc) · 1.71 KB
/
simulator.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
import os
import subprocess
import time
def main():
route_request_sender = 'kafka_producer_route.py'
traffic_sender = 'kafka_producer_traffic.py'
weather_sender = 'kafka_producer_weather.py'
global_consumer = 'kafka_consumer.py'
nrt_processor = 'nrt_stream_process.py'
response_d = 'response_distribute.py'
# Set the number of trials and simulation trial total time
NUM_TRIALS = 1
SIMULATION_TRIAL_TOTAL_TIME = 3000
# Set the number of customers threads
CUSTOMERS_THREADS = 2
# Redirect output to the null device
devnull = open(os.devnull, 'w')
for _ in range(NUM_TRIALS):
# Start the script in a separate process
# Execute consumers:
consumer = subprocess.Popen(['python', global_consumer])
# Execute producers
traffic = subprocess.Popen(['python', traffic_sender])
weather = subprocess.Popen(['python', weather_sender])
route = subprocess.Popen(['python', route_request_sender], stdout=devnull, stderr=devnull)
# Execute near realtime processor
nrt = subprocess.Popen(['python', nrt_processor])
response = subprocess.Popen(['python', response_d])
# Wait for the simulation trial time
time.sleep(SIMULATION_TRIAL_TOTAL_TIME)
# Terminate the process
traffic.terminate()
traffic.wait()
weather.terminate()
weather.wait()
route.terminate()
route.wait()
consumer.terminate()
consumer.wait()
nrt.terminate()
nrt.wait()
response.terminate()
response.wait()
# Wait for a brief period before starting the next trial
time.sleep(1)
if __name__ == '__main__':
main()