Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion docs/interpreter/livy.md
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,12 @@ Example: `spark.driver.memory` to `livy.spark.driver.memory`
<td>zeppelin.livy.ssl.trustStorePassword</td>
<td></td>
<td>password for trustStore file. Used when livy ssl is enabled</td>
</tr>
</tr>
<tr>
<td>zeppelin.livy.http.headers</td>
<td>key_1: value_1; key_2: value_2</td>

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are ; and : valid separators that would not be present in any header itself?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

<td>custom http headers when calling livy rest api. Each http header is separated by `;`, and each header is one key value pair where key value is separated by `:`</td>
</tr>
</table>

**We remove livy.spark.master in zeppelin-0.7. Because we sugguest user to use livy 0.3 in zeppelin-0.7. And livy 0.3 don't allow to specify livy.spark.master, it enfornce yarn-cluster mode.**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;


/**
Expand All @@ -80,6 +82,7 @@ public abstract class BaseLivyInterpreter extends Interpreter {
protected boolean displayAppInfo;
protected LivyVersion livyVersion;
private RestTemplate restTemplate;
private Map<String, String> customHeaders = new HashMap<>();

Set<Object> paragraphsToCancel = Collections.newSetFromMap(
new ConcurrentHashMap<Object, Boolean>());
Expand All @@ -96,6 +99,33 @@ public BaseLivyInterpreter(Properties property) {
this.pullStatusInterval = Integer.parseInt(
property.getProperty("zeppelin.livy.pull_status.interval.millis", 1000 + ""));
this.restTemplate = createRestTemplate();
if (!StringUtils.isBlank(property.getProperty("zeppelin.livy.http.headers"))) {
String[] headers = property.getProperty("zeppelin.livy.http.headers").split(";");
for (String header : headers) {
String[] splits = header.split(":", -1);
if (splits.length != 2) {
throw new RuntimeException("Invalid format of http headers: " + header +
", valid http header format is HEADER_NAME:HEADER_VALUE");
}
customHeaders.put(splits[0].trim(), envSubstitute(splits[1].trim()));
}
}
}

private String envSubstitute(String value) {
String newValue = new String(value);
Pattern pattern = Pattern.compile("\\$\\{(.*)\\}");
Matcher matcher = pattern.matcher(value);
while (matcher.find()) {
String env = matcher.group(1);
newValue = newValue.replace("${" + env + "}", System.getenv(env));
}
return newValue;
}

// only for testing
Map<String, String> getCustomHeaders() {
return customHeaders;
}

public abstract String getSessionKind();
Expand Down Expand Up @@ -523,6 +553,9 @@ private String callRestAPI(String targetURL, String method, String jsonData)
HttpHeaders headers = new HttpHeaders();
headers.add("Content-Type", MediaType.APPLICATION_JSON_UTF8_VALUE);
headers.add("X-Requested-By", "zeppelin");
for (Map.Entry<String, String> entry : customHeaders.entrySet()) {
headers.add(entry.getKey(), entry.getValue());
}
ResponseEntity<String> response = null;
try {
if (method.equals("POST")) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,17 @@ public void setUp() {
properties.setProperty("zeppelin.livy.url", "http://localhost:8998");
properties.setProperty("zeppelin.livy.session.create_timeout", "120");
properties.setProperty("zeppelin.livy.spark.sql.maxResult", "3");
properties.setProperty("zeppelin.livy.http.headers", "HEADER_1: VALUE_1_${HOME}");
sqlInterpreter = new LivySparkSQLInterpreter(properties);
}

@Test
public void testHttpHeaders() {
assertEquals(1, sqlInterpreter.getCustomHeaders().size());
assertTrue(sqlInterpreter.getCustomHeaders().get("HEADER_1").startsWith("VALUE_1_"));
assertNotEquals("VALUE_1_${HOME}", sqlInterpreter.getCustomHeaders().get("HEADER_1"));
}

@Test
public void testParseSQLOutput() {
// Empty sql output
Expand Down