diff --git a/docs/interpreter/livy.md b/docs/interpreter/livy.md index 1741a80c8b7..09bf6e1c279 100644 --- a/docs/interpreter/livy.md +++ b/docs/interpreter/livy.md @@ -144,7 +144,12 @@ Example: `spark.driver.memory` to `livy.spark.driver.memory` zeppelin.livy.ssl.trustStorePassword password for trustStore file. Used when livy ssl is enabled - + + + zeppelin.livy.http.headers + key_1: value_1; key_2: value_2 + custom http headers when calling livy rest api. Each http header is separated by `;`, and each header is one key value pair where key value is separated by `:` + **We remove livy.spark.master in zeppelin-0.7. Because we sugguest user to use livy 0.3 in zeppelin-0.7. And livy 0.3 don't allow to specify livy.spark.master, it enfornce yarn-cluster mode.** diff --git a/livy/src/main/java/org/apache/zeppelin/livy/BaseLivyInterpreter.java b/livy/src/main/java/org/apache/zeppelin/livy/BaseLivyInterpreter.java index ccab09bfceb..b7253485c3f 100644 --- a/livy/src/main/java/org/apache/zeppelin/livy/BaseLivyInterpreter.java +++ b/livy/src/main/java/org/apache/zeppelin/livy/BaseLivyInterpreter.java @@ -62,6 +62,8 @@ import java.util.Properties; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import java.util.regex.Matcher; +import java.util.regex.Pattern; /** @@ -80,6 +82,7 @@ public abstract class BaseLivyInterpreter extends Interpreter { protected boolean displayAppInfo; protected LivyVersion livyVersion; private RestTemplate restTemplate; + private Map customHeaders = new HashMap<>(); Set paragraphsToCancel = Collections.newSetFromMap( new ConcurrentHashMap()); @@ -96,6 +99,33 @@ public BaseLivyInterpreter(Properties property) { this.pullStatusInterval = Integer.parseInt( property.getProperty("zeppelin.livy.pull_status.interval.millis", 1000 + "")); this.restTemplate = createRestTemplate(); + if (!StringUtils.isBlank(property.getProperty("zeppelin.livy.http.headers"))) { + String[] headers = property.getProperty("zeppelin.livy.http.headers").split(";"); + for (String header : headers) { + String[] splits = header.split(":", -1); + if (splits.length != 2) { + throw new RuntimeException("Invalid format of http headers: " + header + + ", valid http header format is HEADER_NAME:HEADER_VALUE"); + } + customHeaders.put(splits[0].trim(), envSubstitute(splits[1].trim())); + } + } + } + + private String envSubstitute(String value) { + String newValue = new String(value); + Pattern pattern = Pattern.compile("\\$\\{(.*)\\}"); + Matcher matcher = pattern.matcher(value); + while (matcher.find()) { + String env = matcher.group(1); + newValue = newValue.replace("${" + env + "}", System.getenv(env)); + } + return newValue; + } + + // only for testing + Map getCustomHeaders() { + return customHeaders; } public abstract String getSessionKind(); @@ -523,6 +553,9 @@ private String callRestAPI(String targetURL, String method, String jsonData) HttpHeaders headers = new HttpHeaders(); headers.add("Content-Type", MediaType.APPLICATION_JSON_UTF8_VALUE); headers.add("X-Requested-By", "zeppelin"); + for (Map.Entry entry : customHeaders.entrySet()) { + headers.add(entry.getKey(), entry.getValue()); + } ResponseEntity response = null; try { if (method.equals("POST")) { diff --git a/livy/src/test/java/org/apache/zeppelin/livy/LivySQLInterpreterTest.java b/livy/src/test/java/org/apache/zeppelin/livy/LivySQLInterpreterTest.java index fdef9b13d1a..24d70ec2b04 100644 --- a/livy/src/test/java/org/apache/zeppelin/livy/LivySQLInterpreterTest.java +++ b/livy/src/test/java/org/apache/zeppelin/livy/LivySQLInterpreterTest.java @@ -39,9 +39,17 @@ public void setUp() { properties.setProperty("zeppelin.livy.url", "http://localhost:8998"); properties.setProperty("zeppelin.livy.session.create_timeout", "120"); properties.setProperty("zeppelin.livy.spark.sql.maxResult", "3"); + properties.setProperty("zeppelin.livy.http.headers", "HEADER_1: VALUE_1_${HOME}"); sqlInterpreter = new LivySparkSQLInterpreter(properties); } + @Test + public void testHttpHeaders() { + assertEquals(1, sqlInterpreter.getCustomHeaders().size()); + assertTrue(sqlInterpreter.getCustomHeaders().get("HEADER_1").startsWith("VALUE_1_")); + assertNotEquals("VALUE_1_${HOME}", sqlInterpreter.getCustomHeaders().get("HEADER_1")); + } + @Test public void testParseSQLOutput() { // Empty sql output