Skip to content

Commit

Permalink
[Bug Fix] Fix bugs of time counter in gremlin service (#2906)
Browse files Browse the repository at this point in the history
<!--
Thanks for your contribution! please review
https://github.com/alibaba/GraphScope/blob/main/CONTRIBUTING.md before
opening an issue.
-->

## What do these changes do?
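As titled: fix the time counter in the Gremlin service. Per-query timing and logging are moved out of `IrStandardOpProcessor.evalOpInternal` into the new `MetricsCollector`, `QueryLogger`, and `QueryStatusCallback` classes, and the callback is passed to the result processors.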

<!-- Please give a short brief about these changes. -->

## Related issue number


<!-- Are there any issues opened that will be resolved by merging this
change? -->

Fixes #2905

---------

Co-authored-by: Longbin Lai <[email protected]>
  • Loading branch information
shirly121 and longbinlai authored Jul 7, 2023
1 parent 6ade1d5 commit 7330954
Show file tree
Hide file tree
Showing 12 changed files with 253 additions and 95 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import com.alibaba.graphscope.common.store.IrMeta;
import com.alibaba.graphscope.gremlin.integration.result.GraphProperties;
import com.alibaba.graphscope.gremlin.integration.result.GremlinTestResultProcessor;
import com.alibaba.graphscope.gremlin.plugin.QueryStatusCallback;
import com.alibaba.graphscope.gremlin.plugin.processor.IrStandardOpProcessor;
import com.alibaba.graphscope.gremlin.plugin.script.AntlrGremlinScriptEngine;

Expand Down Expand Up @@ -93,17 +94,21 @@ public ThrowingConsumer<Context> select(Context ctx) {
Traversal traversal =
(Traversal) scriptEngine.eval(script, this.context);
applyStrategies(traversal);

long jobId = JOB_ID_COUNTER.incrementAndGet();
IrMeta irMeta = metaQueryCallback.beforeExec();
QueryStatusCallback statusCallback =
createQueryStatusCallback(script, jobId);
processTraversal(
traversal,
new GremlinTestResultProcessor(
ctx, traversal, testGraph, this.configs),
jobId,
script,
ctx,
traversal,
statusCallback,
testGraph,
this.configs),
irMeta,
new QueryTimeoutConfig(ctx.getRequestTimeout()));
new QueryTimeoutConfig(ctx.getRequestTimeout()),
statusCallback.getQueryLogger());
metaQueryCallback.afterExec(irMeta);
});
return op;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package com.alibaba.graphscope.gremlin.integration.result;

import com.alibaba.graphscope.common.config.Configs;
import com.alibaba.graphscope.gremlin.plugin.QueryStatusCallback;
import com.alibaba.graphscope.gremlin.result.processor.GremlinResultProcessor;
import com.google.common.collect.ImmutableMap;

Expand All @@ -43,8 +44,12 @@ public class GremlinTestResultProcessor extends GremlinResultProcessor {
private static String EDGE_PROPERTIES = "edge_properties";

public GremlinTestResultProcessor(
Context writeResult, Traversal traversal, GraphProperties testGraph, Configs configs) {
super(writeResult, traversal);
Context writeResult,
Traversal traversal,
QueryStatusCallback statusCallback,
GraphProperties testGraph,
Configs configs) {
super(writeResult, traversal, statusCallback);
this.cachedProperties = testGraph.getProperties(configs);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Copyright 2020 Alibaba Group Holding Limited.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.alibaba.graphscope.gremlin.plugin;

import com.alibaba.graphscope.gremlin.Utils;
import com.codahale.metrics.Timer;

import java.util.concurrent.TimeUnit;

/**
 * Collects timing metrics for a single gremlin query.
 *
 * <p>The timer context is opened in the constructor and closed by {@link #stop()}.
 * The start timestamp is captured from the wall clock at construction time, so it is
 * available immediately and is expressed in milliseconds since the epoch. It is NOT
 * derived from the timer's internal tick, which (for the default metrics clock, backed
 * by {@code System.nanoTime()}) has an arbitrary origin and cannot be used as a timestamp.
 */
public class MetricsCollector {
    private final Timer.Context timeContext;
    // wall-clock time (epoch millis) at which timing started
    private final long startMillis;
    // elapsed time in millis; stays 0 until stop() has been called
    private long elapsedMillis;

    /**
     * Starts timing on the given timer.
     *
     * @param timer the timer that records the elapsed time when {@link #stop()} is called
     */
    public MetricsCollector(Timer timer) {
        this.startMillis = System.currentTimeMillis();
        this.timeContext = timer.time();
    }

    /**
     * @return the wall-clock time, in milliseconds since the epoch, at which this
     *     collector was created
     */
    public long getStartMillis() {
        return this.startMillis;
    }

    /**
     * @return the elapsed time in milliseconds as of the last {@link #stop()} call, or 0
     *     if {@link #stop()} has not been called yet
     */
    public long getElapsedMillis() {
        return this.elapsedMillis;
    }

    /**
     * Stops the timer context and stores the elapsed time, converted from the
     * nanoseconds reported by the timer context to milliseconds.
     */
    public void stop() {
        this.elapsedMillis = TimeUnit.NANOSECONDS.toMillis(timeContext.stop());
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
* Copyright 2020 Alibaba Group Holding Limited.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.alibaba.graphscope.gremlin.plugin;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Logger bound to a single query: every message is prefixed with the query text and
 * the query id.
 *
 * <p>The prefix is handed to the underlying logger as arguments rather than being
 * concatenated into the format pattern. Query text is arbitrary input and may itself
 * contain <code>{}</code>; if it were part of the pattern, those braces would be treated
 * as placeholders and consume the caller's arguments.
 */
public class QueryLogger {
    private static final Logger defaultLogger = LoggerFactory.getLogger(QueryLogger.class);
    private static final Logger metricLogger = LoggerFactory.getLogger("MetricLog");

    private final String query;
    private final long queryId;

    /**
     * @param query the query text used as message prefix
     * @param queryId the id of the query used as message prefix
     */
    public QueryLogger(String query, long queryId) {
        this.query = query;
        this.queryId = queryId;
    }

    /**
     * Logs at INFO level, prefixed with {@link #toString()}.
     *
     * @param format message pattern using <code>{}</code> placeholders
     * @param args arguments for the placeholders in {@code format}
     */
    public void info(String format, Object... args) {
        defaultLogger.info("{} : " + format, prepend(args, this));
    }

    /**
     * Logs at WARN level, prefixed with {@link #toString()}.
     *
     * @param format message pattern using <code>{}</code> placeholders
     * @param args arguments for the placeholders in {@code format}
     */
    public void warn(String format, Object... args) {
        defaultLogger.warn("{} : " + format, prepend(args, this));
    }

    /**
     * Logs at ERROR level, prefixed with {@link #toString()}.
     *
     * @param format message pattern using <code>{}</code> placeholders
     * @param args arguments for the placeholders in {@code format}
     */
    public void error(String format, Object... args) {
        defaultLogger.error("{} : " + format, prepend(args, this));
    }

    /**
     * Logs to the "MetricLog" logger at INFO level as
     * {@code queryId | query | <formatted message>}.
     *
     * @param format message pattern using <code>{}</code> placeholders
     * @param args arguments for the placeholders in {@code format}
     */
    public void metricsInfo(String format, Object... args) {
        metricLogger.info("{} | {} | " + format, prepend(args, queryId, query));
    }

    @Override
    public String toString() {
        return "[" + "query='" + query + '\'' + ", queryId=" + queryId + ']';
    }

    public String getQuery() {
        return query;
    }

    public long getQueryId() {
        return queryId;
    }

    // Returns a new array holding `head` followed by `args`; a null `args` counts as empty.
    // The relative order of `args` is kept, so a trailing Throwable stays in last position.
    private static Object[] prepend(Object[] args, Object... head) {
        int tailLength = (args == null) ? 0 : args.length;
        Object[] merged = new Object[head.length + tailLength];
        System.arraycopy(head, 0, merged, 0, head.length);
        if (tailLength > 0) {
            System.arraycopy(args, 0, merged, head.length, tailLength);
        }
        return merged;
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Copyright 2020 Alibaba Group Holding Limited.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.alibaba.graphscope.gremlin.plugin;

/**
 * Callback notified about the lifecycle of a single query. On completion it stops the
 * query's metrics collector and writes the timing to the query's logger.
 */
public class QueryStatusCallback {
    private final MetricsCollector metricsCollector;
    private final QueryLogger queryLogger;

    /**
     * @param metricsCollector collector that is stopped when the query ends
     * @param queryLogger logger that receives the timing messages
     */
    public QueryStatusCallback(MetricsCollector metricsCollector, QueryLogger queryLogger) {
        this.metricsCollector = metricsCollector;
        this.queryLogger = queryLogger;
    }

    /** Invoked when the query starts; currently does nothing. */
    public void onStart() {}

    /**
     * Invoked when the query ends: stops the metrics collector, then logs the elapsed
     * time and emits one metrics record carrying the success flag, the elapsed millis
     * and the start millis.
     *
     * @param isSucceed whether the query finished successfully
     */
    public void onEnd(boolean isSucceed) {
        metricsCollector.stop();
        // read the values once, after stop(), and reuse them for both log lines
        final long elapsedMillis = metricsCollector.getElapsedMillis();
        final long startMillis = metricsCollector.getStartMillis();
        queryLogger.info("total execution time is {} ms", elapsedMillis);
        queryLogger.metricsInfo("{} | {} | {}", isSucceed, elapsedMillis, startMillis);
    }

    /** @return the logger bound to the query this callback belongs to */
    public QueryLogger getQueryLogger() {
        return this.queryLogger;
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@
import com.alibaba.graphscope.common.store.IrMeta;
import com.alibaba.graphscope.gremlin.InterOpCollectionBuilder;
import com.alibaba.graphscope.gremlin.Utils;
import com.alibaba.graphscope.gremlin.plugin.MetricsCollector;
import com.alibaba.graphscope.gremlin.plugin.QueryLogger;
import com.alibaba.graphscope.gremlin.plugin.QueryStatusCallback;
import com.alibaba.graphscope.gremlin.plugin.script.AntlrGremlinScriptEngineFactory;
import com.alibaba.graphscope.gremlin.plugin.strategy.ExpandFusionStepStrategy;
import com.alibaba.graphscope.gremlin.plugin.strategy.RemoveUselessStepStrategy;
Expand Down Expand Up @@ -65,8 +68,6 @@
import org.apache.tinkerpop.gremlin.server.op.standard.StandardOpProcessor;
import org.apache.tinkerpop.gremlin.structure.Graph;
import org.codehaus.groovy.control.MultipleCompilationErrorsException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.Map;
Expand All @@ -81,9 +82,6 @@
import javax.script.SimpleBindings;

public class IrStandardOpProcessor extends StandardOpProcessor {
private static Logger metricLogger = LoggerFactory.getLogger("MetricLog");
private static Logger logger = LoggerFactory.getLogger(IrStandardOpProcessor.class);

protected static final AtomicLong JOB_ID_COUNTER = new AtomicLong(0L);
protected Graph graph;
protected GraphTraversalSource g;
Expand Down Expand Up @@ -116,8 +114,6 @@ protected void evalOpInternal(
final Context ctx,
final Supplier<GremlinExecutor> gremlinExecutorSupplier,
final AbstractEvalOpProcessor.BindingSupplier bindingsSupplier) {
long startTime = System.currentTimeMillis();
com.codahale.metrics.Timer.Context timerContext = evalOpTimer.time();
RequestMessage msg = ctx.getRequestMessage();
GremlinExecutor gremlinExecutor = gremlinExecutorSupplier.get();
Map<String, Object> args = msg.getArgs();
Expand All @@ -127,29 +123,18 @@ protected void evalOpInternal(

long jobId = JOB_ID_COUNTER.incrementAndGet();
IrMeta irMeta = metaQueryCallback.beforeExec();
QueryStatusCallback statusCallback = createQueryStatusCallback(script, jobId);
GremlinExecutor.LifeCycle lifeCycle =
createLifeCycle(
ctx, gremlinExecutorSupplier, bindingsSupplier, jobId, script, irMeta);
ctx, gremlinExecutorSupplier, bindingsSupplier, irMeta, statusCallback);
try {
CompletableFuture<Object> evalFuture =
gremlinExecutor.eval(script, language, new SimpleBindings(), lifeCycle);
evalFuture.handle(
(v, t) -> {
metaQueryCallback.afterExec(irMeta);
long elapsed = timerContext.stop();
logger.info(
"query \"{}\" total execution time is {} ms",
script,
elapsed / 1000000.0f);
boolean isSuccess = (t == null);
metricLogger.info(
"{} | {} | {} | {} | {}",
jobId,
script,
isSuccess,
elapsed / 1000000.0f,
startTime);
if (t != null) {
statusCallback.onEnd(false);
if (v instanceof AbstractResultProcessor) {
((AbstractResultProcessor) v).cancel();
}
Expand Down Expand Up @@ -180,7 +165,7 @@ protected void evalOpInternal(
+ " increasing the limit given to"
+ " TimedInterruptCustomizerProvider",
msg);
logger.warn(errorMessage);
statusCallback.getQueryLogger().warn(errorMessage);
ctx.writeAndFlush(
ResponseMessage.build(msg)
.code(ResponseStatusCode.SERVER_ERROR_TIMEOUT)
Expand All @@ -196,7 +181,7 @@ protected void evalOpInternal(
"Script evaluation exceeded the configured"
+ " threshold for request [%s]",
msg);
logger.warn(errorMessage, t);
statusCallback.getQueryLogger().warn(errorMessage, t);
ctx.writeAndFlush(
ResponseMessage.build(msg)
.code(ResponseStatusCode.SERVER_ERROR_TIMEOUT)
Expand All @@ -216,7 +201,7 @@ protected void evalOpInternal(
+ " allowed by the JVM, please split it"
+ " into multiple smaller statements - %s",
msg);
logger.warn(errorMessage);
statusCallback.getQueryLogger().warn(errorMessage);
ctx.writeAndFlush(
ResponseMessage.build(msg)
.code(
Expand All @@ -228,12 +213,14 @@ protected void evalOpInternal(
} else {
errorMessage =
t.getMessage() == null ? t.toString() : t.getMessage();
logger.warn(
String.format(
"Exception processing a script on request"
+ " [%s].",
msg),
t);
statusCallback
.getQueryLogger()
.warn(
String.format(
"Exception processing a script on"
+ " request [%s].",
msg),
t);
ctx.writeAndFlush(
ResponseMessage.build(msg)
.code(
Expand All @@ -256,13 +243,17 @@ protected void evalOpInternal(
}
}

/**
 * Builds the status callback for one query, wiring a metrics collector created on
 * {@code evalOpTimer} together with a logger bound to the query text and id.
 *
 * @param query the query text
 * @param queryId the id assigned to the query
 * @return a new callback for this query
 */
protected QueryStatusCallback createQueryStatusCallback(String query, long queryId) {
    MetricsCollector collector = new MetricsCollector(evalOpTimer);
    QueryLogger perQueryLogger = new QueryLogger(query, queryId);
    return new QueryStatusCallback(collector, perQueryLogger);
}

protected GremlinExecutor.LifeCycle createLifeCycle(
Context ctx,
Supplier<GremlinExecutor> gremlinExecutorSupplier,
BindingSupplier bindingsSupplier,
long jobId,
String script,
IrMeta irMeta) {
IrMeta irMeta,
QueryStatusCallback statusCallback) {
QueryTimeoutConfig timeoutConfig = new QueryTimeoutConfig(ctx.getRequestTimeout());
return GremlinExecutor.LifeCycle.build()
.evaluationTimeoutOverride(timeoutConfig.getExecutionTimeoutMS())
Expand Down Expand Up @@ -290,11 +281,11 @@ protected GremlinExecutor.LifeCycle createLifeCycle(
Traversal traversal = (Traversal) o;
processTraversal(
traversal,
new GremlinResultProcessor(ctx, traversal),
jobId,
script,
new GremlinResultProcessor(
ctx, traversal, statusCallback),
irMeta,
timeoutConfig);
timeoutConfig,
statusCallback.getQueryLogger());
}
} catch (Exception e) {
throw new RuntimeException(e);
Expand All @@ -307,25 +298,21 @@ protected GremlinExecutor.LifeCycle createLifeCycle(
protected void processTraversal(
Traversal traversal,
ResultProcessor resultProcessor,
long jobId,
String script,
IrMeta irMeta,
QueryTimeoutConfig timeoutConfig)
QueryTimeoutConfig timeoutConfig,
QueryLogger queryLogger)
throws InvalidProtocolBufferException, IOException, RuntimeException {
InterOpCollection opCollection = (new InterOpCollectionBuilder(traversal)).build();
// fuse order with limit to topK
InterOpCollection.applyStrategies(opCollection);
// add sink operator
InterOpCollection.process(opCollection);

long jobId = queryLogger.getQueryId();
String jobName = "ir_plan_" + jobId;
IrPlan irPlan = new IrPlan(irMeta, opCollection);
// print script and jobName with ir plan
logger.info(
"gremlin query \"{}\", job conf name \"{}\", ir plan {}",
script,
jobName,
irPlan.getPlanAsJson());
queryLogger.info("ir plan {}", irPlan.getPlanAsJson());
byte[] physicalPlanBytes = irPlan.toPhysicalBytes(configs);
irPlan.close();

Expand Down
Empty file.
Empty file.
Loading

0 comments on commit 7330954

Please sign in to comment.