diff --git a/docs/interpreter/springxd.md b/docs/interpreter/springxd.md
new file mode 100644
index 00000000000..55aa43dfb47
--- /dev/null
+++ b/docs/interpreter/springxd.md
@@ -0,0 +1,150 @@
+---
+layout: page
+title: "SpringXD Interpreter"
+description: ""
+group: manual
+---
+{% include JB/setup %}
+
+
+## SpringXD Interpreter for Apache Zeppelin
+
+### Overview
+[Spring XD](http://docs.spring.io/spring-xd/docs/current/reference/html/#_reference_guide) is a unified, distributed, and extensible service for data ingestion, real time analytics, batch processing, and data export. SpringXD helps to build big-data applications that integrate many disparate systems into one cohesive solution across a range of use-cases.
+SpringXD supports two processing abstractions: (1) [Streams](http://docs.spring.io/spring-xd/docs/current/reference/html/#streams) for event driven data, and (2) [Jobs](http://docs.spring.io/spring-xd/docs/current/reference/html/#jobs) like MR, PIG, Hive, Cascading, SQL and so on, for batch data types. Spring XD tries to blur the boundaries between the two using a channel abstraction, so that, for example, a stream can trigger a batch job, and a batch job can send events and, in turn, trigger other streams. An expressive DSL is provided for defining streams and jobs:
+
+```
+myStream = http | filter --expression=payload='foo' | log
+```
+
+Zeppelin provides interpreters for SpringXD `Streams` and `Jobs`:
+
+
+
+
Name
+
Class
+
Description
+
+
+
%xd.stream
+
SpringXdStreamInterpreter
+
Provides an environment to create SpringXD Streams - defines the ingestion of event driven data from a source to a sink that passes through any number of processors.
+
+
+
%xd.job
+
SpringXdJobInterpreter
+
Provides an environment to create SpringXD Jobs - combine interactions with standard enterprise systems (e.g. RDBMS) as well as Hadoop operations (e.g. MapReduce, HDFS, Pig, Hive or HBase).
+
+
+
+> Note: A single Zeppelin `paragraph` can contain multiple `streams` or multiple `jobs`, but it can not mix-up jobs and streams in the same paragraph!
+
+
+[](https://www.youtube.com/watch?v=uSUBe8Hgk8w)
+
+Following [video tutorial](https://www.youtube.com/watch?v=uSUBe8Hgk8w) illustrates some of the features provided by the `SpringXD Interpreter`. It shows how to build SpringXD pipeline to ingest a real-time Tweet stream into Geode, HDFS and HAWQ backends.
+
+### Create Interpreter
+
+By default Zeppelin creates one `SpringXD` interpreter instance that communicates with the backend SpringXD system
+
+To create new SpringXD instance open the `Interpreter` section and click the `+Create` button. Pick a `Name` of your choice and from the `Interpreter` drop-down select `xd`. Then follow the configuration instructions and `Save` the new instance.
+
+> Note: The `Name` of the instance is used only to distinct the instances while binding them to the `Notebook`. The `Name` is irrelevant inside the `Notebook`. In the `Notebook` you must use `%xd.stream` or `%xd.job` tag.
+
+### Bind to Notebook
+In the `Notebook` click on the `settings` icon in the top right corner. Then select/deselect the XD interpreter to be bound with the `Notebook`.
+
+### Configuration
+You can modify the configuration of the SpringXD from the `Interpreter` section. The SpringXD interpreter express the following properties:
+
+
+
+
Property Name
+
Description
+
Default Value
+
+
+
springxd.url
+
The URL for SpringXD REST API.
+
http://localhost:9393
+
+
+
+### How to use
+Following sections will explain how to create and destroy Streams and Jobs inside the Zeppelin paragraphs.
+> Tip: Use `CTRL + .` for SpringXD auto-completion.
+
+
+#### XD Streams
+
+* Start the paragraphs with the full `%xd.stream` prefix tag to identifie the SpringXD Streams interpreter.
+* Use `Ctrl+.` for auto-completion.
+* In the paragraph define one or more stream definitions - each on a separate line.
+* Every stream definition must have name. Follow the convention: `stream name = stream definition`. Streams without names are ignored. The stream definition follows the Spring XD DSL: [DSL Guide](http://docs.spring.io/spring-xd/docs/current/reference/html/#dsl-guide).
+* Every time a Paragraph is Run it destroys any previous streams created in the paragraph. The previous streams are destroyed even if their names have been changed or removed.
+* Paragraph `Run` command will `Create` and automatically `Deploy` all streams defined in the Paragraph.
+* If one stream fails to create or to deploy then all streams in the paragraph are destroyed.
+* Zeppelin returns after the streams deployment (e.g. Zeppelin goes into Finished state).
+* When streams have successfully been deployed the result contains a `button` that lists the just deployed streams and status `DEPLOYED`.
+* To destroy streams in a paragraph press the `Annular Button` in the paragraph result section. The button state will switch from `DEPLOYED` to `DESTROYED`.
+* Streams can refer other streams or jobs in any paragraph and even different notebooks.
+
+###### Stream Examples:
+Following example creates a stream that ingests a live tweeter stream and stores it into HDFS files:
+
+```
+%xd.stream
+tweets = twittersearch --query=Zeppelin --outputType=application/json | hdfs
+```
+
+Another example is to fork the above `tweets` stream, extract the tweets's IDs and use the SpringXD [Counter Sink](http://docs.spring.io/spring-xd/docs/current/reference/html/#counters-and-gauges) to count the number for tweets:
+
+```
+%xd.stream
+tweetsCount = tap:stream:tweets > json-to-tuple | transform --expression='payload.id_str' | counter --name=tweetCounter
+```
+
+#### XD Jobs
+
+* Start the paragraphs with the full `%xd.job` prefix tag.
+* Use `Ctrl+.` for auto-completion.
+* In the paragraph define one or more Job definitions - each on a separate line.
+* Jobs must have names. Follow the convention: `job name = job definition`. Streams without names are ignored. The job definition follows the Spring XD DSL for job definition.
+* Every new Paragraph `Run` will destroy any previous running jobs, created in the same paragraph. The jobs are destroyed even if their names have been changed or removed.
+* A Paragraph Run will `Create`, automatically `Deploy` and `Start` the jobs defined in the Paragraph.
+* If a job deployment fail then all jobs in the paragraph are destroyed.
+* Zeppelin returns after the jobs have started (e.g. Zeppelin goes into `Finished` state).
+* When job have successfully been deployed the result contains a button that lists the just deployed jobs and status `DEPLOYED`.
+* To destroy all jobs in the paragraph, press the `Annular button` in the result section. Button state should switch from `DEPLOYED` to `DESTROYED`.
+* Jobs can refer other streams or jobs in any paragraph and even different notebooks.
+
+###### Job Examples:
+Example module which loads [CSV files into a JDBC table](http://docs.spring.io/spring-xd/docs/current/reference/html/#file-jdbc) using a single batch job.
+
+```
+%xd.job
+myJob = filejdbc --resources=file:///mycsvdir/*.csv --names=forename,surname,address --url=jdbc:mysql://localhost:3306/myTable --username=jdbcUser --password=jdbcPassword --tableName=people --initializeDatabase=true
+```
+The job should be defined with the resources parameter defining the files which should be loaded. It also requires a names parameter (for the CSV field names) and these should match the database column names into which the data should be stored. You can either pre-create the database table or the module will create it for you if you use --initializeDatabase=true when the job is created. The table initialization is configured in a similar way to the JDBC sink and uses the same parameters. The default table name is the job name and can be customized by setting the tableName parameter. As an example, if you run the command
+
+Another example illustrates how a Spark Application can be deployed and launched from Spring XD as a batch job. SparkTasklet submits the Spark application into Spark cluster manager using org.apache.spark.deploy.SparkSubmit:
+
+```
+%xd.job
+SparkPiExample = sparkapp --appJar= --name=MyApp --master= --mainClass=org.apache.spark.examples.SparkPi
+```
+
+#### Apply Zeppelin Dynamic Forms
+
+You can leverage [Zeppelin Dynamic Form](../manual/dynamicform.html) inside your stream and job definitions. You can use both the `text input` and `select form` parameterization features. For example to parameterize the search parameter in the twitter search stream :
+
+```
+%xd.stream
+
+tweets = twittersearch --query=${MySearchParameter=DefaultSearchKeyword} --outputType=application/json | hdfs
+```
+
+
+### Auto-completion
+The SpringXD Interpreter provides a comprehensive auto-completion for `Job` and `Stream` DSL definitions. On `Ctrl+.` it lists the most relevant suggestions in a pop-up window. Implementation leverages [SpringXD's Completion API](http://docs.spring.io/spring-xd/docs/current/reference/html/#completions) and provides completion experience similar to the SpringXD Shell.
diff --git a/pom.xml b/pom.xml
index d73caf39a3b..e658fab7ef2 100755
--- a/pom.xml
+++ b/pom.xml
@@ -95,6 +95,7 @@
phoenixpostgresqljdbc
+ springxdtajoflinkignite
diff --git a/springxd/pom.xml b/springxd/pom.xml
new file mode 100644
index 00000000000..cf21b3963f0
--- /dev/null
+++ b/springxd/pom.xml
@@ -0,0 +1,152 @@
+
+
+
+
+ 4.0.0
+
+
+ zeppelin
+ org.apache.zeppelin
+ 0.6.0-incubating-SNAPSHOT
+
+
+ org.apache.zeppelin
+ zeppelin-springxd
+ jar
+ 0.6.0-incubating-SNAPSHOT
+
+
+
+ org.apache.zeppelin
+ zeppelin-interpreter
+ ${project.version}
+ provided
+
+
+
+ org.springframework.xd
+ spring-xd-rest-client
+ 1.0.1.RELEASE
+
+
+
+ com.google.guava
+ guava
+
+
+
+ org.slf4j
+ slf4j-api
+
+
+
+ org.slf4j
+ slf4j-log4j12
+
+
+
+ junit
+ junit
+ test
+
+
+
+ org.mockito
+ mockito-all
+ 1.10.12
+ test
+
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-deploy-plugin
+ 2.7
+
+ true
+
+
+
+
+ maven-enforcer-plugin
+ 1.3.1
+
+
+ enforce
+ none
+
+
+
+
+
+ maven-dependency-plugin
+ 2.8
+
+
+ copy-dependencies
+ package
+
+ copy-dependencies
+
+
+ ${project.build.directory}/../../interpreter/springxd
+ false
+ false
+ true
+ runtime
+
+
+
+ copy-artifact
+ package
+
+ copy
+
+
+ ${project.build.directory}/../../interpreter/springxd
+ false
+ false
+ true
+ runtime
+
+
+ ${project.groupId}
+ ${project.artifactId}
+ ${project.version}
+ ${project.packaging}
+
+
+
+
+
+
+
+
+
+
+
+ spring-release
+ Spring Releases
+ http://repo.spring.io/libs-release
+
+
+ Zeppelin: SpringXD interpreter
+
\ No newline at end of file
diff --git a/springxd/src/main/java/org/apache/zeppelin/springxd/AbstractSpringXdInterpreter.java b/springxd/src/main/java/org/apache/zeppelin/springxd/AbstractSpringXdInterpreter.java
new file mode 100644
index 00000000000..feef0e8a9cc
--- /dev/null
+++ b/springxd/src/main/java/org/apache/zeppelin/springxd/AbstractSpringXdInterpreter.java
@@ -0,0 +1,207 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.zeppelin.springxd;
+
+import static java.lang.String.format;
+import static org.apache.commons.lang.StringUtils.isBlank;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.zeppelin.display.AngularObjectWatcher;
+import org.apache.zeppelin.interpreter.Interpreter;
+import org.apache.zeppelin.interpreter.InterpreterContext;
+import org.apache.zeppelin.interpreter.InterpreterResult;
+import org.apache.zeppelin.interpreter.InterpreterResult.Code;
+import org.apache.zeppelin.springxd.AngularBinder.ResourceStatus;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.xd.rest.client.impl.SpringXDTemplate;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Throwables;
+
+/**
+ * SpringXD {@link Interpreter} supper class extended by both {@link SpringXdStreamInterpreter} and
+ * {@link SpringXdJobInterpreter}.
+ */
+public abstract class AbstractSpringXdInterpreter extends Interpreter {
+
+ private Logger logger = LoggerFactory.getLogger(AbstractSpringXdInterpreter.class);
+
+ public static final String SPRINGXD_URL = "springxd.url";
+ public static final String DEFAULT_SPRINGXD_URL = "http://ambari.localdomain:9393";
+
+ private Exception exceptionOnConnect;
+
+ private SpringXDTemplate xdTemplate;
+
+ private AbstractSpringXdResourceManager xdResourcesManager;
+
+ private AbstractSpringXdResourceCompletion xdResourceCompletion;
+
+ public AbstractSpringXdInterpreter(Properties property) {
+ super(property);
+ }
+
+ /**
+ * @return Returns a Resource (Stream or Job) specific completion implementation
+ */
+ public abstract AbstractSpringXdResourceCompletion doCreateResourceCompletion();
+
+ /**
+ * @return Returns a Resource (stream or job) specific deployed resource manager.
+ */
+ public abstract AbstractSpringXdResourceManager doCreateResourceManager();
+
+ protected SpringXDTemplate doCreateSpringXDTemplate(URI uri) {
+ return new SpringXDTemplate(uri);
+ }
+
+ @Override
+ public void open() {
+ // Destroy any previously deployed resources
+ close();
+ try {
+ String springXdUrl = getProperty(SPRINGXD_URL);
+ xdTemplate = doCreateSpringXDTemplate(new URI(springXdUrl));
+ xdResourcesManager = doCreateResourceManager();
+ xdResourceCompletion = doCreateResourceCompletion();
+ exceptionOnConnect = null;
+ } catch (URISyntaxException e) {
+ logger.error("Failed to connect to the SpringXD cluster", e);
+ exceptionOnConnect = e;
+ close();
+ }
+ }
+
+ @Override
+ public void close() {
+ if (xdResourcesManager != null) {
+ xdResourcesManager.destroyAllNotebookDeployedResources();
+ }
+ }
+
+ @Override
+ public InterpreterResult interpret(String multiLineResourceDefinitions, InterpreterContext ctx) {
+
+ if (exceptionOnConnect != null) {
+ return new InterpreterResult(Code.ERROR, exceptionOnConnect.getMessage());
+ }
+
+ // (Re)deploying jobs means that any previous instances (created by the same
+ // notebook/paragraph) will be destroyed
+ xdResourcesManager.destroyDeployedResourceBy(ctx.getNoteId(), ctx.getParagraphId());
+
+ String errorMessage = "";
+ try {
+ if (!isBlank(multiLineResourceDefinitions)) {
+ for (String line : multiLineResourceDefinitions
+ .split(AbstractSpringXdResourceCompletion.LINE_SEPARATOR)) {
+
+ Pair namedDefinition = NamedDefinitionParser.getNamedDefinition(line);
+
+ String resourceName = namedDefinition.getLeft();
+ String resourceDefinition = namedDefinition.getRight();
+
+ if (!isBlank(resourceName) && !isBlank(resourceDefinition)) {
+
+ xdResourcesManager.deployResource(ctx.getNoteId(), ctx.getParagraphId(), resourceName,
+ resourceDefinition);
+
+ logger.info("Deployed: [" + resourceName + "]:[" + resourceDefinition + "]");
+ } else {
+ logger.info("Skipped Line:" + line);
+ }
+ }
+ }
+
+ String angularDestroyButton = doCreateAngularResponse(ctx);
+
+ logger.info(angularDestroyButton);
+
+ return new InterpreterResult(Code.SUCCESS, angularDestroyButton);
+
+ } catch (Exception e) {
+ logger.error("Failed to deploy xd resource!", e);
+ errorMessage = Throwables.getRootCause(e).getMessage();
+ xdResourcesManager.destroyDeployedResourceBy(ctx.getNoteId(), ctx.getParagraphId());
+ }
+
+ return new InterpreterResult(Code.ERROR, "Failed to deploy XD resource: " + errorMessage);
+ }
+
+ protected String doCreateAngularResponse(InterpreterContext ctx) {
+
+ // Use the Angualr response to hook a resource destroy button
+ String xdResourceStatusId = "rxdResourceStatus_" + ctx.getParagraphId().replace("-", "_");
+
+ AngularBinder.bind(ctx, xdResourceStatusId, ResourceStatus.DEPLOYED.name(), ctx.getNoteId(),
+ ctx.getParagraphId(), new DestroyEventWatcher(ctx));
+
+ List deployedResources =
+ xdResourcesManager.getDeployedResourceBy(ctx.getNoteId(), ctx.getParagraphId());
+
+ String destroyButton =
+ format("%%angular ",
+ xdResourceStatusId, ResourceStatus.DESTROYED.name(),
+ Joiner.on(", ").join(deployedResources).toString(), xdResourceStatusId);
+
+ return destroyButton;
+ }
+
+ @Override
+ public void cancel(InterpreterContext context) {
+ if (xdResourcesManager != null) {
+ xdResourcesManager.destroyDeployedResourceBy(context.getNoteId(), context.getParagraphId());
+ }
+ }
+
+ @Override
+ public FormType getFormType() {
+ return FormType.SIMPLE;
+ }
+
+ @Override
+ public int getProgress(InterpreterContext context) {
+ return 0;
+ }
+
+ @Override
+ public List completion(String buf, int cursor) {
+ return xdResourceCompletion.completion(buf, cursor);
+ }
+
+ private class DestroyEventWatcher extends AngularObjectWatcher {
+
+ public DestroyEventWatcher(InterpreterContext context) {
+ super(context);
+ }
+
+ @Override
+ public void watch(Object oldValue, Object newValue, InterpreterContext context) {
+ if (ResourceStatus.DESTROYED.name().equals("" + newValue)) {
+ xdResourcesManager.destroyDeployedResourceBy(context.getNoteId(), context.getParagraphId());
+ }
+ }
+ }
+
+ protected SpringXDTemplate getXdTemplate() {
+ return xdTemplate;
+ }
+}
diff --git a/springxd/src/main/java/org/apache/zeppelin/springxd/AbstractSpringXdResourceCompletion.java b/springxd/src/main/java/org/apache/zeppelin/springxd/AbstractSpringXdResourceCompletion.java
new file mode 100644
index 00000000000..ae5d93e5403
--- /dev/null
+++ b/springxd/src/main/java/org/apache/zeppelin/springxd/AbstractSpringXdResourceCompletion.java
@@ -0,0 +1,162 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.zeppelin.springxd;
+
+import static java.lang.Math.min;
+import static org.apache.commons.collections.CollectionUtils.isEmpty;
+import static org.apache.commons.lang.StringUtils.isBlank;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Completion utility that helps to adapt the SpringXD completion API with the ACE editor
+ * requirements.
+ */
+public abstract class AbstractSpringXdResourceCompletion {
+
+ private Logger logger = LoggerFactory.getLogger(AbstractSpringXdResourceCompletion.class);
+
+ public static final String EMPTY_PREFFIX = "";
+
+ public static final String LINE_SEPARATOR = System.getProperty("line.separator");
+
+ public static final List EMPTY_ZEPPELIN_COMPLETION = null;
+
+ public static final int SINGLE_LEVEL_OF_DETAILS = 1;
+
+ /**
+ * To be implemented by the underlying SpringXD mechanism (Stream or Job) and use the SpringXD
+ * completion API.
+ *
+ * @param completionPreffix - input completion string
+ * @return Returns completions in SpringXD format
+ */
+ public abstract List doSpringXdCompletion(String completionPreffix);
+
+ /**
+ * Delegates the completion to the underlying doSpringXdCompletion method and transforms the
+ * completion results from SpringXD into ACE completions
+ *
+ * @param buf - input buffer from zeppelin editor
+ * @param cursor - cursor position in the zeppelin editor
+ * @return Returns completions adapted for the ACE editor.
+ */
+ public List completion(String buf, int cursor) {
+ List zeppelinCompletions = null;
+
+ if (buf != null) {
+ try {
+
+ String completionPreffix = getCompletionPreffix(buf, cursor);
+
+ logger.debug("Buffer [" + buf + "], Len=" + buf.length() + ", Cursor=" + cursor
+ + ", Preffix [" + completionPreffix + "]");
+
+ List xdCompletions = this.doSpringXdCompletion(completionPreffix);
+
+ zeppelinCompletions = convertXdToZeppelinCompletions(xdCompletions, completionPreffix);
+
+ } catch (Exception e) {
+ logger.warn("Completion error", e);
+ }
+ }
+ return zeppelinCompletions;
+
+ }
+
+ /**
+ * In multi-line buffer returns the a line part that starts with the line where the cursor is
+ * position until the cursor position (or the end of the line when the cursor spans beyond the
+ * buffer length)
+ *
+ * @param buffer - Input multi line buffer
+ * @param cursor - Cursor position in the buffer.
+ * @return Returns a line that start at the begining of the line cursor is positioned until the
+ * cursor position (or the end of the line).
+ */
+ String getCompletionPreffix(String buffer, int cursor) {
+
+ if (isBlank(buffer)) {
+ return EMPTY_PREFFIX;
+ }
+
+ // For completion we need only the first half of the buffer up to the cursor. The cursor can go
+ // beyond the length of the buffer. If this is the case use the end of the buffer.
+ int endIndex = min(buffer.length(), cursor - 1);
+ endIndex = (endIndex > 0) ? endIndex : 0;
+ String bufToCursorHalf = buffer.substring(0, endIndex);
+
+ // Separate the line where the cursor is positioned. It could be at the begining in the middle
+ // or after the buffer. From the cursor position (or the end of the buffer) go backwards to find
+ // the first occurrence of end-of-line character (or the start of the buffer)
+ int lastIndexOfLineSeparator = StringUtils.lastIndexOfAny(bufToCursorHalf, LINE_SEPARATOR) + 1;
+
+ String preffix = buffer.substring(lastIndexOfLineSeparator, endIndex);
+
+ return preffix;
+ }
+
+ /**
+ * SpringXD's completion response contains the entire line (including the prefix part already
+ * existing in the buffer). To be able to insert those completion within the ACE editor the
+ * prefixes have to be removed form the xd completion results.
+ *
+ * @param xdCompletions Spring XD completion result list.
+ * @param preffix String completion prefix use to search completions for.
+ * @return Returns a modified list of the same size. Each element in the list has the prefix part
+ * removed.
+ */
+ List convertXdToZeppelinCompletions(List xdCompletions, String preffix) {
+
+ // Noting to filter
+ if (isBlank(preffix)) {
+ return xdCompletions;
+ }
+
+ List zeppelinCompletions = EMPTY_ZEPPELIN_COMPLETION;
+
+ if (!isEmpty(xdCompletions)) {
+
+ zeppelinCompletions = new ArrayList();
+
+ String preffixToReplace = preffix.substring(0, getLastWhitespaceIndex(preffix) + 1);
+
+ for (String c : xdCompletions) {
+
+ String zeppelinCompletion = c.replace(preffixToReplace.trim(), "").trim();
+
+ zeppelinCompletions.add(zeppelinCompletion);
+ }
+ }
+
+ return zeppelinCompletions;
+ }
+
+ private int getLastWhitespaceIndex(String s) {
+ if (!isBlank(s)) {
+ for (int i = s.length() - 1; i >= 0; i--) {
+ if (Character.isWhitespace(s.charAt(i)) || s.charAt(i) == '|' || s.charAt(i) == '=') {
+ return i;
+ }
+ }
+ }
+ return 0;
+ }
+}
diff --git a/springxd/src/main/java/org/apache/zeppelin/springxd/AbstractSpringXdResourceManager.java b/springxd/src/main/java/org/apache/zeppelin/springxd/AbstractSpringXdResourceManager.java
new file mode 100644
index 00000000000..59de4483ec8
--- /dev/null
+++ b/springxd/src/main/java/org/apache/zeppelin/springxd/AbstractSpringXdResourceManager.java
@@ -0,0 +1,138 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.zeppelin.springxd;
+
+import static org.apache.commons.collections.CollectionUtils.isEmpty;
+import static org.apache.commons.lang.StringUtils.isBlank;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.zeppelin.interpreter.remote.RemoteInterpreter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Throwables;
+
+/**
+ * Supper calls used by the Stream and Job interpreters to control (and own) the resources (streams
+ * or jobs) deployed in SpringXD.
+ *
+ * Zeppelin starts one {@link RemoteInterpreter} instance per interpreter type. This single instance
+ * manages the resources for all notebooks and paragraphs. The
+ * {@link AbstractSpringXdResourceManager} keeps the track of which resource in which notebook and
+ * paragraph is created and consecutively allows its safe destruction.
+ *
+ * The implementing class should use the SpringXD client API to deploy and/or destroy the XD
+ * resources (e.g. streams or jobs).
+ *
+ * Note: SpringXD requires an unique Stream and Job names. They must be unique across all Notes and
+ * Paragraphs!
+ */
+public abstract class AbstractSpringXdResourceManager {
+
+ public static final boolean DEPLOY = true;
+
+ private Logger logger = LoggerFactory.getLogger(AbstractSpringXdResourceManager.class);
+
+ private Map>> note2paragraph2Resources;
+
+ public AbstractSpringXdResourceManager() {
+ this.note2paragraph2Resources = new HashMap>>();
+ }
+
+ /**
+ * Creates a new Stream or Job SpringXD resource.
+ *
+ * @param name Resource (stream or job) name
+ *
+ * @param definition Resource (stream or job) definition
+ */
+ public abstract void doCreateResource(String name, String definition);
+
+ /**
+ * Destroys stream or job resource by name
+ *
+ * @param name Stream or Job name to destroy.
+ */
+ public abstract void doDestroyRsource(String name);
+
+ public void deployResource(String noteId, String paragraphId, String resourceName,
+ String resourceDefininition) {
+
+ if (!isBlank(resourceName) && !isBlank(resourceDefininition)) {
+
+ doCreateResource(resourceName, resourceDefininition);
+
+ if (!note2paragraph2Resources.containsKey(noteId)) {
+ note2paragraph2Resources.put(noteId, new HashMap>());
+ }
+
+ if (!note2paragraph2Resources.get(noteId).containsKey(paragraphId)) {
+ note2paragraph2Resources.get(noteId).put(paragraphId, new ArrayList());
+ }
+ note2paragraph2Resources.get(noteId).get(paragraphId).add(resourceName);
+ }
+ }
+
+ public List getDeployedResourceBy(String noteId, String paragraphId) {
+ if (note2paragraph2Resources.containsKey(noteId)
+ && note2paragraph2Resources.get(noteId).containsKey(paragraphId)) {
+ return note2paragraph2Resources.get(noteId).get(paragraphId);
+ }
+ return new ArrayList();
+ }
+
+ public void destroyDeployedResourceBy(String noteId, String paragraphId) {
+ if (note2paragraph2Resources.containsKey(noteId)
+ && note2paragraph2Resources.get(noteId).containsKey(paragraphId)) {
+
+ Iterator it = note2paragraph2Resources.get(noteId).get(paragraphId).iterator();
+ while (it.hasNext()) {
+ String resourceName = it.next();
+ try {
+ doDestroyRsource(resourceName);
+ it.remove();
+ logger.debug("Destroyed :" + resourceName + " from [" + noteId + ":" + paragraphId + "]");
+ } catch (Exception e) {
+ logger.error("Failed to destroy resource: " + resourceName, Throwables.getRootCause(e));
+ }
+ }
+ }
+ }
+
+ public void destroyDeployedResourceBy(String noteId) {
+ if (note2paragraph2Resources.containsKey(noteId)) {
+ Iterator paragraphIds = note2paragraph2Resources.get(noteId).keySet().iterator();
+ while (paragraphIds.hasNext()) {
+ String paragraphId = paragraphIds.next();
+ destroyDeployedResourceBy(noteId, paragraphId);
+ }
+ }
+ }
+
+ public void destroyAllNotebookDeployedResources() {
+ if (!isEmpty(note2paragraph2Resources.keySet())) {
+ Iterator it = note2paragraph2Resources.keySet().iterator();
+ while (it.hasNext()) {
+ String noteId = it.next();
+ destroyDeployedResourceBy(noteId);
+ }
+ }
+ }
+}
diff --git a/springxd/src/main/java/org/apache/zeppelin/springxd/AngularBinder.java b/springxd/src/main/java/org/apache/zeppelin/springxd/AngularBinder.java
new file mode 100644
index 00000000000..3ab51fa6839
--- /dev/null
+++ b/springxd/src/main/java/org/apache/zeppelin/springxd/AngularBinder.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.zeppelin.springxd;
+
+import org.apache.zeppelin.display.AngularObjectRegistry;
+import org.apache.zeppelin.display.AngularObjectWatcher;
+import org.apache.zeppelin.interpreter.InterpreterContext;
+
+/**
+ * Helper class that leverages the IntepreterContext Angular support to register client side
+ * controle with the back-end
+ */
+public class AngularBinder {
+
+ /**
+ * Defines the deployed streams status.
+ */
+ public enum ResourceStatus {
+ DEPLOYED, DESTROYED
+ };
+
+ @SuppressWarnings("unchecked")
+ public static void bind(InterpreterContext context, String name, Object value, String noteId,
+ String paragraphId, AngularObjectWatcher watcher) {
+
+ AngularObjectRegistry registry = context.getAngularObjectRegistry();
+
+ if (registry.get(name, noteId, paragraphId) == null) {
+ registry.add(name, value, noteId, paragraphId);
+ } else {
+ registry.get(name, noteId, paragraphId).set(value);
+ }
+
+ if (registry.get(name, noteId, paragraphId) != null) {
+ registry.get(name, noteId, paragraphId).addWatcher(watcher);
+ }
+ }
+}
diff --git a/springxd/src/main/java/org/apache/zeppelin/springxd/NamedDefinitionParser.java b/springxd/src/main/java/org/apache/zeppelin/springxd/NamedDefinitionParser.java
new file mode 100644
index 00000000000..2fe047a5ea7
--- /dev/null
+++ b/springxd/src/main/java/org/apache/zeppelin/springxd/NamedDefinitionParser.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.zeppelin.springxd;
+
+import static org.apache.commons.lang3.StringUtils.isBlank;
+
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.commons.lang3.tuple.Pair;
+
+/**
+ * Splits the input line into a "name" and a "definition" parts, divided by the '=' character. This
+ * is used for SpringXD's stream and job definitions.
+ */
+public class NamedDefinitionParser {
+
+ private static final Pattern NAMED_STREAM_PATTERN = Pattern.compile("\\s*(\\w+)\\s*=\\s*(.*)");
+ private static final int NAME_GROUP_INDEX = 1;
+ private static final int DEFINITION_GROUP_INDEX = 2;
+
+ /**
+ * Splits the input line into a name and a definition parts, divided by the '=' character.
+ *
+ * @param line - Input line in the 'name = definition' format
+ * @return Returns a (name, definition) pair. Returns an empty pair ("", "") when the input line
+ * doesn't comply with the name = definition convention.
+ */
+ public static Pair getNamedDefinition(String line) {
+
+ String name = "";
+ String definition = "";
+
+ if (!isBlank(line)) {
+ Matcher matcher = NAMED_STREAM_PATTERN.matcher(line);
+
+ if (matcher.matches()) {
+ name = matcher.group(NAME_GROUP_INDEX);
+ definition = matcher.group(DEFINITION_GROUP_INDEX);
+ }
+ }
+ return new ImmutablePair(name, definition);
+ }
+}
diff --git a/springxd/src/main/java/org/apache/zeppelin/springxd/SpringXdJobInterpreter.java b/springxd/src/main/java/org/apache/zeppelin/springxd/SpringXdJobInterpreter.java
new file mode 100644
index 00000000000..3fa91776787
--- /dev/null
+++ b/springxd/src/main/java/org/apache/zeppelin/springxd/SpringXdJobInterpreter.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.zeppelin.springxd;
+
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.zeppelin.interpreter.Interpreter;
+import org.apache.zeppelin.interpreter.InterpreterPropertyBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.xd.rest.domain.CompletionKind;
+
+
+/**
+ * SpringXD Job interpreter for Zeppelin.
+ *
+ *
+ *
{@code springxd.url} - SpringXD URL to connect to.
+ *
+ *
+ *
+ * How to use:
+ * {@code %xd.job}
+ * {@code
+ * }
+ *
+ *
+ */
+public class SpringXdJobInterpreter extends AbstractSpringXdInterpreter {
+
+ private Logger logger = LoggerFactory.getLogger(SpringXdJobInterpreter.class);
+
+ static {
+ Interpreter.register(
+ "job",
+ "xd",
+ SpringXdJobInterpreter.class.getName(),
+ new InterpreterPropertyBuilder().add(SPRINGXD_URL, DEFAULT_SPRINGXD_URL,
+ "The URL for SpringXD REST API.").build());
+ }
+
+ public SpringXdJobInterpreter(Properties property) {
+ super(property);
+ logger.info("Create SpringXdJobInterpreter");
+ }
+
+ @Override
+ public AbstractSpringXdResourceCompletion doCreateResourceCompletion() {
+ return new AbstractSpringXdResourceCompletion() {
+ @Override
+ public List doSpringXdCompletion(String completionPreffix) {
+ return getXdTemplate().completionOperations().completions(CompletionKind.job,
+ completionPreffix, SINGLE_LEVEL_OF_DETAILS);
+ }
+ };
+ }
+
+ @Override
+ public AbstractSpringXdResourceManager doCreateResourceManager() {
+ return new AbstractSpringXdResourceManager() {
+ @Override
+ public void doCreateResource(String name, String definition) {
+ getXdTemplate().jobOperations().createJob(name, definition, DEPLOY);
+ }
+
+ @Override
+ public void doDestroyRsource(String name) {
+ getXdTemplate().jobOperations().destroy(name);
+ }
+ };
+ }
+}
diff --git a/springxd/src/main/java/org/apache/zeppelin/springxd/SpringXdStreamInterpreter.java b/springxd/src/main/java/org/apache/zeppelin/springxd/SpringXdStreamInterpreter.java
new file mode 100644
index 00000000000..7ed70cd3309
--- /dev/null
+++ b/springxd/src/main/java/org/apache/zeppelin/springxd/SpringXdStreamInterpreter.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.zeppelin.springxd;
+
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.zeppelin.interpreter.Interpreter;
+import org.apache.zeppelin.interpreter.InterpreterPropertyBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.xd.rest.domain.CompletionKind;
+import org.springframework.xd.rest.domain.StreamDefinitionResource;
+
+/**
+ * SpringXD interpreter for Zeppelin.
+ *
+ *
+ *
{@code springxd.url} - SpringXD URL to connect to.