diff --git a/zeppelin-interpreter/pom.xml b/zeppelin-interpreter/pom.xml index 67b4d5fbc9a..da1f8d023ef 100644 --- a/zeppelin-interpreter/pom.xml +++ b/zeppelin-interpreter/pom.xml @@ -214,8 +214,18 @@ + + org.apache.commons + commons-vfs2 + 2.0 + + + plexus-utils + org.codehaus.plexus + + + - diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java index b585e31321f..3de4bb47d3c 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java @@ -134,17 +134,48 @@ public static void main(String[] args) System.exit(0); } - + private DistributedResourcePool getResourcePool() + /* InterpreterGroup group, + Properties prop, + RemoteInterpreterEventClient client) */ + throws TException { + if (resourcePool != null) + return resourcePool; + try { + Properties prop = interpreterGroup.getProperty(); + //Happens during tests. + if (prop == null) + prop = new Properties(); + String resourcePoolClassName = (String) prop.getProperty( + "zeppelin.interpreter.resourcePoolClass"); + logger.debug("Getting resource pool {}", resourcePoolClassName); + Class resourcePoolClass = Class.forName(resourcePoolClassName); + + Constructor constructor = resourcePoolClass + .getConstructor(new Class[] {String.class, + ResourcePoolConnector.class, + Properties.class }); + resourcePool = (DistributedResourcePool) constructor.newInstance(interpreterGroup.getId(), + this.eventClient, + prop); + } catch (Exception e) { + logger.error("Did not find resource pool. Using DistributedResourcePool"); + resourcePool = new DistributedResourcePool(interpreterGroup.getId(), this.eventClient); + // throw new TException(e); + } finally { + interpreterGroup.setResourcePool(resourcePool); + return resourcePool; + } + } + @Override public void createInterpreter(String interpreterGroupId, String noteId, String className, - Map properties) throws TException { + Map properties) throws TException { if (interpreterGroup == null) { interpreterGroup = new InterpreterGroup(interpreterGroupId); angularObjectRegistry = new AngularObjectRegistry(interpreterGroup.getId(), this); - resourcePool = new DistributedResourcePool(interpreterGroup.getId(), eventClient); interpreterGroup.setAngularObjectRegistry(angularObjectRegistry); - interpreterGroup.setResourcePool(resourcePool); } try { @@ -171,7 +202,12 @@ public void createInterpreter(String interpreterGroupId, String noteId, String } logger.info("Instantiate interpreter {}", className); + + interpreterGroup.setResourcePool(getResourcePool()); + repl.setInterpreterGroup(interpreterGroup); + + //setResourcePool(interpreterGroup, p, eventClient); } catch (ClassNotFoundException | NoSuchMethodException | SecurityException | InstantiationException | IllegalAccessException | IllegalArgumentException | InvocationTargetException e) { @@ -179,7 +215,6 @@ public void createInterpreter(String interpreterGroupId, String noteId, String throw new TException(e); } } - private void setSystemProperty(Properties properties) { for (Object key : properties.keySet()) { if (!RemoteInterpreter.isEnvString((String) key)) { @@ -367,11 +402,13 @@ protected Object jobRun() throws Throwable { } // put result into resource pool - context.getResourcePool().put( - context.getNoteId(), - context.getParagraphId(), - WellKnownResourceName.ParagraphResult.toString(), - combinedResult); + if (context.getResourcePool() != null) { + context.getResourcePool().put( + context.getNoteId(), + context.getParagraphId(), + WellKnownResourceName.ParagraphResult.toString(), + combinedResult); + } return combinedResult; } finally { InterpreterContext.remove(); @@ -402,7 +439,7 @@ public void cancel(String noteId, String className, RemoteInterpreterContext int @Override public int getProgress(String noteId, String className, - RemoteInterpreterContext interpreterContext) + RemoteInterpreterContext interpreterContext) throws TException { Interpreter intp = getInterpreter(noteId, className); return intp.getProgress(convert(interpreterContext)); @@ -425,7 +462,7 @@ public List completion(String noteId, String className, String buf, int private InterpreterContext convert(RemoteInterpreterContext ric) { List contextRunners = new LinkedList(); List runners = gson.fromJson(ric.getRunners(), - new TypeToken>() { + new TypeToken>() { }.getType()); for (InterpreterContextRunner r : runners) { @@ -586,7 +623,7 @@ public void angularObjectUpdate(String name, String noteId, String paragraphId, if (value == null) { try { value = gson.fromJson(object, - new TypeToken>() { + new TypeToken>() { }.getType()); } catch (Exception e) { // it's not a generic json object, too. okay, proceed to threat as a string type @@ -622,7 +659,7 @@ public void angularObjectAdd(String name, String noteId, String paragraphId, Str try { value = gson.fromJson(object, new TypeToken>() { - }.getType()); + }.getType()); } catch (Exception e) { // it's okay. proceed to treat object as a string logger.debug(e.getMessage(), e); @@ -638,7 +675,7 @@ public void angularObjectAdd(String name, String noteId, String paragraphId, Str @Override public void angularObjectRemove(String name, String noteId, String paragraphId) throws - TException { + TException { AngularObjectRegistry registry = interpreterGroup.getAngularObjectRegistry(); registry.remove(name, noteId, paragraphId, false); } diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/DistributedResourcePool.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/DistributedResourcePool.java index ba31f017b83..12292f202ad 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/DistributedResourcePool.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/DistributedResourcePool.java @@ -16,16 +16,27 @@ */ package org.apache.zeppelin.resource; +import java.util.Properties; + /** * distributed resource pool */ public class DistributedResourcePool extends LocalResourcePool { private final ResourcePoolConnector connector; - + protected Properties property; + public DistributedResourcePool(String id, ResourcePoolConnector connector) { super(id); this.connector = connector; + this.property = new Properties(); + } + + + public DistributedResourcePool(String id, ResourcePoolConnector connector, Properties property) { + super(id); + this.connector = connector; + this.property = property; } @Override diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/RemoteInterpreterProcessResourcePoolConnector.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/RemoteInterpreterProcessResourcePoolConnector.java new file mode 100644 index 00000000000..a093ab77d45 --- /dev/null +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/RemoteInterpreterProcessResourcePoolConnector.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.zeppelin.resource; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.List; + +import org.apache.thrift.TException; +import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService.Client; + +import com.google.gson.Gson; +/** + * Makes a remote interpreter service client act as a resource pool connector. + */ +public class RemoteInterpreterProcessResourcePoolConnector implements ResourcePoolConnector { + + private Client client; + private ResourceSet resources; + + public RemoteInterpreterProcessResourcePoolConnector(Client client) { + this.client = client; + } + + @Override + public ResourceSet getAllResources() { + try { + List resourceList = client.resourcePoolGetAll(); + resources = new ResourceSet(); + Gson gson = new Gson(); + + for (String res : resourceList) { + RemoteResource r = gson.fromJson(res, RemoteResource.class); + r.setResourcePoolConnector(this); + resources.add(r); + } + + return resources; + } catch (TException e) { + throw new RuntimeException(e); + } + } + + @Override + public Object readResource(ResourceId id) { + try { + ByteBuffer buffer = client.resourceGet(id.getNoteId(), id.getParagraphId(), id.getName()); + for (Resource r: resources) { + if (r.getResourceId().equals(id)) + return r.deserializeObject(buffer); + } + return null; + } catch (TException | ClassNotFoundException | IOException e) { + throw new RuntimeException(e); + } + } + +} diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/ResourcePoolUtils.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/ResourcePoolUtils.java index 1825bfed217..4eb1d6a8896 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/ResourcePoolUtils.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/ResourcePoolUtils.java @@ -18,6 +18,8 @@ package org.apache.zeppelin.resource; import com.google.gson.Gson; +import com.google.gson.GsonBuilder; + import org.apache.zeppelin.interpreter.InterpreterGroup; import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcess; import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService; @@ -54,10 +56,11 @@ public static ResourceSet getAllResourcesExcept(String interpreterGroupExcludsio boolean broken = false; try { client = remoteInterpreterProcess.getClient(); - List resourceList = client.resourcePoolGetAll(); - Gson gson = new Gson(); - for (String res : resourceList) { - resourceSet.add(gson.fromJson(res, Resource.class)); + RemoteInterpreterProcessResourcePoolConnector remoteConnector = + new RemoteInterpreterProcessResourcePoolConnector(client); + + for (Resource r: remoteConnector.getAllResources()) { + resourceSet.add(r); } } catch (Exception e) { logger.error(e.getMessage(), e); diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/ResourceSerializer.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/ResourceSerializer.java new file mode 100644 index 00000000000..887f3078525 --- /dev/null +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/ResourceSerializer.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.zeppelin.resource; + +import java.lang.reflect.Type; + +import com.google.gson.Gson; +import com.google.gson.JsonDeserializationContext; +import com.google.gson.JsonDeserializer; +import com.google.gson.JsonElement; +import com.google.gson.JsonObject; +import com.google.gson.JsonParseException; +import com.google.gson.JsonSerializationContext; +import com.google.gson.JsonSerializer; +/** + * Serializes and Deserializes resources if they are serializable. + */ +public class ResourceSerializer implements JsonDeserializer, JsonSerializer { + + public ResourceSerializer() { + } + + @Override + public JsonElement serialize(Resource src, Type typeOfSrc, JsonSerializationContext context) { + // This is straightforward at the moment. + Gson gson = new Gson(); + JsonElement elem = gson.toJsonTree(src); + JsonObject obj = elem.getAsJsonObject(); + if (src.isSerializable()) { + obj.add("r", gson.toJsonTree(src.get())); + } + return obj; + } + + @Override + public Resource deserialize(JsonElement json, Type typeOfT, JsonDeserializationContext context) + throws JsonParseException { + // This requires that we use the class that's stored in the element to deserialize. + JsonObject obj = json.getAsJsonObject(); + String className = obj.getAsJsonPrimitive("className").getAsString(); + + Gson gson = new Gson(); + Object r; + try { + r = gson.fromJson(obj.get("r"), Class.forName(className)); + } catch (ClassNotFoundException e) { + throw new RuntimeException("Unable to deserialize the resource"); + } + ResourceId id = gson.fromJson(obj.get("resourceId"), ResourceId.class); + + return new Resource(id, r); + } + +} diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/VFSResourcePool.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/VFSResourcePool.java new file mode 100644 index 00000000000..856b03a15b7 --- /dev/null +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/VFSResourcePool.java @@ -0,0 +1,254 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.zeppelin.resource; + +import org.slf4j.Logger; + +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.io.OutputStream; +import java.io.Serializable; +import java.net.URI; +import java.net.URISyntaxException; +import java.util.Properties; + +import javax.management.RuntimeErrorException; + +import org.slf4j.LoggerFactory; + +import com.google.gson.Gson; + +import org.apache.commons.vfs2.FileObject; +import org.apache.commons.vfs2.FileSelectInfo; +import org.apache.commons.vfs2.FileSelector; +import org.apache.commons.vfs2.FileSystemException; +import org.apache.commons.vfs2.FileSystemManager; +import org.apache.commons.vfs2.FileType; +import org.apache.commons.vfs2.NameScope; +import org.apache.commons.vfs2.VFS; + +/** + * Resource pool that saves resources to the local file system. + * + */ +public class VFSResourcePool extends DistributedResourcePool { + Logger logger = LoggerFactory.getLogger(VFSResourcePool.class); + + + @Override + public void put(String name, Object object) { + try { + FileObject rootDir = getRootDir(); + FileObject resultDir = rootDir.resolveFile(name, NameScope.CHILD); + if (!resultDir.exists()) { + resultDir.createFolder(); + } + + if (!isDirectory(resultDir)) { + throw new IOException(resultDir.getName().toString() + " is not a directory"); + } + Gson gson = new Gson(); + + FileObject resultFile = resultDir.resolveFile("result.dat", NameScope.CHILD); + // false means not appending. creates file if not exists + OutputStream out = resultFile.getContent().getOutputStream(false); + ObjectOutputStream oos = new ObjectOutputStream(out); + oos.writeObject(object); + out.close(); + } catch (IOException e) { + super.put(name, object); + throw new RuntimeException(e); + } + } + + @Override + public Resource get(String name) { + return get(name, true); + } + + @Override + public Resource get(String noteId, String paragraphId, String name) { + Resource r = get(noteId, paragraphId, name, true); + return new Resource( + new ResourceId(this.id(), noteId, paragraphId, name), r.get()); + } + + @Override + public Resource get(String name, boolean remote) { + try { + FileObject rootDir = getRootDir(); + FileObject resultDir = rootDir.resolveFile(name, NameScope.CHILD); + if (!resultDir.exists() || !isDirectory(resultDir)) + return null; + FileObject resultFile = resultDir.resolveFile("result.dat", NameScope.CHILD); + + InputStream instream = resultFile.getContent().getInputStream(); + ObjectInputStream ois = new ObjectInputStream(instream); + + try { + Object o = ois.readObject(); + String[] splitName = name.split("___"); + ResourceId r = null; + if (splitName.length == 3) + { + r = new ResourceId(this.id(), splitName[0], splitName[1], splitName[2]); + } + else { + r = new ResourceId(this.id(), name); + } + return new Resource(r, o); + } catch (ClassNotFoundException e) { + throw new RuntimeException(e); + } + + } catch (IOException e) { + } + return super.get(name, remote); + } + + @Override + public Resource get(String noteId, String paragraphId, String name, boolean remote) { + return get(noteId + "___" + paragraphId + "___" + name, remote); + } + + @Override + public ResourceSet getAll() { + return getAll(true); + } + + @Override + public ResourceSet getAll(boolean remote) { + ResourceSet resources = new ResourceSet(); + try { + FileObject rootDir = getRootDir(); + for (FileObject resourceDir: rootDir.getChildren()) + { + if (resourceDir.getType() == FileType.FOLDER) + resources.add(get(resourceDir.getName().getBaseName())); + } + } + catch (IOException ex) { + throw new RuntimeException(ex); + } + if (remote) + resources.addAll(super.getAll(remote)); + return resources; + } + + @Override + public void put(String noteId, String paragraphId, String name, Object object) { + put(noteId + "___" + paragraphId + "___name", object); + } + + @Override + public Resource remove(String name) { + try { + Resource r = get(name); + FileObject rootDir = getRootDir(); + FileObject resultDir = rootDir.resolveFile(name, NameScope.CHILD); + if (resultDir.exists()) { + resultDir.delete(new FileSelector() { + @Override + public boolean traverseDescendents(FileSelectInfo fileInfo) throws Exception { + return true; + } + + @Override + public boolean includeFile(FileSelectInfo fileInfo) throws Exception { + return true; + } + }); + } + return r; + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + @Override + public Resource remove(String noteId, String paragraphId, String name) { + return remove(noteId + "___" + paragraphId + "___" + name); + } + + private FileSystemManager fsManager; + private URI filesystemRoot; + public VFSResourcePool(String id, ResourcePoolConnector connector, Properties property) { + super(id, connector, property); + try { + this.filesystemRoot = new + URI(property.getProperty("Resource_Path", "notebook/zeppelin_resources")); + } catch (URISyntaxException e1) { + throw new RuntimeException(e1); + } + + if (filesystemRoot.getScheme() == null) { // it is local path + try { + this.filesystemRoot = new URI(new File( filesystemRoot.getPath()).getAbsolutePath()); + } catch (URISyntaxException e) { + throw new RuntimeException(e); + } + } else { + this.filesystemRoot = filesystemRoot; + } + + try { + fsManager = VFS.getManager(); + FileObject file; + file = fsManager.resolveFile(filesystemRoot.getPath()); + if (!file.exists()) { + logger.info("Notebook dir doesn't exist, create."); + file.createFolder(); + } + } catch (FileSystemException e) { + throw new RuntimeException("Unable to load new file system."); + } + } + + private FileObject getRootDir() throws IOException { + FileObject rootDir = fsManager.resolveFile(getPath("/")); + // Does nothing if the folder already exists. + rootDir.createFolder(); + if (!isDirectory(rootDir)) { + throw new IOException("Root path is not a directory"); + } + + return rootDir; + } + + private boolean isDirectory(FileObject fo) throws IOException { + if (fo == null) return false; + if (fo.getType() == FileType.FOLDER) { + return true; + } else { + return false; + } + } + + private String getPath(String path) { + if (path == null || path.trim().length() == 0) { + return filesystemRoot.toString(); + } + if (path.startsWith("/")) { + return filesystemRoot.toString() + path; + } else { + return filesystemRoot.toString() + "/" + path; + } + } +} diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterResourcePool.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterResourcePool.java index 3826b903115..8e408da58bd 100644 --- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterResourcePool.java +++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterResourcePool.java @@ -37,7 +37,7 @@ public class MockInterpreterResourcePool extends Interpreter { Interpreter.register( "resourcePoolTest", "resourcePool", - MockInterpreterA.class.getName(), + MockInterpreterResourcePool.class.getName(), new InterpreterPropertyBuilder() .add("p1", "v1", "property1").build()); diff --git a/zeppelin-server/pom.xml b/zeppelin-server/pom.xml index f2c9ced7d63..3344580eef8 100644 --- a/zeppelin-server/pom.xml +++ b/zeppelin-server/pom.xml @@ -243,7 +243,6 @@ org.apache.commons commons-exec 1.3 - test diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/rest/NoteRestSerializer.java b/zeppelin-server/src/main/java/org/apache/zeppelin/rest/NoteRestSerializer.java new file mode 100644 index 00000000000..d1dc37fb2aa --- /dev/null +++ b/zeppelin-server/src/main/java/org/apache/zeppelin/rest/NoteRestSerializer.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.zeppelin.rest; + +import java.lang.reflect.Type; + +import org.apache.zeppelin.notebook.Note; +import org.apache.zeppelin.notebook.Paragraph; +import org.apache.zeppelin.resource.Resource; +import org.apache.zeppelin.resource.ResourceSet; +import org.apache.zeppelin.scheduler.Job; + +import com.google.gson.ExclusionStrategy; +import com.google.gson.FieldAttributes; +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; +import com.google.gson.JsonArray; +import com.google.gson.JsonDeserializer; +import com.google.gson.JsonElement; +import com.google.gson.JsonObject; +import com.google.gson.JsonSerializationContext; +import com.google.gson.JsonSerializer; +/** + * Responsible for passing along minimal information for a REST API to consume notes. + * + */ +public class NoteRestSerializer implements JsonSerializer { + + ResourceSet resources; + + @Override + public JsonElement serialize(Note src, Type typeOfSrc, JsonSerializationContext context) { + JsonObject obj = new JsonObject(); + obj.addProperty("id", src.getId()); + obj.addProperty("name", src.getName()); + GsonBuilder builder = new GsonBuilder(); + // We don't serialize paragraph results in this case. + builder.addSerializationExclusionStrategy(new ExclusionStrategy() { + @Override + public boolean shouldSkipField(FieldAttributes f) { + return (Job.class.isAssignableFrom(f.getDeclaringClass())) && + (f.getName().equals("config") || + f.getName().equals("settings") || + f.getName().equals("result") || + f.getName().equals("jobName")); + } + + @Override + public boolean shouldSkipClass(Class clazz) { + return false; + } + }); + Gson gson = builder.create(); + JsonArray paragraphs = new JsonArray(); + for (Resource r: resources) { + for (Paragraph p: src.getParagraphs()) { + if (r.getResourceId().getParagraphId().equals(p.getId())) { + paragraphs.add(gson.toJsonTree(p).getAsJsonObject()); + + } + } + } + obj.add("paragraphs", paragraphs); + return obj; + } + + public NoteRestSerializer(ResourceSet resources) { + this.resources = resources; + } +} diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/rest/NotebookRestApi.java b/zeppelin-server/src/main/java/org/apache/zeppelin/rest/NotebookRestApi.java index 2796500ac86..1ae38541288 100644 --- a/zeppelin-server/src/main/java/org/apache/zeppelin/rest/NotebookRestApi.java +++ b/zeppelin-server/src/main/java/org/apache/zeppelin/rest/NotebookRestApi.java @@ -22,6 +22,7 @@ import javax.ws.rs.DELETE; import javax.ws.rs.GET; +import javax.ws.rs.HeaderParam; import javax.ws.rs.POST; import javax.ws.rs.PUT; import javax.ws.rs.Path; @@ -29,14 +30,19 @@ import javax.ws.rs.Produces; import javax.ws.rs.QueryParam; import javax.ws.rs.core.Response; +import javax.ws.rs.core.Response.ResponseBuilder; import javax.ws.rs.core.Response.Status; +import org.apache.commons.httpclient.util.DateUtil; import org.apache.commons.lang3.StringUtils; import org.apache.zeppelin.interpreter.InterpreterSetting; import org.apache.zeppelin.notebook.Note; import org.apache.zeppelin.notebook.Notebook; import org.apache.zeppelin.notebook.NotebookAuthorization; import org.apache.zeppelin.notebook.Paragraph; +import org.apache.zeppelin.resource.Resource; +import org.apache.zeppelin.resource.ResourcePoolUtils; +import org.apache.zeppelin.resource.ResourceSet; import org.apache.zeppelin.rest.message.CronRequest; import org.apache.zeppelin.rest.message.InterpreterSettingListForNoteBind; import org.apache.zeppelin.rest.message.NewNotebookRequest; @@ -654,5 +660,66 @@ public Response search(@QueryParam("q") String queryTerm) { LOG.info("{} notbooks found", notebooksFound.size()); return new JsonResponse<>(Status.OK, notebooksFound).build(); } + + /** + * Get paragraphs that have a resource pool and what note they came from. + * @param + * @return JSON with status.OK + */ + @GET + @Path("/results") + public Response getParagraphsWithResults() + throws IOException { + LOG.info("Getting paragraphs from all notes"); + ResourceSet resources = ResourcePoolUtils.getAllResources(); + GsonBuilder builder = new GsonBuilder(); + builder.registerTypeAdapter(Note.class, new NoteRestSerializer(resources)); + List notes = new ArrayList(); + for (Resource r: resources) { + notes.add(notebook.getNote(r.getResourceId().getNoteId())); + } + + // Want to control how the element is built, but not return it as a + // singleton of type element. + // If we add this as a JSON Array rather than including it with the builder, + // then it adds a pesky "element" field to the middle of the result. + return new JsonResponse<>(Status.OK, "", notes, builder).build(); + } + + /** + * Get paragraph result REST API + * @param + * @return JSON with status.OK + * @throws IOException + */ + @GET + @Path("{notebookId}/paragraph/{paragraphId}/result") + public Response getParagraphResult(@PathParam("notebookId") String notebookId, + @PathParam("paragraphId") String paragraphId, + @HeaderParam("If-Modified-Since") String modificationDateString) + throws IOException { + LOG.info("Downloading paragraph {} {}", notebookId, paragraphId); + + Note note = notebook.getNote(notebookId); + if (note == null) { + return new JsonResponse(Status.NOT_FOUND, "note not found.").build(); + } + Paragraph p = note.getParagraph(paragraphId); + if (p == null) { + return new JsonResponse(Status.NOT_FOUND, "paragraph not found.").build(); + } + try { + Date modificationDate = DateUtil.parseDate(modificationDateString); + if (!p.getDateFinished().after(modificationDate)) + return Response.ok() + .status(Response.Status.NOT_MODIFIED) + .build(); + } catch (Exception e) { } + + ResponseBuilder builder = Response.ok(p.getResultFromPool().message()) + .header("Content-Disposition", "attachment; filename=" + paragraphId + ".txt"); + + return builder.build(); + } } diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/server/JsonResponse.java b/zeppelin-server/src/main/java/org/apache/zeppelin/server/JsonResponse.java index 887d42ac631..88ed0b9f993 100644 --- a/zeppelin-server/src/main/java/org/apache/zeppelin/server/JsonResponse.java +++ b/zeppelin-server/src/main/java/org/apache/zeppelin/server/JsonResponse.java @@ -38,6 +38,7 @@ public class JsonResponse { private javax.ws.rs.core.Response.Status status; private String message; private T body; + private transient GsonBuilder builder; transient ArrayList cookies; transient boolean pretty = false; @@ -45,31 +46,42 @@ public JsonResponse(javax.ws.rs.core.Response.Status status) { this.status = status; this.message = null; this.body = null; - + this.builder = new GsonBuilder(); } public JsonResponse(javax.ws.rs.core.Response.Status status, String message) { this.status = status; this.message = message; this.body = null; + this.builder = new GsonBuilder(); } public JsonResponse(javax.ws.rs.core.Response.Status status, T body) { this.status = status; this.message = null; this.body = body; + this.builder = new GsonBuilder(); } public JsonResponse(javax.ws.rs.core.Response.Status status, String message, T body) { this.status = status; this.message = message; this.body = body; + this.builder = new GsonBuilder(); } public JsonResponse setPretty(boolean pretty) { this.pretty = pretty; return this; } + + public JsonResponse(javax.ws.rs.core.Response.Status status, String message, T body, + GsonBuilder builder) { + this.status = status; + this.message = message; + this.body = body; + this.builder = builder; + } /** * Add cookie for building. @@ -99,14 +111,14 @@ public JsonResponse addCookie(String name, String value) { @Override public String toString() { - GsonBuilder gsonBuilder = new GsonBuilder().registerTypeAdapter( + builder.registerTypeAdapter( InterpreterSetting.InterpreterInfo.class, new InterpreterInfoSerializer()); if (pretty) { - gsonBuilder.setPrettyPrinting(); + builder.setPrettyPrinting(); } - gsonBuilder.setExclusionStrategies(new JsonExclusionStrategy()); - Gson gson = gsonBuilder.create(); + builder.setExclusionStrategies(new JsonExclusionStrategy()); + Gson gson = builder.create(); return gson.toJson(this); } diff --git a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/AbstractTestRestApi.java b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/AbstractTestRestApi.java index 4c5cc5e8c89..d098ce953ae 100644 --- a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/AbstractTestRestApi.java +++ b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/AbstractTestRestApi.java @@ -100,10 +100,10 @@ protected static void startUp() throws Exception { if (!wasRunning) { LOG.info("Staring test Zeppelin up..."); executor = Executors.newSingleThreadExecutor(); - executor.submit(server); + executor.execute(server); long s = System.currentTimeMillis(); boolean started = false; - while (System.currentTimeMillis() - s < 1000 * 60 * 3) { // 3 minutes + while (System.currentTimeMillis() - s < 1000 * 60 * 5) { // 3 minutes Thread.sleep(2000); started = checkIfServerIsRunning(); if (started == true) { @@ -151,8 +151,11 @@ protected static void startUp() throws Exception { pySpark = true; sparkR = true; } - - ZeppelinServer.notebook.getInterpreterFactory().restart(sparkIntpSetting.id()); + // Only do this if we find a spark interpreter. + // Not all tests that use this code depend on spark. + if(sparkIntpSetting != null) { + ZeppelinServer.notebook.getInterpreterFactory().restart(sparkIntpSetting.id()); + } } } } diff --git a/zeppelin-zengine/pom.xml b/zeppelin-zengine/pom.xml index 6c0275b344a..23fea0b09ec 100644 --- a/zeppelin-zengine/pom.xml +++ b/zeppelin-zengine/pom.xml @@ -107,6 +107,12 @@ org.apache.commons commons-vfs2 2.0 + + + plexus-utils + org.codehaus.plexus + + diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java index 541aae1397b..17e40769b40 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java @@ -508,6 +508,8 @@ public static enum ConfVars { // i.e. http://localhost:8080 ZEPPELIN_ALLOWED_ORIGINS("zeppelin.server.allowed.origins", "*"), ZEPPELIN_ANONYMOUS_ALLOWED("zeppelin.anonymous.allowed", true), + ZEPPELIN_RESOURCE_POOL_CLASS("zeppelin.interpreter.resourcePoolClass", + "org.apacheorg.apache.zeppelin.resource.DistributedResourcePool"), ZEPPELIN_WEBSOCKET_MAX_TEXT_MESSAGE_SIZE("zeppelin.websocket.max.text.message.size", "1024000"); private String varName; diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java index 269b54a1fce..e060a45ab80 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java @@ -198,6 +198,8 @@ private void init() throws InterpreterException, IOException, RepositoryExceptio logger.info("Interpreter setting group {} : id={}, name={}", setting.getGroup(), settingId, setting.getName()); } + + } private void loadFromFile() throws IOException { @@ -234,16 +236,19 @@ private void loadFromFile() throws IOException { // enable/disable option on GUI). // previously created setting should turn this feature on here. setting.getOption().setRemote(true); - + + Properties mergedProperties = + this.getInterpreterPropertiesFromZeppelinConf(); + mergedProperties.putAll(setting.getProperties());; + InterpreterSetting intpSetting = new InterpreterSetting( setting.id(), setting.getName(), setting.getGroup(), setting.getInterpreterInfos(), - setting.getProperties(), + mergedProperties, setting.getDependencies(), setting.getOption()); - InterpreterGroup interpreterGroup = createInterpreterGroup(setting.id(), setting.getOption()); intpSetting.setInterpreterGroup(interpreterGroup); @@ -260,6 +265,16 @@ private void loadFromFile() throws IOException { } } } + + private Properties getInterpreterPropertiesFromZeppelinConf() { + Iterator keySet = this.conf.getKeys("zeppelin.interpreter"); + Properties p = new Properties(); + while (keySet.hasNext()) { + String key = keySet.next(); + p.setProperty(key, this.conf.getProperty(key).toString()); + } + return p; + } private void loadInterpreterDependencies(InterpreterSetting intSetting) throws IOException, RepositoryException { diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java index ae0c4106e61..497562ef011 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java @@ -25,7 +25,10 @@ import org.apache.zeppelin.interpreter.*; import org.apache.zeppelin.interpreter.Interpreter.FormType; import org.apache.zeppelin.interpreter.InterpreterResult.Code; +import org.apache.zeppelin.resource.Resource; import org.apache.zeppelin.resource.ResourcePool; +import org.apache.zeppelin.resource.ResourcePoolUtils; +import org.apache.zeppelin.resource.ResourceSet; import org.apache.zeppelin.scheduler.Job; import org.apache.zeppelin.scheduler.JobListener; import org.apache.zeppelin.scheduler.Scheduler; @@ -208,6 +211,24 @@ public void setNoteReplLoader(NoteInterpreterLoader repls) { this.replLoader = repls; } + /** + * Gets the first resource for this paragraph that responds to the interpreter result class. + */ + public InterpreterResult getResultFromPool() { + ResourceSet resources = ResourcePoolUtils.getAllResources() + .filterByParagraphId(this.getId()).filterByNoteId(this.getNote().getId()); + if (resources.size() > 0) + { + for (Resource r: resources) + { + if (InterpreterResult.class.isAssignableFrom(r.get().getClass())) + return (InterpreterResult) r.get(); + } + } + return null; + } + + public InterpreterResult getResult() { return (InterpreterResult) getReturn(); }