Skip to content

Commit f2087fc

Browse files
authored
Merge pull request #565 from ddps-lab/gcp-collector
Fix: GCP Collector GPU DataFrame issue
2 parents 43c5549 + 80460c7 commit f2087fc

File tree

1 file changed

+53
-31
lines changed

1 file changed

+53
-31
lines changed

collector/spot-dataset/gcp/lambda/lambda_function.py

+53-31
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
},
3333
}
3434

35+
3536
def get_url(version, sku_id=None):
3637
if version == 'v2beta/skus':
3738
return urls[version]['BASE_URL'] + urls[version]['QUERY_STRING']
@@ -40,6 +41,7 @@ def get_url(version, sku_id=None):
4041
else:
4142
return None
4243

44+
4345
# Get authentication token
4446
def get_access_token():
4547
try:
@@ -53,6 +55,7 @@ def get_access_token():
5355
send_slack_message(f"[GCP Collector]\nError in get_access_token: {str(e)}")
5456
raise
5557

58+
5659
# str1: GPU model name string (e.g. nvidia-h100-mega-80gb)
5760
# str2: List of GPU model names to compare similarity with (e.g. ['A100', 'H100', 'L4'])
5861
def jaccard_similarity(str1, str2):
@@ -77,6 +80,7 @@ def jaccard_similarity(str1, str2):
7780
max_str = str
7881
return max_str
7982

83+
8084
def call_api(version=None, sku_id=None, page_token=None):
8185
token = get_access_token()
8286
headers = {"Authorization": f"Bearer {token}"}
@@ -97,6 +101,7 @@ def call_api(version=None, sku_id=None, page_token=None):
97101
send_slack_message(error_msg)
98102
raise
99103

104+
100105
# Get SKU information
101106
def get_sku_infos(response):
102107
try:
@@ -106,26 +111,26 @@ def get_sku_infos(response):
106111
for sku in skus:
107112
info_type = None
108113
if (len(sku['productTaxonomy']['taxonomyCategories']) == 6 and
109-
sku['productTaxonomy']['taxonomyCategories'][0]['category'] == 'GCP' and
110-
sku['productTaxonomy']['taxonomyCategories'][1]['category'] == 'Compute' and
111-
sku['productTaxonomy']['taxonomyCategories'][2]['category'] == 'GCE' and
112-
(sku['productTaxonomy']['taxonomyCategories'][3]['category'] == 'VMs Preemptible' or sku['productTaxonomy']['taxonomyCategories'][3]['category'] == 'VMs On Demand') and
114+
sku['productTaxonomy']['taxonomyCategories'][0]['category'] == 'GCP' and
115+
sku['productTaxonomy']['taxonomyCategories'][1]['category'] == 'Compute' and
116+
sku['productTaxonomy']['taxonomyCategories'][2]['category'] == 'GCE' and
117+
(sku['productTaxonomy']['taxonomyCategories'][3]['category'] == 'VMs Preemptible' or sku['productTaxonomy']['taxonomyCategories'][3]['category'] == 'VMs On Demand') and
113118
(sku['productTaxonomy']['taxonomyCategories'][4]['category'] == 'Memory: Per GB' or sku['productTaxonomy']['taxonomyCategories'][4]['category'] == 'Cores: Per Core' or sku['productTaxonomy']['taxonomyCategories'][4]['category'] == 'Cores: 1 to 64') and
114119
'Custom' not in sku['displayName'] and
115120
'Sole Tenancy' not in sku['displayName'] and
116-
sku['productTaxonomy']['taxonomyCategories'][5]['category'] != 'Cross VM'):
121+
sku['productTaxonomy']['taxonomyCategories'][5]['category'] != 'Cross VM'):
117122
info_type = "VMs"
118123
elif (len(sku['productTaxonomy']['taxonomyCategories']) == 5 and
119-
sku['productTaxonomy']['taxonomyCategories'][0]['category'] == 'GCP' and
120-
sku['productTaxonomy']['taxonomyCategories'][1]['category'] == 'Compute' and
121-
sku['productTaxonomy']['taxonomyCategories'][2]['category'] == 'GPUs' and
122-
(sku['productTaxonomy']['taxonomyCategories'][3]['category'] == 'GPUs Preemptible' or sku['productTaxonomy']['taxonomyCategories'][3]['category'] == 'GPUs On Demand')):
124+
sku['productTaxonomy']['taxonomyCategories'][0]['category'] == 'GCP' and
125+
sku['productTaxonomy']['taxonomyCategories'][1]['category'] == 'Compute' and
126+
sku['productTaxonomy']['taxonomyCategories'][2]['category'] == 'GPUs' and
127+
(sku['productTaxonomy']['taxonomyCategories'][3]['category'] == 'GPUs Preemptible' or sku['productTaxonomy']['taxonomyCategories'][3]['category'] == 'GPUs On Demand')):
123128
info_type = "GPUs"
124129
elif (len(sku['productTaxonomy']['taxonomyCategories']) == 6 and
125-
sku['productTaxonomy']['taxonomyCategories'][0]['category'] == 'GCP' and
126-
sku['productTaxonomy']['taxonomyCategories'][1]['category'] == 'Compute' and
127-
sku['productTaxonomy']['taxonomyCategories'][2]['category'] == 'GPUs' and
128-
(sku['productTaxonomy']['taxonomyCategories'][3]['category'] == 'GPUs Preemptible' or sku['productTaxonomy']['taxonomyCategories'][3]['category'] == 'GPUs On Demand')):
130+
sku['productTaxonomy']['taxonomyCategories'][0]['category'] == 'GCP' and
131+
sku['productTaxonomy']['taxonomyCategories'][1]['category'] == 'Compute' and
132+
sku['productTaxonomy']['taxonomyCategories'][2]['category'] == 'GPUs' and
133+
(sku['productTaxonomy']['taxonomyCategories'][3]['category'] == 'GPUs Preemptible' or sku['productTaxonomy']['taxonomyCategories'][3]['category'] == 'GPUs On Demand')):
129134
info_type = "GPUs_with_Core_and_Memory"
130135
else:
131136
continue
@@ -192,6 +197,8 @@ def get_sku_infos(response):
192197
gpu_type = sku['productTaxonomy']['taxonomyCategories'][4]['category']
193198
price_model = "On-demand" if sku['productTaxonomy']['taxonomyCategories'][3]['category'] == 'GPUs On Demand' else "Preemptible"
194199
price_resource = sku['productTaxonomy']['taxonomyCategories'][5]['category'].split(":")[0]
200+
if price_resource == 'GPU slice': # Skip GPU slice resource
201+
continue
195202
if sku['geoTaxonomy']['type'] == 'TYPE_REGIONAL':
196203
region = sku['geoTaxonomy']['regionalMetadata']['region']['region']
197204
gpu_sku_infos.append({
@@ -221,6 +228,7 @@ def get_sku_infos(response):
221228
send_slack_message(error_msg)
222229
raise
223230

231+
224232
def get_price_infos(response, sku_ids, gpu_sku_ids):
225233
try:
226234
prices = response['prices']
@@ -231,7 +239,8 @@ def get_price_infos(response, sku_ids, gpu_sku_ids):
231239
if sku_id in sku_ids:
232240
price_value = None
233241
try:
234-
price_value = int(price['rate']['tiers'][0]['listPrice']['units']) + price['rate']['tiers'][0]['listPrice']['nanos'] * 0.000000001
242+
price_value = int(price['rate']['tiers'][0]['listPrice']['units']) + \
243+
price['rate']['tiers'][0]['listPrice']['nanos'] * 0.000000001
235244
except:
236245
price_value = price['rate']['tiers'][0]['listPrice']['nanos'] * 0.000000001
237246
price_infos.append({
@@ -246,7 +255,8 @@ def get_price_infos(response, sku_ids, gpu_sku_ids):
246255
price_value = None
247256
try:
248257
try:
249-
price_value = int(price['rate']['tiers'][0]['listPrice']['units']) + price['rate']['tiers'][0]['listPrice']['nanos'] * 0.000000001
258+
price_value = int(price['rate']['tiers'][0]['listPrice']['units']) + \
259+
price['rate']['tiers'][0]['listPrice']['nanos'] * 0.000000001
250260
except:
251261
price_value = price['rate']['tiers'][0]['listPrice']['nanos'] * 0.000000001
252262
except:
@@ -264,20 +274,21 @@ def get_price_infos(response, sku_ids, gpu_sku_ids):
264274
send_slack_message(f"[GCP Collector]\nKeyError in get_price_infos: {str(e)}")
265275
raise
266276

277+
267278
def list_regions_and_machine_types(gpu_families):
268279
try:
269280
# Create Compute Engine API client (using JSON key file)
270281
client = compute_v1.RegionsClient.from_service_account_file(SERVICE_ACCOUNT_FILE)
271282
machine_types_client = compute_v1.MachineTypesClient.from_service_account_file(SERVICE_ACCOUNT_FILE)
272-
283+
273284
# Get project ID (read from JSON file)
274285
with open(SERVICE_ACCOUNT_FILE, 'r') as f:
275286
import json
276287
project_id = json.load(f)['project_id']
277-
288+
278289
# Get all regions
279290
regions = client.list(project=project_id)
280-
291+
281292
# Save results
282293
region_machine_types = []
283294

@@ -286,7 +297,7 @@ def list_regions_and_machine_types(gpu_families):
286297
# Get machine types for each region
287298
for region in regions:
288299
zone_list = list_zones_in_region(region.name, project_id)
289-
300+
290301
for zone in zone_list:
291302
machine_types = machine_types_client.list(project=project_id, zone=zone)
292303
for machine_type in machine_types:
@@ -307,28 +318,30 @@ def list_regions_and_machine_types(gpu_families):
307318
"gpuCount": gpu_count,
308319
"gpuType": gpu_type,
309320
})
310-
321+
311322
return region_machine_types
312323
except Exception as e:
313324
send_slack_message(f"[GCP Collector]\nError in list_regions_and_machine_types: {str(e)}")
314325
raise
315326

327+
316328
def list_zones_in_region(region_name, project_id):
    """Return the names of all zones in the project that belong to a region.

    Args:
        region_name: Name of the region whose zones are wanted
            (e.g. "us-central1").
        project_id: Project ID passed to the Compute Engine zones listing.

    Returns:
        A list of zone name strings whose name starts with
        "<region_name>-".

    Raises:
        Exception: Any error raised while creating the client or listing
            zones is reported through send_slack_message and re-raised.
    """
    try:
        zones_client = compute_v1.ZonesClient.from_service_account_file(SERVICE_ACCOUNT_FILE)
        zones = zones_client.list(project=project_id)

        # Match on the region name plus the "-" separator rather than the bare
        # region name: a plain prefix test would also accept zones of regions
        # whose name merely begins with this one (e.g. "europe-west1" would
        # pick up "europe-west10-a" and "europe-west12-a").
        zone_prefix = f"{region_name}-"
        return [
            zone.name for zone in zones
            if zone.name.startswith(zone_prefix)
        ]
    except Exception as e:
        send_slack_message(f"[GCP Collector]\nError in list_zones_in_region: {str(e)}")
        raise
331343

344+
332345
# Define price calculation function
333346
def calculate_price(row, cores_key, memory_key, gpu_key):
334347
cores_price = row[cores_key]
@@ -344,10 +357,11 @@ def calculate_price(row, cores_key, memory_key, gpu_key):
344357
gpu_price = 0 # Treat as 0 if only GPU price is missing
345358
return max(row["vcpus"], 1) * cores_price + row["memory"] * memory_price + row["gpuCount"] * gpu_price
346359

360+
347361
def upload_cloudwatch(df_current, timestamp):
348362
ondemand_count = len(df_current.drop(columns=['Spot Price', 'Savings']).dropna())
349363
spot_count = len(df_current.drop(columns=['OnDemand Price', 'Savings']).dropna())
350-
364+
351365
cw_client = boto3.client('logs')
352366

353367
log_event = {
@@ -357,16 +371,17 @@ def upload_cloudwatch(df_current, timestamp):
357371

358372
cw_client.put_log_events(
359373
logGroupName=GCP_CONST.SPOT_DATA_COLLECTION_LOG_GROUP_NAME,
360-
logStreamName=GCP_CONST.LOG_STREAM_NAME,
374+
logStreamName=GCP_CONST.LOG_STREAM_NAME,
361375
logEvents=[log_event]
362376
)
363377

378+
364379
def lambda_handler(event, context):
365380
try:
366381
start_time = time.time()
367382
str_datetime = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M")
368383
timestamp = datetime.strptime(str_datetime, "%Y-%m-%dT%H:%M")
369-
384+
370385
response = call_api(version='v2beta/skus')
371386
sku_infos, gpu_sku_infos = get_sku_infos(response)
372387
while 'nextPageToken' in response:
@@ -376,9 +391,11 @@ def lambda_handler(event, context):
376391
gpu_sku_infos += new_gpu_sku_infos
377392
print("Complete to get sku_infos")
378393

379-
sku_df = pd.DataFrame(sku_infos).sort_values(by=["machineFamily", "region", "priceModel", "priceResource"], ascending=True).reset_index(drop=True)
394+
sku_df = pd.DataFrame(sku_infos).sort_values(
395+
by=["machineFamily", "region", "priceModel", "priceResource"], ascending=True).reset_index(drop=True)
380396

381-
gpu_sku_df = pd.DataFrame(gpu_sku_infos).sort_values(by=["gpuType", "region", "priceModel", "priceResource"], ascending=True).reset_index(drop=True)
397+
gpu_sku_df = pd.DataFrame(gpu_sku_infos).sort_values(
398+
by=["gpuType", "region", "priceModel", "priceResource"], ascending=True).reset_index(drop=True)
382399

383400
sku_ids = set([sku_info['skuId'] for sku_info in sku_infos])
384401
gpu_sku_ids = set([gpu_sku_info['skuId'] for gpu_sku_info in gpu_sku_infos])
@@ -404,7 +421,8 @@ def lambda_handler(event, context):
404421

405422
machine_types_infos = list_regions_and_machine_types(list(gpu_df['gpuType'].unique()))
406423

407-
machine_types_df = pd.DataFrame(machine_types_infos).sort_values(by=["machineFamily", "machineType", "region", "vcpus", "memory"], ascending=True).reset_index(drop=True)
424+
machine_types_df = pd.DataFrame(machine_types_infos).sort_values(
425+
by=["machineFamily", "machineType", "region", "vcpus", "memory"], ascending=True).reset_index(drop=True)
408426
machine_types_df['machineModel'] = 'Standard'
409427

410428
# DataFrame transformation code
@@ -464,10 +482,13 @@ def lambda_handler(event, context):
464482
df_final[col] = df_final[col].fillna(df_final[f"{col}_new"])
465483

466484
# Remove temporary columns with '_new' suffix
467-
df_final = df_final.drop(columns=[f"{col}_new" for col in ["ondemandCorePrice", "ondemandMemoryPrice", "preemptibleCorePrice", "preemptibleMemoryPrice"]])
485+
df_final = df_final.drop(columns=[f"{col}_new" for col in ["ondemandCorePrice",
486+
"ondemandMemoryPrice", "preemptibleCorePrice", "preemptibleMemoryPrice"]])
468487

469-
df_final['ondemandPrice'] = df_final.apply(lambda row: calculate_price(row, "ondemandCorePrice", "ondemandMemoryPrice", "ondemandGPUPrice"), axis=1)
470-
df_final['preemptiblePrice'] = df_final.apply(lambda row: calculate_price(row, "preemptibleCorePrice", "preemptibleMemoryPrice", "preemptibleGPUPrice"), axis=1)
488+
df_final['ondemandPrice'] = df_final.apply(lambda row: calculate_price(
489+
row, "ondemandCorePrice", "ondemandMemoryPrice", "ondemandGPUPrice"), axis=1)
490+
df_final['preemptiblePrice'] = df_final.apply(lambda row: calculate_price(
491+
row, "preemptibleCorePrice", "preemptibleMemoryPrice", "preemptibleGPUPrice"), axis=1)
471492

472493
# Construct final DataFrame
473494
df_final['Time'] = timestamp.strftime("%Y-%m-%d %H:%M:%S")
@@ -520,5 +541,6 @@ def lambda_handler(event, context):
520541
send_slack_message(f"[GCP Collector]\nUnhandled exception in main: {str(e)}")
521542
raise
522543

544+
523545
# When executed directly as a script, call the handler once with an empty
# event and an empty context.
if __name__ == "__main__":
    lambda_handler(event={}, context={})

0 commit comments

Comments
 (0)