@@ -650,7 +650,7 @@ public <P> B convert(Class<P> payloadType) {
650650 * @see MethodInvokingTransformer
651651 * @see LambdaMessageProcessor
652652 */
653- public <P , T > B transform (@ Nullable Class <P > expectedType , GenericTransformer <P , T > genericTransformer ) {
653+ public <P , T > B transform (Class <P > expectedType , GenericTransformer <P , T > genericTransformer ) {
654654 return transformWith ((transformerSpec ) ->
655655 transformerSpec .transformer (genericTransformer ).expectedType (expectedType ));
656656 }
@@ -737,7 +737,7 @@ public B filter(String expression) {
737737 */
738738 public B filter (String expression , @ Nullable Consumer <FilterEndpointSpec > endpointConfigurer ) {
739739 Assert .hasText (expression , "'expression' must not be empty" );
740- return filter (null , new ExpressionEvaluatingSelector (expression ), endpointConfigurer );
740+ return doFilter (null , new ExpressionEvaluatingSelector (expression ), endpointConfigurer );
741741 }
742742
743743 /**
@@ -779,7 +779,7 @@ public B filter(Object service, @Nullable String methodName,
779779 StringUtils .hasText (methodName )
780780 ? new MethodInvokingSelector (service , methodName )
781781 : new MethodInvokingSelector (service );
782- return filter (null , selector , endpointConfigurer );
782+ return doFilter (null , selector , endpointConfigurer );
783783 }
784784
785785 /**
@@ -819,7 +819,7 @@ public B filter(MessageProcessorSpec<?> messageProcessorSpec,
819819 Assert .notNull (messageProcessorSpec , MESSAGE_PROCESSOR_SPEC_MUST_NOT_BE_NULL );
820820 MessageProcessor <?> processor = messageProcessorSpec .getObject ();
821821 return addComponent (processor )
822- .filter (null , new MethodInvokingSelector (processor ), endpointConfigurer );
822+ .doFilter (null , new MethodInvokingSelector (processor ), endpointConfigurer );
823823 }
824824
825825 /**
@@ -839,7 +839,7 @@ public B filter(MessageProcessorSpec<?> messageProcessorSpec,
839839 * @return the current {@link BaseIntegrationFlowDefinition}.
840840 * @see LambdaMessageProcessor
841841 */
842- public <P > B filter (@ Nullable Class <P > expectedType , GenericSelector <P > genericSelector ) {
842+ public <P > B filter (Class <P > expectedType , GenericSelector <P > genericSelector ) {
843843 return filter (expectedType , genericSelector , null );
844844 }
845845
@@ -863,7 +863,13 @@ public <P> B filter(@Nullable Class<P> expectedType, GenericSelector<P> genericS
863863 * @see LambdaMessageProcessor
864864 * @see FilterEndpointSpec
865865 */
866- public <P > B filter (@ Nullable Class <P > expectedType , GenericSelector <P > genericSelector ,
866+ public <P > B filter (Class <P > expectedType , GenericSelector <P > genericSelector ,
867+ @ Nullable Consumer <FilterEndpointSpec > endpointConfigurer ) {
868+
869+ return doFilter (expectedType , genericSelector , endpointConfigurer );
870+ }
871+
872+ protected <P > B doFilter (@ Nullable Class <P > expectedType , GenericSelector <P > genericSelector ,
867873 @ Nullable Consumer <FilterEndpointSpec > endpointConfigurer ) {
868874
869875 Assert .notNull (genericSelector , "'genericSelector' must not be null" );
@@ -1002,7 +1008,7 @@ public B handle(Object service, @Nullable String methodName,
10021008 * @return the current {@link BaseIntegrationFlowDefinition}.
10031009 * @see LambdaMessageProcessor
10041010 */
1005- public <P > B handle (@ Nullable Class <P > expectedType , GenericHandler <P > handler ) {
1011+ public <P > B handle (Class <P > expectedType , GenericHandler <P > handler ) {
10061012 return handle (expectedType , handler , null );
10071013 }
10081014
@@ -1026,7 +1032,13 @@ public <P> B handle(@Nullable Class<P> expectedType, GenericHandler<P> handler)
10261032 * @return the current {@link BaseIntegrationFlowDefinition}.
10271033 * @see LambdaMessageProcessor
10281034 */
1029- public <P > B handle (@ Nullable Class <P > expectedType , GenericHandler <P > handler ,
1035+ public <P > B handle (Class <P > expectedType , GenericHandler <P > handler ,
1036+ @ Nullable Consumer <GenericEndpointSpec <ServiceActivatingHandler >> endpointConfigurer ) {
1037+
1038+ return doHandle (expectedType , handler , endpointConfigurer );
1039+ }
1040+
1041+ protected <P > B doHandle (@ Nullable Class <P > expectedType , GenericHandler <P > handler ,
10301042 @ Nullable Consumer <GenericEndpointSpec <ServiceActivatingHandler >> endpointConfigurer ) {
10311043
10321044 ServiceActivatingHandler serviceActivatingHandler ;
@@ -1558,7 +1570,7 @@ public B claimCheckOut(MessageStore messageStore, boolean removeMessage,
15581570 * @return the current {@link BaseIntegrationFlowDefinition}.
15591571 */
15601572 public B resequence () {
1561- return resequence ( null );
1573+ return register ( new ResequencerSpec (), null );
15621574 }
15631575
15641576 /**
@@ -1579,7 +1591,7 @@ public B resequence() {
15791591 * @return the current {@link BaseIntegrationFlowDefinition}.
15801592 * @see ResequencerSpec
15811593 */
1582- public B resequence (@ Nullable Consumer <ResequencerSpec > resequencer ) {
1594+ public B resequence (Consumer <ResequencerSpec > resequencer ) {
15831595 return register (new ResequencerSpec (), resequencer );
15841596 }
15851597
@@ -1588,7 +1600,7 @@ public B resequence(@Nullable Consumer<ResequencerSpec> resequencer) {
15881600 * @return the current {@link BaseIntegrationFlowDefinition}.
15891601 */
15901602 public B aggregate () {
1591- return aggregate ( null );
1603+ return register ( new AggregatorSpec (), null );
15921604 }
15931605
15941606 /**
@@ -1617,7 +1629,7 @@ public B aggregate(Object aggregatorProcessor) {
16171629 * @return the current {@link BaseIntegrationFlowDefinition}.
16181630 * @see AggregatorSpec
16191631 */
1620- public B aggregate (@ Nullable Consumer <AggregatorSpec > aggregator ) {
1632+ public B aggregate (Consumer <AggregatorSpec > aggregator ) {
16211633 return register (new AggregatorSpec (), aggregator );
16221634 }
16231635
@@ -1641,7 +1653,7 @@ public B route(String beanName, @Nullable String method) {
16411653 * @return the current {@link BaseIntegrationFlowDefinition}.
16421654 */
16431655 public B route (String beanName , @ Nullable String method ,
1644- @ Nullable Consumer <RouterSpec <Object , MethodInvokingRouter >> routerConfigurer ) {
1656+ @ Nullable Consumer <RouterSpec <@ Nullable Object , MethodInvokingRouter >> routerConfigurer ) {
16451657
16461658 MethodInvokingRouter methodInvokingRouter =
16471659 new MethodInvokingRouter (new BeanNameMessageProcessor <>(beanName , method ));
@@ -1681,7 +1693,7 @@ public B route(Object service, @Nullable String methodName) {
16811693 * @see MethodInvokingRouter
16821694 */
16831695 public B route (Object service , @ Nullable String methodName ,
1684- @ Nullable Consumer <RouterSpec <Object , MethodInvokingRouter >> routerConfigurer ) {
1696+ @ Nullable Consumer <RouterSpec <@ Nullable Object , MethodInvokingRouter >> routerConfigurer ) {
16851697
16861698 MethodInvokingRouter router ;
16871699 if (StringUtils .hasText (methodName )) {
@@ -1711,7 +1723,7 @@ public B route(String expression) {
17111723 * @param <T> the target result type.
17121724 * @return the current {@link BaseIntegrationFlowDefinition}.
17131725 */
1714- public <T > B route (String expression ,
1726+ public <T extends @ Nullable Object > B route (String expression ,
17151727 @ Nullable Consumer <RouterSpec <T , ExpressionEvaluatingRouter >> routerConfigurer ) {
17161728
17171729 return route (new RouterSpec <>(new ExpressionEvaluatingRouter (PARSER .parseExpression (expression ))),
@@ -1736,7 +1748,7 @@ public <T> B route(String expression,
17361748 * @return the current {@link BaseIntegrationFlowDefinition}.
17371749 * @see LambdaMessageProcessor
17381750 */
1739- public <S , T > B route (@ Nullable Class <S > expectedType , Function <S , T > router ) {
1751+ public <S , T extends @ Nullable Object > B route (Class <S > expectedType , Function <S , T > router ) {
17401752 return route (expectedType , router , null );
17411753 }
17421754
@@ -1764,7 +1776,13 @@ public <S, T> B route(@Nullable Class<S> expectedType, Function<S, T> router) {
17641776 * @return the current {@link BaseIntegrationFlowDefinition}.
17651777 * @see LambdaMessageProcessor
17661778 */
1767- public <P , T > B route (@ Nullable Class <P > expectedType , Function <P , T > router ,
1779+ public <P , T extends @ Nullable Object > B route (Class <P > expectedType , Function <P , T > router ,
1780+ @ Nullable Consumer <RouterSpec <T , MethodInvokingRouter >> routerConfigurer ) {
1781+
1782+ return doRoute (expectedType , router , routerConfigurer );
1783+ }
1784+
1785+ protected <P , T extends @ Nullable Object > B doRoute (@ Nullable Class <P > expectedType , Function <P , T > router ,
17681786 @ Nullable Consumer <RouterSpec <T , MethodInvokingRouter >> routerConfigurer ) {
17691787
17701788 MethodInvokingRouter methodInvokingRouter =
@@ -1787,7 +1805,7 @@ public <P, T> B route(@Nullable Class<P> expectedType, Function<P, T> router,
17871805 * @return the current {@link BaseIntegrationFlowDefinition}.
17881806 */
17891807 public B route (MessageProcessorSpec <?> messageProcessorSpec ) {
1790- return route (messageProcessorSpec , (Consumer <RouterSpec <Object , MethodInvokingRouter >>) null );
1808+ return route (messageProcessorSpec , (Consumer <RouterSpec <@ Nullable Object , MethodInvokingRouter >>) null );
17911809 }
17921810
17931811 /**
@@ -1807,7 +1825,7 @@ public B route(MessageProcessorSpec<?> messageProcessorSpec) {
18071825 * @return the current {@link BaseIntegrationFlowDefinition}.
18081826 */
18091827 public B route (MessageProcessorSpec <?> messageProcessorSpec ,
1810- @ Nullable Consumer <RouterSpec <Object , MethodInvokingRouter >> routerConfigurer ) {
1828+ @ Nullable Consumer <RouterSpec <@ Nullable Object , MethodInvokingRouter >> routerConfigurer ) {
18111829
18121830 Assert .notNull (messageProcessorSpec , MESSAGE_PROCESSOR_SPEC_MUST_NOT_BE_NULL );
18131831 MessageProcessor <?> processor = messageProcessorSpec .getObject ();
0 commit comments