Skip to content
This repository has been archived by the owner on Dec 13, 2023. It is now read-only.

Bug 2678- tcp protocol support for es6 #2688

Merged
merged 6 commits into from
Jan 20, 2022
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,16 @@
package com.netflix.conductor.es6.config;

import java.net.MalformedURLException;
import java.net.URI;
import java.net.URL;
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.boot.convert.DurationUnit;

import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.boot.convert.DurationUnit;
Expand Down Expand Up @@ -205,4 +209,29 @@ private URL toURL(String url) {
throw new IllegalArgumentException(url + "can not be converted to java.net.URL");
}
}

public List<URI> getURIs() {
deoramanas marked this conversation as resolved.
Show resolved Hide resolved

String clusterAddress = getURL();

String[] hosts = clusterAddress.split(",");

return Arrays.stream(hosts).map(host ->
(host.startsWith("http://") || host.startsWith("https://") || host.startsWith("tcp://")) ? URI.create(host) : URI.create("tcp://" + host)
).collect(Collectors.toList());
}

private String getProperty(String key, String defaultValue) {
String val;
val = System.getenv(key.replace('.', '_'));
if (val == null || val.isEmpty()) {
val = Optional.ofNullable(System.getProperty(key))
.orElse(defaultValue);
}
return val;
}

private String getURL() {
deoramanas marked this conversation as resolved.
Show resolved Hide resolved
return getProperty("conductor.elasticsearch.url", url);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
package com.netflix.conductor.es6.config;

import java.net.InetAddress;
import java.net.URI;
import java.net.URL;
import java.util.List;
import java.util.Optional;
Expand All @@ -31,7 +32,6 @@
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Conditional;
import org.springframework.context.annotation.Configuration;

import com.netflix.conductor.dao.IndexDAO;
import com.netflix.conductor.es6.dao.index.ElasticSearchDAOV6;
import com.netflix.conductor.es6.dao.index.ElasticSearchRestDAOV6;
Expand All @@ -42,10 +42,10 @@
@EnableConfigurationProperties(ElasticSearchProperties.class)
@Conditional(ElasticSearchConditions.ElasticSearchV6Enabled.class)
public class ElasticSearchV6Configuration {

private static final Logger log = LoggerFactory.getLogger(ElasticSearchV6Configuration.class);

@Bean
@Conditional(IsTcpProtocol.class)
public Client client(ElasticSearchProperties properties) {
Settings settings =
Settings.builder()
Expand All @@ -55,12 +55,12 @@ public Client client(ElasticSearchProperties properties) {

TransportClient transportClient = new PreBuiltTransportClient(settings);

List<URL> clusterAddresses = properties.toURLs();
List<URI> clusterAddresses = properties.getURIs();

if (clusterAddresses.isEmpty()) {
log.warn("workflow.elasticsearch.url is not set. Indexing will remain DISABLED.");
}
for (URL hostAddress : clusterAddresses) {
for (URI hostAddress : clusterAddresses) {
int port = Optional.ofNullable(hostAddress.getPort()).orElse(9200);
try {
transportClient.addTransportAddress(
Expand All @@ -73,6 +73,7 @@ public Client client(ElasticSearchProperties properties) {
}

@Bean
@Conditional(IsHttpProtocol.class)
public RestClient restClient(ElasticSearchProperties properties) {
RestClientBuilder restClientBuilder =
RestClient.builder(convertToHttpHosts(properties.toURLs()));
Expand All @@ -86,24 +87,26 @@ public RestClient restClient(ElasticSearchProperties properties) {
}

@Bean
@Conditional(IsHttpProtocol.class)
public RestClientBuilder restClientBuilder(ElasticSearchProperties properties) {
return RestClient.builder(convertToHttpHosts(properties.toURLs()));
}

@Bean
public IndexDAO es6IndexDAO(
RestClientBuilder restClientBuilder,
Client client,
ElasticSearchProperties properties,
ObjectMapper objectMapper) {
String url = properties.getUrl();
if (url.startsWith("http") || url.startsWith("https")) {
return new ElasticSearchRestDAOV6(restClientBuilder, properties, objectMapper);
} else {
return new ElasticSearchDAOV6(client, properties, objectMapper);
}
@Conditional(IsHttpProtocol.class)
public IndexDAO es6IndexDAO(RestClientBuilder restClientBuilder, ElasticSearchProperties properties,
ObjectMapper objectMapper) {
return new ElasticSearchRestDAOV6(restClientBuilder, properties, objectMapper);
}

@Bean
@Conditional(IsTcpProtocol.class)
public IndexDAO es6IndexDAO1(Client client, ElasticSearchProperties properties,
ObjectMapper objectMapper) {
return new ElasticSearchDAOV6(client, properties, objectMapper);

}

private HttpHost[] convertToHttpHosts(List<URL> hosts) {
return hosts.stream()
.map(host -> new HttpHost(host.getHost(), host.getPort(), host.getProtocol()))
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package com.netflix.conductor.es6.config;

import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Condition;
import org.springframework.context.annotation.ConditionContext;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.type.AnnotatedTypeMetadata;

@EnableConfigurationProperties(ElasticSearchProperties.class)
@Configuration
public class IsHttpProtocol implements Condition {
@Override
public boolean matches(ConditionContext context, AnnotatedTypeMetadata metadata) {
String url = context.getEnvironment().getProperty("conductor.elasticsearch.url");
if (url.startsWith("http") || url.startsWith("https")) {
return true;
}
return false;
}
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package com.netflix.conductor.es6.config;

import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Condition;
import org.springframework.context.annotation.ConditionContext;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.type.AnnotatedTypeMetadata;

@EnableConfigurationProperties(ElasticSearchProperties.class)
@Configuration
public class IsTcpProtocol implements Condition {
@Override
public boolean matches(ConditionContext context, AnnotatedTypeMetadata metadata) {
String url = context.getEnvironment().getProperty("conductor.elasticsearch.url");
if (url.startsWith("http") || url.startsWith("https")) {
return false;
}
return true;
}
}