1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -22,6 +22,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Fix Allocation and Rebalance Constraints of WeightFunction are incorrectly reset ([#19012](https://github.com/opensearch-project/OpenSearch/pull/19012))
- Fix flaky test FieldDataLoadingIT.testIndicesFieldDataCacheSizeSetting ([#19571](https://github.com/opensearch-project/OpenSearch/pull/19571))
- Avoid primary shard failure caused by merged segment warmer exceptions ([#19436](https://github.com/opensearch-project/OpenSearch/pull/19436))
- Fix pull-based ingestion out-of-bounds offset scenarios and remove persisted offsets ([#19607](https://github.com/opensearch-project/OpenSearch/pull/19607))

### Dependencies
- Update to Gradle 9.1 ([#19575](https://github.com/opensearch-project/OpenSearch/pull/19575))
FileConsumerFactory.java
@@ -8,10 +8,9 @@

package org.opensearch.plugin.ingestion.fs;

import org.opensearch.cluster.metadata.IngestionSource;
import org.opensearch.index.IngestionConsumerFactory;

import java.util.Map;

/**
* Factory for creating file-based ingestion consumers.
*/
@@ -25,8 +24,8 @@ public class FileConsumerFactory implements IngestionConsumerFactory<FilePartiti
public FileConsumerFactory() {}

@Override
public void initialize(Map<String, Object> params) {
this.config = new FileSourceConfig(params);
public void initialize(IngestionSource ingestionSource) {
this.config = new FileSourceConfig(ingestionSource.params());
}

@Override
@@ -493,14 +493,14 @@ public void testSnapshotRestoreOnAllActiveIngestion() throws Exception {
refresh(indexName);
waitForSearchableDocs(40, List.of(nodeA, nodeB));

// Verify both primary and replica have polled only remaining 20 messages
// Verify both primary and replica have indexed remaining messages
Map<String, PollingIngestStats> shardTypeToStats = getPollingIngestStatsForPrimaryAndReplica(indexName);
assertNotNull(shardTypeToStats.get("primary"));
assertNotNull(shardTypeToStats.get("replica"));
assertThat(shardTypeToStats.get("primary").getConsumerStats().totalPolledCount(), is(20L));
assertThat(shardTypeToStats.get("primary").getConsumerStats().totalPolledCount(), is(21L));
assertThat(shardTypeToStats.get("primary").getConsumerStats().totalPollerMessageDroppedCount(), is(0L));
assertThat(shardTypeToStats.get("primary").getConsumerStats().totalPollerMessageFailureCount(), is(0L));
assertThat(shardTypeToStats.get("replica").getConsumerStats().totalPolledCount(), is(20L));
assertThat(shardTypeToStats.get("replica").getConsumerStats().totalPolledCount(), is(21L));
assertThat(shardTypeToStats.get("replica").getConsumerStats().totalPollerMessageDroppedCount(), is(0L));
assertThat(shardTypeToStats.get("replica").getConsumerStats().totalPollerMessageFailureCount(), is(0L));
}
@@ -557,13 +557,13 @@ public void testResetPollerInAllActiveIngestion() throws Exception {
);
});

// validate there are 8 duplicate messages encountered after reset
// validate there are 8 messages polled after reset
waitForState(() -> {
Map<String, PollingIngestStats> shardTypeToStats = getPollingIngestStatsForPrimaryAndReplica(indexName);
assertNotNull(shardTypeToStats.get("primary"));
assertNotNull(shardTypeToStats.get("replica"));
return shardTypeToStats.get("primary").getConsumerStats().totalDuplicateMessageSkippedCount() == 8
&& shardTypeToStats.get("replica").getConsumerStats().totalDuplicateMessageSkippedCount() == 8;
return shardTypeToStats.get("primary").getConsumerStats().totalPolledCount() == 8
&& shardTypeToStats.get("replica").getConsumerStats().totalPolledCount() == 8;
});
}

@@ -568,7 +568,8 @@ public void testOffsetUpdateOnBlockErrorPolicy() throws Exception {
waitForSearchableDocs(4, Arrays.asList(nodeA, nodeB));
PollingIngestStats stats = client().admin().indices().prepareStats(indexName).get().getIndex(indexName).getShards()[0]
.getPollingIngestStats();
assertThat(stats.getConsumerStats().totalDuplicateMessageSkippedCount(), is(0L));
assertThat(stats.getConsumerStats().totalPolledCount(), is(3L));
assertThat(stats.getConsumerStats().totalPollerMessageFailureCount(), is(0L));
}

public void testConsumerResetByTimestamp() throws Exception {
@@ -625,10 +626,12 @@ public void testConsumerResetByTimestamp() throws Exception {
resumeResponse = resumeIngestion(indexName, 0, ResumeIngestionRequest.ResetSettings.ResetMode.TIMESTAMP, "102");
assertTrue(resumeResponse.isAcknowledged());
assertTrue(resumeResponse.isShardsAcknowledged());

waitForSearchableDocs(4, Arrays.asList(nodeA, nodeB));
waitForState(() -> {
PollingIngestStats stats = client().admin().indices().prepareStats(indexName).get().getIndex(indexName).getShards()[0]
.getPollingIngestStats();
return stats.getConsumerStats().totalDuplicateMessageSkippedCount() == 3;
return stats.getConsumerStats().totalPolledCount() == 3;
});
}

@@ -694,10 +697,10 @@ public void testRemoteSnapshotRestore() throws Exception {
waitForSearchableDocs(4, Arrays.asList(nodeA, nodeB));
assertHitCount(client().prepareSearch(indexName).get(), 4);

// after index is restored, it should only poll remaining 2 messages
// after index is restored, it should resume ingestion from batchStartPointer available in the latest commit
PollingIngestStats stats2 = client().admin().indices().prepareStats(indexName).get().getIndex(indexName).getShards()[0]
.getPollingIngestStats();
assertEquals(2, stats2.getConsumerStats().totalPolledCount());
assertEquals(3, stats2.getConsumerStats().totalPolledCount());
assertEquals(0, stats2.getMessageProcessorStats().totalVersionConflictsCount());
}

KafkaConsumerFactory.java
@@ -8,10 +8,9 @@

package org.opensearch.plugin.kafka;

import org.opensearch.cluster.metadata.IngestionSource;
import org.opensearch.index.IngestionConsumerFactory;

import java.util.Map;

/**
* Factory for creating Kafka consumers
*/
@@ -28,8 +27,8 @@ public class KafkaConsumerFactory implements IngestionConsumerFactory<KafkaParti
public KafkaConsumerFactory() {}

@Override
public void initialize(Map<String, Object> params) {
config = new KafkaSourceConfig(params);
public void initialize(IngestionSource ingestionSource) {
config = new KafkaSourceConfig((int) ingestionSource.getMaxPollSize(), ingestionSource.params());
}

@Override
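For reference, consumer factories now receive the whole `IngestionSource` instead of a bare parameter map. Below is a minimal sketch of wiring this up, assuming the builder usage shown in the updated factory unit test further down; the parameter values are illustrative, not part of this change.

```java
import org.opensearch.cluster.metadata.IngestionSource;

import java.util.HashMap;
import java.util.Map;

// Illustrative values; "KAFKA" and the params mirror the updated unit test.
Map<String, Object> params = new HashMap<>();
params.put("topic", "test-topic");
params.put("bootstrap_servers", "localhost:9092");

IngestionSource source = new IngestionSource.Builder("KAFKA").setParams(params).build();

KafkaConsumerFactory factory = new KafkaConsumerFactory();
// The factory now reads both the consumer params and the max poll size
// (via IngestionSource#getMaxPollSize) from the ingestion source.
factory.initialize(source);
```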
KafkaPartitionConsumer.java
@@ -9,6 +9,7 @@
package org.opensearch.plugin.kafka;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
@@ -89,25 +90,32 @@ protected KafkaPartitionConsumer(String clientId, KafkaSourceConfig config, int
}

/**
* Create a Kafka consumer. visible for testing
* Create consumer properties with default configurations and apply user provided overrides on top.
* @param clientId the client id
* @param config the Kafka source config
* @return the Kafka consumer
* @return the consumer properties
*/
protected static Consumer<byte[], byte[]> createConsumer(String clientId, KafkaSourceConfig config) {
protected static Properties createConsumerProperties(String clientId, KafkaSourceConfig config) {
Properties consumerProp = new Properties();
consumerProp.put("bootstrap.servers", config.getBootstrapServers());
consumerProp.put("client.id", clientId);
consumerProp.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, config.getBootstrapServers());
consumerProp.put(ConsumerConfig.CLIENT_ID_CONFIG, clientId);

logger.info("Kafka consumer properties for topic {}: {}", config.getTopic(), config.getConsumerConfigurations());
// apply user provided overrides
consumerProp.putAll(config.getConsumerConfigurations());

// TODO: why Class org.apache.kafka.common.serialization.StringDeserializer could not be found if set the deserializer as prop?
// consumerProp.put("key.deserializer",
// "org.apache.kafka.common.serialization.StringDeserializer");
// consumerProp.put("value.deserializer",
// "org.apache.kafka.common.serialization.StringDeserializer");
//
logger.info("Kafka consumer properties for topic {}: {}", config.getTopic(), consumerProp);
return consumerProp;
}

/**
* Create a Kafka consumer. visible for testing
* @param clientId the client id
* @param config the Kafka source config
* @return the Kafka consumer
*/
protected static Consumer<byte[], byte[]> createConsumer(String clientId, KafkaSourceConfig config) {
Properties consumerProp = createConsumerProperties(clientId, config);

// wrap the kafka consumer creation in a privileged block to apply plugin security policies
final ClassLoader restore = Thread.currentThread().getContextClassLoader();
try {
Expand All @@ -124,6 +132,15 @@ protected static Consumer<byte[], byte[]> createConsumer(String clientId, KafkaS
}
}

/**
* Read the next batch of messages from Kafka, starting from the provided offset.
* @param offset the pointer to start reading from
* @param includeStart whether to include the start pointer in the read
* @param maxMessages this setting is not honored for Kafka at this stage; the batch size is instead capped by max.poll.records, which is set at consumer initialization
* @param timeoutMillis the maximum time to wait for messages
* @return the batch of messages read from Kafka
* @throws TimeoutException if the read times out
*/
@Override
public List<ReadResult<KafkaOffset, KafkaMessage>> readNext(
KafkaOffset offset,
Expand All @@ -132,25 +149,22 @@ public List<ReadResult<KafkaOffset, KafkaMessage>> readNext(
int timeoutMillis
) throws TimeoutException {
List<ReadResult<KafkaOffset, KafkaMessage>> records = AccessController.doPrivileged(
(PrivilegedAction<List<ReadResult<KafkaOffset, KafkaMessage>>>) () -> fetch(
offset.getOffset(),
includeStart,
maxMessages,
timeoutMillis
)
(PrivilegedAction<List<ReadResult<KafkaOffset, KafkaMessage>>>) () -> fetch(offset.getOffset(), includeStart, timeoutMillis)
);
return records;
}

/**
* Read the next batch of messages from Kafka.
* @param maxMessages this setting is not honored for Kafka at this stage; the batch size is instead capped by max.poll.records, which is set at consumer initialization
* @param timeoutMillis the maximum time to wait for messages
* @return the batch of messages read from Kafka
* @throws TimeoutException if the read times out
*/
@Override
public List<ReadResult<KafkaOffset, KafkaMessage>> readNext(long maxMessages, int timeoutMillis) throws TimeoutException {
List<ReadResult<KafkaOffset, KafkaMessage>> records = AccessController.doPrivileged(
(PrivilegedAction<List<ReadResult<KafkaOffset, KafkaMessage>>>) () -> fetch(
lastFetchedOffset,
false,
maxMessages,
timeoutMillis
)
(PrivilegedAction<List<ReadResult<KafkaOffset, KafkaMessage>>>) () -> fetch(lastFetchedOffset, false, timeoutMillis)
);
return records;
}
@@ -209,12 +223,7 @@ public IngestionShardPointer pointerFromOffset(String offset) {
return new KafkaOffset(offsetValue);
}

private synchronized List<ReadResult<KafkaOffset, KafkaMessage>> fetch(
long startOffset,
boolean includeStart,
long maxMessages,
int timeoutMillis
) {
private synchronized List<ReadResult<KafkaOffset, KafkaMessage>> fetch(long startOffset, boolean includeStart, int timeoutMillis) {
long kafkaStartOffset = startOffset;
if (!includeStart) {
kafkaStartOffset += 1;
@@ -229,16 +238,10 @@ private synchronized List<ReadResult<KafkaOffset, KafkaMessage>> fetch(

ConsumerRecords<byte[], byte[]> consumerRecords = consumer.poll(Duration.ofMillis(timeoutMillis));
List<ConsumerRecord<byte[], byte[]>> messageAndOffsets = consumerRecords.records(topicPartition);

long endOffset = kafkaStartOffset + maxMessages;
List<ReadResult<KafkaOffset, KafkaMessage>> results = new ArrayList<>();

for (ConsumerRecord<byte[], byte[]> messageAndOffset : messageAndOffsets) {
long currentOffset = messageAndOffset.offset();
if (currentOffset >= endOffset) {
// fetched more message than max
break;
}
lastFetchedOffset = currentOffset;
KafkaOffset kafkaOffset = new KafkaOffset(currentOffset);
KafkaMessage message = new KafkaMessage(messageAndOffset.key(), messageAndOffset.value(), messageAndOffset.timestamp());
Expand Down
KafkaSourceConfig.java
@@ -8,6 +8,7 @@

package org.opensearch.plugin.kafka;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.opensearch.core.util.ConfigurationUtils;

import java.util.HashMap;
@@ -19,27 +20,39 @@
public class KafkaSourceConfig {
private final String PROP_TOPIC = "topic";
private final String PROP_BOOTSTRAP_SERVERS = "bootstrap_servers";
private final String PROP_AUTO_OFFSET_RESET = "auto.offset.reset";

private final String topic;
private final String bootstrapServers;
private final String autoOffsetResetConfig;
private final int maxPollRecords;

private final Map<String, Object> consumerConfigsMap;

/**
* Extracts and looks for required and optional Kafka consumer configurations.
* @param maxPollSize the maximum batch size to read in a single poll
* @param params the configuration parameters
*/
public KafkaSourceConfig(Map<String, Object> params) {
public KafkaSourceConfig(int maxPollSize, Map<String, Object> params) {
this.consumerConfigsMap = new HashMap<>(params);
this.topic = ConfigurationUtils.readStringProperty(params, PROP_TOPIC);
this.bootstrapServers = ConfigurationUtils.readStringProperty(params, PROP_BOOTSTRAP_SERVERS);
this.autoOffsetResetConfig = ConfigurationUtils.readOptionalStringProperty(params, PROP_AUTO_OFFSET_RESET);
this.consumerConfigsMap = new HashMap<>(params);

// remove above configurations
// 'auto.offset.reset' is handled differently for Kafka sources, with the default set to none.
// This ensures out-of-bounds offsets throw an error, unless the user explicitly sets a different value.
this.autoOffsetResetConfig = ConfigurationUtils.readStringProperty(params, ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none");

// OpenSearch supports a 'maxPollSize' setting for consumers. If the user did not provide a 'max.poll.records' setting,
// maxPollSize will be used instead.
this.maxPollRecords = ConfigurationUtils.readIntProperty(params, ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollSize);

// remove metadata configurations
consumerConfigsMap.remove(PROP_TOPIC);
consumerConfigsMap.remove(PROP_BOOTSTRAP_SERVERS);

// add or overwrite required configurations with defaults if not present
consumerConfigsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetResetConfig);
consumerConfigsMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);
}

/**
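To make the defaulting concrete, here is a small sketch of what the constructor above produces; the values are illustrative and mirror the updated config test at the end of this diff.

```java
Map<String, Object> params = new HashMap<>();
params.put("topic", "topic");
params.put("bootstrap_servers", "bootstrap");

KafkaSourceConfig config = new KafkaSourceConfig(100, params);
Map<String, Object> consumerProps = config.getConsumerConfigurations();

// topic/bootstrap_servers are stripped from the consumer configuration map,
// auto.offset.reset defaults to "none" so out-of-bounds offsets fail fast,
// and max.poll.records falls back to the maxPollSize argument (100 here).
assert "none".equals(consumerProps.get("auto.offset.reset"));
assert Integer.valueOf(100).equals(consumerProps.get("max.poll.records"));
assert !consumerProps.containsKey("topic") && !consumerProps.containsKey("bootstrap_servers");
```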
KafkaConsumerFactoryTests.java
@@ -8,6 +8,7 @@

package org.opensearch.plugin.kafka;

import org.opensearch.cluster.metadata.IngestionSource;
import org.opensearch.test.OpenSearchTestCase;
import org.junit.Assert;

@@ -21,7 +22,7 @@ public void testInitialize() {
params.put("topic", "test-topic");
params.put("bootstrap_servers", "localhost:9092");

factory.initialize(params);
factory.initialize(new IngestionSource.Builder("KAFKA").setParams(params).build());

KafkaSourceConfig config = factory.config;
Assert.assertNotNull("Config should be initialized", config);
KafkaPartitionConsumerTests.java
@@ -44,7 +44,7 @@ public void setUp() throws Exception {
params.put("topic", "test-topic");
params.put("bootstrap_servers", "localhost:9092");

config = new KafkaSourceConfig(params);
config = new KafkaSourceConfig(1000, params);
mockConsumer = mock(KafkaConsumer.class);
// Mock the partitionsFor method
PartitionInfo partitionInfo = new PartitionInfo("test-topic", 0, null, null, null);
@@ -109,7 +109,7 @@ public void testTopicDoesNotExist() {
Map<String, Object> params = new HashMap<>();
params.put("topic", "non-existent-topic");
params.put("bootstrap_servers", "localhost:9092");
var kafkaSourceConfig = new KafkaSourceConfig(params);
var kafkaSourceConfig = new KafkaSourceConfig(1000, params);
when(mockConsumer.partitionsFor(eq("non-existent-topic"), any(Duration.class))).thenReturn(null);
try {
new KafkaPartitionConsumer("client1", kafkaSourceConfig, 0, mockConsumer);
@@ -125,11 +125,16 @@ public void testPauseAndResumeAPIs() throws Exception {
);
});

// validate duplicate messages are skipped
RangeQueryBuilder query = new RangeQueryBuilder("age").gte(29);
waitForState(() -> {
SearchResponse response = client().prepareSearch(indexName).setQuery(query).get();
PollingIngestStats stats = client().admin().indices().prepareStats(indexName).get().getIndex(indexName).getShards()[0]
.getPollingIngestStats();
return stats.getConsumerStats().totalDuplicateMessageSkippedCount() == 2;

return response.getHits().getTotalHits().value() == 2
&& stats != null
&& stats.getConsumerStats().totalPolledCount() == 4
&& stats.getConsumerStats().totalPollerMessageFailureCount() == 0;
});
}

KafkaSourceConfigTests.java
@@ -23,7 +23,7 @@ public void testKafkaSourceConfig() {
params.put("fetch.min.bytes", 30000);
params.put("enable.auto.commit", false);

KafkaSourceConfig config = new KafkaSourceConfig(params);
KafkaSourceConfig config = new KafkaSourceConfig(100, params);

Assert.assertEquals("The topic should be correctly initialized and returned", "topic", config.getTopic());
Assert.assertEquals(
@@ -33,5 +33,11 @@
);
Assert.assertEquals("Incorrect fetch.min.bytes", 30000, config.getConsumerConfigurations().get("fetch.min.bytes"));
Assert.assertEquals("Incorrect enable.auto.commit", false, config.getConsumerConfigurations().get("enable.auto.commit"));
Assert.assertEquals(
"auto.offset.reset must be 'none' by default",
"none",
config.getConsumerConfigurations().get("auto.offset.reset")
);
Assert.assertEquals("Incorrect max.poll.records", 100, config.getConsumerConfigurations().get("max.poll.records"));
}
}