Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,28 +18,29 @@
*/
package org.apache.pulsar.io.elasticsearch;

import java.io.IOException;
import java.util.Map;
import java.util.Optional;

import co.elastic.clients.elasticsearch.ElasticsearchClient;
import co.elastic.clients.elasticsearch.security.CreateApiKeyRequest;
import co.elastic.clients.elasticsearch.security.CreateApiKeyResponse;
import co.elastic.clients.elasticsearch.security.GetTokenRequest;
import co.elastic.clients.elasticsearch.security.GetTokenResponse;
import co.elastic.clients.elasticsearch.security.get_token.AccessTokenGrantType;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import java.util.Map;
import java.util.Optional;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.io.elasticsearch.client.elastic.ElasticSearchJavaRestClient;
import org.apache.pulsar.io.elasticsearch.client.opensearch.OpenSearchHighLevelRestClient;
import org.opensearch.client.Request;
import org.opensearch.client.Response;
import org.testcontainers.elasticsearch.ElasticsearchContainer;
import org.testcontainers.utility.DockerImageName;

@Slf4j
public abstract class ElasticSearchTestBase {

public static final String ELASTICSEARCH_8 = Optional.ofNullable(System.getenv("ELASTICSEARCH_IMAGE_V8"))
.orElse("docker.elastic.co/elasticsearch/elasticsearch:8.5.1");
.orElse("docker.elastic.co/elasticsearch/elasticsearch:8.5.3");

public static final String ELASTICSEARCH_7 = Optional.ofNullable(System.getenv("ELASTICSEARCH_IMAGE_V7"))
.orElse("docker.elastic.co/elasticsearch/elasticsearch:7.17.7");
Expand All @@ -54,17 +55,28 @@ public ElasticSearchTestBase(String elasticImageName) {
}

protected ElasticsearchContainer createElasticsearchContainer() {
ElasticsearchContainer elasticsearchContainer;
if (elasticImageName.equals(OPENSEARCH)) {
DockerImageName dockerImageName = DockerImageName.parse(OPENSEARCH).asCompatibleSubstituteFor("docker.elastic.co/elasticsearch/elasticsearch");
return new ElasticsearchContainer(dockerImageName)
elasticsearchContainer = new ElasticsearchContainer(dockerImageName)
.withEnv("OPENSEARCH_JAVA_OPTS", "-Xms128m -Xmx256m")
.withEnv("bootstrap.memory_lock", "true")
.withEnv("plugins.security.disabled", "true");
} else {
elasticsearchContainer = new ElasticsearchContainer(elasticImageName)
.withEnv("ES_JAVA_OPTS", "-Xms128m -Xmx256m")
.withEnv("xpack.security.enabled", "false")
.withEnv("xpack.security.http.ssl.enabled", "false");
}
configureElasticContainer(elasticsearchContainer);
return elasticsearchContainer;
}

protected void configureElasticContainer(ElasticsearchContainer elasticContainer) {
if (getCompatibilityMode() != ElasticSearchConfig.CompatibilityMode.OPENSEARCH) {
elasticContainer.withEnv("ingest.geoip.downloader.enabled", "false");
}
return new ElasticsearchContainer(elasticImageName)
.withEnv("ES_JAVA_OPTS", "-Xms128m -Xmx256m")
.withEnv("xpack.security.enabled", "false")
.withEnv("xpack.security.http.ssl.enabled", "false");
elasticContainer.withLogConsumer(o -> log.info("elastic> {}", o.getUtf8String()));
}

protected ElasticSearchConfig.CompatibilityMode getCompatibilityMode() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
package org.apache.pulsar.tests.integration.io.sinks;

import java.util.Optional;
import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
import org.testcontainers.elasticsearch.ElasticsearchContainer;

public class ElasticSearch7SinkTester extends ElasticSearchSinkTester {
Expand All @@ -32,8 +31,9 @@ public ElasticSearch7SinkTester(boolean schemaEnable) {
super(schemaEnable);
}


@Override
protected ElasticsearchContainer createSinkService(PulsarCluster cluster) {
protected ElasticsearchContainer createElasticContainer() {
return new ElasticsearchContainer(ELASTICSEARCH_7)
.withEnv("ES_JAVA_OPTS", "-Xms128m -Xmx256m");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,21 +19,20 @@
package org.apache.pulsar.tests.integration.io.sinks;

import java.util.Optional;
import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
import org.testcontainers.elasticsearch.ElasticsearchContainer;

public class ElasticSearch8SinkTester extends ElasticSearchSinkTester {

public static final String ELASTICSEARCH_8 = Optional.ofNullable(System.getenv("ELASTICSEARCH_IMAGE_V8"))
.orElse("docker.elastic.co/elasticsearch/elasticsearch:8.5.1");
.orElse("docker.elastic.co/elasticsearch/elasticsearch:8.5.3");


public ElasticSearch8SinkTester(boolean schemaEnable) {
super(schemaEnable);
}

@Override
protected ElasticsearchContainer createSinkService(PulsarCluster cluster) {
protected ElasticsearchContainer createElasticContainer() {
return new ElasticsearchContainer(ELASTICSEARCH_8)
.withEnv("ES_JAVA_OPTS", "-Xms128m -Xmx256m")
.withEnv("xpack.security.enabled", "false")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,22 +19,20 @@
package org.apache.pulsar.tests.integration.io.sinks;

import static org.testng.Assert.assertTrue;

import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

import co.elastic.clients.elasticsearch.ElasticsearchClient;
import co.elastic.clients.elasticsearch.core.SearchRequest;
import co.elastic.clients.elasticsearch.core.SearchResponse;
import co.elastic.clients.json.jackson.JacksonJsonpMapper;
import co.elastic.clients.transport.ElasticsearchTransport;
import co.elastic.clients.transport.rest_client.RestClientTransport;
import com.google.common.collect.ImmutableMap;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import lombok.AllArgsConstructor;
import lombok.Cleanup;
import lombok.Data;
Expand All @@ -46,6 +44,7 @@
import org.apache.pulsar.common.schema.KeyValue;
import org.apache.pulsar.common.schema.KeyValueEncodingType;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
import org.awaitility.Awaitility;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
Expand Down Expand Up @@ -100,6 +99,29 @@ public ElasticSearchSinkTester(boolean schemaEnable) {
}
}

@Override
protected final ElasticsearchContainer createSinkService(PulsarCluster cluster) {
ElasticsearchContainer elasticContainer = createElasticContainer();
configureElasticContainer(elasticContainer);
return elasticContainer;
}

protected void configureElasticContainer(ElasticsearchContainer elasticContainer) {
if (!isOpenSearch()) {
elasticContainer.withEnv("ingest.geoip.downloader.enabled", "false");
}

// allow disk to fill up beyond default 90% threshold
elasticContainer.withEnv("cluster.routing.allocation.disk.threshold_enabled", "false");

elasticContainer.withLogConsumer(o -> log.info("elastic> {}", o.getUtf8String()));
}

protected boolean isOpenSearch() {
return false;
}

protected abstract ElasticsearchContainer createElasticContainer();

@Override
public void prepareSink() throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,10 @@
*/
package org.apache.pulsar.tests.integration.io.sinks;

import static org.testng.Assert.assertTrue;
import java.util.Map;
import java.util.Optional;
import org.apache.http.HttpHost;
import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
import org.awaitility.Awaitility;
import org.opensearch.action.search.SearchRequest;
import org.opensearch.action.search.SearchResponse;
Expand All @@ -31,10 +32,6 @@
import org.testcontainers.elasticsearch.ElasticsearchContainer;
import org.testcontainers.utility.DockerImageName;

import java.util.Map;

import static org.testng.Assert.assertTrue;

public class OpenSearchSinkTester extends ElasticSearchSinkTester {

public static final String OPENSEARCH = Optional.ofNullable(System.getenv("OPENSEARCH_IMAGE"))
Expand All @@ -48,7 +45,7 @@ public OpenSearchSinkTester(boolean schemaEnable) {
}

@Override
protected ElasticsearchContainer createSinkService(PulsarCluster cluster) {
protected ElasticsearchContainer createElasticContainer() {
DockerImageName dockerImageName = DockerImageName.parse(OPENSEARCH)
.asCompatibleSubstituteFor("docker.elastic.co/elasticsearch/elasticsearch");
return new ElasticsearchContainer(dockerImageName)
Expand All @@ -57,6 +54,10 @@ protected ElasticsearchContainer createSinkService(PulsarCluster cluster) {
.withEnv("plugins.security.disabled", "true");
}

protected boolean isOpenSearch() {
return true;
}

@Override
public void prepareSink() throws Exception {
RestClientBuilder builder = RestClient.builder(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,9 @@ private PulsarCluster(PulsarClusterSpec spec, CSContainer csContainer, boolean s
.withEnv("journalSyncData", "false")
.withEnv("journalMaxGroupWaitMSec", "0")
.withEnv("clusterName", clusterName)
.withEnv("PULSAR_PREFIX_diskUsageWarnThreshold", "0.95")
.withEnv("diskUsageThreshold", "0.99")
.withEnv("PULSAR_PREFIX_diskUsageLwmThreshold", "0.97")
.withEnv("nettyMaxFrameSizeBytes", String.valueOf(spec.maxMessageSize));
if (spec.bookkeeperEnvs != null) {
bookieContainer.withEnv(spec.bookkeeperEnvs);
Expand Down