diff --git a/zeppelin-interpreter/pom.xml b/zeppelin-interpreter/pom.xml index 67b4d5fbc9a..da1f8d023ef 100644 --- a/zeppelin-interpreter/pom.xml +++ b/zeppelin-interpreter/pom.xml @@ -214,8 +214,18 @@ + + org.apache.commons + commons-vfs2 + 2.0 + + + plexus-utils + org.codehaus.plexus + + + - diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java index 6e369c0694a..b19fd9cd09e 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java @@ -134,17 +134,47 @@ public static void main(String[] args) System.exit(0); } + private DistributedResourcePool getResourcePool() + /* InterpreterGroup group, + Properties prop, + RemoteInterpreterEventClient client) */ + throws TException { + if (resourcePool != null) + return resourcePool; + try { + Properties prop = interpreterGroup.getProperty(); + //Happens during tests. + if (prop == null) + prop = new Properties(); + String resourcePoolClassName = (String) prop.getProperty( + "zeppelin.interpreter.resourcePoolClass"); + logger.debug("Getting resource pool {}", resourcePoolClassName); + Class resourcePoolClass = Class.forName(resourcePoolClassName); + + Constructor constructor = resourcePoolClass + .getConstructor(new Class[] {String.class, + ResourcePoolConnector.class, + Properties.class }); + resourcePool = (DistributedResourcePool) constructor.newInstance(interpreterGroup.getId(), + this.eventClient, + prop); + interpreterGroup.setResourcePool(resourcePool); + return resourcePool; + } catch (Exception e) { + logger.error(e.toString(), e); + return new DistributedResourcePool(interpreterGroup.getId(), this.eventClient); + // throw new TException(e); + } + } @Override public void createInterpreter(String interpreterGroupId, String noteId, String className, - Map properties) throws TException { + Map properties) throws TException { if (interpreterGroup == null) { interpreterGroup = new InterpreterGroup(interpreterGroupId); angularObjectRegistry = new AngularObjectRegistry(interpreterGroup.getId(), this); - resourcePool = new DistributedResourcePool(interpreterGroup.getId(), eventClient); interpreterGroup.setAngularObjectRegistry(angularObjectRegistry); - interpreterGroup.setResourcePool(resourcePool); } try { @@ -170,7 +200,12 @@ public void createInterpreter(String interpreterGroupId, String noteId, String } logger.info("Instantiate interpreter {}", className); + + interpreterGroup.setResourcePool(getResourcePool()); + repl.setInterpreterGroup(interpreterGroup); + + //setResourcePool(interpreterGroup, p, eventClient); } catch (ClassNotFoundException | NoSuchMethodException | SecurityException | InstantiationException | IllegalAccessException | IllegalArgumentException | InvocationTargetException e) { @@ -178,7 +213,6 @@ public void createInterpreter(String interpreterGroupId, String noteId, String throw new TException(e); } } - private Interpreter getInterpreter(String noteId, String className) throws TException { if (interpreterGroup == null) { throw new TException( @@ -353,11 +387,13 @@ protected Object jobRun() throws Throwable { } // put result into resource pool - context.getResourcePool().put( - context.getNoteId(), - context.getParagraphId(), - WellKnownResourceName.ParagraphResult.toString(), - combinedResult); + if (context.getResourcePool() != null) { + context.getResourcePool().put( + context.getNoteId(), + context.getParagraphId(), + WellKnownResourceName.ParagraphResult.toString(), + combinedResult); + } return combinedResult; } finally { InterpreterContext.remove(); @@ -388,7 +424,7 @@ public void cancel(String noteId, String className, RemoteInterpreterContext int @Override public int getProgress(String noteId, String className, - RemoteInterpreterContext interpreterContext) + RemoteInterpreterContext interpreterContext) throws TException { Interpreter intp = getInterpreter(noteId, className); return intp.getProgress(convert(interpreterContext)); @@ -411,7 +447,7 @@ public List completion(String noteId, String className, String buf, int private InterpreterContext convert(RemoteInterpreterContext ric) { List contextRunners = new LinkedList(); List runners = gson.fromJson(ric.getRunners(), - new TypeToken>() { + new TypeToken>() { }.getType()); for (InterpreterContextRunner r : runners) { @@ -572,7 +608,7 @@ public void angularObjectUpdate(String name, String noteId, String paragraphId, if (value == null) { try { value = gson.fromJson(object, - new TypeToken>() { + new TypeToken>() { }.getType()); } catch (Exception e) { // it's not a generic json object, too. okay, proceed to threat as a string type @@ -608,7 +644,7 @@ public void angularObjectAdd(String name, String noteId, String paragraphId, Str try { value = gson.fromJson(object, new TypeToken>() { - }.getType()); + }.getType()); } catch (Exception e) { // it's okay. proceed to treat object as a string logger.debug(e.getMessage(), e); @@ -624,7 +660,7 @@ public void angularObjectAdd(String name, String noteId, String paragraphId, Str @Override public void angularObjectRemove(String name, String noteId, String paragraphId) throws - TException { + TException { AngularObjectRegistry registry = interpreterGroup.getAngularObjectRegistry(); registry.remove(name, noteId, paragraphId, false); } diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/DistributedResourcePool.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/DistributedResourcePool.java index ba31f017b83..12292f202ad 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/DistributedResourcePool.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/DistributedResourcePool.java @@ -16,16 +16,27 @@ */ package org.apache.zeppelin.resource; +import java.util.Properties; + /** * distributed resource pool */ public class DistributedResourcePool extends LocalResourcePool { private final ResourcePoolConnector connector; - + protected Properties property; + public DistributedResourcePool(String id, ResourcePoolConnector connector) { super(id); this.connector = connector; + this.property = new Properties(); + } + + + public DistributedResourcePool(String id, ResourcePoolConnector connector, Properties property) { + super(id); + this.connector = connector; + this.property = property; } @Override diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/RemoteInterpreterProcessResourcePoolConnector.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/RemoteInterpreterProcessResourcePoolConnector.java new file mode 100644 index 00000000000..7315af7c266 --- /dev/null +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/RemoteInterpreterProcessResourcePoolConnector.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.zeppelin.resource; + +import java.util.List; + +import org.apache.thrift.TException; +import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService.Client; + +import com.google.gson.Gson; +/** + * Makes a remote interpreter service client act as a resource pool connector. + */ +public class RemoteInterpreterProcessResourcePoolConnector implements ResourcePoolConnector { + + private Client client; + + public RemoteInterpreterProcessResourcePoolConnector(Client client) { + this.client = client; + } + + @Override + public ResourceSet getAllResources() { + try { + List resourceList = client.resourcePoolGetAll(); + ResourceSet resources = new ResourceSet(); + Gson gson = new Gson(); + + for (String res : resourceList) { + RemoteResource r = gson.fromJson(res, RemoteResource.class); + r.setResourcePoolConnector(this); + resources.add(r); + } + + return resources; + } catch (TException e) { + throw new RuntimeException(e); + } + } + + @Override + public Object readResource(ResourceId id) { + try { + // TODO(Object): Deserialize object + return client.resourceGet(id.getNoteId(), id.getParagraphId(), id.getName()); + } catch (TException e) { + throw new RuntimeException(e); + } + } + +} diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/ResourcePoolUtils.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/ResourcePoolUtils.java index 1825bfed217..4eb1d6a8896 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/ResourcePoolUtils.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/ResourcePoolUtils.java @@ -18,6 +18,8 @@ package org.apache.zeppelin.resource; import com.google.gson.Gson; +import com.google.gson.GsonBuilder; + import org.apache.zeppelin.interpreter.InterpreterGroup; import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcess; import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService; @@ -54,10 +56,11 @@ public static ResourceSet getAllResourcesExcept(String interpreterGroupExcludsio boolean broken = false; try { client = remoteInterpreterProcess.getClient(); - List resourceList = client.resourcePoolGetAll(); - Gson gson = new Gson(); - for (String res : resourceList) { - resourceSet.add(gson.fromJson(res, Resource.class)); + RemoteInterpreterProcessResourcePoolConnector remoteConnector = + new RemoteInterpreterProcessResourcePoolConnector(client); + + for (Resource r: remoteConnector.getAllResources()) { + resourceSet.add(r); } } catch (Exception e) { logger.error(e.getMessage(), e); diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/ResourceSerializer.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/ResourceSerializer.java new file mode 100644 index 00000000000..887f3078525 --- /dev/null +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/ResourceSerializer.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.zeppelin.resource; + +import java.lang.reflect.Type; + +import com.google.gson.Gson; +import com.google.gson.JsonDeserializationContext; +import com.google.gson.JsonDeserializer; +import com.google.gson.JsonElement; +import com.google.gson.JsonObject; +import com.google.gson.JsonParseException; +import com.google.gson.JsonSerializationContext; +import com.google.gson.JsonSerializer; +/** + * Serializes and Deserializes resources if they are serializable. + */ +public class ResourceSerializer implements JsonDeserializer, JsonSerializer { + + public ResourceSerializer() { + } + + @Override + public JsonElement serialize(Resource src, Type typeOfSrc, JsonSerializationContext context) { + // This is straightforward at the moment. + Gson gson = new Gson(); + JsonElement elem = gson.toJsonTree(src); + JsonObject obj = elem.getAsJsonObject(); + if (src.isSerializable()) { + obj.add("r", gson.toJsonTree(src.get())); + } + return obj; + } + + @Override + public Resource deserialize(JsonElement json, Type typeOfT, JsonDeserializationContext context) + throws JsonParseException { + // This requires that we use the class that's stored in the element to deserialize. + JsonObject obj = json.getAsJsonObject(); + String className = obj.getAsJsonPrimitive("className").getAsString(); + + Gson gson = new Gson(); + Object r; + try { + r = gson.fromJson(obj.get("r"), Class.forName(className)); + } catch (ClassNotFoundException e) { + throw new RuntimeException("Unable to deserialize the resource"); + } + ResourceId id = gson.fromJson(obj.get("resourceId"), ResourceId.class); + + return new Resource(id, r); + } + +} diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/VFSResourcePool.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/VFSResourcePool.java new file mode 100644 index 00000000000..856b03a15b7 --- /dev/null +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/resource/VFSResourcePool.java @@ -0,0 +1,254 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.zeppelin.resource; + +import org.slf4j.Logger; + +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.io.OutputStream; +import java.io.Serializable; +import java.net.URI; +import java.net.URISyntaxException; +import java.util.Properties; + +import javax.management.RuntimeErrorException; + +import org.slf4j.LoggerFactory; + +import com.google.gson.Gson; + +import org.apache.commons.vfs2.FileObject; +import org.apache.commons.vfs2.FileSelectInfo; +import org.apache.commons.vfs2.FileSelector; +import org.apache.commons.vfs2.FileSystemException; +import org.apache.commons.vfs2.FileSystemManager; +import org.apache.commons.vfs2.FileType; +import org.apache.commons.vfs2.NameScope; +import org.apache.commons.vfs2.VFS; + +/** + * Resource pool that saves resources to the local file system. + * + */ +public class VFSResourcePool extends DistributedResourcePool { + Logger logger = LoggerFactory.getLogger(VFSResourcePool.class); + + + @Override + public void put(String name, Object object) { + try { + FileObject rootDir = getRootDir(); + FileObject resultDir = rootDir.resolveFile(name, NameScope.CHILD); + if (!resultDir.exists()) { + resultDir.createFolder(); + } + + if (!isDirectory(resultDir)) { + throw new IOException(resultDir.getName().toString() + " is not a directory"); + } + Gson gson = new Gson(); + + FileObject resultFile = resultDir.resolveFile("result.dat", NameScope.CHILD); + // false means not appending. creates file if not exists + OutputStream out = resultFile.getContent().getOutputStream(false); + ObjectOutputStream oos = new ObjectOutputStream(out); + oos.writeObject(object); + out.close(); + } catch (IOException e) { + super.put(name, object); + throw new RuntimeException(e); + } + } + + @Override + public Resource get(String name) { + return get(name, true); + } + + @Override + public Resource get(String noteId, String paragraphId, String name) { + Resource r = get(noteId, paragraphId, name, true); + return new Resource( + new ResourceId(this.id(), noteId, paragraphId, name), r.get()); + } + + @Override + public Resource get(String name, boolean remote) { + try { + FileObject rootDir = getRootDir(); + FileObject resultDir = rootDir.resolveFile(name, NameScope.CHILD); + if (!resultDir.exists() || !isDirectory(resultDir)) + return null; + FileObject resultFile = resultDir.resolveFile("result.dat", NameScope.CHILD); + + InputStream instream = resultFile.getContent().getInputStream(); + ObjectInputStream ois = new ObjectInputStream(instream); + + try { + Object o = ois.readObject(); + String[] splitName = name.split("___"); + ResourceId r = null; + if (splitName.length == 3) + { + r = new ResourceId(this.id(), splitName[0], splitName[1], splitName[2]); + } + else { + r = new ResourceId(this.id(), name); + } + return new Resource(r, o); + } catch (ClassNotFoundException e) { + throw new RuntimeException(e); + } + + } catch (IOException e) { + } + return super.get(name, remote); + } + + @Override + public Resource get(String noteId, String paragraphId, String name, boolean remote) { + return get(noteId + "___" + paragraphId + "___" + name, remote); + } + + @Override + public ResourceSet getAll() { + return getAll(true); + } + + @Override + public ResourceSet getAll(boolean remote) { + ResourceSet resources = new ResourceSet(); + try { + FileObject rootDir = getRootDir(); + for (FileObject resourceDir: rootDir.getChildren()) + { + if (resourceDir.getType() == FileType.FOLDER) + resources.add(get(resourceDir.getName().getBaseName())); + } + } + catch (IOException ex) { + throw new RuntimeException(ex); + } + if (remote) + resources.addAll(super.getAll(remote)); + return resources; + } + + @Override + public void put(String noteId, String paragraphId, String name, Object object) { + put(noteId + "___" + paragraphId + "___name", object); + } + + @Override + public Resource remove(String name) { + try { + Resource r = get(name); + FileObject rootDir = getRootDir(); + FileObject resultDir = rootDir.resolveFile(name, NameScope.CHILD); + if (resultDir.exists()) { + resultDir.delete(new FileSelector() { + @Override + public boolean traverseDescendents(FileSelectInfo fileInfo) throws Exception { + return true; + } + + @Override + public boolean includeFile(FileSelectInfo fileInfo) throws Exception { + return true; + } + }); + } + return r; + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + @Override + public Resource remove(String noteId, String paragraphId, String name) { + return remove(noteId + "___" + paragraphId + "___" + name); + } + + private FileSystemManager fsManager; + private URI filesystemRoot; + public VFSResourcePool(String id, ResourcePoolConnector connector, Properties property) { + super(id, connector, property); + try { + this.filesystemRoot = new + URI(property.getProperty("Resource_Path", "notebook/zeppelin_resources")); + } catch (URISyntaxException e1) { + throw new RuntimeException(e1); + } + + if (filesystemRoot.getScheme() == null) { // it is local path + try { + this.filesystemRoot = new URI(new File( filesystemRoot.getPath()).getAbsolutePath()); + } catch (URISyntaxException e) { + throw new RuntimeException(e); + } + } else { + this.filesystemRoot = filesystemRoot; + } + + try { + fsManager = VFS.getManager(); + FileObject file; + file = fsManager.resolveFile(filesystemRoot.getPath()); + if (!file.exists()) { + logger.info("Notebook dir doesn't exist, create."); + file.createFolder(); + } + } catch (FileSystemException e) { + throw new RuntimeException("Unable to load new file system."); + } + } + + private FileObject getRootDir() throws IOException { + FileObject rootDir = fsManager.resolveFile(getPath("/")); + // Does nothing if the folder already exists. + rootDir.createFolder(); + if (!isDirectory(rootDir)) { + throw new IOException("Root path is not a directory"); + } + + return rootDir; + } + + private boolean isDirectory(FileObject fo) throws IOException { + if (fo == null) return false; + if (fo.getType() == FileType.FOLDER) { + return true; + } else { + return false; + } + } + + private String getPath(String path) { + if (path == null || path.trim().length() == 0) { + return filesystemRoot.toString(); + } + if (path.startsWith("/")) { + return filesystemRoot.toString() + path; + } else { + return filesystemRoot.toString() + "/" + path; + } + } +} diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterResourcePool.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterResourcePool.java index 3826b903115..8e408da58bd 100644 --- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterResourcePool.java +++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/mock/MockInterpreterResourcePool.java @@ -37,7 +37,7 @@ public class MockInterpreterResourcePool extends Interpreter { Interpreter.register( "resourcePoolTest", "resourcePool", - MockInterpreterA.class.getName(), + MockInterpreterResourcePool.class.getName(), new InterpreterPropertyBuilder() .add("p1", "v1", "property1").build()); diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java index eafbbbe224d..9fc32cd5a13 100755 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java @@ -504,6 +504,8 @@ public static enum ConfVars { // i.e. http://localhost:8080 ZEPPELIN_ALLOWED_ORIGINS("zeppelin.server.allowed.origins", "*"), ZEPPELIN_ANONYMOUS_ALLOWED("zeppelin.anonymous.allowed", true), + ZEPPELIN_RESOURCE_POOL_CLASS("zeppelin.interpreter.resourcePoolClass", + "org.apacheorg.apache.zeppelin.resource.DistributedResourcePool"); ZEPPELIN_WEBSOCKET_MAX_TEXT_MESSAGE_SIZE("zeppelin.websocket.max.text.message.size", "1024000"); private String varName; diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java index 269b54a1fce..e060a45ab80 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java @@ -198,6 +198,8 @@ private void init() throws InterpreterException, IOException, RepositoryExceptio logger.info("Interpreter setting group {} : id={}, name={}", setting.getGroup(), settingId, setting.getName()); } + + } private void loadFromFile() throws IOException { @@ -234,16 +236,19 @@ private void loadFromFile() throws IOException { // enable/disable option on GUI). // previously created setting should turn this feature on here. setting.getOption().setRemote(true); - + + Properties mergedProperties = + this.getInterpreterPropertiesFromZeppelinConf(); + mergedProperties.putAll(setting.getProperties());; + InterpreterSetting intpSetting = new InterpreterSetting( setting.id(), setting.getName(), setting.getGroup(), setting.getInterpreterInfos(), - setting.getProperties(), + mergedProperties, setting.getDependencies(), setting.getOption()); - InterpreterGroup interpreterGroup = createInterpreterGroup(setting.id(), setting.getOption()); intpSetting.setInterpreterGroup(interpreterGroup); @@ -260,6 +265,16 @@ private void loadFromFile() throws IOException { } } } + + private Properties getInterpreterPropertiesFromZeppelinConf() { + Iterator keySet = this.conf.getKeys("zeppelin.interpreter"); + Properties p = new Properties(); + while (keySet.hasNext()) { + String key = keySet.next(); + p.setProperty(key, this.conf.getProperty(key).toString()); + } + return p; + } private void loadInterpreterDependencies(InterpreterSetting intSetting) throws IOException, RepositoryException {