Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .vscode/settings.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
{
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please remove unwanted files

"java.configuration.updateBuildConfiguration": "interactive",
"java.compile.nullAnalysis.mode": "automatic"
}
1 change: 1 addition & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
</properties>

<dependencies>

<!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.core/jackson-core -->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package com.splunk.kafka.connect;

import org.apache.http.impl.client.CloseableHttpClient;

import com.splunk.hecclient.HecConfig;

/**
 * Seam for obtaining the HTTP client used to reach the Splunk HEC endpoint.
 * Production code wires in {@link HecClientWrapper}; tests substitute a
 * wrapper that returns a scripted mock client so configuration validation can
 * run without a live Splunk instance.
 */
public abstract class AbstractClientWrapper {
    /**
     * Returns the HTTP client to use for the given HEC configuration.
     * Package-private on purpose: only connector-package code wires clients.
     */
    abstract CloseableHttpClient getClient(HecConfig config);
}
17 changes: 17 additions & 0 deletions src/main/java/com/splunk/kafka/connect/HecClientWrapper.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package com.splunk.kafka.connect;

import org.apache.http.impl.client.CloseableHttpClient;

import com.splunk.hecclient.Hec;
import com.splunk.hecclient.HecConfig;

/**
 * Production {@link AbstractClientWrapper}: delegates to the HEC helper
 * library to build a real HTTP client from the supplied configuration.
 */
public class HecClientWrapper extends AbstractClientWrapper {

    /** Builds a client via {@link Hec#createHttpClient} for the given config. */
    @Override
    CloseableHttpClient getClient(HecConfig config) {
        return Hec.createHttpClient(config);
    }
}
111 changes: 106 additions & 5 deletions src/main/java/com/splunk/kafka/connect/SplunkSinkConnector.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,25 +20,50 @@

import java.util.function.Function;
import java.util.stream.Collectors;

import org.apache.avro.Protocol;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are these imports required? If not, please remove them.

import org.apache.commons.lang3.StringUtils;
import org.apache.http.Header;
import org.apache.http.client.ClientProtocolException;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.client.methods.HttpUriRequest;
import org.apache.http.client.protocol.HttpClientContext;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.message.BasicHeader;
import org.apache.http.protocol.HttpContext;
import org.apache.http.util.EntityUtils;
import org.apache.kafka.common.config.Config;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.config.ConfigValue;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.sink.SinkConnector;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.splunk.hecclient.Event;
import com.splunk.hecclient.EventBatch;
import com.splunk.hecclient.JsonEvent;
import com.splunk.hecclient.JsonEventBatch;

public final class SplunkSinkConnector extends SinkConnector {
private static final Logger log = LoggerFactory.getLogger(SplunkSinkConnector.class);
private Map<String, String> taskConfig;
private Map<String, ConfigValue> values;
private List<ConfigValue> validations;
private AbstractClientWrapper hecAb = new HecClientWrapper();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Should have some meaningful name



public void setHecInstance(AbstractClientWrapper hecAb) {
this.hecAb = hecAb;
}

@Override
public void start(Map<String, String> taskConfig) {
Expand Down Expand Up @@ -76,14 +101,15 @@ public ConfigDef config() {
return SplunkSinkConnectorConfig.conf();
}


@Override
public Config validate(final Map<String, String> connectorConfigs) {
Config config = super.validate(connectorConfigs);
validations = config.configValues();
values = validations.stream().collect(Collectors.toMap(ConfigValue::name, Function.identity()));

validateKerberosConfigs(connectorConfigs);
validateHealthCheckForSplunkIndexes(connectorConfigs);
return new Config(validations);
}

Expand All @@ -100,9 +126,9 @@ void validateKerberosConfigs(final Map<String, String> configs) {
}

String errorMessage = String.format(
"Either both or neither '%s' and '%s' must be set for Kerberos authentication. ",
KERBEROS_KEYTAB_PATH_CONF,
KERBEROS_USER_PRINCIPAL_CONF
"Either both or neither '%s' and '%s' must be set for Kerberos authentication. ",
KERBEROS_KEYTAB_PATH_CONF,
KERBEROS_USER_PRINCIPAL_CONF
);
addErrorMessage(KERBEROS_KEYTAB_PATH_CONF, errorMessage);
addErrorMessage(KERBEROS_USER_PRINCIPAL_CONF, errorMessage);
Expand All @@ -111,4 +137,79 @@ void validateKerberosConfigs(final Map<String, String> configs) {
    // Attaches a validation error message to the named config property.
    // NOTE(review): assumes `property` is a key present in `values` (i.e. a
    // defined config name); an unknown property would NPE here — confirm
    // callers only pass keys from the ConfigDef.
    private void addErrorMessage(String property, String error) {
        values.get(property).addErrorMessage(error);
    }
}

private static String[] split(String data, String sep) {
if (data != null && !data.trim().isEmpty()) {
return data.trim().split(sep);
}
return null;
}


private void validateHealthCheckForSplunkIndexes(final Map<String, String> configs) throws ConfigException {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it validating splunk health?

SplunkSinkConnectorConfig connectorConfig = new SplunkSinkConnectorConfig(configs);
String[] indexes = split(connectorConfig.indexes, ",");
if(indexes.length == 0) {
healthCheckForSplunkHEC(connectorConfig, "");
} else {
for (String index : indexes) {
healthCheckForSplunkHEC(connectorConfig, index);
}
}
}

private void healthCheckForSplunkHEC(SplunkSinkConnectorConfig connectorConfig, String index) throws ConfigException {
Header[] headers;
headers = new Header[1];
headers[0] = new BasicHeader("Authorization", String.format("Splunk %s", connectorConfig.splunkToken));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you define headers as an inline array?

String endpoint = "/services/collector";
String url = connectorConfig.splunkURI + endpoint;
final HttpPost httpPost = new HttpPost(url);
httpPost.setHeaders(headers);
EventBatch batch = new JsonEventBatch();
Event event = new JsonEvent("a:a", null);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What does "a:a" represents?

event.setIndex(index);
batch.add(event);
httpPost.setEntity(batch.getHttpEntity());

CloseableHttpClient httpClient = hecAb.getClient(connectorConfig.getHecConfig());
executeHttpRequest(httpPost, httpClient);
}


private void executeHttpRequest(final HttpUriRequest req, CloseableHttpClient httpClient) throws ConfigException {
CloseableHttpResponse resp = null;
HttpContext context;
context = HttpClientContext.create();
try {
resp = httpClient.execute(req, context);
} catch (ClientProtocolException ex) {
throw new ConfigException("Invalid splunk SSL configuration detected while validating configuration",ex);
} catch (IOException ex) {
throw new ConfigException("Invalid Splunk Configurations",ex);
}
try {
String respPayload = EntityUtils.toString(resp.getEntity(), "utf-8");
if (respPayload.contains("Incorrect index")){
throw new ConfigException("Incorrect Index detected while validating configuration");
}
else if (respPayload.contains("Invalid token")){
throw new ConfigException("Incorrect HEC token detected while validating configuration");
}
} catch(ConfigException ex){
throw ex;
}
catch (Exception ex) {
throw new ConfigException("failed to process http payload",ex);
} finally {
try {
resp.close();
} catch (Exception ex) {
throw new ConfigException("failed to close http response",ex);
}

}
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -308,8 +308,10 @@ public final class SplunkSinkConnectorConfig extends AbstractConfig {
regex = getString(REGEX_CONF);
timestampFormat = getString(TIMESTAMP_FORMAT_CONF).trim();
validateRegexForTimestamp(regex);

}


public static ConfigDef conf() {
return new ConfigDef()
.define(TOKEN_CONF, ConfigDef.Type.PASSWORD, ConfigDef.Importance.HIGH, TOKEN_DOC)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ public class CloseableHttpClientMock extends CloseableHttpClient {
public static final String serverBusy = "{\"text\":\"Server busy\",\"code\":1}";
public static final String noDataError = "{\"text\":\"No data\",\"code\":5}";
public static final String invalidDataFormat = "{\"text\":\"Invalid data format\",\"code\":6}";
public static final String inValidToken = "{\"text\":\"Invalid token\",\"code\":4}";
public static final String inValidIndex = "{\"text\":\"Incorrect index\",\"code\":4,\"invalid-event-number\":1}";
public static final String exception = "excpetion";

private String resp = "";
Expand All @@ -49,6 +51,10 @@ protected CloseableHttpResponse doExecute(HttpHost target, HttpRequest request,
return createResponse(resp, 503);
} else if (resp.equals(noDataError)) {
return createResponse(resp, 400);
}else if (resp.equals(inValidToken)) {
return createResponse(resp, 400);
}else if (resp.equals(inValidIndex)) {
return createResponse(resp, 400);
} else {
return createResponse(success, 201);
}
Expand Down
20 changes: 20 additions & 0 deletions src/test/java/com/splunk/kafka/connect/MockHecClientWrapper.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package com.splunk.kafka.connect;

import org.apache.http.impl.client.CloseableHttpClient;

import com.splunk.hecclient.CloseableHttpClientMock;
import com.splunk.hecclient.Hec;
import com.splunk.hecclient.HecConfig;

/**
 * Test double for {@link AbstractClientWrapper}: always hands back the shared
 * {@link CloseableHttpClientMock} so tests can script HTTP responses via
 * {@code client.setResponse(...)}.
 */
public class MockHecClientWrapper extends AbstractClientWrapper {
    public CloseableHttpClientMock client = new CloseableHttpClientMock();

    /** Returns the canned mock client; the HecConfig argument is ignored. */
    @Override
    CloseableHttpClient getClient(HecConfig config) {
        return client;
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,18 @@
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.kafka.common.config.Config;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.config.ConfigValue;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.sink.SinkConnector;
import org.junit.Assert;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

import com.splunk.hecclient.CloseableHttpClientMock;

import java.util.*;

Expand Down Expand Up @@ -74,6 +78,10 @@ public void testValidKerberosBothEmpty() {
final Map<String, String> configs = new HashMap<>();
addNecessaryConfigs(configs);
SinkConnector connector = new SplunkSinkConnector();
configs.put("topics", "b");
configs.put("splunk.indexes", "b");
MockHecClientWrapper clientInstance = new MockHecClientWrapper();
((SplunkSinkConnector) connector).setHecInstance(clientInstance);
Config result = connector.validate(configs);
assertNoErrors(result);
}
Expand All @@ -85,6 +93,10 @@ public void testValidKerberosBothSet() {
configs.put(KERBEROS_USER_PRINCIPAL_CONF, TEST_KERB_PRINCIPAL);
configs.put(KERBEROS_KEYTAB_PATH_CONF, TEST_KERB_KEYTAB_PATH);
SinkConnector connector = new SplunkSinkConnector();
configs.put("topics", "b");
configs.put("splunk.indexes", "b");
MockHecClientWrapper clientInstance = new MockHecClientWrapper();
((SplunkSinkConnector) connector).setHecInstance(clientInstance);
Config result = connector.validate(configs);
assertNoErrors(result);
}
Expand All @@ -95,6 +107,10 @@ public void testInvalidKerberosOnlyPrincipalSet() {
addNecessaryConfigs(configs);
configs.put(KERBEROS_USER_PRINCIPAL_CONF, TEST_KERB_PRINCIPAL);
SplunkSinkConnector connector = new SplunkSinkConnector();
configs.put("topics", "b");
configs.put("splunk.indexes", "b");
MockHecClientWrapper clientInstance = new MockHecClientWrapper();
((SplunkSinkConnector) connector).setHecInstance(clientInstance);
Config result = connector.validate(configs);
assertHasErrorMessage(result, KERBEROS_USER_PRINCIPAL_CONF, "must be set");
assertHasErrorMessage(result, KERBEROS_KEYTAB_PATH_CONF, "must be set");
Expand All @@ -106,11 +122,54 @@ public void testInvalidKerberosOnlyKeytabSet() {
addNecessaryConfigs(configs);
configs.put(KERBEROS_KEYTAB_PATH_CONF, TEST_KERB_KEYTAB_PATH);
SplunkSinkConnector connector = new SplunkSinkConnector();
configs.put("topics", "b");
configs.put("splunk.indexes", "b");
MockHecClientWrapper clientInstance = new MockHecClientWrapper();
((SplunkSinkConnector) connector).setHecInstance(clientInstance);
Config result = connector.validate(configs);
assertHasErrorMessage(result, KERBEROS_USER_PRINCIPAL_CONF, "must be set");
assertHasErrorMessage(result, KERBEROS_KEYTAB_PATH_CONF, "must be set");
}

@Test
public void testInvalidToken() {
final Map<String, String> configs = new HashMap<>();
addNecessaryConfigs(configs);
SplunkSinkConnector connector = new SplunkSinkConnector();
configs.put("topics", "b");
configs.put("splunk.indexes", "b");
MockHecClientWrapper clientInstance = new MockHecClientWrapper();
clientInstance.client.setResponse(CloseableHttpClientMock.inValidToken);
((SplunkSinkConnector) connector).setHecInstance(clientInstance);
Assertions.assertThrows(ConfigException.class, ()->connector.validate(configs));
}

@Test
public void testInvalidIndex() {
final Map<String, String> configs = new HashMap<>();
addNecessaryConfigs(configs);
SplunkSinkConnector connector = new SplunkSinkConnector();
configs.put("topics", "b");
configs.put("splunk.indexes", "b");
MockHecClientWrapper clientInstance = new MockHecClientWrapper();
clientInstance.client.setResponse(CloseableHttpClientMock.inValidIndex);
((SplunkSinkConnector) connector).setHecInstance(clientInstance);
Assertions.assertThrows(ConfigException.class, ()->connector.validate(configs));
}

@Test
public void testValidSplunkConfigurations() {
final Map<String, String> configs = new HashMap<>();
addNecessaryConfigs(configs);
SplunkSinkConnector connector = new SplunkSinkConnector();
configs.put("topics", "b");
configs.put("splunk.indexes", "b");
MockHecClientWrapper clientInstance = new MockHecClientWrapper();
clientInstance.client.setResponse(CloseableHttpClientMock.success);
((SplunkSinkConnector) connector).setHecInstance(clientInstance);
Assertions.assertDoesNotThrow(()->connector.validate(configs));
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you also add tests for an invalid host, and invalid SSL configuration(i.e. HTTP request on HTTPS server and vice-versa)?

private void addNecessaryConfigs(Map<String, String> configs) {
configs.put(URI_CONF, TEST_URI);
configs.put(TOKEN_CONF, "blah");
Expand Down