Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Distributed ETCD with 3 pods. #197

Merged
merged 2 commits into from
Mar 22, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 14 additions & 16 deletions coordinator/gscoordinator/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -390,32 +390,30 @@ def _create_etcd(self):

time.sleep(1)

# create etcd deployment
# create distributed etcd with 3 pods
lidongze0629 marked this conversation as resolved.
Show resolved Hide resolved
etcd_builder = self._gs_etcd_builder_cls(
name=self._etcd_name,
labels=labels,
replicas=1,
image_pull_policy=self._image_pull_policy,
)

for name in self._image_pull_secrets:
etcd_builder.add_image_pull_secret(name)

etcd_builder.add_etcd_container(
name=self._etcd_container_name,
name_prefix=self._etcd_name,
container_name=self._etcd_container_name,
service_name=self._etcd_service_name,
image=self._etcd_image,
cpu=self._etcd_cpu,
mem=self._etcd_mem,
preemptive=self._preemptive,
labels=labels,
image_pull_policy=self._image_pull_policy,
num_pods=3,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe num_pods of etcd can be extracted to a configurable flag?
I thought it's better, but it's up to you.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Expose etcd pod's number as a param named k8s_etcd_num_pods to client

restart_policy="Always",
image_pull_secrets=self._image_pull_secrets,
listen_peer_service_port=self._random_etcd_listen_peer_service_port,
listen_client_service_port=self._random_etcd_listen_client_service_port,
)
self._resource_object.append(
self._app_api.create_namespaced_deployment(
self._namespace, etcd_builder.build()

for pod_builder in etcd_builder.build():
self._resource_object.append(
self._core_api.create_namespaced_pod(
self._namespace, pod_builder.build()
)
)
)

def _create_vineyard_service(self):
labels = {"name": self._engine_name} # vineyard in engine pod
Expand Down
254 changes: 186 additions & 68 deletions python/graphscope/deploy/kubernetes/resource_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -693,98 +693,216 @@ def add_engine_container(self, name, image, cpu, mem, preemptive, **kwargs):
)


class GSEtcdBuilder(DeploymentBuilder):
"""Builder for graphscope etcd."""

_requests_cpu = 0.5
_requests_mem = "128Mi"
class PodBuilder(object):
"""Base builder for k8s pod."""

def __init__(self, name, labels, image_pull_policy, replicas=1):
def __init__(
self, name, labels, hostname=None, subdomain=None, restart_policy="Never"
):
self._name = name
self._labels = labels
self._replicas = replicas
self._image_pull_policy = image_pull_policy
super().__init__(
self._name, self._labels, self._replicas, self._image_pull_policy
self._hostname = hostname
self._subdomain = subdomain
self._restart_policy = restart_policy

self._containers = []
self._image_pull_secrets = []
self._volumes = []

def add_volume(self, vol):
if isinstance(vol, list):
self._volumes.extend(vol)
else:
self._volumes.append(vol)

def add_container(self, ctn):
self._containers.append(ctn)

def add_image_pull_secret(self, name):
self._image_pull_secrets.append(LocalObjectRefBuilder(name))

def build_pod_spec(self):
return _remove_nones(
{
"hostname": self._hostname,
"subdomain": self._subdomain,
"containers": [ctn for ctn in self._containers],
"volumes": [vol.build() for vol in self._volumes] or None,
"imagePullSecrets": [ips.build() for ips in self._image_pull_secrets]
or None,
"restartPolicy": self._restart_policy,
}
)

def add_etcd_container(
def build(self):
return {
"kind": "Pod",
"metadata": {"name": self._name, "labels": self._labels},
"spec": self.build_pod_spec(),
}


class GSEtcdBuilder(object):
"""Builder for graphscope etcd."""

_requests_cpu = 0.5
_requests_mem = "128Mi"

def __init__(
self,
name,
name_prefix,
container_name,
service_name,
image,
cpu,
mem,
preemptive,
listen_peer_service_port,
listen_client_service_port,
labels,
image_pull_policy,
num_pods=3,
restart_policy="Always",
image_pull_secrets=None,
max_txn_ops=1024000,
):
cmd = [
"etcd",
"--name",
service_name,
"--max-txn-ops=%s" % max_txn_ops,
"--initial-advertise-peer-urls",
"http://%s:%s" % (service_name, str(listen_peer_service_port)),
"--advertise-client-urls=http://%s:%s"
% (service_name, str(listen_client_service_port)),
"--data-dir=/var/lib/etcd",
"--listen-client-urls=http://0.0.0.0:%s" % str(listen_client_service_port),
"--listen-peer-urls=http://0.0.0.0:%s" % str(listen_peer_service_port),
"--initial-cluster",
"%s=http://%s:%s"
% (
service_name,
service_name,
str(listen_peer_service_port),
),
"--initial-cluster-state",
"new",
]
self._name_prefix = name_prefix
self._container_name = container_name
self._service_name = service_name
self._image = image
self._cpu = cpu
self._mem = mem
self._preemptive = preemptive
self._listen_peer_service_port = listen_peer_service_port
self._listen_client_service_port = listen_client_service_port
self._labels = labels
self._image_pull_policy = image_pull_policy
self._num_pods = num_pods
self._restart_policy = restart_policy
self._image_pull_secrets = image_pull_secrets
self._max_txn_ops = 1024000

resources_dict = {
"requests": ResourceBuilder(self._requests_cpu, self._requests_mem).build()
if preemptive
else ResourceBuilder(cpu, mem).build(),
"limits": ResourceBuilder(cpu, mem).build(),
}
self._envs = dict()
self._volumes = []

volumeMounts = []
for vol in self._volumes:
for vol_mount in vol.build_mount():
volumeMounts.append(vol_mount)
def add_volume(self, vol):
if isinstance(vol, list):
self._volumes.extend(vol)
else:
self._volumes.append(vol)

super().add_container(
_remove_nones(
{
"command": cmd,
"env": [env.build() for env in self._envs.values()] or None,
"image": image,
"name": name,
"imagePullPolicy": self._image_pull_policy,
"resources": dict((k, v) for k, v in resources_dict.items() if v)
or None,
"ports": [
PortBuilder(listen_peer_service_port).build(),
PortBuilder(listen_client_service_port).build(),
],
"volumeMounts": volumeMounts or None,
"livenessProbe": self.build_liveness_probe(
listen_client_service_port
).build(),
"readinessProbe": None,
"lifecycle": None,
}
def add_env(self, name, value=None):
self._envs[name] = ContainerEnvBuilder(name, value)

def add_simple_envs(self, envs):
for k, v in envs.items() or ():
self.add_env(k, v)

def build(self):
"""
Returns: a list of :class:`PodBuilder`.
"""
pods_name = []
initial_cluster = ""
for i in range(self._num_pods):
name = "%s-%s" % (self._name_prefix, str(i))
pods_name.append(name)
initial_cluster += "%s=%s," % (
name,
"http://%s:%s"
% (
"%s.%s" % (name, self._service_name),
str(self._listen_peer_service_port),
),
)
)
# remove last comma
initial_cluster = initial_cluster[0:-1]

pods_builder = []
for _, name in enumerate(pods_name):
pod_builder = PodBuilder(
name=name,
labels=self._labels,
hostname=name,
subdomain=self._service_name,
restart_policy=self._restart_policy,
)

# volumes
pod_builder.add_volume(self._volumes)

cmd = [
"etcd",
"--name",
name,
"--max-txn-ops=%s" % self._max_txn_ops,
"--initial-advertise-peer-urls",
"http://%s:%s"
% (
"%s.%s" % (name, self._service_name),
str(self._listen_peer_service_port),
),
"--advertise-client-urls=http://%s:%s"
% (name, str(self._listen_client_service_port)),
"--data-dir=/var/lib/etcd",
"--listen-client-urls=http://0.0.0.0:%s"
% str(self._listen_client_service_port),
"--listen-peer-urls=http://0.0.0.0:%s"
% str(self._listen_peer_service_port),
"--initial-cluster",
initial_cluster,
"--initial-cluster-state",
"new",
]

resources_dict = {
"requests": ResourceBuilder(
self._requests_cpu, self._requests_mem
).build()
if self._preemptive
else ResourceBuilder(self._cpu, self._mem).build(),
"limits": ResourceBuilder(self._cpu, self._mem).build(),
}

volumeMounts = []
for vol in self._volumes:
for vol_mount in vol.build_mount():
volumeMounts.append(vol_mount)

pod_builder.add_container(
_remove_nones(
{
"command": cmd,
"env": [env.build() for env in self._envs.values()] or None,
"image": self._image,
"name": self._container_name,
"imagePullPolicy": self._image_pull_policy,
"resources": dict(
(k, v) for k, v in resources_dict.items() if v
)
or None,
"ports": [
PortBuilder(self._listen_peer_service_port).build(),
PortBuilder(self._listen_client_service_port).build(),
],
"volumeMounts": volumeMounts or None,
"livenessProbe": self.build_liveness_probe().build(),
"readinessProbe": None,
"lifecycle": None,
}
)
)

pods_builder.append(pod_builder)

return pods_builder

def build_liveness_probe(self, listen_client_service_port):
def build_liveness_probe(self):
liveness_cmd = [
"/bin/sh",
"-ec",
"ETCDCTL_API=3 etcdctl --endpoints=http://[127.0.0.1]:%s get foo"
% str(listen_client_service_port),
% str(self._listen_client_service_port),
]
return ExecProbeBuilder(liveness_cmd, timeout=15, failure_thresh=8)

Expand Down