Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 17 additions & 7 deletions src/cl/cl_launcher.star
Original file line number Diff line number Diff line change
Expand Up @@ -273,18 +273,23 @@ def launch(
"node_selectors": node_selectors,
"get_cl_context": get_cl_context,
"get_blobber_config": get_blobber_config,
"participant_index": index,
}

# add rest of cl's in parallel to speed package execution
cl_services = {}
if len(cl_service_configs) > 0:
cl_services = plan.add_services(cl_service_configs)

# Create CL contexts ordered by participant index
cl_contexts_temp = {}
blobber_configs_temp = {}
for beacon_service_name, beacon_service in cl_services.items():
info = cl_participant_info[beacon_service_name]
get_cl_context = info["get_cl_context"]
get_blobber_config = info["get_blobber_config"]
participant = info["participant"]
participant_index = info["participant_index"]

cl_context = get_cl_context(
plan,
Expand All @@ -305,20 +310,25 @@ def launch(
info["node_selectors"],
)
if blobber_config != None:
blobber_configs_with_contexts.append(
struct(
cl_context=cl_context,
blobber_config=blobber_config,
participant=participant,
)
blobber_configs_temp[participant_index] = struct(
cl_context=cl_context,
blobber_config=blobber_config,
participant=participant,
)

# Add participant cl additional prometheus labels
for metrics_info in cl_context.cl_nodes_metrics_info:
if metrics_info != None:
metrics_info["config"] = participant.prometheus_config

all_cl_contexts.append(cl_context)
cl_contexts_temp[participant_index] = cl_context

# Add remaining CL contexts in participant order (skipping index 0 which was added earlier)
for i in range(1, len(args_with_right_defaults.participants)):
if i in cl_contexts_temp:
all_cl_contexts.append(cl_contexts_temp[i])
if i in blobber_configs_temp:
blobber_configs_with_contexts.append(blobber_configs_temp[i])

return (
all_cl_contexts,
Expand Down
14 changes: 12 additions & 2 deletions src/el/el_launcher.star
Original file line number Diff line number Diff line change
Expand Up @@ -185,16 +185,21 @@ def launch(
el_participant_info[el_service_name] = {
"client_name": el_type,
"supernode": participant.supernode,
"participant_index": index,
"participant": participant,
}

# add remainder of el's in parallel to speed package execution
el_services = {}
if len(el_service_configs) > 0:
el_services = plan.add_services(el_service_configs)

# Create contexts for each service
# Create contexts ordered by participant index
el_contexts_temp = {}
for el_service_name, el_service in el_services.items():
el_type = el_participant_info[el_service_name]["client_name"]
participant_index = el_participant_info[el_service_name]["participant_index"]
participant = el_participant_info[el_service_name]["participant"]
get_el_context = el_launchers[el_type]["get_el_context"]

el_context = get_el_context(
Expand All @@ -209,7 +214,12 @@ def launch(
if metrics_info != None:
metrics_info["config"] = participant.prometheus_config

all_el_contexts.append(el_context)
el_contexts_temp[participant_index] = el_context

# Add remaining EL contexts in participant order (skipping index 0 which was added earlier)
for i in range(1, len(participants)):
if i in el_contexts_temp:
all_el_contexts.append(el_contexts_temp[i])

plan.print("Successfully added {0} EL participants".format(num_participants))
return all_el_contexts
19 changes: 16 additions & 3 deletions src/participant_network.star
Original file line number Diff line number Diff line change
Expand Up @@ -478,6 +478,7 @@ def launch_participant_network(
vc_service_configs[service_name] = vc_service_config
vc_service_info[service_name] = {
"client_name": vc_type,
"participant_index": index,
}
current_vc_index += 1

Expand All @@ -486,7 +487,8 @@ def launch_participant_network(
if len(vc_service_configs) > 0:
vc_services = plan.add_services(vc_service_configs)

all_vc_contexts = []
# Create VC contexts ordered by participant index
vc_contexts_temp = {}
for vc_service_name, vc_service in vc_services.items():
vc_context = vc.get_vc_context(
plan,
Expand All @@ -495,10 +497,21 @@ def launch_participant_network(
vc_service_info[vc_service_name]["client_name"],
)

participant_index = vc_service_info[vc_service_name]["participant_index"]
if vc_context and vc_context.metrics_info:
vc_context.metrics_info["config"] = participant.prometheus_config
vc_context.metrics_info["config"] = args_with_right_defaults.participants[
participant_index
].prometheus_config

vc_contexts_temp[participant_index] = vc_context

all_vc_contexts.append(vc_context)
# Convert to ordered list
all_vc_contexts = []
for i in range(len(args_with_right_defaults.participants)):
if i in vc_contexts_temp:
all_vc_contexts.append(vc_contexts_temp[i])
else:
all_vc_contexts.append(None)

all_participants = []
for index, participant in enumerate(args_with_right_defaults.participants):
Expand Down