Skip to content

Commit 8d694f9

Browse files
jonathan-buttner prwhelan
authored and committed
[8.x] [ML] Inference duration and error metrics (elastic#115876) (elastic#118700)
* [ML] Inference duration and error metrics (elastic#115876)

  Add `es.inference.requests.time` metric around `infer` API.

  As recommended by OTel spec, errors are determined by the presence or absence of the `error.type` attribute in the metric. "error.type" will be the HTTP status code (as a string) if it is available, otherwise it will be the name of the exception (e.g. NullPointerException).

  Additional notes:
  - ApmInferenceStats is merged into InferenceStats. Originally we planned to have multiple implementations, but now we're only using APM.
  - Request count is now always recorded, even when there are failures loading the endpoint configuration.
  - Added a hook in streaming for cancel messages, so we can close the metrics when a user cancels the stream.

  (cherry picked from commit 26870ef)

* fixing switch with class issue

---------

Co-authored-by: Pat Whelan <[email protected]>
1 parent 14b68dd commit 8d694f9

File tree

10 files changed

+370
-172
lines changed

10 files changed

+370
-172
lines changed

docs/changelog/115876.yaml

+5
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 115876
2+
summary: Inference duration and error metrics
3+
area: Machine Learning
4+
type: enhancement
5+
issues: []

x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/InferencePlugin.java

+1-2
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,6 @@
108108
import org.elasticsearch.xpack.inference.services.ibmwatsonx.IbmWatsonxService;
109109
import org.elasticsearch.xpack.inference.services.mistral.MistralService;
110110
import org.elasticsearch.xpack.inference.services.openai.OpenAiService;
111-
import org.elasticsearch.xpack.inference.telemetry.ApmInferenceStats;
112111
import org.elasticsearch.xpack.inference.telemetry.InferenceStats;
113112

114113
import java.util.ArrayList;
@@ -256,7 +255,7 @@ public Collection<?> createComponents(PluginServices services) {
256255
shardBulkInferenceActionFilter.set(actionFilter);
257256

258257
var meterRegistry = services.telemetryProvider().getMeterRegistry();
259-
var stats = new PluginComponentBinding<>(InferenceStats.class, ApmInferenceStats.create(meterRegistry));
258+
var stats = new PluginComponentBinding<>(InferenceStats.class, InferenceStats.create(meterRegistry));
260259

261260
return List.of(modelRegistry, registry, httpClientManager, stats);
262261
}

x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/common/DelegatingProcessor.java

+1
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,7 @@ public void request(long n) {
9595
public void cancel() {
9696
if (isClosed.compareAndSet(false, true) && upstream != null) {
9797
upstream.cancel();
98+
onCancel();
9899
}
99100
}
100101
};

x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/telemetry/ApmInferenceStats.java

-49
This file was deleted.

x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/telemetry/InferenceStats.java

+81-7
Original file line numberDiff line numberDiff line change
@@ -7,15 +7,89 @@
77

88
package org.elasticsearch.xpack.inference.telemetry;
99

10+
import org.elasticsearch.ElasticsearchStatusException;
11+
import org.elasticsearch.core.Nullable;
1012
import org.elasticsearch.inference.Model;
13+
import org.elasticsearch.inference.UnparsedModel;
14+
import org.elasticsearch.telemetry.metric.LongCounter;
15+
import org.elasticsearch.telemetry.metric.LongHistogram;
16+
import org.elasticsearch.telemetry.metric.MeterRegistry;
1117

12-
public interface InferenceStats {
18+
import java.util.Map;
19+
import java.util.Objects;
20+
import java.util.stream.Collectors;
21+
import java.util.stream.Stream;
1322

14-
/**
15-
* Increment the counter for a particular value in a thread safe manner.
16-
* @param model the model to increment request count for
17-
*/
18-
void incrementRequestCount(Model model);
23+
import static java.util.Map.entry;
24+
import static java.util.stream.Stream.concat;
1925

20-
InferenceStats NOOP = model -> {};
26+
public record InferenceStats(LongCounter requestCount, LongHistogram inferenceDuration) {
27+
28+
public InferenceStats {
29+
Objects.requireNonNull(requestCount);
30+
Objects.requireNonNull(inferenceDuration);
31+
}
32+
33+
public static InferenceStats create(MeterRegistry meterRegistry) {
34+
return new InferenceStats(
35+
meterRegistry.registerLongCounter(
36+
"es.inference.requests.count.total",
37+
"Inference API request counts for a particular service, task type, model ID",
38+
"operations"
39+
),
40+
meterRegistry.registerLongHistogram(
41+
"es.inference.requests.time",
42+
"Inference API request counts for a particular service, task type, model ID",
43+
"ms"
44+
)
45+
);
46+
}
47+
48+
public static Map<String, Object> modelAttributes(Model model) {
49+
return toMap(modelAttributeEntries(model));
50+
}
51+
52+
private static Stream<Map.Entry<String, Object>> modelAttributeEntries(Model model) {
53+
var stream = Stream.<Map.Entry<String, Object>>builder()
54+
.add(entry("service", model.getConfigurations().getService()))
55+
.add(entry("task_type", model.getTaskType().toString()));
56+
if (model.getServiceSettings().modelId() != null) {
57+
stream.add(entry("model_id", model.getServiceSettings().modelId()));
58+
}
59+
return stream.build();
60+
}
61+
62+
private static Map<String, Object> toMap(Stream<Map.Entry<String, Object>> stream) {
63+
return stream.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
64+
}
65+
66+
public static Map<String, Object> responseAttributes(Model model, @Nullable Throwable t) {
67+
return toMap(concat(modelAttributeEntries(model), errorAttributes(t)));
68+
}
69+
70+
public static Map<String, Object> responseAttributes(UnparsedModel model, @Nullable Throwable t) {
71+
var unknownModelAttributes = Stream.<Map.Entry<String, Object>>builder()
72+
.add(entry("service", model.service()))
73+
.add(entry("task_type", model.taskType().toString()))
74+
.build();
75+
76+
return toMap(concat(unknownModelAttributes, errorAttributes(t)));
77+
}
78+
79+
public static Map<String, Object> responseAttributes(@Nullable Throwable t) {
80+
return toMap(errorAttributes(t));
81+
}
82+
83+
private static Stream<Map.Entry<String, Object>> errorAttributes(@Nullable Throwable t) {
84+
if (t == null) {
85+
return Stream.of(entry("status_code", 200));
86+
} else if (t instanceof ElasticsearchStatusException ese) {
87+
return Stream.<Map.Entry<String, Object>>builder()
88+
.add(entry("status_code", ese.status().getStatus()))
89+
.add(entry("error.type", String.valueOf(ese.status().getStatus())))
90+
.build();
91+
} else {
92+
return Stream.of(entry("error.type", t.getClass().getSimpleName()));
93+
}
94+
}
2195
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.xpack.inference.telemetry;
9+
10+
import java.time.Clock;
11+
import java.time.Duration;
12+
import java.time.Instant;
13+
import java.util.Objects;
14+
15+
public record InferenceTimer(Instant startTime, Clock clock) {
16+
17+
public InferenceTimer {
18+
Objects.requireNonNull(startTime);
19+
Objects.requireNonNull(clock);
20+
}
21+
22+
public static InferenceTimer start() {
23+
return start(Clock.systemUTC());
24+
}
25+
26+
public static InferenceTimer start(Clock clock) {
27+
return new InferenceTimer(clock.instant(), clock);
28+
}
29+
30+
public long elapsedMillis() {
31+
return Duration.between(startTime(), clock().instant()).toMillis();
32+
}
33+
}
Original file line numberDiff line numberDiff line change
@@ -1,45 +0,0 @@
1-
/*
2-
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3-
* or more contributor license agreements. Licensed under the Elastic License
4-
* 2.0; you may not use this file except in compliance with the Elastic License
5-
* 2.0.
6-
*/
7-
8-
package org.elasticsearch.xpack.inference.action;
9-
10-
import org.elasticsearch.action.support.ActionFilters;
11-
import org.elasticsearch.inference.InferenceServiceRegistry;
12-
import org.elasticsearch.transport.TransportService;
13-
import org.elasticsearch.xpack.core.inference.action.InferenceAction;
14-
import org.elasticsearch.xpack.inference.action.task.StreamingTaskManager;
15-
import org.elasticsearch.xpack.inference.registry.ModelRegistry;
16-
import org.elasticsearch.xpack.inference.telemetry.InferenceStats;
17-
18-
import static org.mockito.Mockito.mock;
19-
20-
public class TransportInferenceActionTests extends BaseTransportInferenceActionTestCase<InferenceAction.Request> {
21-
22-
@Override
23-
protected BaseTransportInferenceAction<InferenceAction.Request> createAction(
24-
TransportService transportService,
25-
ActionFilters actionFilters,
26-
ModelRegistry modelRegistry,
27-
InferenceServiceRegistry serviceRegistry,
28-
InferenceStats inferenceStats,
29-
StreamingTaskManager streamingTaskManager
30-
) {
31-
return new TransportInferenceAction(
32-
transportService,
33-
actionFilters,
34-
modelRegistry,
35-
serviceRegistry,
36-
inferenceStats,
37-
streamingTaskManager
38-
);
39-
}
40-
41-
@Override
42-
protected InferenceAction.Request createRequest() {
43-
return mock();
44-
}
45-
}

x-pack/plugin/inference/src/test/java/org/elasticsearch/xpack/inference/telemetry/ApmInferenceStatsTests.java

-69
This file was deleted.

0 commit comments

Comments
 (0)