diff --git a/charts/graphscope-store-one-pod/templates/configmap.yaml b/charts/graphscope-store-one-pod/templates/configmap.yaml index 2326a91dcb66..fc6a5c88acc1 100644 --- a/charts/graphscope-store-one-pod/templates/configmap.yaml +++ b/charts/graphscope-store-one-pod/templates/configmap.yaml @@ -60,7 +60,6 @@ data: pegasus.timeout={{ .Values.pegasus.timeout }} pegasus.batch.size=1024 pegasus.output.capacity=16 - pegasus.hosts=localhost:8080 ## Kafka Config kafka.servers=KAFKA_SERVERS diff --git a/charts/graphscope-store-one-pod/templates/statefulset.yaml b/charts/graphscope-store-one-pod/templates/statefulset.yaml index b98a34c77912..7637be060893 100644 --- a/charts/graphscope-store-one-pod/templates/statefulset.yaml +++ b/charts/graphscope-store-one-pod/templates/statefulset.yaml @@ -114,6 +114,14 @@ spec: {{- if .Values.store.resources }} resources: {{- toYaml .Values.store.resources | nindent 12 }} {{- end }} + readinessProbe: + failureThreshold: 5 + tcpSocket: + port: gaia-rpc + initialDelaySeconds: 30 + periodSeconds: 30 + successThreshold: 1 + timeoutSeconds: 1 volumeMounts: - name: data mountPath: {{ .Values.storeDataPath }} diff --git a/charts/graphscope-store/templates/_helpers.tpl b/charts/graphscope-store/templates/_helpers.tpl index 5d4979c7d921..6541aee0c5e7 100644 --- a/charts/graphscope-store/templates/_helpers.tpl +++ b/charts/graphscope-store/templates/_helpers.tpl @@ -157,3 +157,10 @@ Get full broker list. {{- end }} {{- join "," $brokerList | printf "%s" -}} {{- end -}} +{{/* +Create a default fully qualified zookeeper name. +We truncate at 63 chars because some Kubernetes name fields are limited to this (by the DNS naming spec). +*/}} +{{- define "graphscope-store.zookeeper.fullname" -}} +{{- printf "%s-%s" .Release.Name "zookeeper" | trunc 63 | trimSuffix "-" -}} +{{- end -}} diff --git a/charts/graphscope-store/templates/configmap.yaml b/charts/graphscope-store/templates/configmap.yaml index 29e3f84a4fd9..419f90ccd14c 100644 --- a/charts/graphscope-store/templates/configmap.yaml +++ b/charts/graphscope-store/templates/configmap.yaml @@ -48,6 +48,10 @@ data: kafka.producer.custom.configs={{ .Values.kafkaProducerCustomConfigs }} kafka.test.cluster.enable=false + ## Zk Config + zk.base.path={{ .Values.zkBasePath }} + zk.connect.string=ZK_CONNECT + ## Frontend Config gremlin.server.port=12312 ## disable neo4j when launching groot server by default @@ -72,13 +76,15 @@ data: pegasus.timeout={{ .Values.pegasus.timeout }} pegasus.batch.size=1024 pegasus.output.capacity=16 - pegasus.hosts=PEGASUS_HOSTS ## Secondary config secondary.instance.enabled={{ .Values.secondary.enabled }} store.data.secondary.path={{ .Values.secondary.storeDataPath }} store.gc.interval.ms={{ .Values.storeGcIntervalMs }} + + write.ha.enabled={{ .Values.backup.enabled }} + ## Extra Config {{- if .Values.extraConfig }} {{- $config_list := regexSplit ";" .Values.extraConfig -1 }} @@ -95,24 +101,14 @@ data: [[ `hostname` =~ -([0-9]+)$ ]] || exit 1 ordinal=${BASH_REMATCH[1]} - pegasus_hosts="" - i=0 - while [ $i -ne $STORE_COUNT ]; do - pod=`echo $DNS_NAME_PREFIX_STORE | sed -e "s/{}/$i/g"` - # 60001 is fixed gaia engine port - pegasus_hosts="${pegasus_hosts},${pod}:60001" - i=$(($i+1)) - done - pegasus_hosts=${pegasus_hosts:1} - sudo sed -e "s/GRAPH_NAME/${GRAPH_NAME}/g" \ -e "s/ROLE/${ROLE}/g" \ -e "s/INDEX/${ordinal}/g" \ + -e "s/ZK_CONNECT/${ZK_CONNECT}/g" \ -e "s/KAFKA_SERVERS/${KAFKA_SERVERS}/g" \ -e "s/FRONTEND/${DNS_NAME_PREFIX_FRONTEND}/g" \ -e "s/COORDINATOR/${DNS_NAME_PREFIX_COORDINATOR}/g" \ -e "s/STORE/${DNS_NAME_PREFIX_STORE}/g" \ - -e "s/PEGASUS_HOSTS/${pegasus_hosts}/g" \ -e "s@LOG4RS_CONFIG@${GRAPHSCOPE_HOME}/groot/conf/log4rs.yml@g" \ /etc/groot/groot.config.tpl | sudo tee -a /etc/groot/groot.config diff --git a/charts/graphscope-store/templates/coordinator/statefulset.yaml b/charts/graphscope-store/templates/coordinator/statefulset.yaml index 466364ecdf96..8910388908de 100644 --- a/charts/graphscope-store/templates/coordinator/statefulset.yaml +++ b/charts/graphscope-store/templates/coordinator/statefulset.yaml @@ -123,6 +123,8 @@ spec: value: {{ $kafkaFullname}}-headless.{{ $releaseNamespace }} - name: KAFKA_SERVERS value: {{ include "graphscope-store.kafka.brokerlist" . }} + - name: ZK_CONNECT + value: {{ printf "%s-headless" (include "graphscope-store.zookeeper.fullname" .) | quote }} ports: - name: port containerPort: 55555 diff --git a/charts/graphscope-store/templates/frontend/statefulset.yaml b/charts/graphscope-store/templates/frontend/statefulset.yaml index c9919d197cfd..56534fd9f3c2 100644 --- a/charts/graphscope-store/templates/frontend/statefulset.yaml +++ b/charts/graphscope-store/templates/frontend/statefulset.yaml @@ -123,6 +123,8 @@ spec: value: {{ $kafkaFullname}}-headless.{{ $releaseNamespace }} - name: KAFKA_SERVERS value: {{ include "graphscope-store.kafka.brokerlist" . }} + - name: ZK_CONNECT + value: {{ printf "%s-headless" (include "graphscope-store.zookeeper.fullname" .) | quote }} ports: - name: service-port containerPort: 55556 @@ -139,6 +141,11 @@ spec: readinessProbe: {{- include "common.tplvalues.render" (dict "value" (omit .Values.readinessProbe "enabled") "context" $) | nindent 12 }} tcpSocket: port: gremlin + failureThreshold: 5 + initialDelaySeconds: 30 + periodSeconds: 30 + successThreshold: 1 + timeoutSeconds: 1 {{- end }} {{- if .Values.startupProbe.enabled }} startupProbe: {{- include "common.tplvalues.render" (dict "value" (omit .Values.startupProbe "enabled") "context" $) | nindent 12 }} diff --git a/charts/graphscope-store/templates/store/statefulset-backup.yaml b/charts/graphscope-store/templates/store/statefulset-backup.yaml new file mode 100644 index 000000000000..37147090e9ab --- /dev/null +++ b/charts/graphscope-store/templates/store/statefulset-backup.yaml @@ -0,0 +1,191 @@ +{{- if .Values.backup.enabled }} +{{- $frontendFullname := include "graphscope-store.frontend.fullname" . }} +{{- $coordinatorFullname := include "graphscope-store.coordinator.fullname" . }} +{{- $storeFullname := include "graphscope-store.store.fullname" . }} +{{- $kafkaFullname := include "graphscope-store.kafka.fullname" . -}} +{{- $releaseNamespace := .Release.Namespace }} +{{- $clusterDomain := .Values.clusterDomain }} + +apiVersion: apps/v1 +kind: StatefulSet +metadata: + name: {{ include "graphscope-store.store.fullname" . }}-backup + namespace: {{ .Release.Namespace }} + labels: {{- include "common.labels.standard" . | nindent 4 }} + app.kubernetes.io/component: store + {{- if .Values.commonLabels }} + {{- include "common.tplvalues.render" ( dict "value" .Values.commonLabels "context" $ ) | nindent 4 }} + {{- end }} + {{- if .Values.commonAnnotations }} + annotations: {{- include "common.tplvalues.render" ( dict "value" .Values.commonAnnotations "context" $ ) | nindent 4 }} + {{- end }} +spec: + podManagementPolicy: {{ .Values.podManagementPolicy }} + replicas: {{ .Values.store.replicaCount }} + selector: + matchLabels: {{- include "common.labels.matchLabels" . | nindent 6 }} + app.kubernetes.io/component: store + serviceName: {{ printf "%s-store-headless" (include "common.names.fullname" .) | trunc 63 | trimSuffix "-" }} + updateStrategy: {{- include "common.tplvalues.render" (dict "value" .Values.updateStrategy "context" $ ) | nindent 4 }} + template: + metadata: + labels: {{- include "common.labels.standard" . | nindent 8 }} + app.kubernetes.io/component: store + {{- if .Values.store.podLabels }} + {{- include "common.tplvalues.render" (dict "value" .Values.store.podLabels "context" $) | nindent 8 }} + {{- end }} + annotations: + {{- if (include "graphscope-store.createConfigmap" .) }} + checksum/configuration: {{ include (print $.Template.BasePath "/configmap.yaml") . | sha256sum }} + {{- end }} + {{- if .Values.store.podAnnotations }} + {{- include "common.tplvalues.render" (dict "value" .Values.store.podAnnotations "context" $) | nindent 8 }} + {{- end }} + spec: + {{- include "graphscope-store.imagePullSecrets" . | nindent 6 }} + {{- if .Values.store.hostAliases }} + hostAliases: {{- include "common.tplvalues.render" (dict "value" .Values.store.hostAliases "context" $) | nindent 8 }} + {{- end }} + hostNetwork: {{ .Values.store.hostNetwork }} + hostIPC: {{ .Values.store.hostIPC }} + {{- if .Values.store.schedulerName }} + schedulerName: {{ .Values.store.schedulerName | quote }} + {{- end }} + {{- if .Values.affinity }} + affinity: {{- include "common.tplvalues.render" (dict "value" .Values.affinity "context" $) | nindent 8 }} + {{- else }} + affinity: + podAffinity: {{- include "common.affinities.pods" (dict "type" .Values.podAffinityPreset "component" "store" "context" $) | nindent 10 }} + podAntiAffinity: {{- include "common.affinities.pods" (dict "type" .Values.podAntiAffinityPreset "component" "store" "context" $) | nindent 10 }} + nodeAffinity: {{- include "common.affinities.nodes" (dict "type" .Values.nodeAffinityPreset.type "key" .Values.nodeAffinityPreset.key "values" .Values.nodeAffinityPreset.values) | nindent 10 }} + {{- end }} + {{- if .Values.nodeSelector }} + nodeSelector: {{- include "common.tplvalues.render" (dict "value" .Values.nodeSelector "context" $) | nindent 8 }} + {{- end }} + {{- if .Values.dnsPolicy }} + dnsPolicy: {{ .Values.dnsPolicy | quote }} + {{- end }} + {{- if .Values.dnsConfig }} + dnsConfig: {{- include "common.tplvalues.render" (dict "value" .Values.dnsConfig "context" $) | nindent 8 }} + {{- end }} + {{- if .Values.tolerations }} + tolerations: {{- include "common.tplvalues.render" (dict "value" .Values.tolerations "context" $) | nindent 8 }} + {{- end }} + {{- if .Values.topologySpreadConstraints }} + topologySpreadConstraints: {{- include "common.tplvalues.render" (dict "value" .Values.topologySpreadConstraints "context" $) | nindent 8 }} + {{- end }} + {{- if .Values.terminationGracePeriodSeconds }} + terminationGracePeriodSeconds: {{ .Values.terminationGracePeriodSeconds }} + {{- end }} + {{- if .Values.priorityClassName }} + priorityClassName: {{ .Values.priorityClassName }} + {{- end }} + {{- if .Values.podSecurityContext.enabled }} + securityContext: {{- omit .Values.podSecurityContext "enabled" | toYaml | nindent 8 }} + {{- end }} + serviceAccountName: {{ include "graphscope-store.serviceAccountName" . }} + {{- if .Values.initContainers }} + initContainers: + {{- if .Values.initContainers }} + {{- include "common.tplvalues.render" ( dict "value" .Values.initContainers "context" $ ) | nindent 8 }} + {{- end }} + {{- end }} + containers: + - name: store + image: {{ include "graphscope-store.image" . }} + imagePullPolicy: {{ .Values.image.pullPolicy | quote }} + {{- if .Values.containerSecurityContext.enabled }} + securityContext: {{- omit .Values.containerSecurityContext "enabled" | toYaml | nindent 12 }} + {{- end }} + command: {{- include "common.tplvalues.render" (dict "value" .Values.command "context" $) | nindent 12 }} + {{- if .Values.args }} + args: {{- include "common.tplvalues.render" (dict "value" .Values.args "context" $) | nindent 12 }} + {{- end }} + env: + - name: GRAPH_NAME + value: {{ .Values.graphName | quote }} + - name: GROOT_JAVA_OPTS + value: {{ .Values.javaOpts | quote }} + - name: ROLE + value: "store" + - name: FRONTEND_COUNT + value: {{ .Values.frontend.replicaCount | quote }} + - name: COORDINATOR_COUNT + value: {{ .Values.coordinator.replicaCount | quote }} + - name: STORE_COUNT + value: {{ .Values.store.replicaCount | quote }} + - name: DNS_NAME_PREFIX_FRONTEND + value: {{ $frontendFullname }}-{}.{{ $frontendFullname }}-headless + - name: DNS_NAME_PREFIX_COORDINATOR + value: {{ $coordinatorFullname }}-{}.{{ $coordinatorFullname }}-headless + - name: DNS_NAME_PREFIX_STORE + value: {{ $storeFullname }}-{}.{{ $storeFullname }}-headless + - name: DNS_NAME_SERVICE_KAFKA + value: {{ $kafkaFullname}}-headless.{{ $releaseNamespace }} + - name: KAFKA_SERVERS + value: {{ include "graphscope-store.kafka.brokerlist" . }} + - name: ZK_CONNECT + value: {{ printf "%s-headless" (include "graphscope-store.zookeeper.fullname" .) | quote }} + ports: + - name: port + containerPort: 55555 + - name: gaia-rpc + containerPort: 60000 + - name: gaia-engine + containerPort: 60001 + {{- if .Values.store.resources }} + resources: {{- toYaml .Values.store.resources | nindent 12 }} + {{- end }} + readinessProbe: + failureThreshold: 5 + tcpSocket: + port: gaia-rpc + initialDelaySeconds: 30 + periodSeconds: 30 + successThreshold: 1 + timeoutSeconds: 1 + volumeMounts: + - name: data + mountPath: {{ .Values.storeDataPath }} + - name: config + mountPath: /etc/groot/groot.config.tpl + subPath: groot.config + - name: config + mountPath: /etc/groot/setup.sh + subPath: setup.sh + volumes: + - name: config + configMap: + name: {{ include "graphscope-store.configmapName" . }} + defaultMode: 0755 + {{- if and .Values.store.persistence.enabled .Values.store.persistence.existingClaim }} + - name: data + persistentVolumeClaim: + claimName: {{ tpl .Values.store.persistence.existingClaim . }} + {{- else if not .Values.store.persistence.enabled }} + - name: data + emptyDir: {} + {{- else if and .Values.store.persistence.enabled (not .Values.store.persistence.existingClaim) }} + volumeClaimTemplates: + - metadata: + name: data + {{- if .Values.persistence.annotations }} + annotations: {{- include "common.tplvalues.render" (dict "value" .Values.persistence.annotations "context" $) | nindent 10 }} + {{- end }} + {{- if .Values.persistence.labels }} + labels: {{- include "common.tplvalues.render" (dict "value" .Values.persistence.labels "context" $) | nindent 10 }} + {{- end }} + spec: + accessModes: + {{- range .Values.persistence.accessModes }} + - {{ . | quote }} + {{- end }} + resources: + requests: + storage: {{ .Values.store.persistence.size | quote }} + {{ include "graphscope-store.storageClass" . | nindent 8 }} + {{- if .Values.store.persistence.selector }} + selector: {{- include "graphscope-store.tplvalues.render" (dict "value" .Values.store.persistence.selector "context" $) | nindent 10 }} + {{- end -}} + {{- end }} +{{- end }} diff --git a/charts/graphscope-store/templates/store/statefulset.yaml b/charts/graphscope-store/templates/store/statefulset.yaml index ba087dbfc3a8..28e404a7992a 100644 --- a/charts/graphscope-store/templates/store/statefulset.yaml +++ b/charts/graphscope-store/templates/store/statefulset.yaml @@ -123,6 +123,8 @@ spec: value: {{ $kafkaFullname}}-headless.{{ $releaseNamespace }} - name: KAFKA_SERVERS value: {{ include "graphscope-store.kafka.brokerlist" . }} + - name: ZK_CONNECT + value: {{ printf "%s-headless" (include "graphscope-store.zookeeper.fullname" .) | quote }} ports: - name: port containerPort: 55555 @@ -133,6 +135,14 @@ spec: {{- if .Values.store.resources }} resources: {{- toYaml .Values.store.resources | nindent 12 }} {{- end }} + readinessProbe: + failureThreshold: 5 + tcpSocket: + port: gaia-rpc + initialDelaySeconds: 30 + periodSeconds: 30 + successThreshold: 1 + timeoutSeconds: 1 volumeMounts: - name: data mountPath: {{ .Values.storeDataPath }} diff --git a/charts/graphscope-store/values.yaml b/charts/graphscope-store/values.yaml index 122ebe409982..a410badca7ec 100644 --- a/charts/graphscope-store/values.yaml +++ b/charts/graphscope-store/values.yaml @@ -441,6 +441,9 @@ kafka: port: 9092 socketRequestMaxBytes: _1048576000 +## Zk Config +zkBasePath: "/graphscope/groot" + ## This value is only used when kafka.enabled is set to false ## externalKafka: @@ -494,3 +497,6 @@ pegasus: secondary: enabled: false storeDataPath: "./data_secondary" + +backup: + enabled: false diff --git a/docs/storage_engine/groot.md b/docs/storage_engine/groot.md index 2e2ef7c49ca5..0f87adc7851f 100644 --- a/docs/storage_engine/groot.md +++ b/docs/storage_engine/groot.md @@ -692,3 +692,7 @@ And use a different `zk.base.path` for each secondary instance to avoid conflict ### Traces use `--set otel.enabled=true` to enable trace export. + +### Write High-availability + +use `--set write.ha.enabled=True` in multi-pod deployment mode to open a backup store pod. diff --git a/interactive_engine/common/src/main/java/com/alibaba/graphscope/groot/common/config/CommonConfig.java b/interactive_engine/common/src/main/java/com/alibaba/graphscope/groot/common/config/CommonConfig.java index d5b6e3aa95ed..a5ab7010dc19 100644 --- a/interactive_engine/common/src/main/java/com/alibaba/graphscope/groot/common/config/CommonConfig.java +++ b/interactive_engine/common/src/main/java/com/alibaba/graphscope/groot/common/config/CommonConfig.java @@ -74,12 +74,11 @@ public class CommonConfig { public static final Config SECONDARY_INSTANCE_ENABLED = Config.boolConfig("secondary.instance.enabled", false); + public static final Config TRACING_ENABLED = + Config.boolConfig("tracing.enabled", false); // Create an extra store pod for each original store pod for backup. // Only available in multi pod mode. - public static final Config WRITE_HIGH_AVAILABILITY_ENABLED = - Config.boolConfig("write.high.availability.enabled", false); - - public static final Config TRACING_ENABLED = - Config.boolConfig("tracing.enabled", false); + public static final Config WRITE_HA_ENABLED = + Config.boolConfig("write.ha.enabled", false); } diff --git a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/CuratorUtils.java b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/CuratorUtils.java index 45bf1a9988f9..cd4a1b991bb4 100644 --- a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/CuratorUtils.java +++ b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/CuratorUtils.java @@ -21,14 +21,19 @@ import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.api.ACLProvider; +import org.apache.curator.framework.state.ConnectionState; +import org.apache.curator.framework.state.ConnectionStateErrorPolicy; import org.apache.curator.retry.BoundedExponentialBackoffRetry; import org.apache.zookeeper.ZooDefs; import org.apache.zookeeper.data.ACL; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.nio.charset.StandardCharsets; import java.util.List; public class CuratorUtils { + private static final Logger logger = LoggerFactory.getLogger(CuratorUtils.class); public static CuratorFramework makeCurator(Configs configs) { String connectionString = ZkConfig.ZK_CONNECT_STRING.get(configs); @@ -41,11 +46,13 @@ public static CuratorFramework makeCurator(Configs configs) { String authUser = ZkConfig.ZK_AUTH_USER.get(configs); String authPassword = ZkConfig.ZK_AUTH_PASSWORD.get(configs); - BoundedExponentialBackoffRetry policy = + BoundedExponentialBackoffRetry retryPolicy = new BoundedExponentialBackoffRetry(baseSleepMs, maxSleepMs, maxRetry); + ConnectionStateErrorPolicy errorPolicy = state -> state == ConnectionState.LOST; CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder(); builder.connectString(connectionString) - .retryPolicy(policy) + .retryPolicy(retryPolicy) + .connectionStateErrorPolicy(errorPolicy) .sessionTimeoutMs(sessionTimeoutMs) .connectionTimeoutMs(connectionTimeoutMs); if (authEnable) { diff --git a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/Utils.java b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/Utils.java index 0cd6daed8265..7c3aec9200f4 100644 --- a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/Utils.java +++ b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/Utils.java @@ -17,13 +17,23 @@ import com.alibaba.graphscope.groot.common.config.CommonConfig; import com.alibaba.graphscope.groot.common.config.Configs; import com.alibaba.graphscope.groot.common.config.DiscoveryConfig; +import com.alibaba.graphscope.groot.common.config.StoreConfig; import com.alibaba.graphscope.groot.operation.OperationBatch; import com.alibaba.graphscope.groot.operation.OperationBlob; import com.alibaba.graphscope.groot.operation.OperationType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.BufferedReader; +import java.io.File; +import java.io.IOException; +import java.io.InputStreamReader; +import java.nio.file.Paths; import java.util.List; public class Utils { + public static final Logger logger = LoggerFactory.getLogger(Utils.class); public static final OperationBatch MARKER_BATCH = OperationBatch.newBuilder() .addOperationBlob(OperationBlob.MARKER_OPERATION_BLOB) @@ -116,4 +126,40 @@ public static OperationBatch extractOperations( } return batchBuilder.build(); } + + public static boolean fileExists(String name) { + return new File(name).exists(); + } + + public static boolean fileClosed(String name) { + String filePath = new File(name).getAbsolutePath(); + try { + Process proc = new ProcessBuilder("lsof", filePath).start(); + try (BufferedReader reader = + new BufferedReader(new InputStreamReader(proc.getInputStream()))) { + String line; + while ((line = reader.readLine()) != null) { + if (line.contains(filePath)) { + return false; + } + } + } + } catch (IOException e) { + logger.error("Exception when checking file {}", filePath, e); + return false; + } + return true; + } + + // Check if the lock of 0 is available + public static boolean isLockAvailable(Configs configs) { + String dataRoot = StoreConfig.STORE_DATA_PATH.get(configs); + // Get the LOCK file of first partition + String LOCK = Paths.get(dataRoot, "" + 0, "LOCK").toAbsolutePath().toString(); + if (fileExists(LOCK) && !fileClosed(LOCK)) { + logger.warn("LOCK {} is unavailable", LOCK); + return false; + } + return true; + } } diff --git a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/StoreService.java b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/StoreService.java index 27997ed25c72..10de7d54cbe0 100644 --- a/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/StoreService.java +++ b/interactive_engine/groot-module/src/main/java/com/alibaba/graphscope/groot/store/StoreService.java @@ -86,7 +86,7 @@ public StoreService( this.writeThreadCount = StoreConfig.STORE_WRITE_THREAD_COUNT.get(storeConfigs); this.metaService = metaService; this.isSecondary = CommonConfig.SECONDARY_INSTANCE_ENABLED.get(storeConfigs); - metricsCollector.register(this, () -> updateMetrics()); + metricsCollector.register(this, this::updateMetrics); } public void start() throws IOException { diff --git a/interactive_engine/groot-server/src/main/java/com/alibaba/graphscope/groot/servers/GrootGraph.java b/interactive_engine/groot-server/src/main/java/com/alibaba/graphscope/groot/servers/GrootGraph.java index f03801e4c735..419ed5a0afe8 100644 --- a/interactive_engine/groot-server/src/main/java/com/alibaba/graphscope/groot/servers/GrootGraph.java +++ b/interactive_engine/groot-server/src/main/java/com/alibaba/graphscope/groot/servers/GrootGraph.java @@ -13,27 +13,24 @@ */ package com.alibaba.graphscope.groot.servers; +import com.alibaba.graphscope.groot.CuratorUtils; +import com.alibaba.graphscope.groot.Utils; import com.alibaba.graphscope.groot.common.RoleType; import com.alibaba.graphscope.groot.common.config.*; import com.alibaba.graphscope.groot.common.exception.GrootException; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.recipes.leader.LeaderLatch; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; - public class GrootGraph { private static final Logger logger = LoggerFactory.getLogger(GrootGraph.class); - public static void main(String[] args) throws IOException { + public static void main(String[] args) throws Exception { String configFile = System.getProperty("config.file"); Configs conf = new Configs(configFile); - if (CommonConfig.SECONDARY_INSTANCE_ENABLED.get(conf)) { - conf = - Configs.newBuilder(conf) - .put(StoreConfig.STORE_STORAGE_ENGINE.getKey(), "rocksdb_as_secondary") - .build(); - } + conf = reConfig(conf); logger.info("Configs {}", conf); NodeBase node; @@ -60,8 +57,59 @@ public static void main(String[] args) throws IOException { default: throw new IllegalArgumentException("invalid roleType [" + roleType + "]"); } + + boolean writeHAEnabled = CommonConfig.WRITE_HA_ENABLED.get(conf); + LeaderLatch latch; + if (writeHAEnabled && roleType == RoleType.STORE) { + int nodeID = CommonConfig.NODE_IDX.get(conf); + String latchPath = ZkConfig.ZK_BASE_PATH.get(conf) + "/store/leader/" + nodeID; + CuratorFramework curator = CuratorUtils.makeCurator(conf); + curator.start(); + try { + while (true) { + latch = new LeaderLatch(curator, latchPath); + latch.start(); + logger.info( + "latch id: {}, leader: {}, state: {}", + latch.getId(), + latch.getLeader(), + latch.getState()); + latch.await(); + // Sleep 5s before check the lock to prevent the leader has not + // released the resource yet. + Thread.sleep(5000); + if (Utils.isLockAvailable(conf)) { + logger.info("LOCK is available, node starting"); + break; + } + latch.close(); + logger.info("LOCK is unavailable, the leader may still exists"); + // The leader has lost connection but still alive, + // give it another chance + Thread.sleep(60000); + } + } catch (Exception e) { + logger.error("Exception while leader election", e); + throw e; + } + // curator.close(); + } } - new NodeLauncher(node).start(); + NodeLauncher launcher = new NodeLauncher(node); + launcher.start(); logger.info("node started. [" + node.getName() + "]"); } + + private static Configs reConfig(Configs in) { + Configs.Builder out = Configs.newBuilder(in); + if (CommonConfig.SECONDARY_INSTANCE_ENABLED.get(in)) { + out.put(StoreConfig.STORE_STORAGE_ENGINE.getKey(), "rocksdb_as_secondary"); + } + if (CommonConfig.WRITE_HA_ENABLED.get(in)) { + logger.info("Write HA mode needs discovery mode to be 'zookeeper'"); + out.put(CommonConfig.DISCOVERY_MODE.getKey(), "zookeeper"); + } + + return out.build(); + } } diff --git a/k8s/dockerfiles/graphscope-store.Dockerfile b/k8s/dockerfiles/graphscope-store.Dockerfile index a1125a41dce8..c70fbda92093 100644 --- a/k8s/dockerfiles/graphscope-store.Dockerfile +++ b/k8s/dockerfiles/graphscope-store.Dockerfile @@ -40,7 +40,7 @@ RUN ln -snf /usr/share/zoneinfo/$TZ /etc/localtime && \ echo '$TZ' > /etc/timezone RUN apt-get update -y && \ - apt-get install -y sudo default-jdk dnsutils tzdata \ + apt-get install -y sudo default-jdk dnsutils tzdata lsof \ libjemalloc-dev libunwind-dev binutils less python3 python3-pip && \ apt-get clean -y && \ rm -rf /var/lib/apt/lists/*