diff --git a/es6-persistence/src/main/java/com/netflix/conductor/es6/config/ElasticSearchV6Configuration.java b/es6-persistence/src/main/java/com/netflix/conductor/es6/config/ElasticSearchV6Configuration.java index 5ce45d07a5..5ed23ac950 100644 --- a/es6-persistence/src/main/java/com/netflix/conductor/es6/config/ElasticSearchV6Configuration.java +++ b/es6-persistence/src/main/java/com/netflix/conductor/es6/config/ElasticSearchV6Configuration.java @@ -13,9 +13,12 @@ package com.netflix.conductor.es6.config; import java.net.InetAddress; +import java.net.URI; import java.net.URL; +import java.util.Arrays; import java.util.List; import java.util.Optional; +import java.util.stream.Collectors; import org.apache.http.HttpHost; import org.elasticsearch.client.Client; @@ -42,10 +45,10 @@ @EnableConfigurationProperties(ElasticSearchProperties.class) @Conditional(ElasticSearchConditions.ElasticSearchV6Enabled.class) public class ElasticSearchV6Configuration { - private static final Logger log = LoggerFactory.getLogger(ElasticSearchV6Configuration.class); @Bean + @Conditional(IsTcpProtocol.class) public Client client(ElasticSearchProperties properties) { Settings settings = Settings.builder() @@ -55,12 +58,12 @@ public Client client(ElasticSearchProperties properties) { TransportClient transportClient = new PreBuiltTransportClient(settings); - List clusterAddresses = properties.toURLs(); + List clusterAddresses = getURIs(properties); if (clusterAddresses.isEmpty()) { log.warn("workflow.elasticsearch.url is not set. Indexing will remain DISABLED."); } - for (URL hostAddress : clusterAddresses) { + for (URI hostAddress : clusterAddresses) { int port = Optional.ofNullable(hostAddress.getPort()).orElse(9200); try { transportClient.addTransportAddress( @@ -73,6 +76,7 @@ public Client client(ElasticSearchProperties properties) { } @Bean + @Conditional(IsHttpProtocol.class) public RestClient restClient(ElasticSearchProperties properties) { RestClientBuilder restClientBuilder = RestClient.builder(convertToHttpHosts(properties.toURLs())); @@ -86,22 +90,25 @@ public RestClient restClient(ElasticSearchProperties properties) { } @Bean + @Conditional(IsHttpProtocol.class) public RestClientBuilder restClientBuilder(ElasticSearchProperties properties) { return RestClient.builder(convertToHttpHosts(properties.toURLs())); } @Bean - public IndexDAO es6IndexDAO( + @Conditional(IsHttpProtocol.class) + public IndexDAO es6IndexRestDAO( RestClientBuilder restClientBuilder, - Client client, ElasticSearchProperties properties, ObjectMapper objectMapper) { - String url = properties.getUrl(); - if (url.startsWith("http") || url.startsWith("https")) { - return new ElasticSearchRestDAOV6(restClientBuilder, properties, objectMapper); - } else { - return new ElasticSearchDAOV6(client, properties, objectMapper); - } + return new ElasticSearchRestDAOV6(restClientBuilder, properties, objectMapper); + } + + @Bean + @Conditional(IsTcpProtocol.class) + public IndexDAO es6IndexDAO( + Client client, ElasticSearchProperties properties, ObjectMapper objectMapper) { + return new ElasticSearchDAOV6(client, properties, objectMapper); } private HttpHost[] convertToHttpHosts(List hosts) { @@ -109,4 +116,19 @@ private HttpHost[] convertToHttpHosts(List hosts) { .map(host -> new HttpHost(host.getHost(), host.getPort(), host.getProtocol())) .toArray(HttpHost[]::new); } + + public List getURIs(ElasticSearchProperties properties) { + String clusterAddress = properties.getUrl(); + String[] hosts = clusterAddress.split(","); + + return Arrays.stream(hosts) + .map( + host -> + (host.startsWith("http://") + || host.startsWith("https://") + || host.startsWith("tcp://")) + ? URI.create(host) + : URI.create("tcp://" + host)) + .collect(Collectors.toList()); + } } diff --git a/es6-persistence/src/main/java/com/netflix/conductor/es6/config/IsHttpProtocol.java b/es6-persistence/src/main/java/com/netflix/conductor/es6/config/IsHttpProtocol.java new file mode 100644 index 0000000000..2437e1a227 --- /dev/null +++ b/es6-persistence/src/main/java/com/netflix/conductor/es6/config/IsHttpProtocol.java @@ -0,0 +1,32 @@ +/* + * Copyright 2022 Netflix, Inc. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package com.netflix.conductor.es6.config; + +import org.springframework.boot.context.properties.EnableConfigurationProperties; +import org.springframework.context.annotation.Condition; +import org.springframework.context.annotation.ConditionContext; +import org.springframework.context.annotation.Configuration; +import org.springframework.core.type.AnnotatedTypeMetadata; + +@EnableConfigurationProperties(ElasticSearchProperties.class) +@Configuration +public class IsHttpProtocol implements Condition { + @Override + public boolean matches(ConditionContext context, AnnotatedTypeMetadata metadata) { + String url = context.getEnvironment().getProperty("conductor.elasticsearch.url"); + if (url.startsWith("http") || url.startsWith("https")) { + return true; + } + return false; + } +} diff --git a/es6-persistence/src/main/java/com/netflix/conductor/es6/config/IsTcpProtocol.java b/es6-persistence/src/main/java/com/netflix/conductor/es6/config/IsTcpProtocol.java new file mode 100644 index 0000000000..accf3c4682 --- /dev/null +++ b/es6-persistence/src/main/java/com/netflix/conductor/es6/config/IsTcpProtocol.java @@ -0,0 +1,32 @@ +/* + * Copyright 2022 Netflix, Inc. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package com.netflix.conductor.es6.config; + +import org.springframework.boot.context.properties.EnableConfigurationProperties; +import org.springframework.context.annotation.Condition; +import org.springframework.context.annotation.ConditionContext; +import org.springframework.context.annotation.Configuration; +import org.springframework.core.type.AnnotatedTypeMetadata; + +@EnableConfigurationProperties(ElasticSearchProperties.class) +@Configuration +public class IsTcpProtocol implements Condition { + @Override + public boolean matches(ConditionContext context, AnnotatedTypeMetadata metadata) { + String url = context.getEnvironment().getProperty("conductor.elasticsearch.url"); + if (url.startsWith("http") || url.startsWith("https")) { + return false; + } + return true; + } +}