Commit 7f23780

matyix authored and sancyx committed
ZEPPELIN-3021. Add support to run Spark interpreter on a Kubernetes cluster
1 parent 3712ce6 commit 7f23780

11 files changed: +908 -8 lines changed

bin/common.sh

Lines changed: 7 additions & 3 deletions
```diff
@@ -122,10 +122,14 @@ JAVA_OPTS+=" -Dlog4j.configuration=file://${ZEPPELIN_CONF_DIR}/log4j.properties"
 export JAVA_OPTS
 
 JAVA_INTP_OPTS="${ZEPPELIN_INTP_JAVA_OPTS} -Dfile.encoding=${ZEPPELIN_ENCODING}"
-if [[ -z "${ZEPPELIN_SPARK_YARN_CLUSTER}" ]]; then
-  JAVA_INTP_OPTS+=" -Dlog4j.configuration=file://${ZEPPELIN_CONF_DIR}/log4j.properties"
-else
+
+if [[ ! -z "${ZEPPELIN_SPARK_YARN_CLUSTER}" ]]; then
   JAVA_INTP_OPTS+=" -Dlog4j.configuration=log4j_yarn_cluster.properties"
+
+elif [[ ! -z "${ZEPPELIN_SPARK_K8_CLUSTER}" ]]; then
+  JAVA_INTP_OPTS+=" -Dlog4j.configuration=log4j_k8_cluster.properties"
+else
+  JAVA_INTP_OPTS+=" -Dlog4j.configuration=file://${ZEPPELIN_CONF_DIR}/log4j.properties"
 fi
 export JAVA_INTP_OPTS
```

bin/interpreter.sh

Lines changed: 5 additions & 1 deletion
```diff
@@ -105,7 +105,10 @@ if [[ ! -z "$ZEPPELIN_IMPERSONATE_USER" ]]; then
   ZEPPELIN_LOGFILE+="${ZEPPELIN_IMPERSONATE_USER}-"
 fi
 ZEPPELIN_LOGFILE+="${ZEPPELIN_IDENT_STRING}-${HOSTNAME}.log"
-JAVA_INTP_OPTS+=" -Dzeppelin.log.file=${ZEPPELIN_LOGFILE}"
+
+if [[ -z "${ZEPPELIN_SPARK_K8_CLUSTER}" ]]; then
+  JAVA_INTP_OPTS+=" -Dzeppelin.log.file=${ZEPPELIN_LOGFILE}"
+fi
 
 if [[ ! -d "${ZEPPELIN_LOG_DIR}" ]]; then
   echo "Log dir doesn't exist, create ${ZEPPELIN_LOG_DIR}"
@@ -228,6 +231,7 @@ if [[ ! -z "$ZEPPELIN_IMPERSONATE_USER" ]] && [[ -n "${suid}" || -z "${SPARK_SUBMIT}" ]]; then
   INTERPRETER_RUN_COMMAND+="'"
 fi
 
+echo $INTERPRETER_RUN_COMMAND
 eval $INTERPRETER_RUN_COMMAND &
 pid=$!
```

conf/log4j_k8_cluster.properties

Lines changed: 23 additions & 0 deletions
```properties
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

log4j.rootLogger = INFO, stdout

log4j.appender.stdout = org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout = org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%5p [%d] ({%t} %F[%M]:%L) - %m%n
```
docs/interpreter/spark-interpreter-k8s.md

Lines changed: 163 additions & 0 deletions

---
layout: page
title: "Apache Spark Interpreter for Apache Zeppelin on Kubernetes"
description: "Apache Spark is a fast and general-purpose cluster computing system. It provides high-level APIs in Java, Scala, Python and R, and an optimized engine that supports general execution graphs. This interpreter runs on the https://github.com/apache-spark-on-k8s/spark version of Spark."
group: interpreter
---
<!--
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
{% include JB/setup %}

# How to run Zeppelin Spark notebooks on a Kubernetes cluster

<div id="toc"></div>

## Prerequisites

The following tools are required:

- Kubernetes cluster & kubectl

  For local testing, Minikube can be used to create a single-node cluster: https://kubernetes.io/docs/tasks/tools/install-minikube/

- Docker: https://docs.docker.com/install/

This documentation uses pre-built Spark 2.2 Docker images; you may also build these images yourself as described here: https://github.com/apache-spark-on-k8s/spark/blob/branch-2.2-kubernetes/resource-managers/kubernetes/README.md

## Check out Zeppelin source code

Check out the latest source code from https://github.com/apache/zeppelin, then apply the changes from the [Add support to run Spark interpreter on a Kubernetes cluster](https://github.com/apache/zeppelin/pull/2637) pull request, for example as sketched below.
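A minimal sketch of one way to apply the pull request locally, using GitHub's `pull/<id>/head` refs (the local branch name `spark-k8s` is arbitrary):

```
# clone Zeppelin and fetch the pull request into a local branch
git clone https://github.com/apache/zeppelin.git
cd zeppelin
# GitHub exposes pull requests at pull/<id>/head
git fetch origin pull/2637/head:spark-k8s
git checkout spark-k8s
```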
## Build Zeppelin

- `./dev/change_scala_version.sh 2.11`
- `mvn clean install -DskipTests -Pspark-2.2 -Phadoop-2.4 -Pyarn -Ppyspark -Pscala-2.11`

## Create distribution

- `cd zeppelin-distribution`
- `mvn org.apache.maven.plugins:maven-assembly-plugin:3.0.0:single -P apache-release`

## Create a Zeppelin Dockerfile in the Zeppelin distribution target folder

```
cd {zeppelin_source}/zeppelin-distribution/target/zeppelin-0.8.0-SNAPSHOT
cat > Dockerfile <<EOF
FROM kubespark/spark-base:v2.2.0-kubernetes-0.5.0
COPY zeppelin-0.8.0-SNAPSHOT /opt/zeppelin
ADD https://storage.googleapis.com/kubernetes-release/release/v1.7.4/bin/linux/amd64/kubectl /usr/local/bin
WORKDIR /opt/zeppelin
ENTRYPOINT bin/zeppelin.sh
EOF
```
## Create / Start a Kubernetes cluster

In case of using Minikube on Linux with KVM:

`minikube start --vm-driver=kvm --cpus={nr_of_cpus} --memory={mem}`

You can check the Kubernetes dashboard address by running: `minikube dashboard`.

Init the Docker environment: `eval $(minikube docker-env)`
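For example, the `minikube start` command above with assumed sizing (4 CPUs and 8 GB of memory; adjust to your machine):

```
minikube start --vm-driver=kvm --cpus=4 --memory=8192
```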
## Build & tag Docker image

```
docker build -t zeppelin-server:v2.2.0-kubernetes -f Dockerfile .
```

You can retrieve the image id by running `docker images`.

## Start ResourceStagingServer for spark-submit

spark-submit will use the ResourceStagingServer to distribute resources (in our case the Zeppelin Spark interpreter JAR) across the Spark driver and executors.

```
wget https://raw.githubusercontent.com/apache-spark-on-k8s/spark/branch-2.2-kubernetes/conf/kubernetes-resource-staging-server.yaml
kubectl create -f kubernetes-resource-staging-server.yaml
```
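Before continuing, you can check that the staging server's service is up (the service name `spark-resource-staging-service` is the one referenced later in this guide):

```
kubectl get svc spark-resource-staging-service
```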
## Create a Kubernetes service to reach Zeppelin server from outside the cluster

```
cat > zeppelin-service.yaml <<EOF
apiVersion: v1
kind: Service
metadata:
  name: zeppelin-k8-service
  labels:
    app: zeppelin-server
spec:
  ports:
  - port: 8080
    targetPort: 8080
  selector:
    app: zeppelin-server
  type: NodePort
EOF

kubectl create -f zeppelin-service.yaml
```
## Create the Zeppelin server pod definition

```
cat > zeppelin-pod-local.yaml <<EOF
apiVersion: v1
kind: Pod
metadata:
  name: zeppelin-server
  labels:
    app: zeppelin-server
spec:
  containers:
  - name: zeppelin-server
    image: zeppelin-server:v2.2.0-kubernetes
    env:
    - name: SPARK_SUBMIT_OPTIONS
      value: --kubernetes-namespace default
        --conf spark.executor.instances=1
        --conf spark.kubernetes.resourceStagingServer.uri=http://{RESOURCE_STAGING_SERVER_ADDRESS}:10000
        --conf spark.kubernetes.resourceStagingServer.internal.uri=http://{RESOURCE_STAGING_SERVER_ADDRESS}:10000
        --conf spark.kubernetes.driver.docker.image=kubespark/spark-driver:v2.2.0-kubernetes-0.5.0
        --conf spark.kubernetes.executor.docker.image=kubespark/spark-executor:v2.2.0-kubernetes-0.5.0
        --conf spark.kubernetes.initcontainer.docker.image=kubespark/spark-init:v2.2.0-kubernetes-0.5.0
    ports:
    - containerPort: 8080
EOF
```
## Edit SPARK_SUBMIT_OPTIONS

- Set RESOURCE_STAGING_SERVER_ADDRESS, retrieving the address either from the Kubernetes dashboard or by running:

`kubectl get svc spark-resource-staging-service -o jsonpath='{.spec.clusterIP}'`

## Start Zeppelin server

`kubectl create -f zeppelin-pod-local.yaml`

You can retrieve the Zeppelin server address either from the Kubernetes dashboard or using kubectl.
The Zeppelin server should be reachable from outside the Kubernetes cluster on the Kubernetes node address (the same host as in the Kubernetes master URL, KUBERNETES_NODE_ADDRESS) and the nodePort returned by running:

`kubectl get svc --selector=app=zeppelin-server -o jsonpath='{.items[0].spec.ports}'`
## Edit Spark interpreter settings

Set the master URL to point to your Kubernetes cluster, e.g. `k8s://https://x.x.x.x:8443`, or use the default address that works from inside a Kubernetes cluster: `k8s://https://kubernetes:443`.
Add the property `spark.submit.deployMode` and set its value to `cluster`, for example as sketched below.
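As a sketch, the relevant interpreter properties would then look like this (the Minikube master address `192.168.99.100:8443` is an assumed example):

```
master                     k8s://https://192.168.99.100:8443
spark.submit.deployMode    cluster
```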
## Run the 'Zeppelin Tutorial/Basic Features (Spark)' notebook

In case of problems, you can check the spark-submit output in the Zeppelin logs after logging into the zeppelin-server pod, then restart the Spark interpreter and try again.

`kubectl exec -it zeppelin-server bash`

Log files are in the /opt/zeppelin/logs folder; see the sketch below.
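For example, once inside the pod you can list and follow the logs (a sketch; exact log file names vary by interpreter and host):

```
ls /opt/zeppelin/logs
tail -f /opt/zeppelin/logs/*.log
```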

docs/interpreter/spark.md

Lines changed: 7 additions & 0 deletions
```diff
@@ -204,9 +204,16 @@ Zeppelin support both yarn client and yarn cluster mode (yarn cluster mode is supported from 0.8.0)
 You can either specify them in `zeppelin-env.sh`, or in interpreter setting page. Specifying them in `zeppelin-env.sh` means you can use only one version of `spark` & `hadoop`. Specifying them
 in interpreter setting page means you can use multiple versions of `spark` & `hadoop` in one zeppelin instance.
 
+<<<<<<< HEAD
 ### 4. New Version of SparkInterpreter
 There's one new version of SparkInterpreter starting with better spark support and code completion from Zeppelin 0.8.0, by default we still use the old version of SparkInterpreter.
 If you want to use the new one, you can configure `zeppelin.spark.useNew` as `true` in its interpreter setting.
+=======
+### 4. Kubernetes cluster modules
+
+Zeppelin supports running Spark notebooks on Kubernetes in cluster mode, you can find more detailed description here: [How to run Zeppelin Spark notebooks on a Kubernetes cluster
+](../interpreter/spark-interpreter-k8s.html)
+>>>>>>> ZEPPELIN-3021. Add support to run Spark interpreter on a Kubernetes cluster
 
 ## SparkContext, SQLContext, SparkSession, ZeppelinContext
 SparkContext, SQLContext and ZeppelinContext are automatically created and exposed as variable names `sc`, `sqlContext` and `z`, respectively, in Scala, Python and R environments.
```

zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java

Lines changed: 4 additions & 1 deletion
```diff
@@ -260,13 +260,16 @@ public static void main(String[] args)
     String callbackHost = null;
     int port = Constants.ZEPPELIN_INTERPRETER_DEFAUlT_PORT;
     String portRange = ":";
-    if (args.length > 0) {
+    if (args.length == 1) {
+      port = Integer.parseInt(args[0]);
+    } else if (args.length > 0) {
       callbackHost = args[0];
       port = Integer.parseInt(args[1]);
       if (args.length > 2) {
         portRange = args[2];
       }
     }
+
     RemoteInterpreterServer remoteInterpreterServer =
         new RemoteInterpreterServer(callbackHost, port, portRange);
     remoteInterpreterServer.start();
```

zeppelin-zengine/pom.xml

Lines changed: 31 additions & 0 deletions
```diff
@@ -257,6 +257,37 @@
       <version>1.5</version>
     </dependency>
 
+    <dependency>
+      <groupId>org.mongodb</groupId>
+      <artifactId>mongo-java-driver</artifactId>
+      <version>3.4.1</version>
+    </dependency>
+
+    <dependency>
+      <groupId>io.fabric8</groupId>
+      <artifactId>kubernetes-client</artifactId>
+      <version>3.0.0</version>
+      <exclusions>
+        <exclusion>
+          <groupId>com.fasterxml.jackson.core</groupId>
+          <artifactId>jackson-core</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>com.fasterxml.jackson.core</groupId>
+          <artifactId>jackson-databind</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>com.fasterxml.jackson.core</groupId>
+          <artifactId>jackson-annotations</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>com.fasterxml.jackson.module</groupId>
+          <artifactId>jackson-module-jaxb-annotations</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+
+
   </dependencies>
 
   <build>
```

zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java

Lines changed: 11 additions & 3 deletions
```diff
@@ -37,6 +37,7 @@
 import org.apache.zeppelin.interpreter.launcher.InterpreterLauncher;
 import org.apache.zeppelin.interpreter.launcher.ShellScriptLauncher;
 import org.apache.zeppelin.interpreter.launcher.SparkInterpreterLauncher;
+import org.apache.zeppelin.interpreter.launcher.SparkK8SInterpreterLauncher;
 import org.apache.zeppelin.interpreter.lifecycle.NullLifecycleManager;
 import org.apache.zeppelin.interpreter.recovery.NullRecoveryStorage;
 import org.apache.zeppelin.interpreter.recovery.RecoveryStorage;
@@ -295,9 +296,16 @@ public InterpreterSetting(InterpreterSetting o) {
     this.conf = o.getConf();
   }
 
-  private void createLauncher() {
+  private void createLauncher(Properties properties) {
     if (group.equals("spark")) {
-      this.launcher = new SparkInterpreterLauncher(this.conf, this.recoveryStorage);
+      String deployMode = properties.getProperty("spark.submit.deployMode");
+      String masterUrl = properties.getProperty("master");
+      if (deployMode != null && deployMode.equals("cluster") &&
+          masterUrl != null && masterUrl.startsWith("k8s://")) {
+        this.launcher = new SparkK8SInterpreterLauncher(this.conf, this.recoveryStorage);
+      } else {
+        this.launcher = new SparkInterpreterLauncher(this.conf, this.recoveryStorage);
+      }
     } else {
       this.launcher = new ShellScriptLauncher(this.conf, this.recoveryStorage);
     }
@@ -709,7 +717,7 @@ synchronized RemoteInterpreterProcess createInterpreterProcess(String interpreterGroupId,
       Properties properties)
       throws IOException {
     if (launcher == null) {
-      createLauncher();
+      createLauncher(properties);
     }
     InterpreterLaunchContext launchContext = new
         InterpreterLaunchContext(properties, option, interpreterRunner, userName,
```
