Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add http delete method #211

Merged
merged 16 commits into from
Feb 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ lombok {
}

group 'io.odpf'
version '0.7.2'
version '0.7.3'

def projName = "firehose"

Expand Down
4 changes: 2 additions & 2 deletions docs/docs/reference/faq.md
Original file line number Diff line number Diff line change
Expand Up @@ -481,7 +481,7 @@ configs mean, please have a look at the config section.

#### Does it support DELETE calls?

At the moment, the HTTP Sink supports only PUT, POST and PATCH methods.
Yes, the HTTP Sink supports DELETE calls since Firehose version 0.7.3.

#### How many messages are pushed in one call?

Expand Down Expand Up @@ -861,4 +861,4 @@ offset for the partition was deleted due to retention period, the following 3 ca
earliest available offset is now 21, Firehose will start reading from offset 21 for the partition. This is also the
default behaviour in case this config is not specified at all.
3. If the config **SOURCE_KAFKA_CONSUMER_CONFIG_AUTO_OFFSET_RESET** is set to `none` or anything else, then Firehose
will terminate with an abnormal status code.
will terminate with an abnormal status code.
14 changes: 11 additions & 3 deletions docs/docs/sinks/http-sink.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ An Http sink Firehose \(`SINK_TYPE`=`http`\) requires the following variables to

### `SINK_HTTP_SERVICE_URL`

The HTTP endpoint of the service to which this consumer should PUT/POST/PATCH data. This can be configured as per the requirement, a constant or a dynamic one \(which extract given field values from each message and use that as the endpoint\)
The HTTP endpoint of the service to which this consumer should PUT/POST/PATCH/DELETE data. This can be configured as per the requirement, a constant or a dynamic one \(which extract given field values from each message and use that as the endpoint\)
If service url is constant, messages will be sent as batches while in case of dynamic one each message will be sent as a separate request \(Since they’d be having different endpoints\).

- Example value: `http://http-service.test.io`
Expand All @@ -15,7 +15,7 @@ If service url is constant, messages will be sent as batches while in case of dy

### `SINK_HTTP_REQUEST_METHOD`

Defines the HTTP verb supported by the endpoint, Supports PUT, POST and PATCH verbs as of now.
Defines the HTTP verb supported by the endpoint, Supports PUT, POST, PATCH and DELETE verbs as of now.

- Example value: `post`
- Type: `required`
Expand All @@ -39,7 +39,7 @@ Defines the maximum number of HTTP connections.

### `SINK_HTTP_RETRY_STATUS_CODE_RANGES`

Deifnes the range of HTTP status codes for which retry will be attempted.
Defines the range of HTTP status codes for which retry will be attempted. Please remove 404 from retry code range in case of HTTP DELETE otherwise it might try to retry to delete already deleted resources.

- Example value: `400-600`
- Type: `optional`
Expand Down Expand Up @@ -133,3 +133,11 @@ Space-delimited scope overrides. If scope override is not provided, no scopes wi

- Example value: `User:read, sys:info`
- Type: `optional`

### `SINK_HTTP_DELETE_BODY_ENABLE`

This config if set to true will allow body for the HTTP DELETE method, otherwise no payload will be sent with DELETE request.

- Example value: `false`
- Type: `optional`
- Default value: `true`
4 changes: 4 additions & 0 deletions src/main/java/io/odpf/firehose/config/HttpSinkConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -86,4 +86,8 @@ public interface HttpSinkConfig extends AppConfig {
@Key("SINK_HTTP_PARAMETER_SCHEMA_PROTO_CLASS")
String getSinkHttpParameterSchemaProtoClass();

@Key("SINK_HTTP_DELETE_BODY_ENABLE")
@DefaultValue("true")
Boolean getSinkHttpDeleteBodyEnable();

}
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,6 @@
public enum HttpSinkRequestMethodType {
PUT,
POST,
PATCH
PATCH,
DELETE
}
4 changes: 4 additions & 0 deletions src/main/java/io/odpf/firehose/sink/http/HttpSink.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import java.io.InputStreamReader;
import java.net.URISyntaxException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -58,6 +59,9 @@ protected void prepare(List<Message> messages) throws DeserializerException, IOE

@Override
protected List<String> readContent(HttpEntityEnclosingRequestBase httpRequest) throws IOException {
if (httpRequest.getMethod().equals("DELETE") && httpRequest.getEntity() == null) {
return new ArrayList<>();
}
try (InputStream inputStream = httpRequest.getEntity().getContent()) {
return new BufferedReader(new InputStreamReader(inputStream, StandardCharsets.UTF_8)).lines().collect(Collectors.toList());
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package io.odpf.firehose.sink.http.request;

import io.odpf.firehose.config.enums.HttpSinkRequestMethodType;
import io.odpf.firehose.sink.http.request.method.HttpDeleteWithBody;
import org.apache.http.client.methods.HttpEntityEnclosingRequestBase;
import org.apache.http.client.methods.HttpPatch;
import org.apache.http.client.methods.HttpPost;
Expand All @@ -25,6 +26,8 @@ public static HttpEntityEnclosingRequestBase create(URI uri, HttpSinkRequestMeth
return new HttpPost(uri);
case PATCH:
return new HttpPatch(uri);
case DELETE:
return new HttpDeleteWithBody(uri);
default:
return new HttpPut(uri);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package io.odpf.firehose.sink.http.request.create;

import io.odpf.firehose.config.HttpSinkConfig;
import io.odpf.firehose.config.enums.HttpSinkRequestMethodType;
import io.odpf.firehose.message.Message;
import io.odpf.firehose.metrics.FirehoseInstrumentation;
Expand All @@ -23,12 +24,14 @@ public class BatchRequestCreator implements RequestCreator {
private HttpSinkRequestMethodType method;
private JsonBody jsonBody;
private FirehoseInstrumentation firehoseInstrumentation;
private HttpSinkConfig httpSinkConfig;

public BatchRequestCreator(FirehoseInstrumentation firehoseInstrumentation, UriBuilder uriBuilder, HeaderBuilder headerBuilder, HttpSinkRequestMethodType method, JsonBody jsonBody) {
public BatchRequestCreator(FirehoseInstrumentation firehoseInstrumentation, UriBuilder uriBuilder, HeaderBuilder headerBuilder, HttpSinkRequestMethodType method, JsonBody jsonBody, HttpSinkConfig httpSinkConfig) {
this.uriBuilder = uriBuilder;
this.headerBuilder = headerBuilder;
this.method = method;
this.jsonBody = jsonBody;
this.httpSinkConfig = httpSinkConfig;
this.firehoseInstrumentation = firehoseInstrumentation;
}

Expand All @@ -42,9 +45,14 @@ public List<HttpEntityEnclosingRequestBase> create(List<Message> messages, Reque
headerMap.forEach(request::addHeader);
String messagesString = jsonBody.serialize(messages).toString();

request.setEntity(requestEntityBuilder.buildHttpEntity(messagesString));
firehoseInstrumentation.logDebug("\nRequest URL: {}\nRequest headers: {}\nRequest content: {}\nRequest method: {}",
uri, headerMap, jsonBody.serialize(messages), method);
if (!(method == HttpSinkRequestMethodType.DELETE && !httpSinkConfig.getSinkHttpDeleteBodyEnable())) {
request.setEntity(requestEntityBuilder.buildHttpEntity(messagesString));
firehoseInstrumentation.logDebug("\nRequest URL: {}\nRequest headers: {}\nRequest content: {}\nRequest method: {}",
uri, headerMap, jsonBody.serialize(messages), method);
} else {
firehoseInstrumentation.logDebug("\nRequest URL: {}\nRequest headers: {}\nRequest content: no body\nRequest method: {}",
uri, headerMap, method);
}
return Collections.singletonList(request);
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package io.odpf.firehose.sink.http.request.create;

import io.odpf.firehose.config.HttpSinkConfig;
import io.odpf.firehose.config.enums.HttpSinkRequestMethodType;
import io.odpf.firehose.message.Message;
import io.odpf.firehose.metrics.FirehoseInstrumentation;
Expand All @@ -23,13 +24,15 @@ public class IndividualRequestCreator implements RequestCreator {
private HttpSinkRequestMethodType method;
private UriBuilder uriBuilder;
private FirehoseInstrumentation firehoseInstrumentation;
private HttpSinkConfig httpSinkConfig;

public IndividualRequestCreator(FirehoseInstrumentation firehoseInstrumentation, UriBuilder uriBuilder, HeaderBuilder headerBuilder, HttpSinkRequestMethodType method, JsonBody body) {
public IndividualRequestCreator(FirehoseInstrumentation firehoseInstrumentation, UriBuilder uriBuilder, HeaderBuilder headerBuilder, HttpSinkRequestMethodType method, JsonBody body, HttpSinkConfig httpSinkConfig) {
this.uriBuilder = uriBuilder;
this.headerBuilder = headerBuilder;
this.jsonBody = body;
this.method = method;
this.firehoseInstrumentation = firehoseInstrumentation;
this.httpSinkConfig = httpSinkConfig;
}

@Override
Expand All @@ -43,11 +46,15 @@ public List<HttpEntityEnclosingRequestBase> create(List<Message> messages, Reque

Map<String, String> headerMap = headerBuilder.build(message);
headerMap.forEach(request::addHeader);
request.setEntity(entity.buildHttpEntity(bodyContents.get(i)));

firehoseInstrumentation.logDebug("\nRequest URL: {}\nRequest headers: {}\nRequest content: {}\nRequest method: {}",
requestUrl, headerMap, bodyContents.get(i), method);

if (!(method == HttpSinkRequestMethodType.DELETE && !httpSinkConfig.getSinkHttpDeleteBodyEnable())) {
request.setEntity(entity.buildHttpEntity(bodyContents.get(i)));

firehoseInstrumentation.logDebug("\nRequest URL: {}\nRequest headers: {}\nRequest content: {}\nRequest method: {}",
requestUrl, headerMap, bodyContents.get(i), method);
} else {
firehoseInstrumentation.logDebug("\nRequest URL: {}\nRequest headers: {}\nRequest content: no body\nRequest method: {}",
requestUrl, headerMap, method);
}
requests.add(request);
}
return requests;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package io.odpf.firehose.sink.http.request.method;

import org.apache.http.client.methods.HttpEntityEnclosingRequestBase;

import java.net.URI;

public class HttpDeleteWithBody extends HttpEntityEnclosingRequestBase {
public static final String METHOD_NAME = "DELETE";

@Override
public String getMethod() {
return METHOD_NAME;
}

public HttpDeleteWithBody(final URI uri) {
super();
setURI(uri);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ public List<HttpEntityEnclosingRequestBase> build(List<Message> messages) throws
@Override
public Request setRequestStrategy(HeaderBuilder headerBuilder, UriBuilder uriBuilder, RequestEntityBuilder requestEntitybuilder) {
this.requestCreator = new IndividualRequestCreator(
new FirehoseInstrumentation(statsDReporter, IndividualRequestCreator.class), uriBuilder, headerBuilder, method, body);
new FirehoseInstrumentation(statsDReporter, IndividualRequestCreator.class), uriBuilder, headerBuilder, method, body, httpSinkConfig);
this.requestEntityBuilder = requestEntitybuilder;
return this;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ public Request setRequestStrategy(HeaderBuilder headerBuilder, UriBuilder uriBui
this.requestCreator = new IndividualRequestCreator(
new FirehoseInstrumentation(statsDReporter, IndividualRequestCreator.class), uriBuilder,
headerBuilder.withParameterizedHeader(protoToFieldMapper, httpSinkConfig.getSinkHttpParameterSource()),
method, body);
method, body, httpSinkConfig);
this.requestEntityBuilder = requestEntitybuilder;
return this;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ public Request setRequestStrategy(HeaderBuilder headerBuilder, UriBuilder uriBui
this.requestCreator = new IndividualRequestCreator(
new FirehoseInstrumentation(statsDReporter, IndividualRequestCreator.class),
uriBuilder.withParameterizedURI(protoToFieldMapper, httpSinkConfig.getSinkHttpParameterSource()),
headerBuilder, method, body);
headerBuilder, method, body, httpSinkConfig);
this.requestEntityBuilder = requestEntitybuilder;
return this;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,10 +62,10 @@ public List<HttpEntityEnclosingRequestBase> build(List<Message> messages) throws
public Request setRequestStrategy(HeaderBuilder headerBuilder, UriBuilder uriBuilder, RequestEntityBuilder requestEntitybuilder) {
if (isTemplateBody(httpSinkConfig)) {
this.requestCreator = new IndividualRequestCreator(new FirehoseInstrumentation(
statsDReporter, IndividualRequestCreator.class), uriBuilder, headerBuilder, method, body);
statsDReporter, IndividualRequestCreator.class), uriBuilder, headerBuilder, method, body, httpSinkConfig);
} else {
this.requestCreator = new BatchRequestCreator(new FirehoseInstrumentation(
statsDReporter, BatchRequestCreator.class), uriBuilder, headerBuilder, method, body);
statsDReporter, BatchRequestCreator.class), uriBuilder, headerBuilder, method, body, httpSinkConfig);
}
this.requestEntityBuilder = requestEntitybuilder;
return this;
Expand Down
3 changes: 3 additions & 0 deletions src/test/java/io/odpf/firehose/sink/http/HttpSinkTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ public void shouldThrowNeedToRetryExceptionWhenResponseIsNull() throws Exception
when(httpEntity.getContent()).thenReturn(new StringInputStream(""));
when(request.build(messages)).thenReturn(httpRequests);
when(httpClient.execute(httpPut)).thenReturn(null);
when(httpPut.getMethod()).thenReturn("PUT");

HttpSink httpSink = new HttpSink(firehoseInstrumentation, request, httpClient, stencilClient, retryStatusCodeRange, requestLogStatusCodeRanges);
httpSink.prepare(messages);
Expand Down Expand Up @@ -284,6 +285,7 @@ public void shouldNotLogEntireRequestIfNotInStatusCodeRange() throws Exception {
when(httpClient.execute(httpPut)).thenReturn(response);
when(response.getAllHeaders()).thenReturn(new Header[]{new BasicHeader("Accept", "text/plain")});
when(response.getEntity()).thenReturn(httpEntity);
when(httpPut.getMethod()).thenReturn("PUT");
when(httpEntity.getContent()).thenReturn(new StringInputStream("[{\"key\":\"value1\"},{\"key\":\"value2\"}]"));

HttpSink httpSink = new HttpSink(firehoseInstrumentation, request, httpClient, stencilClient,
Expand Down Expand Up @@ -312,6 +314,7 @@ public void shouldCaptureDroppedMessagesMetricsIfNotInStatusCodeRange() throws E
when(response.getAllHeaders()).thenReturn(new Header[]{new BasicHeader("Accept", "text/plain")});
when(response.getEntity()).thenReturn(httpEntity);
when(httpEntity.getContent()).thenReturn(new StringInputStream("[{\"key\":\"value1\"},{\"key\":\"value2\"}]"));
when(httpPut.getMethod()).thenReturn("PUT");

HttpSink httpSink = new HttpSink(firehoseInstrumentation, request, httpClient, stencilClient,
new RangeToHashMapConverter().convert(null, "400-499"), requestLogStatusCodeRanges);
Expand Down
Loading