Skip to content

Commit

Permalink
Merge branch '2.x' into backport/backport-13721-to-2.x
Browse files Browse the repository at this point in the history
Signed-off-by: Rishabh Maurya <[email protected]>
  • Loading branch information
rishabhmaurya authored Jun 10, 2024
2 parents 5325d2a + 0634ccd commit 18899ee
Show file tree
Hide file tree
Showing 79 changed files with 2,103 additions and 243 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/dco.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ jobs:
steps:
- name: Get PR Commits
id: 'get-pr-commits'
uses: tim-actions/get-pr-commits@v1.1.0
uses: tim-actions/get-pr-commits@v1.3.1
with:
token: ${{ secrets.GITHUB_TOKEN }}
- name: DCO Check
Expand Down
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Move Remote Store Migration from DocRep to GA and modify remote migration settings name ([#14100](https://github.com/opensearch-project/OpenSearch/pull/14100))
- [Remote State] Add async remote state deletion task running on an interval, configurable by a setting ([#13995](https://github.com/opensearch-project/OpenSearch/pull/13995))
- Add remote routing table for remote state publication with experimental feature flag ([#13304](https://github.com/opensearch-project/OpenSearch/pull/13304))
- Add support for query level resource usage tracking ([#13172](https://github.com/opensearch-project/OpenSearch/pull/13172))
- [Query Insights] Add cpu and memory metrics to top n queries ([#13739](https://github.com/opensearch-project/OpenSearch/pull/13739))
- Derived field object type support ([#13720](https://github.com/opensearch-project/OpenSearch/pull/13720))

### Dependencies
Expand All @@ -40,11 +42,14 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Bump `org.apache.xmlbeans:xmlbeans` from 5.2.0 to 5.2.1 ([#13839](https://github.com/opensearch-project/OpenSearch/pull/13839))
- Bump `actions/checkout` from 3 to 4 ([#13935](https://github.com/opensearch-project/OpenSearch/pull/13935))
- Bump `com.netflix.nebula.ospackage-base` from 11.9.0 to 11.9.1 ([#13933](https://github.com/opensearch-project/OpenSearch/pull/13933))
- Bump `com.azure:azure-core-http-netty` from 1.12.8 to 1.15.1 ([#14128](https://github.com/opensearch-project/OpenSearch/pull/14128))
- Bump `tim-actions/get-pr-commits` from 1.1.0 to 1.3.1 ([#14126](https://github.com/opensearch-project/OpenSearch/pull/14126))

### Changed
- Add ability for Boolean and date field queries to run when only doc_values are enabled ([#11650](https://github.com/opensearch-project/OpenSearch/pull/11650))
- Refactor implementations of query phase searcher, allow QueryCollectorContext to have zero collectors ([#13481](https://github.com/opensearch-project/OpenSearch/pull/13481))
- Adds support to inject telemetry instances to plugins ([#13636](https://github.com/opensearch-project/OpenSearch/pull/13636))
- Adds support to provide tags with value in Gauge metric. ([#13994](https://github.com/opensearch-project/OpenSearch/pull/13994))
- Move cache removal notifications outside lru lock ([#14017](https://github.com/opensearch-project/OpenSearch/pull/14017))

### Deprecated
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,10 @@ public long getTotalValue() {
return endValue.get() - startValue;
}

public long getStartValue() {
return startValue;
}

@Override
public String toString() {
return String.valueOf(getTotalValue());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,225 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.core.tasks.resourcetracker;

import org.opensearch.common.annotation.PublicApi;
import org.opensearch.core.ParseField;
import org.opensearch.core.common.Strings;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.core.common.io.stream.Writeable;
import org.opensearch.core.xcontent.ConstructingObjectParser;
import org.opensearch.core.xcontent.MediaTypeRegistry;
import org.opensearch.core.xcontent.ToXContentObject;
import org.opensearch.core.xcontent.XContentBuilder;

import java.io.IOException;
import java.util.Objects;

import static org.opensearch.core.xcontent.ConstructingObjectParser.constructorArg;

/**
* Task resource usage information with minimal information about the task
* <p>
* Writeable TaskResourceInfo objects are used to represent resource usage
* information of running tasks, which can be propagated to coordinator node
* to infer query-level resource usage
*
* @opensearch.api
*/
@PublicApi(since = "2.15.0")
public class TaskResourceInfo implements Writeable, ToXContentObject {
private final String action;
private final long taskId;
private final long parentTaskId;
private final String nodeId;
private final TaskResourceUsage taskResourceUsage;

private static final ParseField ACTION = new ParseField("action");
private static final ParseField TASK_ID = new ParseField("taskId");
private static final ParseField PARENT_TASK_ID = new ParseField("parentTaskId");
private static final ParseField NODE_ID = new ParseField("nodeId");
private static final ParseField TASK_RESOURCE_USAGE = new ParseField("taskResourceUsage");

public TaskResourceInfo(
final String action,
final long taskId,
final long parentTaskId,
final String nodeId,
final TaskResourceUsage taskResourceUsage
) {
this.action = action;
this.taskId = taskId;
this.parentTaskId = parentTaskId;
this.nodeId = nodeId;
this.taskResourceUsage = taskResourceUsage;
}

public static final ConstructingObjectParser<TaskResourceInfo, Void> PARSER = new ConstructingObjectParser<>(
"task_resource_info",
a -> new Builder().setAction((String) a[0])
.setTaskId((Long) a[1])
.setParentTaskId((Long) a[2])
.setNodeId((String) a[3])
.setTaskResourceUsage((TaskResourceUsage) a[4])
.build()
);

static {
PARSER.declareString(constructorArg(), ACTION);
PARSER.declareLong(constructorArg(), TASK_ID);
PARSER.declareLong(constructorArg(), PARENT_TASK_ID);
PARSER.declareString(constructorArg(), NODE_ID);
PARSER.declareObject(constructorArg(), TaskResourceUsage.PARSER, TASK_RESOURCE_USAGE);
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field(ACTION.getPreferredName(), this.action);
builder.field(TASK_ID.getPreferredName(), this.taskId);
builder.field(PARENT_TASK_ID.getPreferredName(), this.parentTaskId);
builder.field(NODE_ID.getPreferredName(), this.nodeId);
builder.startObject(TASK_RESOURCE_USAGE.getPreferredName());
this.taskResourceUsage.toXContent(builder, params);
builder.endObject();
builder.endObject();
return builder;
}

/**
* Builder for {@link TaskResourceInfo}
*/
public static class Builder {
private TaskResourceUsage taskResourceUsage;
private String action;
private long taskId;
private long parentTaskId;
private String nodeId;

public Builder setTaskResourceUsage(final TaskResourceUsage taskResourceUsage) {
this.taskResourceUsage = taskResourceUsage;
return this;
}

public Builder setAction(final String action) {
this.action = action;
return this;
}

public Builder setTaskId(final long taskId) {
this.taskId = taskId;
return this;
}

public Builder setParentTaskId(final long parentTaskId) {
this.parentTaskId = parentTaskId;
return this;
}

public Builder setNodeId(final String nodeId) {
this.nodeId = nodeId;
return this;
}

public TaskResourceInfo build() {
return new TaskResourceInfo(action, taskId, parentTaskId, nodeId, taskResourceUsage);
}
}

/**
* Read task info from a stream.
*
* @param in StreamInput to read
* @return {@link TaskResourceInfo}
* @throws IOException IOException
*/
public static TaskResourceInfo readFromStream(StreamInput in) throws IOException {
return new TaskResourceInfo.Builder().setAction(in.readString())
.setTaskId(in.readLong())
.setParentTaskId(in.readLong())
.setNodeId(in.readString())
.setTaskResourceUsage(TaskResourceUsage.readFromStream(in))
.build();
}

/**
* Get TaskResourceUsage
*
* @return taskResourceUsage
*/
public TaskResourceUsage getTaskResourceUsage() {
return taskResourceUsage;
}

/**
* Get parent task id
*
* @return parent task id
*/
public long getParentTaskId() {
return parentTaskId;
}

/**
* Get task id
* @return task id
*/
public long getTaskId() {
return taskId;
}

/**
* Get node id
* @return node id
*/
public String getNodeId() {
return nodeId;
}

/**
* Get task action
* @return task action
*/
public String getAction() {
return action;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeString(action);
out.writeLong(taskId);
out.writeLong(parentTaskId);
out.writeString(nodeId);
taskResourceUsage.writeTo(out);
}

@Override
public String toString() {
return Strings.toString(MediaTypeRegistry.JSON, this);
}

@Override
public boolean equals(Object obj) {
if (obj == null || obj.getClass() != TaskResourceInfo.class) {
return false;
}
TaskResourceInfo other = (TaskResourceInfo) obj;
return action.equals(other.action)
&& taskId == other.taskId
&& parentTaskId == other.parentTaskId
&& Objects.equals(nodeId, other.nodeId)
&& taskResourceUsage.equals(other.taskResourceUsage);
}

@Override
public int hashCode() {
return Objects.hash(action, taskId, parentTaskId, nodeId, taskResourceUsage);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,11 @@ public Closeable createGauge(String name, String description, String unit, Suppl
return metricsTelemetry.createGauge(name, description, unit, valueProvider, tags);
}

@Override
public Closeable createGauge(String name, String description, String unit, Supplier<TaggedMeasurement> value) {
return metricsTelemetry.createGauge(name, description, unit, value);
}

@Override
public void close() throws IOException {
metricsTelemetry.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,4 +63,16 @@ public interface MetricsRegistry extends Closeable {
*/
Closeable createGauge(String name, String description, String unit, Supplier<Double> valueProvider, Tags tags);

/**
* Creates the Observable Gauge type of Metric. Where the value provider will be called at a certain frequency
* to capture the value.
*
* @param name name of the observable gauge.
* @param description any description about the metric.
* @param unit unit of the metric.
* @param value value provider.
* @return closeable to dispose/close the Gauge metric.
*/
Closeable createGauge(String name, String description, String unit, Supplier<TaggedMeasurement> value);

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.telemetry.metrics;

import org.opensearch.common.annotation.ExperimentalApi;
import org.opensearch.telemetry.metrics.tags.Tags;

/**
* Observable Measurement for the Asynchronous instruments.
* @opensearch.experimental
*/
@ExperimentalApi
public final class TaggedMeasurement {
private final Double value;
private final Tags tags;

/**
* Factory method to create the {@link TaggedMeasurement} object.
* @param value value.
* @param tags tags to be added per value.
* @return tagged measurement TaggedMeasurement
*/
public static TaggedMeasurement create(double value, Tags tags) {
return new TaggedMeasurement(value, tags);
}

private TaggedMeasurement(double value, Tags tags) {
this.value = value;
this.tags = tags;
}

/**
* Returns the value.
* @return value
*/
public Double getValue() {
return value;
}

/**
* Returns the tags.
* @return tags
*/
public Tags getTags() {
return tags;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.opensearch.telemetry.metrics.Counter;
import org.opensearch.telemetry.metrics.Histogram;
import org.opensearch.telemetry.metrics.MetricsRegistry;
import org.opensearch.telemetry.metrics.TaggedMeasurement;
import org.opensearch.telemetry.metrics.tags.Tags;

import java.io.Closeable;
Expand Down Expand Up @@ -52,6 +53,11 @@ public Closeable createGauge(String name, String description, String unit, Suppl
return () -> {};
}

@Override
public Closeable createGauge(String name, String description, String unit, Supplier<TaggedMeasurement> value) {
return () -> {};
}

@Override
public void close() throws IOException {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,4 +79,19 @@ public void testGauge() {
assertSame(mockCloseable, closeable);
}

@SuppressWarnings("unchecked")
public void testGaugeWithValueAndTagSupplier() {
Closeable mockCloseable = mock(Closeable.class);
when(defaultMeterRegistry.createGauge(any(String.class), any(String.class), any(String.class), any(Supplier.class))).thenReturn(
mockCloseable
);
Closeable closeable = defaultMeterRegistry.createGauge(
"org.opensearch.telemetry.metrics.DefaultMeterRegistryTests.testObservableGauge",
"test observable gauge",
"ms",
() -> TaggedMeasurement.create(1.0, Tags.EMPTY)
);
assertSame(mockCloseable, closeable);
}

}
Loading

0 comments on commit 18899ee

Please sign in to comment.