Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 11 additions & 1 deletion zeppelin-interpreter/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -214,8 +214,18 @@
<exclusions>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-vfs2</artifactId>
<version>2.0</version>
<exclusions>
<exclusion>
<artifactId>plexus-utils</artifactId>
<groupId>org.codehaus.plexus</groupId>
</exclusion>
</exclusions>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,17 +134,48 @@ public static void main(String[] args)
System.exit(0);
}


private DistributedResourcePool getResourcePool()
/* InterpreterGroup group,
Properties prop,
RemoteInterpreterEventClient client) */
throws TException {
if (resourcePool != null)
return resourcePool;
try {
Properties prop = interpreterGroup.getProperty();
//Happens during tests.
if (prop == null)
prop = new Properties();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I know we don't have style check for this official but could we put single line into bracket too? It would help readability and prevent possible merge mistakes.
(same for L143)
http://embeddedgurus.com/barr-code/2014/03/apples-gotofail-ssl-security-bug-was-easily-preventable/

String resourcePoolClassName = (String) prop.getProperty(
"zeppelin.interpreter.resourcePoolClass");
logger.debug("Getting resource pool {}", resourcePoolClassName);
Class resourcePoolClass = Class.forName(resourcePoolClassName);

Constructor<ResourcePool> constructor = resourcePoolClass
.getConstructor(new Class[] {String.class,
ResourcePoolConnector.class,
Properties.class });
resourcePool = (DistributedResourcePool) constructor.newInstance(interpreterGroup.getId(),
this.eventClient,
prop);
} catch (Exception e) {
logger.error("Did not find resource pool. Using DistributedResourcePool");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

use logger.error("message", e) instead

resourcePool = new DistributedResourcePool(interpreterGroup.getId(), this.eventClient);
// throw new TException(e);
} finally {
interpreterGroup.setResourcePool(resourcePool);
return resourcePool;
}
}

@Override
public void createInterpreter(String interpreterGroupId, String noteId, String
className,
Map<String, String> properties) throws TException {
Map<String, String> properties) throws TException {
if (interpreterGroup == null) {
interpreterGroup = new InterpreterGroup(interpreterGroupId);
angularObjectRegistry = new AngularObjectRegistry(interpreterGroup.getId(), this);
resourcePool = new DistributedResourcePool(interpreterGroup.getId(), eventClient);
interpreterGroup.setAngularObjectRegistry(angularObjectRegistry);
interpreterGroup.setResourcePool(resourcePool);
}

try {
Expand All @@ -171,15 +202,19 @@ public void createInterpreter(String interpreterGroupId, String noteId, String
}

logger.info("Instantiate interpreter {}", className);

interpreterGroup.setResourcePool(getResourcePool());

repl.setInterpreterGroup(interpreterGroup);

//setResourcePool(interpreterGroup, p, eventClient);
} catch (ClassNotFoundException | NoSuchMethodException | SecurityException
| InstantiationException | IllegalAccessException
| IllegalArgumentException | InvocationTargetException e) {
logger.error(e.toString(), e);
throw new TException(e);
}
}

private void setSystemProperty(Properties properties) {
for (Object key : properties.keySet()) {
if (!RemoteInterpreter.isEnvString((String) key)) {
Expand Down Expand Up @@ -367,11 +402,13 @@ protected Object jobRun() throws Throwable {
}

// put result into resource pool
context.getResourcePool().put(
context.getNoteId(),
context.getParagraphId(),
WellKnownResourceName.ParagraphResult.toString(),
combinedResult);
if (context.getResourcePool() != null) {
context.getResourcePool().put(
context.getNoteId(),
context.getParagraphId(),
WellKnownResourceName.ParagraphResult.toString(),
combinedResult);
}
return combinedResult;
} finally {
InterpreterContext.remove();
Expand Down Expand Up @@ -402,7 +439,7 @@ public void cancel(String noteId, String className, RemoteInterpreterContext int

@Override
public int getProgress(String noteId, String className,
RemoteInterpreterContext interpreterContext)
RemoteInterpreterContext interpreterContext)
throws TException {
Interpreter intp = getInterpreter(noteId, className);
return intp.getProgress(convert(interpreterContext));
Expand All @@ -425,7 +462,7 @@ public List<String> completion(String noteId, String className, String buf, int
private InterpreterContext convert(RemoteInterpreterContext ric) {
List<InterpreterContextRunner> contextRunners = new LinkedList<InterpreterContextRunner>();
List<InterpreterContextRunner> runners = gson.fromJson(ric.getRunners(),
new TypeToken<List<RemoteInterpreterContextRunner>>() {
new TypeToken<List<RemoteInterpreterContextRunner>>() {
}.getType());

for (InterpreterContextRunner r : runners) {
Expand Down Expand Up @@ -586,7 +623,7 @@ public void angularObjectUpdate(String name, String noteId, String paragraphId,
if (value == null) {
try {
value = gson.fromJson(object,
new TypeToken<Map<String, Object>>() {
new TypeToken<Map<String, Object>>() {
}.getType());
} catch (Exception e) {
// it's not a generic json object, too. okay, proceed to threat as a string type
Expand Down Expand Up @@ -622,7 +659,7 @@ public void angularObjectAdd(String name, String noteId, String paragraphId, Str
try {
value = gson.fromJson(object,
new TypeToken<Map<String, Object>>() {
}.getType());
}.getType());
} catch (Exception e) {
// it's okay. proceed to treat object as a string
logger.debug(e.getMessage(), e);
Expand All @@ -638,7 +675,7 @@ public void angularObjectAdd(String name, String noteId, String paragraphId, Str

@Override
public void angularObjectRemove(String name, String noteId, String paragraphId) throws
TException {
TException {
AngularObjectRegistry registry = interpreterGroup.getAngularObjectRegistry();
registry.remove(name, noteId, paragraphId, false);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,27 @@
*/
package org.apache.zeppelin.resource;

import java.util.Properties;

/**
* distributed resource pool
*/
public class DistributedResourcePool extends LocalResourcePool {

private final ResourcePoolConnector connector;

protected Properties property;

public DistributedResourcePool(String id, ResourcePoolConnector connector) {
super(id);
this.connector = connector;
this.property = new Properties();
}


public DistributedResourcePool(String id, ResourcePoolConnector connector, Properties property) {
super(id);
this.connector = connector;
this.property = property;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zeppelin.resource;

import java.util.List;

import org.apache.thrift.TException;
import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService.Client;

import com.google.gson.Gson;
/**
* Makes a remote interpreter service client act as a resource pool connector.
*/
public class RemoteInterpreterProcessResourcePoolConnector implements ResourcePoolConnector {

private Client client;

public RemoteInterpreterProcessResourcePoolConnector(Client client) {
this.client = client;
}

@Override
public ResourceSet getAllResources() {
try {
List<String> resourceList = client.resourcePoolGetAll();
ResourceSet resources = new ResourceSet();
Gson gson = new Gson();

for (String res : resourceList) {
RemoteResource r = gson.fromJson(res, RemoteResource.class);
r.setResourcePoolConnector(this);
resources.add(r);
}

return resources;
} catch (TException e) {
throw new RuntimeException(e);
}
}

@Override
public Object readResource(ResourceId id) {
try {
// TODO(Object): Deserialize object
return client.resourceGet(id.getNoteId(), id.getParagraphId(), id.getName());
} catch (TException e) {
throw new RuntimeException(e);
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
package org.apache.zeppelin.resource;

import com.google.gson.Gson;
import com.google.gson.GsonBuilder;

import org.apache.zeppelin.interpreter.InterpreterGroup;
import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcess;
import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService;
Expand Down Expand Up @@ -54,10 +56,11 @@ public static ResourceSet getAllResourcesExcept(String interpreterGroupExcludsio
boolean broken = false;
try {
client = remoteInterpreterProcess.getClient();
List<String> resourceList = client.resourcePoolGetAll();
Gson gson = new Gson();
for (String res : resourceList) {
resourceSet.add(gson.fromJson(res, Resource.class));
RemoteInterpreterProcessResourcePoolConnector remoteConnector =
new RemoteInterpreterProcessResourcePoolConnector(client);

for (Resource r: remoteConnector.getAllResources()) {
resourceSet.add(r);
}
} catch (Exception e) {
logger.error(e.getMessage(), e);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zeppelin.resource;

import java.lang.reflect.Type;

import com.google.gson.Gson;
import com.google.gson.JsonDeserializationContext;
import com.google.gson.JsonDeserializer;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import com.google.gson.JsonParseException;
import com.google.gson.JsonSerializationContext;
import com.google.gson.JsonSerializer;
/**
* Serializes and Deserializes resources if they are serializable.
*/
public class ResourceSerializer implements JsonDeserializer<Resource>, JsonSerializer<Resource> {

public ResourceSerializer() {
}

@Override
public JsonElement serialize(Resource src, Type typeOfSrc, JsonSerializationContext context) {
// This is straightforward at the moment.
Gson gson = new Gson();
JsonElement elem = gson.toJsonTree(src);
JsonObject obj = elem.getAsJsonObject();
if (src.isSerializable()) {
obj.add("r", gson.toJsonTree(src.get()));
}
return obj;
}

@Override
public Resource deserialize(JsonElement json, Type typeOfT, JsonDeserializationContext context)
throws JsonParseException {
// This requires that we use the class that's stored in the element to deserialize.
JsonObject obj = json.getAsJsonObject();
String className = obj.getAsJsonPrimitive("className").getAsString();

Gson gson = new Gson();
Object r;
try {
r = gson.fromJson(obj.get("r"), Class.forName(className));
} catch (ClassNotFoundException e) {
throw new RuntimeException("Unable to deserialize the resource");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

pass e to create the RuntimeException?

}
ResourceId id = gson.fromJson(obj.get("resourceId"), ResourceId.class);

return new Resource(id, r);
}

}
Loading