Skip to content

Commit 9230a48

Browse files
authored
HLRC: ML Post Data (#33443)
* HLRC: ML Post data
1 parent c12d232 commit 9230a48

File tree

11 files changed

+736
-3
lines changed

11 files changed

+736
-3
lines changed

client/rest-high-level/src/main/java/org/elasticsearch/client/MLRequestConverters.java

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,13 @@
1919

2020
package org.elasticsearch.client;
2121

22+
import org.apache.http.HttpEntity;
2223
import org.apache.http.client.methods.HttpDelete;
2324
import org.apache.http.client.methods.HttpGet;
2425
import org.apache.http.client.methods.HttpPost;
2526
import org.apache.http.client.methods.HttpPut;
27+
import org.apache.http.entity.ByteArrayEntity;
28+
import org.apache.lucene.util.BytesRef;
2629
import org.elasticsearch.client.RequestConverters.EndpointBuilder;
2730
import org.elasticsearch.client.ml.CloseJobRequest;
2831
import org.elasticsearch.client.ml.DeleteJobRequest;
@@ -34,13 +37,16 @@
3437
import org.elasticsearch.client.ml.GetOverallBucketsRequest;
3538
import org.elasticsearch.client.ml.GetRecordsRequest;
3639
import org.elasticsearch.client.ml.OpenJobRequest;
40+
import org.elasticsearch.client.ml.PostDataRequest;
3741
import org.elasticsearch.client.ml.PutJobRequest;
3842
import org.elasticsearch.client.ml.UpdateJobRequest;
3943
import org.elasticsearch.common.Strings;
44+
import org.elasticsearch.common.bytes.BytesReference;
4045

4146
import java.io.IOException;
4247

4348
import static org.elasticsearch.client.RequestConverters.REQUEST_BODY_CONTENT_TYPE;
49+
import static org.elasticsearch.client.RequestConverters.createContentType;
4450
import static org.elasticsearch.client.RequestConverters.createEntity;
4551

4652
final class MLRequestConverters {
@@ -202,6 +208,35 @@ static Request getRecords(GetRecordsRequest getRecordsRequest) throws IOExceptio
202208
return request;
203209
}
204210

211+
static Request postData(PostDataRequest postDataRequest) throws IOException {
212+
String endpoint = new EndpointBuilder()
213+
.addPathPartAsIs("_xpack")
214+
.addPathPartAsIs("ml")
215+
.addPathPartAsIs("anomaly_detectors")
216+
.addPathPart(postDataRequest.getJobId())
217+
.addPathPartAsIs("_data")
218+
.build();
219+
Request request = new Request(HttpPost.METHOD_NAME, endpoint);
220+
221+
RequestConverters.Params params = new RequestConverters.Params(request);
222+
if (postDataRequest.getResetStart() != null) {
223+
params.putParam(PostDataRequest.RESET_START.getPreferredName(), postDataRequest.getResetStart());
224+
}
225+
if (postDataRequest.getResetEnd() != null) {
226+
params.putParam(PostDataRequest.RESET_END.getPreferredName(), postDataRequest.getResetEnd());
227+
}
228+
BytesReference content = postDataRequest.getContent();
229+
if (content != null) {
230+
BytesRef source = postDataRequest.getContent().toBytesRef();
231+
HttpEntity byteEntity = new ByteArrayEntity(source.bytes,
232+
source.offset,
233+
source.length,
234+
createContentType(postDataRequest.getXContentType()));
235+
request.setEntity(byteEntity);
236+
}
237+
return request;
238+
}
239+
205240
static Request getInfluencers(GetInfluencersRequest getInfluencersRequest) throws IOException {
206241
String endpoint = new EndpointBuilder()
207242
.addPathPartAsIs("_xpack")

client/rest-high-level/src/main/java/org/elasticsearch/client/MachineLearningClient.java

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@
1919
package org.elasticsearch.client;
2020

2121
import org.elasticsearch.action.ActionListener;
22+
import org.elasticsearch.client.ml.PostDataRequest;
23+
import org.elasticsearch.client.ml.PostDataResponse;
2224
import org.elasticsearch.client.ml.UpdateJobRequest;
2325
import org.elasticsearch.client.ml.CloseJobRequest;
2426
import org.elasticsearch.client.ml.CloseJobResponse;
@@ -501,6 +503,52 @@ public void getRecordsAsync(GetRecordsRequest request, RequestOptions options, A
501503
Collections.emptySet());
502504
}
503505

506+
/**
507+
* Sends data to an anomaly detection job for analysis.
508+
*
509+
* NOTE: The job must have a state of open to receive and process the data.
510+
*
511+
* <p>
512+
* For additional info
513+
* see <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/ml-post-data.html">ML POST Data documentation</a>
514+
* </p>
515+
*
516+
* @param request PostDataRequest containing the data to post and some additional options
517+
* @param options Additional request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
518+
* @return response containing operational progress about the job
519+
* @throws IOException when there is a serialization issue sending the request or receiving the response
520+
*/
521+
public PostDataResponse postData(PostDataRequest request, RequestOptions options) throws IOException {
522+
return restHighLevelClient.performRequestAndParseEntity(request,
523+
MLRequestConverters::postData,
524+
options,
525+
PostDataResponse::fromXContent,
526+
Collections.emptySet());
527+
}
528+
529+
/**
530+
* Sends data to an anomaly detection job for analysis, asynchronously
531+
*
532+
* NOTE: The job must have a state of open to receive and process the data.
533+
*
534+
* <p>
535+
* For additional info
536+
* see <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/ml-post-data.html">ML POST Data documentation</a>
537+
* </p>
538+
*
539+
* @param request PostDataRequest containing the data to post and some additional options
540+
* @param options Additional request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
541+
* @param listener Listener to be notified upon request completion
542+
*/
543+
public void postDataAsync(PostDataRequest request, RequestOptions options, ActionListener<PostDataResponse> listener) {
544+
restHighLevelClient.performRequestAsyncAndParseEntity(request,
545+
MLRequestConverters::postData,
546+
options,
547+
PostDataResponse::fromXContent,
548+
listener,
549+
Collections.emptySet());
550+
}
551+
504552
/**
505553
* Gets the influencers for a Machine Learning Job.
506554
* <p>
Lines changed: 229 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,229 @@
1+
/*
2+
* Licensed to Elasticsearch under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.elasticsearch.client.ml;
20+
21+
import org.elasticsearch.action.ActionRequest;
22+
import org.elasticsearch.action.ActionRequestValidationException;
23+
import org.elasticsearch.client.ml.job.config.Job;
24+
import org.elasticsearch.common.ParseField;
25+
import org.elasticsearch.common.bytes.BytesArray;
26+
import org.elasticsearch.common.bytes.BytesReference;
27+
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
28+
import org.elasticsearch.common.xcontent.ToXContentObject;
29+
import org.elasticsearch.common.xcontent.XContentBuilder;
30+
import org.elasticsearch.common.xcontent.XContentType;
31+
32+
import java.io.IOException;
33+
import java.nio.ByteBuffer;
34+
import java.nio.charset.StandardCharsets;
35+
import java.util.ArrayList;
36+
import java.util.List;
37+
import java.util.Map;
38+
import java.util.Objects;
39+
40+
/**
41+
* POJO for posting data to a Machine Learning job
42+
*/
43+
public class PostDataRequest extends ActionRequest implements ToXContentObject {
44+
45+
public static final ParseField RESET_START = new ParseField("reset_start");
46+
public static final ParseField RESET_END = new ParseField("reset_end");
47+
public static final ParseField CONTENT_TYPE = new ParseField("content_type");
48+
49+
public static final ConstructingObjectParser<PostDataRequest, Void> PARSER =
50+
new ConstructingObjectParser<>("post_data_request",
51+
(a) -> new PostDataRequest((String)a[0], XContentType.fromMediaTypeOrFormat((String)a[1]), new byte[0]));
52+
53+
static {
54+
PARSER.declareString(ConstructingObjectParser.constructorArg(), Job.ID);
55+
PARSER.declareString(ConstructingObjectParser.constructorArg(), CONTENT_TYPE);
56+
PARSER.declareStringOrNull(PostDataRequest::setResetEnd, RESET_END);
57+
PARSER.declareStringOrNull(PostDataRequest::setResetStart, RESET_START);
58+
}
59+
60+
private final String jobId;
61+
private final XContentType xContentType;
62+
private final BytesReference content;
63+
private String resetStart;
64+
private String resetEnd;
65+
66+
/**
67+
* Create a new PostDataRequest object
68+
*
69+
* @param jobId non-null jobId of the job to post data to
70+
* @param xContentType content type of the data to post. Only {@link XContentType#JSON} or {@link XContentType#SMILE} are supported
71+
* @param content bulk serialized content in the format of the passed {@link XContentType}
72+
*/
73+
public PostDataRequest(String jobId, XContentType xContentType, BytesReference content) {
74+
this.jobId = Objects.requireNonNull(jobId, "job_id must not be null");
75+
this.xContentType = Objects.requireNonNull(xContentType, "content_type must not be null");
76+
this.content = Objects.requireNonNull(content, "content must not be null");
77+
}
78+
79+
/**
80+
* Create a new PostDataRequest object referencing the passed {@code byte[]} content
81+
*
82+
* @param jobId non-null jobId of the job to post data to
83+
* @param xContentType content type of the data to post. Only {@link XContentType#JSON} or {@link XContentType#SMILE} are supported
84+
* @param content bulk serialized content in the format of the passed {@link XContentType}
85+
*/
86+
public PostDataRequest(String jobId, XContentType xContentType, byte[] content) {
87+
this(jobId, xContentType, new BytesArray(content));
88+
}
89+
90+
/**
91+
* Create a new PostDataRequest object referencing the passed {@link JsonBuilder} object
92+
*
93+
* @param jobId non-null jobId of the job to post data to
94+
* @param builder {@link JsonBuilder} object containing documents to be serialized and sent in {@link XContentType#JSON} format
95+
*/
96+
public PostDataRequest(String jobId, JsonBuilder builder) {
97+
this(jobId, XContentType.JSON, builder.build());
98+
}
99+
100+
public String getJobId() {
101+
return jobId;
102+
}
103+
104+
public String getResetStart() {
105+
return resetStart;
106+
}
107+
108+
/**
109+
* Specifies the start of the bucket resetting range
110+
*
111+
* @param resetStart String representation of a timestamp; may be an epoch seconds, epoch millis or an ISO 8601 string
112+
*/
113+
public void setResetStart(String resetStart) {
114+
this.resetStart = resetStart;
115+
}
116+
117+
public String getResetEnd() {
118+
return resetEnd;
119+
}
120+
121+
/**
122+
* Specifies the end of the bucket resetting range
123+
*
124+
* @param resetEnd String representation of a timestamp; may be an epoch seconds, epoch millis or an ISO 8601 string
125+
*/
126+
public void setResetEnd(String resetEnd) {
127+
this.resetEnd = resetEnd;
128+
}
129+
130+
public BytesReference getContent() {
131+
return content;
132+
}
133+
134+
public XContentType getXContentType() {
135+
return xContentType;
136+
}
137+
138+
@Override
139+
public int hashCode() {
140+
//We leave out the content for server side parity
141+
return Objects.hash(jobId, resetStart, resetEnd, xContentType);
142+
}
143+
144+
@Override
145+
public boolean equals(Object obj) {
146+
if(obj == this) {
147+
return true;
148+
}
149+
150+
if (obj == null || getClass() != obj.getClass()) {
151+
return false;
152+
}
153+
154+
//We leave out the content for server side parity
155+
PostDataRequest other = (PostDataRequest) obj;
156+
return Objects.equals(jobId, other.jobId) &&
157+
Objects.equals(resetStart, other.resetStart) &&
158+
Objects.equals(resetEnd, other.resetEnd) &&
159+
Objects.equals(xContentType, other.xContentType);
160+
}
161+
162+
@Override
163+
public ActionRequestValidationException validate() {
164+
return null;
165+
}
166+
167+
@Override
168+
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
169+
builder.startObject();
170+
builder.field(Job.ID.getPreferredName(), jobId);
171+
builder.field(CONTENT_TYPE.getPreferredName(), xContentType.mediaType());
172+
if (resetEnd != null) {
173+
builder.field(RESET_END.getPreferredName(), resetEnd);
174+
}
175+
if (resetStart != null) {
176+
builder.field(RESET_START.getPreferredName(), resetStart);
177+
}
178+
builder.endObject();
179+
return builder;
180+
}
181+
182+
/**
183+
* Class for incrementally building a bulk document request in {@link XContentType#JSON} format
184+
*/
185+
public static class JsonBuilder {
186+
187+
private final List<ByteBuffer> bytes = new ArrayList<>();
188+
189+
/**
190+
* Add a document via a {@code byte[]} array
191+
*
192+
* @param doc {@code byte[]} array of a serialized JSON object
193+
*/
194+
public JsonBuilder addDoc(byte[] doc) {
195+
bytes.add(ByteBuffer.wrap(doc));
196+
return this;
197+
}
198+
199+
/**
200+
* Add a document via a serialized JSON String
201+
*
202+
* @param doc a serialized JSON String
203+
*/
204+
public JsonBuilder addDoc(String doc) {
205+
bytes.add(ByteBuffer.wrap(doc.getBytes(StandardCharsets.UTF_8)));
206+
return this;
207+
}
208+
209+
/**
210+
* Add a document via an object map
211+
*
212+
* @param doc document object to add to bulk request
213+
* @throws IOException on parsing/serialization errors
214+
*/
215+
public JsonBuilder addDoc(Map<String, Object> doc) throws IOException {
216+
try (XContentBuilder builder = XContentBuilder.builder(XContentType.JSON.xContent())) {
217+
builder.map(doc);
218+
bytes.add(ByteBuffer.wrap(BytesReference.toBytes(BytesReference.bytes(builder))));
219+
}
220+
return this;
221+
}
222+
223+
private BytesReference build() {
224+
ByteBuffer[] buffers = bytes.toArray(new ByteBuffer[bytes.size()]);
225+
return BytesReference.fromByteBuffers(buffers);
226+
}
227+
228+
}
229+
}

0 commit comments

Comments
 (0)