Merged
Changes from 18 commits
5 changes: 5 additions & 0 deletions docs/changelog/91640.yaml
@@ -0,0 +1,5 @@
pr: 91640
summary: Add profiling plugin
area: Search
type: enhancement
issues: []
15 changes: 15 additions & 0 deletions x-pack/plugin/profiler/build.gradle
@@ -0,0 +1,15 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
apply plugin: 'elasticsearch.internal-cluster-test'
apply plugin: 'elasticsearch.internal-es-plugin'

esplugin {
name 'x-pack-profiling'
description 'The profiler plugin adds support for retrieving data from Universal Profiler.'
classname 'org.elasticsearch.xpack.profiler.ProfilingPlugin'
}
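
For orientation: the REST endpoint registered by this plugin is exercised by the integration test in the next file. As a rough sketch only, modelled on that test and using the low-level REST client, a request against it could look like the following (the restClient variable, the sample_size value and the match_all query are illustrative placeholders, not part of this change):

    // Illustrative sketch (not part of this change): call the stacktraces endpoint
    // added by the profiling plugin via the low-level REST client.
    Request request = new Request("POST", "/_profiling/stacktraces");
    request.setJsonEntity("""
        {
          "sample_size": 100,
          "query": { "match_all": {} }
        }
        """);
    Response response = restClient.performRequest(request); // restClient: an existing org.elasticsearch.client.RestClient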
287 changes: 287 additions & 0 deletions …/GetProfilingActionIT.java
@@ -0,0 +1,287 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.profiler;

import org.apache.http.entity.ContentType;
import org.apache.http.entity.StringEntity;
import org.apache.logging.log4j.LogManager;
import org.apache.lucene.util.SetOnce;
import org.elasticsearch.action.admin.cluster.node.info.NodeInfo;
import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse;
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.client.Cancellable;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
import org.elasticsearch.common.network.NetworkModule;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.plugins.PluginsService;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.script.MockScriptPlugin;
import org.elasticsearch.search.lookup.LeafStoredFieldsLookup;
import org.elasticsearch.tasks.CancellableTask;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.tasks.TaskInfo;
import org.elasticsearch.tasks.TaskManager;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.transport.netty4.Netty4Plugin;
import org.elasticsearch.xcontent.XContentType;
import org.junit.Before;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CancellationException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

import static org.elasticsearch.action.support.ActionTestUtils.wrapAsRestResponseListener;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.instanceOf;

@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 1)
public class GetProfilingActionIT extends ESIntegTestCase {
@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return List.of(ProfilingPlugin.class, ScriptedBlockPlugin.class, getTestTransportPlugin());
}

@Override
protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) {
return Settings.builder()
.put(super.nodeSettings(nodeOrdinal, otherSettings))
.put(ProfilingPlugin.PROFILING_ENABLED.getKey(), true)
.put(NetworkModule.TRANSPORT_TYPE_KEY, Netty4Plugin.NETTY_TRANSPORT_NAME)
.put(NetworkModule.HTTP_TYPE_KEY, Netty4Plugin.NETTY_HTTP_TRANSPORT_NAME)
.build();
}

@Override
protected boolean addMockHttpTransport() {
return false; // enable http
}

@Override
protected boolean ignoreExternalCluster() {
return true;
}

private byte[] read(String resource) throws IOException {
return GetProfilingAction.class.getClassLoader().getResourceAsStream(resource).readAllBytes();
}

private void createIndex(String name, String bodyFileName) throws Exception {
client().admin().indices().prepareCreate(name).setSource(read(bodyFileName), XContentType.JSON).execute().get();
}

private void indexDoc(String index, String id, Map<String, Object> source) {
IndexResponse indexResponse = client().prepareIndex(index).setId(id).setSource(source).get();
assertEquals(RestStatus.CREATED, indexResponse.status());
}

@Before
public void setupData() throws Exception {

for (String idx : EventsIndex.indexNames()) {
createIndex(idx, "events.json");
}
createIndex("profiling-stackframes", "stackframes.json");
createIndex("profiling-stacktraces", "stacktraces.json");
createIndex("profiling-executables", "executables.json");
ensureGreen();

// ensure that we have this in every index, so we find an event
for (String idx : EventsIndex.indexNames()) {
indexDoc(
idx,
"QjoLteG7HX3VUUXr-J4kHQ",
Map.of("@timestamp", 1668761065, "Stacktrace.id", "QjoLteG7HX3VUUXr-J4kHQ", "Stacktrace.count", 1)
);
}

indexDoc(
"profiling-stacktraces",
"QjoLteG7HX3VUUXr-J4kHQ",
Map.of("Stacktrace.frame.ids", "QCCDqjSg3bMK1C4YRK6TiwAAAAAAEIpf", "Stacktrace.frame.types", "AQI")
);
indexDoc(
"profiling-stackframes",
"QCCDqjSg3bMK1C4YRK6TiwAAAAAAEIpf",
Map.of("Stackframe.function.name", "_raw_spin_unlock_irqrestore")
);
indexDoc("profiling-executables", "QCCDqjSg3bMK1C4YRK6Tiw", Map.of("Executable.file.name", "libc.so.6"));

refresh();
}

public void testGetProfilingDataUnfiltered() throws Exception {
GetProfilingRequest request = new GetProfilingRequest(1, null);
GetProfilingResponse response = client().execute(GetProfilingAction.INSTANCE, request).get();
assertEquals(RestStatus.OK, response.status());
assertEquals(1, response.getTotalFrames());
assertNotNull(response.getStackTraces());
StackTrace stackTrace = response.getStackTraces().get("QjoLteG7HX3VUUXr-J4kHQ");
assertArrayEquals(new int[] { 1083999 }, stackTrace.addressOrLines);
assertArrayEquals(new String[] { "QCCDqjSg3bMK1C4YRK6Tiw" }, stackTrace.fileIds);
assertArrayEquals(new String[] { "QCCDqjSg3bMK1C4YRK6TiwAAAAAAEIpf" }, stackTrace.frameIds);
assertArrayEquals(new int[] { 2 }, stackTrace.typeIds);

assertNotNull(response.getStackFrames());
StackFrame stackFrame = response.getStackFrames().get("QCCDqjSg3bMK1C4YRK6TiwAAAAAAEIpf");
assertEquals("_raw_spin_unlock_irqrestore", stackFrame.functionName);
assertNotNull(response.getStackTraceEvents());
assertEquals(1, (int) response.getStackTraceEvents().get("QjoLteG7HX3VUUXr-J4kHQ"));

assertNotNull(response.getExecutables());
assertNotNull("libc.so.6", response.getExecutables().get("QCCDqjSg3bMK1C4YRK6Tiw"));
}

public void testAutomaticCancellation() throws Exception {
Request restRequest = new Request("POST", "/_profiling/stacktraces");
restRequest.setEntity(new StringEntity("""
{
"sample_size": 10000,
"query": {
"bool": {
"filter": [
{
"script": {
"script": {
"lang": "mockscript",
"source": "search_block",
"params": {}
}
}
}
]
}
}
}
""", ContentType.APPLICATION_JSON.withCharset(StandardCharsets.UTF_8)));
verifyCancellation(GetProfilingAction.NAME, restRequest);
}

void verifyCancellation(String action, Request restRequest) throws Exception {
Map<String, String> nodeIdToName = readNodesInfo();
List<ScriptedBlockPlugin> plugins = initBlockFactory();

PlainActionFuture<Response> future = PlainActionFuture.newFuture();
Cancellable cancellable = getRestClient().performRequestAsync(restRequest, wrapAsRestResponseListener(future));

awaitForBlock(plugins);
cancellable.cancel();
ensureTaskIsCancelled(action, nodeIdToName::get);

disableBlocks(plugins);
expectThrows(CancellationException.class, future::actionGet);
}

private static Map<String, String> readNodesInfo() {
Map<String, String> nodeIdToName = new HashMap<>();
NodesInfoResponse nodesInfoResponse = client().admin().cluster().prepareNodesInfo().get();
assertFalse(nodesInfoResponse.hasFailures());
for (NodeInfo node : nodesInfoResponse.getNodes()) {
nodeIdToName.put(node.getNode().getId(), node.getNode().getName());
}
return nodeIdToName;
}

private static void ensureTaskIsCancelled(String transportAction, Function<String, String> nodeIdToName) throws Exception {
SetOnce<TaskInfo> searchTask = new SetOnce<>();
ListTasksResponse listTasksResponse = client().admin().cluster().prepareListTasks().get();
for (TaskInfo task : listTasksResponse.getTasks()) {
if (task.action().equals(transportAction)) {
searchTask.set(task);
}
}
assertNotNull(searchTask.get());
TaskId taskId = searchTask.get().taskId();
String nodeName = nodeIdToName.apply(taskId.getNodeId());
assertBusy(() -> {
TaskManager taskManager = internalCluster().getInstance(TransportService.class, nodeName).getTaskManager();
Task task = taskManager.getTask(taskId.getId());
assertThat(task, instanceOf(CancellableTask.class));
assertTrue(((CancellableTask) task).isCancelled());
[Review thread]

@javanna (Member), Nov 30, 2022:
Should we also check that all child tasks are cancelled? My understanding is that we are now checking that the top-level profiling action is cancelled. Ideally, we'd make sure that the child search tasks either don't start before the cancellation, get cancelled, or complete before the cancellation.

We could check here that there are no tasks with this specific parent task id; on the other hand, that would be the case even if we did not set the parent task id properly. Should we instead try to collect those child tasks before the cancellation and double-check that they do get cancelled afterwards?

Member:
I am not entirely sure, though, where things get blocked with the script below; maybe always at the first inner search request? Ideally it would block at any stage, but that sounds like it would complicate this test quite a bit.

Member Author:
Good catch! I've implemented this now in 55c158a.

Member:
Looks great!

Member Author:
Thanks! I'll merge it then when CI is green. :)

});
}
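
As a sketch of the child-task check discussed in the review thread above (a hedged illustration only; this is not the change actually made in 55c158a, which is not shown in this diff), the child tasks could be collected by parent task id and verified to be cancelled with a helper along these lines. The method name is hypothetical and it reuses only APIs already used elsewhere in this test:

    // Hypothetical helper (sketch only): assert that every task whose parent is the given
    // profiling task is a CancellableTask and has been cancelled. Mirrors the pattern of
    // ensureTaskIsCancelled above; tasks that have already completed are simply skipped.
    private static void ensureChildTasksAreCancelled(TaskId parentTaskId, Function<String, String> nodeIdToName) throws Exception {
        assertBusy(() -> {
            ListTasksResponse listTasksResponse = client().admin().cluster().prepareListTasks().get();
            for (TaskInfo child : listTasksResponse.getTasks()) {
                // consider only tasks that report the profiling task as their parent
                if (parentTaskId.equals(child.parentTaskId()) == false) {
                    continue;
                }
                String nodeName = nodeIdToName.apply(child.taskId().getNodeId());
                TaskManager taskManager = internalCluster().getInstance(TransportService.class, nodeName).getTaskManager();
                Task task = taskManager.getTask(child.taskId().getId());
                if (task != null) {
                    assertThat(task, instanceOf(CancellableTask.class));
                    assertTrue(((CancellableTask) task).isCancelled());
                }
            }
        });
    }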

private static List<ScriptedBlockPlugin> initBlockFactory() {
List<ScriptedBlockPlugin> plugins = new ArrayList<>();
for (PluginsService pluginsService : internalCluster().getDataNodeInstances(PluginsService.class)) {
plugins.addAll(pluginsService.filterPlugins(ScriptedBlockPlugin.class));
}
for (ScriptedBlockPlugin plugin : plugins) {
plugin.reset();
plugin.enableBlock();
}
return plugins;
}

private void awaitForBlock(List<ScriptedBlockPlugin> plugins) throws Exception {
assertBusy(() -> {
int numberOfBlockedPlugins = 0;
for (ScriptedBlockPlugin plugin : plugins) {
numberOfBlockedPlugins += plugin.hits.get();
}
logger.info("The plugin blocked on {} shards", numberOfBlockedPlugins);
assertThat(numberOfBlockedPlugins, greaterThan(0));
}, 10, TimeUnit.SECONDS);
}

private static void disableBlocks(List<ScriptedBlockPlugin> plugins) {
for (ScriptedBlockPlugin plugin : plugins) {
plugin.disableBlock();
}
}

public static class ScriptedBlockPlugin extends MockScriptPlugin {
static final String SCRIPT_NAME = "search_block";

private final AtomicInteger hits = new AtomicInteger();

private final AtomicBoolean shouldBlock = new AtomicBoolean(true);

void reset() {
hits.set(0);
}

void disableBlock() {
shouldBlock.set(false);
}

void enableBlock() {
shouldBlock.set(true);
}

@Override
public Map<String, Function<Map<String, Object>, Object>> pluginScripts() {
return Collections.singletonMap(SCRIPT_NAME, params -> {
LeafStoredFieldsLookup fieldsLookup = (LeafStoredFieldsLookup) params.get("_fields");
LogManager.getLogger(GetProfilingActionIT.class).info("Blocking on the document {}", fieldsLookup.get("_id"));
hits.incrementAndGet();
try {
waitUntil(() -> shouldBlock.get() == false);
} catch (Exception e) {
throw new RuntimeException(e);
}
return true;
});
}
}
}
78 changes: 78 additions & 0 deletions …/events.json
@@ -0,0 +1,78 @@
{
"settings": {
"index": {
"number_of_shards": "4",
"max_result_window": 150000,
"refresh_interval": "10s",
"sort": {
"field": [
"service.name",
"@timestamp",
"orchestrator.resource.name",
"container.name",
"process.thread.name",
"host.id"
]
}
},
"codec": "best_compression"
},
"mappings": {
"_doc": {
"_source": {
"enabled": false
},
"properties": {
"ecs.version": {
"type": "keyword",
"index": true
},
"service.name": {
"type": "keyword"
},
"@timestamp": {
"type": "date",
"format": "epoch_second"
},
"host.id": {
"type": "keyword"
},
"Stacktrace.id": {
"type": "keyword",
"index": false
},
"orchestrator.resource.name": {
"type": "keyword"
},
"container.name": {
"type": "keyword"
},
"process.thread.name": {
"type": "keyword"
},
"Stacktrace.count": {
"type": "short",
"index": false
},
"agent.version": {
"type": "keyword"
},
"host.ip": {
"type": "ip"
},
"host.ipstring": {
"type": "keyword"
},
"host.name": {
"type": "keyword"
},
"os.kernel": {
"type": "keyword"
},
"tags": {
"type": "keyword"
}
}
}
}
}