Skip to content

Commit 36b658d

Browse files
authored
Merge pull request #558 from ddps-lab/azure-collector-fix
Separate the collector lambda into availability_zones=True and availability_zones=False variants
2 parents 0fbbd08 + 7bb489a commit 36b658d

File tree

10 files changed

+258
-192
lines changed

10 files changed

+258
-192
lines changed

.github/workflows/azure-sps-lambda-sync.yml

+2-1
Original file line numberDiff line numberDiff line change
@@ -47,4 +47,5 @@ jobs:
4747

4848
- name: Deploy to AWS Lambda
4949
run: |
50-
aws lambda update-function-code --function-name azure-sps-collector --zip-file fileb://azure_sps_lambda.zip
50+
aws lambda update-function-code --function-name azure-sps-collector-az-true --zip-file fileb://azure_sps_lambda.zip
51+
aws lambda update-function-code --function-name azure-sps-collector-az-false --zip-file fileb://azure_sps_lambda.zip

collector/spot-dataset/azure/lambda/current_collector/lambda_function_sps.py

+49-44
Original file line numberDiff line numberDiff line change
@@ -13,12 +13,18 @@
1313
EVERY_10MIN_ACTION = "Every_10Min" # 10분마다 실행 액션
1414
UTC_1500_TIME = "15:00" # UTC 15:00 (KST 00:00)
1515

16+
availability_zones = os.environ.get("availability_zones", "False").lower() == "true"
17+
1618
def lambda_handler(event, context):
17-
action = event.get("action")
1819
log_stream_name = context.log_stream_name
1920
event_time_utc = event.get("time")
2021
event_time_utc_datetime = datetime.strptime(event_time_utc, "%Y-%m-%dT%H:%M:%SZ")
2122

23+
if event_time_utc_datetime.strftime("%H:%M") == UTC_1500_TIME:
24+
action = FIRST_TIME_ACTION
25+
else:
26+
action = EVERY_10MIN_ACTION
27+
2228
try:
2329
if not action or not event_time_utc:
2430
raise ValueError("Invalid event info: action or time is missing")
@@ -30,47 +36,42 @@ def lambda_handler(event, context):
3036
specific_instance_types = [specific_instance_type.strip() for specific_instance_type in os.environ.get('specific_instance_types').split(",") if specific_instance_type.strip()]
3137

3238
if action == FIRST_TIME_ACTION:
33-
sps_res_az_true_desired_count_1_df, sps_res_az_false_desired_count_1_df = load_sps.collect_spot_placement_score_first_time(desired_counts=[1])
39+
if availability_zones is True:
40+
sps_res_desired_count_1_df = load_sps.collect_spot_placement_score_first_time(desired_counts=[1])
41+
else:
42+
sps_res_desired_count_1_df = load_sps.collect_spot_placement_score(desired_counts=[1])
3443

35-
sps_res_az_true_desired_count_loop_df, sps_res_az_false_desired_count_loop_df = load_sps.collect_spot_placement_score(
36-
desired_counts=[desired_count])
44+
sps_res_desired_count_loop_df = load_sps.collect_spot_placement_score(desired_counts=[desired_count])
3745

38-
sps_res_specific_az_true_df, sps_res_specific_az_false_df = load_sps.collect_spot_placement_score(
46+
sps_res_specific_df = load_sps.collect_spot_placement_score(
3947
desired_counts=specific_desired_counts, instance_types=specific_instance_types)
4048

4149

4250
elif action == EVERY_10MIN_ACTION:
43-
# UTC 15:00 (KST 00:00)인 경우 실행 건너뛰기
44-
if event_time_utc_datetime.strftime("%H:%M") == UTC_1500_TIME:
45-
Logger.info("Skipping scheduled time (UTC 15:00, KST 00:00)")
46-
return handle_response(200, "Executed successfully. Scheduled time skipped.", action, event_time_utc_datetime)
47-
sps_res_az_true_desired_count_1_df, sps_res_az_false_desired_count_1_df = load_sps.collect_spot_placement_score(desired_counts=[1])
51+
sps_res_desired_count_1_df = load_sps.collect_spot_placement_score(desired_counts=[1])
4852

49-
sps_res_az_true_desired_count_loop_df, sps_res_az_false_desired_count_loop_df = load_sps.collect_spot_placement_score(
50-
desired_counts=[desired_count])
53+
sps_res_desired_count_loop_df = load_sps.collect_spot_placement_score(desired_counts=[desired_count])
5154

52-
sps_res_specific_az_true_df, sps_res_specific_az_false_df = load_sps.collect_spot_placement_score(
55+
sps_res_specific_df = load_sps.collect_spot_placement_score(
5356
desired_counts=specific_desired_counts, instance_types=specific_instance_types)
5457

5558
else:
5659
raise ValueError(f"Invalid lambda action.")
5760

5861

59-
if sps_res_az_true_desired_count_1_df is None: raise ValueError("sps_res_az_true_desired_count_1_df is None")
60-
if sps_res_az_true_desired_count_loop_df is None: raise ValueError("sps_res_az_true_desired_count_loop_df is None")
61-
if sps_res_az_false_desired_count_loop_df is None: raise ValueError("sps_res_az_false_desired_count_loop_df is None")
62-
if sps_res_specific_az_true_df is None: raise ValueError("sps_res_specific_az_true_df is None")
63-
if sps_res_specific_az_false_df is None: raise ValueError("sps_res_specific_az_false_df is None")
62+
if sps_res_desired_count_1_df is None: raise ValueError("sps_res_desired_count_1_df is None")
63+
if sps_res_desired_count_loop_df is None: raise ValueError("sps_res_desired_count_loop_df is None")
64+
if sps_res_specific_df is None: raise ValueError("sps_res_specific_df is None")
6465

65-
price_saving_if_df = S3.read_file(AZURE_CONST.S3_LATEST_PRICE_SAVING_IF_GZIP_SAVE_PATH, 'pkl.gz')
66-
if price_saving_if_df is None:
67-
raise ValueError("price_if_df is None")
66+
if availability_zones is True:
67+
price_saving_if_df = S3.read_file(AZURE_CONST.S3_LATEST_PRICE_SAVING_IF_GZIP_SAVE_PATH, 'pkl.gz')
68+
if price_saving_if_df is None:
69+
raise ValueError("price_if_df is None")
6870

69-
if not handle_res_df_for_spotlake(price_saving_if_df, sps_res_az_true_desired_count_1_df, event_time_utc_datetime):
70-
raise RuntimeError("Failed to handle_res_df_for_spotlake")
71+
if not handle_res_df_for_spotlake(price_saving_if_df, sps_res_desired_count_1_df, event_time_utc_datetime):
72+
raise RuntimeError("Failed to handle_res_df_for_spotlake")
7173

72-
if not handle_res_df_for_research(sps_res_az_false_desired_count_1_df, sps_res_az_true_desired_count_loop_df, sps_res_az_false_desired_count_loop_df,
73-
sps_res_specific_az_true_df, sps_res_specific_az_false_df,
74+
if not handle_res_df_for_research(sps_res_desired_count_1_df, sps_res_desired_count_loop_df, sps_res_specific_df,
7475
event_time_utc_datetime):
7576
raise RuntimeError("Failed to handle_res_for_research_df")
7677

@@ -80,7 +81,7 @@ def lambda_handler(event, context):
8081
error_msg = f"Unexpected error: {e}"
8182
Logger.error(error_msg)
8283
Logger.error(traceback.format_exc())
83-
send_slack_message(f"LOCAL TEST AZURE SPS MODULE EXCEPTION!\n{error_msg}\log_stream_id: {log_stream_name}")
84+
send_slack_message(f"AZURE SPS MODULE EXCEPTION!\n{error_msg}\log_stream_id: {log_stream_name}")
8485
return handle_response(500, "Execute Failed!", action, event_time_utc_datetime, str(e))
8586

8687

@@ -98,6 +99,9 @@ def handle_res_df_for_spotlake(price_saving_if_df, sps_res_az_true_desired_count
9899
workload_cols = ['InstanceTier', 'InstanceType', 'Region', 'AvailabilityZone', 'DesiredCount']
99100
feature_cols = ['OndemandPrice', 'SpotPrice', 'IF', 'Score', 'SPS_Update_Time']
100101

102+
query_success = timestream_success = cloudwatch_success = \
103+
update_latest_success = save_raw_az_true_desired_count_1_success = False
104+
101105
if prev_availability_zone_true_all_data_df is not None and not prev_availability_zone_true_all_data_df.empty:
102106
prev_availability_zone_true_all_data_df.drop(columns=['id'], inplace=True)
103107
changed_df = compare_sps(prev_availability_zone_true_all_data_df, sps_res_az_true_desired_count_1_merged_df, workload_cols, feature_cols)
@@ -106,15 +110,14 @@ def handle_res_df_for_spotlake(price_saving_if_df, sps_res_az_true_desired_count
106110
cloudwatch_success = upload_cloudwatch(sps_res_az_true_desired_count_1_merged_df, time_datetime)
107111

108112
update_latest_success = update_latest(sps_res_az_true_desired_count_1_merged_df)
109-
save_raw_az_true_desired_count_1_success = save_raw(sps_res_az_true_desired_count_1_merged_df, time_datetime,
110-
data_type='az_true_desired_count_1')
113+
save_raw_az_true_desired_count_1_success = save_raw(sps_res_az_true_desired_count_1_merged_df, time_datetime, availability_zones,
114+
data_type='desired_count_1')
111115

112116
success_flag = all([query_success, timestream_success, cloudwatch_success, update_latest_success, save_raw_az_true_desired_count_1_success])
113117
log_details = (
114118
f"update_latest_success: {update_latest_success}, save: {save_raw_az_true_desired_count_1_success}, cloudwatch: {cloudwatch_success}"
115119
f"query: {query_success}, timestream: {timestream_success}"
116120
)
117-
118121
if success_flag:
119122
Logger.info("Successfully merged the price/if/sps df, process data for spotlake!")
120123
return True
@@ -124,28 +127,30 @@ def handle_res_df_for_spotlake(price_saving_if_df, sps_res_az_true_desired_count
124127
return False
125128

126129
except Exception as e:
127-
Logger.error(f"Error in handle_res_df function: {e}")
130+
Logger.error(f"Error in handle_res_df_for_spotlake function: {e}")
128131
return False
129132

130133

131-
def handle_res_df_for_research(sps_res_az_false_desired_count_1_df, sps_res_az_true_desired_count_loop_df, sps_res_az_false_desired_count_loop_df,
132-
sps_res_specific_az_true_df, sps_res_specific_az_false_df, time_datetime):
134+
def handle_res_df_for_research(sps_res_desired_count_1_df, sps_res_desired_count_loop_df, sps_res_specific_df, time_datetime):
133135
try:
136+
134137
time_str = time_datetime.strftime("%Y-%m-%d %H:%M:%S")
135-
sps_res_az_false_desired_count_1_df['time'] = time_str
136-
sps_res_az_true_desired_count_loop_df['time'] = time_str
137-
sps_res_az_false_desired_count_loop_df['time'] = time_str
138-
sps_res_specific_az_true_df['time'] = time_str
139-
sps_res_specific_az_false_df['time'] = time_str
138+
sps_res_desired_count_1_df['time'] = time_str
139+
sps_res_desired_count_loop_df['time'] = time_str
140+
sps_res_specific_df['time'] = time_str
141+
142+
if availability_zones is True:
143+
save_raw_az_desired_count_1_success = True
144+
else:
145+
save_raw_az_desired_count_1_success = save_raw(sps_res_desired_count_1_df, time_datetime, availability_zones,
146+
data_type='desired_count_1')
140147

141-
save_raw_az_false_desired_count_1_success = save_raw(sps_res_az_true_desired_count_loop_df, time_datetime, data_type='az_false_desired_count_1')
142-
save_raw_az_true_desired_count_loop_success = save_raw(sps_res_az_true_desired_count_loop_df, time_datetime, data_type='az_true_desired_count_loop')
143-
save_raw_az_false_desired_count_loop_success = save_raw(sps_res_az_false_desired_count_loop_df, time_datetime, data_type='az_false_desired_count_loop')
144-
save_raw_specific_az_true_success = save_raw(sps_res_specific_az_true_df, time_datetime, data_type='specific_az_true')
145-
save_raw_specific_az_false_success = save_raw(sps_res_specific_az_false_df, time_datetime, data_type='specific_az_false')
148+
save_raw_az_true_desired_count_loop_success = save_raw(sps_res_desired_count_loop_df, time_datetime, availability_zones,
149+
data_type='desired_count_loop')
150+
save_raw_specific_az_true_success = save_raw(sps_res_specific_df, time_datetime, availability_zones, data_type='specific')
146151

147152

148-
success_flag = all([save_raw_az_false_desired_count_1_success, save_raw_az_true_desired_count_loop_success, save_raw_az_false_desired_count_loop_success, save_raw_specific_az_true_success, save_raw_specific_az_false_success])
153+
success_flag = all([save_raw_az_desired_count_1_success, save_raw_az_true_desired_count_loop_success, save_raw_specific_az_true_success])
149154
if success_flag:
150155
Logger.info("Successfully merged the price/if/sps df, process data for research!")
151156
return True
@@ -154,7 +159,7 @@ def handle_res_df_for_research(sps_res_az_false_desired_count_1_df, sps_res_az_t
154159
return False
155160

156161
except Exception as e:
157-
Logger.error(f"Error in handle_res_df function: {e}")
162+
Logger.error(f"Error in handle_res_df_for_research function: {e}")
158163
return False
159164

160165

0 commit comments

Comments (0)