@@ -170,6 +170,7 @@ def __init__(
170
170
gie_graph_manager_mem = None ,
171
171
engine_cpu = None ,
172
172
engine_mem = None ,
173
+ vineyard_daemonset = None ,
173
174
vineyard_cpu = None ,
174
175
vineyard_mem = None ,
175
176
vineyard_shared_mem = None ,
@@ -203,12 +204,6 @@ def __init__(
203
204
self ._gie_graph_manager_name = (
204
205
self ._gie_graph_manager_name_prefix + self ._instance_id
205
206
)
206
- self ._gie_graph_manager_service_name = (
207
- self ._gie_graph_manager_service_name_prefix + self ._instance_id
208
- )
209
- self ._vineyard_service_name = (
210
- self ._vineyard_service_name_prefix + self ._instance_id
211
- )
212
207
213
208
self ._namespace = namespace
214
209
self ._service_type = service_type
@@ -226,6 +221,7 @@ def __init__(
226
221
self ._engine_mem = engine_mem
227
222
228
223
# vineyard container info
224
+ self ._vineyard_daemonset = vineyard_daemonset
229
225
self ._vineyard_cpu = vineyard_cpu
230
226
self ._vineyard_mem = vineyard_mem
231
227
self ._vineyard_shared_mem = vineyard_shared_mem
@@ -278,6 +274,17 @@ def __init__(
278
274
self ._graphlearn_services = dict ()
279
275
self ._learning_instance_processes = {}
280
276
277
+ # component service name
278
+ self ._gie_graph_manager_service_name = (
279
+ self ._gie_graph_manager_service_name_prefix + self ._instance_id
280
+ )
281
+ if self ._exists_vineyard_daemonset (self ._vineyard_daemonset ):
282
+ self ._vineyard_service_name = self ._vineyard_daemonset + "-rpc"
283
+ else :
284
+ self ._vineyard_service_name = (
285
+ self ._vineyard_service_name_prefix + self ._instance_id
286
+ )
287
+
281
288
def __del__ (self ):
282
289
self .stop ()
283
290
@@ -318,11 +325,21 @@ def _create_engine_replicaset(self):
318
325
)
319
326
# volume1 is for vineyard ipc socket
320
327
# MaxGraph: /home/maxgraph/data/vineyard
328
+ if self ._exists_vineyard_daemonset (self ._vineyard_daemonset ):
329
+ vineyard_socket_volume_type = "hostPath"
330
+ vineyard_socket_volume_fields = {
331
+ "type" : "Directory" ,
332
+ "path" : "/var/run/vineyard-%s-%s"
333
+ % (self ._namespace , self ._vineyard_daemonset ),
334
+ }
335
+ else :
336
+ vineyard_socket_volume_type = "emptyDir"
337
+ vineyard_socket_volume_fields = {}
321
338
engine_builder .add_volume (
322
339
VolumeBuilder (
323
340
name = "vineyard-ipc-volume" ,
324
- type = "emptyDir" ,
325
- field = {} ,
341
+ type = vineyard_socket_volume_type ,
342
+ field = vineyard_socket_volume_fields ,
326
343
mounts_list = [
327
344
{"mountPath" : "/tmp/vineyard_workspace" },
328
345
{"mountPath" : "/home/maxgraph/data/vineyard" },
@@ -347,16 +364,17 @@ def _create_engine_replicaset(self):
347
364
# add env
348
365
engine_builder .add_simple_envs ({"GLOG_v" : str (self ._glog_level )})
349
366
# add vineyard container
350
- engine_builder .add_vineyard_container (
351
- name = self ._vineyard_container_name ,
352
- image = self ._gs_image ,
353
- cpu = self ._vineyard_cpu ,
354
- mem = self ._vineyard_mem ,
355
- shared_mem = self ._vineyard_shared_mem ,
356
- preemptive = self ._preemptive ,
357
- etcd_endpoint = self ._etcd_endpoint ,
358
- port = self ._vineyard_service_port ,
359
- )
367
+ if not self ._exists_vineyard_daemonset (self ._vineyard_daemonset ):
368
+ engine_builder .add_vineyard_container (
369
+ name = self ._vineyard_container_name ,
370
+ image = self ._gs_image ,
371
+ cpu = self ._vineyard_cpu ,
372
+ mem = self ._vineyard_mem ,
373
+ shared_mem = self ._vineyard_shared_mem ,
374
+ preemptive = self ._preemptive ,
375
+ etcd_endpoint = self ._etcd_endpoint ,
376
+ port = self ._vineyard_service_port ,
377
+ )
360
378
# add engine container
361
379
engine_builder .add_engine_container (
362
380
name = self ._engine_container_name ,
@@ -606,7 +624,8 @@ def _create_services(self):
606
624
logger .info ("Etcd is ready, endpoint is {}" .format (self ._etcd_endpoint ))
607
625
608
626
self ._create_engine_replicaset ()
609
- self ._create_vineyard_service ()
627
+ if not self ._exists_vineyard_daemonset (self ._vineyard_daemonset ):
628
+ self ._create_vineyard_service ()
610
629
611
630
def _waiting_for_services_ready (self ):
612
631
start_time = time .time ()
@@ -796,6 +815,17 @@ def _delete_dangling_coordinator(self):
796
815
)
797
816
)
798
817
818
+ def _exists_vineyard_daemonset (self , release ):
819
+ # check if vineyard daemonset exists.
820
+ if not release :
821
+ return False
822
+ try :
823
+ self ._app_api .read_namespaced_daemon_set (release , self ._namespace )
824
+ except K8SApiException :
825
+ return False
826
+ else :
827
+ return True
828
+
799
829
def start (self ):
800
830
try :
801
831
self ._create_services ()
0 commit comments