Skip to content

Commit

Permalink
Fix gRPC DevUI testing console
Browse files Browse the repository at this point in the history
  • Loading branch information
xstefank committed Sep 17, 2024
1 parent 2b814d3 commit a6768b6
Show file tree
Hide file tree
Showing 2 changed files with 105 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -233,20 +233,22 @@ export class QwcGrpcServices extends observeState(QwcHotReloadElement) {

_renderCommandButtons(service, method){
if(this._streamsMap.size >=0){
if(method.type == 'UNARY'){
return html`<vaadin-button theme="secondary error" @mousedown=${() => this._clear(service.name, method)} @mouseup=${() => this._default(service.name, method)}>Reset</vaadin-button>
if(method.type == 'UNARY' || method.type == 'SERVER_STREAMING'){
return html`<vaadin-button theme="secondary error" @click=${() => this._default(service.name, method)}>Reset</vaadin-button>
<vaadin-button theme="secondary success" @click=${() => this._test(service, method)}>Send</vaadin-button>`;
}else if(this._isRunning(service.name, method)){
return html`<div class="streamButtons"><vaadin-button theme="secondary error" @click=${() => this._test(service, method)}>Cancel stream</vaadin-button>
<vaadin-progress-bar class="progress-short" indeterminate></vaadin-progress-bar></div>`;
return html`<vaadin-button theme="secondary error" @click=${() => this._default(service.name, method)}>Reset</vaadin-button>
<vaadin-button theme="secondary success" @click=${() => this._test(service, method)}>Send</vaadin-button>
<vaadin-button theme="secondary error" @click=${() => this._disconnect(service, method)}>Disconnect</vaadin-button>
<vaadin-progress-bar class="progress-short" indeterminate></vaadin-progress-bar>`;
}else {
return html`<vaadin-button theme="secondary success" @click=${() => this._test(service, method)}>Start stream</vaadin-button>`;
return html`<vaadin-button theme="secondary success" @click=${() => this._test(service, method)}>Send</vaadin-button>`;
}
}
}

_keypress(e, service, method){
if(method.type == 'UNARY' || !this._isRunning(service.name, method)){
if(method.type == 'UNARY' || method.type == 'SERVER_STREAMING' || !this._isRunning(service.name, method)){
if ((e.keyCode == 10 || e.keyCode == 13) && e.ctrlKey){ // ctlr-enter
this._test(service, method);
}
Expand All @@ -268,46 +270,75 @@ export class QwcGrpcServices extends observeState(QwcHotReloadElement) {
}

_default(serviceName, method){
let requestTextArea = this._requestTextArea(serviceName, method);
requestTextArea.content = '';
let pv = JSON.parse(method.prototype);
this._requestTextArea(serviceName, method).populatePrettyJson(JSON.stringify(pv));
let prettyJson = JSON.stringify(pv, null, 2);
requestTextArea.populatePrettyJson(prettyJson);
}

_test(service, method){
let textArea = this._requestTextArea(service.name, method);
let content = textArea.getAttribute('value');
let requestTextArea = this._requestTextArea(service.name, method);
let content = requestTextArea.getAttribute('value');
let id = this._id(service.name, method);
let responseTextArea = this._responseTextArea(service.name, method);
if(method.type == 'UNARY'){
this.jsonRpc.testService({
id: id,
serviceName: service.name,
methodName: method.bareMethodName,
methodType: method.type,
content: content
}).then(jsonRpcResponse => {
const jsonObject = JSON.parse(jsonRpcResponse.result);
const prettyJson = JSON.stringify(jsonObject, null, 2);
this._responseTextArea(service.name, method).populatePrettyJson(prettyJson);
this._responseTextArea(service.name, method).populatePrettyJson(this._prettyJson(jsonRpcResponse.result));
});
}else{
let id = this._id(service.name, method);
if(this._isRunning(service.name, method)){
this._streamsMap.get(id).cancel();
this._streamsMap.delete(id);
this._clear(service.name, method);
this._default(service.name, method);
this.jsonRpc.streamService({
id: id,
serviceName: service.name,
methodName: method.bareMethodName,
isRunning: true,
content: content
});
// this._streamsMap.get(id).cancel();
// this._streamsMap.delete(id);
// this._clear(service.name, method);
// this._default(service.name, method);
}else{
// starting a new stream, clear the response area
responseTextArea.content = null;
let cancelable = this.jsonRpc.streamService({
id: id,
serviceName: service.name,
methodName: method.bareMethodName,
methodType: method.type,
isRunning: false,
content: content
}).onNext(jsonRpcResponse => {
this._responseTextArea(service.name, method).populatePrettyJson(jsonRpcResponse.result);
if (responseTextArea.content == null) {
responseTextArea.populatePrettyJson(this._prettyJson(jsonRpcResponse.result));
} else {
responseTextArea.populatePrettyJson(responseTextArea.content + '\n' + this._prettyJson(jsonRpcResponse.result));
}
});
this._streamsMap.set(id, cancelable);
if (method.type == 'BIDI_STREAMING' || method.type == 'CLIENT_STREAMING') {
this._streamsMap.set(id, cancelable);
}
}
this._testerButtons = this._renderCommandButtons(service, method);
this._forceUpdate();
}
}

_disconnect(service, method){
let id = this._id(service.name, method);
this.jsonRpc.disconnectService({
id: id,
});
this._streamsMap.delete(id);
this._testerButtons = this._renderCommandButtons(service, method);
this._forceUpdate();
}

_forceUpdate(){
if(this._detailsOpenedItem.length > 0){
Expand All @@ -332,5 +363,9 @@ export class QwcGrpcServices extends observeState(QwcHotReloadElement) {
_responseId(serviceName, method){
return serviceName + '/' + method.bareMethodName + '_response';
}

_prettyJson(content){
return JSON.stringify(JSON.parse(content), null, 2);
}
}
customElements.define('qwc-grpc-services', QwcGrpcServices);
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import com.google.protobuf.util.JsonFormat;

import io.grpc.Channel;
import io.grpc.ManagedChannel;
import io.grpc.MethodDescriptor;
import io.grpc.ServiceDescriptor;
import io.grpc.netty.NettyChannelBuilder;
Expand Down Expand Up @@ -45,6 +46,7 @@ public class GrpcJsonRPCService {
private static final Logger LOG = Logger.getLogger(GrpcJsonRPCService.class);

private Map<String, GrpcServiceClassInfo> grpcServiceClassInfos;
private Map<String, StreamObserver<Message>> callsInProgress;

@Inject
HttpConfiguration httpConfiguration;
Expand Down Expand Up @@ -72,6 +74,7 @@ public void init() {
this.ssl = isTLSConfigured(httpConfiguration.ssl.certificate);
}
this.grpcServiceClassInfos = getGrpcServiceClassInfos();
this.callsInProgress = new HashMap<>();
}

private boolean isTLSConfigured(CertificateConfig certificate) {
Expand Down Expand Up @@ -107,25 +110,27 @@ public JsonArray getServices() {
return services;
}

public Uni<String> testService(String serviceName, String methodName, String methodType, String content) {
public Uni<String> testService(String id, String serviceName, String methodName, String content) {
try {
return streamService(serviceName, methodName, methodType, content).toUni();
return streamService(id, serviceName, methodName, false, content).toUni();
} catch (Throwable t) {
return Uni.createFrom().item(error(t.getMessage()).encodePrettily());
}
}

public Multi<String> streamService(String serviceName, String methodName, String methodType, String content)
public Multi<String> streamService(String id, String serviceName, String methodName, boolean isRunning,
String content)
throws NoSuchMethodException, IllegalAccessException, InvocationTargetException, InvalidProtocolBufferException {
if (content == null) {
return Multi.createFrom().item(error("Invalid messsge").encodePrettily());
return Multi.createFrom().item(error("Invalid message").encodePrettily());
}

BroadcastProcessor<String> streamEvent = BroadcastProcessor.create();

GrpcServiceClassInfo info = this.grpcServiceClassInfos.get(serviceName);

Object grpcStub = createStub(info.grpcServiceClass, host, port);
ManagedChannel channel = getChannel(host, port);
Object grpcStub = createStub(info.grpcServiceClass, channel);

ServiceDescriptor serviceDescriptor = info.serviceDescriptor;

Expand All @@ -134,20 +139,50 @@ public Multi<String> streamService(String serviceName, String methodName, String
MethodDescriptor.PrototypeMarshaller<?> protoMarshaller = (MethodDescriptor.PrototypeMarshaller<?>) requestMarshaller;
Class<?> requestType = protoMarshaller.getMessagePrototype().getClass();

Message message = createMessage(content, requestType);

if (isRunning) {
// we are already connected with this gRPC endpoint, just send the message
callsInProgress.get(id).onNext(message);
} else {
// Invoke the stub method and format the response as JSON
StreamObserver<?> responseObserver = new TestObserver<>(streamEvent);
StreamObserver<Message> incomingStream;

final Method stubMethod = getStubMethod(grpcStub, methodDescriptor.getBareMethodName());

if (stubMethod.getParameterCount() == 1 && stubMethod.getReturnType() == StreamObserver.class) {
// returned StreamObserver consumes incoming messages
//noinspection unchecked
incomingStream = (StreamObserver<Message>) stubMethod.invoke(grpcStub, responseObserver);
callsInProgress.put(id, incomingStream);
// will be streamed continuously
incomingStream.onNext(message);
} else {
// incoming message should be passed as the first parameter of the invocation
stubMethod.invoke(grpcStub, message, responseObserver);
}
}

channel.shutdown();
return streamEvent;
}

private static Message createMessage(String content, Class<?> requestType)
throws NoSuchMethodException, IllegalAccessException, InvocationTargetException, InvalidProtocolBufferException {
// Create a new builder for the request message, e.g. HelloRequest.newBuilder()
Method newBuilderMethod = requestType.getDeclaredMethod("newBuilder");
Message.Builder builder = (Message.Builder) newBuilderMethod.invoke(null);

// Use the test data to build the request object
JsonFormat.parser().merge(content, builder);
Message message = builder.build();

StreamObserver<?> responseObserver = new TestObserver<Object>(streamEvent);

final Method stubMethod = getStubMethod(grpcStub, methodDescriptor.getBareMethodName());
stubMethod.invoke(grpcStub, message, responseObserver);
return builder.build();
}

return streamEvent;
public Uni<Void> disconnectService(String id) {
callsInProgress.get(id).onCompleted();
callsInProgress.remove(id);
return Uni.createFrom().voidItem();
}

private Map<String, GrpcJsonRPCService.GrpcServiceClassInfo> getGrpcServiceClassInfos() {
Expand Down Expand Up @@ -220,17 +255,17 @@ private MethodDescriptor getMethodDescriptor(ServiceDescriptor serviceDescriptor
return null;
}

private Object createStub(Class<?> grpcServiceClass, String host, int port) {
private Object createStub(Class<?> grpcServiceClass, Channel channel) {
try {
Method stubFactoryMethod = grpcServiceClass.getDeclaredMethod("newStub", Channel.class);
return stubFactoryMethod.invoke(null, getChannel(host, port));
return stubFactoryMethod.invoke(null, channel);
} catch (NoSuchMethodException | InvocationTargetException | IllegalAccessException e) {
LOG.warnf("Could not create stub for %s - " + e.getMessage(), grpcServiceClass);
return null;
}
}

private Channel getChannel(String host, int port) {
private ManagedChannel getChannel(String host, int port) {
return NettyChannelBuilder
.forAddress(host, port)
.usePlaintext()
Expand Down

0 comments on commit a6768b6

Please sign in to comment.