@@ -80,15 +80,16 @@ public void preview(
8080 final Aggregations aggregations = r .getAggregations ();
8181 if (aggregations == null ) {
8282 listener .onFailure (
83- new ElasticsearchStatusException ("Source indices have been deleted or closed." , RestStatus .BAD_REQUEST ));
83+ new ElasticsearchStatusException ("Source indices have been deleted or closed." , RestStatus .BAD_REQUEST )
84+ );
8485 return ;
8586 }
8687 final CompositeAggregation agg = aggregations .get (COMPOSITE_AGGREGATION_NAME );
8788 TransformIndexerStats stats = new TransformIndexerStats ();
8889
89- List <Map <String , Object >> docs = extractResults (agg , fieldTypeMap , stats , null )
90- . map ( this ::documentTransformationFunction )
91- .collect (Collectors .toList ());
90+ List <Map <String , Object >> docs = extractResults (agg , fieldTypeMap , stats , null ). map (
91+ this ::documentTransformationFunction
92+ ) .collect (Collectors .toList ());
9293
9394 listener .onResponse (docs );
9495 } catch (AggregationResultUtils .AggregationExtractionException extractionException ) {
@@ -108,11 +109,10 @@ public void validateQuery(Client client, SourceConfig sourceConfig, ActionListen
108109 }
109110 if (response .status () != RestStatus .OK ) {
110111 listener .onFailure (
111- new ValidationException ()
112- .addValidationError (
113- new ParameterizedMessage ("Unexpected status from response of test query: {}" , response .status ())
114- .getFormattedMessage ()
115- )
112+ new ValidationException ().addValidationError (
113+ new ParameterizedMessage ("Unexpected status from response of test query: {}" , response .status ())
114+ .getFormattedMessage ()
115+ )
116116 );
117117 return ;
118118 }
@@ -123,10 +123,9 @@ public void validateQuery(Client client, SourceConfig sourceConfig, ActionListen
123123 ? ((ElasticsearchException ) unwrapped ).status ()
124124 : RestStatus .SERVICE_UNAVAILABLE ;
125125 listener .onFailure (
126- new ValidationException (unwrapped )
127- .addValidationError (
128- new ParameterizedMessage ("Failed to test query, received status: {}" , status ).getFormattedMessage ()
129- )
126+ new ValidationException (unwrapped ).addValidationError (
127+ new ParameterizedMessage ("Failed to test query, received status: {}" , status ).getFormattedMessage ()
128+ )
130129 );
131130 }));
132131 }
@@ -153,16 +152,15 @@ public Tuple<Stream<IndexRequest>, Map<String, Object>> processSearchResponse(
153152 return null ;
154153 }
155154
156- Stream <IndexRequest > indexRequestStream = extractResults (compositeAgg , fieldTypeMap , stats , progress )
157- .map (doc -> {
158- String docId = (String )doc .remove (TransformField .DOCUMENT_ID_FIELD );
159- return DocumentConversionUtils .convertDocumentToIndexRequest (
160- docId ,
161- documentTransformationFunction (doc ),
162- destinationIndex ,
163- destinationPipeline
164- );
165- });
155+ Stream <IndexRequest > indexRequestStream = extractResults (compositeAgg , fieldTypeMap , stats , progress ).map (doc -> {
156+ String docId = (String ) doc .remove (TransformField .DOCUMENT_ID_FIELD );
157+ return DocumentConversionUtils .convertDocumentToIndexRequest (
158+ docId ,
159+ documentTransformationFunction (doc ),
160+ destinationIndex ,
161+ destinationPipeline
162+ );
163+ });
166164
167165 return Tuple .tuple (indexRequestStream , compositeAgg .afterKey ());
168166 }
@@ -177,13 +175,10 @@ protected abstract Stream<Map<String, Object>> extractResults(
177175 );
178176
179177 private SearchRequest buildSearchRequest (SourceConfig sourceConfig , Map <String , Object > position , int pageSize ) {
180- SearchSourceBuilder sourceBuilder = new SearchSourceBuilder ()
181- .query (sourceConfig .getQueryConfig ().getQuery ())
178+ SearchSourceBuilder sourceBuilder = new SearchSourceBuilder ().query (sourceConfig .getQueryConfig ().getQuery ())
182179 .runtimeMappings (sourceConfig .getRuntimeMappings ());
183180 buildSearchQuery (sourceBuilder , null , pageSize );
184- return new SearchRequest (sourceConfig .getIndex ())
185- .source (sourceBuilder )
186- .indicesOptions (IndicesOptions .LENIENT_EXPAND_OPEN );
181+ return new SearchRequest (sourceConfig .getIndex ()).source (sourceBuilder ).indicesOptions (IndicesOptions .LENIENT_EXPAND_OPEN );
187182 }
188183
189184 @ Override
0 commit comments