diff --git a/rlang/src/test/java/org/apache/zeppelin/r/ShinyInterpreterTest.java b/rlang/src/test/java/org/apache/zeppelin/r/ShinyInterpreterTest.java index 5939f9917f8..def64361020 100644 --- a/rlang/src/test/java/org/apache/zeppelin/r/ShinyInterpreterTest.java +++ b/rlang/src/test/java/org/apache/zeppelin/r/ShinyInterpreterTest.java @@ -28,6 +28,7 @@ import org.apache.zeppelin.interpreter.InterpreterResult; import org.apache.zeppelin.interpreter.InterpreterResultMessage; import org.apache.zeppelin.interpreter.LazyOpenInterpreter; +import org.apache.zeppelin.interpreter.remote.RemoteInterpreterEventClient; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -42,6 +43,7 @@ import static junit.framework.TestCase.assertEquals; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +import static org.mockito.Mockito.mock; public class ShinyInterpreterTest { @@ -240,6 +242,7 @@ protected InterpreterContext getInterpreterContext() { .setInterpreterOut(new InterpreterOutput(null)) .setLocalProperties(new HashMap<>()) .setInterpreterClassName(ShinyInterpreter.class.getName()) + .setIntpEventClient(mock(RemoteInterpreterEventClient.class)) .build(); return context; } diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventClient.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventClient.java index 5ac1c0a8dcd..41768adaf7e 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventClient.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventClient.java @@ -235,6 +235,15 @@ public synchronized void runParagraphs(String noteId, } } + public synchronized void checkpointOutput(String noteId, String paragraphId) { + try { + intpEventServiceClient.checkpointOutput(noteId, paragraphId); + } catch (TException e) { + LOGGER.warn("Fail to checkpointOutput of paragraph: " + + paragraphId + " of note: " + noteId, e); + } + } + public synchronized void onAppOutputAppend( String noteId, String paragraphId, int index, String appId, String output) { AppOutputAppendEvent event = diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/AngularObjectId.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/AngularObjectId.java index fc1004a30c5..4b053d626b6 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/AngularObjectId.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/AngularObjectId.java @@ -24,7 +24,7 @@ package org.apache.zeppelin.interpreter.thrift; @SuppressWarnings({"cast", "rawtypes", "serial", "unchecked", "unused"}) -@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.13.0)", date = "2020-01-06") +@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.13.0)", date = "2020-01-07") public class AngularObjectId implements org.apache.thrift.TBase, java.io.Serializable, Cloneable, Comparable { private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("AngularObjectId"); diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/AppOutputAppendEvent.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/AppOutputAppendEvent.java index 723754a0bb8..2511ab94b26 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/AppOutputAppendEvent.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/AppOutputAppendEvent.java @@ -24,7 +24,7 @@ package org.apache.zeppelin.interpreter.thrift; @SuppressWarnings({"cast", "rawtypes", "serial", "unchecked", "unused"}) -@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.13.0)", date = "2020-01-06") +@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.13.0)", date = "2020-01-07") public class AppOutputAppendEvent implements org.apache.thrift.TBase, java.io.Serializable, Cloneable, Comparable { private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("AppOutputAppendEvent"); diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/AppOutputUpdateEvent.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/AppOutputUpdateEvent.java index 0d72b31597c..8f4c9ca70d4 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/AppOutputUpdateEvent.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/AppOutputUpdateEvent.java @@ -24,7 +24,7 @@ package org.apache.zeppelin.interpreter.thrift; @SuppressWarnings({"cast", "rawtypes", "serial", "unchecked", "unused"}) -@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.13.0)", date = "2020-01-06") +@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.13.0)", date = "2020-01-07") public class AppOutputUpdateEvent implements org.apache.thrift.TBase, java.io.Serializable, Cloneable, Comparable { private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("AppOutputUpdateEvent"); diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/AppStatusUpdateEvent.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/AppStatusUpdateEvent.java index dfc5a530344..550efebd399 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/AppStatusUpdateEvent.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/AppStatusUpdateEvent.java @@ -24,7 +24,7 @@ package org.apache.zeppelin.interpreter.thrift; @SuppressWarnings({"cast", "rawtypes", "serial", "unchecked", "unused"}) -@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.13.0)", date = "2020-01-06") +@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.13.0)", date = "2020-01-07") public class AppStatusUpdateEvent implements org.apache.thrift.TBase, java.io.Serializable, Cloneable, Comparable { private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("AppStatusUpdateEvent"); diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/InterpreterCompletion.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/InterpreterCompletion.java index 6d0c8f5c383..ddb5512ab39 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/InterpreterCompletion.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/InterpreterCompletion.java @@ -24,7 +24,7 @@ package org.apache.zeppelin.interpreter.thrift; @SuppressWarnings({"cast", "rawtypes", "serial", "unchecked", "unused"}) -@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.13.0)", date = "2020-01-06") +@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.13.0)", date = "2020-01-07") public class InterpreterCompletion implements org.apache.thrift.TBase, java.io.Serializable, Cloneable, Comparable { private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("InterpreterCompletion"); diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/OutputAppendEvent.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/OutputAppendEvent.java index 1a8fc2e764a..c0757fe1d33 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/OutputAppendEvent.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/OutputAppendEvent.java @@ -24,7 +24,7 @@ package org.apache.zeppelin.interpreter.thrift; @SuppressWarnings({"cast", "rawtypes", "serial", "unchecked", "unused"}) -@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.13.0)", date = "2020-01-06") +@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.13.0)", date = "2020-01-07") public class OutputAppendEvent implements org.apache.thrift.TBase, java.io.Serializable, Cloneable, Comparable { private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("OutputAppendEvent"); diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/OutputUpdateAllEvent.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/OutputUpdateAllEvent.java index 0df11968994..944241e2c61 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/OutputUpdateAllEvent.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/OutputUpdateAllEvent.java @@ -24,7 +24,7 @@ package org.apache.zeppelin.interpreter.thrift; @SuppressWarnings({"cast", "rawtypes", "serial", "unchecked", "unused"}) -@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.13.0)", date = "2020-01-06") +@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.13.0)", date = "2020-01-07") public class OutputUpdateAllEvent implements org.apache.thrift.TBase, java.io.Serializable, Cloneable, Comparable { private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("OutputUpdateAllEvent"); diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/OutputUpdateEvent.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/OutputUpdateEvent.java index f4ac7d9cce1..cea1774df53 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/OutputUpdateEvent.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/OutputUpdateEvent.java @@ -24,7 +24,7 @@ package org.apache.zeppelin.interpreter.thrift; @SuppressWarnings({"cast", "rawtypes", "serial", "unchecked", "unused"}) -@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.13.0)", date = "2020-01-06") +@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.13.0)", date = "2020-01-07") public class OutputUpdateEvent implements org.apache.thrift.TBase, java.io.Serializable, Cloneable, Comparable { private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("OutputUpdateEvent"); diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/ParagraphInfo.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/ParagraphInfo.java index a2acfc13f36..465b8bf31f1 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/ParagraphInfo.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/ParagraphInfo.java @@ -24,7 +24,7 @@ package org.apache.zeppelin.interpreter.thrift; @SuppressWarnings({"cast", "rawtypes", "serial", "unchecked", "unused"}) -@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.13.0)", date = "2020-01-06") +@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.13.0)", date = "2020-01-07") public class ParagraphInfo implements org.apache.thrift.TBase, java.io.Serializable, Cloneable, Comparable { private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("ParagraphInfo"); diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RegisterInfo.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RegisterInfo.java index 7f40f5a8a96..8744e2b7718 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RegisterInfo.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RegisterInfo.java @@ -24,7 +24,7 @@ package org.apache.zeppelin.interpreter.thrift; @SuppressWarnings({"cast", "rawtypes", "serial", "unchecked", "unused"}) -@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.13.0)", date = "2020-01-06") +@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.13.0)", date = "2020-01-07") public class RegisterInfo implements org.apache.thrift.TBase, java.io.Serializable, Cloneable, Comparable { private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("RegisterInfo"); diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteApplicationResult.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteApplicationResult.java index 2965912084f..d869dffa324 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteApplicationResult.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteApplicationResult.java @@ -24,7 +24,7 @@ package org.apache.zeppelin.interpreter.thrift; @SuppressWarnings({"cast", "rawtypes", "serial", "unchecked", "unused"}) -@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.13.0)", date = "2020-01-06") +@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.13.0)", date = "2020-01-07") public class RemoteApplicationResult implements org.apache.thrift.TBase, java.io.Serializable, Cloneable, Comparable { private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("RemoteApplicationResult"); diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterContext.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterContext.java index 724b9dbd928..ce9dff3eba8 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterContext.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterContext.java @@ -24,7 +24,7 @@ package org.apache.zeppelin.interpreter.thrift; @SuppressWarnings({"cast", "rawtypes", "serial", "unchecked", "unused"}) -@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.13.0)", date = "2020-01-06") +@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.13.0)", date = "2020-01-07") public class RemoteInterpreterContext implements org.apache.thrift.TBase, java.io.Serializable, Cloneable, Comparable { private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("RemoteInterpreterContext"); diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterEvent.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterEvent.java index 66f619977d8..2a992086f6f 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterEvent.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterEvent.java @@ -24,7 +24,7 @@ package org.apache.zeppelin.interpreter.thrift; @SuppressWarnings({"cast", "rawtypes", "serial", "unchecked", "unused"}) -@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.13.0)", date = "2020-01-06") +@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.13.0)", date = "2020-01-07") public class RemoteInterpreterEvent implements org.apache.thrift.TBase, java.io.Serializable, Cloneable, Comparable { private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("RemoteInterpreterEvent"); diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterEventService.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterEventService.java index daea0190054..e1eb9e8eab3 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterEventService.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterEventService.java @@ -24,7 +24,7 @@ package org.apache.zeppelin.interpreter.thrift; @SuppressWarnings({"cast", "rawtypes", "serial", "unchecked", "unused"}) -@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.13.0)", date = "2020-01-06") +@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.13.0)", date = "2020-01-07") public class RemoteInterpreterEventService { public interface Iface { @@ -43,6 +43,8 @@ public interface Iface { public void updateAppStatus(AppStatusUpdateEvent event) throws org.apache.thrift.TException; + public void checkpointOutput(java.lang.String noteId, java.lang.String paragraphId) throws org.apache.thrift.TException; + public void runParagraphs(RunParagraphsEvent event) throws org.apache.thrift.TException; public void addAngularObject(java.lang.String intpGroupId, java.lang.String json) throws org.apache.thrift.TException; @@ -79,6 +81,8 @@ public interface AsyncIface { public void updateAppStatus(AppStatusUpdateEvent event, org.apache.thrift.async.AsyncMethodCallback resultHandler) throws org.apache.thrift.TException; + public void checkpointOutput(java.lang.String noteId, java.lang.String paragraphId, org.apache.thrift.async.AsyncMethodCallback resultHandler) throws org.apache.thrift.TException; + public void runParagraphs(RunParagraphsEvent event, org.apache.thrift.async.AsyncMethodCallback resultHandler) throws org.apache.thrift.TException; public void addAngularObject(java.lang.String intpGroupId, java.lang.String json, org.apache.thrift.async.AsyncMethodCallback resultHandler) throws org.apache.thrift.TException; @@ -259,6 +263,27 @@ public void recv_updateAppStatus() throws org.apache.thrift.TException return; } + public void checkpointOutput(java.lang.String noteId, java.lang.String paragraphId) throws org.apache.thrift.TException + { + send_checkpointOutput(noteId, paragraphId); + recv_checkpointOutput(); + } + + public void send_checkpointOutput(java.lang.String noteId, java.lang.String paragraphId) throws org.apache.thrift.TException + { + checkpointOutput_args args = new checkpointOutput_args(); + args.setNoteId(noteId); + args.setParagraphId(paragraphId); + sendBase("checkpointOutput", args); + } + + public void recv_checkpointOutput() throws org.apache.thrift.TException + { + checkpointOutput_result result = new checkpointOutput_result(); + receiveBase(result, "checkpointOutput"); + return; + } + public void runParagraphs(RunParagraphsEvent event) throws org.apache.thrift.TException { send_runParagraphs(event); @@ -704,6 +729,41 @@ public Void getResult() throws org.apache.thrift.TException { } } + public void checkpointOutput(java.lang.String noteId, java.lang.String paragraphId, org.apache.thrift.async.AsyncMethodCallback resultHandler) throws org.apache.thrift.TException { + checkReady(); + checkpointOutput_call method_call = new checkpointOutput_call(noteId, paragraphId, resultHandler, this, ___protocolFactory, ___transport); + this.___currentMethod = method_call; + ___manager.call(method_call); + } + + public static class checkpointOutput_call extends org.apache.thrift.async.TAsyncMethodCall { + private java.lang.String noteId; + private java.lang.String paragraphId; + public checkpointOutput_call(java.lang.String noteId, java.lang.String paragraphId, org.apache.thrift.async.AsyncMethodCallback resultHandler, org.apache.thrift.async.TAsyncClient client, org.apache.thrift.protocol.TProtocolFactory protocolFactory, org.apache.thrift.transport.TNonblockingTransport transport) throws org.apache.thrift.TException { + super(client, protocolFactory, transport, resultHandler, false); + this.noteId = noteId; + this.paragraphId = paragraphId; + } + + public void write_args(org.apache.thrift.protocol.TProtocol prot) throws org.apache.thrift.TException { + prot.writeMessageBegin(new org.apache.thrift.protocol.TMessage("checkpointOutput", org.apache.thrift.protocol.TMessageType.CALL, 0)); + checkpointOutput_args args = new checkpointOutput_args(); + args.setNoteId(noteId); + args.setParagraphId(paragraphId); + args.write(prot); + prot.writeMessageEnd(); + } + + public Void getResult() throws org.apache.thrift.TException { + if (getState() != org.apache.thrift.async.TAsyncMethodCall.State.RESPONSE_READ) { + throw new java.lang.IllegalStateException("Method call not finished!"); + } + org.apache.thrift.transport.TMemoryInputTransport memoryTransport = new org.apache.thrift.transport.TMemoryInputTransport(getFrameBuffer().array()); + org.apache.thrift.protocol.TProtocol prot = client.getProtocolFactory().getProtocol(memoryTransport); + return null; + } + } + public void runParagraphs(RunParagraphsEvent event, org.apache.thrift.async.AsyncMethodCallback resultHandler) throws org.apache.thrift.TException { checkReady(); runParagraphs_call method_call = new runParagraphs_call(event, resultHandler, this, ___protocolFactory, ___transport); @@ -1036,6 +1096,7 @@ protected Processor(I iface, java.util.Map extends org.apache.thrift.ProcessFunction { + public checkpointOutput() { + super("checkpointOutput"); + } + + public checkpointOutput_args getEmptyArgsInstance() { + return new checkpointOutput_args(); + } + + protected boolean isOneway() { + return false; + } + + @Override + protected boolean rethrowUnhandledExceptions() { + return false; + } + + public checkpointOutput_result getResult(I iface, checkpointOutput_args args) throws org.apache.thrift.TException { + checkpointOutput_result result = new checkpointOutput_result(); + iface.checkpointOutput(args.noteId, args.paragraphId); + return result; + } + } + public static class runParagraphs extends org.apache.thrift.ProcessFunction { public runParagraphs() { super("runParagraphs"); @@ -1472,6 +1558,7 @@ protected AsyncProcessor(I iface, java.util.Map extends org.apache.thrift.AsyncProcessFunction { + public checkpointOutput() { + super("checkpointOutput"); + } + + public checkpointOutput_args getEmptyArgsInstance() { + return new checkpointOutput_args(); + } + + public org.apache.thrift.async.AsyncMethodCallback getResultHandler(final org.apache.thrift.server.AbstractNonblockingServer.AsyncFrameBuffer fb, final int seqid) { + final org.apache.thrift.AsyncProcessFunction fcall = this; + return new org.apache.thrift.async.AsyncMethodCallback() { + public void onComplete(Void o) { + checkpointOutput_result result = new checkpointOutput_result(); + try { + fcall.sendResponse(fb, result, org.apache.thrift.protocol.TMessageType.REPLY,seqid); + } catch (org.apache.thrift.transport.TTransportException e) { + _LOGGER.error("TTransportException writing to internal frame buffer", e); + fb.close(); + } catch (java.lang.Exception e) { + _LOGGER.error("Exception writing to internal frame buffer", e); + onError(e); + } + } + public void onError(java.lang.Exception e) { + byte msgType = org.apache.thrift.protocol.TMessageType.REPLY; + org.apache.thrift.TSerializable msg; + checkpointOutput_result result = new checkpointOutput_result(); + if (e instanceof org.apache.thrift.transport.TTransportException) { + _LOGGER.error("TTransportException inside handler", e); + fb.close(); + return; + } else if (e instanceof org.apache.thrift.TApplicationException) { + _LOGGER.error("TApplicationException inside handler", e); + msgType = org.apache.thrift.protocol.TMessageType.EXCEPTION; + msg = (org.apache.thrift.TApplicationException)e; + } else { + _LOGGER.error("Exception inside handler", e); + msgType = org.apache.thrift.protocol.TMessageType.EXCEPTION; + msg = new org.apache.thrift.TApplicationException(org.apache.thrift.TApplicationException.INTERNAL_ERROR, e.getMessage()); + } + try { + fcall.sendResponse(fb,msg,msgType,seqid); + } catch (java.lang.Exception ex) { + _LOGGER.error("Exception writing to internal frame buffer", ex); + fb.close(); + } + } + }; + } + + protected boolean isOneway() { + return false; + } + + public void start(I iface, checkpointOutput_args args, org.apache.thrift.async.AsyncMethodCallback resultHandler) throws org.apache.thrift.TException { + iface.checkpointOutput(args.noteId, args.paragraphId,resultHandler); + } + } + public static class runParagraphs extends org.apache.thrift.AsyncProcessFunction { public runParagraphs() { super("runParagraphs"); @@ -6836,6 +6983,732 @@ private static S scheme(org.apache. } } + public static class checkpointOutput_args implements org.apache.thrift.TBase, java.io.Serializable, Cloneable, Comparable { + private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("checkpointOutput_args"); + + private static final org.apache.thrift.protocol.TField NOTE_ID_FIELD_DESC = new org.apache.thrift.protocol.TField("noteId", org.apache.thrift.protocol.TType.STRING, (short)1); + private static final org.apache.thrift.protocol.TField PARAGRAPH_ID_FIELD_DESC = new org.apache.thrift.protocol.TField("paragraphId", org.apache.thrift.protocol.TType.STRING, (short)2); + + private static final org.apache.thrift.scheme.SchemeFactory STANDARD_SCHEME_FACTORY = new checkpointOutput_argsStandardSchemeFactory(); + private static final org.apache.thrift.scheme.SchemeFactory TUPLE_SCHEME_FACTORY = new checkpointOutput_argsTupleSchemeFactory(); + + public @org.apache.thrift.annotation.Nullable java.lang.String noteId; // required + public @org.apache.thrift.annotation.Nullable java.lang.String paragraphId; // required + + /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */ + public enum _Fields implements org.apache.thrift.TFieldIdEnum { + NOTE_ID((short)1, "noteId"), + PARAGRAPH_ID((short)2, "paragraphId"); + + private static final java.util.Map byName = new java.util.HashMap(); + + static { + for (_Fields field : java.util.EnumSet.allOf(_Fields.class)) { + byName.put(field.getFieldName(), field); + } + } + + /** + * Find the _Fields constant that matches fieldId, or null if its not found. + */ + @org.apache.thrift.annotation.Nullable + public static _Fields findByThriftId(int fieldId) { + switch(fieldId) { + case 1: // NOTE_ID + return NOTE_ID; + case 2: // PARAGRAPH_ID + return PARAGRAPH_ID; + default: + return null; + } + } + + /** + * Find the _Fields constant that matches fieldId, throwing an exception + * if it is not found. + */ + public static _Fields findByThriftIdOrThrow(int fieldId) { + _Fields fields = findByThriftId(fieldId); + if (fields == null) throw new java.lang.IllegalArgumentException("Field " + fieldId + " doesn't exist!"); + return fields; + } + + /** + * Find the _Fields constant that matches name, or null if its not found. + */ + @org.apache.thrift.annotation.Nullable + public static _Fields findByName(java.lang.String name) { + return byName.get(name); + } + + private final short _thriftId; + private final java.lang.String _fieldName; + + _Fields(short thriftId, java.lang.String fieldName) { + _thriftId = thriftId; + _fieldName = fieldName; + } + + public short getThriftFieldId() { + return _thriftId; + } + + public java.lang.String getFieldName() { + return _fieldName; + } + } + + // isset id assignments + public static final java.util.Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap; + static { + java.util.Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new java.util.EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class); + tmpMap.put(_Fields.NOTE_ID, new org.apache.thrift.meta_data.FieldMetaData("noteId", org.apache.thrift.TFieldRequirementType.DEFAULT, + new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING))); + tmpMap.put(_Fields.PARAGRAPH_ID, new org.apache.thrift.meta_data.FieldMetaData("paragraphId", org.apache.thrift.TFieldRequirementType.DEFAULT, + new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING))); + metaDataMap = java.util.Collections.unmodifiableMap(tmpMap); + org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(checkpointOutput_args.class, metaDataMap); + } + + public checkpointOutput_args() { + } + + public checkpointOutput_args( + java.lang.String noteId, + java.lang.String paragraphId) + { + this(); + this.noteId = noteId; + this.paragraphId = paragraphId; + } + + /** + * Performs a deep copy on other. + */ + public checkpointOutput_args(checkpointOutput_args other) { + if (other.isSetNoteId()) { + this.noteId = other.noteId; + } + if (other.isSetParagraphId()) { + this.paragraphId = other.paragraphId; + } + } + + public checkpointOutput_args deepCopy() { + return new checkpointOutput_args(this); + } + + @Override + public void clear() { + this.noteId = null; + this.paragraphId = null; + } + + @org.apache.thrift.annotation.Nullable + public java.lang.String getNoteId() { + return this.noteId; + } + + public checkpointOutput_args setNoteId(@org.apache.thrift.annotation.Nullable java.lang.String noteId) { + this.noteId = noteId; + return this; + } + + public void unsetNoteId() { + this.noteId = null; + } + + /** Returns true if field noteId is set (has been assigned a value) and false otherwise */ + public boolean isSetNoteId() { + return this.noteId != null; + } + + public void setNoteIdIsSet(boolean value) { + if (!value) { + this.noteId = null; + } + } + + @org.apache.thrift.annotation.Nullable + public java.lang.String getParagraphId() { + return this.paragraphId; + } + + public checkpointOutput_args setParagraphId(@org.apache.thrift.annotation.Nullable java.lang.String paragraphId) { + this.paragraphId = paragraphId; + return this; + } + + public void unsetParagraphId() { + this.paragraphId = null; + } + + /** Returns true if field paragraphId is set (has been assigned a value) and false otherwise */ + public boolean isSetParagraphId() { + return this.paragraphId != null; + } + + public void setParagraphIdIsSet(boolean value) { + if (!value) { + this.paragraphId = null; + } + } + + public void setFieldValue(_Fields field, @org.apache.thrift.annotation.Nullable java.lang.Object value) { + switch (field) { + case NOTE_ID: + if (value == null) { + unsetNoteId(); + } else { + setNoteId((java.lang.String)value); + } + break; + + case PARAGRAPH_ID: + if (value == null) { + unsetParagraphId(); + } else { + setParagraphId((java.lang.String)value); + } + break; + + } + } + + @org.apache.thrift.annotation.Nullable + public java.lang.Object getFieldValue(_Fields field) { + switch (field) { + case NOTE_ID: + return getNoteId(); + + case PARAGRAPH_ID: + return getParagraphId(); + + } + throw new java.lang.IllegalStateException(); + } + + /** Returns true if field corresponding to fieldID is set (has been assigned a value) and false otherwise */ + public boolean isSet(_Fields field) { + if (field == null) { + throw new java.lang.IllegalArgumentException(); + } + + switch (field) { + case NOTE_ID: + return isSetNoteId(); + case PARAGRAPH_ID: + return isSetParagraphId(); + } + throw new java.lang.IllegalStateException(); + } + + @Override + public boolean equals(java.lang.Object that) { + if (that == null) + return false; + if (that instanceof checkpointOutput_args) + return this.equals((checkpointOutput_args)that); + return false; + } + + public boolean equals(checkpointOutput_args that) { + if (that == null) + return false; + if (this == that) + return true; + + boolean this_present_noteId = true && this.isSetNoteId(); + boolean that_present_noteId = true && that.isSetNoteId(); + if (this_present_noteId || that_present_noteId) { + if (!(this_present_noteId && that_present_noteId)) + return false; + if (!this.noteId.equals(that.noteId)) + return false; + } + + boolean this_present_paragraphId = true && this.isSetParagraphId(); + boolean that_present_paragraphId = true && that.isSetParagraphId(); + if (this_present_paragraphId || that_present_paragraphId) { + if (!(this_present_paragraphId && that_present_paragraphId)) + return false; + if (!this.paragraphId.equals(that.paragraphId)) + return false; + } + + return true; + } + + @Override + public int hashCode() { + int hashCode = 1; + + hashCode = hashCode * 8191 + ((isSetNoteId()) ? 131071 : 524287); + if (isSetNoteId()) + hashCode = hashCode * 8191 + noteId.hashCode(); + + hashCode = hashCode * 8191 + ((isSetParagraphId()) ? 131071 : 524287); + if (isSetParagraphId()) + hashCode = hashCode * 8191 + paragraphId.hashCode(); + + return hashCode; + } + + @Override + public int compareTo(checkpointOutput_args other) { + if (!getClass().equals(other.getClass())) { + return getClass().getName().compareTo(other.getClass().getName()); + } + + int lastComparison = 0; + + lastComparison = java.lang.Boolean.valueOf(isSetNoteId()).compareTo(other.isSetNoteId()); + if (lastComparison != 0) { + return lastComparison; + } + if (isSetNoteId()) { + lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.noteId, other.noteId); + if (lastComparison != 0) { + return lastComparison; + } + } + lastComparison = java.lang.Boolean.valueOf(isSetParagraphId()).compareTo(other.isSetParagraphId()); + if (lastComparison != 0) { + return lastComparison; + } + if (isSetParagraphId()) { + lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.paragraphId, other.paragraphId); + if (lastComparison != 0) { + return lastComparison; + } + } + return 0; + } + + @org.apache.thrift.annotation.Nullable + public _Fields fieldForId(int fieldId) { + return _Fields.findByThriftId(fieldId); + } + + public void read(org.apache.thrift.protocol.TProtocol iprot) throws org.apache.thrift.TException { + scheme(iprot).read(iprot, this); + } + + public void write(org.apache.thrift.protocol.TProtocol oprot) throws org.apache.thrift.TException { + scheme(oprot).write(oprot, this); + } + + @Override + public java.lang.String toString() { + java.lang.StringBuilder sb = new java.lang.StringBuilder("checkpointOutput_args("); + boolean first = true; + + sb.append("noteId:"); + if (this.noteId == null) { + sb.append("null"); + } else { + sb.append(this.noteId); + } + first = false; + if (!first) sb.append(", "); + sb.append("paragraphId:"); + if (this.paragraphId == null) { + sb.append("null"); + } else { + sb.append(this.paragraphId); + } + first = false; + sb.append(")"); + return sb.toString(); + } + + public void validate() throws org.apache.thrift.TException { + // check for required fields + // check for sub-struct validity + } + + private void writeObject(java.io.ObjectOutputStream out) throws java.io.IOException { + try { + write(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(out))); + } catch (org.apache.thrift.TException te) { + throw new java.io.IOException(te); + } + } + + private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, java.lang.ClassNotFoundException { + try { + read(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(in))); + } catch (org.apache.thrift.TException te) { + throw new java.io.IOException(te); + } + } + + private static class checkpointOutput_argsStandardSchemeFactory implements org.apache.thrift.scheme.SchemeFactory { + public checkpointOutput_argsStandardScheme getScheme() { + return new checkpointOutput_argsStandardScheme(); + } + } + + private static class checkpointOutput_argsStandardScheme extends org.apache.thrift.scheme.StandardScheme { + + public void read(org.apache.thrift.protocol.TProtocol iprot, checkpointOutput_args struct) throws org.apache.thrift.TException { + org.apache.thrift.protocol.TField schemeField; + iprot.readStructBegin(); + while (true) + { + schemeField = iprot.readFieldBegin(); + if (schemeField.type == org.apache.thrift.protocol.TType.STOP) { + break; + } + switch (schemeField.id) { + case 1: // NOTE_ID + if (schemeField.type == org.apache.thrift.protocol.TType.STRING) { + struct.noteId = iprot.readString(); + struct.setNoteIdIsSet(true); + } else { + org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); + } + break; + case 2: // PARAGRAPH_ID + if (schemeField.type == org.apache.thrift.protocol.TType.STRING) { + struct.paragraphId = iprot.readString(); + struct.setParagraphIdIsSet(true); + } else { + org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); + } + break; + default: + org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); + } + iprot.readFieldEnd(); + } + iprot.readStructEnd(); + + // check for required fields of primitive type, which can't be checked in the validate method + struct.validate(); + } + + public void write(org.apache.thrift.protocol.TProtocol oprot, checkpointOutput_args struct) throws org.apache.thrift.TException { + struct.validate(); + + oprot.writeStructBegin(STRUCT_DESC); + if (struct.noteId != null) { + oprot.writeFieldBegin(NOTE_ID_FIELD_DESC); + oprot.writeString(struct.noteId); + oprot.writeFieldEnd(); + } + if (struct.paragraphId != null) { + oprot.writeFieldBegin(PARAGRAPH_ID_FIELD_DESC); + oprot.writeString(struct.paragraphId); + oprot.writeFieldEnd(); + } + oprot.writeFieldStop(); + oprot.writeStructEnd(); + } + + } + + private static class checkpointOutput_argsTupleSchemeFactory implements org.apache.thrift.scheme.SchemeFactory { + public checkpointOutput_argsTupleScheme getScheme() { + return new checkpointOutput_argsTupleScheme(); + } + } + + private static class checkpointOutput_argsTupleScheme extends org.apache.thrift.scheme.TupleScheme { + + @Override + public void write(org.apache.thrift.protocol.TProtocol prot, checkpointOutput_args struct) throws org.apache.thrift.TException { + org.apache.thrift.protocol.TTupleProtocol oprot = (org.apache.thrift.protocol.TTupleProtocol) prot; + java.util.BitSet optionals = new java.util.BitSet(); + if (struct.isSetNoteId()) { + optionals.set(0); + } + if (struct.isSetParagraphId()) { + optionals.set(1); + } + oprot.writeBitSet(optionals, 2); + if (struct.isSetNoteId()) { + oprot.writeString(struct.noteId); + } + if (struct.isSetParagraphId()) { + oprot.writeString(struct.paragraphId); + } + } + + @Override + public void read(org.apache.thrift.protocol.TProtocol prot, checkpointOutput_args struct) throws org.apache.thrift.TException { + org.apache.thrift.protocol.TTupleProtocol iprot = (org.apache.thrift.protocol.TTupleProtocol) prot; + java.util.BitSet incoming = iprot.readBitSet(2); + if (incoming.get(0)) { + struct.noteId = iprot.readString(); + struct.setNoteIdIsSet(true); + } + if (incoming.get(1)) { + struct.paragraphId = iprot.readString(); + struct.setParagraphIdIsSet(true); + } + } + } + + private static S scheme(org.apache.thrift.protocol.TProtocol proto) { + return (org.apache.thrift.scheme.StandardScheme.class.equals(proto.getScheme()) ? STANDARD_SCHEME_FACTORY : TUPLE_SCHEME_FACTORY).getScheme(); + } + } + + public static class checkpointOutput_result implements org.apache.thrift.TBase, java.io.Serializable, Cloneable, Comparable { + private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("checkpointOutput_result"); + + + private static final org.apache.thrift.scheme.SchemeFactory STANDARD_SCHEME_FACTORY = new checkpointOutput_resultStandardSchemeFactory(); + private static final org.apache.thrift.scheme.SchemeFactory TUPLE_SCHEME_FACTORY = new checkpointOutput_resultTupleSchemeFactory(); + + + /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */ + public enum _Fields implements org.apache.thrift.TFieldIdEnum { +; + + private static final java.util.Map byName = new java.util.HashMap(); + + static { + for (_Fields field : java.util.EnumSet.allOf(_Fields.class)) { + byName.put(field.getFieldName(), field); + } + } + + /** + * Find the _Fields constant that matches fieldId, or null if its not found. + */ + @org.apache.thrift.annotation.Nullable + public static _Fields findByThriftId(int fieldId) { + switch(fieldId) { + default: + return null; + } + } + + /** + * Find the _Fields constant that matches fieldId, throwing an exception + * if it is not found. + */ + public static _Fields findByThriftIdOrThrow(int fieldId) { + _Fields fields = findByThriftId(fieldId); + if (fields == null) throw new java.lang.IllegalArgumentException("Field " + fieldId + " doesn't exist!"); + return fields; + } + + /** + * Find the _Fields constant that matches name, or null if its not found. + */ + @org.apache.thrift.annotation.Nullable + public static _Fields findByName(java.lang.String name) { + return byName.get(name); + } + + private final short _thriftId; + private final java.lang.String _fieldName; + + _Fields(short thriftId, java.lang.String fieldName) { + _thriftId = thriftId; + _fieldName = fieldName; + } + + public short getThriftFieldId() { + return _thriftId; + } + + public java.lang.String getFieldName() { + return _fieldName; + } + } + public static final java.util.Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap; + static { + java.util.Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new java.util.EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class); + metaDataMap = java.util.Collections.unmodifiableMap(tmpMap); + org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(checkpointOutput_result.class, metaDataMap); + } + + public checkpointOutput_result() { + } + + /** + * Performs a deep copy on other. + */ + public checkpointOutput_result(checkpointOutput_result other) { + } + + public checkpointOutput_result deepCopy() { + return new checkpointOutput_result(this); + } + + @Override + public void clear() { + } + + public void setFieldValue(_Fields field, @org.apache.thrift.annotation.Nullable java.lang.Object value) { + switch (field) { + } + } + + @org.apache.thrift.annotation.Nullable + public java.lang.Object getFieldValue(_Fields field) { + switch (field) { + } + throw new java.lang.IllegalStateException(); + } + + /** Returns true if field corresponding to fieldID is set (has been assigned a value) and false otherwise */ + public boolean isSet(_Fields field) { + if (field == null) { + throw new java.lang.IllegalArgumentException(); + } + + switch (field) { + } + throw new java.lang.IllegalStateException(); + } + + @Override + public boolean equals(java.lang.Object that) { + if (that == null) + return false; + if (that instanceof checkpointOutput_result) + return this.equals((checkpointOutput_result)that); + return false; + } + + public boolean equals(checkpointOutput_result that) { + if (that == null) + return false; + if (this == that) + return true; + + return true; + } + + @Override + public int hashCode() { + int hashCode = 1; + + return hashCode; + } + + @Override + public int compareTo(checkpointOutput_result other) { + if (!getClass().equals(other.getClass())) { + return getClass().getName().compareTo(other.getClass().getName()); + } + + int lastComparison = 0; + + return 0; + } + + @org.apache.thrift.annotation.Nullable + public _Fields fieldForId(int fieldId) { + return _Fields.findByThriftId(fieldId); + } + + public void read(org.apache.thrift.protocol.TProtocol iprot) throws org.apache.thrift.TException { + scheme(iprot).read(iprot, this); + } + + public void write(org.apache.thrift.protocol.TProtocol oprot) throws org.apache.thrift.TException { + scheme(oprot).write(oprot, this); + } + + @Override + public java.lang.String toString() { + java.lang.StringBuilder sb = new java.lang.StringBuilder("checkpointOutput_result("); + boolean first = true; + + sb.append(")"); + return sb.toString(); + } + + public void validate() throws org.apache.thrift.TException { + // check for required fields + // check for sub-struct validity + } + + private void writeObject(java.io.ObjectOutputStream out) throws java.io.IOException { + try { + write(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(out))); + } catch (org.apache.thrift.TException te) { + throw new java.io.IOException(te); + } + } + + private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, java.lang.ClassNotFoundException { + try { + read(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(in))); + } catch (org.apache.thrift.TException te) { + throw new java.io.IOException(te); + } + } + + private static class checkpointOutput_resultStandardSchemeFactory implements org.apache.thrift.scheme.SchemeFactory { + public checkpointOutput_resultStandardScheme getScheme() { + return new checkpointOutput_resultStandardScheme(); + } + } + + private static class checkpointOutput_resultStandardScheme extends org.apache.thrift.scheme.StandardScheme { + + public void read(org.apache.thrift.protocol.TProtocol iprot, checkpointOutput_result struct) throws org.apache.thrift.TException { + org.apache.thrift.protocol.TField schemeField; + iprot.readStructBegin(); + while (true) + { + schemeField = iprot.readFieldBegin(); + if (schemeField.type == org.apache.thrift.protocol.TType.STOP) { + break; + } + switch (schemeField.id) { + default: + org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); + } + iprot.readFieldEnd(); + } + iprot.readStructEnd(); + + // check for required fields of primitive type, which can't be checked in the validate method + struct.validate(); + } + + public void write(org.apache.thrift.protocol.TProtocol oprot, checkpointOutput_result struct) throws org.apache.thrift.TException { + struct.validate(); + + oprot.writeStructBegin(STRUCT_DESC); + oprot.writeFieldStop(); + oprot.writeStructEnd(); + } + + } + + private static class checkpointOutput_resultTupleSchemeFactory implements org.apache.thrift.scheme.SchemeFactory { + public checkpointOutput_resultTupleScheme getScheme() { + return new checkpointOutput_resultTupleScheme(); + } + } + + private static class checkpointOutput_resultTupleScheme extends org.apache.thrift.scheme.TupleScheme { + + @Override + public void write(org.apache.thrift.protocol.TProtocol prot, checkpointOutput_result struct) throws org.apache.thrift.TException { + org.apache.thrift.protocol.TTupleProtocol oprot = (org.apache.thrift.protocol.TTupleProtocol) prot; + } + + @Override + public void read(org.apache.thrift.protocol.TProtocol prot, checkpointOutput_result struct) throws org.apache.thrift.TException { + org.apache.thrift.protocol.TTupleProtocol iprot = (org.apache.thrift.protocol.TTupleProtocol) prot; + } + } + + private static S scheme(org.apache.thrift.protocol.TProtocol proto) { + return (org.apache.thrift.scheme.StandardScheme.class.equals(proto.getScheme()) ? STANDARD_SCHEME_FACTORY : TUPLE_SCHEME_FACTORY).getScheme(); + } + } + public static class runParagraphs_args implements org.apache.thrift.TBase, java.io.Serializable, Cloneable, Comparable { private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("runParagraphs_args"); diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterEventType.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterEventType.java index e397607565e..06be47a1ed1 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterEventType.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterEventType.java @@ -24,7 +24,7 @@ package org.apache.zeppelin.interpreter.thrift; -@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.13.0)", date = "2020-01-06") +@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.13.0)", date = "2020-01-07") public enum RemoteInterpreterEventType implements org.apache.thrift.TEnum { NO_OP(1), ANGULAR_OBJECT_ADD(2), diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterResult.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterResult.java index 4f578eec968..2817903c9a8 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterResult.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterResult.java @@ -24,7 +24,7 @@ package org.apache.zeppelin.interpreter.thrift; @SuppressWarnings({"cast", "rawtypes", "serial", "unchecked", "unused"}) -@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.13.0)", date = "2020-01-06") +@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.13.0)", date = "2020-01-07") public class RemoteInterpreterResult implements org.apache.thrift.TBase, java.io.Serializable, Cloneable, Comparable { private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("RemoteInterpreterResult"); diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterResultMessage.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterResultMessage.java index 8cbc0546f54..e0ec93678d3 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterResultMessage.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterResultMessage.java @@ -24,7 +24,7 @@ package org.apache.zeppelin.interpreter.thrift; @SuppressWarnings({"cast", "rawtypes", "serial", "unchecked", "unused"}) -@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.13.0)", date = "2020-01-06") +@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.13.0)", date = "2020-01-07") public class RemoteInterpreterResultMessage implements org.apache.thrift.TBase, java.io.Serializable, Cloneable, Comparable { private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("RemoteInterpreterResultMessage"); diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterService.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterService.java index f7d3cb3e831..68dc727703d 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterService.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterService.java @@ -24,7 +24,7 @@ package org.apache.zeppelin.interpreter.thrift; @SuppressWarnings({"cast", "rawtypes", "serial", "unchecked", "unused"}) -@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.13.0)", date = "2020-01-06") +@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.13.0)", date = "2020-01-07") public class RemoteInterpreterService { public interface Iface { diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RunParagraphsEvent.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RunParagraphsEvent.java index 8eba9c9af06..72cae21620b 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RunParagraphsEvent.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RunParagraphsEvent.java @@ -24,7 +24,7 @@ package org.apache.zeppelin.interpreter.thrift; @SuppressWarnings({"cast", "rawtypes", "serial", "unchecked", "unused"}) -@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.13.0)", date = "2020-01-06") +@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.13.0)", date = "2020-01-07") public class RunParagraphsEvent implements org.apache.thrift.TBase, java.io.Serializable, Cloneable, Comparable { private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("RunParagraphsEvent"); diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/ServiceException.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/ServiceException.java index 039043d02d1..48861827f27 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/ServiceException.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/ServiceException.java @@ -24,7 +24,7 @@ package org.apache.zeppelin.interpreter.thrift; @SuppressWarnings({"cast", "rawtypes", "serial", "unchecked", "unused"}) -@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.13.0)", date = "2020-01-06") +@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.13.0)", date = "2020-01-07") public class ServiceException extends org.apache.thrift.TException implements org.apache.thrift.TBase, java.io.Serializable, Cloneable, Comparable { private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("ServiceException"); diff --git a/zeppelin-interpreter/src/main/thrift/RemoteInterpreterEventService.thrift b/zeppelin-interpreter/src/main/thrift/RemoteInterpreterEventService.thrift index 6470a67d09d..339f3e5ba41 100644 --- a/zeppelin-interpreter/src/main/thrift/RemoteInterpreterEventService.thrift +++ b/zeppelin-interpreter/src/main/thrift/RemoteInterpreterEventService.thrift @@ -107,6 +107,8 @@ service RemoteInterpreterEventService { void updateAppOutput(1: AppOutputUpdateEvent event); void updateAppStatus(1: AppStatusUpdateEvent event); + void checkpointOutput(1: string noteId, 2: string paragraphId); + void runParagraphs(1: RunParagraphsEvent event); void addAngularObject(1: string intpGroupId, 2: string json); diff --git a/zeppelin-jupyter-interpreter/src/main/java/org/apache/zeppelin/jupyter/JupyterKernelClient.java b/zeppelin-jupyter-interpreter/src/main/java/org/apache/zeppelin/jupyter/JupyterKernelClient.java index 54112d47c43..deb4afa70e3 100644 --- a/zeppelin-jupyter-interpreter/src/main/java/org/apache/zeppelin/jupyter/JupyterKernelClient.java +++ b/zeppelin-jupyter-interpreter/src/main/java/org/apache/zeppelin/jupyter/JupyterKernelClient.java @@ -119,6 +119,8 @@ private boolean checkForShinyApp(String response) throws IOException { height + "\" width=\"" + width + "\" frameBorder=\"0\">"); context.out.flush(); context.out.write("\n%text "); + context.getIntpEventClient().checkpointOutput(context.getNoteId(), + context.getParagraphId()); return true; } } diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java index 831c76e7eb3..291d2c240bb 100644 --- a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java +++ b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java @@ -1606,6 +1606,8 @@ public void onOutputUpdated(String noteId, String paragraphId, int index, LOG.warn("Note " + noteId + " note found"); return; } + Paragraph paragraph = note.getParagraph(paragraphId); + paragraph.updateOutputBuffer(index, type, output); if (note.isPersonalizedMode()) { String user = note.getParagraph(paragraphId).getUser(); if (null != user) { @@ -1886,6 +1888,16 @@ public void onOutputUpdateAll(Paragraph paragraph, List metaInfos); List getParagraphList(String user, String noteId) throws TException, IOException; + + void checkpointOutput(String noteId, String paragraphId); } diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java index a9388a65151..dbecbf21a1c 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java @@ -641,6 +641,7 @@ public Paragraph removeParagraph(String user, String paragraphId) { public void clearParagraphOutputFields(Paragraph p) { p.setReturn(null, null); p.cleanRuntimeInfos(); + p.cleanOutputBuffer(); } public Paragraph clearPersonalizedParagraphOutput(String paragraphId, String user) { @@ -1094,6 +1095,7 @@ public static Note fromJson(String json) throws IOException { public void postProcessParagraphs() { for (Paragraph p : paragraphs) { p.cleanRuntimeInfos(); + p.cleanOutputBuffer(); p.parseText(); if (p.getStatus() == Status.PENDING || p.getStatus() == Status.RUNNING) { diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java index 5405d9e9c67..e97ea3c4e23 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java @@ -107,6 +107,7 @@ public class Paragraph extends JobWithProgressPoller implemen private transient Map localProperties = new HashMap<>(); // serialize runtimeInfos to frontend but not to note file (via gson's ExclusionStrategy) private Map runtimeInfos = new HashMap<>(); + private transient List outputBuffer = new ArrayList<>(); public static String PARAGRAPH_CONFIG_RUNONSELECTIONCHANGE = "runOnSelectionChange"; private static boolean PARAGRAPH_CONFIG_RUNONSELECTIONCHANGE_DEFAULT = true; @@ -613,6 +614,7 @@ public void onUpdateAll(InterpreterOutput out) { paragraphJobListener.onOutputUpdateAll(self, messages); } updateParagraphResult(messages); + outputBuffer.clear(); } catch (IOException e) { LOGGER.error(e.getMessage(), e); } @@ -816,6 +818,22 @@ public void cleanRuntimeInfos() { this.runtimeInfos.clear(); } + public void cleanOutputBuffer() { + this.outputBuffer.clear(); + } + + /** + * Save the buffered output to InterpreterResults. So that open another tab or refresh + * note you can see the latest checkpoint's output. + */ + public void checkpointOutput() { + LOGGER.info("Checkpoint Paragraph output for paragraph: " + getId()); + this.results = new InterpreterResult(Code.SUCCESS); + for (InterpreterResultMessage buffer : outputBuffer) { + results.add(buffer); + } + } + private GUI getNoteGui() { GUI gui = new GUI(); gui.setParams(this.note.getNoteParams()); @@ -884,4 +902,15 @@ public static Paragraph fromJson(String json) { return Note.getGson().fromJson(json, Paragraph.class); } + public void updateOutputBuffer(int index, InterpreterResult.Type type, String output) { + InterpreterResultMessage interpreterResultMessage = new InterpreterResultMessage(type, output);; + if (outputBuffer.size() == index) { + outputBuffer.add(interpreterResultMessage); + } else if (outputBuffer.size() > index) { + outputBuffer.set(index, interpreterResultMessage); + } else { + LOGGER.warn("Get output of index: " + index + ", but there's only " + + outputBuffer.size() + " output in outputBuffer"); + } + } } diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterOutputTestStream.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterOutputTestStream.java index 5a3e2258507..d232c259be8 100644 --- a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterOutputTestStream.java +++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterOutputTestStream.java @@ -148,4 +148,8 @@ public List getParagraphList(String user, String noteId) { return null; } + @Override + public void checkpointOutput(String noteId, String paragraphId) { + + } } diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/scheduler/RemoteSchedulerTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/scheduler/RemoteSchedulerTest.java index acba1c85823..c768e543131 100644 --- a/zeppelin-zengine/src/test/java/org/apache/zeppelin/scheduler/RemoteSchedulerTest.java +++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/scheduler/RemoteSchedulerTest.java @@ -294,4 +294,8 @@ public List getParagraphList(String user, String noteId) { return null; } + @Override + public void checkpointOutput(String noteId, String paragraphId) { + + } }