From 2c0fcb001993d8efd729ea46c6ca96d1d954eef7 Mon Sep 17 00:00:00 2001 From: Varun Bharadwaj Date: Thu, 13 Nov 2025 11:35:48 -0800 Subject: [PATCH 1/2] refactor pull-based ingestion to support message mappers Signed-off-by: Varun Bharadwaj --- CHANGELOG.md | 1 + .../plugin/kafka/IngestFromKafkaIT.java | 117 ++++++++++ .../cluster/metadata/IndexMetadata.java | 15 ++ .../cluster/metadata/IngestionSource.java | 30 ++- .../common/settings/IndexScopedSettings.java | 1 + .../index/engine/IngestionEngine.java | 1 + .../pollingingest/DefaultStreamPoller.java | 30 ++- .../MessageProcessorRunnable.java | 7 +- .../PartitionedBlockingQueueContainer.java | 32 +-- .../pollingingest/ShardUpdateMessage.java | 4 +- .../DefaultIngestionMessageMapper.java | 49 ++++ .../mappers/IngestionMessageMapper.java | 85 +++++++ .../RawPayloadIngestionMessageMapper.java | 56 +++++ .../pollingingest/mappers/package-info.java | 10 + .../metadata/IngestionSourceTests.java | 2 +- .../DefaultStreamPollerTests.java | 28 ++- ...artitionedBlockingQueueContainerTests.java | 7 +- .../mappers/IngestionMessageMapperTests.java | 215 ++++++++++++++++++ 18 files changed, 639 insertions(+), 51 deletions(-) create mode 100644 server/src/main/java/org/opensearch/indices/pollingingest/mappers/DefaultIngestionMessageMapper.java create mode 100644 server/src/main/java/org/opensearch/indices/pollingingest/mappers/IngestionMessageMapper.java create mode 100644 server/src/main/java/org/opensearch/indices/pollingingest/mappers/RawPayloadIngestionMessageMapper.java create mode 100644 server/src/main/java/org/opensearch/indices/pollingingest/mappers/package-info.java create mode 100644 server/src/test/java/org/opensearch/indices/pollingingest/mappers/IngestionMessageMapperTests.java diff --git a/CHANGELOG.md b/CHANGELOG.md index 825d643c87368..c038621102691 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -27,6 +27,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Add async periodic flush task support for pull-based ingestion ([#19878](https://github.com/opensearch-project/OpenSearch/pull/19878)) - Add support for context aware segments ([#19098](https://github.com/opensearch-project/OpenSearch/pull/19098)) - Implement GRPC FunctionScoreQuery ([#19888](https://github.com/opensearch-project/OpenSearch/pull/19888)) +- Support pull-based ingestion message mappers and raw payload support ([#19765](https://github.com/opensearch-project/OpenSearch/pull/19765)] ### Changed - Faster `terms` query creation for `keyword` field with index and docValues enabled ([#19350](https://github.com/opensearch-project/OpenSearch/pull/19350)) diff --git a/plugins/ingestion-kafka/src/internalClusterTest/java/org/opensearch/plugin/kafka/IngestFromKafkaIT.java b/plugins/ingestion-kafka/src/internalClusterTest/java/org/opensearch/plugin/kafka/IngestFromKafkaIT.java index e8c9193c0d2fd..dfa00b58aa949 100644 --- a/plugins/ingestion-kafka/src/internalClusterTest/java/org/opensearch/plugin/kafka/IngestFromKafkaIT.java +++ b/plugins/ingestion-kafka/src/internalClusterTest/java/org/opensearch/plugin/kafka/IngestFromKafkaIT.java @@ -778,6 +778,123 @@ public void testAllActiveIngestionPeriodicFlush() throws Exception { waitForSearchableDocs(10, Arrays.asList(nodeA)); waitForState(() -> getPeriodicFlushCount(nodeA, indexName) >= 1); + } + + public void testRawPayloadMapperIngestion() throws Exception { + // Start cluster + internalCluster().startClusterManagerOnlyNode(); + final String nodeA = internalCluster().startDataOnlyNode(); + + // 
Publish 2 valid messages + String validMessage1 = "{\"name\":\"alice\",\"age\":30}"; + String validMessage2 = "{\"name\":\"bob\",\"age\":25}"; + produceData(validMessage1); + produceData(validMessage2); + + // Create index with raw_payload mapper + createIndex( + indexName, + Settings.builder() + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) + .put("ingestion_source.type", "kafka") + .put("ingestion_source.param.topic", topicName) + .put("ingestion_source.param.bootstrap_servers", kafka.getBootstrapServers()) + .put("ingestion_source.pointer.init.reset", "earliest") + .put("ingestion_source.mapper_type", "raw_payload") + .put("ingestion_source.error_strategy", "drop") + .put("ingestion_source.all_active", true) + .build(), + "{\"properties\":{\"name\":{\"type\": \"text\"},\"age\":{\"type\": \"integer\"}}}}" + ); + + ensureGreen(indexName); + + // Wait for both messages to be indexed + waitForSearchableDocs(2, List.of(nodeA)); + + // Verify stats show 2 processed messages + waitForState(() -> { + PollingIngestStats stats = client().admin().indices().prepareStats(indexName).get().getIndex(indexName).getShards()[0] + .getPollingIngestStats(); + return stats != null + && stats.getMessageProcessorStats().totalProcessedCount() == 2L + && stats.getConsumerStats().totalPolledCount() == 2L + && stats.getConsumerStats().totalPollerMessageFailureCount() == 0L + && stats.getConsumerStats().totalPollerMessageDroppedCount() == 0L + && stats.getMessageProcessorStats().totalInvalidMessageCount() == 0L; + }); + + // Validate document content + SearchResponse searchResponse = client().prepareSearch(indexName).get(); + assertEquals(2, searchResponse.getHits().getHits().length); + for (int i = 0; i < searchResponse.getHits().getHits().length; i++) { + Map source = searchResponse.getHits().getHits()[i].getSourceAsMap(); + assertTrue(source.containsKey("name")); + assertTrue(source.containsKey("age")); + } + + // Publish invalid JSON message + String invalidJsonMessage = "{ invalid json"; + produceData(invalidJsonMessage); + + // Wait for consumer to encounter the error and drop it + waitForState(() -> { + PollingIngestStats stats = client().admin().indices().prepareStats(indexName).get().getIndex(indexName).getShards()[0] + .getPollingIngestStats(); + return stats != null + && stats.getConsumerStats().totalPolledCount() == 3L + && stats.getConsumerStats().totalPollerMessageFailureCount() == 1L + && stats.getConsumerStats().totalPollerMessageDroppedCount() == 1L + && stats.getMessageProcessorStats().totalProcessedCount() == 2L; + }); + + // Publish message with invalid content that will fail at processor level + String invalidFieldTypeMessage = "{\"name\":123,\"age\":\"not a number\"}"; + produceData(invalidFieldTypeMessage); + + // Wait for processor to encounter the error + waitForState(() -> { + PollingIngestStats stats = client().admin().indices().prepareStats(indexName).get().getIndex(indexName).getShards()[0] + .getPollingIngestStats(); + return stats != null + && stats.getConsumerStats().totalPolledCount() == 4L + && stats.getConsumerStats().totalPollerMessageFailureCount() == 1L + && stats.getMessageProcessorStats().totalProcessedCount() == 3L + && stats.getMessageProcessorStats().totalFailedCount() == 1L + && stats.getMessageProcessorStats().totalFailuresDroppedCount() == 1L; + }); + + // Pause ingestion, reset to offset 0, and resume + pauseIngestion(indexName); + waitForState(() -> { + GetIngestionStateResponse ingestionState = 
getIngestionState(indexName); + return ingestionState.getShardStates().length == 1 + && ingestionState.getFailedShards() == 0 + && ingestionState.getShardStates()[0].isPollerPaused() + && ingestionState.getShardStates()[0].getPollerState().equalsIgnoreCase("paused"); + }); + + // Resume with reset to offset 0 (will re-process the 2 valid messages) + resumeIngestion(indexName, 0, ResumeIngestionRequest.ResetSettings.ResetMode.OFFSET, "0"); + waitForState(() -> { + GetIngestionStateResponse ingestionState = getIngestionState(indexName); + return ingestionState.getShardStates().length == 1 + && ingestionState.getShardStates()[0].isPollerPaused() == false + && (ingestionState.getShardStates()[0].getPollerState().equalsIgnoreCase("polling") + || ingestionState.getShardStates()[0].getPollerState().equalsIgnoreCase("processing")); + }); + // Wait for the 3 messages to be processed by the processor after reset (1 will be dropped by the poller) + waitForState(() -> { + PollingIngestStats stats = client().admin().indices().prepareStats(indexName).get().getIndex(indexName).getShards()[0] + .getPollingIngestStats(); + return stats != null && stats.getMessageProcessorStats().totalProcessedCount() == 3L; + }); + + // Verify still only 2 documents (no duplicates must be indexed) + RangeQueryBuilder query = new RangeQueryBuilder("age").gte(0); + SearchResponse response = client().prepareSearch(indexName).setQuery(query).get(); + assertThat(response.getHits().getTotalHits().value(), is(2L)); } } diff --git a/server/src/main/java/org/opensearch/cluster/metadata/IndexMetadata.java b/server/src/main/java/org/opensearch/cluster/metadata/IndexMetadata.java index 195cb49d997c7..be7dae504fe9e 100644 --- a/server/src/main/java/org/opensearch/cluster/metadata/IndexMetadata.java +++ b/server/src/main/java/org/opensearch/cluster/metadata/IndexMetadata.java @@ -72,6 +72,7 @@ import org.opensearch.index.seqno.SequenceNumbers; import org.opensearch.indices.pollingingest.IngestionErrorStrategy; import org.opensearch.indices.pollingingest.StreamPoller; +import org.opensearch.indices.pollingingest.mappers.IngestionMessageMapper; import org.opensearch.indices.replication.SegmentReplicationSource; import org.opensearch.indices.replication.common.ReplicationType; @@ -920,6 +921,18 @@ public Iterator> settings() { Property.Final ); + /** + * Defines how the incoming ingestion message payload is mapped to the internal message format. + */ + public static final String SETTING_INGESTION_SOURCE_MAPPER_TYPE = "index.ingestion_source.mapper_type"; + public static final Setting INGESTION_SOURCE_MAPPER_TYPE_SETTING = new Setting<>( + SETTING_INGESTION_SOURCE_MAPPER_TYPE, + IngestionMessageMapper.MapperType.DEFAULT.getName(), + IngestionMessageMapper.MapperType::fromString, + Property.IndexScope, + Property.Final + ); + /** * Defines if all-active pull-based ingestion is enabled. In this mode, replicas will directly consume from the * streaming source and process the updates. In the default document replication mode, this setting must be enabled. 
@@ -1225,6 +1238,7 @@ public IngestionSource getIngestionSource() { final int blockingQueueSize = INGESTION_SOURCE_INTERNAL_QUEUE_SIZE_SETTING.get(settings); final boolean allActiveIngestionEnabled = INGESTION_SOURCE_ALL_ACTIVE_INGESTION_SETTING.get(settings); final TimeValue pointerBasedLagUpdateInterval = INGESTION_SOURCE_POINTER_BASED_LAG_UPDATE_INTERVAL_SETTING.get(settings); + final IngestionMessageMapper.MapperType mapperType = INGESTION_SOURCE_MAPPER_TYPE_SETTING.get(settings); return new IngestionSource.Builder(ingestionSourceType).setParams(ingestionSourceParams) .setPointerInitReset(pointerInitReset) @@ -1235,6 +1249,7 @@ public IngestionSource getIngestionSource() { .setBlockingQueueSize(blockingQueueSize) .setAllActiveIngestion(allActiveIngestionEnabled) .setPointerBasedLagUpdateInterval(pointerBasedLagUpdateInterval) + .setMapperType(mapperType) .build(); } return null; diff --git a/server/src/main/java/org/opensearch/cluster/metadata/IngestionSource.java b/server/src/main/java/org/opensearch/cluster/metadata/IngestionSource.java index 9a8b4bfa1baf5..c2fcdb8e0fcd0 100644 --- a/server/src/main/java/org/opensearch/cluster/metadata/IngestionSource.java +++ b/server/src/main/java/org/opensearch/cluster/metadata/IngestionSource.java @@ -13,6 +13,7 @@ import org.opensearch.common.unit.TimeValue; import org.opensearch.indices.pollingingest.IngestionErrorStrategy; import org.opensearch.indices.pollingingest.StreamPoller; +import org.opensearch.indices.pollingingest.mappers.IngestionMessageMapper; import java.util.HashMap; import java.util.Map; @@ -20,6 +21,7 @@ import static org.opensearch.cluster.metadata.IndexMetadata.INGESTION_SOURCE_ALL_ACTIVE_INGESTION_SETTING; import static org.opensearch.cluster.metadata.IndexMetadata.INGESTION_SOURCE_INTERNAL_QUEUE_SIZE_SETTING; +import static org.opensearch.cluster.metadata.IndexMetadata.INGESTION_SOURCE_MAPPER_TYPE_SETTING; import static org.opensearch.cluster.metadata.IndexMetadata.INGESTION_SOURCE_MAX_POLL_SIZE; import static org.opensearch.cluster.metadata.IndexMetadata.INGESTION_SOURCE_NUM_PROCESSOR_THREADS_SETTING; import static org.opensearch.cluster.metadata.IndexMetadata.INGESTION_SOURCE_POINTER_BASED_LAG_UPDATE_INTERVAL_SETTING; @@ -40,6 +42,7 @@ public class IngestionSource { private int blockingQueueSize; private final boolean allActiveIngestion; private final TimeValue pointerBasedLagUpdateInterval; + private final IngestionMessageMapper.MapperType mapperType; private IngestionSource( String type, @@ -51,7 +54,8 @@ private IngestionSource( int numProcessorThreads, int blockingQueueSize, boolean allActiveIngestion, - TimeValue pointerBasedLagUpdateInterval + TimeValue pointerBasedLagUpdateInterval, + IngestionMessageMapper.MapperType mapperType ) { this.type = type; this.pointerInitReset = pointerInitReset; @@ -63,6 +67,7 @@ private IngestionSource( this.blockingQueueSize = blockingQueueSize; this.allActiveIngestion = allActiveIngestion; this.pointerBasedLagUpdateInterval = pointerBasedLagUpdateInterval; + this.mapperType = mapperType; } public String getType() { @@ -105,6 +110,10 @@ public TimeValue getPointerBasedLagUpdateInterval() { return pointerBasedLagUpdateInterval; } + public IngestionMessageMapper.MapperType getMapperType() { + return mapperType; + } + @Override public boolean equals(Object o) { if (this == o) return true; @@ -119,7 +128,8 @@ public boolean equals(Object o) { && Objects.equals(numProcessorThreads, ingestionSource.numProcessorThreads) && Objects.equals(blockingQueueSize, ingestionSource.blockingQueueSize) 
&& Objects.equals(allActiveIngestion, ingestionSource.allActiveIngestion) - && Objects.equals(pointerBasedLagUpdateInterval, ingestionSource.pointerBasedLagUpdateInterval); + && Objects.equals(pointerBasedLagUpdateInterval, ingestionSource.pointerBasedLagUpdateInterval) + && Objects.equals(mapperType, ingestionSource.mapperType); } @Override @@ -134,7 +144,8 @@ public int hashCode() { numProcessorThreads, blockingQueueSize, allActiveIngestion, - pointerBasedLagUpdateInterval + pointerBasedLagUpdateInterval, + mapperType ); } @@ -164,6 +175,9 @@ public String toString() { + allActiveIngestion + ", pointerBasedLagUpdateInterval=" + pointerBasedLagUpdateInterval + + ", mapperType='" + + mapperType + + '\'' + '}'; } @@ -225,6 +239,7 @@ public static class Builder { private TimeValue pointerBasedLagUpdateInterval = INGESTION_SOURCE_POINTER_BASED_LAG_UPDATE_INTERVAL_SETTING.getDefault( Settings.EMPTY ); + private IngestionMessageMapper.MapperType mapperType = INGESTION_SOURCE_MAPPER_TYPE_SETTING.getDefault(Settings.EMPTY); public Builder(String type) { this.type = type; @@ -239,6 +254,7 @@ public Builder(IngestionSource ingestionSource) { this.blockingQueueSize = ingestionSource.blockingQueueSize; this.allActiveIngestion = ingestionSource.allActiveIngestion; this.pointerBasedLagUpdateInterval = ingestionSource.pointerBasedLagUpdateInterval; + this.mapperType = ingestionSource.mapperType; } public Builder setPointerInitReset(PointerInitReset pointerInitReset) { @@ -291,6 +307,11 @@ public Builder setPointerBasedLagUpdateInterval(TimeValue pointerBasedLagUpdateI return this; } + public Builder setMapperType(IngestionMessageMapper.MapperType mapperType) { + this.mapperType = mapperType; + return this; + } + public IngestionSource build() { return new IngestionSource( type, @@ -302,7 +323,8 @@ public IngestionSource build() { numProcessorThreads, blockingQueueSize, allActiveIngestion, - pointerBasedLagUpdateInterval + pointerBasedLagUpdateInterval, + mapperType ); } diff --git a/server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java b/server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java index 6a3d47986122e..d7d711de75fb7 100644 --- a/server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java @@ -285,6 +285,7 @@ public final class IndexScopedSettings extends AbstractScopedSettings { IndexMetadata.INGESTION_SOURCE_INTERNAL_QUEUE_SIZE_SETTING, IndexMetadata.INGESTION_SOURCE_ALL_ACTIVE_INGESTION_SETTING, IndexMetadata.INGESTION_SOURCE_POINTER_BASED_LAG_UPDATE_INTERVAL_SETTING, + IndexMetadata.INGESTION_SOURCE_MAPPER_TYPE_SETTING, // Settings for search replica IndexMetadata.INDEX_NUMBER_OF_SEARCH_REPLICAS_SETTING, diff --git a/server/src/main/java/org/opensearch/index/engine/IngestionEngine.java b/server/src/main/java/org/opensearch/index/engine/IngestionEngine.java index 2b643bde26caf..888e79c7043a1 100644 --- a/server/src/main/java/org/opensearch/index/engine/IngestionEngine.java +++ b/server/src/main/java/org/opensearch/index/engine/IngestionEngine.java @@ -146,6 +146,7 @@ private void initializeStreamPoller( .numProcessorThreads(ingestionSource.getNumProcessorThreads()) .blockingQueueSize(ingestionSource.getBlockingQueueSize()) .pointerBasedLagUpdateInterval(ingestionSource.getPointerBasedLagUpdateInterval().millis()) + .mapperType(ingestionSource.getMapperType()) .build(); registerStreamPollerListener(); diff --git 
a/server/src/main/java/org/opensearch/indices/pollingingest/DefaultStreamPoller.java b/server/src/main/java/org/opensearch/indices/pollingingest/DefaultStreamPoller.java index 21559ae2cb236..583c0d463f6a3 100644 --- a/server/src/main/java/org/opensearch/indices/pollingingest/DefaultStreamPoller.java +++ b/server/src/main/java/org/opensearch/indices/pollingingest/DefaultStreamPoller.java @@ -21,6 +21,7 @@ import org.opensearch.index.IngestionShardPointer; import org.opensearch.index.Message; import org.opensearch.index.engine.IngestionEngine; +import org.opensearch.indices.pollingingest.mappers.IngestionMessageMapper; import java.util.Comparator; import java.util.List; @@ -70,6 +71,7 @@ public class DefaultStreamPoller implements StreamPoller { private long maxPollSize; private int pollTimeout; private long pointerBasedLagUpdateIntervalMs; + private final IngestionMessageMapper messageMapper; private final String indexName; @@ -95,7 +97,8 @@ private DefaultStreamPoller( int pollTimeout, int numProcessorThreads, int blockingQueueSize, - long pointerBasedLagUpdateIntervalMs + long pointerBasedLagUpdateIntervalMs, + IngestionMessageMapper.MapperType mapperType ) { this( startPointer, @@ -110,7 +113,8 @@ private DefaultStreamPoller( maxPollSize, pollTimeout, pointerBasedLagUpdateIntervalMs, - ingestionEngine.config().getIndexSettings() + ingestionEngine.config().getIndexSettings(), + IngestionMessageMapper.create(mapperType.getName(), shardId) ); } @@ -130,7 +134,8 @@ private DefaultStreamPoller( long maxPollSize, int pollTimeout, long pointerBasedLagUpdateIntervalMs, - IndexSettings indexSettings + IndexSettings indexSettings, + IngestionMessageMapper messageMapper ) { this.consumerFactory = Objects.requireNonNull(consumerFactory); this.consumerClientId = Objects.requireNonNull(consumerClientId); @@ -148,6 +153,7 @@ private DefaultStreamPoller( ); this.errorStrategy = errorStrategy; this.indexName = indexSettings.getIndex().getName(); + this.messageMapper = Objects.requireNonNull(messageMapper); // handle initial poller states this.paused = initialState == State.PAUSED; @@ -257,7 +263,11 @@ private IngestionShardPointer processRecords( for (IngestionShardConsumer.ReadResult result : results) { try { totalPolledCount.inc(); - blockingQueueContainer.add(result); + + // Use mapper to create ShardUpdateMessage + ShardUpdateMessage shardUpdateMessage = messageMapper.mapAndProcess(result.getPointer(), result.getMessage()); + + blockingQueueContainer.add(shardUpdateMessage); setLastPolledMessageTimestamp(result.getMessage().getTimestamp() == null ? 
0 : result.getMessage().getTimestamp()); logger.debug( "Put message {} with pointer {} to the blocking queue", @@ -533,6 +543,7 @@ public static class Builder { private int numProcessorThreads = 1; private int blockingQueueSize = 100; private long pointerBasedLagUpdateIntervalMs = 10000; + private IngestionMessageMapper.MapperType mapperType = IngestionMessageMapper.MapperType.DEFAULT; /** * Initialize the builder with mandatory parameters @@ -624,6 +635,14 @@ public Builder pointerBasedLagUpdateInterval(long pointerBasedLagUpdateInterval) return this; } + /** + * Set mapper type + */ + public Builder mapperType(IngestionMessageMapper.MapperType mapperType) { + this.mapperType = mapperType; + return this; + } + /** * Build the DefaultStreamPoller instance */ @@ -642,7 +661,8 @@ public DefaultStreamPoller build() { pollTimeout, numProcessorThreads, blockingQueueSize, - pointerBasedLagUpdateIntervalMs + pointerBasedLagUpdateIntervalMs, + mapperType ); } } diff --git a/server/src/main/java/org/opensearch/indices/pollingingest/MessageProcessorRunnable.java b/server/src/main/java/org/opensearch/indices/pollingingest/MessageProcessorRunnable.java index dfdc4a2fde852..c7e779989495e 100644 --- a/server/src/main/java/org/opensearch/indices/pollingingest/MessageProcessorRunnable.java +++ b/server/src/main/java/org/opensearch/indices/pollingingest/MessageProcessorRunnable.java @@ -49,10 +49,11 @@ * engine operation. */ public class MessageProcessorRunnable implements Runnable, Closeable { + public static final String ID = "_id"; + public static final String OP_TYPE = "_op_type"; + public static final String SOURCE = "_source"; + private static final Logger logger = LogManager.getLogger(MessageProcessorRunnable.class); - private static final String ID = "_id"; - private static final String OP_TYPE = "_op_type"; - private static final String SOURCE = "_source"; private static final int MIN_RETRY_COUNT = 2; private static final int WAIT_BEFORE_RETRY_DURATION_MS = 2000; diff --git a/server/src/main/java/org/opensearch/indices/pollingingest/PartitionedBlockingQueueContainer.java b/server/src/main/java/org/opensearch/indices/pollingingest/PartitionedBlockingQueueContainer.java index c5a033c8b4739..c332835f67a4e 100644 --- a/server/src/main/java/org/opensearch/indices/pollingingest/PartitionedBlockingQueueContainer.java +++ b/server/src/main/java/org/opensearch/indices/pollingingest/PartitionedBlockingQueueContainer.java @@ -10,9 +10,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import org.opensearch.common.util.RequestUtils; import org.opensearch.core.common.Strings; -import org.opensearch.index.IngestionShardConsumer; import org.opensearch.index.IngestionShardPointer; import org.opensearch.index.Message; import org.opensearch.index.engine.IngestionEngine; @@ -27,8 +25,6 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import static org.opensearch.action.index.IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP; - /** * A partitioned blocking queue approach is used to support multiple writer threads. This class holds a blocking queue * per partition. A processor thread is started for each partition to consume updates and write to the lucene index. @@ -121,32 +117,14 @@ public void startProcessorThreads() { } /** - * Add a document into the blocking queue. ID of the document will be used to identify the blocking queue partition. - * If an ID is not present, a new one will be auto generated. 
+ * Add a shard update message to the blocking queue. ID of the document will be used to identify the blocking queue partition. */ - public void add(IngestionShardConsumer.ReadResult readResult) - throws InterruptedException { - Map payloadMap = IngestionUtils.getParsedPayloadMap((byte[]) readResult.getMessage().getPayload()); - String id; - long autoGeneratedIdTimestamp = UNSET_AUTO_GENERATED_TIMESTAMP; - - if (payloadMap.containsKey(IdFieldMapper.NAME)) { - id = (String) payloadMap.get(IdFieldMapper.NAME); - } else { - id = RequestUtils.generateID(); - payloadMap.put(IdFieldMapper.NAME, id); - autoGeneratedIdTimestamp = System.currentTimeMillis(); - } - - ShardUpdateMessage updateMessage = new ShardUpdateMessage( - readResult.getPointer(), - readResult.getMessage(), - payloadMap, - autoGeneratedIdTimestamp - ); + public void add(ShardUpdateMessage shardUpdateMessage) throws InterruptedException { + Map payloadMap = shardUpdateMessage.parsedPayloadMap(); + String id = (String) payloadMap.get(IdFieldMapper.NAME); int partition = getPartitionFromID(id); - partitionToQueueMap.get(partition).put(updateMessage); + partitionToQueueMap.get(partition).put(shardUpdateMessage); } /** diff --git a/server/src/main/java/org/opensearch/indices/pollingingest/ShardUpdateMessage.java b/server/src/main/java/org/opensearch/indices/pollingingest/ShardUpdateMessage.java index fd4af460f33f5..d9a020be97661 100644 --- a/server/src/main/java/org/opensearch/indices/pollingingest/ShardUpdateMessage.java +++ b/server/src/main/java/org/opensearch/indices/pollingingest/ShardUpdateMessage.java @@ -15,7 +15,9 @@ /** * Holds the original message consumed from the streaming source, corresponding pointer and parsed payload map. This - * will be used by the pull-based ingestion processor/writer threads to update the index. + * will be used by the pull-based ingestion processor/writer threads to update the index. The poller implementations + * will publish a version of ShardUpdateMessage into the blocking queue, which will then be processed by the message + * processors. */ public record ShardUpdateMessage(T pointer, M originalMessage, Map< String, diff --git a/server/src/main/java/org/opensearch/indices/pollingingest/mappers/DefaultIngestionMessageMapper.java b/server/src/main/java/org/opensearch/indices/pollingingest/mappers/DefaultIngestionMessageMapper.java new file mode 100644 index 0000000000000..5a574ea2f4693 --- /dev/null +++ b/server/src/main/java/org/opensearch/indices/pollingingest/mappers/DefaultIngestionMessageMapper.java @@ -0,0 +1,49 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.indices.pollingingest.mappers; + +import org.opensearch.common.util.RequestUtils; +import org.opensearch.index.IngestionShardPointer; +import org.opensearch.index.Message; +import org.opensearch.indices.pollingingest.IngestionUtils; +import org.opensearch.indices.pollingingest.ShardUpdateMessage; + +import java.util.Map; + +import static org.opensearch.action.index.IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP; +import static org.opensearch.indices.pollingingest.MessageProcessorRunnable.ID; + +/** + * Default mapper implementation that expects messages in the format: + * { + * "_id": "document_id", + * "_op_type": "index|create|delete", + * "_version": "external document version" + * "_source": { ... document fields ... } + * } + * + *
Document ID will be auto-generated if not present in the incoming message. + */ +public class DefaultIngestionMessageMapper implements IngestionMessageMapper { + + @Override + public ShardUpdateMessage mapAndProcess(IngestionShardPointer pointer, Message message) throws IllegalArgumentException { + Map payloadMap = IngestionUtils.getParsedPayloadMap((byte[]) message.getPayload()); + + // Extract or generate _id + long autoGeneratedIdTimestamp = UNSET_AUTO_GENERATED_TIMESTAMP; + if (payloadMap.containsKey(ID) == false) { + String id = RequestUtils.generateID(); + payloadMap.put(ID, id); + autoGeneratedIdTimestamp = System.currentTimeMillis(); + } + + return new ShardUpdateMessage(pointer, message, payloadMap, autoGeneratedIdTimestamp); + } +} diff --git a/server/src/main/java/org/opensearch/indices/pollingingest/mappers/IngestionMessageMapper.java b/server/src/main/java/org/opensearch/indices/pollingingest/mappers/IngestionMessageMapper.java new file mode 100644 index 0000000000000..eab0ec064e8e5 --- /dev/null +++ b/server/src/main/java/org/opensearch/indices/pollingingest/mappers/IngestionMessageMapper.java @@ -0,0 +1,85 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.indices.pollingingest.mappers; + +import org.opensearch.common.annotation.ExperimentalApi; +import org.opensearch.index.IngestionShardPointer; +import org.opensearch.index.Message; +import org.opensearch.indices.pollingingest.ShardUpdateMessage; + +import java.util.Locale; + +/** + * Interface for mapping ingestion messages to ShardUpdateMessage format. + * Different implementations can support different message formats from streaming sources. + * + *
Note that IngestionMessageMapper only maps the incoming message to a {@link ShardUpdateMessage} and does not + * validate or drop messages. Validation is performed as part of message processing in {@link org.opensearch.indices.pollingingest.MessageProcessorRunnable}.
+ */ +public interface IngestionMessageMapper { + + /** + * Maps and processes an ingestion message to a shard update message. + * + * @param pointer the shard pointer for this message + * @param message the message from the streaming source + * @return the shard update message + * @throws IllegalArgumentException if the message format is invalid + */ + ShardUpdateMessage mapAndProcess(IngestionShardPointer pointer, Message message) throws IllegalArgumentException; + + /** + * Enum representing different mapper types. + */ + @ExperimentalApi + enum MapperType { + DEFAULT("default"), + RAW_PAYLOAD("raw_payload"); + + private final String name; + + MapperType(String name) { + this.name = name; + } + + public String getName() { + return name; + } + + public static MapperType fromString(String value) { + for (MapperType type : MapperType.values()) { + if (type.name.equalsIgnoreCase(value)) { + return type; + } + } + throw new IllegalArgumentException( + String.format(Locale.ROOT, "Unknown ingestion mapper type: %s. Valid values are: default, raw_payload", value) + ); + } + } + + /** + * Factory method to create a mapper instance based on type string. + * + * @param mapperTypeString the type of mapper to create as a string + * @param shardId the shard ID + * @return the mapper instance + */ + static IngestionMessageMapper create(String mapperTypeString, int shardId) { + MapperType mapperType = MapperType.fromString(mapperTypeString); + switch (mapperType) { + case DEFAULT: + return new DefaultIngestionMessageMapper(); + case RAW_PAYLOAD: + return new RawPayloadIngestionMessageMapper(shardId); + default: + throw new IllegalArgumentException("Unknown mapper type: " + mapperType); + } + } +} diff --git a/server/src/main/java/org/opensearch/indices/pollingingest/mappers/RawPayloadIngestionMessageMapper.java b/server/src/main/java/org/opensearch/indices/pollingingest/mappers/RawPayloadIngestionMessageMapper.java new file mode 100644 index 0000000000000..099731d8133fe --- /dev/null +++ b/server/src/main/java/org/opensearch/indices/pollingingest/mappers/RawPayloadIngestionMessageMapper.java @@ -0,0 +1,56 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.indices.pollingingest.mappers; + +import org.opensearch.index.IngestionShardPointer; +import org.opensearch.index.Message; +import org.opensearch.indices.pollingingest.IngestionUtils; +import org.opensearch.indices.pollingingest.ShardUpdateMessage; + +import java.util.Map; + +import static org.opensearch.action.index.IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP; +import static org.opensearch.indices.pollingingest.MessageProcessorRunnable.ID; +import static org.opensearch.indices.pollingingest.MessageProcessorRunnable.OP_TYPE; +import static org.opensearch.indices.pollingingest.MessageProcessorRunnable.SOURCE; + +/** + * Mapper implementation that uses the entire message payload as the document source. + * The message pointer (Kafka offset, Kinesis sequence number, etc.) along with the shard ID is used as the document ID, and the operation type is "index". + * The shard pointer is prefixed with the shard ID to ensure document ID uniqueness across all shards. + * + *
Note that raw payload will not support document versioning. Eventually consistent view of the documents can be expected + * if there is a shard recovery resulting in message replay. + */ +public class RawPayloadIngestionMessageMapper implements IngestionMessageMapper { + + private static final String OP_TYPE_INDEX = "index"; + private final int shardId; + + public RawPayloadIngestionMessageMapper(int shardId) { + this.shardId = shardId; + } + + @Override + public ShardUpdateMessage mapAndProcess(IngestionShardPointer pointer, Message message) throws IllegalArgumentException { + // Parse the raw payload - this will be the _source content + Map sourceMap = IngestionUtils.getParsedPayloadMap((byte[]) message.getPayload()); + + // Use shard ID prefix + pointer as the document ID to ensure uniqueness across shards + String id = shardId + "-" + pointer.asString(); + + // No auto-generated ID timestamp since we're using the pointer as ID + long autoGeneratedIdTimestamp = UNSET_AUTO_GENERATED_TIMESTAMP; + + // Construct the full payload map with metadata fields and source + Map payloadMap = Map.of(ID, id, OP_TYPE, OP_TYPE_INDEX, SOURCE, sourceMap); + + return new ShardUpdateMessage(pointer, message, payloadMap, autoGeneratedIdTimestamp); + } +} diff --git a/server/src/main/java/org/opensearch/indices/pollingingest/mappers/package-info.java b/server/src/main/java/org/opensearch/indices/pollingingest/mappers/package-info.java new file mode 100644 index 0000000000000..bec0b56c97b16 --- /dev/null +++ b/server/src/main/java/org/opensearch/indices/pollingingest/mappers/package-info.java @@ -0,0 +1,10 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/** Pull-based ingestion message mappers package. 
*/ +package org.opensearch.indices.pollingingest.mappers; diff --git a/server/src/test/java/org/opensearch/cluster/metadata/IngestionSourceTests.java b/server/src/test/java/org/opensearch/cluster/metadata/IngestionSourceTests.java index b83c78ecf5eee..feddeb00eab20 100644 --- a/server/src/test/java/org/opensearch/cluster/metadata/IngestionSourceTests.java +++ b/server/src/test/java/org/opensearch/cluster/metadata/IngestionSourceTests.java @@ -108,7 +108,7 @@ public void testToString() { .setErrorStrategy(DROP) .build(); String expected = - "IngestionSource{type='type',pointer_init_reset='PointerInitReset{type='RESET_BY_OFFSET', value=1000}',error_strategy='DROP', params={key=value}, maxPollSize=1000, pollTimeout=1000, numProcessorThreads=1, blockingQueueSize=100, allActiveIngestion=false, pointerBasedLagUpdateInterval=10s}"; + "IngestionSource{type='type',pointer_init_reset='PointerInitReset{type='RESET_BY_OFFSET', value=1000}',error_strategy='DROP', params={key=value}, maxPollSize=1000, pollTimeout=1000, numProcessorThreads=1, blockingQueueSize=100, allActiveIngestion=false, pointerBasedLagUpdateInterval=10s, mapperType='DEFAULT'}"; assertEquals(expected, source.toString()); } diff --git a/server/src/test/java/org/opensearch/indices/pollingingest/DefaultStreamPollerTests.java b/server/src/test/java/org/opensearch/indices/pollingingest/DefaultStreamPollerTests.java index 1b7b3cc87609a..9c65d67c55426 100644 --- a/server/src/test/java/org/opensearch/indices/pollingingest/DefaultStreamPollerTests.java +++ b/server/src/test/java/org/opensearch/indices/pollingingest/DefaultStreamPollerTests.java @@ -21,6 +21,7 @@ import org.opensearch.index.IngestionShardConsumer; import org.opensearch.index.engine.FakeIngestionSource; import org.opensearch.index.engine.IngestionEngine; +import org.opensearch.indices.pollingingest.mappers.DefaultIngestionMessageMapper; import org.opensearch.test.IndexSettingsModule; import org.opensearch.test.OpenSearchTestCase; import org.junit.After; @@ -91,7 +92,8 @@ public void setUp() throws Exception { 1000, 1000, 10000, - indexSettings + indexSettings, + new DefaultIngestionMessageMapper() ); partitionedBlockingQueueContainer.startProcessorThreads(); } @@ -164,7 +166,8 @@ public void testResetStateEarliest() throws InterruptedException { 1000, 1000, 10000, - indexSettings + indexSettings, + new DefaultIngestionMessageMapper() ); CountDownLatch latch = new CountDownLatch(2); doAnswer(invocation -> { @@ -193,7 +196,8 @@ public void testResetStateLatest() throws InterruptedException { 1000, 1000, 10000, - indexSettings + indexSettings, + new DefaultIngestionMessageMapper() ); poller.start(); @@ -218,7 +222,8 @@ public void testResetStateRewindByOffset() throws InterruptedException { 1000, 1000, 10000, - indexSettings + indexSettings, + new DefaultIngestionMessageMapper() ); CountDownLatch latch = new CountDownLatch(1); doAnswer(invocation -> { @@ -303,7 +308,8 @@ public void testDropErrorIngestionStrategy() throws TimeoutException, Interrupte 1000, 1000, 10000, - indexSettings + indexSettings, + new DefaultIngestionMessageMapper() ); poller.start(); Thread.sleep(sleepTime); @@ -365,7 +371,8 @@ public void testBlockErrorIngestionStrategy() throws TimeoutException, Interrupt 1000, 1000, 10000, - indexSettings + indexSettings, + new DefaultIngestionMessageMapper() ); poller.start(); Thread.sleep(sleepTime); @@ -401,7 +408,8 @@ public void testProcessingErrorWithBlockErrorIngestionStrategy() throws TimeoutE 1000, 1000, 10000, - indexSettings + indexSettings, + new 
DefaultIngestionMessageMapper() ); poller.start(); Thread.sleep(sleepTime); @@ -473,7 +481,8 @@ public void testPersistedBatchStartPointer() throws TimeoutException, Interrupte 1000, 1000, 10000, - indexSettings + indexSettings, + new DefaultIngestionMessageMapper() ); poller.start(); Thread.sleep(sleepTime); @@ -542,7 +551,8 @@ public void testConsumerInitializationRetry() throws Exception { 1000, 1000, 10000, - indexSettings + indexSettings, + new DefaultIngestionMessageMapper() ); poller.start(); diff --git a/server/src/test/java/org/opensearch/indices/pollingingest/PartitionedBlockingQueueContainerTests.java b/server/src/test/java/org/opensearch/indices/pollingingest/PartitionedBlockingQueueContainerTests.java index 3fc41ec98ed2c..751fc524feea7 100644 --- a/server/src/test/java/org/opensearch/indices/pollingingest/PartitionedBlockingQueueContainerTests.java +++ b/server/src/test/java/org/opensearch/indices/pollingingest/PartitionedBlockingQueueContainerTests.java @@ -11,6 +11,8 @@ import org.opensearch.core.common.Strings; import org.opensearch.index.IngestionShardConsumer; import org.opensearch.index.engine.FakeIngestionSource; +import org.opensearch.indices.pollingingest.mappers.DefaultIngestionMessageMapper; +import org.opensearch.indices.pollingingest.mappers.IngestionMessageMapper; import org.opensearch.test.OpenSearchTestCase; import org.junit.After; import org.junit.Before; @@ -34,6 +36,7 @@ public class PartitionedBlockingQueueContainerTests extends OpenSearchTestCase { private PartitionedBlockingQueueContainer blockingQueueContainer; private FakeIngestionSource.FakeIngestionConsumer fakeConsumer; private List messages; + private IngestionMessageMapper mapper; @Before public void setUp() throws Exception { @@ -51,6 +54,7 @@ public void setUp() throws Exception { 0 ); this.blockingQueueContainer = new PartitionedBlockingQueueContainer(processorRunnable, 0); + this.mapper = new DefaultIngestionMessageMapper(); } @After @@ -83,7 +87,8 @@ public void testAddMessage() throws TimeoutException, InterruptedException { for (IngestionShardConsumer.ReadResult< FakeIngestionSource.FakeIngestionShardPointer, FakeIngestionSource.FakeIngestionMessage> readResult : readResults) { - blockingQueueContainer.add(readResult); + ShardUpdateMessage shardUpdateMessage = mapper.mapAndProcess(readResult.getPointer(), readResult.getMessage()); + blockingQueueContainer.add(shardUpdateMessage); } // verify ID is present on all messages diff --git a/server/src/test/java/org/opensearch/indices/pollingingest/mappers/IngestionMessageMapperTests.java b/server/src/test/java/org/opensearch/indices/pollingingest/mappers/IngestionMessageMapperTests.java new file mode 100644 index 0000000000000..291e6fa1269a7 --- /dev/null +++ b/server/src/test/java/org/opensearch/indices/pollingingest/mappers/IngestionMessageMapperTests.java @@ -0,0 +1,215 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
+ */ + +package org.opensearch.indices.pollingingest.mappers; + +import org.opensearch.index.IngestionShardPointer; +import org.opensearch.index.Message; +import org.opensearch.index.engine.FakeIngestionSource; +import org.opensearch.indices.pollingingest.ShardUpdateMessage; +import org.opensearch.test.OpenSearchTestCase; + +import java.nio.charset.StandardCharsets; +import java.util.Map; + +import static org.opensearch.action.index.IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP; + +public class IngestionMessageMapperTests extends OpenSearchTestCase { + + public void testMapperTypeFromStringAndName() { + assertEquals(IngestionMessageMapper.MapperType.DEFAULT, IngestionMessageMapper.MapperType.fromString("default")); + assertEquals(IngestionMessageMapper.MapperType.DEFAULT, IngestionMessageMapper.MapperType.fromString("DEFAULT")); + assertEquals(IngestionMessageMapper.MapperType.RAW_PAYLOAD, IngestionMessageMapper.MapperType.fromString("raw_payload")); + assertEquals(IngestionMessageMapper.MapperType.RAW_PAYLOAD, IngestionMessageMapper.MapperType.fromString("RAW_PAYLOAD")); + + assertEquals("default", IngestionMessageMapper.MapperType.DEFAULT.getName()); + assertEquals("raw_payload", IngestionMessageMapper.MapperType.RAW_PAYLOAD.getName()); + } + + public void testMapperTypeFromStringInvalid() { + expectThrows(IllegalArgumentException.class, () -> IngestionMessageMapper.MapperType.fromString("invalid")); + } + + public void testMapperCreation() { + IngestionMessageMapper defaultMapper = IngestionMessageMapper.create("default", 0); + assertNotNull(defaultMapper); + assertTrue(defaultMapper instanceof DefaultIngestionMessageMapper); + + IngestionMessageMapper rawPayloadMapper = IngestionMessageMapper.create("raw_payload", 0); + assertNotNull(rawPayloadMapper); + assertTrue(rawPayloadMapper instanceof RawPayloadIngestionMessageMapper); + + expectThrows(IllegalArgumentException.class, () -> IngestionMessageMapper.create("unknown", 0)); + } + + public void testDefaultMapperWithIdPresent() { + DefaultIngestionMessageMapper mapper = new DefaultIngestionMessageMapper(); + String payload = "{\"_id\":\"123\",\"_op_type\":\"index\",\"_source\":{\"name\":\"alice\",\"age\":30}}"; + byte[] payloadBytes = payload.getBytes(StandardCharsets.UTF_8); + + IngestionShardPointer pointer = new FakeIngestionSource.FakeIngestionShardPointer(5); + Message message = new FakeIngestionSource.FakeIngestionMessage(payloadBytes); + + ShardUpdateMessage result = mapper.mapAndProcess(pointer, message); + + assertNotNull(result); + assertEquals(pointer, result.pointer()); + assertEquals(message, result.originalMessage()); + assertEquals(UNSET_AUTO_GENERATED_TIMESTAMP, result.autoGeneratedIdTimestamp()); + + Map parsedMap = result.parsedPayloadMap(); + assertEquals("123", parsedMap.get("_id")); + assertEquals("index", parsedMap.get("_op_type")); + assertNotNull(parsedMap.get("_source")); + assertTrue(parsedMap.get("_source") instanceof Map); + + Map source = (Map) parsedMap.get("_source"); + assertEquals("alice", source.get("name")); + assertEquals(30, source.get("age")); + } + + public void testDefaultMapperWithoutId() { + DefaultIngestionMessageMapper mapper = new DefaultIngestionMessageMapper(); + String payload = "{\"_op_type\":\"index\",\"_source\":{\"name\":\"bob\",\"age\":25}}"; + byte[] payloadBytes = payload.getBytes(StandardCharsets.UTF_8); + + IngestionShardPointer pointer = new FakeIngestionSource.FakeIngestionShardPointer(10); + Message message = new FakeIngestionSource.FakeIngestionMessage(payloadBytes); + + 
ShardUpdateMessage result = mapper.mapAndProcess(pointer, message); + + assertNotNull(result); + assertEquals(pointer, result.pointer()); + assertEquals(message, result.originalMessage()); + assertNotEquals(UNSET_AUTO_GENERATED_TIMESTAMP, result.autoGeneratedIdTimestamp()); + + Map parsedMap = result.parsedPayloadMap(); + assertNotNull(parsedMap.get("_id")); + assertTrue(parsedMap.get("_id") instanceof String); + String generatedId = (String) parsedMap.get("_id"); + assertFalse(generatedId.isEmpty()); + } + + public void testDefaultMapperWithVersion() { + DefaultIngestionMessageMapper mapper = new DefaultIngestionMessageMapper(); + String payload = "{\"_id\":\"789\",\"_version\":\"5\",\"_op_type\":\"index\",\"_source\":{\"name\":\"david\"}}"; + byte[] payloadBytes = payload.getBytes(StandardCharsets.UTF_8); + + IngestionShardPointer pointer = new FakeIngestionSource.FakeIngestionShardPointer(20); + Message message = new FakeIngestionSource.FakeIngestionMessage(payloadBytes); + + ShardUpdateMessage result = mapper.mapAndProcess(pointer, message); + + assertNotNull(result); + Map parsedMap = result.parsedPayloadMap(); + assertEquals("789", parsedMap.get("_id")); + assertEquals("5", parsedMap.get("_version")); + } + + public void testRawPayloadMapper() { + RawPayloadIngestionMessageMapper mapper = new RawPayloadIngestionMessageMapper(0); + String payload = "{\"name\":\"alice\",\"age\":30,\"city\":\"Seattle\"}"; + byte[] payloadBytes = payload.getBytes(StandardCharsets.UTF_8); + + IngestionShardPointer pointer = new FakeIngestionSource.FakeIngestionShardPointer(100); + Message message = new FakeIngestionSource.FakeIngestionMessage(payloadBytes); + + ShardUpdateMessage result = mapper.mapAndProcess(pointer, message); + + assertNotNull(result); + assertEquals(pointer, result.pointer()); + assertEquals(message, result.originalMessage()); + assertEquals(UNSET_AUTO_GENERATED_TIMESTAMP, result.autoGeneratedIdTimestamp()); + + Map parsedMap = result.parsedPayloadMap(); + + // Verify _id is set to shard ID + pointer value + assertEquals("0-100", parsedMap.get("_id")); + + // Verify _op_type is set to "index" + assertEquals("index", parsedMap.get("_op_type")); + + // Verify _source contains the original payload + assertNotNull(parsedMap.get("_source")); + assertTrue(parsedMap.get("_source") instanceof Map); + + Map source = (Map) parsedMap.get("_source"); + assertEquals("alice", source.get("name")); + assertEquals(30, source.get("age")); + assertEquals("Seattle", source.get("city")); + + // Verify _source does not contain metadata fields + assertFalse(source.containsKey("_id")); + assertFalse(source.containsKey("_op_type")); + } + + public void testRawPayloadMapperWithComplexObject() { + RawPayloadIngestionMessageMapper mapper = new RawPayloadIngestionMessageMapper(1); + String payload = "{\"user\":{\"name\":\"bob\",\"email\":\"bob@example.com\"},\"tags\":[\"tag1\",\"tag2\"],\"count\":42}"; + byte[] payloadBytes = payload.getBytes(StandardCharsets.UTF_8); + + IngestionShardPointer pointer = new FakeIngestionSource.FakeIngestionShardPointer(200); + Message message = new FakeIngestionSource.FakeIngestionMessage(payloadBytes); + + ShardUpdateMessage result = mapper.mapAndProcess(pointer, message); + + assertNotNull(result); + Map parsedMap = result.parsedPayloadMap(); + + assertEquals("1-200", parsedMap.get("_id")); + assertEquals("index", parsedMap.get("_op_type")); + + Map source = (Map) parsedMap.get("_source"); + assertEquals(3, source.size()); + assertTrue(source.containsKey("user")); + 
assertTrue(source.containsKey("tags")); + assertEquals(42, source.get("count")); + } + + public void testRawPayloadMapperWithEmptyObject() { + RawPayloadIngestionMessageMapper mapper = new RawPayloadIngestionMessageMapper(2); + String payload = "{}"; + byte[] payloadBytes = payload.getBytes(StandardCharsets.UTF_8); + + IngestionShardPointer pointer = new FakeIngestionSource.FakeIngestionShardPointer(300); + Message message = new FakeIngestionSource.FakeIngestionMessage(payloadBytes); + + ShardUpdateMessage result = mapper.mapAndProcess(pointer, message); + + assertNotNull(result); + Map parsedMap = result.parsedPayloadMap(); + + assertEquals("2-300", parsedMap.get("_id")); + assertEquals("index", parsedMap.get("_op_type")); + + Map source = (Map) parsedMap.get("_source"); + assertTrue(source.isEmpty()); + } + + public void testDefaultMapperWithInvalidJson() { + DefaultIngestionMessageMapper mapper = new DefaultIngestionMessageMapper(); + String payload = "invalid json {{{"; + byte[] payloadBytes = payload.getBytes(StandardCharsets.UTF_8); + + IngestionShardPointer pointer = new FakeIngestionSource.FakeIngestionShardPointer(500); + Message message = new FakeIngestionSource.FakeIngestionMessage(payloadBytes); + + expectThrows(Exception.class, () -> mapper.mapAndProcess(pointer, message)); + } + + public void testRawPayloadMapperWithInvalidJson() { + RawPayloadIngestionMessageMapper mapper = new RawPayloadIngestionMessageMapper(3); + String payload = "not a json"; + byte[] payloadBytes = payload.getBytes(StandardCharsets.UTF_8); + + IngestionShardPointer pointer = new FakeIngestionSource.FakeIngestionShardPointer(600); + Message message = new FakeIngestionSource.FakeIngestionMessage(payloadBytes); + + expectThrows(Exception.class, () -> mapper.mapAndProcess(pointer, message)); + } +} From 881989d193651d99c5284084ea6f35876e0b3be6 Mon Sep 17 00:00:00 2001 From: Varun Bharadwaj Date: Thu, 13 Nov 2025 13:46:27 -0800 Subject: [PATCH 2/2] remove DefaultAssertionStatus false from segment warmer test Signed-off-by: Varun Bharadwaj --- .../org/opensearch/index/engine/MergedSegmentWarmerTests.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/server/src/test/java/org/opensearch/index/engine/MergedSegmentWarmerTests.java b/server/src/test/java/org/opensearch/index/engine/MergedSegmentWarmerTests.java index 65f6c398b2d6f..d84e82545ba73 100644 --- a/server/src/test/java/org/opensearch/index/engine/MergedSegmentWarmerTests.java +++ b/server/src/test/java/org/opensearch/index/engine/MergedSegmentWarmerTests.java @@ -142,8 +142,6 @@ public void testShouldWarm_minNodeVersionNotSatisfied() throws IOException { } public void testShouldWarm_failure() throws IOException { - ClassLoader.getSystemClassLoader().setDefaultAssertionStatus(false); - MergedSegmentWarmer warmer = spy(new MergedSegmentWarmer(null, null, null, mockIndexShard)); doThrow(new RuntimeException("test exception")).when(warmer).shouldWarm(any()); doReturn(mock(SegmentCommitInfo.class)).when(warmer).segmentCommitInfo(any());