Skip to content

Commit

Permalink
Fix issue of union result parser in the new compiler of GIE (#175)
Browse files Browse the repository at this point in the history
  • Loading branch information
shirly121 authored Mar 3, 2021
1 parent be74042 commit d709119
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -156,10 +156,11 @@ public static void writeResultList(final Context context, List<Object> resultLis
final boolean useBinary = ctx.channel().attr(StateKey.USE_BINARY).get();

if (statusCode == ResponseStatusCode.SERVER_ERROR) {
ctx.writeAndFlush(ResponseMessage.build(msg).code(ResponseStatusCode.SERVER_ERROR).create());
}
if (resultList.isEmpty()) {
ctx.writeAndFlush(ResponseMessage.build(msg).code(ResponseStatusCode.NO_CONTENT).create());
ResponseMessage.Builder builder = ResponseMessage.build(msg).code(ResponseStatusCode.SERVER_ERROR);
if (resultList.size() > 0) {
builder.statusMessage((String) resultList.get(0));
}
ctx.writeAndFlush(builder.create());
return;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,15 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class GremlinResultProcessor implements ResultProcessor {
private static Logger logger = LoggerFactory.getLogger(GremlinResultProcessor.class);
private Context writeResult;
private boolean hasResult = false;
private List<Object> resultCollectors = new ArrayList<>();
private boolean locked = false;

public GremlinResultProcessor(Context writeResult) {
this.writeResult = writeResult;
Expand All @@ -40,41 +43,38 @@ public GremlinResultProcessor(Context writeResult) {
public void process(PegasusClient.JobResponse response) {
synchronized (this) {
try {
logger.info("start to process response {}", GremlinResult.Result.parseFrom(response.getData()));
if (!hasResult) {
if (response.getResultCase() == PegasusClient.JobResponse.ResultCase.ERR) {
MaxGraphOpProcessor.writeResultList(writeResult, Collections.EMPTY_LIST, ResponseStatusCode.SERVER_ERROR);
} else {
MaxGraphOpProcessor.writeResultList(writeResult, ResultParser.parseFrom(response), ResponseStatusCode.SUCCESS);
if (!locked) {
logger.info("start to process response {}", GremlinResult.Result.parseFrom(response.getData()));
if (response.getResultCase() == PegasusClient.JobResponse.ResultCase.DATA) {
resultCollectors.addAll(ResultParser.parseFrom(response));
}
}
} catch (Exception e) {
logger.error("exception is {}", e);
MaxGraphOpProcessor.writeResultList(writeResult, Collections.EMPTY_LIST, ResponseStatusCode.SERVER_ERROR);
} finally {
hasResult = true;
MaxGraphOpProcessor.writeResultList(writeResult, Collections.singletonList(e.getMessage()), ResponseStatusCode.SERVER_ERROR);
// cannot write to this context any more
locked = true;
}
}
}

@Override
public void finish() {
synchronized (this) {
if (!hasResult) {
if (!locked) {
logger.info("start to process finish");
MaxGraphOpProcessor.writeResultList(writeResult, Collections.EMPTY_LIST, ResponseStatusCode.SUCCESS);
hasResult = true;
MaxGraphOpProcessor.writeResultList(writeResult, resultCollectors, ResponseStatusCode.SUCCESS);
locked = true;
}
}
}

@Override
public void error(Status status) {
synchronized (this) {
if (!hasResult) {
if (!locked) {
logger.info("start to process error");
MaxGraphOpProcessor.writeResultList(writeResult, Collections.EMPTY_LIST, ResponseStatusCode.SERVER_ERROR);
hasResult = true;
MaxGraphOpProcessor.writeResultList(writeResult, Collections.singletonList(status.toString()), ResponseStatusCode.SERVER_ERROR);
locked = true;
}
}
}
Expand Down

0 comments on commit d709119

Please sign in to comment.