3535import org .elasticsearch .common .unit .ByteSizeValue ;
3636import org .elasticsearch .common .unit .TimeValue ;
3737import org .elasticsearch .common .xcontent .XContentType ;
38+ import org .elasticsearch .index .mapper .MapperService ;
39+ import org .elasticsearch .rest .action .document .RestBulkAction ;
3840import org .elasticsearch .search .SearchHit ;
3941import org .hamcrest .Matcher ;
4042import org .hamcrest .Matchers ;
@@ -70,8 +72,15 @@ public class BulkProcessorIT extends ESRestHighLevelClientTestCase {
7072
7173 private static BulkProcessor .Builder initBulkProcessorBuilder (BulkProcessor .Listener listener ) {
7274 return BulkProcessor .builder (
73- (request , bulkListener ) -> highLevelClient ().bulkAsync (request , RequestOptions .DEFAULT , bulkListener ), listener );
75+ (request , bulkListener ) -> highLevelClient ().bulkAsync (request , RequestOptions .DEFAULT ,
76+ bulkListener ), listener );
7477 }
78+
79+ private static BulkProcessor .Builder initBulkProcessorBuilderUsingTypes (BulkProcessor .Listener listener ) {
80+ return BulkProcessor .builder (
81+ (request , bulkListener ) -> highLevelClient ().bulkAsync (request , expectWarnings (RestBulkAction .TYPES_DEPRECATION_MESSAGE ),
82+ bulkListener ), listener );
83+ }
7584
7685 public void testThatBulkProcessorCountIsCorrect () throws Exception {
7786 final CountDownLatch latch = new CountDownLatch (1 );
@@ -320,35 +329,105 @@ public void testGlobalParametersAndSingleRequest() throws Exception {
320329 public void testGlobalParametersAndBulkProcessor () throws Exception {
321330 createIndexWithMultipleShards ("test" );
322331
323- final CountDownLatch latch = new CountDownLatch (1 );
324- BulkProcessorTestListener listener = new BulkProcessorTestListener (latch );
325332 createFieldAddingPipleine ("pipeline_id" , "fieldNameXYZ" , "valueXYZ" );
333+ final String customType = "testType" ;
334+ final String ignoredType = "ignoredType" ;
326335
327336 int numDocs = randomIntBetween (10 , 10 );
328- try (BulkProcessor processor = initBulkProcessorBuilder (listener )
329- //let's make sure that the bulk action limit trips, one single execution will index all the documents
330- .setConcurrentRequests (randomIntBetween (0 , 1 )).setBulkActions (numDocs )
331- .setFlushInterval (TimeValue .timeValueHours (24 )).setBulkSize (new ByteSizeValue (1 , ByteSizeUnit .GB ))
332- .setGlobalIndex ("test" )
333- .setGlobalType ("_doc" )
334- .setGlobalRouting ("routing" )
335- .setGlobalPipeline ("pipeline_id" )
336- .build ()) {
337-
338- indexDocs (processor , numDocs , null , null , "test" , "pipeline_id" );
339- latch .await ();
340-
341- assertThat (listener .beforeCounts .get (), equalTo (1 ));
342- assertThat (listener .afterCounts .get (), equalTo (1 ));
343- assertThat (listener .bulkFailures .size (), equalTo (0 ));
344- assertResponseItems (listener .bulkItems , numDocs );
345-
346- Iterable <SearchHit > hits = searchAll (new SearchRequest ("test" ).routing ("routing" ));
337+ {
338+ final CountDownLatch latch = new CountDownLatch (1 );
339+ BulkProcessorTestListener listener = new BulkProcessorTestListener (latch );
340+ //Check that untyped document additions inherit the global type
341+ String globalType = customType ;
342+ String localType = null ;
343+ try (BulkProcessor processor = initBulkProcessorBuilderUsingTypes (listener )
344+ //let's make sure that the bulk action limit trips, one single execution will index all the documents
345+ .setConcurrentRequests (randomIntBetween (0 , 1 )).setBulkActions (numDocs )
346+ .setFlushInterval (TimeValue .timeValueHours (24 )).setBulkSize (new ByteSizeValue (1 , ByteSizeUnit .GB ))
347+ .setGlobalIndex ("test" )
348+ .setGlobalType (globalType )
349+ .setGlobalRouting ("routing" )
350+ .setGlobalPipeline ("pipeline_id" )
351+ .build ()) {
352+
353+ indexDocs (processor , numDocs , null , localType , "test" , globalType , "pipeline_id" );
354+ latch .await ();
355+
356+ assertThat (listener .beforeCounts .get (), equalTo (1 ));
357+ assertThat (listener .afterCounts .get (), equalTo (1 ));
358+ assertThat (listener .bulkFailures .size (), equalTo (0 ));
359+ assertResponseItems (listener .bulkItems , numDocs , globalType );
360+
361+ Iterable <SearchHit > hits = searchAll (new SearchRequest ("test" ).routing ("routing" ));
362+
363+ assertThat (hits , everyItem (hasProperty (fieldFromSource ("fieldNameXYZ" ), equalTo ("valueXYZ" ))));
364+ assertThat (hits , everyItem (Matchers .allOf (hasIndex ("test" ), hasType (globalType ))));
365+ assertThat (hits , containsInAnyOrder (expectedIds (numDocs )));
366+ }
347367
348- assertThat (hits , everyItem (hasProperty (fieldFromSource ("fieldNameXYZ" ), equalTo ("valueXYZ" ))));
349- assertThat (hits , everyItem (Matchers .allOf (hasIndex ("test" ), hasType ("_doc" ))));
350- assertThat (hits , containsInAnyOrder (expectedIds (numDocs )));
351368 }
369+ {
370+ //Check that typed document additions don't inherit the global type
371+ String globalType = ignoredType ;
372+ String localType = customType ;
373+ final CountDownLatch latch = new CountDownLatch (1 );
374+ BulkProcessorTestListener listener = new BulkProcessorTestListener (latch );
375+ try (BulkProcessor processor = initBulkProcessorBuilderUsingTypes (listener )
376+ //let's make sure that the bulk action limit trips, one single execution will index all the documents
377+ .setConcurrentRequests (randomIntBetween (0 , 1 )).setBulkActions (numDocs )
378+ .setFlushInterval (TimeValue .timeValueHours (24 )).setBulkSize (new ByteSizeValue (1 , ByteSizeUnit .GB ))
379+ .setGlobalIndex ("test" )
380+ .setGlobalType (globalType )
381+ .setGlobalRouting ("routing" )
382+ .setGlobalPipeline ("pipeline_id" )
383+ .build ()) {
384+ indexDocs (processor , numDocs , null , localType , "test" , globalType , "pipeline_id" );
385+ latch .await ();
386+
387+ assertThat (listener .beforeCounts .get (), equalTo (1 ));
388+ assertThat (listener .afterCounts .get (), equalTo (1 ));
389+ assertThat (listener .bulkFailures .size (), equalTo (0 ));
390+ assertResponseItems (listener .bulkItems , numDocs , localType );
391+
392+ Iterable <SearchHit > hits = searchAll (new SearchRequest ("test" ).routing ("routing" ));
393+
394+ assertThat (hits , everyItem (hasProperty (fieldFromSource ("fieldNameXYZ" ), equalTo ("valueXYZ" ))));
395+ assertThat (hits , everyItem (Matchers .allOf (hasIndex ("test" ), hasType (localType ))));
396+ assertThat (hits , containsInAnyOrder (expectedIds (numDocs )));
397+ }
398+ }
399+ {
400+ //Check that untyped document additions and untyped global inherit the established custom type
401+ // (the custom document type introduced to the mapping by the earlier code in this test)
402+ String globalType = null ;
403+ String localType = null ;
404+ final CountDownLatch latch = new CountDownLatch (1 );
405+ BulkProcessorTestListener listener = new BulkProcessorTestListener (latch );
406+ try (BulkProcessor processor = initBulkProcessorBuilder (listener )
407+ //let's make sure that the bulk action limit trips, one single execution will index all the documents
408+ .setConcurrentRequests (randomIntBetween (0 , 1 )).setBulkActions (numDocs )
409+ .setFlushInterval (TimeValue .timeValueHours (24 )).setBulkSize (new ByteSizeValue (1 , ByteSizeUnit .GB ))
410+ .setGlobalIndex ("test" )
411+ .setGlobalType (globalType )
412+ .setGlobalRouting ("routing" )
413+ .setGlobalPipeline ("pipeline_id" )
414+ .build ()) {
415+ indexDocs (processor , numDocs , null , localType , "test" , globalType , "pipeline_id" );
416+ latch .await ();
417+
418+ assertThat (listener .beforeCounts .get (), equalTo (1 ));
419+ assertThat (listener .afterCounts .get (), equalTo (1 ));
420+ assertThat (listener .bulkFailures .size (), equalTo (0 ));
421+ assertResponseItems (listener .bulkItems , numDocs , MapperService .SINGLE_MAPPING_NAME );
422+
423+ Iterable <SearchHit > hits = searchAll (new SearchRequest ("test" ).routing ("routing" ));
424+
425+ assertThat (hits , everyItem (hasProperty (fieldFromSource ("fieldNameXYZ" ), equalTo ("valueXYZ" ))));
426+ assertThat (hits , everyItem (Matchers .allOf (hasIndex ("test" ), hasType (customType ))));
427+ assertThat (hits , containsInAnyOrder (expectedIds (numDocs )));
428+ }
429+ }
430+ assertWarnings (RestBulkAction .TYPES_DEPRECATION_MESSAGE );
352431 }
353432
354433 @ SuppressWarnings ("unchecked" )
@@ -359,15 +438,15 @@ private Matcher<SearchHit>[] expectedIds(int numDocs) {
359438 .<Matcher <SearchHit >>toArray (Matcher []::new );
360439 }
361440
362- private static MultiGetRequest indexDocs (BulkProcessor processor , int numDocs , String localIndex ,
441+ private static MultiGetRequest indexDocs (BulkProcessor processor , int numDocs , String localIndex , String localType ,
363442 String globalIndex , String globalType , String globalPipeline ) throws Exception {
364443 MultiGetRequest multiGetRequest = new MultiGetRequest ();
365444 for (int i = 1 ; i <= numDocs ; i ++) {
366445 if (randomBoolean ()) {
367- processor .add (new IndexRequest (localIndex ). id ( Integer .toString (i ))
446+ processor .add (new IndexRequest (localIndex , localType , Integer .toString (i ))
368447 .source (XContentType .JSON , "field" , randomRealisticUnicodeOfLengthBetween (1 , 30 )));
369448 } else {
370- BytesArray data = bytesBulkRequest (localIndex , "_doc" , i );
449+ BytesArray data = bytesBulkRequest (localIndex , localType , i );
371450 processor .add (data , globalIndex , globalType , globalPipeline , null , XContentType .JSON );
372451 }
373452 multiGetRequest .add (localIndex , Integer .toString (i ));
@@ -396,15 +475,19 @@ private static BytesArray bytesBulkRequest(String localIndex, String localType,
396475 }
397476
398477 private static MultiGetRequest indexDocs (BulkProcessor processor , int numDocs ) throws Exception {
399- return indexDocs (processor , numDocs , "test" , null , null , null );
478+ return indexDocs (processor , numDocs , "test" , null , null , null , null );
400479 }
401-
480+
402481 private static void assertResponseItems (List <BulkItemResponse > bulkItemResponses , int numDocs ) {
482+ assertResponseItems (bulkItemResponses , numDocs , MapperService .SINGLE_MAPPING_NAME );
483+ }
484+
485+ private static void assertResponseItems (List <BulkItemResponse > bulkItemResponses , int numDocs , String expectedType ) {
403486 assertThat (bulkItemResponses .size (), is (numDocs ));
404487 int i = 1 ;
405488 for (BulkItemResponse bulkItemResponse : bulkItemResponses ) {
406489 assertThat (bulkItemResponse .getIndex (), equalTo ("test" ));
407- assertThat (bulkItemResponse .getType (), equalTo ("_doc" ));
490+ assertThat (bulkItemResponse .getType (), equalTo (expectedType ));
408491 assertThat (bulkItemResponse .getId (), equalTo (Integer .toString (i ++)));
409492 assertThat ("item " + i + " failed with cause: " + bulkItemResponse .getFailureMessage (),
410493 bulkItemResponse .isFailed (), equalTo (false ));
0 commit comments