diff --git a/CHANGELOG.md b/CHANGELOG.md index 7362e4b4e7ed5..032ca6caea465 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -16,6 +16,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Add pointer based lag metric in pull-based ingestion ([#19635](https://github.com/opensearch-project/OpenSearch/pull/19635)) - Introduced internal API for retrieving metadata about requested indices from transport actions ([#18523](https://github.com/opensearch-project/OpenSearch/pull/18523)) - Add cluster defaults for merge autoThrottle, maxMergeThreads, and maxMergeCount; Add segment size filter to the merged segment warmer ([#19629](https://github.com/opensearch-project/OpenSearch/pull/19629)) +- Add SMILE/CBOR/YAML document format support to Bulk GRPC endpoint ([#19744](https://github.com/opensearch-project/OpenSearch/pull/19744)) ### Changed - Faster `terms` query creation for `keyword` field with index and docValues enabled ([#19350](https://github.com/opensearch-project/OpenSearch/pull/19350)) diff --git a/modules/transport-grpc/src/main/java/org/opensearch/transport/grpc/proto/request/document/bulk/BulkRequestParserProtoUtils.java b/modules/transport-grpc/src/main/java/org/opensearch/transport/grpc/proto/request/document/bulk/BulkRequestParserProtoUtils.java index af48f4602b56c..f02a9bbd2238b 100644 --- a/modules/transport-grpc/src/main/java/org/opensearch/transport/grpc/proto/request/document/bulk/BulkRequestParserProtoUtils.java +++ b/modules/transport-grpc/src/main/java/org/opensearch/transport/grpc/proto/request/document/bulk/BulkRequestParserProtoUtils.java @@ -82,6 +82,21 @@ private static Boolean valueOrDefault(Boolean value, Boolean globalDefault) { return value; } + /** + * Detects the media type from the byte content, with fallback to JSON if detection fails. + * This enables support for JSON, SMILE, and CBOR formats in gRPC bulk requests. + * + * @param document The document content as bytes + * @return The detected MediaType, or JSON if detection fails or document is empty + */ + static MediaType detectMediaType(byte[] document) { + if (document == null || document.length == 0) { + return MediaTypeRegistry.JSON; + } + MediaType detectedType = MediaTypeRegistry.mediaTypeFromBytes(document, 0, document.length); + return detectedType != null ? detectedType : MediaTypeRegistry.JSON; + } + /** * Similar to {@link BulkRequestParser#parse(BytesReference, String, String, FetchSourceContext, String, Boolean, boolean, MediaType, Consumer, Consumer, Consumer)}, except that it takes into account global values. * @@ -231,6 +246,7 @@ public static IndexRequest buildCreateRequest( pipeline = createOperation.hasPipeline() ? createOperation.getPipeline() : pipeline; requireAlias = createOperation.hasRequireAlias() ? createOperation.getRequireAlias() : requireAlias; + MediaType mediaType = detectMediaType(document); IndexRequest indexRequest = new IndexRequest(index).id(id) .routing(routing) .version(version) @@ -239,7 +255,7 @@ public static IndexRequest buildCreateRequest( .setPipeline(pipeline) .setIfSeqNo(ifSeqNo) .setIfPrimaryTerm(ifPrimaryTerm) - .source(document, MediaTypeRegistry.JSON) + .source(document, mediaType) .setRequireAlias(requireAlias); return indexRequest; } @@ -288,6 +304,7 @@ public static IndexRequest buildIndexRequest( ifPrimaryTerm = indexOperation.hasIfPrimaryTerm() ? indexOperation.getIfPrimaryTerm() : ifPrimaryTerm; requireAlias = indexOperation.hasRequireAlias() ? indexOperation.getRequireAlias() : requireAlias; + MediaType mediaType = detectMediaType(document); IndexRequest indexRequest; if (opType == null) { indexRequest = new IndexRequest(index).id(id) @@ -297,7 +314,7 @@ public static IndexRequest buildIndexRequest( .setPipeline(pipeline) .setIfSeqNo(ifSeqNo) .setIfPrimaryTerm(ifPrimaryTerm) - .source(document, MediaTypeRegistry.JSON) + .source(document, mediaType) .setRequireAlias(requireAlias); } else { indexRequest = new IndexRequest(index).id(id) @@ -308,7 +325,7 @@ public static IndexRequest buildIndexRequest( .setPipeline(pipeline) .setIfSeqNo(ifSeqNo) .setIfPrimaryTerm(ifPrimaryTerm) - .source(document, MediaTypeRegistry.JSON) + .source(document, mediaType) .setRequireAlias(requireAlias); } return indexRequest; @@ -408,7 +425,9 @@ public static UpdateRequest fromProto( } if (updateAction.hasUpsert()) { - updateRequest.upsert(updateAction.getUpsert(), MediaTypeRegistry.JSON); + byte[] upsertBytes = updateAction.getUpsert().toByteArray(); + MediaType upsertMediaType = detectMediaType(upsertBytes); + updateRequest.upsert(upsertBytes, upsertMediaType); } if (updateAction.hasDocAsUpsert()) { @@ -424,7 +443,8 @@ public static UpdateRequest fromProto( } } - updateRequest.doc(document, MediaTypeRegistry.JSON); + MediaType mediaType = detectMediaType(document); + updateRequest.doc(document, mediaType); if (updateOperation.hasIfSeqNo()) { updateRequest.setIfSeqNo(updateOperation.getIfSeqNo()); diff --git a/modules/transport-grpc/src/test/java/org/opensearch/transport/grpc/proto/request/document/bulk/BulkRequestParserProtoUtilsTests.java b/modules/transport-grpc/src/test/java/org/opensearch/transport/grpc/proto/request/document/bulk/BulkRequestParserProtoUtilsTests.java index 2141479642c13..81fd666dbc980 100644 --- a/modules/transport-grpc/src/test/java/org/opensearch/transport/grpc/proto/request/document/bulk/BulkRequestParserProtoUtilsTests.java +++ b/modules/transport-grpc/src/test/java/org/opensearch/transport/grpc/proto/request/document/bulk/BulkRequestParserProtoUtilsTests.java @@ -14,6 +14,7 @@ import org.opensearch.action.index.IndexRequest; import org.opensearch.action.update.UpdateRequest; import org.opensearch.common.lucene.uid.Versions; +import org.opensearch.core.xcontent.MediaType; import org.opensearch.index.VersionType; import org.opensearch.index.seqno.SequenceNumbers; import org.opensearch.protobufs.BulkRequest; @@ -716,4 +717,325 @@ public void testFromProtoWithAllUpdateActionFields() { assertEquals("IfSeqNo should be set", 123L, result.ifSeqNo()); assertEquals("IfPrimaryTerm should be set", 456L, result.ifPrimaryTerm()); } + + public void testBuildCreateRequestWithSmileContent() throws Exception { + WriteOperation writeOperation = WriteOperation.newBuilder().setXIndex("test-index").setXId("test-id").build(); + + // Create SMILE-encoded document + byte[] smileDocument = createSmileDocument(); + + IndexRequest indexRequest = BulkRequestParserProtoUtils.buildCreateRequest( + writeOperation, + smileDocument, + "default-index", + "default-id", + null, + Versions.MATCH_ANY, + VersionType.INTERNAL, + null, + SequenceNumbers.UNASSIGNED_SEQ_NO, + UNASSIGNED_PRIMARY_TERM, + false + ); + + assertNotNull("IndexRequest should not be null", indexRequest); + assertEquals("Index should match", "test-index", indexRequest.index()); + assertEquals("Id should match", "test-id", indexRequest.id()); + assertNotNull("Source should be set", indexRequest.source()); + // Verify the content type was detected as SMILE + assertEquals("Content type should be SMILE", "application/smile", indexRequest.getContentType().mediaType()); + } + + public void testBuildCreateRequestWithCborContent() throws Exception { + WriteOperation writeOperation = WriteOperation.newBuilder().setXIndex("test-index").setXId("test-id").build(); + + // Create CBOR-encoded document + byte[] cborDocument = createCborDocument(); + + IndexRequest indexRequest = BulkRequestParserProtoUtils.buildCreateRequest( + writeOperation, + cborDocument, + "default-index", + "default-id", + null, + Versions.MATCH_ANY, + VersionType.INTERNAL, + null, + SequenceNumbers.UNASSIGNED_SEQ_NO, + UNASSIGNED_PRIMARY_TERM, + false + ); + + assertNotNull("IndexRequest should not be null", indexRequest); + assertEquals("Index should match", "test-index", indexRequest.index()); + assertEquals("Id should match", "test-id", indexRequest.id()); + assertNotNull("Source should be set", indexRequest.source()); + // Verify the content type was detected as CBOR + assertEquals("Content type should be CBOR", "application/cbor", indexRequest.getContentType().mediaType()); + } + + public void testBuildIndexRequestWithSmileContent() throws Exception { + IndexOperation indexOperation = IndexOperation.newBuilder().setXIndex("test-index").setXId("test-id").build(); + + // Create SMILE-encoded document + byte[] smileDocument = createSmileDocument(); + + IndexRequest indexRequest = BulkRequestParserProtoUtils.buildIndexRequest( + indexOperation, + smileDocument, + null, + "default-index", + "default-id", + null, + Versions.MATCH_ANY, + VersionType.INTERNAL, + null, + SequenceNumbers.UNASSIGNED_SEQ_NO, + UNASSIGNED_PRIMARY_TERM, + false + ); + + assertNotNull("IndexRequest should not be null", indexRequest); + assertEquals("Index should match", "test-index", indexRequest.index()); + assertNotNull("Source should be set", indexRequest.source()); + // Verify the content type was detected as SMILE + assertEquals("Content type should be SMILE", "application/smile", indexRequest.getContentType().mediaType()); + } + + public void testBuildIndexRequestWithCborContent() throws Exception { + IndexOperation indexOperation = IndexOperation.newBuilder().setXIndex("test-index").setXId("test-id").build(); + + // Create CBOR-encoded document + byte[] cborDocument = createCborDocument(); + + IndexRequest indexRequest = BulkRequestParserProtoUtils.buildIndexRequest( + indexOperation, + cborDocument, + null, + "default-index", + "default-id", + null, + Versions.MATCH_ANY, + VersionType.INTERNAL, + null, + SequenceNumbers.UNASSIGNED_SEQ_NO, + UNASSIGNED_PRIMARY_TERM, + false + ); + + assertNotNull("IndexRequest should not be null", indexRequest); + assertEquals("Index should match", "test-index", indexRequest.index()); + assertNotNull("Source should be set", indexRequest.source()); + // Verify the content type was detected as CBOR + assertEquals("Content type should be CBOR", "application/cbor", indexRequest.getContentType().mediaType()); + } + + public void testUpdateRequestWithCborUpsert() throws Exception { + UpdateRequest updateRequest = new UpdateRequest("test-index", "test-id"); + byte[] document = "{\"field\":\"value\"}".getBytes(StandardCharsets.UTF_8); + + // Create CBOR-encoded upsert document + byte[] cborUpsert = createCborDocument(); + + BulkRequestBody bulkRequestBody = BulkRequestBody.newBuilder() + .setUpdateAction(org.opensearch.protobufs.UpdateAction.newBuilder().setUpsert(ByteString.copyFrom(cborUpsert)).build()) + .build(); + + UpdateOperation updateOperation = UpdateOperation.newBuilder().build(); + + UpdateRequest result = BulkRequestParserProtoUtils.fromProto(updateRequest, document, bulkRequestBody, updateOperation); + + assertNotNull("Result should not be null", result); + assertNotNull("Upsert should be set", result.upsertRequest()); + } + + public void testBuildCreateRequestWithEmptyDocument() { + WriteOperation writeOperation = WriteOperation.newBuilder().setXIndex("test-index").setXId("test-id").build(); + + byte[] emptyDocument = new byte[0]; + + IndexRequest indexRequest = BulkRequestParserProtoUtils.buildCreateRequest( + writeOperation, + emptyDocument, + "default-index", + "default-id", + null, + Versions.MATCH_ANY, + VersionType.INTERNAL, + null, + SequenceNumbers.UNASSIGNED_SEQ_NO, + UNASSIGNED_PRIMARY_TERM, + false + ); + + assertNotNull("IndexRequest should not be null", indexRequest); + assertNotNull("Source should be set", indexRequest.source()); + // Empty document should default to JSON + assertTrue("Content type should default to JSON", indexRequest.getContentType().mediaType().startsWith("application/json")); + } + + public void testBuildCreateRequestWithJsonContent() throws Exception { + WriteOperation writeOperation = WriteOperation.newBuilder().setXIndex("test-index").setXId("test-id").build(); + + // Create JSON document + byte[] jsonDocument = "{\"field\":\"value\"}".getBytes(StandardCharsets.UTF_8); + + IndexRequest indexRequest = BulkRequestParserProtoUtils.buildCreateRequest( + writeOperation, + jsonDocument, + "default-index", + "default-id", + null, + Versions.MATCH_ANY, + VersionType.INTERNAL, + null, + SequenceNumbers.UNASSIGNED_SEQ_NO, + UNASSIGNED_PRIMARY_TERM, + false + ); + + assertNotNull("IndexRequest should not be null", indexRequest); + assertEquals("Index should match", "test-index", indexRequest.index()); + assertEquals("Id should match", "test-id", indexRequest.id()); + assertNotNull("Source should be set", indexRequest.source()); + // Verify the content type was detected as JSON (may include charset) + assertTrue("Content type should be JSON", indexRequest.getContentType().mediaType().startsWith("application/json")); + } + + public void testBuildCreateRequestWithYamlContent() throws Exception { + WriteOperation writeOperation = WriteOperation.newBuilder().setXIndex("test-index").setXId("test-id").build(); + + // Create YAML-encoded document + byte[] yamlDocument = createYamlDocument(); + + IndexRequest indexRequest = BulkRequestParserProtoUtils.buildCreateRequest( + writeOperation, + yamlDocument, + "default-index", + "default-id", + null, + Versions.MATCH_ANY, + VersionType.INTERNAL, + null, + SequenceNumbers.UNASSIGNED_SEQ_NO, + UNASSIGNED_PRIMARY_TERM, + false + ); + + assertNotNull("IndexRequest should not be null", indexRequest); + assertEquals("Index should match", "test-index", indexRequest.index()); + assertEquals("Id should match", "test-id", indexRequest.id()); + assertNotNull("Source should be set", indexRequest.source()); + // Verify the content type was detected as YAML + assertEquals("Content type should be YAML", "application/yaml", indexRequest.getContentType().mediaType()); + } + + public void testBuildIndexRequestWithJsonContent() throws Exception { + IndexOperation indexOperation = IndexOperation.newBuilder().setXIndex("test-index").setXId("test-id").build(); + + // Create JSON document + byte[] jsonDocument = "{\"field\":\"value\"}".getBytes(StandardCharsets.UTF_8); + + IndexRequest indexRequest = BulkRequestParserProtoUtils.buildIndexRequest( + indexOperation, + jsonDocument, + null, + "default-index", + "default-id", + null, + Versions.MATCH_ANY, + VersionType.INTERNAL, + null, + SequenceNumbers.UNASSIGNED_SEQ_NO, + UNASSIGNED_PRIMARY_TERM, + false + ); + + assertNotNull("IndexRequest should not be null", indexRequest); + assertEquals("Index should match", "test-index", indexRequest.index()); + assertNotNull("Source should be set", indexRequest.source()); + // Verify the content type was detected as JSON (may include charset) + assertTrue("Content type should be JSON", indexRequest.getContentType().mediaType().startsWith("application/json")); + } + + public void testBuildIndexRequestWithYamlContent() throws Exception { + IndexOperation indexOperation = IndexOperation.newBuilder().setXIndex("test-index").setXId("test-id").build(); + + // Create YAML-encoded document + byte[] yamlDocument = createYamlDocument(); + + IndexRequest indexRequest = BulkRequestParserProtoUtils.buildIndexRequest( + indexOperation, + yamlDocument, + null, + "default-index", + "default-id", + null, + Versions.MATCH_ANY, + VersionType.INTERNAL, + null, + SequenceNumbers.UNASSIGNED_SEQ_NO, + UNASSIGNED_PRIMARY_TERM, + false + ); + + assertNotNull("IndexRequest should not be null", indexRequest); + assertEquals("Index should match", "test-index", indexRequest.index()); + assertNotNull("Source should be set", indexRequest.source()); + // Verify the content type was detected as YAML + assertEquals("Content type should be YAML", "application/yaml", indexRequest.getContentType().mediaType()); + } + + /** + * Helper method to create a SMILE-encoded document. + */ + private byte[] createSmileDocument() throws Exception { + org.opensearch.core.xcontent.XContentBuilder builder = org.opensearch.common.xcontent.XContentFactory.smileBuilder(); + builder.startObject(); + builder.field("field", "value"); + builder.endObject(); + return org.opensearch.core.common.bytes.BytesReference.toBytes(org.opensearch.core.common.bytes.BytesReference.bytes(builder)); + } + + /** + * Helper method to create a CBOR-encoded document. + */ + private byte[] createCborDocument() throws Exception { + org.opensearch.core.xcontent.XContentBuilder builder = org.opensearch.common.xcontent.XContentFactory.cborBuilder(); + builder.startObject(); + builder.field("field", "value"); + builder.endObject(); + return org.opensearch.core.common.bytes.BytesReference.toBytes(org.opensearch.core.common.bytes.BytesReference.bytes(builder)); + } + + /** + * Helper method to create a YAML-encoded document. + */ + private byte[] createYamlDocument() throws Exception { + org.opensearch.core.xcontent.XContentBuilder builder = org.opensearch.common.xcontent.XContentFactory.yamlBuilder(); + builder.startObject(); + builder.field("field", "value"); + builder.endObject(); + return org.opensearch.core.common.bytes.BytesReference.toBytes(org.opensearch.core.common.bytes.BytesReference.bytes(builder)); + } + + /** + * Test detectMediaType with null or empty document + */ + public void testDetectMediaTypeNullOrEmpty() { + MediaType result = BulkRequestParserProtoUtils.detectMediaType(null); + assertEquals("application/json", result.mediaTypeWithoutParameters()); + + result = BulkRequestParserProtoUtils.detectMediaType(new byte[0]); + assertEquals("application/json", result.mediaTypeWithoutParameters()); + } + + /** + * Test detectMediaType with unrecognizable format + */ + public void testDetectMediaTypeUnrecognizable() { + byte[] invalidBytes = new byte[] { (byte) 0xFF, (byte) 0xFE, (byte) 0xFD, (byte) 0xFC }; + MediaType result = BulkRequestParserProtoUtils.detectMediaType(invalidBytes); + assertEquals("application/json", result.mediaTypeWithoutParameters()); + } }