Skip to content

Commit f657056

Browse files
mingmwang authored and GitHub Enterprise committed
Merge pull request #2 from xuluan/am-onyarn
[CARMEL-3186] Run spark thrift server in yarn cluster mode for spark3
2 parents 42beda2 + d1515a1 commit f657056

File tree

3 files changed

+150
-2
lines changed

3 files changed

+150
-2
lines changed

core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -281,8 +281,6 @@ private[spark] class SparkSubmit extends Logging {
281281
error("Cluster deploy mode is not applicable to Spark shells.")
282282
case (_, CLUSTER) if isSqlShell(args.mainClass) =>
283283
error("Cluster deploy mode is not applicable to Spark SQL shell.")
284-
case (_, CLUSTER) if isThriftServer(args.mainClass) =>
285-
error("Cluster deploy mode is not applicable to Spark Thrift server.")
286284
case _ =>
287285
}
288286

sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,10 @@ object HiveThriftServer2 extends Logging {
115115
logError("SparkContext has stopped even if HiveServer2 has started, so exit")
116116
System.exit(-1)
117117
}
118+
119+
while (server.started.get()) {
120+
Thread.sleep(1000)
121+
}
118122
} catch {
119123
case e: Exception =>
120124
logError("Error starting HiveThriftServer2", e)

sql/hive-thriftserver/v2.3/src/main/java/org/apache/hive/service/server/HiveServer2.java

Lines changed: 146 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,12 @@
1818

1919
package org.apache.hive.service.server;
2020

21+
import static org.apache.hadoop.hive.ql.util.ZooKeeperHiveHelper.ZOOKEEPER_PATH_SEPARATOR;
22+
23+
import java.io.IOException;
24+
import java.nio.charset.StandardCharsets;
2125
import java.util.Properties;
26+
import java.util.concurrent.TimeUnit;
2227

2328
import scala.runtime.AbstractFunction0;
2429
import scala.runtime.BoxedUnit;
@@ -29,14 +34,26 @@
2934
import org.apache.commons.cli.OptionBuilder;
3035
import org.apache.commons.cli.Options;
3136
import org.apache.commons.cli.ParseException;
37+
import org.apache.curator.framework.CuratorFramework;
38+
import org.apache.curator.framework.CuratorFrameworkFactory;
39+
import org.apache.curator.framework.recipes.nodes.PersistentEphemeralNode;
40+
import org.apache.curator.framework.recipes.nodes.PersistentEphemeralNode.Mode;
41+
import org.apache.curator.retry.ExponentialBackoffRetry;
3242
import org.apache.hadoop.hive.common.JvmPauseMonitor;
3343
import org.apache.hadoop.hive.conf.HiveConf;
44+
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
45+
import org.apache.hadoop.hive.ql.util.ZooKeeperHiveHelper;
46+
import org.apache.hadoop.hive.shims.Utils;
47+
import org.apache.hadoop.security.UserGroupInformation;
3448
import org.apache.hive.common.util.HiveStringUtils;
3549
import org.apache.hive.service.CompositeService;
50+
import org.apache.hive.service.ServiceException;
3651
import org.apache.hive.service.cli.CLIService;
3752
import org.apache.hive.service.cli.thrift.ThriftBinaryCLIService;
3853
import org.apache.hive.service.cli.thrift.ThriftCLIService;
3954
import org.apache.hive.service.cli.thrift.ThriftHttpCLIService;
55+
import org.apache.zookeeper.CreateMode;
56+
import org.apache.zookeeper.KeeperException;
4057
import org.slf4j.Logger;
4158
import org.slf4j.LoggerFactory;
4259

@@ -51,6 +68,8 @@ public class HiveServer2 extends CompositeService {
5168

5269
private CLIService cliService;
5370
private ThriftCLIService thriftCLIService;
71+
private PersistentEphemeralNode znode;
72+
private CuratorFramework zooKeeperClient;
5473

5574
public HiveServer2() {
5675
super(HiveServer2.class.getSimpleName());
@@ -103,12 +122,139 @@ public static boolean isHTTPTransportMode(HiveConf hiveConf) {
103122
@Override
104123
public synchronized void start() {
105124
super.start();
125+
HiveConf hiveConf = this.getHiveConf();
126+
if (hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_SUPPORT_DYNAMIC_SERVICE_DISCOVERY)) {
127+
try {
128+
addServerInstanceToZooKeeper(hiveConf);
129+
} catch (Exception e) {
130+
LOG.error("Error adding this HiveServer2 instance to ZooKeeper: ", e);
131+
throw new ServiceException(e);
132+
}
133+
}
106134
}
107135

108136
@Override
109137
public synchronized void stop() {
110138
LOG.info("Shutting down HiveServer2");
139+
HiveConf hiveConf = this.getHiveConf();
111140
super.stop();
141+
// Remove this server instance from ZooKeeper if dynamic service discovery is set
142+
if (hiveConf != null && hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_SUPPORT_DYNAMIC_SERVICE_DISCOVERY)) {
143+
try {
144+
removeServerInstanceFromZooKeeper();
145+
} catch (Exception e) {
146+
LOG.error("Error removing znode for this HiveServer2 instance from ZooKeeper.", e);
147+
}
148+
}
149+
}
150+
151+
private void removeServerInstanceFromZooKeeper() throws Exception {
152+
if (znode != null) {
153+
znode.close();
154+
}
155+
zooKeeperClient.close();
156+
LOG.info("Server instance removed from ZooKeeper.");
157+
}
158+
159+
private void addServerInstanceToZooKeeper(HiveConf hiveConf) throws Exception {
160+
String appId = System.getProperty("spark.yarn.app.id");
161+
if (appId == null || appId.trim().isEmpty()) {
162+
LOG.error("Only cluster mode thrift server register to ZK, invalid application is {}", appId);
163+
return;
164+
}
165+
zooKeeperClient = startZookeeperClient(hiveConf);
166+
// Create a znode under the rootNamespace parent for this instance of the server
167+
String rootNamespace = hiveConf.getVar(HiveConf.ConfVars.HIVE_SERVER2_ZOOKEEPER_NAMESPACE);
168+
String queue = hiveConf.get("spark.yarn.queue");
169+
if (queue == null) {
170+
queue = "hdmi-default";
171+
}
172+
String ephemeralPath = appId + ":" + getServerInstanceURI();
173+
String zkPath = ZOOKEEPER_PATH_SEPARATOR + rootNamespace
174+
+ ZOOKEEPER_PATH_SEPARATOR + queue + ZOOKEEPER_PATH_SEPARATOR + ephemeralPath;
175+
LOG.info("Add server instance to zookeeper on {}", zkPath);
176+
try {
177+
znode = new PersistentEphemeralNode(zooKeeperClient,
178+
Mode.EPHEMERAL, zkPath, ephemeralPath.getBytes(StandardCharsets.UTF_8));
179+
znode.start();
180+
// We'll wait for 120s for node creation
181+
long znodeCreationTimeout = 120;
182+
if (!znode.waitForInitialCreate(znodeCreationTimeout, TimeUnit.SECONDS)) {
183+
throw new Exception("Max znode creation wait time: " + znodeCreationTimeout + "s exhausted");
184+
}
185+
} catch (Exception e) {
186+
LOG.error("Unable to create a znode for this server instance", e);
187+
if (znode != null) {
188+
znode.close();
189+
}
190+
throw (e);
191+
}
192+
}
193+
194+
private CuratorFramework startZookeeperClient(HiveConf hiveConf) throws Exception {
195+
setUpZooKeeperAuth(hiveConf);
196+
String zooKeeperEnsemble = ZooKeeperHiveHelper.getQuorumServers(hiveConf);
197+
String rootNamespace = hiveConf.getVar(HiveConf.ConfVars.HIVE_SERVER2_ZOOKEEPER_NAMESPACE);
198+
int sessionTimeout =
199+
(int) hiveConf.getTimeVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_SESSION_TIMEOUT,
200+
TimeUnit.MILLISECONDS);
201+
int baseSleepTime =
202+
(int) hiveConf.getTimeVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_CONNECTION_BASESLEEPTIME,
203+
TimeUnit.MILLISECONDS);
204+
int maxRetries = hiveConf.getIntVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_CONNECTION_MAX_RETRIES);
205+
// Create a CuratorFramework instance to be used as the ZooKeeper client
206+
// Use the zooKeeperAclProvider to create appropriate ACLs
207+
CuratorFramework zooKeeperClient =
208+
CuratorFrameworkFactory.builder().connectString(zooKeeperEnsemble)
209+
.sessionTimeoutMs(sessionTimeout)
210+
.retryPolicy(new ExponentialBackoffRetry(baseSleepTime, maxRetries)).build();
211+
zooKeeperClient.start();
212+
// Create the parent znodes recursively; ignore if the parent already exists.
213+
String queue = hiveConf.get("spark.yarn.queue");
214+
if (queue == null) {
215+
queue = "hdmi-default";
216+
}
217+
try {
218+
zooKeeperClient.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT)
219+
.forPath(ZOOKEEPER_PATH_SEPARATOR + rootNamespace + ZOOKEEPER_PATH_SEPARATOR + queue);
220+
LOG.info("Created the root name space: " + rootNamespace + " on ZooKeeper for HiveServer2");
221+
} catch (KeeperException e) {
222+
if (e.code() != KeeperException.Code.NODEEXISTS) {
223+
LOG.error("Unable to create HiveServer2 namespace: " + rootNamespace + " on ZooKeeper", e);
224+
throw e;
225+
}
226+
}
227+
return zooKeeperClient;
228+
}
229+
230+
/**
231+
* For a kerberized cluster, we dynamically set up the client's JAAS conf.
232+
*
233+
* @param hiveConf
234+
* @return
235+
* @throws Exception
236+
*/
237+
private void setUpZooKeeperAuth(HiveConf hiveConf) throws Exception {
238+
if (UserGroupInformation.isSecurityEnabled()) {
239+
String principal = hiveConf.getVar(ConfVars.HIVE_SERVER2_KERBEROS_PRINCIPAL);
240+
if (principal.isEmpty()) {
241+
throw new IOException("HiveServer2 Kerberos principal is empty");
242+
}
243+
String keyTabFile = hiveConf.getVar(ConfVars.HIVE_SERVER2_KERBEROS_KEYTAB);
244+
if (keyTabFile.isEmpty()) {
245+
throw new IOException("HiveServer2 Kerberos keytab is empty");
246+
}
247+
// Install the JAAS Configuration for the runtime
248+
Utils.setZookeeperClientKerberosJaasConfig(principal, keyTabFile);
249+
}
250+
}
251+
252+
private String getServerInstanceURI() throws Exception {
253+
if ((thriftCLIService == null) || (thriftCLIService.getServerIPAddress() == null)) {
254+
throw new Exception("Unable to get the server address; it hasn't been initialized yet.");
255+
}
256+
return thriftCLIService.getServerIPAddress().getHostName() + ":"
257+
+ thriftCLIService.getPortNumber();
112258
}
113259

114260
private static void startHiveServer2() throws Throwable {

0 commit comments

Comments
 (0)