1
+ from typing import Dict , List
2
+ import json
3
+ import os
4
+ from pathlib import Path
5
+ import sys
6
+ from datetime import datetime
7
+ import random
8
+ from scipy import stats # 添加scipy导入用于t检验
9
+
10
+ current_dir = Path (__file__ ).resolve ().parent
11
+ project_root = current_dir .parent .parent .parent
12
+ env_path = project_root / ".env.prod"
13
+
14
+ root_path = os .path .abspath (os .path .join (os .path .dirname (__file__ ), "../../.." ))
15
+ sys .path .append (root_path )
16
+
17
+ from src .plugins .personality .big5_test import BigFiveTest
18
+ from src .plugins .personality .renqingziji import PersonalityEvaluator_direct
19
+ from src .plugins .personality .questionnaire import FACTOR_DESCRIPTIONS , PERSONALITY_QUESTIONS
20
+
21
+ class CombinedPersonalityTest :
22
+ def __init__ (self ):
23
+ self .big5_test = BigFiveTest ()
24
+ self .scenario_test = PersonalityEvaluator_direct ()
25
+ self .dimensions = ["开放性" , "严谨性" , "外向性" , "宜人性" , "神经质" ]
26
+
27
+ def run_combined_test (self ):
28
+ """运行组合测试"""
29
+ print ("\n === 人格特征综合评估系统 ===" )
30
+ print ("\n 本测试将通过两种方式评估人格特征:" )
31
+ print ("1. 传统问卷测评(约40题)" )
32
+ print ("2. 情景反应测评(15个场景)" )
33
+ print ("\n 两种测评完成后,将对比分析结果的异同。" )
34
+ input ("\n 准备好开始第一部分(问卷测评)了吗?按回车继续..." )
35
+
36
+ # 运行问卷测试
37
+ print ("\n === 第一部分:问卷测评 ===" )
38
+ print ("本部分采用六级评分,请根据每个描述与您的符合程度进行打分:" )
39
+ print ("1 = 完全不符合" )
40
+ print ("2 = 比较不符合" )
41
+ print ("3 = 有点不符合" )
42
+ print ("4 = 有点符合" )
43
+ print ("5 = 比较符合" )
44
+ print ("6 = 完全符合" )
45
+ print ("\n 请认真阅读每个描述,选择最符合您实际情况的选项。" )
46
+ input ("\n 按回车开始答题..." )
47
+
48
+ questionnaire_results = self .run_questionnaire ()
49
+
50
+ # 转换问卷结果格式以便比较
51
+ questionnaire_scores = {
52
+ factor : data ["得分" ]
53
+ for factor , data in questionnaire_results .items ()
54
+ }
55
+
56
+ # 运行情景测试
57
+ print ("\n === 第二部分:情景反应测评 ===" )
58
+ print ("接下来,您将面对一系列具体场景,请描述您在每个场景中可能的反应。" )
59
+ print ("每个场景都会评估不同的人格维度,共15个场景。" )
60
+ input ("\n 准备好开始了吗?按回车继续..." )
61
+
62
+ scenario_results = self .run_scenario_test ()
63
+
64
+ # 比较和展示结果
65
+ self .compare_and_display_results (questionnaire_scores , scenario_results )
66
+
67
+ # 保存结果
68
+ self .save_results (questionnaire_scores , scenario_results )
69
+
70
+ def run_questionnaire (self ):
71
+ """运行问卷测试部分"""
72
+ # 创建题目序号到题目的映射
73
+ questions_map = {q ['id' ]: q for q in PERSONALITY_QUESTIONS }
74
+
75
+ # 获取所有题目ID并随机打乱顺序
76
+ question_ids = list (questions_map .keys ())
77
+ random .shuffle (question_ids )
78
+
79
+ answers = {}
80
+ total_questions = len (question_ids )
81
+
82
+ for i , question_id in enumerate (question_ids , 1 ):
83
+ question = questions_map [question_id ]
84
+ while True :
85
+ try :
86
+ print (f"\n 问题 [{ i } /{ total_questions } ]" )
87
+ print (f"{ question ['content' ]} " )
88
+ score = int (input ("您的评分(1-6): " ))
89
+ if 1 <= score <= 6 :
90
+ answers [question_id ] = score
91
+ break
92
+ else :
93
+ print ("请输入1-6之间的数字!" )
94
+ except ValueError :
95
+ print ("请输入有效的数字!" )
96
+
97
+ # 每10题显示一次进度
98
+ if i % 10 == 0 :
99
+ print (f"\n 已完成 { i } /{ total_questions } 题 ({ int (i / total_questions * 100 )} %)" )
100
+
101
+ return self .calculate_questionnaire_scores (answers )
102
+
103
+ def calculate_questionnaire_scores (self , answers ):
104
+ """计算问卷测试的维度得分"""
105
+ results = {}
106
+ factor_questions = {
107
+ "外向性" : [],
108
+ "神经质" : [],
109
+ "严谨性" : [],
110
+ "开放性" : [],
111
+ "宜人性" : []
112
+ }
113
+
114
+ # 将题目按因子分类
115
+ for q in PERSONALITY_QUESTIONS :
116
+ factor_questions [q ['factor' ]].append (q )
117
+
118
+ # 计算每个维度的得分
119
+ for factor , questions in factor_questions .items ():
120
+ total_score = 0
121
+ for q in questions :
122
+ score = answers [q ['id' ]]
123
+ # 处理反向计分题目
124
+ if q ['reverse_scoring' ]:
125
+ score = 7 - score # 6分量表反向计分为7减原始分
126
+ total_score += score
127
+
128
+ # 计算平均分
129
+ avg_score = round (total_score / len (questions ), 2 )
130
+ results [factor ] = {
131
+ "得分" : avg_score ,
132
+ "题目数" : len (questions ),
133
+ "总分" : total_score
134
+ }
135
+
136
+ return results
137
+
138
+ def run_scenario_test (self ):
139
+ """运行情景测试部分"""
140
+ final_scores = {"开放性" : 0 , "严谨性" : 0 , "外向性" : 0 , "宜人性" : 0 , "神经质" : 0 }
141
+ dimension_counts = {trait : 0 for trait in final_scores .keys ()}
142
+
143
+ # 随机打乱场景顺序
144
+ scenarios = self .scenario_test .scenarios .copy ()
145
+ random .shuffle (scenarios )
146
+
147
+ for i , scenario_data in enumerate (scenarios , 1 ):
148
+ print (f"\n 场景 [{ i } /{ len (scenarios )} ] - { scenario_data ['场景编号' ]} " )
149
+ print ("-" * 50 )
150
+ print (scenario_data ["场景" ])
151
+ print ("\n 请描述您在这种情况下会如何反应:" )
152
+ response = input ().strip ()
153
+
154
+ if not response :
155
+ print ("反应描述不能为空!" )
156
+ continue
157
+
158
+ print ("\n 正在评估您的描述..." )
159
+ scores = self .scenario_test .evaluate_response (
160
+ scenario_data ["场景" ],
161
+ response ,
162
+ scenario_data ["评估维度" ]
163
+ )
164
+
165
+ # 更新分数
166
+ for dimension , score in scores .items ():
167
+ final_scores [dimension ] += score
168
+ dimension_counts [dimension ] += 1
169
+
170
+ # print("\n当前场景评估结果:")
171
+ # print("-" * 30)
172
+ # for dimension, score in scores.items():
173
+ # print(f"{dimension}: {score}/6")
174
+
175
+ # 每5个场景显示一次总进度
176
+ if i % 5 == 0 :
177
+ print (f"\n 已完成 { i } /{ len (scenarios )} 个场景 ({ int (i / len (scenarios )* 100 )} %)" )
178
+
179
+ if i < len (scenarios ):
180
+ input ("\n 按回车继续下一个场景..." )
181
+
182
+ # 计算平均分
183
+ for dimension in final_scores :
184
+ if dimension_counts [dimension ] > 0 :
185
+ final_scores [dimension ] = round (
186
+ final_scores [dimension ] / dimension_counts [dimension ],
187
+ 2
188
+ )
189
+
190
+ return final_scores
191
+
192
+ def compare_and_display_results (self , questionnaire_scores : Dict , scenario_scores : Dict ):
193
+ """比较和展示两种测试的结果"""
194
+ print ("\n === 测评结果对比分析 ===" )
195
+ print ("\n " + "=" * 60 )
196
+ print (f"{ '维度' :<8} { '问卷得分' :>10} { '情景得分' :>10} { '差异' :>10} { '差异程度' :>10} " )
197
+ print ("-" * 60 )
198
+
199
+ # 收集每个维度的得分用于统计分析
200
+ questionnaire_values = []
201
+ scenario_values = []
202
+ diffs = []
203
+
204
+ for dimension in self .dimensions :
205
+ q_score = questionnaire_scores [dimension ]
206
+ s_score = scenario_scores [dimension ]
207
+ diff = round (abs (q_score - s_score ), 2 )
208
+
209
+ questionnaire_values .append (q_score )
210
+ scenario_values .append (s_score )
211
+ diffs .append (diff )
212
+
213
+ # 计算差异程度
214
+ diff_level = "低" if diff < 0.5 else "中" if diff < 1.0 else "高"
215
+ print (f"{ dimension :<8} { q_score :>10.2f} { s_score :>10.2f} { diff :>10.2f} { diff_level :>10} " )
216
+
217
+ print ("=" * 60 )
218
+
219
+ # 计算整体统计指标
220
+ mean_diff = sum (diffs ) / len (diffs )
221
+ std_diff = (sum ((x - mean_diff ) ** 2 for x in diffs ) / (len (diffs ) - 1 )) ** 0.5
222
+
223
+ # 计算效应量 (Cohen's d)
224
+ pooled_std = ((sum ((x - sum (questionnaire_values )/ len (questionnaire_values ))** 2 for x in questionnaire_values ) +
225
+ sum ((x - sum (scenario_values )/ len (scenario_values ))** 2 for x in scenario_values )) /
226
+ (2 * len (self .dimensions ) - 2 )) ** 0.5
227
+
228
+ if pooled_std != 0 :
229
+ cohens_d = abs (mean_diff / pooled_std )
230
+
231
+ # 解释效应量
232
+ if cohens_d < 0.2 :
233
+ effect_size = "微小"
234
+ elif cohens_d < 0.5 :
235
+ effect_size = "小"
236
+ elif cohens_d < 0.8 :
237
+ effect_size = "中等"
238
+ else :
239
+ effect_size = "大"
240
+
241
+ # 对所有维度进行整体t检验
242
+ t_stat , p_value = stats .ttest_rel (questionnaire_values , scenario_values )
243
+ print (f"\n 整体统计分析:" )
244
+ print (f"平均差异: { mean_diff :.3f} " )
245
+ print (f"差异标准差: { std_diff :.3f} " )
246
+ print (f"效应量(Cohen's d): { cohens_d :.3f} " )
247
+ print (f"效应量大小: { effect_size } " )
248
+ print (f"t统计量: { t_stat :.3f} " )
249
+ print (f"p值: { p_value :.3f} " )
250
+
251
+ if p_value < 0.05 :
252
+ print ("结论: 两种测评方法的结果存在显著差异 (p < 0.05)" )
253
+ else :
254
+ print ("结论: 两种测评方法的结果无显著差异 (p >= 0.05)" )
255
+
256
+ print ("\n 维度说明:" )
257
+ for dimension in self .dimensions :
258
+ print (f"\n { dimension } :" )
259
+ desc = FACTOR_DESCRIPTIONS [dimension ]
260
+ print (f"定义:{ desc ['description' ]} " )
261
+ print (f"特征词:{ ', ' .join (desc ['trait_words' ])} " )
262
+
263
+ # 分析显著差异
264
+ significant_diffs = []
265
+ for dimension in self .dimensions :
266
+ diff = abs (questionnaire_scores [dimension ] - scenario_scores [dimension ])
267
+ if diff >= 1.0 : # 差异大于等于1分视为显著
268
+ significant_diffs .append ({
269
+ "dimension" : dimension ,
270
+ "diff" : diff ,
271
+ "questionnaire" : questionnaire_scores [dimension ],
272
+ "scenario" : scenario_scores [dimension ]
273
+ })
274
+
275
+ if significant_diffs :
276
+ print ("\n \n 显著差异分析:" )
277
+ print ("-" * 40 )
278
+ for diff in significant_diffs :
279
+ print (f"\n { diff ['dimension' ]} 维度的测评结果存在显著差异:" )
280
+ print (f"问卷得分:{ diff ['questionnaire' ]:.2f} " )
281
+ print (f"情景得分:{ diff ['scenario' ]:.2f} " )
282
+ print (f"差异值:{ diff ['diff' ]:.2f} " )
283
+
284
+ # 分析可能的原因
285
+ if diff ['questionnaire' ] > diff ['scenario' ]:
286
+ print ("可能原因:在问卷中的自我评价较高,但在具体情景中的表现较为保守。" )
287
+ else :
288
+ print ("可能原因:在具体情景中表现出更多该维度特征,而在问卷自评时较为保守。" )
289
+
290
+ def save_results (self , questionnaire_scores : Dict , scenario_scores : Dict ):
291
+ """保存测试结果"""
292
+ results = {
293
+ "测试时间" : datetime .now ().strftime ("%Y-%m-%d %H:%M:%S" ),
294
+ "问卷测评结果" : questionnaire_scores ,
295
+ "情景测评结果" : scenario_scores ,
296
+ "维度说明" : FACTOR_DESCRIPTIONS
297
+ }
298
+
299
+ # 确保目录存在
300
+ os .makedirs ("results" , exist_ok = True )
301
+
302
+ # 生成带时间戳的文件名
303
+ filename = f"results/personality_combined_{ datetime .now ().strftime ('%Y%m%d_%H%M%S' )} .json"
304
+
305
+ # 保存到文件
306
+ with open (filename , "w" , encoding = "utf-8" ) as f :
307
+ json .dump (results , f , ensure_ascii = False , indent = 2 )
308
+
309
+ print (f"\n 完整的测评结果已保存到:{ filename } " )
310
+
311
+ def load_existing_results ():
312
+ """检查并加载已有的测试结果"""
313
+ results_dir = "results"
314
+ if not os .path .exists (results_dir ):
315
+ return None
316
+
317
+ # 获取所有personality_combined开头的文件
318
+ result_files = [f for f in os .listdir (results_dir )
319
+ if f .startswith ("personality_combined_" ) and f .endswith (".json" )]
320
+
321
+ if not result_files :
322
+ return None
323
+
324
+ # 按文件修改时间排序,获取最新的结果文件
325
+ latest_file = max (result_files ,
326
+ key = lambda f : os .path .getmtime (os .path .join (results_dir , f )))
327
+
328
+ print (f"\n 发现已有的测试结果:{ latest_file } " )
329
+ try :
330
+ with open (os .path .join (results_dir , latest_file ), "r" , encoding = "utf-8" ) as f :
331
+ results = json .load (f )
332
+ return results
333
+ except Exception as e :
334
+ print (f"读取结果文件时出错:{ str (e )} " )
335
+ return None
336
+
337
+ def main ():
338
+ test = CombinedPersonalityTest ()
339
+
340
+ # 检查是否存在已有结果
341
+ existing_results = load_existing_results ()
342
+
343
+ if existing_results :
344
+ print ("\n === 使用已有测试结果进行分析 ===" )
345
+ print (f"测试时间:{ existing_results ['测试时间' ]} " )
346
+
347
+ questionnaire_scores = existing_results ["问卷测评结果" ]
348
+ scenario_scores = existing_results ["情景测评结果" ]
349
+
350
+ # 直接进行结果对比分析
351
+ test .compare_and_display_results (questionnaire_scores , scenario_scores )
352
+ else :
353
+ print ("\n 未找到已有的测试结果,开始新的测试..." )
354
+ test .run_combined_test ()
355
+
356
+ if __name__ == "__main__" :
357
+ main ()
0 commit comments