Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package org.opensearch.commons.alerting
import org.opensearch.action.ActionListener
import org.opensearch.action.ActionResponse
import org.opensearch.client.node.NodeClient
import org.opensearch.common.io.stream.NamedWriteableRegistry
import org.opensearch.common.io.stream.Writeable
import org.opensearch.commons.alerting.action.AcknowledgeAlertRequest
import org.opensearch.commons.alerting.action.AcknowledgeAlertResponse
Expand Down Expand Up @@ -50,6 +51,31 @@ object AlertingPluginInterface {
}
)
}
/**

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let's not overload index monitor method. Let's change the original method's signature

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah sure. Tnx

* Index monitor interface.
* @param client Node client for making transport action
* @param request The request object
* @param namedWriteableRegistry Registry for building aggregations
* @param listener The listener for getting response
*/
fun indexMonitor(
client: NodeClient,
request: IndexMonitorRequest,
namedWriteableRegistry: NamedWriteableRegistry,
listener: ActionListener<IndexMonitorResponse>
) {
client.execute(
AlertingActions.INDEX_MONITOR_ACTION_TYPE,
request,
wrapActionListener(listener) { response ->
recreateObject(response, namedWriteableRegistry) {
IndexMonitorResponse(
it
)
}
}
)
}

fun deleteMonitor(
client: NodeClient,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
package org.opensearch.commons.utils

import org.opensearch.common.io.stream.InputStreamStreamInput
import org.opensearch.common.io.stream.NamedWriteableAwareStreamInput
import org.opensearch.common.io.stream.NamedWriteableRegistry
import org.opensearch.common.io.stream.OutputStreamStreamOutput
import org.opensearch.common.io.stream.StreamInput
import org.opensearch.common.io.stream.StreamOutput
Expand Down Expand Up @@ -36,3 +38,19 @@ inline fun <reified Request> recreateObject(writeable: Writeable, block: (Stream
}
}
}

/**
* Re create the object from the writeable. Uses NamedWriteableRegistry in order to build the aggregations.
* This method needs to be inline and reified so that when this is called from
* doExecute() of transport action, the object may be created from other JVM.
*/
inline fun <reified Request> recreateObject(writeable: Writeable, namedWriteableRegistry: NamedWriteableRegistry, block: (StreamInput) -> Request): Request {
ByteArrayOutputStream().use { byteArrayOutputStream ->
OutputStreamStreamOutput(byteArrayOutputStream).use {
writeable.writeTo(it)
InputStreamStreamInput(ByteArrayInputStream(byteArrayOutputStream.toByteArray())).use { streamInput ->
return block(NamedWriteableAwareStreamInput(streamInput, namedWriteableRegistry))
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ import org.mockito.junit.jupiter.MockitoExtension
import org.opensearch.action.ActionListener
import org.opensearch.action.ActionType
import org.opensearch.client.node.NodeClient
import org.opensearch.common.io.stream.NamedWriteableRegistry
import org.opensearch.common.settings.Settings
import org.opensearch.commons.alerting.action.DeleteMonitorRequest
import org.opensearch.commons.alerting.action.DeleteMonitorResponse
import org.opensearch.commons.alerting.action.GetAlertsRequest
Expand All @@ -25,6 +27,7 @@ import org.opensearch.commons.alerting.model.FindingWithDocs
import org.opensearch.commons.alerting.model.Monitor
import org.opensearch.index.seqno.SequenceNumbers
import org.opensearch.rest.RestStatus
import org.opensearch.search.SearchModule

@Suppress("UNCHECKED_CAST")
@ExtendWith(MockitoExtension::class)
Expand All @@ -51,6 +54,25 @@ internal class AlertingPluginInterfaceTests {
Mockito.verify(listener, Mockito.times(1)).onResponse(ArgumentMatchers.eq(response))
}

@Test
fun indexBucketMonitor() {
val monitor = randomBucketLevelMonitor()

val request = mock(IndexMonitorRequest::class.java)
val response = IndexMonitorResponse(Monitor.NO_ID, Monitor.NO_VERSION, SequenceNumbers.UNASSIGNED_SEQ_NO, SequenceNumbers.UNASSIGNED_PRIMARY_TERM, monitor)
val listener: ActionListener<IndexMonitorResponse> =
mock(ActionListener::class.java) as ActionListener<IndexMonitorResponse>
val namedWriteableRegistry = NamedWriteableRegistry(SearchModule(Settings.EMPTY, emptyList()).namedWriteables)

Mockito.doAnswer {
(it.getArgument(2) as ActionListener<IndexMonitorResponse>)
.onResponse(response)
}.whenever(client).execute(Mockito.any(ActionType::class.java), Mockito.any(), Mockito.any())
AlertingPluginInterface.indexMonitor(client, request, namedWriteableRegistry, listener)
Mockito.verify(listener, Mockito.times(1)).onResponse(ArgumentMatchers.eq(response))
Mockito.verify(listener, Mockito.times(1)).onResponse(ArgumentMatchers.eq(response))
}

@Test
fun deleteMonitor() {
val request = mock(DeleteMonitorRequest::class.java)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,16 @@ import org.junit.jupiter.api.Assertions
import org.junit.jupiter.api.Test
import org.opensearch.action.support.WriteRequest
import org.opensearch.common.io.stream.BytesStreamOutput
import org.opensearch.common.io.stream.NamedWriteableAwareStreamInput
import org.opensearch.common.io.stream.NamedWriteableRegistry
import org.opensearch.common.io.stream.StreamInput
import org.opensearch.common.settings.Settings
import org.opensearch.commons.alerting.model.SearchInput
import org.opensearch.commons.alerting.randomBucketLevelMonitor
import org.opensearch.commons.alerting.randomQueryLevelMonitor
import org.opensearch.commons.utils.recreateObject
import org.opensearch.rest.RestRequest
import org.opensearch.search.SearchModule
import org.opensearch.search.builder.SearchSourceBuilder

class IndexMonitorRequestTests {
Expand All @@ -32,6 +38,42 @@ class IndexMonitorRequestTests {
Assertions.assertNotNull(newReq.monitor)
}

@Test
fun `test index bucket monitor post request`() {
val req = IndexMonitorRequest(
"1234", 1L, 2L, WriteRequest.RefreshPolicy.IMMEDIATE, RestRequest.Method.POST,
randomBucketLevelMonitor()
)
Assertions.assertNotNull(req)

val out = BytesStreamOutput()
req.writeTo(out)
val sin = StreamInput.wrap(out.bytes().toBytesRef().bytes)
val namedWriteableRegistry = NamedWriteableRegistry(SearchModule(Settings.EMPTY, emptyList()).namedWriteables)
val newReq = IndexMonitorRequest(NamedWriteableAwareStreamInput(sin, namedWriteableRegistry))
Assertions.assertEquals("1234", newReq.monitorId)
Assertions.assertEquals(1L, newReq.seqNo)
Assertions.assertEquals(2L, newReq.primaryTerm)
Assertions.assertEquals(RestRequest.Method.POST, newReq.method)
Assertions.assertNotNull(newReq.monitor)
}

@Test
fun `Index bucket monitor serialize and deserialize transport object should be equal`() {
val req = IndexMonitorRequest(
"1234", 1L, 2L, WriteRequest.RefreshPolicy.IMMEDIATE, RestRequest.Method.POST,
randomBucketLevelMonitor()
)

val recreatedObject = recreateObject(req, NamedWriteableRegistry(SearchModule(Settings.EMPTY, emptyList()).namedWriteables)) { IndexMonitorRequest(it) }

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let's have a test method where recreate object method WITHOUT NamedWriteableRegistry fails and in the same method let's verify that recreate Object WITH NamedWriteableRegistry passes using an actual aggregation in IndexMonitorRequest.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point. Let me try to do that

Assertions.assertEquals(req.monitorId, recreatedObject.monitorId)
Assertions.assertEquals(req.seqNo, recreatedObject.seqNo)
Assertions.assertEquals(req.primaryTerm, recreatedObject.primaryTerm)
Assertions.assertEquals(req.method, recreatedObject.method)
Assertions.assertNotNull(recreatedObject.monitor)
Assertions.assertEquals(req.monitor, recreatedObject.monitor)
}

@Test
fun `test index monitor put request`() {

Expand Down