-
Notifications
You must be signed in to change notification settings - Fork 6
/
Copy pathevaluation_hf_testing.py
245 lines (202 loc) · 7.76 KB
/
evaluation_hf_testing.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
# -*- encoding: utf-8 -*-
import argparse
import json
import os
import time
import numpy as np
import pandas as pd
import torch
from transformers import AutoModelForCausalLM, AutoTokenizer
from config.log_config import logging
from categories import verify_categories
# Option letters, in column order, used to label the answer options of each question.
choices = list("ABCD")
def get_subject(data_dir):
    """
    Lists the subjects that have a test file under ``<data_dir>/test``.

    A subject is the file name with its trailing ``_test.csv`` removed. Only
    files whose name actually *ends* with ``_test.csv`` are considered, so
    stray files such as ``physics_test.csv.bak`` are ignored instead of
    producing a duplicate subject.

    Args:
        data_dir (str): Directory that contains the ``test`` sub-folder.
    Returns:
        list[str]: Subject names in sorted order.
    """
    suffix = "_test.csv"
    test_dir = os.path.join(data_dir, "test")
    return sorted(
        name[: -len(suffix)]
        for name in os.listdir(test_dir)
        if name.endswith(suffix)
    )
def format_subject(subject):
    """
    Turns a subject identifier into a human-readable phrase.

    Every underscore in the identifier becomes a single space; all other
    characters are kept as they are.

    Args:
        subject (str): Subject identifier, e.g. a file-name stem.
    Returns:
        str: The identifier with underscores replaced by spaces.
    """
    readable = subject.replace("_", " ")
    return readable
def create_result_folder(args):
    """
    Creates the output folder ``<save_dir>/results_<model name>``.

    The model name is the part of ``args.model`` after its last ``/``.
    Missing parent directories are created as well, and calling this when the
    folder already exists is a no-op.

    Args:
        args (Namespace): Command line arguments; reads ``save_dir`` and ``model``.
    """
    model_name = args.model.split("/")[-1]
    result_dir = os.path.join(args.save_dir, "results_{}".format(model_name))
    # exist_ok avoids the check-then-create race of a separate os.path.exists test.
    os.makedirs(result_dir, exist_ok=True)
def initial_model(args):
    """
    Loads the causal language model and tokenizer named by ``args.model``.

    The model is requested in float16 with ``device_map="auto"`` and is put
    into evaluation mode before being returned. Both loaders are called with
    ``trust_remote_code=True``.

    Args:
        args (Namespace): Command line arguments; reads ``model``.
    Returns:
        tuple: ``(model, tokenizer)``.
    """
    model_id = args.model
    load_options = {
        "torch_dtype": torch.float16,
        "load_in_8bit": False,
        "low_cpu_mem_usage": True,
        "device_map": "auto",
        "trust_remote_code": True,
    }
    model = AutoModelForCausalLM.from_pretrained(model_id, **load_options)
    tokenizer = AutoTokenizer.from_pretrained(model_id, trust_remote_code=True)
    # Switch to inference mode only after both artifacts loaded successfully.
    model.eval()
    return model, tokenizer
def format_example(df, idx, include_answer=True):
    """
    Renders one row of a question dataframe as a prompt fragment.

    Column 0 holds the question, the last column holds the answer, and every
    column in between except the one just before the answer... is an option:
    concretely, the ``df.shape[1] - 2`` columns after the question are the
    options, each labelled with the matching letter from ``choices``.

    Args:
        df (pd.DataFrame): The dataframe containing the questions.
        idx (int): Positional row index of the question.
        include_answer (bool): If True, the answer and a blank line follow
            "Answer:"; if False the fragment ends right after "Answer:".
    Returns:
        str: The formatted question string.
    """
    option_count = df.shape[1] - 2
    pieces = [df.iloc[idx, 0]]
    for col in range(1, option_count + 1):
        # Option columns start at 1, so the letter index is one less.
        pieces.append("\n{}. {}".format(choices[col - 1], df.iloc[idx, col]))
    pieces.append("\nAnswer:")
    if include_answer:
        pieces.append(" {}\n\n".format(df.iloc[idx, option_count + 1]))
    return "".join(pieces)
def gen_prompt(train_df, subject, k=-1):
    """
    Generates the few-shot prefix: a header line followed by solved examples.

    Args:
        train_df (pd.DataFrame): The dataframe containing training data.
        subject (str): The subject of the questions.
        k (int): The number of questions to include in the prompt. ``-1``
            means every row of ``train_df``. Values larger than the number of
            rows are clamped to it (instead of failing with an out-of-bounds
            index); ``0`` or any other negative value yields the header only.
    Returns:
        str: The generated prompt.
    """
    available = train_df.shape[0]
    if k == -1 or k > available:
        k = available
    header = "The following are multiple choice questions (with answers) about {}.\n\n".format(
        format_subject(subject)
    )
    # range(k) is empty for k <= 0, which leaves just the header.
    return header + "".join(format_example(train_df, i) for i in range(k))
@torch.no_grad()
def eval(args, subject, model, tokenizer, dev_df, test_df):
    """
    Evaluates the model on a given subject.

    For every test row a few-shot prompt is built, the model's logits for the
    next token are read, and a softmax over the answer-letter tokens gives the
    per-option probabilities. The most probable letter is the prediction.

    Args:
        args (Namespace): Command line arguments; reads ``ntrain``.
        subject (str): The subject to evaluate on.
        model (AutoModelForCausalLM): The pre-trained model.
        tokenizer (AutoTokenizer): The tokenizer.
        dev_df (pd.DataFrame): The development set dataframe (few-shot examples).
        test_df (pd.DataFrame): The test set dataframe.
    Returns:
        tuple: ``(cors, all_probs, all_preds, all_times)`` where ``cors`` is a
        boolean array of per-question correctness, ``all_probs`` is an array of
        shape (questions, options) with the option probabilities,
        ``all_preds`` is the list of predicted letters and ``all_times`` is the
        list of seconds spent on each question.
    """
    # Longest prompt, in tokens, that is sent to the model.
    max_prompt_tokens = 2048

    # One letter per option column (the columns between question and answer).
    answers = choices[: test_df.shape[1] - 2]
    # The token id of each answer letter does not depend on the question,
    # so look it up once instead of re-tokenizing for every example.
    answer_token_ids = [tokenizer(letter).input_ids[-1] for letter in answers]
    dev_rows = dev_df.shape[0]

    cors = []
    all_probs = []
    all_times = []
    all_preds = []
    for i in range(test_df.shape[0]):
        start_time = time.time()

        # Start from the requested number of shots; -1 means "all dev rows",
        # and never ask for more examples than the dev set holds.
        k = args.ntrain
        if k == -1 or k > dev_rows:
            k = dev_rows
        prompt_end = format_example(test_df, i, include_answer=False)

        # Drop few-shot examples one at a time until the prompt fits.
        while True:
            prompt = gen_prompt(dev_df, subject, k) + prompt_end
            input_ids = tokenizer(prompt, return_tensors="pt").input_ids.to(model.device)
            if input_ids.shape[-1] <= max_prompt_tokens:
                break
            if k <= 0:
                # Nothing left to drop: stop here rather than loop forever
                # with an ever more negative k.
                logging.info(
                    "Prompt {} of {} exceeds {} tokens even without few-shot examples".format(
                        i, subject, max_prompt_tokens
                    )
                )
                break
            k -= 1

        # The gold answer is stored in the last column.
        label = test_df.iloc[i, test_df.shape[1] - 1]
        # Logits of the token that would follow "Answer:".
        logits = model(input_ids=input_ids).logits[0, -1]
        probs = (
            torch.nn.functional.softmax(
                logits[answer_token_ids].float().cpu(),
                dim=0,
            )
            .detach()
            .numpy()
        )
        pred = answers[int(np.argmax(probs))]
        cor = pred == label
        cors.append(cor)
        all_preds.append(pred)
        all_probs.append(probs)
        all_times.append(time.time() - start_time)

    # np.mean of an empty list warns and is NaN anyway; make that explicit.
    acc = float(np.mean(cors)) if cors else float("nan")
    cors = np.array(cors)
    if all_probs:
        all_probs = np.array(all_probs)
    else:
        # Keep the array 2-D so callers can still iterate over option columns.
        all_probs = np.empty((0, len(answers)))
    logging.info("Average accuracy {:.3f} - {}".format(acc, subject))
    return cors, all_probs, all_preds, all_times
def main(args):
    """
    Main function to run the evaluation script.

    Loads the model, evaluates it on every subject found under
    ``<data_dir>/test`` and writes one CSV per subject into
    ``<save_dir>/results_<model name>``. Each CSV is the test data plus the
    prediction, correctness, per-option probabilities and time per question.

    Args:
        args (Namespace): Command line arguments; reads ``model``,
            ``data_dir``, ``save_dir`` and ``ntrain``.
    """
    logging.info("===== [Start] Evaluation by huggingface model ===== ")
    start_time = time.time()
    old_checkpoint_time = start_time
    logging.info("<Spend Time> Starting time: {}".format(start_time))

    # Initialize the model and tokenizer
    model, tokenizer = initial_model(args)

    # Retrieve list of subjects
    subjects = get_subject(args.data_dir)

    # Create folder for saving results
    create_result_folder(args)
    result_dir = os.path.join(args.save_dir, "results_{}".format(args.model.split("/")[-1]))

    # Loop through each subject in the 'subjects' list
    for subject in subjects:
        logging.info("Start the subject: {}".format(subject))

        # Read development and test datasets
        dev_df = pd.read_csv(
            os.path.join(args.data_dir, "dev", subject + "_dev.csv"), header=None
        )
        # A negative ntrain must not be used as a slice bound: [:-1] would
        # silently drop the last dev row although -1 means "use all rows".
        if args.ntrain >= 0:
            dev_df = dev_df[: args.ntrain]
        test_df = pd.read_csv(
            os.path.join(args.data_dir, "test", subject + "_test.csv"), header=None
        )

        # Evaluate the model on the current subject's data
        cors, probs, all_preds, all_times = eval(args, subject, model, tokenizer, dev_df, test_df)

        # Process and save the results
        test_df["{}_prediction".format(args.model)] = all_preds
        test_df["{}_correct".format(args.model)] = cors
        for j in range(probs.shape[1]):
            choice = choices[j]
            test_df["{}_choice{}_probs".format(args.model, choice)] = probs[:, j]
        test_df["{}_spend_time".format(args.model)] = all_times
        test_df.to_csv(os.path.join(result_dir, "{}.csv".format(subject)), index=False)

        # Logging the time spent on the current subject
        checkpoint_time = time.time()
        logging.info("<Spend Time> In {}, spend time: {}.".format(subject, checkpoint_time - old_checkpoint_time))
        old_checkpoint_time = checkpoint_time

    # Logging the total time spent (elapsed seconds, not the start timestamp)
    end_time = time.time()
    logging.info("<Spend Time> Total Spending Time: {}.".format(end_time - start_time))
    logging.info("===== [Finish] Evaluation by huggingface model ===== ")
if __name__ == "__main__":
    # Command line entry point: parse the options and run the evaluation.
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--category_type", "-c", type=str, default="tmmluplus",
        help="Category scheme name (not read by the evaluation loop).",
    )
    parser.add_argument(
        "--ntrain", "-k", type=int, default=5,
        help="Number of few-shot examples taken from the dev set.",
    )
    parser.add_argument(
        "--data_dir", "-d", type=str, default="data",
        help="Directory containing the 'dev' and 'test' CSV sub-folders.",
    )
    parser.add_argument(
        "--save_dir", "-s", type=str, default="results",
        help="Directory under which the results_<model> folder is created.",
    )
    # Without a model nothing can be evaluated, so fail at parse time with a
    # clear message instead of deep inside model loading.
    parser.add_argument(
        "--model", "-m", type=str, required=True,
        help="Model name or path passed to from_pretrained.",
    )
    args = parser.parse_args()
    main(args)