diff --git a/oas_docs/bundle.json b/oas_docs/bundle.json index 995e717a84a21..85985f1e84f90 100644 --- a/oas_docs/bundle.json +++ b/oas_docs/bundle.json @@ -46283,10 +46283,56 @@ "type": "string" }, "type": "array" + }, + "queries": { + "items": { + "allOf": [ + { + "properties": { + "id": { + "minLength": 1, + "type": "string" + }, + "title": { + "minLength": 1, + "type": "string" + } + }, + "required": [ + "id", + "title" + ], + "type": "object" + }, + { + "properties": { + "kql": { + "additionalProperties": false, + "properties": { + "query": { + "minLength": 1, + "type": "string" + } + }, + "required": [ + "query" + ], + "type": "object" + } + }, + "required": [ + "kql" + ], + "type": "object" + } + ] + }, + "type": "array" } }, "required": [ - "dashboards" + "dashboards", + "queries" ], "type": "object" }, @@ -48004,10 +48050,56 @@ "type": "string" }, "type": "array" + }, + "queries": { + "items": { + "allOf": [ + { + "properties": { + "id": { + "minLength": 1, + "type": "string" + }, + "title": { + "minLength": 1, + "type": "string" + } + }, + "required": [ + "id", + "title" + ], + "type": "object" + }, + { + "properties": { + "kql": { + "additionalProperties": false, + "properties": { + "query": { + "minLength": 1, + "type": "string" + } + }, + "required": [ + "query" + ], + "type": "object" + } + }, + "required": [ + "kql" + ], + "type": "object" + } + ] + }, + "type": "array" } }, "required": [ - "dashboards" + "dashboards", + "queries" ], "type": "object" }, @@ -49474,10 +49566,56 @@ "type": "string" }, "type": "array" + }, + "queries": { + "items": { + "allOf": [ + { + "properties": { + "id": { + "minLength": 1, + "type": "string" + }, + "title": { + "minLength": 1, + "type": "string" + } + }, + "required": [ + "id", + "title" + ], + "type": "object" + }, + { + "properties": { + "kql": { + "additionalProperties": false, + "properties": { + "query": { + "minLength": 1, + "type": "string" + } + }, + "required": [ + "query" + ], + "type": "object" + } + }, + "required": [ + "kql" + ], + "type": "object" + } + ] + }, + "type": "array" } }, "required": [ - "dashboards" + "dashboards", + "queries" ], "type": "object" }, @@ -53417,6 +53555,385 @@ ], "x-state": "Technical Preview" } + }, + "/api/streams/{name}/queries": { + "get": { + "description": "Fetches all queries linked to a stream that are visible to the current user in the current space.", + "operationId": "get-streams-name-queries", + "parameters": [ + { + "in": "path", + "name": "name", + "required": true, + "schema": { + "type": "string" + } + } + ], + "requestBody": { + "content": { + "application/json": { + "schema": { + "anyOf": [ + { + "additionalProperties": false, + "properties": {}, + "type": "object" + }, + { + "enum": [ + "null" + ], + "nullable": true + }, + { + "not": {} + } + ] + } + } + } + }, + "responses": {}, + "summary": "Get stream queries", + "tags": [ + "streams" + ], + "x-state": "Technical Preview" + } + }, + "/api/streams/{name}/queries/_bulk": { + "post": { + "description": "Bulk update queries of a stream. Can add new queries and delete existing ones.", + "operationId": "post-streams-name-queries-bulk", + "parameters": [ + { + "description": "A required header to protect against CSRF attacks", + "in": "header", + "name": "kbn-xsrf", + "required": true, + "schema": { + "example": "true", + "type": "string" + } + }, + { + "in": "path", + "name": "name", + "required": true, + "schema": { + "type": "string" + } + } + ], + "requestBody": { + "content": { + "application/json": { + "schema": { + "additionalProperties": false, + "properties": { + "operations": { + "items": { + "anyOf": [ + { + "additionalProperties": false, + "properties": { + "index": { + "allOf": [ + { + "properties": { + "id": { + "minLength": 1, + "type": "string" + }, + "title": { + "minLength": 1, + "type": "string" + } + }, + "required": [ + "id", + "title" + ], + "type": "object" + }, + { + "properties": { + "kql": { + "additionalProperties": false, + "properties": { + "query": { + "minLength": 1, + "type": "string" + } + }, + "required": [ + "query" + ], + "type": "object" + } + }, + "required": [ + "kql" + ], + "type": "object" + } + ] + } + }, + "required": [ + "index" + ], + "type": "object" + }, + { + "additionalProperties": false, + "properties": { + "delete": { + "additionalProperties": false, + "properties": { + "id": { + "type": "string" + } + }, + "required": [ + "id" + ], + "type": "object" + } + }, + "required": [ + "delete" + ], + "type": "object" + } + ] + }, + "type": "array" + } + }, + "required": [ + "operations" + ], + "type": "object" + } + } + } + }, + "responses": {}, + "summary": "Bulk update queries", + "tags": [ + "streams" + ], + "x-state": "Technical Preview" + } + }, + "/api/streams/{name}/queries/{queryId}": { + "delete": { + "description": "Remove a query from a stream. Noop if the query is not found on the stream.", + "operationId": "delete-streams-name-queries-queryid", + "parameters": [ + { + "description": "A required header to protect against CSRF attacks", + "in": "header", + "name": "kbn-xsrf", + "required": true, + "schema": { + "example": "true", + "type": "string" + } + }, + { + "in": "path", + "name": "name", + "required": true, + "schema": { + "type": "string" + } + }, + { + "in": "path", + "name": "queryId", + "required": true, + "schema": { + "type": "string" + } + } + ], + "requestBody": { + "content": { + "application/json": { + "schema": { + "anyOf": [ + { + "additionalProperties": false, + "properties": {}, + "type": "object" + }, + { + "enum": [ + "null" + ], + "nullable": true + }, + { + "not": {} + } + ] + } + } + } + }, + "responses": {}, + "summary": "Remove a query from a stream", + "tags": [ + "streams" + ], + "x-state": "Technical Preview" + }, + "put": { + "description": "Adds a query to a stream. Noop if the query is already present on the stream.", + "operationId": "put-streams-name-queries-queryid", + "parameters": [ + { + "description": "A required header to protect against CSRF attacks", + "in": "header", + "name": "kbn-xsrf", + "required": true, + "schema": { + "example": "true", + "type": "string" + } + }, + { + "in": "path", + "name": "name", + "required": true, + "schema": { + "type": "string" + } + }, + { + "in": "path", + "name": "queryId", + "required": true, + "schema": { + "type": "string" + } + } + ], + "requestBody": { + "content": { + "application/json": { + "schema": { + "additionalProperties": false, + "properties": { + "kql": { + "additionalProperties": false, + "properties": { + "query": { + "minLength": 1, + "type": "string" + } + }, + "required": [ + "query" + ], + "type": "object" + }, + "title": { + "minLength": 1, + "type": "string" + } + }, + "required": [ + "title", + "kql" + ], + "type": "object" + } + } + } + }, + "responses": {}, + "summary": "Upsert a query to a stream", + "tags": [ + "streams" + ], + "x-state": "Technical Preview" + } + }, + "/api/streams/{name}/significant_events": { + "get": { + "description": "Read the significant events", + "operationId": "get-streams-name-significant-events", + "parameters": [ + { + "in": "path", + "name": "name", + "required": true, + "schema": { + "type": "string" + } + }, + { + "in": "query", + "name": "from", + "required": true, + "schema": { + "format": "date-time", + "type": "string" + } + }, + { + "in": "query", + "name": "to", + "required": true, + "schema": { + "format": "date-time", + "type": "string" + } + }, + { + "in": "query", + "name": "bucketSize", + "required": true, + "schema": { + "type": "string" + } + } + ], + "requestBody": { + "content": { + "application/json": { + "schema": { + "anyOf": [ + { + "additionalProperties": false, + "properties": {}, + "type": "object" + }, + { + "enum": [ + "null" + ], + "nullable": true + }, + { + "not": {} + } + ] + } + } + } + }, + "responses": {}, + "summary": "Read the significant events", + "tags": [ + "streams" + ], + "x-state": "Technical Preview" + } } }, "security": [ diff --git a/oas_docs/bundle.serverless.json b/oas_docs/bundle.serverless.json index be7b2b81e64b1..98b4eb55e0eb9 100644 --- a/oas_docs/bundle.serverless.json +++ b/oas_docs/bundle.serverless.json @@ -45874,10 +45874,56 @@ "type": "string" }, "type": "array" + }, + "queries": { + "items": { + "allOf": [ + { + "properties": { + "id": { + "minLength": 1, + "type": "string" + }, + "title": { + "minLength": 1, + "type": "string" + } + }, + "required": [ + "id", + "title" + ], + "type": "object" + }, + { + "properties": { + "kql": { + "additionalProperties": false, + "properties": { + "query": { + "minLength": 1, + "type": "string" + } + }, + "required": [ + "query" + ], + "type": "object" + } + }, + "required": [ + "kql" + ], + "type": "object" + } + ] + }, + "type": "array" } }, "required": [ - "dashboards" + "dashboards", + "queries" ], "type": "object" }, @@ -47595,10 +47641,56 @@ "type": "string" }, "type": "array" + }, + "queries": { + "items": { + "allOf": [ + { + "properties": { + "id": { + "minLength": 1, + "type": "string" + }, + "title": { + "minLength": 1, + "type": "string" + } + }, + "required": [ + "id", + "title" + ], + "type": "object" + }, + { + "properties": { + "kql": { + "additionalProperties": false, + "properties": { + "query": { + "minLength": 1, + "type": "string" + } + }, + "required": [ + "query" + ], + "type": "object" + } + }, + "required": [ + "kql" + ], + "type": "object" + } + ] + }, + "type": "array" } }, "required": [ - "dashboards" + "dashboards", + "queries" ], "type": "object" }, @@ -49065,10 +49157,56 @@ "type": "string" }, "type": "array" + }, + "queries": { + "items": { + "allOf": [ + { + "properties": { + "id": { + "minLength": 1, + "type": "string" + }, + "title": { + "minLength": 1, + "type": "string" + } + }, + "required": [ + "id", + "title" + ], + "type": "object" + }, + { + "properties": { + "kql": { + "additionalProperties": false, + "properties": { + "query": { + "minLength": 1, + "type": "string" + } + }, + "required": [ + "query" + ], + "type": "object" + } + }, + "required": [ + "kql" + ], + "type": "object" + } + ] + }, + "type": "array" } }, "required": [ - "dashboards" + "dashboards", + "queries" ], "type": "object" }, @@ -53008,6 +53146,385 @@ ], "x-state": "Technical Preview" } + }, + "/api/streams/{name}/queries": { + "get": { + "description": "Fetches all queries linked to a stream that are visible to the current user in the current space.", + "operationId": "get-streams-name-queries", + "parameters": [ + { + "in": "path", + "name": "name", + "required": true, + "schema": { + "type": "string" + } + } + ], + "requestBody": { + "content": { + "application/json": { + "schema": { + "anyOf": [ + { + "additionalProperties": false, + "properties": {}, + "type": "object" + }, + { + "enum": [ + "null" + ], + "nullable": true + }, + { + "not": {} + } + ] + } + } + } + }, + "responses": {}, + "summary": "Get stream queries", + "tags": [ + "streams" + ], + "x-state": "Technical Preview" + } + }, + "/api/streams/{name}/queries/_bulk": { + "post": { + "description": "Bulk update queries of a stream. Can add new queries and delete existing ones.", + "operationId": "post-streams-name-queries-bulk", + "parameters": [ + { + "description": "A required header to protect against CSRF attacks", + "in": "header", + "name": "kbn-xsrf", + "required": true, + "schema": { + "example": "true", + "type": "string" + } + }, + { + "in": "path", + "name": "name", + "required": true, + "schema": { + "type": "string" + } + } + ], + "requestBody": { + "content": { + "application/json": { + "schema": { + "additionalProperties": false, + "properties": { + "operations": { + "items": { + "anyOf": [ + { + "additionalProperties": false, + "properties": { + "index": { + "allOf": [ + { + "properties": { + "id": { + "minLength": 1, + "type": "string" + }, + "title": { + "minLength": 1, + "type": "string" + } + }, + "required": [ + "id", + "title" + ], + "type": "object" + }, + { + "properties": { + "kql": { + "additionalProperties": false, + "properties": { + "query": { + "minLength": 1, + "type": "string" + } + }, + "required": [ + "query" + ], + "type": "object" + } + }, + "required": [ + "kql" + ], + "type": "object" + } + ] + } + }, + "required": [ + "index" + ], + "type": "object" + }, + { + "additionalProperties": false, + "properties": { + "delete": { + "additionalProperties": false, + "properties": { + "id": { + "type": "string" + } + }, + "required": [ + "id" + ], + "type": "object" + } + }, + "required": [ + "delete" + ], + "type": "object" + } + ] + }, + "type": "array" + } + }, + "required": [ + "operations" + ], + "type": "object" + } + } + } + }, + "responses": {}, + "summary": "Bulk update queries", + "tags": [ + "streams" + ], + "x-state": "Technical Preview" + } + }, + "/api/streams/{name}/queries/{queryId}": { + "delete": { + "description": "Remove a query from a stream. Noop if the query is not found on the stream.", + "operationId": "delete-streams-name-queries-queryid", + "parameters": [ + { + "description": "A required header to protect against CSRF attacks", + "in": "header", + "name": "kbn-xsrf", + "required": true, + "schema": { + "example": "true", + "type": "string" + } + }, + { + "in": "path", + "name": "name", + "required": true, + "schema": { + "type": "string" + } + }, + { + "in": "path", + "name": "queryId", + "required": true, + "schema": { + "type": "string" + } + } + ], + "requestBody": { + "content": { + "application/json": { + "schema": { + "anyOf": [ + { + "additionalProperties": false, + "properties": {}, + "type": "object" + }, + { + "enum": [ + "null" + ], + "nullable": true + }, + { + "not": {} + } + ] + } + } + } + }, + "responses": {}, + "summary": "Remove a query from a stream", + "tags": [ + "streams" + ], + "x-state": "Technical Preview" + }, + "put": { + "description": "Adds a query to a stream. Noop if the query is already present on the stream.", + "operationId": "put-streams-name-queries-queryid", + "parameters": [ + { + "description": "A required header to protect against CSRF attacks", + "in": "header", + "name": "kbn-xsrf", + "required": true, + "schema": { + "example": "true", + "type": "string" + } + }, + { + "in": "path", + "name": "name", + "required": true, + "schema": { + "type": "string" + } + }, + { + "in": "path", + "name": "queryId", + "required": true, + "schema": { + "type": "string" + } + } + ], + "requestBody": { + "content": { + "application/json": { + "schema": { + "additionalProperties": false, + "properties": { + "kql": { + "additionalProperties": false, + "properties": { + "query": { + "minLength": 1, + "type": "string" + } + }, + "required": [ + "query" + ], + "type": "object" + }, + "title": { + "minLength": 1, + "type": "string" + } + }, + "required": [ + "title", + "kql" + ], + "type": "object" + } + } + } + }, + "responses": {}, + "summary": "Upsert a query to a stream", + "tags": [ + "streams" + ], + "x-state": "Technical Preview" + } + }, + "/api/streams/{name}/significant_events": { + "get": { + "description": "Read the significant events", + "operationId": "get-streams-name-significant-events", + "parameters": [ + { + "in": "path", + "name": "name", + "required": true, + "schema": { + "type": "string" + } + }, + { + "in": "query", + "name": "from", + "required": true, + "schema": { + "format": "date-time", + "type": "string" + } + }, + { + "in": "query", + "name": "to", + "required": true, + "schema": { + "format": "date-time", + "type": "string" + } + }, + { + "in": "query", + "name": "bucketSize", + "required": true, + "schema": { + "type": "string" + } + } + ], + "requestBody": { + "content": { + "application/json": { + "schema": { + "anyOf": [ + { + "additionalProperties": false, + "properties": {}, + "type": "object" + }, + { + "enum": [ + "null" + ], + "nullable": true + }, + { + "not": {} + } + ] + } + } + } + }, + "responses": {}, + "summary": "Read the significant events", + "tags": [ + "streams" + ], + "x-state": "Technical Preview" + } } }, "security": [ diff --git a/oas_docs/output/kibana.serverless.yaml b/oas_docs/output/kibana.serverless.yaml index 15ba7946a8c72..8ee284d94dba9 100644 --- a/oas_docs/output/kibana.serverless.yaml +++ b/oas_docs/output/kibana.serverless.yaml @@ -43064,8 +43064,37 @@ paths: minLength: 1 type: string type: array + queries: + items: + allOf: + - type: object + properties: + id: + minLength: 1 + type: string + title: + minLength: 1 + type: string + required: + - id + - title + - type: object + properties: + kql: + additionalProperties: false + type: object + properties: + query: + minLength: 1 + type: string + required: + - query + required: + - kql + type: array required: - dashboards + - queries - type: object properties: stream: @@ -44124,8 +44153,37 @@ paths: minLength: 1 type: string type: array + queries: + items: + allOf: + - type: object + properties: + id: + minLength: 1 + type: string + title: + minLength: 1 + type: string + required: + - id + - title + - type: object + properties: + kql: + additionalProperties: false + type: object + properties: + query: + minLength: 1 + type: string + required: + - query + required: + - kql + type: array required: - dashboards + - queries - type: object properties: stream: @@ -45037,8 +45095,37 @@ paths: minLength: 1 type: string type: array + queries: + items: + allOf: + - type: object + properties: + id: + minLength: 1 + type: string + title: + minLength: 1 + type: string + required: + - id + - title + - type: object + properties: + kql: + additionalProperties: false + type: object + properties: + query: + minLength: 1 + type: string + required: + - query + required: + - kql + type: array required: - dashboards + - queries - type: object properties: stream: @@ -47480,6 +47567,243 @@ paths: tags: - streams x-state: Technical Preview + /api/streams/{name}/queries: + get: + description: Fetches all queries linked to a stream that are visible to the current user in the current space. + operationId: get-streams-name-queries + parameters: + - in: path + name: name + required: true + schema: + type: string + requestBody: + content: + application/json: + schema: + anyOf: + - additionalProperties: false + type: object + properties: {} + - enum: + - 'null' + nullable: true + - not: {} + responses: {} + summary: Get stream queries + tags: + - streams + x-state: Technical Preview + /api/streams/{name}/queries/_bulk: + post: + description: Bulk update queries of a stream. Can add new queries and delete existing ones. + operationId: post-streams-name-queries-bulk + parameters: + - description: A required header to protect against CSRF attacks + in: header + name: kbn-xsrf + required: true + schema: + example: 'true' + type: string + - in: path + name: name + required: true + schema: + type: string + requestBody: + content: + application/json: + schema: + additionalProperties: false + type: object + properties: + operations: + items: + anyOf: + - additionalProperties: false + type: object + properties: + index: + allOf: + - type: object + properties: + id: + minLength: 1 + type: string + title: + minLength: 1 + type: string + required: + - id + - title + - type: object + properties: + kql: + additionalProperties: false + type: object + properties: + query: + minLength: 1 + type: string + required: + - query + required: + - kql + required: + - index + - additionalProperties: false + type: object + properties: + delete: + additionalProperties: false + type: object + properties: + id: + type: string + required: + - id + required: + - delete + type: array + required: + - operations + responses: {} + summary: Bulk update queries + tags: + - streams + x-state: Technical Preview + /api/streams/{name}/queries/{queryId}: + delete: + description: Remove a query from a stream. Noop if the query is not found on the stream. + operationId: delete-streams-name-queries-queryid + parameters: + - description: A required header to protect against CSRF attacks + in: header + name: kbn-xsrf + required: true + schema: + example: 'true' + type: string + - in: path + name: name + required: true + schema: + type: string + - in: path + name: queryId + required: true + schema: + type: string + requestBody: + content: + application/json: + schema: + anyOf: + - additionalProperties: false + type: object + properties: {} + - enum: + - 'null' + nullable: true + - not: {} + responses: {} + summary: Remove a query from a stream + tags: + - streams + x-state: Technical Preview + put: + description: Adds a query to a stream. Noop if the query is already present on the stream. + operationId: put-streams-name-queries-queryid + parameters: + - description: A required header to protect against CSRF attacks + in: header + name: kbn-xsrf + required: true + schema: + example: 'true' + type: string + - in: path + name: name + required: true + schema: + type: string + - in: path + name: queryId + required: true + schema: + type: string + requestBody: + content: + application/json: + schema: + additionalProperties: false + type: object + properties: + kql: + additionalProperties: false + type: object + properties: + query: + minLength: 1 + type: string + required: + - query + title: + minLength: 1 + type: string + required: + - title + - kql + responses: {} + summary: Upsert a query to a stream + tags: + - streams + x-state: Technical Preview + /api/streams/{name}/significant_events: + get: + description: Read the significant events + operationId: get-streams-name-significant-events + parameters: + - in: path + name: name + required: true + schema: + type: string + - in: query + name: from + required: true + schema: + format: date-time + type: string + - in: query + name: to + required: true + schema: + format: date-time + type: string + - in: query + name: bucketSize + required: true + schema: + type: string + requestBody: + content: + application/json: + schema: + anyOf: + - additionalProperties: false + type: object + properties: {} + - enum: + - 'null' + nullable: true + - not: {} + responses: {} + summary: Read the significant events + tags: + - streams + x-state: Technical Preview /api/task_manager/_health: get: description: | diff --git a/oas_docs/output/kibana.yaml b/oas_docs/output/kibana.yaml index fd26b86f8addd..1dfd112601b98 100644 --- a/oas_docs/output/kibana.yaml +++ b/oas_docs/output/kibana.yaml @@ -46579,8 +46579,37 @@ paths: minLength: 1 type: string type: array + queries: + items: + allOf: + - type: object + properties: + id: + minLength: 1 + type: string + title: + minLength: 1 + type: string + required: + - id + - title + - type: object + properties: + kql: + additionalProperties: false + type: object + properties: + query: + minLength: 1 + type: string + required: + - query + required: + - kql + type: array required: - dashboards + - queries - type: object properties: stream: @@ -47639,8 +47668,37 @@ paths: minLength: 1 type: string type: array + queries: + items: + allOf: + - type: object + properties: + id: + minLength: 1 + type: string + title: + minLength: 1 + type: string + required: + - id + - title + - type: object + properties: + kql: + additionalProperties: false + type: object + properties: + query: + minLength: 1 + type: string + required: + - query + required: + - kql + type: array required: - dashboards + - queries - type: object properties: stream: @@ -48552,8 +48610,37 @@ paths: minLength: 1 type: string type: array + queries: + items: + allOf: + - type: object + properties: + id: + minLength: 1 + type: string + title: + minLength: 1 + type: string + required: + - id + - title + - type: object + properties: + kql: + additionalProperties: false + type: object + properties: + query: + minLength: 1 + type: string + required: + - query + required: + - kql + type: array required: - dashboards + - queries - type: object properties: stream: @@ -50995,6 +51082,243 @@ paths: tags: - streams x-state: Technical Preview + /api/streams/{name}/queries: + get: + description: Fetches all queries linked to a stream that are visible to the current user in the current space. + operationId: get-streams-name-queries + parameters: + - in: path + name: name + required: true + schema: + type: string + requestBody: + content: + application/json: + schema: + anyOf: + - additionalProperties: false + type: object + properties: {} + - enum: + - 'null' + nullable: true + - not: {} + responses: {} + summary: Get stream queries + tags: + - streams + x-state: Technical Preview + /api/streams/{name}/queries/_bulk: + post: + description: Bulk update queries of a stream. Can add new queries and delete existing ones. + operationId: post-streams-name-queries-bulk + parameters: + - description: A required header to protect against CSRF attacks + in: header + name: kbn-xsrf + required: true + schema: + example: 'true' + type: string + - in: path + name: name + required: true + schema: + type: string + requestBody: + content: + application/json: + schema: + additionalProperties: false + type: object + properties: + operations: + items: + anyOf: + - additionalProperties: false + type: object + properties: + index: + allOf: + - type: object + properties: + id: + minLength: 1 + type: string + title: + minLength: 1 + type: string + required: + - id + - title + - type: object + properties: + kql: + additionalProperties: false + type: object + properties: + query: + minLength: 1 + type: string + required: + - query + required: + - kql + required: + - index + - additionalProperties: false + type: object + properties: + delete: + additionalProperties: false + type: object + properties: + id: + type: string + required: + - id + required: + - delete + type: array + required: + - operations + responses: {} + summary: Bulk update queries + tags: + - streams + x-state: Technical Preview + /api/streams/{name}/queries/{queryId}: + delete: + description: Remove a query from a stream. Noop if the query is not found on the stream. + operationId: delete-streams-name-queries-queryid + parameters: + - description: A required header to protect against CSRF attacks + in: header + name: kbn-xsrf + required: true + schema: + example: 'true' + type: string + - in: path + name: name + required: true + schema: + type: string + - in: path + name: queryId + required: true + schema: + type: string + requestBody: + content: + application/json: + schema: + anyOf: + - additionalProperties: false + type: object + properties: {} + - enum: + - 'null' + nullable: true + - not: {} + responses: {} + summary: Remove a query from a stream + tags: + - streams + x-state: Technical Preview + put: + description: Adds a query to a stream. Noop if the query is already present on the stream. + operationId: put-streams-name-queries-queryid + parameters: + - description: A required header to protect against CSRF attacks + in: header + name: kbn-xsrf + required: true + schema: + example: 'true' + type: string + - in: path + name: name + required: true + schema: + type: string + - in: path + name: queryId + required: true + schema: + type: string + requestBody: + content: + application/json: + schema: + additionalProperties: false + type: object + properties: + kql: + additionalProperties: false + type: object + properties: + query: + minLength: 1 + type: string + required: + - query + title: + minLength: 1 + type: string + required: + - title + - kql + responses: {} + summary: Upsert a query to a stream + tags: + - streams + x-state: Technical Preview + /api/streams/{name}/significant_events: + get: + description: Read the significant events + operationId: get-streams-name-significant-events + parameters: + - in: path + name: name + required: true + schema: + type: string + - in: query + name: from + required: true + schema: + format: date-time + type: string + - in: query + name: to + required: true + schema: + format: date-time + type: string + - in: query + name: bucketSize + required: true + schema: + type: string + requestBody: + content: + application/json: + schema: + anyOf: + - additionalProperties: false + type: object + properties: {} + - enum: + - 'null' + nullable: true + - not: {} + responses: {} + summary: Read the significant events + tags: + - streams + x-state: Technical Preview /api/task_manager/_health: get: description: | diff --git a/src/platform/packages/shared/kbn-storage-adapter/index.ts b/src/platform/packages/shared/kbn-storage-adapter/index.ts index 0b35732928f57..134619d295c15 100644 --- a/src/platform/packages/shared/kbn-storage-adapter/index.ts +++ b/src/platform/packages/shared/kbn-storage-adapter/index.ts @@ -135,24 +135,24 @@ type Exact = T extends U // The IStorageClient type then checks if the application type is a subset of the storage // document type. If this is not the case, the IStorageClient type is set to never, which // will cause a type error in the consuming code. -export type IStorageClient = Exact< - ApplicationDocument, - Partial> -> extends true - ? InternalIStorageClient> +export type IStorageClient< + TSchema extends IndexStorageSettings, + TApplicationType extends StorageDocumentOf +> = Exact> extends true + ? InternalIStorageClient : never; export type SimpleIStorageClient = IStorageClient< TStorageSettings, - Omit, '_id'> + StorageDocumentOf >; -export type ApplicationDocument = TApplicationType & { _id: string }; - -export type StorageDocumentOf = StorageFieldTypeOf<{ - type: 'object'; - properties: TStorageSettings['schema']['properties']; -}> & { _id: string }; +export type StorageDocumentOf = Partial< + StorageFieldTypeOf<{ + type: 'object'; + properties: TStorageSettings['schema']['properties']; + }> +>; export { StorageIndexAdapter } from './src/index_adapter'; diff --git a/src/platform/packages/shared/kbn-storage-adapter/src/index_adapter/index.ts b/src/platform/packages/shared/kbn-storage-adapter/src/index_adapter/index.ts index d0c3a11ee0bc0..5158a870fb82f 100644 --- a/src/platform/packages/shared/kbn-storage-adapter/src/index_adapter/index.ts +++ b/src/platform/packages/shared/kbn-storage-adapter/src/index_adapter/index.ts @@ -35,7 +35,6 @@ import { StorageClientSearchResponse, StorageClientClean, StorageClientCleanResponse, - ApplicationDocument, InternalIStorageClient, } from '../..'; import { getSchemaVersion } from '../get_schema_version'; @@ -97,7 +96,10 @@ function wrapEsCall(p: Promise): Promise { * - Index Lifecycle Management * - Schema upgrades w/ fallbacks */ -export class StorageIndexAdapter { +export class StorageIndexAdapter< + TStorageSettings extends IndexStorageSettings, + TApplicationType extends Partial> +> { private readonly logger: Logger; constructor( private readonly esClient: ElasticsearchClient, @@ -320,7 +322,7 @@ export class StorageIndexAdapter> = async (request) => { + private search: StorageClientSearch = async (request) => { return (await wrapEsCall( this.esClient .search({ @@ -328,7 +330,7 @@ export class StorageIndexAdapter, any> => { + .catch((error): StorageClientSearchResponse => { if (isNotFoundError(error)) { return { _shards: { @@ -349,10 +351,10 @@ export class StorageIndexAdapter>>; + )) as unknown as ReturnType>; }; - private index: StorageClientIndex> = async ({ + private index: StorageClientIndex = async ({ id, refresh = 'wait_for', ...request @@ -391,7 +393,7 @@ export class StorageIndexAdapter> = ({ + private bulk: StorageClientBulk = ({ operations, refresh = 'wait_for', ...request @@ -522,10 +524,7 @@ export class StorageIndexAdapter> = async ({ - id, - ...request - }) => { + private get: StorageClientGet = async ({ id, ...request }) => { const response = await this.search({ track_total_hits: false, size: 1, @@ -566,7 +565,7 @@ export class StorageIndexAdapter, + _source: hit._source as TApplicationType, _ignored: hit._ignored, _primary_term: hit._primary_term, _routing: hit._routing, @@ -582,7 +581,7 @@ export class StorageIndexAdapter> { + getClient(): InternalIStorageClient { return { bulk: this.bulk, delete: this.delete, @@ -596,4 +595,4 @@ export class StorageIndexAdapter = - StorageIndexAdapter, '_id'>>; + StorageIndexAdapter>; diff --git a/src/platform/packages/shared/kbn-zod-helpers/src/deep_strict.ts b/src/platform/packages/shared/kbn-zod-helpers/src/deep_strict.ts index d458b301b0299..0164f4dfd3fa5 100644 --- a/src/platform/packages/shared/kbn-zod-helpers/src/deep_strict.ts +++ b/src/platform/packages/shared/kbn-zod-helpers/src/deep_strict.ts @@ -33,6 +33,7 @@ const primitiveTypes = [ ZodFirstPartyTypeKind.ZodLiteral, ZodFirstPartyTypeKind.ZodEnum, ZodFirstPartyTypeKind.ZodNativeEnum, + ZodFirstPartyTypeKind.ZodDate, ] as const; /** @@ -56,6 +57,7 @@ const typeNames = [ ZodFirstPartyTypeKind.ZodDefault, ZodFirstPartyTypeKind.ZodLazy, ZodFirstPartyTypeKind.ZodNullable, + ZodFirstPartyTypeKind.ZodPipeline, ...primitiveTypes, ...dangerousTypes, ] as const; @@ -235,6 +237,9 @@ function getHandlingSchemas(schema: z.Schema, key: string, value: object): z.Sch case ZodFirstPartyTypeKind.ZodNullable: return [def.innerType]; + + case ZodFirstPartyTypeKind.ZodPipeline: + return [def.in]; } } diff --git a/x-pack/platform/packages/shared/kbn-streams-schema/src/models/base/api.ts b/x-pack/platform/packages/shared/kbn-streams-schema/src/models/base/api.ts index ec206746c8c24..72f24eb7470bb 100644 --- a/x-pack/platform/packages/shared/kbn-streams-schema/src/models/base/api.ts +++ b/x-pack/platform/packages/shared/kbn-streams-schema/src/models/base/api.ts @@ -5,21 +5,70 @@ * 2.0. */ +import { QueryDslQueryContainer } from '@elastic/elasticsearch/lib/api/types'; import { z } from '@kbn/zod'; import { NonEmptyString } from '@kbn/zod-helpers'; +import { primitive } from '../record_types'; +import { createIsNarrowSchema } from '../../helpers'; + +interface StreamQueryBase { + id: string; + title: string; +} + +export interface StreamQueryKql extends StreamQueryBase { + kql: { + query: string; + }; +} + +export type StreamQuery = StreamQueryKql; export interface StreamGetResponseBase { dashboards: string[]; + queries: StreamQuery[]; } export interface StreamUpsertRequestBase { dashboards: string[]; + queries: StreamQuery[]; } +const streamQueryBaseSchema: z.Schema = z.object({ + id: NonEmptyString, + title: NonEmptyString, +}); + +export const streamQueryKqlSchema: z.Schema = z.intersection( + streamQueryBaseSchema, + z.object({ + kql: z.object({ + query: NonEmptyString, + }), + }) +); + +export const querySchema: z.ZodType = z.lazy(() => + z.record(z.union([primitive, z.array(z.union([primitive, querySchema])), querySchema])) +); + +export const streamQuerySchema: z.Schema = streamQueryKqlSchema; + +export const upsertStreamQueryRequestSchema = z.object({ + title: NonEmptyString, + kql: z.object({ + query: NonEmptyString, + }), +}); + +export const isStreamQueryKql = createIsNarrowSchema(streamQuerySchema, streamQueryKqlSchema); + export const streamUpsertRequestSchemaBase: z.Schema = z.object({ dashboards: z.array(NonEmptyString), + queries: z.array(streamQuerySchema), }); export const streamGetResponseSchemaBase: z.Schema = z.object({ dashboards: z.array(NonEmptyString), + queries: z.array(streamQuerySchema), }); diff --git a/x-pack/platform/packages/shared/kbn-streams-schema/src/models/index.ts b/x-pack/platform/packages/shared/kbn-streams-schema/src/models/index.ts index 759bb5a2b5a35..223ddb512242a 100644 --- a/x-pack/platform/packages/shared/kbn-streams-schema/src/models/index.ts +++ b/x-pack/platform/packages/shared/kbn-streams-schema/src/models/index.ts @@ -6,9 +6,11 @@ */ export * from './ingest'; +export * from './base/api'; export * from './api'; export * from './core'; export * from './helpers'; export * from './group'; export * from './record_types'; export * from './content'; +export * from './significant_events'; diff --git a/x-pack/platform/packages/shared/kbn-streams-schema/src/models/ingest/api.ts b/x-pack/platform/packages/shared/kbn-streams-schema/src/models/ingest/api.ts index 1a27700625dd9..3f659e2cd7e03 100644 --- a/x-pack/platform/packages/shared/kbn-streams-schema/src/models/ingest/api.ts +++ b/x-pack/platform/packages/shared/kbn-streams-schema/src/models/ingest/api.ts @@ -149,6 +149,8 @@ export { ingestStreamUpsertRequestSchema, ingestUpsertRequestSchema, ingestStreamGetResponseSchema, + wiredStreamUpsertRequestSchema, + unwiredStreamUpsertRequestSchema, wiredStreamGetResponseSchema, unwiredStreamGetResponseSchema, type IngestGetResponse, diff --git a/x-pack/platform/packages/shared/kbn-streams-schema/src/models/significant_events/api.ts b/x-pack/platform/packages/shared/kbn-streams-schema/src/models/significant_events/api.ts new file mode 100644 index 0000000000000..df84cc7d6b616 --- /dev/null +++ b/x-pack/platform/packages/shared/kbn-streams-schema/src/models/significant_events/api.ts @@ -0,0 +1,31 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { StreamQueryKql } from '../base/api'; + +/** + * SignificantEvents Get Response + */ +type ChangePointsType = + | 'dip' + | 'distribution_change' + | 'non_stationary' + | 'spike' + | 'stationary' + | 'step_change' + | 'trend_change'; + +type SignificantEventsResponse = StreamQueryKql & { + occurrences: Array<{ date: string; count: number }>; + change_points: { + type: Record; + }; +}; + +type SignificantEventsGetResponse = SignificantEventsResponse[]; + +export type { SignificantEventsResponse, SignificantEventsGetResponse }; diff --git a/x-pack/platform/packages/shared/kbn-streams-schema/src/models/significant_events/index.ts b/x-pack/platform/packages/shared/kbn-streams-schema/src/models/significant_events/index.ts new file mode 100644 index 0000000000000..473b51dc47c28 --- /dev/null +++ b/x-pack/platform/packages/shared/kbn-streams-schema/src/models/significant_events/index.ts @@ -0,0 +1,8 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +export * from './api'; diff --git a/x-pack/platform/plugins/shared/streams/common/assets.ts b/x-pack/platform/plugins/shared/streams/common/assets.ts index 9353418f14618..85d87e43aed47 100644 --- a/x-pack/platform/plugins/shared/streams/common/assets.ts +++ b/x-pack/platform/plugins/shared/streams/common/assets.ts @@ -6,35 +6,60 @@ */ import { ValuesType } from 'utility-types'; +import { StreamQuery } from '@kbn/streams-schema'; export const ASSET_TYPES = { Dashboard: 'dashboard' as const, Rule: 'rule' as const, Slo: 'slo' as const, + Query: 'query' as const, }; export type AssetType = ValuesType; -export interface AssetLink { - assetType: TAssetType; - assetId: string; +interface AssetLinkBase { + 'asset.uuid': string; + 'asset.type': TAssetType; + 'asset.id': string; } -export type DashboardLink = AssetLink<'dashboard'>; -export type SloLink = AssetLink<'slo'>; -export type RuleLink = AssetLink<'rule'>; +export type DashboardLink = AssetLinkBase<'dashboard'>; +export type SloLink = AssetLinkBase<'slo'>; +export type RuleLink = AssetLinkBase<'rule'>; +export type QueryLink = AssetLinkBase<'query'> & { + query: StreamQuery; +}; + +export type AssetLink = DashboardLink | SloLink | RuleLink | QueryLink; + +type OmitFrom = T extends any ? (K extends keyof T ? Omit : never) : never; + +export type AssetLinkRequest = OmitFrom; + +export type AssetUnlinkRequest = Pick; -export interface Asset extends AssetLink { - label: string; +interface AssetBase extends AssetLinkBase { + title: string; +} + +interface SavedObjectAssetBase + extends AssetBase { tags: string[]; } -export type DashboardAsset = Asset<'dashboard'>; -export type SloAsset = Asset<'slo'>; -export type RuleAsset = Asset<'rule'>; +export type DashboardAsset = SavedObjectAssetBase<'dashboard'>; +export type SloAsset = SavedObjectAssetBase<'slo'>; +export type RuleAsset = SavedObjectAssetBase<'rule'>; +export type QueryAsset = AssetBase<'query'> & { + query: StreamQuery; +}; + +export type Asset = DashboardAsset | SloAsset | RuleAsset | QueryAsset; +export type AssetWithoutUuid = Omit; export interface AssetTypeToAssetMap { [ASSET_TYPES.Dashboard]: DashboardAsset; [ASSET_TYPES.Slo]: SloAsset; [ASSET_TYPES.Rule]: RuleAsset; + [ASSET_TYPES.Query]: QueryAsset; } diff --git a/x-pack/platform/plugins/shared/streams/server/lib/streams/assets/asset_client.ts b/x-pack/platform/plugins/shared/streams/server/lib/streams/assets/asset_client.ts index 9fab99661222c..6e9fbe14796aa 100644 --- a/x-pack/platform/plugins/shared/streams/server/lib/streams/assets/asset_client.ts +++ b/x-pack/platform/plugins/shared/streams/server/lib/streams/assets/asset_client.ts @@ -4,33 +4,48 @@ * 2.0; you may not use this file except in compliance with the Elastic License * 2.0. */ +import type { QueryDslQueryContainer } from '@elastic/elasticsearch/lib/api/types'; import { SanitizedRule } from '@kbn/alerting-plugin/common'; import { RulesClient } from '@kbn/alerting-plugin/server'; import { SavedObject, SavedObjectsClientContract } from '@kbn/core/server'; import { IStorageClient } from '@kbn/storage-adapter'; -import { keyBy } from 'lodash'; +import { keyBy, partition } from 'lodash'; import objectHash from 'object-hash'; import pLimit from 'p-limit'; -import type { QueryDslQueryContainer } from '@elastic/elasticsearch/lib/api/types'; import { ASSET_TYPES, Asset, AssetLink, + AssetLinkRequest, AssetType, - DashboardAsset, - SloAsset, - RuleAsset, + AssetUnlinkRequest, + AssetWithoutUuid, + DashboardLink, + QueryAsset, + QueryLink, + RuleLink, + SloLink, } from '../../../../common/assets'; -import { ASSET_ENTITY_ID, ASSET_ENTITY_TYPE, ASSET_TYPE } from './fields'; +import { + ASSET_ID, + ASSET_TYPE, + ASSET_UUID, + QUERY_KQL_BODY, + QUERY_TITLE, + STREAM_NAME, +} from './fields'; import { AssetStorageSettings } from './storage_settings'; +import { AssetNotFoundError } from '../errors/asset_not_found_error'; interface TermQueryOpts { queryEmptyString: boolean; } +type TermQueryFieldValue = string | boolean | number | null; + function termQuery( field: T, - value: string | boolean | number | undefined | null, + value: TermQueryFieldValue | undefined, opts: TermQueryOpts = { queryEmptyString: true } ): QueryDslQueryContainer[] { if (value === null || value === undefined || (!opts.queryEmptyString && value === '')) { @@ -40,76 +55,126 @@ function termQuery( return [{ term: { [field]: value } }]; } +function termsQuery( + field: T, + values: Array | null | undefined +): QueryDslQueryContainer[] { + if (values === null || values === undefined || values.length === 0) { + return []; + } + + const filteredValues = values.filter( + (value) => value !== undefined + ) as unknown as TermQueryFieldValue[]; + + return [{ terms: { [field]: filteredValues } }]; +} + +function getUuid(name: string, asset: Pick) { + return objectHash({ + [STREAM_NAME]: name, + [ASSET_ID]: asset[ASSET_ID], + [ASSET_TYPE]: asset[ASSET_TYPE], + }); +} + +function toAssetLink( + name: string, + asset: TAssetLink +): TAssetLink & { [ASSET_UUID]: string } { + return { + ...asset, + [ASSET_UUID]: getUuid(name, asset), + }; +} + function sloSavedObjectToAsset( sloId: string, savedObject: SavedObject<{ name: string; tags: string[] }> -): SloAsset { +) { return { - assetId: sloId, - label: savedObject.attributes.name, + [ASSET_ID]: sloId, + [ASSET_TYPE]: 'slo' as const, + title: savedObject.attributes.name, tags: savedObject.attributes.tags.concat( savedObject.references.filter((ref) => ref.type === 'tag').map((ref) => ref.id) ), - assetType: 'slo', }; } function dashboardSavedObjectToAsset( dashboardId: string, savedObject: SavedObject<{ title: string }> -): DashboardAsset { +) { return { - assetId: dashboardId, - label: savedObject.attributes.title, + [ASSET_ID]: dashboardId, + [ASSET_TYPE]: 'dashboard' as const, + title: savedObject.attributes.title, tags: savedObject.references.filter((ref) => ref.type === 'tag').map((ref) => ref.id), - assetType: 'dashboard', }; } -function ruleToAsset(ruleId: string, rule: SanitizedRule): RuleAsset { +function ruleToAsset(ruleId: string, rule: SanitizedRule) { return { - assetType: 'rule', - assetId: ruleId, - label: rule.name, + [ASSET_TYPE]: 'rule' as const, + [ASSET_ID]: ruleId, + title: rule.name, tags: rule.tags, }; } -function getAssetDocument({ - assetId, - entityId, - entityType, - assetType, -}: AssetLink & { entityId: string; entityType: string }) { - const doc = { - 'asset.id': assetId, - 'asset.type': assetType, - 'entity.id': entityId, - 'entity.type': entityType, - }; +type StoredQueryLink = Omit & { + [QUERY_TITLE]: string; + [QUERY_KQL_BODY]: string; +}; - return { - _id: objectHash(doc), - ...doc, - }; -} +export type StoredAssetLink = (SloLink | RuleLink | DashboardLink | StoredQueryLink) & { + [STREAM_NAME]: string; +}; interface AssetBulkIndexOperation { - index: { asset: AssetLink }; + index: { asset: AssetLinkRequest }; } interface AssetBulkDeleteOperation { - delete: { asset: AssetLink }; + delete: { asset: AssetUnlinkRequest }; } -export type AssetBulkOperation = AssetBulkIndexOperation | AssetBulkDeleteOperation; +function fromStorage(link: StoredAssetLink): AssetLink { + if (link['asset.type'] === 'query') { + return { + ...link, + query: { + id: link['asset.id'], + title: link['query.title'], + kql: { + query: link['query.kql.query'], + }, + }, + } satisfies QueryLink; + } + return link; +} + +function toStorage(name: string, request: AssetLinkRequest): StoredAssetLink { + const link = toAssetLink(name, request); + if (link['asset.type'] === 'query') { + const { query, ...rest } = link; + return { + ...rest, + [STREAM_NAME]: name, + 'query.title': query.title, + 'query.kql.query': query.kql.query, + }; + } -export interface StoredAssetLink { - 'asset.id': string; - 'asset.type': AssetType; - 'entity.id': string; - 'entity.type': string; + return { + ...link, + [STREAM_NAME]: name, + }; } +export type AssetBulkOperation = AssetBulkIndexOperation | AssetBulkDeleteOperation; + export class AssetClient { constructor( private readonly clients: { @@ -119,178 +184,132 @@ export class AssetClient { } ) {} - async linkAsset( - properties: { - entityId: string; - entityType: string; - } & AssetLink - ) { - const { _id: id, ...document } = getAssetDocument(properties); + async linkAsset(name: string, link: AssetLinkRequest) { + const document = toStorage(name, link); await this.clients.storageClient.index({ - id, + id: document[ASSET_UUID], document, }); } - async syncAssetList({ - entityId, - entityType, - assetType, - assetIds, - }: { - entityId: string; - entityType: string; - assetType: AssetType; - assetIds: string[]; - }) { + async syncAssetList( + name: string, + links: AssetLinkRequest[] + ): Promise<{ deleted: AssetLink[]; indexed: AssetLink[] }> { const assetsResponse = await this.clients.storageClient.search({ size: 10_000, track_total_hits: false, query: { bool: { - filter: [ - ...termQuery(ASSET_ENTITY_ID, entityId), - ...termQuery(ASSET_ENTITY_TYPE, entityType), - ...termQuery(ASSET_TYPE, assetType), - ], + filter: [...termQuery(STREAM_NAME, name)], }, }, }); - const existingAssetLinks = assetsResponse.hits.hits.map((hit) => hit._source); + const existingAssetLinks = assetsResponse.hits.hits.map((hit) => { + return fromStorage(hit._source); + }); - const newAssetIds = assetIds.filter( - (assetId) => - !existingAssetLinks.some((existingAssetLink) => existingAssetLink['asset.id'] === assetId) - ); + const newStoredLinks = links.map((link) => { + return toAssetLink(name, link); + }); - const assetIdsToRemove = existingAssetLinks - .map((existingAssetLink) => existingAssetLink['asset.id']) - .filter((assetId) => !assetIds.includes(assetId)); - - await Promise.all([ - ...newAssetIds.map((assetId) => - this.linkAsset({ - entityId, - entityType, - assetId, - assetType, - }) - ), - ...assetIdsToRemove.map((assetId) => - this.unlinkAsset({ - entityId, - entityType, - assetId, - assetType, - }) - ), - ]); + const nextIds = newStoredLinks.map((link) => link[ASSET_UUID]); + + const docsToRemove = existingAssetLinks.filter((link) => !nextIds.includes(link[ASSET_UUID])); + + const operations: AssetBulkOperation[] = [ + ...docsToRemove.map((asset) => ({ delete: { asset } })), + ...newStoredLinks.map((asset) => ({ index: { asset } })), + ]; + + if (operations.length) { + await this.bulk(name, operations); + } + + return { + deleted: docsToRemove, + indexed: newStoredLinks, + }; } - async unlinkAsset( - properties: { - entityId: string; - entityType: string; - } & AssetLink - ) { - const { _id: id } = getAssetDocument(properties); + async unlinkAsset(name: string, asset: AssetUnlinkRequest) { + const id = getUuid(name, asset); - await this.clients.storageClient.delete({ id }); + const { result } = await this.clients.storageClient.delete({ id }); + + if (result === 'not_found') { + throw new AssetNotFoundError(`${asset[ASSET_TYPE]} not found`); + } } async clean() { await this.clients.storageClient.clean(); } - async getAssetIds({ - entityId, - entityType, - assetType, - }: { - entityId: string; - entityType: 'stream'; - assetType: AssetType; - }): Promise { + async getAssetLinks( + name: string, + types?: TAssetType[] + ): Promise>> { const assetsResponse = await this.clients.storageClient.search({ size: 10_000, track_total_hits: false, query: { bool: { filter: [ - ...termQuery(ASSET_ENTITY_ID, entityId), - ...termQuery(ASSET_ENTITY_TYPE, entityType), - ...termQuery(ASSET_TYPE, assetType), + ...termQuery(STREAM_NAME, name), + ...(types?.length ? termsQuery(ASSET_TYPE, types) : []), ], }, }, }); - return assetsResponse.hits.hits.map((hit) => hit._source['asset.id']); + return assetsResponse.hits.hits.map( + (hit) => fromStorage(hit._source) as Extract + ); } - async bulk( - { entityId, entityType }: { entityId: string; entityType: string }, - operations: AssetBulkOperation[] - ) { + async bulk(name: string, operations: AssetBulkOperation[]) { return await this.clients.storageClient.bulk({ operations: operations.map((operation) => { - const { _id, ...document } = getAssetDocument({ - ...Object.values(operation)[0].asset, - entityId, - entityType, - }); - if ('index' in operation) { + const document = toStorage(name, Object.values(operation)[0].asset as AssetLinkRequest); return { index: { document, - _id, + _id: document[ASSET_UUID], }, }; } + const id = getUuid(name, operation.delete.asset); return { delete: { - _id, + _id: id, }, }; }), }); } - async getAssets({ - entityId, - entityType, - }: { - entityId: string; - entityType: 'stream'; - }): Promise { - const assetsResponse = await this.clients.storageClient.search({ - size: 10_000, - track_total_hits: false, - query: { - bool: { - filter: [ - ...termQuery(ASSET_ENTITY_ID, entityId), - ...termQuery(ASSET_ENTITY_TYPE, entityType), - ], - }, - }, - }); - - const assetLinks = assetsResponse.hits.hits.map((hit) => hit._source); + async getAssets(name: string): Promise { + const assetLinks = await this.getAssetLinks(name); if (!assetLinks.length) { return []; } + const [queryAssetLinks, savedObjectAssetLinks] = partition( + assetLinks, + (link): link is QueryLink => link[ASSET_TYPE] === 'query' + ); + const idsByType = Object.fromEntries( Object.values(ASSET_TYPES).map((type) => [type, [] as string[]]) ) as Record; - assetLinks.forEach((assetLink) => { + savedObjectAssetLinks.forEach((assetLink) => { const assetType = assetLink['asset.type'] as AssetType; const assetId = assetLink['asset.id']; idsByType[assetType].push(assetId); @@ -307,7 +326,7 @@ export class AssetClient { .then((response) => { const dashboardsById = keyBy(response.saved_objects, 'id'); - return idsByType.dashboard.flatMap((dashboardId): Asset[] => { + return idsByType.dashboard.flatMap((dashboardId) => { const dashboard = dashboardsById[dashboardId]; if (dashboard && !dashboard.error) { return [dashboardSavedObjectToAsset(dashboardId, dashboard)]; @@ -319,7 +338,7 @@ export class AssetClient { Promise.all( idsByType.rule.map((ruleId) => { return limiter(() => - this.clients.rulesClient.get({ id: ruleId }).then((rule): Asset => { + this.clients.rulesClient.get({ id: ruleId }).then((rule) => { return ruleToAsset(ruleId, rule); }) ); @@ -337,7 +356,7 @@ export class AssetClient { .then((soResponse) => { const sloDefinitionsById = keyBy(soResponse.saved_objects, 'slo.attributes.id'); - return idsByType.slo.flatMap((sloId): Asset[] => { + return idsByType.slo.flatMap((sloId) => { const sloDefinition = sloDefinitionsById[sloId]; if (sloDefinition && !sloDefinition.error) { return [sloSavedObjectToAsset(sloId, sloDefinition)]; @@ -348,7 +367,25 @@ export class AssetClient { : [], ]); - return [...dashboards, ...rules, ...slos]; + const savedObjectAssetsWithUuids = [...dashboards, ...rules, ...slos].map((asset) => { + return { + ...asset, + [ASSET_UUID]: getUuid(name, asset), + }; + }); + + return [ + ...savedObjectAssetsWithUuids, + ...queryAssetLinks.map((link): QueryAsset => { + return { + [ASSET_ID]: link[ASSET_ID], + [ASSET_UUID]: link[ASSET_UUID], + [ASSET_TYPE]: link[ASSET_TYPE], + query: link.query, + title: link.query.title, + }; + }), + ]; } async getSuggestions({ @@ -359,7 +396,7 @@ export class AssetClient { query: string; assetTypes?: AssetType[]; tags?: string[]; - }): Promise<{ hasMore: boolean; assets: Asset[] }> { + }): Promise<{ hasMore: boolean; assets: AssetWithoutUuid[] }> { const perPage = 101; const searchAll = !assetTypes; diff --git a/x-pack/platform/plugins/shared/streams/server/lib/streams/assets/fields.ts b/x-pack/platform/plugins/shared/streams/server/lib/streams/assets/fields.ts index 9dc57594829f7..f8dbae061d97b 100644 --- a/x-pack/platform/plugins/shared/streams/server/lib/streams/assets/fields.ts +++ b/x-pack/platform/plugins/shared/streams/server/lib/streams/assets/fields.ts @@ -5,7 +5,10 @@ * 2.0. */ -export const ASSET_ENTITY_ID = 'entity.id'; -export const ASSET_ENTITY_TYPE = 'entity.type'; -export const ASSET_ASSET_ID = 'asset.id'; +export const STREAM_NAME = 'stream.name'; +export const ASSET_UUID = 'asset.uuid'; +export const ASSET_ID = 'asset.id'; export const ASSET_TYPE = 'asset.type'; + +export const QUERY_KQL_BODY = 'query.kql.query'; +export const QUERY_TITLE = 'query.title'; diff --git a/x-pack/platform/plugins/shared/streams/server/lib/streams/assets/storage_settings.ts b/x-pack/platform/plugins/shared/streams/server/lib/streams/assets/storage_settings.ts index 2d9515ddd281f..62fcf7d199162 100644 --- a/x-pack/platform/plugins/shared/streams/server/lib/streams/assets/storage_settings.ts +++ b/x-pack/platform/plugins/shared/streams/server/lib/streams/assets/storage_settings.ts @@ -6,16 +6,25 @@ */ import { IndexStorageSettings, types } from '@kbn/storage-adapter'; -import { ASSET_ASSET_ID, ASSET_ENTITY_ID, ASSET_ENTITY_TYPE, ASSET_TYPE } from './fields'; +import { + ASSET_ID, + ASSET_TYPE, + ASSET_UUID, + QUERY_KQL_BODY, + QUERY_TITLE, + STREAM_NAME, +} from './fields'; export const assetStorageSettings = { name: '.kibana_streams_assets', schema: { properties: { - [ASSET_ASSET_ID]: types.keyword(), + [ASSET_UUID]: types.keyword(), + [ASSET_ID]: types.keyword(), [ASSET_TYPE]: types.keyword(), - [ASSET_ENTITY_ID]: types.keyword(), - [ASSET_ENTITY_TYPE]: types.keyword(), + [STREAM_NAME]: types.keyword(), + [QUERY_KQL_BODY]: types.match_only_text(), + [QUERY_TITLE]: types.keyword(), }, }, } satisfies IndexStorageSettings; diff --git a/x-pack/platform/plugins/shared/streams/server/lib/streams/client.ts b/x-pack/platform/plugins/shared/streams/server/lib/streams/client.ts index 698448daa02d8..8d9064d1e1d84 100644 --- a/x-pack/platform/plugins/shared/streams/server/lib/streams/client.ts +++ b/x-pack/platform/plugins/shared/streams/server/lib/streams/client.ts @@ -11,7 +11,7 @@ import { QueryDslQueryContainer, Result, } from '@elastic/elasticsearch/lib/api/types'; -import type { IScopedClusterClient, Logger } from '@kbn/core/server'; +import { type IScopedClusterClient, type KibanaRequest, type Logger } from '@kbn/core/server'; import { isResponseError } from '@kbn/es-errors'; import { Condition, @@ -71,6 +71,7 @@ import { MalformedStreamIdError } from './errors/malformed_stream_id_error'; import { SecurityError } from './errors/security_error'; import { NameTakenError } from './errors/name_taken_error'; import { MalformedStreamError } from './errors/malformed_stream_error'; +import { ASSET_ID, ASSET_TYPE } from './assets/fields'; interface AcknowledgeResponse { acknowledged: true; @@ -93,6 +94,20 @@ function isDefinitionNotFoundError(error: unknown): error is DefinitionNotFoundE return error instanceof DefinitionNotFoundError; } +/* + * When calling into Elasticsearch, the stack trace is lost. + * If we create an error before calling, and append it to + * any stack of the caught error, we get a more useful stack + * trace. + */ +function wrapEsCall(p: Promise): Promise { + const error = new Error(); + return p.catch((caughtError) => { + caughtError.stack += error.stack; + throw caughtError; + }); +} + export class StreamsClient { constructor( private readonly dependencies: { @@ -101,6 +116,7 @@ export class StreamsClient { storageClient: StreamsStorageClient; logger: Logger; isServerless: boolean; + request: KibanaRequest; } ) {} @@ -136,6 +152,7 @@ export class StreamsClient { await this.upsertStream({ request: { dashboards: [], + queries: [], stream: omit(rootStreamDefinition, 'name'), }, name: rootStreamDefinition.name, @@ -150,10 +167,11 @@ export class StreamsClient { * such as data streams. That means it deletes all data * belonging to wired streams. * - * It does NOT delete ingest streams. + * It does NOT delete unwired streams. */ async disableStreams(): Promise { const isEnabled = await this.isStreamsEnabled(); + if (!isEnabled) { return { acknowledged: true, result: 'noop' }; } @@ -243,7 +261,7 @@ export class StreamsClient { request: StreamUpsertRequest; }): Promise { const stream: StreamDefinition = { ...request.stream, name }; - const { dashboards } = request; + const { dashboards, queries } = request; const { result, parentDefinition } = await this.validateAndUpsertStream({ definition: stream, }); @@ -275,6 +293,7 @@ export class StreamsClient { name: parentId, request: { dashboards: [], + queries: [], stream: { ingest: { lifecycle: { inherit: {} }, @@ -295,12 +314,19 @@ export class StreamsClient { } } - await this.dependencies.assetClient.syncAssetList({ - entityId: stream.name, - entityType: 'stream', - assetIds: dashboards, - assetType: 'dashboard', - }); + const queryLinks = queries.map((query) => ({ + [ASSET_ID]: query.id, + [ASSET_TYPE]: 'query' as const, + query, + })); + + await this.dependencies.assetClient.syncAssetList(stream.name, [ + ...dashboards.map((dashboard) => ({ + [ASSET_ID]: dashboard, + [ASSET_TYPE]: 'dashboard' as const, + })), + ...queryLinks, + ]); return { acknowledged: true, result }; } @@ -689,28 +715,28 @@ export class StreamsClient { } async getDataStream(name: string): Promise { - return this.dependencies.scopedClusterClient.asCurrentUser.indices - .getDataStream({ name }) - .then((response) => { - if (response.data_streams.length === 0) { - throw new errors.ResponseError({ - meta: { - aborted: false, - attempts: 1, - connection: null, - context: null, - name: 'resource_not_found_exception', - request: {} as unknown as DiagnosticResult['meta']['request'], - }, - warnings: [], - body: 'resource_not_found_exception', - statusCode: 404, - }); - } + return wrapEsCall( + this.dependencies.scopedClusterClient.asCurrentUser.indices.getDataStream({ name }) + ).then((response) => { + if (response.data_streams.length === 0) { + throw new errors.ResponseError({ + meta: { + aborted: false, + attempts: 1, + connection: null, + context: null, + name: 'resource_not_found_exception', + request: {} as unknown as DiagnosticResult['meta']['request'], + }, + warnings: [], + body: 'resource_not_found_exception', + statusCode: 404, + }); + } - const dataStream = response.data_streams[0]; - return dataStream; - }); + const dataStream = response.data_streams[0]; + return dataStream; + }); } /** @@ -774,8 +800,9 @@ export class StreamsClient { * stored definition). */ private async getUnmanagedDataStreams(): Promise { - const response = - await this.dependencies.scopedClusterClient.asCurrentUser.indices.getDataStream(); + const response = await wrapEsCall( + this.dependencies.scopedClusterClient.asCurrentUser.indices.getDataStream() + ); return response.data_streams.map((dataStream) => ({ name: dataStream.name, @@ -859,12 +886,7 @@ export class StreamsClient { await deleteStreamObjects({ scopedClusterClient, name: definition.name, logger }); } - await assetClient.syncAssetList({ - entityId: definition.name, - entityType: 'stream', - assetType: 'dashboard', - assetIds: [], - }); + await assetClient.syncAssetList(definition.name, []); await this.dependencies.storageClient.delete({ id: definition.name }); } diff --git a/x-pack/platform/plugins/shared/streams/server/lib/streams/data_streams/manage_data_streams.ts b/x-pack/platform/plugins/shared/streams/server/lib/streams/data_streams/manage_data_streams.ts index e10edef3a38d0..de5d3c411bbbb 100644 --- a/x-pack/platform/plugins/shared/streams/server/lib/streams/data_streams/manage_data_streams.ts +++ b/x-pack/platform/plugins/shared/streams/server/lib/streams/data_streams/manage_data_streams.ts @@ -5,8 +5,8 @@ * 2.0. */ -import { ElasticsearchClient, Logger } from '@kbn/core/server'; import { MappingTypeMapping } from '@elastic/elasticsearch/lib/api/types'; +import { ElasticsearchClient, Logger } from '@kbn/core/server'; import { IngestStreamLifecycle, isDslLifecycle, isIlmLifecycle } from '@kbn/streams-schema'; import { retryTransientEsErrors } from '../helpers/retry'; diff --git a/x-pack/platform/plugins/shared/streams/server/lib/streams/errors/asset_not_found_error.ts b/x-pack/platform/plugins/shared/streams/server/lib/streams/errors/asset_not_found_error.ts new file mode 100644 index 0000000000000..a5915ca72ba0f --- /dev/null +++ b/x-pack/platform/plugins/shared/streams/server/lib/streams/errors/asset_not_found_error.ts @@ -0,0 +1,15 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { StatusError } from './status_error'; + +export class AssetNotFoundError extends StatusError { + constructor(message: string) { + super(message, 404); + this.name = 'AssetNotFoundError'; + } +} diff --git a/x-pack/platform/plugins/shared/streams/server/lib/streams/service.ts b/x-pack/platform/plugins/shared/streams/server/lib/streams/service.ts index bd39469d1b040..4b3ddde22ef1d 100644 --- a/x-pack/platform/plugins/shared/streams/server/lib/streams/service.ts +++ b/x-pack/platform/plugins/shared/streams/server/lib/streams/service.ts @@ -47,11 +47,10 @@ export class StreamsService { const isServerless = coreStart.elasticsearch.getCapabilities().serverless; - const storageAdapter = new StorageIndexAdapter( - scopedClusterClient.asInternalUser, - logger, - streamsStorageSettings - ); + const storageAdapter = new StorageIndexAdapter< + StreamsStorageSettings, + StreamDefinition & { _id: string } + >(scopedClusterClient.asInternalUser, logger, streamsStorageSettings); return new StreamsClient({ assetClient, @@ -59,6 +58,7 @@ export class StreamsService { scopedClusterClient, storageClient: storageAdapter.getClient(), isServerless, + request, }); } } diff --git a/x-pack/platform/plugins/shared/streams/server/routes/content/route.ts b/x-pack/platform/plugins/shared/streams/server/routes/content/route.ts index d7ec9f79e5c4a..a413f55bda186 100644 --- a/x-pack/platform/plugins/shared/streams/server/routes/content/route.ts +++ b/x-pack/platform/plugins/shared/streams/server/routes/content/route.ts @@ -5,18 +5,21 @@ * 2.0. */ -import { Readable } from 'stream'; -import { z } from '@kbn/zod'; +import { createSavedObjectsStreamFromNdJson } from '@kbn/core-saved-objects-server-internal/src/routes/utils'; +import { ContentPack, contentPackSchema } from '@kbn/streams-schema'; import { createConcatStream, createListStream, createMapStream, createPromiseFromStreams, } from '@kbn/utils'; -import { createSavedObjectsStreamFromNdJson } from '@kbn/core-saved-objects-server-internal/src/routes/utils'; -import { ContentPack, contentPackSchema } from '@kbn/streams-schema'; -import { createServerRoute } from '../create_server_route'; +import { z } from '@kbn/zod'; +import { Readable } from 'stream'; +import { Asset } from '../../../common'; +import { DashboardAsset, DashboardLink } from '../../../common/assets'; +import { ASSET_ID, ASSET_TYPE } from '../../lib/streams/assets/fields'; import { StatusError } from '../../lib/streams/errors/status_error'; +import { createServerRoute } from '../create_server_route'; const exportContentRoute = createServerRoute({ endpoint: 'POST /api/streams/{name}/content/export 2023-10-31', @@ -42,9 +45,11 @@ const exportContentRoute = createServerRoute({ await streamsClient.ensureStream(params.path.name); - const dashboards = await assetClient - .getAssets({ entityId: params.path.name, entityType: 'stream' }) - .then((assets) => assets.filter(({ assetType }) => assetType === 'dashboard')); + function isDashboard(asset: Asset): asset is DashboardAsset { + return asset[ASSET_TYPE] === 'dashboard'; + } + + const dashboards = (await assetClient.getAssets(params.path.name)).filter(isDashboard); if (dashboards.length === 0) { throw new StatusError(`No dashboards are linked to [${params.path.name}] stream`, 400); } @@ -52,7 +57,7 @@ const exportContentRoute = createServerRoute({ const exporter = (await context.core).savedObjects.getExporter(soClient); const exportStream = await exporter.exportByObjects({ request, - objects: dashboards.map((dashboard) => ({ id: dashboard.assetId, type: 'dashboard' })), + objects: dashboards.map((dashboard) => ({ id: dashboard[ASSET_ID], type: 'dashboard' })), includeReferencesDeep: true, }); @@ -131,19 +136,18 @@ const importContentRoute = createServerRoute({ overwrite: true, }); - const createdAssets = (successResults ?? []) - .filter((savedObject) => savedObject.type === 'dashboard') - .map((dashboard) => ({ - assetType: 'dashboard' as const, - assetId: dashboard.destinationId ?? dashboard.id, - })); + const createdAssets: Array> = + successResults + ?.filter((savedObject) => savedObject.type === 'dashboard') + .map((dashboard) => ({ + [ASSET_TYPE]: 'dashboard', + [ASSET_ID]: dashboard.destinationId ?? dashboard.id, + })) ?? []; if (createdAssets.length > 0) { await assetClient.bulk( - { entityId: params.path.name, entityType: 'stream' }, - createdAssets.map((asset) => ({ - index: { asset }, - })) + params.path.name, + createdAssets.map((asset) => ({ index: { asset } })) ); } diff --git a/x-pack/platform/plugins/shared/streams/server/routes/dashboards/route.ts b/x-pack/platform/plugins/shared/streams/server/routes/dashboards/route.ts index 66cc10cae5597..a42202f3cc5b3 100644 --- a/x-pack/platform/plugins/shared/streams/server/routes/dashboards/route.ts +++ b/x-pack/platform/plugins/shared/streams/server/routes/dashboards/route.ts @@ -10,10 +10,11 @@ import { ErrorCause } from '@elastic/elasticsearch/lib/api/types'; import { internal } from '@hapi/boom'; import { Asset, DashboardAsset } from '../../../common/assets'; import { createServerRoute } from '../create_server_route'; +import { ASSET_ID, ASSET_TYPE } from '../../lib/streams/assets/fields'; export interface SanitizedDashboardAsset { id: string; - label: string; + title: string; tags: string[]; } @@ -41,8 +42,8 @@ export type BulkUpdateAssetsResponse = function sanitizeDashboardAsset(asset: DashboardAsset): SanitizedDashboardAsset { return { - id: asset.assetId, - label: asset.label, + id: asset[ASSET_ID], + title: asset.title, tags: asset.tags, }; } @@ -79,16 +80,11 @@ const listDashboardsRoute = createServerRoute({ } = params; function isDashboard(asset: Asset): asset is DashboardAsset { - return asset.assetType === 'dashboard'; + return asset[ASSET_TYPE] === 'dashboard'; } return { - dashboards: ( - await assetClient.getAssets({ - entityId: streamName, - entityType: 'stream', - }) - ) + dashboards: (await assetClient.getAssets(streamName)) .filter(isDashboard) .map(sanitizeDashboardAsset), }; @@ -127,11 +123,9 @@ const linkDashboardRoute = createServerRoute({ await streamsClient.ensureStream(streamName); - await assetClient.linkAsset({ - entityId: streamName, - entityType: 'stream', - assetId: dashboardId, - assetType: 'dashboard', + await assetClient.linkAsset(streamName, { + [ASSET_TYPE]: 'dashboard', + [ASSET_ID]: dashboardId, }); return { @@ -173,11 +167,9 @@ const unlinkDashboardRoute = createServerRoute({ path: { dashboardId, name: streamName }, } = params; - await assetClient.unlinkAsset({ - entityId: streamName, - entityType: 'stream', - assetId: dashboardId, - assetType: 'dashboard', + await assetClient.unlinkAsset(streamName, { + [ASSET_ID]: dashboardId, + [ASSET_TYPE]: 'dashboard', }); return { @@ -290,17 +282,14 @@ const bulkDashboardsRoute = createServerRoute({ await streamsClient.ensureStream(streamName); const result = await assetClient.bulk( - { - entityId: streamName, - entityType: 'stream', - }, + streamName, operations.map((operation) => { if ('index' in operation) { return { index: { asset: { - assetType: 'dashboard', - assetId: operation.index.id, + [ASSET_TYPE]: 'dashboard', + [ASSET_ID]: operation.index.id, }, }, }; @@ -308,8 +297,8 @@ const bulkDashboardsRoute = createServerRoute({ return { delete: { asset: { - assetType: 'dashboard', - assetId: operation.delete.id, + [ASSET_TYPE]: 'dashboard', + [ASSET_ID]: operation.delete.id, }, }, }; diff --git a/x-pack/platform/plugins/shared/streams/server/routes/index.ts b/x-pack/platform/plugins/shared/streams/server/routes/index.ts index 0369291509388..ea92e49dd67fc 100644 --- a/x-pack/platform/plugins/shared/streams/server/routes/index.ts +++ b/x-pack/platform/plugins/shared/streams/server/routes/index.ts @@ -19,6 +19,8 @@ import { contentRoutes } from './content/route'; import { internalDashboardRoutes } from './internal/dashboards/route'; import { internalCrudRoutes } from './internal/streams/crud/route'; import { internalManagementRoutes } from './internal/streams/management/route'; +import { significantEventsRoutes } from './streams/significant_events/route'; +import { queryRoutes } from './queries/route'; export const streamsRouteRepository = { // internal APIs @@ -37,6 +39,8 @@ export const streamsRouteRepository = { ...ingestRoutes, ...groupRoutes, ...contentRoutes, + ...significantEventsRoutes, + ...queryRoutes, }; export type StreamsRouteRepository = typeof streamsRouteRepository; diff --git a/x-pack/platform/plugins/shared/streams/server/routes/internal/dashboards/route.ts b/x-pack/platform/plugins/shared/streams/server/routes/internal/dashboards/route.ts index fc754f2b65100..2628c70c2ca03 100644 --- a/x-pack/platform/plugins/shared/streams/server/routes/internal/dashboards/route.ts +++ b/x-pack/platform/plugins/shared/streams/server/routes/internal/dashboards/route.ts @@ -9,6 +9,7 @@ import { z } from '@kbn/zod'; import { DashboardAsset } from '../../../../common/assets'; import { createServerRoute } from '../../create_server_route'; import { SanitizedDashboardAsset } from '../../dashboards/route'; +import { ASSET_ID } from '../../../lib/streams/assets/fields'; export interface SuggestDashboardResponse { suggestions: SanitizedDashboardAsset[]; @@ -16,8 +17,8 @@ export interface SuggestDashboardResponse { function sanitizeDashboardAsset(asset: DashboardAsset): SanitizedDashboardAsset { return { - id: asset.assetId, - label: asset.label, + id: asset[ASSET_ID], + title: asset.title, tags: asset.tags, }; } diff --git a/x-pack/platform/plugins/shared/streams/server/routes/queries/route.ts b/x-pack/platform/plugins/shared/streams/server/routes/queries/route.ts new file mode 100644 index 0000000000000..9aa8598045a3a --- /dev/null +++ b/x-pack/platform/plugins/shared/streams/server/routes/queries/route.ts @@ -0,0 +1,258 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { ErrorCause } from '@elastic/elasticsearch/lib/api/types'; +import { internal } from '@hapi/boom'; +import { + StreamQuery, + streamQuerySchema, + upsertStreamQueryRequestSchema, +} from '@kbn/streams-schema'; +import { z } from '@kbn/zod'; +import { ASSET_ID, ASSET_TYPE } from '../../lib/streams/assets/fields'; +import { createServerRoute } from '../create_server_route'; +export interface ListQueriesResponse { + queries: StreamQuery[]; +} + +export interface UpsertQueryResponse { + acknowledged: boolean; +} + +export interface DeleteQueryResponse { + acknowledged: boolean; +} + +export type BulkUpdateAssetsResponse = { acknowledged: boolean } | { errors: ErrorCause[] }; + +const listQueriesRoute = createServerRoute({ + endpoint: 'GET /api/streams/{name}/queries 2023-10-31', + options: { + access: 'public', + summary: 'Get stream queries', + description: + 'Fetches all queries linked to a stream that are visible to the current user in the current space.', + availability: { + stability: 'experimental', + }, + }, + params: z.object({ + path: z.object({ + name: z.string(), + }), + }), + security: { + authz: { + enabled: false, + reason: + 'This API delegates security to the currently logged in user and their Elasticsearch permissions.', + }, + }, + async handler({ params, request, getScopedClients }): Promise { + const { assetClient, streamsClient } = await getScopedClients({ request }); + await streamsClient.ensureStream(params.path.name); + + const { + path: { name: streamName }, + } = params; + + const queryAssets = await assetClient.getAssetLinks(streamName, ['query']); + + return { + queries: queryAssets.map((queryAsset) => queryAsset.query), + }; + }, +}); + +const upsertQueryRoute = createServerRoute({ + endpoint: 'PUT /api/streams/{name}/queries/{queryId} 2023-10-31', + options: { + access: 'public', + summary: 'Upsert a query to a stream', + description: 'Adds a query to a stream. Noop if the query is already present on the stream.', + availability: { + stability: 'experimental', + }, + }, + security: { + authz: { + enabled: false, + reason: + 'This API delegates security to the currently logged in user and their Elasticsearch permissions.', + }, + }, + + params: z.object({ + path: z.object({ + name: z.string(), + queryId: z.string(), + }), + body: upsertStreamQueryRequestSchema, + }), + handler: async ({ params, request, getScopedClients }): Promise => { + const { assetClient, streamsClient } = await getScopedClients({ request }); + const { + path: { name: streamName, queryId }, + body, + } = params; + + await streamsClient.ensureStream(streamName); + + await assetClient.linkAsset(streamName, { + [ASSET_TYPE]: 'query', + [ASSET_ID]: queryId, + query: { + id: queryId, + title: body.title, + kql: { + query: body.kql.query, + }, + }, + }); + + return { + acknowledged: true, + }; + }, +}); + +const deleteQueryRoute = createServerRoute({ + endpoint: 'DELETE /api/streams/{name}/queries/{queryId} 2023-10-31', + options: { + access: 'public', + summary: 'Remove a query from a stream', + description: 'Remove a query from a stream. Noop if the query is not found on the stream.', + availability: { + stability: 'experimental', + }, + }, + security: { + authz: { + enabled: false, + reason: + 'This API delegates security to the currently logged in user and their Elasticsearch permissions.', + }, + }, + params: z.object({ + path: z.object({ + name: z.string(), + queryId: z.string(), + }), + }), + handler: async ({ params, request, getScopedClients }): Promise => { + const { assetClient, streamsClient } = await getScopedClients({ request }); + + const { + path: { queryId, name: streamName }, + } = params; + + await streamsClient.ensureStream(streamName); + + await assetClient.unlinkAsset(streamName, { + [ASSET_TYPE]: 'query', + [ASSET_ID]: queryId, + }); + + return { + acknowledged: true, + }; + }, +}); + +const bulkQueriesRoute = createServerRoute({ + endpoint: `POST /api/streams/{name}/queries/_bulk 2023-10-31`, + options: { + access: 'public', + summary: 'Bulk update queries', + description: 'Bulk update queries of a stream. Can add new queries and delete existing ones.', + availability: { + stability: 'experimental', + }, + }, + security: { + authz: { + enabled: false, + reason: + 'This API delegates security to the currently logged in user and their Elasticsearch permissions.', + }, + }, + params: z.object({ + path: z.object({ + name: z.string(), + }), + body: z.object({ + operations: z.array( + z.union([ + z.object({ + index: streamQuerySchema, + }), + z.object({ + delete: z.object({ id: z.string() }), + }), + ]) + ), + }), + }), + handler: async ({ + params, + request, + getScopedClients, + logger, + }): Promise => { + const { assetClient, streamsClient } = await getScopedClients({ request }); + + const { + path: { name: streamName }, + body: { operations }, + } = params; + + await streamsClient.ensureStream(streamName); + + const result = await assetClient.bulk( + streamName, + operations.map((operation) => { + if ('index' in operation) { + return { + index: { + asset: { + [ASSET_TYPE]: 'query', + [ASSET_ID]: operation.index.id, + query: { + id: operation.index.id, + title: operation.index.title, + kql: { query: operation.index.kql.query }, + }, + }, + }, + }; + } + return { + delete: { + asset: { + [ASSET_TYPE]: 'query', + [ASSET_ID]: operation.delete.id, + }, + }, + }; + }) + ); + + if (result.errors) { + logger.error(`Error indexing some items`); + throw internal(`Could not index all items`, { errors: result.errors }); + } + + return { acknowledged: true }; + }, +}); + +export const queryRoutes = { + ...listQueriesRoute, + ...upsertQueryRoute, + ...deleteQueryRoute, + ...bulkQueriesRoute, +}; diff --git a/x-pack/platform/plugins/shared/streams/server/routes/streams/crud/read_stream.ts b/x-pack/platform/plugins/shared/streams/server/routes/streams/crud/read_stream.ts index 5852df1c6b71d..7b63c48ba1f8e 100644 --- a/x-pack/platform/plugins/shared/streams/server/routes/streams/crud/read_stream.ts +++ b/x-pack/platform/plugins/shared/streams/server/routes/streams/crud/read_stream.ts @@ -14,12 +14,15 @@ import { isUnwiredStreamDefinition, } from '@kbn/streams-schema'; import { IScopedClusterClient } from '@kbn/core/server'; +import { partition } from 'lodash'; import { AssetClient } from '../../../lib/streams/assets/asset_client'; import { StreamsClient } from '../../../lib/streams/client'; import { getDataStreamLifecycle, getUnmanagedElasticsearchAssets, } from '../../../lib/streams/stream_crud'; +import { DashboardLink } from '../../../../common/assets'; +import { ASSET_TYPE } from '../../../lib/streams/assets/fields'; export async function readStream({ name, @@ -32,19 +35,26 @@ export async function readStream({ streamsClient: StreamsClient; scopedClusterClient: IScopedClusterClient; }): Promise { - const [streamDefinition, dashboards] = await Promise.all([ + const [streamDefinition, dashboardsAndQueries] = await Promise.all([ streamsClient.getStream(name), - assetClient.getAssetIds({ - entityId: name, - entityType: 'stream', - assetType: 'dashboard', - }), + await assetClient.getAssetLinks(name, ['dashboard', 'query']), ]); + const [dashboardLinks, queryLinks] = partition( + dashboardsAndQueries, + (asset): asset is DashboardLink => asset[ASSET_TYPE] === 'dashboard' + ); + + const dashboards = dashboardLinks.map((dashboard) => dashboard['asset.id']); + const queries = queryLinks.map((query) => { + return query.query; + }); + if (isGroupStreamDefinition(streamDefinition)) { return { stream: streamDefinition, dashboards, + queries, }; } @@ -71,6 +81,7 @@ export async function readStream({ data_stream_exists: !!dataStream, effective_lifecycle: getDataStreamLifecycle(dataStream), dashboards, + queries, inherited_fields: {}, }; } @@ -78,6 +89,7 @@ export async function readStream({ const body: WiredStreamGetResponse = { stream: streamDefinition, dashboards, + queries, effective_lifecycle: findInheritedLifecycle(streamDefinition, ancestors), inherited_fields: getInheritedFieldsFromAncestors(ancestors), }; diff --git a/x-pack/platform/plugins/shared/streams/server/routes/streams/group/route.ts b/x-pack/platform/plugins/shared/streams/server/routes/streams/group/route.ts index 115733426f94e..cae9f3a62133f 100644 --- a/x-pack/platform/plugins/shared/streams/server/routes/streams/group/route.ts +++ b/x-pack/platform/plugins/shared/streams/server/routes/streams/group/route.ts @@ -14,6 +14,8 @@ import { isGroupStreamDefinition, } from '@kbn/streams-schema'; import { createServerRoute } from '../../create_server_route'; +import { ASSET_TYPE, ASSET_UUID } from '../../../lib/streams/assets/fields'; +import { QueryAsset } from '../../../../common/assets'; const readGroupRoute = createServerRoute({ endpoint: 'GET /api/streams/{name}/_group 2023-10-31', @@ -90,20 +92,22 @@ const upsertGroupRoute = createServerRoute({ throw badRequest('A group stream name can not start with [logs.]'); } - const assets = await assetClient.getAssets({ - entityId: name, - entityType: 'stream', - }); + const assets = await assetClient.getAssets(name); const groupUpsertRequest = params.body; const dashboards = assets - .filter((asset) => asset.assetType === 'dashboard') - .map((asset) => asset.assetId); + .filter((asset) => asset[ASSET_TYPE] === 'dashboard') + .map((asset) => asset[ASSET_UUID]); + + const queries = assets + .filter((asset): asset is QueryAsset => asset[ASSET_TYPE] === 'query') + .map((asset) => asset.query); const upsertRequest = { dashboards, stream: groupUpsertRequest, + queries, } as GroupStreamUpsertRequest; return await streamsClient.upsertStream({ diff --git a/x-pack/platform/plugins/shared/streams/server/routes/streams/ingest/route.ts b/x-pack/platform/plugins/shared/streams/server/routes/streams/ingest/route.ts index a70d07472ab14..92c2d9a977329 100644 --- a/x-pack/platform/plugins/shared/streams/server/routes/streams/ingest/route.ts +++ b/x-pack/platform/plugins/shared/streams/server/routes/streams/ingest/route.ts @@ -15,6 +15,8 @@ import { } from '@kbn/streams-schema'; import { z } from '@kbn/zod'; import { createServerRoute } from '../../create_server_route'; +import { ASSET_ID, ASSET_TYPE } from '../../../lib/streams/assets/fields'; +import { QueryAsset } from '../../../../common/assets'; const readIngestRoute = createServerRoute({ endpoint: 'GET /api/streams/{name}/_ingest 2023-10-31', @@ -94,20 +96,22 @@ const upsertIngestRoute = createServerRoute({ const name = params.path.name; - const assets = await assetClient.getAssets({ - entityId: name, - entityType: 'stream', - }); + const assets = await assetClient.getAssets(name); const ingestUpsertRequest = params.body; const dashboards = assets - .filter((asset) => asset.assetType === 'dashboard') - .map((asset) => asset.assetId); + .filter((asset) => asset[ASSET_TYPE] === 'dashboard') + .map((asset) => asset[ASSET_ID]); + + const queries = assets + .filter((asset): asset is QueryAsset => asset[ASSET_TYPE] === 'query') + .map((asset) => asset.query); const upsertRequest = { dashboards, stream: ingestUpsertRequest, + queries, } as StreamUpsertRequest; return await streamsClient.upsertStream({ diff --git a/x-pack/platform/plugins/shared/streams/server/routes/streams/significant_events/read_significant_events.ts b/x-pack/platform/plugins/shared/streams/server/routes/streams/significant_events/read_significant_events.ts new file mode 100644 index 0000000000000..eca2c369a6ade --- /dev/null +++ b/x-pack/platform/plugins/shared/streams/server/routes/streams/significant_events/read_significant_events.ts @@ -0,0 +1,131 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { AggregationsDateHistogramAggregate } from '@elastic/elasticsearch/lib/api/types'; +import { IScopedClusterClient } from '@kbn/core/server'; +import { buildEsQuery } from '@kbn/es-query'; +import { ChangePointType } from '@kbn/es-types/src'; +import { SignificantEventsGetResponse } from '@kbn/streams-schema'; +import { get, isArray, isEmpty } from 'lodash'; +import { AssetClient } from '../../../lib/streams/assets/asset_client'; + +export async function readSignificantEvents( + params: { name: string; from: Date; to: Date; bucketSize: string }, + dependencies: { + assetClient: AssetClient; + scopedClusterClient: IScopedClusterClient; + } +): Promise { + const { assetClient, scopedClusterClient } = dependencies; + const { name, from, to, bucketSize } = params; + + const assetQueries = await assetClient.getAssetLinks(name, ['query']); + if (isEmpty(assetQueries)) { + return []; + } + + const searchRequests = assetQueries.flatMap((asset) => { + return [ + { index: name }, + { + size: 0, + query: { + bool: { + filter: [ + { + range: { + '@timestamp': { + gte: from.toISOString(), + lte: to.toISOString(), + }, + }, + }, + buildQuery(asset.query.kql.query), + ], + }, + }, + aggs: { + occurrences: { + date_histogram: { + field: '@timestamp', + fixed_interval: bucketSize, + extended_bounds: { + min: from.toISOString(), + max: to.toISOString(), + }, + }, + }, + change_points: { + change_point: { + buckets_path: 'occurrences>_count', + }, + }, + }, + }, + ]; + }); + + const response = await scopedClusterClient.asCurrentUser.msearch< + unknown, + { + occurrences: AggregationsDateHistogramAggregate; + change_points: { + type: { + [key in ChangePointType]: { p_value: number; change_point: number }; + }; + }; + } + >({ searches: searchRequests }); + + const significantEvents = response.responses.map((queryResponse, queryIndex) => { + const query = assetQueries[queryIndex]; + if ('error' in queryResponse) { + return { + id: query.query.id, + title: query.query.title, + kql: query.query.kql, + occurrences: [], + change_points: {}, + }; + } + + const buckets = get(queryResponse, 'aggregations.occurrences.buckets'); + const changePoints = get(queryResponse, 'aggregations.change_points') ?? {}; + + return { + id: query.query.id, + title: query.query.title, + kql: query.query.kql, + occurrences: isArray(buckets) + ? buckets.map((bucket) => ({ + date: bucket.key_as_string, + count: bucket.doc_count, + })) + : [], + change_points: changePoints, + }; + }); + + // changePoints type is not inferred correclty + return significantEvents as SignificantEventsGetResponse; +} + +function buildQuery(kql: string) { + try { + return buildEsQuery( + undefined, + { + query: kql, + language: 'kuery', + }, + [], + { allowLeadingWildcards: true } + ); + } catch (err) { + return { match_all: {} }; + } +} diff --git a/x-pack/platform/plugins/shared/streams/server/routes/streams/significant_events/route.ts b/x-pack/platform/plugins/shared/streams/server/routes/streams/significant_events/route.ts new file mode 100644 index 0000000000000..b2fc453e615db --- /dev/null +++ b/x-pack/platform/plugins/shared/streams/server/routes/streams/significant_events/route.ts @@ -0,0 +1,58 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { badRequest } from '@hapi/boom'; +import { SignificantEventsGetResponse } from '@kbn/streams-schema'; +import { z } from '@kbn/zod'; +import { createServerRoute } from '../../create_server_route'; +import { readSignificantEvents } from './read_significant_events'; + +export const readSignificantEventsRoute = createServerRoute({ + endpoint: 'GET /api/streams/{name}/significant_events 2023-10-31', + params: z.object({ + path: z.object({ name: z.string() }), + query: z.object({ from: z.coerce.date(), to: z.coerce.date(), bucketSize: z.string() }), + }), + + options: { + access: 'public', + summary: 'Read the significant events', + description: 'Read the significant events', + availability: { + stability: 'experimental', + }, + }, + security: { + authz: { + enabled: false, + reason: + 'This API delegates security to the currently logged in user and their Elasticsearch permissions.', + }, + }, + handler: async ({ params, request, getScopedClients }): Promise => { + const { streamsClient, assetClient, scopedClusterClient } = await getScopedClients({ + request, + }); + + const isStreamEnabled = await streamsClient.isStreamsEnabled(); + if (!isStreamEnabled) { + throw badRequest('Streams are not enabled'); + } + + const { name } = params.path; + const { from, to, bucketSize } = params.query; + + return await readSignificantEvents( + { name, from, to, bucketSize }, + { assetClient, scopedClusterClient } + ); + }, +}); + +export const significantEventsRoutes = { + ...readSignificantEventsRoute, +}; diff --git a/x-pack/platform/plugins/shared/streams/tsconfig.json b/x-pack/platform/plugins/shared/streams/tsconfig.json index 2cb36c4145dc0..0fcbe01b67de6 100644 --- a/x-pack/platform/plugins/shared/streams/tsconfig.json +++ b/x-pack/platform/plugins/shared/streams/tsconfig.json @@ -42,6 +42,7 @@ "@kbn/core-elasticsearch-client-server-internal", "@kbn/utils", "@kbn/core-saved-objects-server-internal", - "@kbn/core-analytics-server" + "@kbn/core-analytics-server", + "@kbn/es-types", ] } diff --git a/x-pack/platform/plugins/shared/streams_app/.storybook/get_mock_streams_app_context.tsx b/x-pack/platform/plugins/shared/streams_app/.storybook/get_mock_streams_app_context.tsx index 83634949c853d..5a05f816f42d3 100644 --- a/x-pack/platform/plugins/shared/streams_app/.storybook/get_mock_streams_app_context.tsx +++ b/x-pack/platform/plugins/shared/streams_app/.storybook/get_mock_streams_app_context.tsx @@ -5,19 +5,21 @@ * 2.0. */ +import { ChartsPluginStart } from '@kbn/charts-plugin/public'; import { coreMock } from '@kbn/core/public/mocks'; import type { DataPublicPluginStart } from '@kbn/data-plugin/public'; import type { DataViewsPublicPluginStart } from '@kbn/data-views-plugin/public'; -import type { StreamsPluginStart } from '@kbn/streams-plugin/public'; -import type { UnifiedSearchPublicPluginStart } from '@kbn/unified-search-plugin/public'; -import type { SharePublicStart } from '@kbn/share-plugin/public/plugin'; -import { NavigationPublicStart } from '@kbn/navigation-plugin/public/types'; -import type { SavedObjectTaggingPluginStart } from '@kbn/saved-objects-tagging-plugin/public'; -import { fieldsMetadataPluginPublicMock } from '@kbn/fields-metadata-plugin/public/mocks'; import { DataStreamsStatsClient } from '@kbn/dataset-quality-plugin/public/services/data_streams_stats/data_streams_stats_client'; -import { LicensingPluginStart } from '@kbn/licensing-plugin/public'; +import type { DiscoverStart } from '@kbn/discover-plugin/public'; +import { fieldsMetadataPluginPublicMock } from '@kbn/fields-metadata-plugin/public/mocks'; import { IndexManagementPluginStart } from '@kbn/index-management-shared-types'; import { IngestPipelinesPluginStart } from '@kbn/ingest-pipelines-plugin/public'; +import { LicensingPluginStart } from '@kbn/licensing-plugin/public'; +import { NavigationPublicStart } from '@kbn/navigation-plugin/public/types'; +import type { SavedObjectTaggingPluginStart } from '@kbn/saved-objects-tagging-plugin/public'; +import type { SharePublicStart } from '@kbn/share-plugin/public/plugin'; +import type { StreamsPluginStart } from '@kbn/streams-plugin/public'; +import type { UnifiedSearchPublicPluginStart } from '@kbn/unified-search-plugin/public'; import { DiscoverSharedPublicStart } from '@kbn/discover-shared-plugin/public'; import { ObservabilityAIAssistantPublicStart } from '@kbn/observability-ai-assistant-plugin/public'; import type { StreamsAppKibanaContext } from '../public/hooks/use_kibana'; @@ -48,6 +50,8 @@ export function getMockStreamsAppContext(): StreamsAppKibanaContext { indexManagement: {} as unknown as IndexManagementPluginStart, ingestPipelines: {} as unknown as IngestPipelinesPluginStart, discoverShared: {} as unknown as DiscoverSharedPublicStart, + charts: {} as unknown as ChartsPluginStart, + discover: {} as unknown as DiscoverStart, observabilityAIAssistant: {} as unknown as ObservabilityAIAssistantPublicStart, }, }, diff --git a/x-pack/platform/plugins/shared/streams_app/kibana.jsonc b/x-pack/platform/plugins/shared/streams_app/kibana.jsonc index e75758995ff19..07be06a836c73 100644 --- a/x-pack/platform/plugins/shared/streams_app/kibana.jsonc +++ b/x-pack/platform/plugins/shared/streams_app/kibana.jsonc @@ -10,9 +10,11 @@ "browser": true, "configPath": ["xpack", "streamsApp"], "requiredPlugins": [ + "charts", "data", "datasetQuality", "dataViews", + "discover", "discoverShared", "fieldsMetadata", "licensing", @@ -23,10 +25,8 @@ "savedObjectsTagging", "share", "streams", - "unifiedSearch", - ], - "requiredBundles": [ - "kibanaReact" + "unifiedSearch" ], + "requiredBundles": ["kibanaReact"] } } diff --git a/x-pack/platform/plugins/shared/streams_app/public/components/stream_detail_dashboards_view/dashboard_table.tsx b/x-pack/platform/plugins/shared/streams_app/public/components/stream_detail_dashboards_view/dashboard_table.tsx index 3cc24eedd5a5e..5c3c20a346b7b 100644 --- a/x-pack/platform/plugins/shared/streams_app/public/components/stream_detail_dashboards_view/dashboard_table.tsx +++ b/x-pack/platform/plugins/shared/streams_app/public/components/stream_detail_dashboards_view/dashboard_table.tsx @@ -54,7 +54,7 @@ export function DashboardsTable({ name: i18n.translate('xpack.streams.dashboardTable.dashboardNameColumnTitle', { defaultMessage: 'Dashboard name', }), - render: (_, { label, id }) => ( + render: (_, { title, id }) => ( { @@ -71,7 +71,7 @@ export function DashboardsTable({ } }} > - {label} + {title} ), }, diff --git a/x-pack/platform/plugins/shared/streams_app/public/components/stream_detail_dashboards_view/index.tsx b/x-pack/platform/plugins/shared/streams_app/public/components/stream_detail_dashboards_view/index.tsx index 1aceaeefa6e66..4c32daf32ef31 100644 --- a/x-pack/platform/plugins/shared/streams_app/public/components/stream_detail_dashboards_view/index.tsx +++ b/x-pack/platform/plugins/shared/streams_app/public/components/stream_detail_dashboards_view/index.tsx @@ -33,7 +33,7 @@ export function StreamDetailDashboardsView({ const filteredDashboards = useMemo(() => { return linkedDashboards.filter((dashboard) => { - return dashboard.label.toLowerCase().includes(query.toLowerCase()); + return dashboard.title.toLowerCase().includes(query.toLowerCase()); }); }, [linkedDashboards, query]); diff --git a/x-pack/platform/plugins/shared/streams_app/public/components/stream_detail_significant_events_view/index.tsx b/x-pack/platform/plugins/shared/streams_app/public/components/stream_detail_significant_events_view/index.tsx new file mode 100644 index 0000000000000..7564b0b806a27 --- /dev/null +++ b/x-pack/platform/plugins/shared/streams_app/public/components/stream_detail_significant_events_view/index.tsx @@ -0,0 +1,16 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ +import { IngestStreamGetResponse } from '@kbn/streams-schema'; +import React from 'react'; + +export function StreamDetailSignificantEventsView({ + definition, +}: { + definition?: IngestStreamGetResponse; +}) { + return <>; +} diff --git a/x-pack/platform/plugins/shared/streams_app/public/components/stream_detail_view/index.tsx b/x-pack/platform/plugins/shared/streams_app/public/components/stream_detail_view/index.tsx index e1a7da7e6beb4..fb9968d0294d5 100644 --- a/x-pack/platform/plugins/shared/streams_app/public/components/stream_detail_view/index.tsx +++ b/x-pack/platform/plugins/shared/streams_app/public/components/stream_detail_view/index.tsx @@ -5,16 +5,16 @@ * 2.0. */ import { i18n } from '@kbn/i18n'; -import React from 'react'; import { Outlet } from '@kbn/typed-react-router-config'; +import React from 'react'; import { useKibana } from '../../hooks/use_kibana'; +import { StreamDetailContextProvider, useStreamDetail } from '../../hooks/use_stream_detail'; import { useStreamsAppParams } from '../../hooks/use_streams_app_params'; +import { StreamDetailManagement } from '../data_management/stream_detail_management'; import { EntityDetailViewWithoutParams, EntityViewTab } from '../entity_detail_view'; +import { RedirectTo } from '../redirect_to'; import { StreamDetailDashboardsView } from '../stream_detail_dashboards_view'; -import { StreamDetailManagement } from '../data_management/stream_detail_management'; import { StreamDetailOverview } from '../stream_detail_overview'; -import { StreamDetailContextProvider, useStreamDetail } from '../../hooks/use_stream_detail'; -import { RedirectTo } from '../redirect_to'; export function StreamDetailView() { const { streamsRepositoryClient } = useKibana().dependencies.start.streams; diff --git a/x-pack/platform/plugins/shared/streams_app/public/types.ts b/x-pack/platform/plugins/shared/streams_app/public/types.ts index 5b19784538044..2a04f99fca876 100644 --- a/x-pack/platform/plugins/shared/streams_app/public/types.ts +++ b/x-pack/platform/plugins/shared/streams_app/public/types.ts @@ -4,31 +4,33 @@ * 2.0; you may not use this file except in compliance with the Elastic License * 2.0. */ +import { ChartsPluginStart } from '@kbn/charts-plugin/public'; +import { AppMountParameters } from '@kbn/core/public'; import type { DataPublicPluginSetup, DataPublicPluginStart } from '@kbn/data-plugin/public'; import type { DataViewsPublicPluginSetup, DataViewsPublicPluginStart, } from '@kbn/data-views-plugin/public'; -import type { StreamsPluginSetup, StreamsPluginStart } from '@kbn/streams-plugin/public'; -import type { UnifiedSearchPublicPluginStart } from '@kbn/unified-search-plugin/public'; -import type { SharePublicSetup, SharePublicStart } from '@kbn/share-plugin/public/plugin'; -import type { SavedObjectTaggingPluginStart } from '@kbn/saved-objects-tagging-plugin/public'; -import { NavigationPublicStart } from '@kbn/navigation-plugin/public/types'; +import { DiscoverStart } from '@kbn/discover-plugin/public'; +import { + DiscoverSharedPublicSetup, + DiscoverSharedPublicStart, +} from '@kbn/discover-shared-plugin/public'; import { FieldsMetadataPublicStart } from '@kbn/fields-metadata-plugin/public'; +import { IndexManagementPluginStart } from '@kbn/index-management-shared-types'; +import { IngestPipelinesPluginStart } from '@kbn/ingest-pipelines-plugin/public'; +import { LicensingPluginStart } from '@kbn/licensing-plugin/public'; +import { NavigationPublicStart } from '@kbn/navigation-plugin/public/types'; import { ObservabilityAIAssistantPublicSetup, ObservabilityAIAssistantPublicStart, } from '@kbn/observability-ai-assistant-plugin/public'; -import { AppMountParameters } from '@kbn/core/public'; -import { LicensingPluginStart } from '@kbn/licensing-plugin/public'; -import { IndexManagementPluginStart } from '@kbn/index-management-shared-types'; -import { IngestPipelinesPluginStart } from '@kbn/ingest-pipelines-plugin/public'; -import { - DiscoverSharedPublicSetup, - DiscoverSharedPublicStart, -} from '@kbn/discover-shared-plugin/public'; -/* eslint-disable @typescript-eslint/no-empty-interface*/ +import type { SavedObjectTaggingPluginStart } from '@kbn/saved-objects-tagging-plugin/public'; +import type { SharePublicSetup, SharePublicStart } from '@kbn/share-plugin/public/plugin'; +import type { StreamsPluginSetup, StreamsPluginStart } from '@kbn/streams-plugin/public'; +import type { UnifiedSearchPublicPluginStart } from '@kbn/unified-search-plugin/public'; +/* eslint-disable @typescript-eslint/no-empty-interface*/ export interface ConfigSchema {} export interface StreamsApplicationProps { @@ -49,13 +51,15 @@ export interface StreamsAppSetupDependencies { } export interface StreamsAppStartDependencies { + charts: ChartsPluginStart; data: DataPublicPluginStart; dataViews: DataViewsPublicPluginStart; + discover: DiscoverStart; discoverShared: DiscoverSharedPublicStart; fieldsMetadata: FieldsMetadataPublicStart; - licensing: LicensingPluginStart; indexManagement: IndexManagementPluginStart; ingestPipelines: IngestPipelinesPluginStart; + licensing: LicensingPluginStart; navigation: NavigationPublicStart; observabilityAIAssistant: ObservabilityAIAssistantPublicStart; savedObjectsTagging: SavedObjectTaggingPluginStart; diff --git a/x-pack/platform/plugins/shared/streams_app/tsconfig.json b/x-pack/platform/plugins/shared/streams_app/tsconfig.json index c55f3389d6422..ef9b88356c949 100644 --- a/x-pack/platform/plugins/shared/streams_app/tsconfig.json +++ b/x-pack/platform/plugins/shared/streams_app/tsconfig.json @@ -27,6 +27,8 @@ "@kbn/react-kibana-context-render", "@kbn/code-editor", "@kbn/ui-theme", + "@kbn/charts-plugin", + "@kbn/discover-plugin", "@kbn/kibana-react-plugin", "@kbn/es-query", "@kbn/server-route-repository-client", @@ -64,6 +66,6 @@ "@kbn/core-analytics-browser", "@kbn/index-management-shared-types", "@kbn/ingest-pipelines-plugin", - "@kbn/deeplinks-observability", + "@kbn/deeplinks-observability" ] } diff --git a/x-pack/test/api_integration/deployment_agnostic/apis/observability/streams/assets/dashboard.ts b/x-pack/test/api_integration/deployment_agnostic/apis/observability/streams/assets/dashboard.ts index 950b1396bae6b..262d9191e984f 100644 --- a/x-pack/test/api_integration/deployment_agnostic/apis/observability/streams/assets/dashboard.ts +++ b/x-pack/test/api_integration/deployment_agnostic/apis/observability/streams/assets/dashboard.ts @@ -135,8 +135,8 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) { }); after(async () => { - await unloadDashboards(); await unlinkDashboard(SEARCH_DASHBOARD_ID); + await unloadDashboards(); }); it('lists the dashboard in the stream response', async () => { @@ -177,7 +177,6 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) { }); it('recovers on write and lists the linked dashboard ', async () => { - await unlinkDashboard(SEARCH_DASHBOARD_ID); await linkDashboard(SEARCH_DASHBOARD_ID); const response = await apiClient.fetch('GET /api/streams/{name}/dashboards 2023-10-31', { diff --git a/x-pack/test/api_integration/deployment_agnostic/apis/observability/streams/classic.ts b/x-pack/test/api_integration/deployment_agnostic/apis/observability/streams/classic.ts index 8994239fb6653..478ca08b476f5 100644 --- a/x-pack/test/api_integration/deployment_agnostic/apis/observability/streams/classic.ts +++ b/x-pack/test/api_integration/deployment_agnostic/apis/observability/streams/classic.ts @@ -63,6 +63,7 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) { }, body: { dashboards: [], + queries: [], stream: { ingest: { lifecycle: { inherit: {} }, @@ -98,12 +99,14 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) { const { dashboards, + queries, stream, effective_lifecycle: effectiveLifecycle, elasticsearch_assets: elasticsearchAssets, } = body; expect(dashboards).to.eql([]); + expect(queries).to.eql([]); expect(stream).to.eql({ name: TEST_STREAM_NAME, @@ -159,6 +162,7 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) { params: { path: { name: TEST_STREAM_NAME }, body: { + queries: [], dashboards: [], stream: { ingest: { @@ -255,6 +259,7 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) { name: DATA_STREAM_NAME, }, body: { + queries: [], dashboards: [], stream: { ingest: { @@ -319,6 +324,7 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) { }, body: { dashboards: [], + queries: [], stream: { ingest: { lifecycle: { inherit: {} }, diff --git a/x-pack/test/api_integration/deployment_agnostic/apis/observability/streams/enrichment.ts b/x-pack/test/api_integration/deployment_agnostic/apis/observability/streams/enrichment.ts index cf41cdac6e985..18685f29fc550 100644 --- a/x-pack/test/api_integration/deployment_agnostic/apis/observability/streams/enrichment.ts +++ b/x-pack/test/api_integration/deployment_agnostic/apis/observability/streams/enrichment.ts @@ -52,6 +52,7 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) { it('Place processing steps', async () => { const body: IngestStreamUpsertRequest = { dashboards: [], + queries: [], stream: { ingest: { lifecycle: { inherit: {} }, diff --git a/x-pack/test/api_integration/deployment_agnostic/apis/observability/streams/full_flow.ts b/x-pack/test/api_integration/deployment_agnostic/apis/observability/streams/full_flow.ts index de20471ddd28b..39b2f8b358fce 100644 --- a/x-pack/test/api_integration/deployment_agnostic/apis/observability/streams/full_flow.ts +++ b/x-pack/test/api_integration/deployment_agnostic/apis/observability/streams/full_flow.ts @@ -359,6 +359,7 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) { it('should allow to update field type to incompatible type', async () => { const body: IngestStreamUpsertRequest = { dashboards: [], + queries: [], stream: { ingest: { lifecycle: { inherit: {} }, @@ -401,6 +402,7 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) { it('should not allow to update field type to system', async () => { const body: IngestStreamUpsertRequest = { dashboards: [], + queries: [], stream: { ingest: { lifecycle: { inherit: {} }, diff --git a/x-pack/test/api_integration/deployment_agnostic/apis/observability/streams/group_streams.ts b/x-pack/test/api_integration/deployment_agnostic/apis/observability/streams/group_streams.ts index a616b85f7878b..da77df94ccad8 100644 --- a/x-pack/test/api_integration/deployment_agnostic/apis/observability/streams/group_streams.ts +++ b/x-pack/test/api_integration/deployment_agnostic/apis/observability/streams/group_streams.ts @@ -46,6 +46,7 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) { }, }, dashboards: [], + queries: [], }, }, }) @@ -65,6 +66,7 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) { }, }, dashboards: [], + queries: [], }, }, }) @@ -84,6 +86,7 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) { }, }, dashboards: [], + queries: [], }, }, }) @@ -102,6 +105,7 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) { }, }, dashboards: [], + queries: [], }, }, }) @@ -120,6 +124,7 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) { }, }, dashboards: [], + queries: [], }, }, }) @@ -152,6 +157,7 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) { }, }, dashboards: [], + queries: [], }); }); @@ -208,6 +214,7 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) { }, }, dashboards: [], + queries: [], }, }, }) @@ -226,6 +233,7 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) { }, }, dashboards: [], + queries: [], }, }, }) diff --git a/x-pack/test/api_integration/deployment_agnostic/apis/observability/streams/helpers/create_streams.ts b/x-pack/test/api_integration/deployment_agnostic/apis/observability/streams/helpers/create_streams.ts index 5af710ffdb74b..26ea5ec89236f 100644 --- a/x-pack/test/api_integration/deployment_agnostic/apis/observability/streams/helpers/create_streams.ts +++ b/x-pack/test/api_integration/deployment_agnostic/apis/observability/streams/helpers/create_streams.ts @@ -9,7 +9,7 @@ import { StreamUpsertRequest } from '@kbn/streams-schema'; import expect from '@kbn/expect'; import { StreamsSupertestRepositoryClient } from './repository_client'; -type StreamPutItem = Omit & { name: string }; +type StreamPutItem = Omit & { name: string }; const streams: StreamPutItem[] = [ { @@ -135,6 +135,7 @@ export async function createStreams(apiClient: StreamsSupertestRepositoryClient) body: { ...stream, dashboards: [], + queries: [], } as StreamUpsertRequest, path: { name }, }, diff --git a/x-pack/test/api_integration/deployment_agnostic/apis/observability/streams/helpers/requests.ts b/x-pack/test/api_integration/deployment_agnostic/apis/observability/streams/helpers/requests.ts index 37631048316a0..1135a505cc47c 100644 --- a/x-pack/test/api_integration/deployment_agnostic/apis/observability/streams/helpers/requests.ts +++ b/x-pack/test/api_integration/deployment_agnostic/apis/observability/streams/helpers/requests.ts @@ -119,3 +119,18 @@ export async function getIlmStats( .expect(expectStatusCode) .then((response) => response.body); } + +export async function getQueries( + apiClient: StreamsSupertestRepositoryClient, + name: string, + expectStatusCode: number = 200 +) { + return await apiClient + .fetch('GET /api/streams/{name}/queries 2023-10-31', { + params: { + path: { name }, + }, + }) + .expect(expectStatusCode) + .then((response) => response.body); +} diff --git a/x-pack/test/api_integration/deployment_agnostic/apis/observability/streams/index.ts b/x-pack/test/api_integration/deployment_agnostic/apis/observability/streams/index.ts index 9fbeeab4981fa..b8533749db15c 100644 --- a/x-pack/test/api_integration/deployment_agnostic/apis/observability/streams/index.ts +++ b/x-pack/test/api_integration/deployment_agnostic/apis/observability/streams/index.ts @@ -19,6 +19,8 @@ export default function ({ loadTestFile }: DeploymentAgnosticFtrProviderContext) loadTestFile(require.resolve('./root_stream')); loadTestFile(require.resolve('./group_streams')); loadTestFile(require.resolve('./lifecycle')); + loadTestFile(require.resolve('./significant_events')); + loadTestFile(require.resolve('./queries')); loadTestFile(require.resolve('./discover')); }); } diff --git a/x-pack/test/api_integration/deployment_agnostic/apis/observability/streams/lifecycle.ts b/x-pack/test/api_integration/deployment_agnostic/apis/observability/streams/lifecycle.ts index 75d129d5a75d5..f3068d4a95360 100644 --- a/x-pack/test/api_integration/deployment_agnostic/apis/observability/streams/lifecycle.ts +++ b/x-pack/test/api_integration/deployment_agnostic/apis/observability/streams/lifecycle.ts @@ -91,6 +91,7 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) { }, }, dashboards: [], + queries: [], }; describe('Wired streams update', () => { @@ -99,6 +100,7 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) { const response = await putStream(apiClient, 'logs', { dashboards: [], + queries: [], stream: { ingest: { ...(rootDefinition as WiredStreamGetResponse).stream.ingest, @@ -126,6 +128,7 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) { 'logs', { dashboards: [], + queries: [], stream: { ingest: { ...(rootDefinition as WiredStreamGetResponse).stream.ingest, @@ -146,6 +149,7 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) { const rootDefinition = await getStream(apiClient, 'logs'); await putStream(apiClient, 'logs', { dashboards: [], + queries: [], stream: { ingest: { ...(rootDefinition as WiredStreamGetResponse).stream.ingest, @@ -155,6 +159,7 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) { }); await putStream(apiClient, 'logs.overrides', { dashboards: [], + queries: [], stream: { ingest: { ...wiredPutBody.stream.ingest, @@ -181,6 +186,7 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) { it('applies the nearest parent lifecycle when deleted', async () => { await putStream(apiClient, 'logs.10d', { dashboards: [], + queries: [], stream: { ingest: { ...wiredPutBody.stream.ingest, @@ -190,6 +196,7 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) { }); await putStream(apiClient, 'logs.10d.20d', { dashboards: [], + queries: [], stream: { ingest: { ...wiredPutBody.stream.ingest, @@ -202,6 +209,7 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) { // delete lifecycle of the 20d override await putStream(apiClient, 'logs.10d.20d', { dashboards: [], + queries: [], stream: { ingest: { ...wiredPutBody.stream.ingest, @@ -222,6 +230,7 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) { it('handles no retention dsl', async () => { await putStream(apiClient, 'logs.no', { dashboards: [], + queries: [], stream: { ingest: { ...wiredPutBody.stream.ingest, @@ -232,6 +241,7 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) { await putStream(apiClient, 'logs.no.retention', { dashboards: [], + queries: [], stream: { ingest: { ...wiredPutBody.stream.ingest, @@ -253,6 +263,7 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) { 'logs.ilm', { dashboards: [], + queries: [], stream: { ingest: { ...wiredPutBody.stream.ingest, @@ -268,6 +279,7 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) { await putStream(apiClient, 'logs.ilm.stream', wiredPutBody); await putStream(apiClient, 'logs.ilm', { dashboards: [], + queries: [], stream: { ingest: { ...wiredPutBody.stream.ingest, @@ -290,6 +302,7 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) { const name = 'logs.ilm-with-backing-indices'; await putStream(apiClient, name, { dashboards: [], + queries: [], stream: { ingest: { ...wiredPutBody.stream.ingest, @@ -303,6 +316,7 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) { await putStream(apiClient, name, { dashboards: [], + queries: [], stream: { ingest: { ...wiredPutBody.stream.ingest, @@ -322,6 +336,7 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) { const name = 'logs.dlm-with-backing-indices'; await putStream(apiClient, name, { dashboards: [], + queries: [], stream: { ingest: { ...wiredPutBody.stream.ingest, @@ -339,6 +354,7 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) { await putStream(apiClient, name, { dashboards: [], + queries: [], stream: { ingest: { ...wiredPutBody.stream.ingest, @@ -366,6 +382,7 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) { }, }, dashboards: [], + queries: [], }; const createDataStream = async (name: string, lifecycle: IngestStreamLifecycle) => { @@ -416,6 +433,7 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) { await putStream(apiClient, indexName, { dashboards: [], + queries: [], stream: { ingest: { ...unwiredPutBody.stream.ingest, @@ -439,6 +457,7 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) { indexName, { dashboards: [], + queries: [], stream: { ingest: { ...unwiredPutBody.stream.ingest, @@ -459,6 +478,7 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) { const indexName = 'logs.dslnostats'; await putStream(apiClient, indexName, { dashboards: [], + queries: [], stream: { ingest: { ...wiredPutBody.stream.ingest, @@ -478,6 +498,7 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) { const indexName = 'logs.ilmpolicydontexists'; await putStream(apiClient, indexName, { dashboards: [], + queries: [], stream: { ingest: { ...wiredPutBody.stream.ingest, @@ -508,6 +529,7 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) { await putStream(apiClient, indexName, { dashboards: [], + queries: [], stream: { ingest: { ...wiredPutBody.stream.ingest, diff --git a/x-pack/test/api_integration/deployment_agnostic/apis/observability/streams/queries.ts b/x-pack/test/api_integration/deployment_agnostic/apis/observability/streams/queries.ts new file mode 100644 index 0000000000000..1f3855b5ccf79 --- /dev/null +++ b/x-pack/test/api_integration/deployment_agnostic/apis/observability/streams/queries.ts @@ -0,0 +1,238 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import expect from '@kbn/expect'; +import { WiredIngestUpsertRequest } from '@kbn/streams-schema'; +import { v4 } from 'uuid'; +import { DeploymentAgnosticFtrProviderContext } from '../../../ftr_provider_context'; +import { + StreamsSupertestRepositoryClient, + createStreamsRepositoryAdminClient, +} from './helpers/repository_client'; +import { disableStreams, enableStreams, getQueries, putStream } from './helpers/requests'; + +export default function ({ getService }: DeploymentAgnosticFtrProviderContext) { + const roleScopedSupertest = getService('roleScopedSupertest'); + let apiClient: StreamsSupertestRepositoryClient; + + const STREAM_NAME = 'logs.queries-test'; + const stream: WiredIngestUpsertRequest = { + ingest: { + lifecycle: { inherit: {} }, + processing: [], + wired: { + routing: [], + fields: { + numberfield: { + type: 'long', + }, + }, + }, + }, + }; + + describe('Queries API', () => { + before(async () => { + apiClient = await createStreamsRepositoryAdminClient(roleScopedSupertest); + await enableStreams(apiClient); + }); + + after(async () => { + await disableStreams(apiClient); + }); + + beforeEach(async () => { + await putStream(apiClient, STREAM_NAME, { + stream, + dashboards: [], + queries: [], + }); + }); + + it('lists empty queries when none are defined on the stream', async () => { + const response = await getQueries(apiClient, STREAM_NAME); + + expect(response).to.eql({ queries: [] }); + }); + + it('lists queries when defined on the stream', async () => { + const queries = [ + { id: v4(), title: 'OutOfMemoryError', kql: { query: "message:'OutOfMemoryError'" } }, + { + id: v4(), + title: 'cluster_block_exception', + kql: { query: "message:'cluster_block_exception'" }, + }, + ]; + + const updateStreamResponse = await putStream(apiClient, STREAM_NAME, { + stream, + dashboards: [], + queries, + }); + expect(updateStreamResponse).to.have.property('acknowledged', true); + + const getQueriesResponse = await getQueries(apiClient, STREAM_NAME); + expect(getQueriesResponse.queries).to.eql(queries); + }); + + it('inserts a query when inexistant', async () => { + const query = { id: v4(), title: 'Significant Query', kql: { query: "message:'query'" } }; + const upsertQueryResponse = await apiClient + .fetch('PUT /api/streams/{name}/queries/{queryId} 2023-10-31', { + params: { + path: { name: STREAM_NAME, queryId: query.id }, + body: { + title: query.title, + kql: query.kql, + }, + }, + }) + .expect(200) + .then((res) => res.body); + expect(upsertQueryResponse.acknowledged).to.be(true); + + const getQueriesResponse = await getQueries(apiClient, STREAM_NAME); + expect(getQueriesResponse.queries).to.eql([query]); + }); + + it('updates a query when already defined', async () => { + const queryId = v4(); + await putStream(apiClient, STREAM_NAME, { + stream, + dashboards: [], + queries: [ + { + id: queryId, + title: 'Significant Query', + kql: { query: "message:'query'" }, + }, + ], + }); + + const upsertQueryResponse = await apiClient + .fetch('PUT /api/streams/{name}/queries/{queryId} 2023-10-31', { + params: { + path: { name: STREAM_NAME, queryId }, + body: { + title: 'Another title', + kql: { query: "message:'Something else'" }, + }, + }, + }) + .expect(200) + .then((res) => res.body); + expect(upsertQueryResponse.acknowledged).to.be(true); + + const getQueriesResponse = await getQueries(apiClient, STREAM_NAME); + expect(getQueriesResponse.queries).to.eql([ + { + id: queryId, + title: 'Another title', + kql: { query: "message:'Something else'" }, + }, + ]); + }); + + it('deletes an existing query successfully', async () => { + const queryId = v4(); + await putStream(apiClient, STREAM_NAME, { + stream, + dashboards: [], + queries: [ + { + id: queryId, + title: 'Significant Query', + kql: { query: "message:'query'" }, + }, + ], + }); + + const deleteQueryResponse = await apiClient + .fetch('DELETE /api/streams/{name}/queries/{queryId} 2023-10-31', { + params: { path: { name: STREAM_NAME, queryId } }, + }) + .expect(200) + .then((res) => res.body); + expect(deleteQueryResponse.acknowledged).to.be(true); + + const getQueriesResponse = await getQueries(apiClient, STREAM_NAME); + expect(getQueriesResponse.queries).to.eql([]); + }); + + it('throws when deleting an inexistant query', async () => { + const queryId = v4(); + await apiClient + .fetch('DELETE /api/streams/{name}/queries/{queryId} 2023-10-31', { + params: { path: { name: STREAM_NAME, queryId } }, + }) + .expect(404); + + const getQueriesResponse = await getQueries(apiClient, STREAM_NAME); + expect(getQueriesResponse.queries).to.eql([]); + }); + + it('bulks insert and remove queries', async () => { + const existingQueryId = v4(); + await putStream(apiClient, STREAM_NAME, { + stream, + dashboards: [], + queries: [ + { + id: existingQueryId, + title: 'Significant Query', + kql: { query: "message:'query'" }, + }, + ], + }); + + const newQuery1 = { + id: v4(), + title: 'query1', + kql: { query: 'irrelevant1' }, + }; + const newQuery2 = { + id: v4(), + title: 'query2', + kql: { query: 'irrelevant2' }, + }; + + const bulkResponse = await apiClient + .fetch('POST /api/streams/{name}/queries/_bulk 2023-10-31', { + params: { + path: { name: STREAM_NAME }, + body: { + operations: [ + { + index: newQuery1, + }, + { + delete: { + id: 'inexistant', + }, + }, + { + index: newQuery2, + }, + { + delete: { + id: existingQueryId, + }, + }, + ], + }, + }, + }) + .expect(200) + .then((res) => res.body); + expect(bulkResponse).to.have.property('acknowledged', true); + + const getQueriesResponse = await getQueries(apiClient, STREAM_NAME); + expect(getQueriesResponse.queries).to.eql([newQuery1, newQuery2]); + }); + }); +} diff --git a/x-pack/test/api_integration/deployment_agnostic/apis/observability/streams/root_stream.ts b/x-pack/test/api_integration/deployment_agnostic/apis/observability/streams/root_stream.ts index d264fce2d64d0..a1c5b5c296693 100644 --- a/x-pack/test/api_integration/deployment_agnostic/apis/observability/streams/root_stream.ts +++ b/x-pack/test/api_integration/deployment_agnostic/apis/observability/streams/root_stream.ts @@ -60,6 +60,7 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) { it('Should not allow processing changes', async () => { const body: IngestStreamUpsertRequest = { dashboards: [], + queries: [], stream: { ingest: { ...rootStreamDefinition.ingest, @@ -87,6 +88,7 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) { it('Should not allow fields changes', async () => { const body: IngestStreamUpsertRequest = { dashboards: [], + queries: [], stream: { ingest: { ...rootStreamDefinition.ingest, @@ -109,6 +111,7 @@ export default function ({ getService }: DeploymentAgnosticFtrProviderContext) { it('Should allow routing changes', async () => { const body: IngestStreamUpsertRequest = { dashboards: [], + queries: [], stream: { ingest: { ...rootStreamDefinition.ingest, diff --git a/x-pack/test/api_integration/deployment_agnostic/apis/observability/streams/significant_events.ts b/x-pack/test/api_integration/deployment_agnostic/apis/observability/streams/significant_events.ts new file mode 100644 index 0000000000000..5c730e8f9391b --- /dev/null +++ b/x-pack/test/api_integration/deployment_agnostic/apis/observability/streams/significant_events.ts @@ -0,0 +1,133 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import expect from '@kbn/expect'; +import { + IngestStreamLifecycle, + IngestStreamUpsertRequest, + WiredStreamGetResponse, + isDslLifecycle, + isIlmLifecycle, +} from '@kbn/streams-schema'; +import { DeploymentAgnosticFtrProviderContext } from '../../../ftr_provider_context'; +import { + StreamsSupertestRepositoryClient, + createStreamsRepositoryAdminClient, +} from './helpers/repository_client'; +import { disableStreams, enableStreams, getStream, putStream } from './helpers/requests'; + +export default function ({ getService }: DeploymentAgnosticFtrProviderContext) { + const roleScopedSupertest = getService('roleScopedSupertest'); + const esClient = getService('es'); + let apiClient: StreamsSupertestRepositoryClient; + + describe('Significant Events', () => { + before(async () => { + apiClient = await createStreamsRepositoryAdminClient(roleScopedSupertest); + await enableStreams(apiClient); + }); + + after(async () => { + await disableStreams(apiClient); + }); + + describe('Wired streams update', () => { + it('updates the queries', async () => { + let streamDefinition = await getStream(apiClient, 'logs'); + expect(streamDefinition.queries.length).to.eql(0); + + const response = await putStream(apiClient, 'logs', { + stream: { + ingest: { + ...(streamDefinition as WiredStreamGetResponse).stream.ingest, + }, + }, + dashboards: [], + queries: [{ id: 'aaa', title: 'OOM Error', kql: { query: "message: 'OOM Error'" } }], + }); + expect(response).to.have.property('acknowledged', true); + + streamDefinition = await getStream(apiClient, 'logs'); + expect(streamDefinition.queries.length).to.eql(1); + expect(streamDefinition.queries[0]).to.eql({ + id: 'aaa', + title: 'OOM Error', + kql: { query: "message: 'OOM Error'" }, + }); + }); + }); + + describe('Unwired streams update', () => { + const unwiredPutBody: IngestStreamUpsertRequest = { + stream: { + ingest: { + lifecycle: { inherit: {} }, + processing: [], + unwired: {}, + }, + }, + dashboards: [], + queries: [], + }; + + const createDataStream = async (name: string, lifecycle: IngestStreamLifecycle) => { + await esClient.indices.putIndexTemplate({ + name, + index_patterns: [name], + data_stream: {}, + template: isDslLifecycle(lifecycle) + ? { + lifecycle: { data_retention: lifecycle.dsl.data_retention }, + settings: { + 'index.lifecycle.prefer_ilm': false, + 'index.default_pipeline': 'logs@default-pipeline', + }, + } + : isIlmLifecycle(lifecycle) + ? { + settings: { + 'index.default_pipeline': 'logs@default-pipeline', + 'index.lifecycle.prefer_ilm': true, + 'index.lifecycle.name': lifecycle.ilm.policy, + }, + } + : undefined, + }); + await esClient.indices.createDataStream({ name }); + + return async () => { + await esClient.indices.deleteDataStream({ name }); + await esClient.indices.deleteIndexTemplate({ name }); + }; + }; + + it('updates the queries', async () => { + const indexName = 'unwired-stream-queries'; + const clean = await createDataStream(indexName, { dsl: { data_retention: '77d' } }); + await putStream(apiClient, indexName, unwiredPutBody); + + let streamDefinition = await getStream(apiClient, indexName); + expect(streamDefinition.queries.length).to.eql(0); + + await putStream(apiClient, indexName, { + ...unwiredPutBody, + queries: [{ id: 'aaa', title: 'OOM Error', kql: { query: "message: 'OOM Error'" } }], + }); + + streamDefinition = await getStream(apiClient, indexName); + expect(streamDefinition.queries.length).to.eql(1); + expect(streamDefinition.queries[0]).to.eql({ + id: 'aaa', + title: 'OOM Error', + kql: { query: "message: 'OOM Error'" }, + }); + + await clean(); + }); + }); + }); +}