experiment.py
'''Script to analyse the benefit of training with multi-core processing.'''

import os
import time
import json
import datetime
import logging
import pathlib
import numpy as np
from sklearn.datasets import make_blobs
from sklearn.ensemble import RandomForestClassifier
from sklearn.neighbors import KNeighborsClassifier
from sklearn.linear_model import LogisticRegression, SGDClassifier
from sklearn.neural_network import MLPClassifier
from sklearn.svm import SVC
from sklearn.naive_bayes import GaussianNB
from sklearn.tree import DecisionTreeClassifier
import joblib
import matplotlib.pyplot as plt
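
# The script reads a config.json whose keys mirror Config.__init__ (see the
# json.load call with object_hook in the main block below). A purely
# illustrative example -- these values are an assumption for the sketch, not
# taken from the original repository:
#
# {
#     "n_jobs_to_test": -1,
#     "models": ["RandomForestClassifier", "SVC"],
#     "parallelization_backends": ["loky", "threading"],
#     "n_samples": 100000,
#     "n_features": 50,
#     "n_trials": 3,
#     "results_directory": "results",
#     "image_name_base": "TrainingDurations"
# }
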
class Config:
    def __init__(self, n_jobs_to_test, models, parallelization_backends, n_samples, n_features, n_trials, results_directory, image_name_base):
        self.n_jobs_to_test = n_jobs_to_test
        self.models = models
        self.parallelization_backends = parallelization_backends
        self.n_samples = n_samples
        self.n_features = n_features
        self.n_trials = n_trials
        self.results_directory = results_directory
        self.image_name_base = image_name_base

def create_results_directory(results_directory):
    script_path = pathlib.Path(__file__).parent.resolve()
    results_path = os.path.join(script_path, results_directory)
    # Create the directory next to the script, not relative to the current working directory
    if not os.path.exists(results_path):
        os.makedirs(results_path)
    return results_path

def create_experiment_results_directory(results_path):
    # Note: reads the module-level `config` for the sample and feature counts
    current_datetime_string = datetime.datetime.now().strftime("%Y-%m-%d-%H-%M-%S")
    experiment_results_directory = f"{current_datetime_string}_{config.n_samples}x{config.n_features}"
    experiment_results_path = os.path.join(results_path, experiment_results_directory)
    os.makedirs(experiment_results_path)
    return experiment_results_path

def generate_dummy_data(n_samples, n_features):
    data_generation_start = time.time()
    X, y = make_blobs(n_samples=n_samples, n_features=n_features, centers=20, cluster_std=100, random_state=0)
    data_generation_time = time.time() - data_generation_start
    logging.info(f"Created dummy dataset ({n_samples} samples, {n_features} dimensions) in {data_generation_time:.2f} seconds")
    return X, y

def validate_n_jobs_to_test(n_jobs_to_test):
    n_cpu = os.cpu_count()
    logging.info(f"Found {n_cpu} logical processors")
    if n_jobs_to_test != -1:
        if max(n_jobs_to_test) > n_cpu:
            raise ValueError(f"Found {n_cpu} logical processors, but intended to use {max(n_jobs_to_test)}")
    else:
        n_jobs_to_test = list(range(1, n_cpu))
        logging.info(f"N_JOBS_TO_TEST = -1. All n_jobs up to one minus the number of available logical processors will be tested: {n_jobs_to_test}")
    return n_jobs_to_test

def initialize_model(selected_model):
    match selected_model:
        case "RandomForestClassifier":
            model = RandomForestClassifier()
        case "KNeighborsClassifier":
            model = KNeighborsClassifier()
        case "DefaultLogisticRegression":
            model = LogisticRegression()
        case "SagaLogisticRegression":
            model = LogisticRegression(multi_class="ovr", solver="saga")
        case "MLPClassifier":
            model = MLPClassifier()
        case "SVC":
            model = SVC()
        case "GaussianNB":
            model = GaussianNB()
        case "DecisionTreeClassifier":
            model = DecisionTreeClassifier()
        case "SGDClassifier":
            model = SGDClassifier()
        case _:
            raise NotImplementedError(f"Model {selected_model} is not available.")
    logging.info(f"Initialized {selected_model}")
    return model

def train_with_multiple_n_jobs(n_jobs_to_test, n_trials):
    training_times = np.zeros((len(n_jobs_to_test), n_trials))
    for i, n_jobs in enumerate(n_jobs_to_test):
        for n in range(n_trials):
            # A fresh model per trial so earlier fits do not skew the timing
            model = initialize_model(selected_model)
            training_time = train_model(model, n_jobs)
            training_times[i, n] = training_time
    return training_times

def train_model(model, n_jobs):
    # Uses the module-level X, y and the selected joblib backend
    # (e.g. "loky", "threading" or "multiprocessing")
    training_start = time.time()
    with joblib.parallel_backend(selected_parallelization_backend, n_jobs=n_jobs):
        model.fit(X, y)
    training_time = time.time() - training_start
    logging.info(f"Trained with n_jobs={n_jobs}: {training_time:.2f} seconds")
    return training_time

def compute_duration_statistics(training_times):
    training_times_means = np.mean(training_times, axis=-1)
    training_times_variances = np.var(training_times, axis=-1)
    logging.info(f"Mean training durations with {config.n_trials} trials: {training_times_means}")
    logging.info(f"Variance of the training durations with {config.n_trials} trials: {training_times_variances}")
    return training_times_means, training_times_variances

def plot_absolute_durations(n_jobs_to_test, training_times_means, training_times_variances, iteration_name, saving_path):
    # Note: the raw variance across trials is drawn as the error bar here
    plt.errorbar(n_jobs_to_test, training_times_means, yerr=training_times_variances)
    plt.xlabel("n_jobs (number of logical processors)")
    plt.ylabel("Training time (seconds)")
    plt.title(iteration_name)
    plt.xticks(n_jobs_to_test)
    plt.ylim([0, 1.1 * max(training_times_means)])
    plt.grid()
    plot_name = f"AbsoluteTrainingDurations_{iteration_name}.png"
    plt.savefig(os.path.join(saving_path, plot_name))
    logging.info(f"Saved iteration results into {plot_name}")
    plt.clf()

def plot_percentual_durations(n_jobs_to_test, training_times_means, training_times_variances, iteration_name, saving_path):
    # Durations relative to the first (lowest n_jobs) configuration, in percent
    normalized_training_times_means = (np.array(training_times_means) / training_times_means[0]) * 100
    plt.plot(n_jobs_to_test, normalized_training_times_means)
    plt.xlabel("n_jobs (number of logical processors)")
    plt.ylabel("Normalized training time (%)")
    plt.title(iteration_name)
    plt.xticks(n_jobs_to_test)
    plt.ylim([0, 1.1 * max(normalized_training_times_means)])
    plt.grid()
    plot_name = f"PercentualTrainingDurations_{iteration_name}.png"
    plt.savefig(os.path.join(saving_path, plot_name))
    logging.info(f"Saved iteration results into {plot_name}")
    plt.clf()

if __name__ == "__main__":
    start = time.time()

    # Read configuration
    with open('config.json', 'r') as f:
        config = json.load(f, object_hook=lambda d: Config(**d))

    # Create results directory structure
    results_path = create_results_directory(config.results_directory)
    experiment_results_path = create_experiment_results_directory(results_path)

    # Set up logging configuration
    experiment_results_directory = os.path.basename(experiment_results_path)
    logfile_name = f"{experiment_results_directory}.log"
    logging.basicConfig(
        level=logging.INFO,
        format="{asctime} {levelname:<8} {message}",
        style='{',
        filename=os.path.join(experiment_results_path, logfile_name),
        filemode='w'
    )
    logging.info(f"Read experiment configuration: {config}")

    # Generate dummy data
    X, y = generate_dummy_data(config.n_samples, config.n_features)

    # Train with different numbers of logical processors (n_jobs)
    n_jobs_to_test = validate_n_jobs_to_test(config.n_jobs_to_test)
    all_training_times_means = []
    all_training_times_variances = []
    overall_plot_labels = []
    for selected_model in config.models:
        logging.info(f"Selected model: {selected_model}")
        for selected_parallelization_backend in config.parallelization_backends:
            logging.info(f"Selected parallelization backend: {selected_parallelization_backend}")
            # Train with multiple n_jobs
            training_times = train_with_multiple_n_jobs(n_jobs_to_test, config.n_trials)
            # Compute duration statistics (mean and variance)
            training_times_means, training_times_variances = compute_duration_statistics(training_times)
            all_training_times_means.append(list(training_times_means))
            all_training_times_variances.append(list(training_times_variances))
            # Plot and save iteration results as absolute and percentual durations
            iteration_name = f"{selected_model}_{selected_parallelization_backend}"
            overall_plot_labels.append(iteration_name)
            plot_absolute_durations(n_jobs_to_test, training_times_means, training_times_variances, iteration_name, experiment_results_path)
            plot_percentual_durations(n_jobs_to_test, training_times_means, training_times_variances, iteration_name, experiment_results_path)
    logging.info(f"All mean training durations: {all_training_times_means}")
    logging.info(f"All training duration variances: {all_training_times_variances}")
    logging.info(f"All iteration labels: {overall_plot_labels}")

    # Plot and save overall results
    plt.figure(1)
    plt.plot(n_jobs_to_test, np.transpose(np.array(all_training_times_means)))
    plt.xlabel("n_jobs (number of logical processors)")
    plt.ylabel("Training time (seconds)")
    plt.legend(overall_plot_labels, bbox_to_anchor=(1.05, 1.0), loc='upper left')
    plt.xticks(n_jobs_to_test)
    plt.grid()
    image_name = f"{config.image_name_base}_{config.n_samples}x{config.n_features}_OverallResults.png"
    plt.savefig(os.path.join(experiment_results_path, image_name), bbox_inches='tight')
    plt.clf()
    logging.info(f"Saved overall results into {image_name}")

    total_time = time.time() - start
    logging.info(f"Total wall time: {total_time:.2f} seconds")