88
99package org .opensearch .indices .pollingingest .mappers ;
1010
11- import org .opensearch .index .IngestionShardPointer ;
12- import org .opensearch .index .Message ;
13- import org .opensearch .index .engine .FakeIngestionSource ;
14- import org .opensearch .indices .pollingingest .ShardUpdateMessage ;
1511import org .opensearch .test .OpenSearchTestCase ;
1612
17- import java .nio .charset .StandardCharsets ;
18- import java .util .Map ;
19-
20- import static org .opensearch .action .index .IndexRequest .UNSET_AUTO_GENERATED_TIMESTAMP ;
21-
2213public class IngestionMessageMapperTests extends OpenSearchTestCase {
2314
2415 public void testMapperTypeFromStringAndName () {
@@ -30,186 +21,4 @@ public void testMapperTypeFromStringAndName() {
3021 assertEquals ("default" , IngestionMessageMapper .MapperType .DEFAULT .getName ());
3122 assertEquals ("raw_payload" , IngestionMessageMapper .MapperType .RAW_PAYLOAD .getName ());
3223 }
33-
34- public void testMapperTypeFromStringInvalid () {
35- expectThrows (IllegalArgumentException .class , () -> IngestionMessageMapper .MapperType .fromString ("invalid" ));
36- }
37-
38- public void testMapperCreation () {
39- IngestionMessageMapper defaultMapper = IngestionMessageMapper .create ("default" , 0 );
40- assertNotNull (defaultMapper );
41- assertTrue (defaultMapper instanceof DefaultIngestionMessageMapper );
42-
43- IngestionMessageMapper rawPayloadMapper = IngestionMessageMapper .create ("raw_payload" , 0 );
44- assertNotNull (rawPayloadMapper );
45- assertTrue (rawPayloadMapper instanceof RawPayloadIngestionMessageMapper );
46-
47- expectThrows (IllegalArgumentException .class , () -> IngestionMessageMapper .create ("unknown" , 0 ));
48- }
49-
50- public void testDefaultMapperWithIdPresent () {
51- DefaultIngestionMessageMapper mapper = new DefaultIngestionMessageMapper ();
52- String payload = "{\" _id\" :\" 123\" ,\" _op_type\" :\" index\" ,\" _source\" :{\" name\" :\" alice\" ,\" age\" :30}}" ;
53- byte [] payloadBytes = payload .getBytes (StandardCharsets .UTF_8 );
54-
55- IngestionShardPointer pointer = new FakeIngestionSource .FakeIngestionShardPointer (5 );
56- Message message = new FakeIngestionSource .FakeIngestionMessage (payloadBytes );
57-
58- ShardUpdateMessage result = mapper .mapAndProcess (pointer , message );
59-
60- assertNotNull (result );
61- assertEquals (pointer , result .pointer ());
62- assertEquals (message , result .originalMessage ());
63- assertEquals (UNSET_AUTO_GENERATED_TIMESTAMP , result .autoGeneratedIdTimestamp ());
64-
65- Map <String , Object > parsedMap = result .parsedPayloadMap ();
66- assertEquals ("123" , parsedMap .get ("_id" ));
67- assertEquals ("index" , parsedMap .get ("_op_type" ));
68- assertNotNull (parsedMap .get ("_source" ));
69- assertTrue (parsedMap .get ("_source" ) instanceof Map );
70-
71- Map <String , Object > source = (Map <String , Object >) parsedMap .get ("_source" );
72- assertEquals ("alice" , source .get ("name" ));
73- assertEquals (30 , source .get ("age" ));
74- }
75-
76- public void testDefaultMapperWithoutId () {
77- DefaultIngestionMessageMapper mapper = new DefaultIngestionMessageMapper ();
78- String payload = "{\" _op_type\" :\" index\" ,\" _source\" :{\" name\" :\" bob\" ,\" age\" :25}}" ;
79- byte [] payloadBytes = payload .getBytes (StandardCharsets .UTF_8 );
80-
81- IngestionShardPointer pointer = new FakeIngestionSource .FakeIngestionShardPointer (10 );
82- Message message = new FakeIngestionSource .FakeIngestionMessage (payloadBytes );
83-
84- ShardUpdateMessage result = mapper .mapAndProcess (pointer , message );
85-
86- assertNotNull (result );
87- assertEquals (pointer , result .pointer ());
88- assertEquals (message , result .originalMessage ());
89- assertNotEquals (UNSET_AUTO_GENERATED_TIMESTAMP , result .autoGeneratedIdTimestamp ());
90-
91- Map <String , Object > parsedMap = result .parsedPayloadMap ();
92- assertNotNull (parsedMap .get ("_id" ));
93- assertTrue (parsedMap .get ("_id" ) instanceof String );
94- String generatedId = (String ) parsedMap .get ("_id" );
95- assertFalse (generatedId .isEmpty ());
96- }
97-
98- public void testDefaultMapperWithVersion () {
99- DefaultIngestionMessageMapper mapper = new DefaultIngestionMessageMapper ();
100- String payload = "{\" _id\" :\" 789\" ,\" _version\" :\" 5\" ,\" _op_type\" :\" index\" ,\" _source\" :{\" name\" :\" david\" }}" ;
101- byte [] payloadBytes = payload .getBytes (StandardCharsets .UTF_8 );
102-
103- IngestionShardPointer pointer = new FakeIngestionSource .FakeIngestionShardPointer (20 );
104- Message message = new FakeIngestionSource .FakeIngestionMessage (payloadBytes );
105-
106- ShardUpdateMessage result = mapper .mapAndProcess (pointer , message );
107-
108- assertNotNull (result );
109- Map <String , Object > parsedMap = result .parsedPayloadMap ();
110- assertEquals ("789" , parsedMap .get ("_id" ));
111- assertEquals ("5" , parsedMap .get ("_version" ));
112- }
113-
114- public void testRawPayloadMapper () {
115- RawPayloadIngestionMessageMapper mapper = new RawPayloadIngestionMessageMapper (0 );
116- String payload = "{\" name\" :\" alice\" ,\" age\" :30,\" city\" :\" Seattle\" }" ;
117- byte [] payloadBytes = payload .getBytes (StandardCharsets .UTF_8 );
118-
119- IngestionShardPointer pointer = new FakeIngestionSource .FakeIngestionShardPointer (100 );
120- Message message = new FakeIngestionSource .FakeIngestionMessage (payloadBytes );
121-
122- ShardUpdateMessage result = mapper .mapAndProcess (pointer , message );
123-
124- assertNotNull (result );
125- assertEquals (pointer , result .pointer ());
126- assertEquals (message , result .originalMessage ());
127- assertEquals (UNSET_AUTO_GENERATED_TIMESTAMP , result .autoGeneratedIdTimestamp ());
128-
129- Map <String , Object > parsedMap = result .parsedPayloadMap ();
130-
131- // Verify _id is set to shard ID + pointer value
132- assertEquals ("0-100" , parsedMap .get ("_id" ));
133-
134- // Verify _op_type is set to "index"
135- assertEquals ("index" , parsedMap .get ("_op_type" ));
136-
137- // Verify _source contains the original payload
138- assertNotNull (parsedMap .get ("_source" ));
139- assertTrue (parsedMap .get ("_source" ) instanceof Map );
140-
141- Map <String , Object > source = (Map <String , Object >) parsedMap .get ("_source" );
142- assertEquals ("alice" , source .get ("name" ));
143- assertEquals (30 , source .get ("age" ));
144- assertEquals ("Seattle" , source .get ("city" ));
145-
146- // Verify _source does not contain metadata fields
147- assertFalse (source .containsKey ("_id" ));
148- assertFalse (source .containsKey ("_op_type" ));
149- }
150-
151- public void testRawPayloadMapperWithComplexObject () {
152- RawPayloadIngestionMessageMapper mapper = new RawPayloadIngestionMessageMapper (1 );
153- String payload =
"{\" user\" :{\" name\" :\" bob\" ,\" email\" :\" [email protected] \" },\" tags\" :[\" tag1\" ,\" tag2\" ],\" count\" :42}" ;
154- byte [] payloadBytes = payload .getBytes (StandardCharsets .UTF_8 );
155-
156- IngestionShardPointer pointer = new FakeIngestionSource .FakeIngestionShardPointer (200 );
157- Message message = new FakeIngestionSource .FakeIngestionMessage (payloadBytes );
158-
159- ShardUpdateMessage result = mapper .mapAndProcess (pointer , message );
160-
161- assertNotNull (result );
162- Map <String , Object > parsedMap = result .parsedPayloadMap ();
163-
164- assertEquals ("1-200" , parsedMap .get ("_id" ));
165- assertEquals ("index" , parsedMap .get ("_op_type" ));
166-
167- Map <String , Object > source = (Map <String , Object >) parsedMap .get ("_source" );
168- assertEquals (3 , source .size ());
169- assertTrue (source .containsKey ("user" ));
170- assertTrue (source .containsKey ("tags" ));
171- assertEquals (42 , source .get ("count" ));
172- }
173-
174- public void testRawPayloadMapperWithEmptyObject () {
175- RawPayloadIngestionMessageMapper mapper = new RawPayloadIngestionMessageMapper (2 );
176- String payload = "{}" ;
177- byte [] payloadBytes = payload .getBytes (StandardCharsets .UTF_8 );
178-
179- IngestionShardPointer pointer = new FakeIngestionSource .FakeIngestionShardPointer (300 );
180- Message message = new FakeIngestionSource .FakeIngestionMessage (payloadBytes );
181-
182- ShardUpdateMessage result = mapper .mapAndProcess (pointer , message );
183-
184- assertNotNull (result );
185- Map <String , Object > parsedMap = result .parsedPayloadMap ();
186-
187- assertEquals ("2-300" , parsedMap .get ("_id" ));
188- assertEquals ("index" , parsedMap .get ("_op_type" ));
189-
190- Map <String , Object > source = (Map <String , Object >) parsedMap .get ("_source" );
191- assertTrue (source .isEmpty ());
192- }
193-
194- public void testDefaultMapperWithInvalidJson () {
195- DefaultIngestionMessageMapper mapper = new DefaultIngestionMessageMapper ();
196- String payload = "invalid json {{{" ;
197- byte [] payloadBytes = payload .getBytes (StandardCharsets .UTF_8 );
198-
199- IngestionShardPointer pointer = new FakeIngestionSource .FakeIngestionShardPointer (500 );
200- Message message = new FakeIngestionSource .FakeIngestionMessage (payloadBytes );
201-
202- expectThrows (Exception .class , () -> mapper .mapAndProcess (pointer , message ));
203- }
204-
205- public void testRawPayloadMapperWithInvalidJson () {
206- RawPayloadIngestionMessageMapper mapper = new RawPayloadIngestionMessageMapper (3 );
207- String payload = "not a json" ;
208- byte [] payloadBytes = payload .getBytes (StandardCharsets .UTF_8 );
209-
210- IngestionShardPointer pointer = new FakeIngestionSource .FakeIngestionShardPointer (600 );
211- Message message = new FakeIngestionSource .FakeIngestionMessage (payloadBytes );
212-
213- expectThrows (Exception .class , () -> mapper .mapAndProcess (pointer , message ));
214- }
21524}
0 commit comments