Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@
import org.elasticsearch.rest.RestChannel;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.RestResponse;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.rest.Scope;
import org.elasticsearch.rest.ServerlessScope;
import org.elasticsearch.rest.action.RestResponseListener;

import java.util.List;

Expand Down Expand Up @@ -98,20 +98,29 @@ public void onComplete(RestChannel channel, ReleasableBytesReference content, Re
client.execute(
PrometheusRemoteWriteTransportAction.TYPE,
transportRequest,
ActionListener.releaseBefore(transportRequest, new RestResponseListener<>(channel) {
@Override
public RestResponse buildResponse(PrometheusRemoteWriteTransportAction.RemoteWriteResponse r) {
if (r.getMessage() != null) {
logger.debug(
"Remote write request failed with status [{}] and message [{}]",
r.getStatus(),
r.getMessage()
);
return new RestResponse(r.getStatus(), r.getMessage());
ActionListener.releaseBefore(
transportRequest,
ActionListener.wrap(
r -> channel.sendResponse(
new RestResponse(RestStatus.NO_CONTENT, RestResponse.TEXT_CONTENT_TYPE, BytesArray.EMPTY)
),
e -> {
logger.debug("Remote write transport action failed", e);
try {
channel.sendResponse(
new RestResponse(
ExceptionsHelper.status(e),
RestResponse.TEXT_CONTENT_TYPE,
new BytesArray(e.getMessage())
)
);
} catch (Exception sendException) {
sendException.addSuppressed(e);
logger.warn("failed to send failure response", sendException);
}
}
return new RestResponse(r.getStatus(), RestResponse.TEXT_CONTENT_TYPE, BytesArray.EMPTY);
}
})
)
)
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionRequestValidationException;
Expand Down Expand Up @@ -90,7 +90,7 @@ protected void doExecute(Task task, RemoteWriteRequest request, ActionListener<R

int totalSamples = countTotalSamples(writeRequest);
if (totalSamples == 0) {
listener.onResponse(new RemoteWriteResponse(RestStatus.NO_CONTENT));
listener.onResponse(new RemoteWriteResponse());
return;
}

Expand Down Expand Up @@ -118,36 +118,25 @@ protected void doExecute(Task task, RemoteWriteRequest request, ActionListener<R

if (bulkRequestBuilder.numberOfActions() == 0) {
String message = buildFailureSummary(totalSamples, droppedMissingName, droppedMissingName, Map.of());
listener.onResponse(new RemoteWriteResponse(RestStatus.BAD_REQUEST, message));
listener.onFailure(new ElasticsearchStatusException(message, RestStatus.BAD_REQUEST));
return;
}

final int finalDroppedMissingName = droppedMissingName;
bulkRequestBuilder.execute(new ActionListener<>() {
@Override
public void onResponse(BulkResponse bulkResponse) {
if (bulkResponse.hasFailures() || finalDroppedMissingName > 0) {
handlePartialSuccess(bulkResponse, totalSamples, finalDroppedMissingName, listener);
} else {
listener.onResponse(new RemoteWriteResponse(RestStatus.NO_CONTENT));
}
bulkRequestBuilder.execute(listener.delegateFailure((delegate, bulkResponse) -> {
if (bulkResponse.hasFailures() || finalDroppedMissingName > 0) {
delegate.onFailure(buildPartialFailureException(bulkResponse, totalSamples, finalDroppedMissingName));
} else {
delegate.onResponse(new RemoteWriteResponse());
}

@Override
public void onFailure(Exception e) {
listener.onResponse(new RemoteWriteResponse(ExceptionsHelper.status(e)));
}
});
}));

} catch (InvalidProtocolBufferException e) {
// Invalid protobuf is a client error - return 400 so clients don't retry
listener.onResponse(
new RemoteWriteResponse(RestStatus.BAD_REQUEST, "Invalid Prometheus remote write payload: " + e.getMessage())
listener.onFailure(
new ElasticsearchStatusException("Invalid Prometheus remote write payload: " + e.getMessage(), RestStatus.BAD_REQUEST, e)
);
} catch (Exception e) {
listener.onResponse(
new RemoteWriteResponse(ExceptionsHelper.status(e), "Failed to process Prometheus remote write request: " + e.getMessage())
);
listener.onFailure(e);
}
}

Expand Down Expand Up @@ -203,15 +192,13 @@ private IndexRequest buildIndexRequest(TimeSeries timeSeries, Sample sample, Str
}
}

private static void handlePartialSuccess(
private static ElasticsearchStatusException buildPartialFailureException(
BulkResponse bulkResponse,
int totalSamples,
int droppedMissingName,
ActionListener<RemoteWriteResponse> listener
int droppedMissingName
) {
// Count failures and group by status
Map<String, Map<RestStatus, FailureGroup>> failureGroups = null;
// Default to returning 400 per the remote write spec for requests that should not be retried.
// Default to 400 per the remote write spec for requests that should not be retried.
RestStatus responseStatus = RestStatus.BAD_REQUEST;
int failures = droppedMissingName;

Expand All @@ -234,7 +221,7 @@ private static void handlePartialSuccess(
}

String message = buildFailureSummary(totalSamples, droppedMissingName, failures, failureGroups);
listener.onResponse(new RemoteWriteResponse(responseStatus, message));
return new ElasticsearchStatusException(message, responseStatus);
}

private static String buildFailureSummary(
Expand Down Expand Up @@ -316,31 +303,9 @@ public void close() {
}

public static class RemoteWriteResponse extends ActionResponse {
private final RestStatus status;
@Nullable
private final String message;

public RemoteWriteResponse(RestStatus status) {
this(status, null);
}

public RemoteWriteResponse(RestStatus status, @Nullable String message) {
this.status = status;
this.message = message;
}

@Override
public void writeTo(StreamOutput out) {
TransportAction.localOnly();
}

public RestStatus getStatus() {
return status;
}

@Nullable
public String getMessage() {
return message;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -69,11 +69,11 @@ public <Request extends ActionRequest, Response extends ActionResponse> void doE
assertThat(actionType, equalTo(PrometheusRemoteWriteTransportAction.TYPE));
var remoteWriteRequest = (PrometheusRemoteWriteTransportAction.RemoteWriteRequest) req;
remoteWriteRequest.close();
listener.onResponse((Response) new PrometheusRemoteWriteTransportAction.RemoteWriteResponse(RestStatus.OK));
listener.onResponse((Response) new PrometheusRemoteWriteTransportAction.RemoteWriteResponse());
}
};
try (var response = executeRemoteWrite(1024, 64)) {
assertThat(response.status(), equalTo(RestStatus.OK));
assertThat(response.status(), equalTo(RestStatus.NO_CONTENT));
}
}

Expand Down
Loading
Loading